urcontrol/urosc/oscreceive.py

72 lines
2.1 KiB
Python

from helpfunctions import *
import numpy as np
import time
class OSCReceive:
from pythonosc import dispatcher
from pythonosc import osc_server
import threading
def handle_received_osc_joint(self,address, *args):
self.server=None
self.server_thread=None
self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive
self.last_oscreceive=time.time()
self.oscreceiveintervals_pos+=1
self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals)
#print(f"Received OSC message on {address}")
joint_q_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
joint_q_aim[iarg]=toRad(float(arg))
except:
print("Data not in right format")
print("Data: "+str(args))
else:
if self.ur is not None:
self.ur.setJointQAim(joint_q_aim)
def getCurrentReceiveFrequency(self):
_mean=np.mean(self.oscreceiveintervals)
if _mean>0:
return 1.0/_mean
else:
return 0
def __init__(self,ip,port,_ur=None):
#osc receive statistics
self.oscreceiveintervals = np.zeros(100)
self.oscreceiveintervals_pos=0
self.last_oscreceive=0
self.ur = _ur
# Set up the dispatcher for the server
disp = self.dispatcher.Dispatcher()
disp.map("/joints", self.handle_received_osc_joint) # You can change '/filter' to any address pattern you expect
# Set up OSC server
self.server = self.osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {self.server.server_address}")
self.server_thread = self.threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
def disconnect(self):
if self.server is not None:
self.server.shutdown()
if self.server_thread is not None:
self.server_thread.join()