From 14b41f0f9675e9a72c8f2a4b3f66108ce152dfaa Mon Sep 17 00:00:00 2001 From: Philipp Kramer Date: Thu, 9 May 2024 18:29:11 +0200 Subject: [PATCH] add test mode with mockup values --- urosc.py | 87 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/urosc.py b/urosc.py index dbfcb0b..20febba 100644 --- a/urosc.py +++ b/urosc.py @@ -5,17 +5,19 @@ from pythonosc import udp_client import threading import time import math - +import numpy as np import curses from rtde_receive import RTDEReceiveInterface as RTDEReceive -clients=None -rtde_r=None +clients = [] +rtde_r=None +server=None + def toRad(d): @@ -96,50 +98,67 @@ def main(stdscr): height, width = stdscr.getmaxyx() + + scrfrequency=30 - dt = 1 / args.frequency + dt = 1 / scrfrequency key = '' - i = 0 + loopcounter = 0 + loopdurations = np.zeros(100) + loopdurations_pos=0 + try: while key != ord('q'): key = stdscr.getch() stdscr.clear() start = time.time() - q=rtde_r.getActualQ() + q=[0,0,0,0,0,0] + if rtde_r is not None: + q=rtde_r.getActualQ() + else: + _offsets=[12.2,5.3,0,6,12.4,612.91] + _factors=[1.2,1.14,1.214,0.9921,1,1.092] + + for _i,_offset in enumerate(_offsets): + q[_i]=(math.sin(time.time()*_factors[_i]+_offset)/2.0+0.5)*(joint_max[_i]-joint_min[_i])+joint_min[_i] + + for c in clients: for jointi in range(6): c.send_message("/j"+str(jointi), toDeg(q[jointi])) - #print("sent "+str(jointi)+" = "+str(q[jointi])) - #time.sleep(0.1) - #print(q) - #sys.stdout.write("\r") - #sys.stdout.write("{:3d} samples.".format(i)) - #sys.stdout.flush() - stdscr.addstr(0, 0, "test "+str(i), curses.color_pair(1)) + stdscr.addstr(0, 0, "loopcounter="+str(loopcounter), curses.color_pair(1)) + stdscr.addstr(1, 0, "loop usage="+str(round(np.max(loopdurations)/dt,3)), curses.color_pair(1)) + + startbarsx=10 + endbarsx=width-1 for j in range(6): - posx=mapFromTo(q[j],joint_min[j],joint_max[j],0,width) #scale to screen width + posx=mapFromTo(q[j],joint_min[j],joint_max[j],startbarsx,endbarsx) #scale to screen width for px in range(width): diff=abs(posx-px) poschar='' + if diff<3: - poschar='.' - if diff<2: - poschar='i' + poschar='-' if diff<0.5: poschar='|' + + if px==startbarsx: + poschar="[" + if px==endbarsx: + poschar="]" if (len(poschar)>0): stdscr.addstr(2+j, px, poschar, curses.color_pair(1)) - valuetext=str(round(q[j],2)) + valuetext=str(round(toDeg(q[j]),2)) if q[j]>=0: #has no negative sign valuetext=" "+valuetext stdscr.addstr(2+j, 0, valuetext, curses.color_pair(1)) @@ -150,11 +169,14 @@ def main(stdscr): end = time.time() - duration = end - start + loopduration = end - start + loopdurations[loopdurations_pos]=loopduration + loopdurations_pos+=1 + loopdurations_pos%=np.size(loopdurations) - if duration < dt: - time.sleep(dt - duration) - i += 1 + if loopduration < dt: + time.sleep(dt - loopduration) + loopcounter += 1 except KeyboardInterrupt: @@ -178,6 +200,9 @@ if __name__ == "__main__": parser.add_argument("--robotip", default="127.0.0.1", help="The ip to connect to") + parser.add_argument("--test", action='store_true', default=False, help="Disable network and robot communication") + + parser.add_argument( "-f", "--frequency", @@ -190,19 +215,19 @@ if __name__ == "__main__": clientips=[args.clientip] - server,server_thread=setupOSC(args.listenip, args.listenport) + + if args.test is not True: + server,server_thread=setupOSC(args.listenip, args.listenport) - clients=setupClients(clientips,args.port) + clients=setupClients(clientips,args.port) - rtde_r=setupRTDE(args.robotip,args.frequency) + rtde_r=setupRTDE(args.robotip,args.frequency) curses.wrapper(main) - - input("Ended. Press Enter to shutdown") - - - server.shutdown() - server_thread.join() + if server is not None: + input("Ended. Press Enter to shutdown") + server.shutdown() + server_thread.join()