# Reality_Exploration/main.py
#
# 341 lines
# 12 KiB
# Python

from __future__ import division
import time
from flirpy.camera.boson import Boson
# import torch
# import torch.nn as nn
# from torch.autograd import Variable
import numpy as np
import cv2
from matplotlib import pyplot as plt
from util import *
from darknet import Darknet
from preprocess import prep_image, inp_to_image, letterbox_image
import pickle as pkl
import argparse
from scipy.signal import savgol_filter
from pythonosc import udp_client
from pythonosc import osc_server
from pythonosc import dispatcher
import threading
# import sounddevice as sd
import soundfile as sf
from playsound import playsound
# Outgoing OSC endpoints. Hosts and ports are hard-coded.
_OSC_HOST_A = '192.168.21.32'
_OSC_HOST_B = '192.168.21.200'

# Target of the "/chan" (inhale, exhale) messages sent from write().
client = udp_client.SimpleUDPClient(_OSC_HOST_A, 10000)
# Target of the "/sound" messages sent from write().
# (A previous configuration pointed this at 192.168.21.200:30000.)
client1 = udp_client.SimpleUDPClient(_OSC_HOST_A, 15000)
# Target of the "/python/person" message sent from write().
client_is_person_present = udp_client.SimpleUDPClient(_OSC_HOST_B, 20000)
# Ask the operator which detector to run. The answer is read from stdin while
# the module is being executed, before any function is defined.
print("""
Choose Object detection:
Nose = 'n'
Face = 'f'
""")
# Normalise the answer so that "F" or " f " also select the face model. Any
# other answer, including an empty line, falls back to the nose model.
detection_model = input().strip().lower()
if detection_model == 'f':
    weight_file = "data/yolov3-wider_face_16000.weights"
    cfg_file = "cfg/yolov3-face.cfg"
    print("Activating FACE Detection")
else:
    weight_file = "data/yolov3_training_final.weights"
    cfg_file = "cfg/yolov3_pysource_testing.cfg"
    print("Activating NOSE Detection")
def get_test_input(input_dim, CUDA):
    """Build a dummy network input from the bundled sample image.

    The image at ``data/dog-cycle-car.png`` is resized to a square of side
    ``input_dim``, its channel axis is reversed and moved to the front, a
    leading batch axis is added and the values are divided by 255.

    Args:
        input_dim: Side length, in pixels, of the square network input.
        CUDA: When truthy, the tensor is moved to the GPU before returning.

    Returns:
        A float tensor holding the prepared image.

    Raises:
        FileNotFoundError: If the sample image cannot be read.
    """
    sample_path = "data/dog-cycle-car.png"
    img = cv2.imread(sample_path)
    if img is None:
        # cv2.imread reports failure by returning None instead of raising;
        # fail here with a clear message rather than inside cv2.resize.
        raise FileNotFoundError("Could not read sample image: " + sample_path)
    img = cv2.resize(img, (input_dim, input_dim))
    # Reverse the channel axis and reorder (H, W, C) -> (C, H, W).
    img_ = img[:, :, ::-1].transpose((2, 0, 1))
    img_ = img_[np.newaxis, :, :, :] / 255.0
    img_ = torch.from_numpy(img_).float()
    img_ = Variable(img_)
    if CUDA:
        img_ = img_.cuda()
    return img_
def prep_image(img, inp_dim):
    """
    Convert an image array into the tensor layout fed to the network.

    The image is passed through ``letterbox_image`` with a square target of
    side ``inp_dim``; the result has its channel axis reversed and moved to
    the front, is converted to a float tensor, divided by 255 and given a
    leading batch axis of size one.

    Returns a tuple ``(tensor, original_image, (width, height))`` where the
    size is taken from the shape of the untouched input image.
    """
    size = (img.shape[1], img.shape[0])
    boxed = letterbox_image(img, (inp_dim, inp_dim))
    channels_first = boxed[:, :, ::-1].transpose((2, 0, 1)).copy()
    batch = torch.from_numpy(channels_first).float().div(255.0).unsqueeze(0)
    return batch, img, size
# Module-level state shared between write() and the main loop.
prev_vals = []                  # rolling history of per-frame observations
inhale_val = exhale_val = 0     # most recent breathing amplitudes
is_val_up = is_exaling = False  # not read or written by the visible code
Is_person_present = 0           # never updated in the visible code
def _report_breath(box_left):
    """Send the current inhale/exhale values to the two OSC receivers."""
    client.send_message("/chan", (inhale_val, exhale_val))
    in_range = 1 if 10 < box_left < 600 else 0
    client1.send_message("/sound", (in_range, inhale_val, exhale_val))


