diff --git a/urosc/helpfunctions.py b/urosc/helpfunctions.py new file mode 100644 index 0000000..44c809c --- /dev/null +++ b/urosc/helpfunctions.py @@ -0,0 +1,16 @@ +import math + + +def toRad(d): + return d/360*2*math.pi + +def toDeg(r): + return r*360/2/math.pi + + +def mapFromTo(x,a,b,c,d): + y=(x-a)/(b-a)*(d-c)+c + return y + +def constrain(v,_min,_max): + return min(_max,max(_min,v)) diff --git a/urosc/ui.py b/urosc/ui.py new file mode 100644 index 0000000..8be622b --- /dev/null +++ b/urosc/ui.py @@ -0,0 +1,83 @@ +from helpfunctions import * + + +class UI: + #def __init__(self, name, age): + # self.name = name + # self.age = age + + import curses + + + def __init__(self,_joint_min,_joint_max): + self.stdscr=None + self.debugtext=["","",""] + self.width=0 + self.height=0 + self.joint_min=None + self.joint_max=None + self.joint_min=_joint_min + self.joint_max=_joint_max + + + def setDebugtext(self,line,text): + self.debugtext[line]=text + + + def begin(self,_stdscr): + self.stdscr=_stdscr + + # Clear and refresh the screen for a blank canvas + self.stdscr.clear() + self.stdscr.refresh() + + self.curses.cbreak() + self.stdscr.keypad(1) + self.stdscr.nodelay(1) + + + # Start colors in curses + self.curses.start_color() + self.curses.init_pair(1, self.curses.COLOR_CYAN, self.curses.COLOR_BLACK) + self.curses.init_pair(2, self.curses.COLOR_RED, self.curses.COLOR_BLACK) + self.curses.init_pair(3, self.curses.COLOR_BLACK, self.curses.COLOR_WHITE) + + + self.height, self.width = self.stdscr.getmaxyx() + + def update(self,q): + key = self.stdscr.getch() + self.stdscr.clear() + + for i in range(3): #Print Debug text + self.stdscr.addstr(i, 0, self.debugtext[i], self.curses.color_pair(1)) + + startbarsx=10 + endbarsx=self.width-1 + for j in range(6): + posx=mapFromTo(q[j],self.joint_min[j],self.joint_max[j],startbarsx,endbarsx) #scale to screen width + for px in range(self.width): + diff=abs(posx-px) + poschar='' + + if diff<3: + poschar='-' + if diff<0.5: + poschar='|' + + if px==startbarsx: + poschar="[" + if px==endbarsx: + poschar="]" + + if (len(poschar)>0): + self.stdscr.addstr(3+j, px, poschar, self.curses.color_pair(1)) + valuetext=str(round(toDeg(q[j]),2)) + if q[j]>=0: #has no negative sign + valuetext=" "+valuetext + self.stdscr.addstr(3+j, 0, valuetext, self.curses.color_pair(1)) + + + # Refresh the screen + self.stdscr.refresh() + diff --git a/urosc/urosc.py b/urosc/urosc.py new file mode 100644 index 0000000..12f4967 --- /dev/null +++ b/urosc/urosc.py @@ -0,0 +1,255 @@ +import argparse +from pythonosc import dispatcher +from pythonosc import osc_server +from pythonosc import udp_client +import threading +import time +import math +import numpy as np +import curses + +import rtde_receive +import rtde_control + + +from helpfunctions import * +from ui import UI + + + + + +clients = [] +rtde_r=None +rtde_c=None +server=None + +#Looptimings +frequency_loop=500 +frequency_oscsend=50 +frequency_rtde=None +frequency_scr=25 + +#Robot Control Parameters +# Parameters +velocity = 0.1 +acceleration = 0.1 +lookahead_time = 0.1 +gain = 300 +joint_q_aim=[0,0,0,0,0,0] #joint rotations aiming for +joint_q = [0,0,0,0,0,0] #joint rotation to be send +velocitylimit=[1,1,1,3,3,3] #joint velocity limits + + +#osc receive statistics +oscreceiveintervals = np.zeros(100) +oscreceiveintervals_pos=0 +last_oscreceive=0 + +def handle_received_osc_joint(address, *args): + oscreceiveintervals[oscreceiveintervals_pos]=time.time()-last_oscreceive + last_oscreceive=time.time() + oscreceiveintervals_pos+=1 + oscreceiveintervals_pos%=np.size(oscreceiveintervals) + + print(f"Received OSC message on {address}") + for iarg, arg in enumerate(args): + #print(f"Data: {arg}") + joint_q_aim[iarg]=float(arg) + + +def setupOSC(ip, port): + # Set up the dispatcher for the server + disp = dispatcher.Dispatcher() + disp.map("/joint", handle_received_osc_joint) # You can change '/filter' to any address pattern you expect + + # Set up OSC server + server = osc_server.ThreadingOSCUDPServer((ip, port), disp) + print(f"Serving on {server.server_address}") + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + + return server,server_thread + +def setupClients(clientips,port): + clients = [] + + # Set up OSC client + for cip in clientips: + print("connecting to "+str(cip)) + client = udp_client.SimpleUDPClient(cip, port) + clients.append(client) + + return clients + + +def setupRTDE(robotip,frequency): + dt = 1 / frequency + rtde_r = rtde_receive.RTDEReceiveInterface(robotip, frequency) + + rtde_c = rtde_control.RTDEControlInterface(robotip) + + return rtde_r + + +joint_min=[toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(0)] +joint_max=[toRad(360),toRad(360),toRad(360),toRad(360),toRad(360),toRad(360)] + + +ui = UI(joint_min,joint_max) + + + +def main(stdscr): + + ui.begin(stdscr) + + + loop_dt = 1 / frequency_loop + + + + key = '' + loopcounter = 0 + loopdurations = np.zeros(100) + loopdurations_pos=0 + + last_sendosc=0 + last_receivertde=0 + last_sendrtde=0 + last_updatescr=0 + + q=[0,0,0,0,0,0] + + + try: + while key != ord('q'): + looptime = time.time() + + start = time.time() + + + #Get robot joint angled + if looptime-last_receivertde>=1.0/frequency_receivertde: + last_receivertde=looptime + + if rtde_r is not None: + q=rtde_r.getActualQ() + else: + _offsets=[12.2,5.3,0,6,12.4,612.91] + _factors=[1.2,1.14,1.214,0.9921,1,1.092] + + for _i,_offset in enumerate(_offsets): + q[_i]=(math.sin(time.time()*_factors[_i]+_offset)/2.0+0.5)*(joint_max[_i]-joint_min[_i])+joint_min[_i] + + + #send data to all osc clients + if looptime-last_sendosc>=1.0/frequency_oscsend: + last_sendosc=looptime + for c in clients: + for jointi in range(6): + c.send_message("/j"+str(jointi), toDeg(q[jointi])) + + if looptime-last_sendrtde>=1.0/frequency_sendrtde: + last_sendrtde=looptime + + for i in range(6): #update joint_q with max speeds + joint_q[i] += constrain(joint_q_aim[i]-joint_q[i], -velocitylimit[i]/frequency_sendrtde,velocitylimit[i]/frequency_sendrtde) #constrain joint speeds + + if rtde_c is not None: + rtde_c.servoJ(joint_q, velocity, acceleration, 1.0/frequency_sendrtde, lookahead_time, gain) + + + #Display + if looptime-last_updatescr>=1.0/frequency_scr: + last_updatescr=looptime + + ui.update(q) + + #Looptiming + end = time.time() + loopduration = end - start + loopdurations[loopdurations_pos]=loopduration + loopdurations_pos+=1 + loopdurations_pos%=np.size(loopdurations) + + ui.setDebugtext(0,"loopcounter="+str(loopcounter)) + ui.setDebugtext(1,"loop usage="+str(round(np.mean(loopdurations)/loop_dt,3))) + ui.setDebugtext(2,"osc receive frequency="+str(round(1.0/np.mean(oscreceiveintervals),0))+" \t pos="+str(oscreceiveintervals_pos)) + + + + + if loopduration < loop_dt: + time.sleep(loop_dt - loopduration) + loopcounter += 1 + + except KeyboardInterrupt: + + print("\nKeyboardInterrupt") + curses.endwin() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--listenip", + default="127.0.0.1", help="The ip to listen on") + parser.add_argument("--listenport", + type=int, default=5005, help="The port to listen on") + + parser.add_argument("--clientip", + default="127.0.0.1", help="The ip to connect to") + parser.add_argument("--port", + type=int, default=5005, help="The port to connect to") + + + parser.add_argument("--robotip", + default="127.0.0.1", help="The ip to connect to") + + parser.add_argument("--test", action='store_true', default=False, help="Disable network and robot communication") + + + parser.add_argument( + "-fs", + "--frequencysend", + dest="frequencysend", + help="the frequency at which the robot control data is send", + type=float, + default=500.0, + metavar="") + + parser.add_argument( + "-fr", + "--frequencyreceive", + dest="frequencyreceive", + help="the frequency at which the robot actualq data is received", + type=float, + default=500.0, + metavar="") + + args = parser.parse_args() + + frequency_receivertde=args.frequencyreceive + frequency_sendrtde=args.frequencysend + + clientips=[args.clientip] + + + if args.test is not True: + server,server_thread=setupOSC(args.listenip, args.listenport) + + clients=setupClients(clientips,args.port) + + rtde_r=setupRTDE(args.robotip,frequency_rtde) + + + curses.wrapper(main) + + if server is not None: + input("Ended. Press Enter to shutdown") + server.shutdown() + server_thread.join() + if rtde_c is not None: + rtde_c.servoStop() + rtde_c.stopScript() +