from helpfunctions import * import numpy as np import time class OSCReceive: from pythonosc import dispatcher from pythonosc import osc_server import threading def __init__(self,ip,port,_oscsend,_ur=None): self.server=None self.server_thread=None #osc receive statistics self.oscreceiveintervals = np.zeros(100) self.oscreceiveintervals_pos=0 self.last_oscreceive=0 self.ur = _ur self.oscsend = _oscsend self.debug="Nothing" # Set up the dispatcher for the server disp = self.dispatcher.Dispatcher() disp.map("/joints", self.handle_received_osc_joint) disp.map("/invkin", self.handle_received_osc_invkin) #just calculate inverse kinematics disp.map("/movetcp", self.handle_received_osc_tcp) #calculaate inverse kinematics and move # Set up OSC server self.server = self.osc_server.ThreadingOSCUDPServer((ip, port), disp) print(f"Serving on {self.server.server_address}") self.server_thread = self.threading.Thread(target=self.server.serve_forever) self.server_thread.start() def handle_received_osc_joint(self,address, *args): self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive self.last_oscreceive=time.time() self.oscreceiveintervals_pos+=1 self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals) #print(f"Received OSC message on {address}") joint_q_aim=[0,0,0,0,0,0] try: for iarg, arg in enumerate(args): #print(f"Data: {arg}") joint_q_aim[iarg]=toRad(float(arg)) self.debug=str(joint_q_aim) except: print("Data not in right format") print("Data: "+str(args)) else: if self.ur is not None: self.ur.setJointQAim(joint_q_aim) def handle_received_osc_invkin(self,address, *args): tcp_aim=[0,0,0,0,0,0] try: for iarg, arg in enumerate(args): #print(f"Data: {arg}") tcp_aim[iarg]=float(arg) except: print("Data not in right format") print("Data: "+str(args)) self.debug="Data not in right format: "+str(args) else: if self.ur is not None: self.debug="tcp_aim"+str(tcp_aim) #_rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5])) _rot=toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5]) pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]] joint_q_ik=self.ur.getInverseKinematics(pos_q) self.oscsend.sendMessage("/invkin/j",joint_q_ik) self.debug="Send joints"+str(joint_q_ik) def getDebug(self): return self.debug def handle_received_osc_tcp(self,address, *args): self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive self.last_oscreceive=time.time() self.oscreceiveintervals_pos+=1 self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals) tcp_aim=[0,0,0,0,0,0] try: for iarg, arg in enumerate(args): #print(f"Data: {arg}") tcp_aim[iarg]=float(arg) except: print("Data not in right format") print("Data: "+str(args)) else: if self.ur is not None: _rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5])) pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]] joint_q_ik=self.ur.getInverseKinematics(pos_q) joint_q_aim=joint_q_ik self.ur.setJointQAim(joint_q_aim) def getCurrentReceiveFrequency(self): _mean=np.mean(self.oscreceiveintervals) if _mean>0: return 1.0/_mean else: return 0 def disconnect(self): self.server.shutdown() self.server_thread.join()