from __future__ import division import time from flirpy.camera.boson import Boson # import torch # import torch.nn as nn # from torch.autograd import Variable import numpy as np import cv2 from matplotlib import pyplot as plt from util import * from darknet import Darknet from preprocess import prep_image, inp_to_image, letterbox_image import pickle as pkl import argparse from scipy.signal import savgol_filter from pythonosc import udp_client from pythonosc import osc_server from pythonosc import dispatcher import threading # import sounddevice as sd import soundfile as sf from playsound import playsound client = udp_client.SimpleUDPClient('192.168.21.32', 10000 ) # client1 = udp_client.SimpleUDPClient('192.168.21.200', 30000) client1 = udp_client.SimpleUDPClient('192.168.21.32', 15000) client_is_person_present = udp_client.SimpleUDPClient('192.168.21.200', 20000) # data, fs = sf.read('bell_short.wav', dtype='float32') # sd.play(data.T, fs, device=2) print(""" Choose Object detection: Nose = 'n' Face = 'f' """) detection_model = input() if detection_model == 'f': weight_file = "data/yolov3-wider_face_16000.weights" cfg_file = "cfg/yolov3-face.cfg" print("Activating FACE Detection") else: weight_file = "data/yolov3_training_final.weights" cfg_file = "cfg/yolov3_pysource_testing.cfg" print("Activating NOSE Detection") def get_test_input(input_dim, CUDA): img = cv2.imread("data/dog-cycle-car.png") img = cv2.resize(img, (input_dim, input_dim)) img_ = img[:,:,::-1].transpose((2,0,1)) img_ = img_[np.newaxis,:,:,:]/255.0 img_ = torch.from_numpy(img_).float() img_ = Variable(img_) if CUDA: img_ = img_.cuda() return img_ def prep_image(img, inp_dim): """ Prepare image for inputting to the neural network.Returns a Variable """ orig_im = img dim = orig_im.shape[1], orig_im.shape[0] img = (letterbox_image(orig_im, (inp_dim, inp_dim))) img_ = img[:,:,::-1].transpose((2,0,1)).copy() img_ = torch.from_numpy(img_).float().div(255.0).unsqueeze(0) return img_, orig_im, dim prev_vals = [] inhale_val = 0 exhale_val = 0 is_val_up = False is_exaling = False Is_person_present = 0 def write(x, img, abs_image): global prev_vals, inhale_val, exhale_val, Is_person_present c1 = tuple(x[1:3].int()) c2 = tuple(x[3:5].int()) cls = int(x[-1]) label = "person"#"{0}".format(classes[cls]) color = (0, 0, 255)#random.choice(colors) if (label=='person'): if c1[0]>10 and c1[0]<600: client_is_person_present.send_message("/python/person", 1) else: client1.send_message("/sound", (0,0,0)) if detection_model == 'f': cv2.rectangle(img, c1, c2, (0,0,0), 2) c1 = tuple([c1[0]+20, c1[1]+25]) c2 = tuple([c2[0]-15, c2[1]-15]) cropped_abs = abs_image[c1[1]:c2[1], c1[0]:c2[0]] cropped_abs = np.ndarray.flatten(cropped_abs) number_of_min_pixl = int(len(cropped_abs) * 0.1) idx = np.argpartition(cropped_abs, number_of_min_pixl) cv2.rectangle(img, c1, c2, color, 2) else: cropped_abs = abs_image[c1[1]:c2[1], c1[0]:c2[0]] cropped_abs = np.ndarray.flatten(cropped_abs) number_of_min_pixl = int(len(cropped_abs)*0.1) idx = np.argpartition(cropped_abs, number_of_min_pixl) cv2.rectangle(img, c1, c2,color, 2) if cropped_abs.any(): # obs = np.min(cropped_abs) obs = np.mean(cropped_abs[idx[:number_of_min_pixl]]) if len(prev_vals) >= 100: prev_vals = prev_vals[1:] prev_vals.append(obs) sav_filtered = savgol_filter(prev_vals, 11, 3) prev_vals_mean = np.ones(100) * np.mean(sav_filtered[40:])#((np.max(sav_filtered[40:]) - np.min(sav_filtered[40:]))/2 + np.min(sav_filtered[40:]))# # print(prev_vals_mean) # if abs(obs - prev_vals_mean[1])< 10 and 
                if sav_filtered[-1] > prev_vals_mean[-1]:
                    # Smoothed value above the running mean: exhaling.
                    exhale_val = abs(sav_filtered[-1] - prev_vals_mean[-1])
                    if exhale_val > 0.1:
                        inhale_val = 0
                    # print('Exhaling')
                    exhale_val = exhale_val / 160
                    client.send_message("/chan", (inhale_val, exhale_val))
                    # if is_exhaling == True:
                    #     sd.play(data, fs, device=1)
                    # print(exhale_val)
                    # ////////////// Sound
                    # exhale_val = exhale_val + 0.20
                    if c1[0] > 10 and c1[0] < 600:
                        client1.send_message("/sound", (1, inhale_val, exhale_val))
                    else:
                        client1.send_message("/sound", (0, inhale_val, exhale_val))
                    # //////////////
                else:
                    # Smoothed value below the running mean: inhaling.
                    inhale_val = abs(prev_vals_mean[-1] - sav_filtered[-1])
                    if inhale_val > 0.1:
                        exhale_val = 0
                    # print('Inhaling')
                    # //////////// Sound
                    inhale_val = inhale_val / 160
                    client.send_message("/chan", (inhale_val, exhale_val))
                    if c1[0] > 10 and c1[0] < 600:
                        client1.send_message("/sound", (1, inhale_val, exhale_val))
                    else:
                        client1.send_message("/sound", (0, inhale_val, exhale_val))
                    # //////////////
                # On-screen bars for the current inhale/exhale values.
                cv2.rectangle(img, (20, 140), (40, 150 + int(inhale_val * 50)), (255, 255, 255), 2)
                cv2.rectangle(img, (80, 140), (100, 150 + int(exhale_val * 50)), (255, 255, 255), 2)
                # plt.subplot(211)
                # plt.plot(sav_filtered)
                # plt.plot(prev_vals_mean)
                #
                # plt.subplot(212)
                # plt.bar(['Inhale', 'exhale'], [inhale_val, exhale_val])
                # plt.ylim([0, 1])
                #
                # plt.draw()
                # plt.pause(0.0011)
                # plt.clf()
            else:
                prev_vals.append(obs)

    t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
    c2 = c1[0] + t_size[0] + 3, c1[1] + t_size[1] + 4
    return img


