#from helpfunctions import *
import threading
import time

import numpy as np


class OSCReceive:
    """Receive joint targets over OSC and forward them to a robot interface.

    Constructing an instance starts a threaded OSC UDP server in a background
    thread. Every message on ``/joint`` is parsed into a list of
    ``JOINT_COUNT`` values and handed to ``ur.setJointQAim`` (when ``ur`` is
    not ``None``). The time between consecutive messages is stored in a ring
    buffer so the current receive rate can be queried with
    ``getCurrentReceiveFrequency``.

    Attributes:
        ur: Object passed as ``_ur``; only its ``setJointQAim`` method is used.
        server: The running OSC server, or ``None`` once disconnected.
        server_thread: Thread running ``server.serve_forever``, or ``None``.
        oscreceiveintervals: Ring buffer (seconds) of inter-message intervals.
        oscreceiveintervals_pos: Next write index into the ring buffer.
        oscreceiveintervals_count: Number of valid samples in the ring buffer.
        last_oscreceive: ``time.time()`` of the last message, ``0`` if none yet.
    """

    # Number of joint values forwarded per message.
    JOINT_COUNT = 6
    # Number of inter-message intervals averaged for the frequency estimate.
    INTERVAL_BUFFER_SIZE = 100

    def __init__(self, ip, port, _ur=None):
        """Start listening for ``/joint`` OSC messages on ``(ip, port)``.

        Args:
            ip: Address to bind the UDP server to.
            port: UDP port to bind the server to.
            _ur: Optional object exposing ``setJointQAim(list)``; parsed joint
                targets are forwarded to it.
        """
        # Imported here so this module can be imported (and the parsing /
        # statistics logic used) without python-osc being installed.
        from pythonosc import dispatcher
        from pythonosc import osc_server

        # Server handles are initialised here, once. They must not be reset
        # while messages are being handled, or disconnect() loses them.
        self.server = None
        self.server_thread = None

        # OSC receive statistics.
        self.oscreceiveintervals = np.zeros(self.INTERVAL_BUFFER_SIZE)
        self.oscreceiveintervals_pos = 0
        self.oscreceiveintervals_count = 0
        self.last_oscreceive = 0

        self.ur = _ur

        # Route the "/joint" address pattern to the joint handler.
        disp = dispatcher.Dispatcher()
        disp.map("/joint", self.handle_received_osc_joint)

        # Set up the OSC server and serve from a background thread.
        self.server = osc_server.ThreadingOSCUDPServer((ip, port), disp)
        print(f"Serving on {self.server.server_address}")
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.start()

    def handle_received_osc_joint(self, address, *args):
        """Handle one ``/joint`` message: update statistics, forward targets.

        Args:
            address: OSC address the message arrived on (unused).
            *args: Joint values; each must be convertible with ``float`` and
                there may be at most ``JOINT_COUNT`` of them. Missing trailing
                values are left at ``0``.
        """
        self._record_receive_time()

        joint_q_aim = [0] * self.JOINT_COUNT
        try:
            for index, arg in enumerate(args):
                # IndexError here means more values than joints were sent.
                joint_q_aim[index] = float(arg)
        except (TypeError, ValueError, IndexError, OverflowError):
            print("Data not in right format")
            print("Data: " + str(args))
            return

        if self.ur is not None:
            self.ur.setJointQAim(joint_q_aim)

    def _record_receive_time(self):
        """Store the interval since the previous message in the ring buffer."""
        now = time.time()
        # The very first message has no predecessor, so there is no interval
        # to measure; recording ``now - 0`` would poison the average.
        if self.last_oscreceive > 0:
            size = np.size(self.oscreceiveintervals)
            pos = self.oscreceiveintervals_pos
            self.oscreceiveintervals[pos] = now - self.last_oscreceive
            # Store the already-wrapped index so a concurrently running
            # handler never observes an out-of-range position.
            self.oscreceiveintervals_pos = (pos + 1) % size
            self.oscreceiveintervals_count = min(
                self.oscreceiveintervals_count + 1, size
            )
        self.last_oscreceive = now

    def getCurrentReceiveFrequency(self):
        """Return the mean receive rate in Hz over the recorded intervals.

        Returns:
            ``0.0`` when fewer than two messages have been received (no
            interval measured yet), ``inf`` when the mean interval is not
            positive, otherwise ``1 / mean interval``.
        """
        count = self.oscreceiveintervals_count
        if count == 0:
            return 0.0
        # Samples fill the buffer from index 0, so the first ``count`` slots
        # are exactly the measured ones until the buffer is full.
        mean_interval = float(np.mean(self.oscreceiveintervals[:count]))
        if mean_interval <= 0.0:
            return float("inf")
        return 1.0 / mean_interval

    def disconnect(self):
        """Stop the server thread and release the socket.

        Safe to call more than once.
        """
        server = self.server
        server_thread = self.server_thread
        self.server = None
        self.server_thread = None

        if server is not None:
            server.shutdown()
        if server_thread is not None:
            server_thread.join()
        if server is not None:
            # shutdown() only stops serve_forever(); the socket itself is
            # released by server_close().
            server.server_close()