implement tcp pose

This commit is contained in:
Philipp Kramer 2024-05-10 17:18:08 +02:00
parent 7899b73f40
commit 2774662b6b
6 changed files with 168 additions and 43 deletions

View file

@ -1,5 +1,5 @@
import math import math
import numpy as np
def toRad(d): def toRad(d):
return d/360*2*math.pi return d/360*2*math.pi
@ -14,3 +14,35 @@ def mapFromTo(x,a,b,c,d):
def constrain(v,_min,_max): def constrain(v,_min,_max):
return min(_max,max(_min,v)) return min(_max,max(_min,v))
def RPYtoVec(roll,pitch,yaw):
yawMatrix = np.matrix([
[math.cos(yaw), -math.sin(yaw), 0],
[math.sin(yaw), math.cos(yaw), 0],
[0, 0, 1]
])
pitchMatrix = np.matrix([
[math.cos(pitch), 0, math.sin(pitch)],
[0, 1, 0],
[-math.sin(pitch), 0, math.cos(pitch)]
])
rollMatrix = np.matrix([
[1, 0, 0],
[0, math.cos(roll), -math.sin(roll)],
[0, math.sin(roll), math.cos(roll)]
])
R = yawMatrix * pitchMatrix * rollMatrix
theta = math.acos(((R[0, 0] + R[1, 1] + R[2, 2]) - 1) / 2)
multi = 1 / (2 * math.sin(theta))
rx = multi * (R[2, 1] - R[1, 2]) * theta
ry = multi * (R[0, 2] - R[2, 0]) * theta
rz = multi * (R[1, 0] - R[0, 1]) * theta
return rx, ry, rz

View file

@ -10,12 +10,39 @@ class OSCReceive:
def handle_received_osc_joint(self,address, *args): def __init__(self,ip,port,_oscsend,_ur=None):
self.server=None self.server=None
self.server_thread=None self.server_thread=None
#osc receive statistics
self.oscreceiveintervals = np.zeros(100)
self.oscreceiveintervals_pos=0
self.last_oscreceive=0
self.ur = _ur
self.oscsend = _oscsend
self.debug="Nothing"
# Set up the dispatcher for the server
disp = self.dispatcher.Dispatcher()
disp.map("/joints", self.handle_received_osc_joint)
disp.map("/invkin", self.handle_received_osc_invkin) #just calculate inverse kinematics
disp.map("/movetcp", self.handle_received_osc_tcp) #calculaate inverse kinematics and move
# Set up OSC server
self.server = self.osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {self.server.server_address}")
self.server_thread = self.threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
def handle_received_osc_joint(self,address, *args):
self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive
self.last_oscreceive=time.time() self.last_oscreceive=time.time()
self.oscreceiveintervals_pos+=1 self.oscreceiveintervals_pos+=1
@ -27,6 +54,7 @@ class OSCReceive:
for iarg, arg in enumerate(args): for iarg, arg in enumerate(args):
#print(f"Data: {arg}") #print(f"Data: {arg}")
joint_q_aim[iarg]=toRad(float(arg)) joint_q_aim[iarg]=toRad(float(arg))
self.debug=str(joint_q_aim)
except: except:
print("Data not in right format") print("Data not in right format")
print("Data: "+str(args)) print("Data: "+str(args))
@ -35,6 +63,57 @@ class OSCReceive:
self.ur.setJointQAim(joint_q_aim) self.ur.setJointQAim(joint_q_aim)
def handle_received_osc_invkin(self,address, *args):
tcp_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
tcp_aim[iarg]=float(arg)
except:
print("Data not in right format")
print("Data: "+str(args))
self.debug="Data not in right format: "+str(args)
else:
if self.ur is not None:
self.debug="tcp_aim"+str(tcp_aim)
#_rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5]))
_rot=toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5])
pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]]
joint_q_ik=self.ur.getInverseKinematics(pos_q)
self.oscsend.sendMessage("/invkin/j",joint_q_ik)
self.debug="Send joints"+str(joint_q_ik)
def getDebug(self):
return self.debug
def handle_received_osc_tcp(self,address, *args):
self.oscreceiveintervals[self.oscreceiveintervals_pos]=time.time()-self.last_oscreceive
self.last_oscreceive=time.time()
self.oscreceiveintervals_pos+=1
self.oscreceiveintervals_pos%=np.size(self.oscreceiveintervals)
tcp_aim=[0,0,0,0,0,0]
try:
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
tcp_aim[iarg]=float(arg)
except:
print("Data not in right format")
print("Data: "+str(args))
else:
if self.ur is not None:
_rot=RPYtoVec(toRad(tcp_aim[3]),toRad(tcp_aim[4]),toRad(tcp_aim[5]))
pos_q=[tcp_aim[0],tcp_aim[1],tcp_aim[2],_rot[0],_rot[1],_rot[2]]
joint_q_ik=self.ur.getInverseKinematics(pos_q)
joint_q_aim=joint_q_ik
self.ur.setJointQAim(joint_q_aim)
def getCurrentReceiveFrequency(self): def getCurrentReceiveFrequency(self):
_mean=np.mean(self.oscreceiveintervals) _mean=np.mean(self.oscreceiveintervals)
@ -43,30 +122,8 @@ class OSCReceive:
else: else:
return 0 return 0
def __init__(self,ip,port,_ur=None):
#osc receive statistics
self.oscreceiveintervals = np.zeros(100)
self.oscreceiveintervals_pos=0
self.last_oscreceive=0
self.ur = _ur
# Set up the dispatcher for the server
disp = self.dispatcher.Dispatcher()
disp.map("/joints", self.handle_received_osc_joint) # You can change '/filter' to any address pattern you expect
# Set up OSC server
self.server = self.osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {self.server.server_address}")
self.server_thread = self.threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
def disconnect(self): def disconnect(self):
if self.server is not None: self.server.shutdown()
self.server.shutdown() self.server_thread.join()
if self.server_thread is not None:
self.server_thread.join()