def arg_parse():
    parser = argparse.ArgumentParser(description='YOLO v3 Video Detection Module')
    parser.add_argument("--video", dest='video', help="Video to run detection upon",
                        default="video.avi", type=str)
    parser.add_argument("--dataset", dest="dataset", help="Dataset on which the network has been trained",
                        default="pascal")
    parser.add_argument("--confidence", dest="confidence", help="Object confidence to filter predictions",
                        default=0.5)
    parser.add_argument("--nms_thresh", dest="nms_thresh", help="NMS threshold",
                        default=0.4)
    # parser.add_argument("--cfg", dest='cfgfile', help="Config file",
    #                     default="cfg/yolov3_pysource_testing.cfg", type=str)  # cfg/yolov3.cfg yolov3_pysource_testing.cfg
    # parser.add_argument("--weights", dest='weightsfile', help="Weights file",
    #                     default="data/yolov3_training_final_17_10_2020.weights", type=str)  # yolov3.weights yolov3-wider_face_16000 yolov3_training_final_17_10_2020.weights
    parser.add_argument("--reso", dest='reso',
                        help="Input resolution of the network. Increase to increase accuracy. Decrease to increase speed",
                        default="416", type=str)
    return parser.parse_args()


val_reset = False


def osc_command_receiver(unused_addr):
    """OSC handler: flag a reset of the breathing buffer for the main loop."""
    global val_reset
    val_reset = True
    print('Reset')