def write(x, img, abs_image):
    """Draw one detection on ``img`` and update the breathing estimate.

    The bounding box of detection ``x`` is drawn, the matching region of
    ``abs_image`` is reduced to the mean of its lowest ~10 % of values, and
    that observation is appended to the rolling history ``prev_vals``. Once
    the history holds 100 samples it is smoothed with a Savitzky-Golay filter
    (window 11, order 3) and the newest smoothed sample is compared with the
    mean of the smoothed history from index 40 onward: above the mean updates
    ``exhale_val``, otherwise ``inhale_val``. Amplitudes above 0.1 are divided
    by 160 and sent over OSC ("/chan" and "/sound"); both values are drawn as
    bars on ``img``.

    NOTE(review): the indentation of this file was lost; the nesting below
    (in particular that the normalisation and the OSC sends sit inside the
    ``> 0.1`` checks) is a reconstruction - confirm against the original.

    Args:
        x: One detection row; elements 1-4 are read as the box corners
            (x1, y1, x2, y2).
        img: Image the overlay is drawn on (modified in place).
        abs_image: Array the measurement region is cropped from.
            NOTE(review): assumed to be a 2-D frame on the same pixel grid
            as ``img`` - confirm against the caller.

    Returns:
        ``img`` with the overlay drawn.
    """
    global inhale_val, exhale_val

    # Plain ints keep cv2.rectangle and NumPy slicing independent of the
    # tensor type of ``x``.
    c1 = tuple(int(v) for v in x[1:3])
    c2 = tuple(int(v) for v in x[3:5])
    color = (0, 0, 255)

    if 10 < c1[0] < 600:
        client_is_person_present.send_message("/python/person", 1)
    else:
        client1.send_message("/sound", (0, 0, 0))

    if detection_model == 'f':
        # Outline the detected face, then measure on a smaller inset box.
        cv2.rectangle(img, c1, c2, (0, 0, 0), 2)
        c1 = (c1[0] + 20, c1[1] + 25)
        c2 = (c2[0] - 15, c2[1] - 15)
    cv2.rectangle(img, c1, c2, color, 2)

    cropped_abs = abs_image[c1[1]:c2[1], c1[0]:c2[0]].flatten()

    # ``any()`` is False for an empty crop as well as an all-zero one, so the
    # partition below (which rejects empty input) only runs on usable data.
    if cropped_abs.any():
        # Use at least one pixel so a crop of fewer than ten pixels does not
        # produce a NaN observation.
        n_lowest = max(1, int(len(cropped_abs) * 0.1))
        idx = np.argpartition(cropped_abs, n_lowest - 1)
        obs = np.mean(cropped_abs[idx[:n_lowest]])

        if len(prev_vals) < 100:
            # Still filling the history; nothing is reported yet.
            prev_vals.append(obs)
        else:
            # Slide the 100-sample window forward by one observation.
            del prev_vals[0]
            prev_vals.append(obs)

            sav_filtered = savgol_filter(prev_vals, 11, 3)
            baseline = np.mean(sav_filtered[40:])
            latest = sav_filtered[-1]

            if latest > baseline:
                exhale_val = abs(latest - baseline)
                if exhale_val > 0.1:
                    inhale_val = 0
                    exhale_val = exhale_val / 160
                    # The range flag uses c1 after the face-mode inset.
                    _report_breath(c1[0])
            else:
                inhale_val = abs(baseline - latest)
                if inhale_val > 0.1:
                    exhale_val = 0
                    inhale_val = inhale_val / 160
                    _report_breath(c1[0])

            # Two bars whose lower edge moves with the inhale / exhale value.
            cv2.rectangle(img, (20, 140), (40, 150 + int(inhale_val * 50)), (255, 255, 255), 2)
            cv2.rectangle(img, (80, 140), (100, 150 + int(exhale_val * 50)), (255, 255, 255), 2)

    return img
def arg_parse():
    """Parse the command-line options of the detection script.

    The network cfg/weights files are not options here; they are chosen by
    the interactive prompt that runs when the module is executed.

    Returns:
        argparse.Namespace with ``video`` (str), ``dataset`` (str),
        ``confidence`` (float), ``nms_thresh`` (float) and ``reso`` (str).
    """
    parser = argparse.ArgumentParser(description='YOLO v3 Video Detection Module')
    parser.add_argument("--video", dest='video',
                        help="Video to run detection upon",
                        default="video.avi", type=str)
    parser.add_argument("--dataset", dest="dataset",
                        help="Dataset on which the network has been trained",
                        default="pascal")
    # type=float makes argparse reject non-numeric input with a usage message
    # instead of handing a raw string to the caller.
    parser.add_argument("--confidence", dest="confidence",
                        help="Object Confidence to filter predictions",
                        default=0.5, type=float)
    parser.add_argument("--nms_thresh", dest="nms_thresh",
                        help="NMS Threshhold",
                        default=0.4, type=float)
    # Kept as a string: the caller stores it in the network info as-is and
    # converts it to int afterwards.
    parser.add_argument("--reso", dest='reso',
                        help="Input resolution of the network. Increase to increase accuracy. Decrease to increase speed",
                        default="416", type=str)
    return parser.parse_args()
# Raised by the OSC handler below and consumed by the main loop, which clears
# the breathing history and lowers the flag again.
val_reset = False