View file

@ -18,15 +18,24 @@ class OSCSend:
self.oscreceivers.append(connection) self.oscreceivers.append(connection)
def sendMessage(self,address,value):
for c in self.oscreceivers:
c.send_message(address,value)
def send(self,looptime,q): def send(self,looptime,q,tcp):
if looptime-self.last_sendosc<1.0/self.frequency_oscsend: if looptime-self.last_sendosc<1.0/self.frequency_oscsend:
return return
self.last_sendosc=looptime self.last_sendosc=looptime
for c in self.oscreceivers: for c in self.oscreceivers:
for jointi,joint in enumerate(q): joint_deg=[toDeg(x) for x in q]
c.send_message("/j"+str(jointi), toDeg(joint)) c.send_message("/j",joint_deg)
rx,ry,rz=RPYtoVec(tcp[3],tcp[4],tcp[5])
tcp_deg=[tcp[0],tcp[1],tcp[2],toDeg(rx),toDeg(ry),toDeg(rz)]
#tcp_deg=[tcp[0],tcp[1],tcp[2],toDeg(tcp[3]),toDeg(tcp[4]),toDeg(tcp[5])]
c.send_message("/tcp", tcp_deg)

View file

@ -7,7 +7,7 @@ class UI:
def __init__(self,_joint_min,_joint_max,_frequency_scr=25): def __init__(self,_joint_min,_joint_max,_frequency_scr=25):
self.stdscr=None self.stdscr=None
self.debugtext=["","",""] self.debugtext=["","","",""]
self.width=0 self.width=0
self.height=0 self.height=0
self.joint_min=_joint_min self.joint_min=_joint_min
@ -55,11 +55,12 @@ class UI:
self.key = self.stdscr.getch() self.key = self.stdscr.getch()
self.stdscr.clear() self.stdscr.clear()
for i in range(3): #Print Debug text for i in range(np.size(self.debugtext)): #Print Debug text
self.stdscr.addstr(i, 0, self.debugtext[i], self.curses.color_pair(1)) self.stdscr.addstr(i, 0, self.debugtext[i], self.curses.color_pair(1))
startbarsx=10 startbarsx=10
endbarsx=self.width-1 endbarsx=self.width-1
starty=np.size(self.debugtext)
for j in range(6): for j in range(6):
posx=mapFromTo(q[j],self.joint_min[j],self.joint_max[j],startbarsx,endbarsx) #scale to screen width posx=mapFromTo(q[j],self.joint_min[j],self.joint_max[j],startbarsx,endbarsx) #scale to screen width
for px in range(self.width): for px in range(self.width):
@ -77,11 +78,13 @@ class UI:
poschar="]" poschar="]"
if (len(poschar)>0): if (len(poschar)>0):
self.stdscr.addstr(3+j, px, poschar, self.curses.color_pair(1)) self.stdscr.addstr(starty+j, px, poschar, self.curses.color_pair(1))
valuetext=str(round(toDeg(q[j]),2)) valuetext=str(round(toDeg(q[j]),2))
if q[j]>=0: #has no negative sign if q[j]>=0: #has no negative sign
valuetext=" "+valuetext valuetext=" "+valuetext
self.stdscr.addstr(3+j, 0, valuetext, self.curses.color_pair(1)) self.stdscr.addstr(starty+j, 0, valuetext, self.curses.color_pair(1))
# Refresh the screen # Refresh the screen

View file