if __name__ == '__main__':
    args = arg_parse()
    confidence = float(args.confidence)
    nms_thesh = float(args.nms_thresh)
    start = 0
    CUDA = torch.cuda.is_available()
    num_classes = 1
    bbox_attrs = 5 + num_classes

    # print("Loading Face Landmark detector.....")
    print("Loading network.....")
    model = Darknet(cfg_file)  # args.cfgfile
    model.load_weights(weight_file)  # args.weightsfile
    print("Network successfully loaded")
    print(CUDA)

    model.net_info["height"] = args.reso
    inp_dim = int(model.net_info["height"])
    assert inp_dim % 32 == 0
    assert inp_dim > 32

    if CUDA:
        model.cuda()
        print("x")

    # Warm-up forward pass on a dummy image.
    model(get_test_input(inp_dim, CUDA), CUDA)
    model.eval()

    videofile = args.video
    # cap = cv2.VideoCapture("vids/breathing_cap.mov")  # 'http://192.168.21.200:81/'
    # cap = cv2.VideoCapture(0)
    camera = Boson()
    # assert cap.isOpened(), 'Cannot capture source'

    frames = 0
    start = time.time()

    # OSC server listening for remote reset commands.
    dispatcher = dispatcher.Dispatcher()
    dispatcher.map("/python/reset", osc_command_receiver)
    server = osc_server.ThreadingOSCUDPServer(('192.168.21.87', 12000), dispatcher)
    thread = threading.Thread(target=server.serve_forever, daemon=True)  # daemon so the process can exit on 'q'
    thread.start()

    while True:
        image = camera.grab()
        # Normalise the raw thermal frame to an 8-bit, 3-channel image for the detector.
        img = (image - np.min(image)) / (np.max(image) - np.min(image)) * 255
        frame = img.astype(np.uint8)
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        if len(image):
            img, orig_im, dim = prep_image(frame, inp_dim)
            im_dim = torch.FloatTensor(dim).repeat(1, 2)

            if CUDA:
                im_dim = im_dim.cuda()
                img = img.cuda()

            with torch.no_grad():
                output = model(Variable(img), CUDA)
            output = write_results(output, confidence, num_classes, nms=True, nms_conf=nms_thesh)

            if type(output) == int:
                # No detections in this frame.
                cv2.imshow("frame", orig_im)
                key = cv2.waitKey(1)
                if key & 0xFF == ord('q'):
                    break
                continue

            # Rescale box coordinates from the letterboxed network input back to the original frame.
            im_dim = im_dim.repeat(output.size(0), 1)
            scaling_factor = torch.min(inp_dim / im_dim, 1)[0].view(-1, 1)

            output[:, [1, 3]] -= (inp_dim - scaling_factor * im_dim[:, 0].view(-1, 1)) / 2
            output[:, [2, 4]] -= (inp_dim - scaling_factor * im_dim[:, 1].view(-1, 1)) / 2
            output[:, 1:5] /= scaling_factor

            for i in range(output.shape[0]):
                output[i, [1, 3]] = torch.clamp(output[i, [1, 3]], 0.0, im_dim[i, 0])
                output[i, [2, 4]] = torch.clamp(output[i, [2, 4]], 0.0, im_dim[i, 1])

            classes = load_classes('data/coco.names')
            colors = pkl.load(open("data/pallete", "rb"))

            list(map(lambda x: write(x, orig_im, image), output))

            cv2.imshow("frame", orig_im)
            key = cv2.waitKey(1)
            if key & 0xFF == ord('r') or val_reset == True:
                print("______Resetting Breathing Values_______")
                prev_vals.clear()
                val_reset = False
                plt.draw()
                plt.pause(0.0011)
                plt.clf()
            if key & 0xFF == ord('q'):
                print("Closing down the program")
                camera.close()
                plt.close()
                cv2.destroyAllWindows()
                break

            frames += 1
            if frames % 100 == 0:
                print("FPS of the video is {:5.2f}".format(frames / (time.time() - start)))
        else:
            break