def osc_command_receiver(unused_addr, *unused_args):
    """OSC handler mapped to "/python/reset": request a history reset.

    Args:
        unused_addr: OSC address the message arrived on (ignored).
        *unused_args: Arguments carried by the message (ignored). Accepting
            them keeps the handler from raising TypeError when the sender
            attaches a payload to the reset message.
    """
    global val_reset
    val_reset = True
    print('Reset')
def normalize_frame(image):
    """Stretch a raw frame linearly onto 0-255 and return it as ``uint8``.

    A frame whose pixels are all equal has no range to stretch; it is
    returned as all zeros instead of dividing by zero.
    """
    lowest = np.min(image)
    span = np.max(image) - lowest
    if span == 0:
        return np.zeros(np.shape(image), dtype=np.uint8)
    return ((image - lowest) / span * 255).astype(np.uint8)


if __name__ == '__main__':
    # NOTE(review): the indentation of this file was lost and has been
    # reconstructed; in particular print("x") is assumed to belong to the
    # CUDA branch - confirm against the original.
    args = arg_parse()
    confidence = float(args.confidence)
    nms_thesh = float(args.nms_thresh)
    num_classes = 1
    CUDA = torch.cuda.is_available()

    print("Loading network.....")
    model = Darknet(cfg_file)
    model.load_weights(weight_file)
    print("Network successfully loaded")
    print(CUDA)

    model.net_info["height"] = args.reso
    inp_dim = int(model.net_info["height"])
    # Explicit check instead of assert, which is stripped under ``python -O``.
    if inp_dim % 32 != 0 or inp_dim <= 32:
        raise ValueError("--reso must be a multiple of 32 and greater than 32")

    if CUDA:
        model.cuda()
        print("x")
    # One forward pass on the sample image before switching to eval mode.
    model(get_test_input(inp_dim, CUDA), CUDA)
    model.eval()

    # Loaded once here rather than on every frame; the context manager makes
    # sure the palette file is closed again.
    classes = load_classes('data/coco.names')
    with open("data/pallete", "rb") as palette_file:
        colors = pkl.load(palette_file)

    camera = Boson()

    # Listen for "/python/reset" in the background. The thread is a daemon so
    # it cannot keep the process alive once the main loop has ended.
    osc_dispatcher = dispatcher.Dispatcher()
    osc_dispatcher.map("/python/reset", osc_command_receiver)
    server = osc_server.ThreadingOSCUDPServer(('192.168.21.87', 12000), osc_dispatcher)
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()

    frames = 0
    start = time.time()
    try:
        while True:
            image = camera.grab()
            # Stop when the camera delivers no frame; checked before the
            # frame is used for anything.
            if image is None or len(image) == 0:
                break

            frame = cv2.cvtColor(normalize_frame(image), cv2.COLOR_GRAY2BGR)
            img, orig_im, dim = prep_image(frame, inp_dim)
            im_dim = torch.FloatTensor(dim).repeat(1, 2)
            if CUDA:
                im_dim = im_dim.cuda()
                img = img.cuda()

            with torch.no_grad():
                output = model(Variable(img), CUDA)
            output = write_results(output, confidence, num_classes, nms=True, nms_conf=nms_thesh)

            # An int result is treated as "nothing to draw"; the frame is
            # still shown and key presses are still handled below.
            if not isinstance(output, int):
                # Map box corners from the letterboxed network input back to
                # the coordinates of the displayed frame.
                im_dim = im_dim.repeat(output.size(0), 1)
                scaling_factor = torch.min(inp_dim / im_dim, 1)[0].view(-1, 1)
                output[:, [1, 3]] -= (inp_dim - scaling_factor * im_dim[:, 0].view(-1, 1)) / 2
                output[:, [2, 4]] -= (inp_dim - scaling_factor * im_dim[:, 1].view(-1, 1)) / 2
                output[:, 1:5] /= scaling_factor
                for i in range(output.shape[0]):
                    output[i, [1, 3]] = torch.clamp(output[i, [1, 3]], 0.0, im_dim[i, 0])
                    output[i, [2, 4]] = torch.clamp(output[i, [2, 4]], 0.0, im_dim[i, 1])

                # Measurements are taken on the raw frame, drawing on the
                # display frame.
                for detection in output:
                    write(detection, orig_im, image)

            cv2.imshow("frame", orig_im)
            key = cv2.waitKey(1) & 0xFF

            if key == ord('r') or val_reset:
                print("______Resetting Breathing Values_______")
                prev_vals.clear()
                val_reset = False
                plt.draw()
                plt.pause(0.0011)
                plt.clf()

            if key == ord('q'):
                print("Closing Down the program")
                break

            frames += 1
            if frames % 100 == 0:
                print("FPS of the video is {:5.2f}".format(frames / (time.time() - start)))
    finally:
        # Runs on 'q', on a missing frame and on errors alike, so the camera
        # and the OSC socket are always released.
        camera.close()
        server.shutdown()
        server.server_close()
        plt.close()
        cv2.destroyAllWindows()