import argparse from pythonosc import dispatcher from pythonosc import osc_server from pythonosc import udp_client import threading import time import math import numpy as np import curses from rtde_receive import RTDEReceiveInterface as RTDEReceive clients = [] rtde_r=None server=None #Looptimings frequency_loop=500 frequency_oscsend=50 frequency_rtde=None frequency_scr=30 def toRad(d): return d/360*2*math.pi def toDeg(r): return r*360/2/math.pi def mapFromTo(x,a,b,c,d): y=(x-a)/(b-a)*(d-c)+c return y def handle_received_osc(address, *args): print(f"Received OSC message on {address}") for arg in args: print(f"Data: {arg}") def setupOSC(ip, port): # Set up the dispatcher for the server disp = dispatcher.Dispatcher() disp.map("/filter", handle_received_osc) # You can change '/filter' to any address pattern you expect # Set up OSC server server = osc_server.ThreadingOSCUDPServer((ip, port), disp) print(f"Serving on {server.server_address}") server_thread = threading.Thread(target=server.serve_forever) server_thread.start() return server,server_thread def setupClients(clientips,port): clients = [] # Set up OSC client for cip in clientips: print("connecting to "+str(cip)) client = udp_client.SimpleUDPClient(cip, port) clients.append(client) return clients def setupRTDE(robotip,frequency): dt = 1 / frequency rtde_r = RTDEReceive(robotip, frequency) return rtde_r joint_min=[toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(0)] joint_max=[toRad(360),toRad(360),toRad(360),toRad(360),toRad(360),toRad(360)] def main(stdscr): # Clear and refresh the screen for a blank canvas stdscr.clear() stdscr.refresh() curses.cbreak() stdscr.keypad(1) stdscr.nodelay(1) # Start colors in curses curses.start_color() curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE) height, width = stdscr.getmaxyx() loop_dt = 1 / frequency_loop key = '' loopcounter = 0 loopdurations = np.zeros(100) loopdurations_pos=0 last_sendosc=0 last_receivertde=0 last_updatescr=0 q=[0,0,0,0,0,0] try: while key != ord('q'): looptime=time.time() start = time.time() #Get robot joint angled if looptime-last_receivertde>=1.0/frequency_rtde: last_receivertde=looptime if rtde_r is not None: q=rtde_r.getActualQ() else: _offsets=[12.2,5.3,0,6,12.4,612.91] _factors=[1.2,1.14,1.214,0.9921,1,1.092] for _i,_offset in enumerate(_offsets): q[_i]=(math.sin(time.time()*_factors[_i]+_offset)/2.0+0.5)*(joint_max[_i]-joint_min[_i])+joint_min[_i] #send data to all osc clients if looptime-last_sendosc>=1.0/frequency_oscsend: last_sendosc=looptime for c in clients: for jointi in range(6): c.send_message("/j"+str(jointi), toDeg(q[jointi])) #Display if looptime-last_updatescr>=1.0/frequency_scr: last_updatescr=looptime key = stdscr.getch() stdscr.clear() stdscr.addstr(0, 0, "loopcounter="+str(loopcounter), curses.color_pair(3)) stdscr.addstr(1, 0, "loop usage="+str(round(np.mean(loopdurations)/loop_dt,3)), curses.color_pair(1)) startbarsx=10 endbarsx=width-1 for j in range(6): posx=mapFromTo(q[j],joint_min[j],joint_max[j],startbarsx,endbarsx) #scale to screen width for px in range(width): diff=abs(posx-px) poschar='' if diff<3: poschar='-' if diff<0.5: poschar='|' if px==startbarsx: poschar="[" if px==endbarsx: poschar="]" if (len(poschar)>0): stdscr.addstr(2+j, px, poschar, curses.color_pair(1)) valuetext=str(round(toDeg(q[j]),2)) if q[j]>=0: #has no negative sign valuetext=" "+valuetext stdscr.addstr(2+j, 0, valuetext, curses.color_pair(1)) # Refresh the screen stdscr.refresh() #Looptiming end = time.time() loopduration = end - start loopdurations[loopdurations_pos]=loopduration loopdurations_pos+=1 loopdurations_pos%=np.size(loopdurations) if loopduration < loop_dt: time.sleep(loop_dt - loopduration) loopcounter += 1 except KeyboardInterrupt: print("\nKeyboardInterrupt") curses.endwin() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--listenip", default="127.0.0.1", help="The ip to listen on") parser.add_argument("--listenport", type=int, default=5005, help="The port to listen on") parser.add_argument("--clientip", default="127.0.0.1", help="The ip to connect to") parser.add_argument("--port", type=int, default=5005, help="The port to connect to") parser.add_argument("--robotip", default="127.0.0.1", help="The ip to connect to") parser.add_argument("--test", action='store_true', default=False, help="Disable network and robot communication") parser.add_argument( "-f", "--frequency", dest="frequency", help="the frequency at which the data is recorded (default is 500Hz)", type=float, default=500.0, metavar="") args = parser.parse_args() frequency_rtde=args.frequency clientips=[args.clientip] if args.test is not True: server,server_thread=setupOSC(args.listenip, args.listenport) clients=setupClients(clientips,args.port) rtde_r=setupRTDE(args.robotip,frequency_rtde) curses.wrapper(main) if server is not None: input("Ended. Press Enter to shutdown") server.shutdown() server_thread.join()