@ -22,6 +22,11 @@ class UR:
self.rtde_c=None self.rtde_c=None
self.last_forwardkinematics=None
self.last_forwardkinematics_joint_q_received=None
# Parameters # Parameters
self.velocity = 0.1 self.velocity = 0.1
self.acceleration = 0.1 self.acceleration = 0.1
@ -29,11 +34,12 @@ class UR:
self.gain = 300 self.gain = 300
self.joint_q_aim=[0,0,0,0,0,0] #joint rotations aiming for self.joint_q_aim=[0,0,0,0,0,0] #joint rotations aiming for
self.joint_q = [0,0,0,0,0,0] #joint rotation last sent self.joint_q = [0,0,0,0,0,0] #joint rotation last sent
#self.velocitylimit=[1,1,1,3,3,3] #joint velocity limits self.velocitylimit=[1,1,1,3,3,3] #joint velocity limits
self.velocitylimit=[0.2,0.2,0.2,0.2,0.2,0.2] #joint velocity limits #self.velocitylimit=[0.2,0.2,0.2,0.2,0.2,0.2] #joint velocity limits
self.joint_q_received=[0,0,0,0,0,0] self.joint_q_received=[0,0,0,0,0,0]
self.tcppose_received=[0,0,0,0,0,0]
def connect(self,robotip): def connect(self,robotip):
@ -80,6 +86,7 @@ class UR:
if self.rtde_r is not None: if self.rtde_r is not None:
self.joint_q_received=self.rtde_r.getActualQ() self.joint_q_received=self.rtde_r.getActualQ()
self.tcppose_received=self.rtde_r.getActualTCPPose()
else: else:
_offsets=[12.2,5.3,0,6,12.4,612.91] _offsets=[12.2,5.3,0,6,12.4,612.91]
_factors=[1.2,1.14,1.214,0.9921,1,1.092] _factors=[1.2,1.14,1.214,0.9921,1,1.092]
@ -111,6 +118,20 @@ class UR:
def getReceivedQ(self): def getReceivedQ(self):
return self.joint_q_received return self.joint_q_received
'''def getForwardKinematics(self):
if self.last_forwardkinematics_joint_q_received!=self.joint_q_received:
self.last_forwardkinematics_joint_q_received=self.joint_q_received
self.last_forwardkinematics=self.rtde_c.getForwardKinematics(self.joint_q_received)
return self.last_forwardkinematics
'''
def getReceivedTCPPose(self):
return self.tcppose_received
def getInverseKinematics(self,pos_q):
return self.rtde_c.getInverseKinematics(pos_q,self.joint_q)
def disconnect(self): def disconnect(self):
if self.rtde_c is not None: if self.rtde_c is not None:
self.rtde_c.servoStop() self.rtde_c.servoStop()

View file

@ -59,10 +59,10 @@ def main(stdscr):
start = time.time() start = time.time()
ur.receiveJoints(looptime) ur.receiveJoints(looptime)
ur.receiveStatus(looptime) #ur.receiveStatus(looptime)
#send data to all osc receivers #send data to all osc receivers
oscsend.send(looptime,ur.getReceivedQ()) oscsend.send(looptime,ur.getReceivedQ(),ur.getReceivedTCPPose())
if ur is not None: if ur is not None:
ur.send(looptime) ur.send(looptime)
@ -81,6 +81,11 @@ def main(stdscr):
ui.setDebugtext(1,"loop usage="+str(round(np.mean(loopdurations)/loop_dt,3))) ui.setDebugtext(1,"loop usage="+str(round(np.mean(loopdurations)/loop_dt,3)))
if oscreceive is not None: if oscreceive is not None:
ui.setDebugtext(2,"osc receive frequency="+str(round(oscreceive.getCurrentReceiveFrequency(),0))) ui.setDebugtext(2,"osc receive frequency="+str(round(oscreceive.getCurrentReceiveFrequency(),0)))
#ui.setDebugtext(3,"invkin"+str(ur.getForwardKinematics()))
tcp=ur.getReceivedTCPPose()
tcp_deg=[tcp[0],tcp[1],tcp[2],toDeg(tcp[3]),toDeg(tcp[4]),toDeg(tcp[5])]
#ui.setDebugtext(3,"osc test:"+str(oscreceive.getDebug())+"\t tcp_deg="+str(tcp_deg))
ui.setDebugtext(3,"osc test:"+str(oscreceive.getDebug()))
if ui.getKey() == ord('e') and keyDown==False: if ui.getKey() == ord('e') and keyDown==False:
keyDown=True keyDown=True
@ -148,12 +153,10 @@ if __name__ == "__main__":
if args.test is not True: if args.test is not True:
ur.connect(args.robotip) ur.connect(args.robotip)
oscreceive = OSCReceive(args.listenip, args.listenport,ur) oscreceive = OSCReceive(args.listenip, args.listenport,oscsend,ur)
oscsend.setupReceivers([args.oscsendip],args.oscsendport) oscsend.setupReceivers([args.oscsendip],args.oscsendport)