urcontrol/urosc/oscreceive.py

130 lines
4.2 KiB
Python
Raw Permalink Normal View History

from helpfunctions import *
2024-05-10 09:17:19 +00:00
import numpy as np
import time
class OSCReceive:
from pythonosc import dispatcher
from pythonosc import osc_server
import threading
2024-05-10 15:18:08 +00:00
def __init__(self,ip,port,_oscsend,_ur=None):
2024-05-10 09:17:19 +00:00
self.server=None
self.server_thread=None
2024-05-10 15:18:08 +00:00
#osc receive statistics
self.oscreceiveintervals = np.zeros(100)
self.oscreceiveintervals_pos=0
self.last_oscreceive=0
self.ur = _ur
self.oscsend = _oscsend
self.debug="Nothing"
# Set up the dispatcher for the server
disp = self.dispatcher.Dispatcher()
disp.map("/joints", self.handle_received_osc_joint)
disp.map("/invkin", self.handle_received_osc_invkin) #just calculate inverse kinematics
disp.map("/movetcp", self.handle_received_osc_tcp) #calculaate inverse kinematics and move
# Set up OSC server
self.server = self.osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {self.server.server_address}")
self.server_thread = self.threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
def handle_received_osc_joint(self,address, *args):
2024-05-10 09:17:19 +00:00
self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive
self.last_oscreceive=time.time()
self.oscreceiveintervals_pos+=1
self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals)
#print(f"Received OSC message on {address}")
joint_q_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
joint_q_aim[iarg]=toRad(float(arg))
2024-05-10 15:18:08 +00:00
self.debug=str(joint_q_aim)
except:
print("Data not in right format")
print("Data: "+str(args))
else:
if self.ur is not None:
self.ur.setJointQAim(joint_q_aim)
def handle_received_osc_invkin(self,address, *args):
tcp_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
tcp_aim[iarg]=float(arg)
2024-05-10 09:17:19 +00:00
except:
print("Data not in right format")
print("Data: "+str(args))
2024-05-10 15:18:08 +00:00
self.debug="Data not in right format: "+str(args)
2024-05-10 09:17:19 +00:00
else:
if self.ur is not None:
2024-05-10 15:18:08 +00:00
self.debug="tcp_aim"+str(tcp_aim)
#_rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5]))
_rot=toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5])
pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]]
joint_q_ik=self.ur.getInverseKinematics(pos_q)
self.oscsend.sendMessage("/invkin/j",joint_q_ik)
self.debug="Send joints"+str(joint_q_ik)
def getDebug(self):
return self.debug
def handle_received_osc_tcp(self,address, *args):
self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive
self.last_oscreceive=time.time()
self.oscreceiveintervals_pos+=1
self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals)
tcp_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
tcp_aim[iarg]=float(arg)
except:
print("Data not in right format")
print("Data: "+str(args))
else:
if self.ur is not None:
_rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5]))
pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]]
joint_q_ik=self.ur.getInverseKinematics(pos_q)
joint_q_aim=joint_q_ik
2024-05-10 09:17:19 +00:00
self.ur.setJointQAim(joint_q_aim)
2024-05-10 15:18:08 +00:00
2024-05-10 09:17:19 +00:00
def getCurrentReceiveFrequency(self):
_mean=np.mean(self.oscreceiveintervals)
if _mean>0:
return 1.0/_mean
else:
return 0
2024-05-10 09:17:19 +00:00
def disconnect(self):
2024-05-10 15:18:08 +00:00
self.server.shutdown()
self.server_thread.join()