add test mode with mockup values

This commit is contained in:
Philipp Kramer 2024-05-09 18:29:11 +02:00
parent 1399af5fbe
commit 14b41f0f96

View file

@ -5,17 +5,19 @@ from pythonosc import udp_client
import threading import threading
import time import time
import math import math
import numpy as np
import curses import curses
from rtde_receive import RTDEReceiveInterface as RTDEReceive from rtde_receive import RTDEReceiveInterface as RTDEReceive
clients=None
rtde_r=None
clients = []
rtde_r=None
server=None
def toRad(d): def toRad(d):
@ -97,49 +99,66 @@ def main(stdscr):
height, width = stdscr.getmaxyx() height, width = stdscr.getmaxyx()
dt = 1 / args.frequency scrfrequency=30
dt = 1 / scrfrequency
key = '' key = ''
i = 0 loopcounter = 0
loopdurations = np.zeros(100)
loopdurations_pos=0
try: try:
while key != ord('q'): while key != ord('q'):
key = stdscr.getch() key = stdscr.getch()
stdscr.clear() stdscr.clear()
start = time.time() start = time.time()
q=rtde_r.getActualQ() q=[0,0,0,0,0,0]
if rtde_r is not None:
q=rtde_r.getActualQ()
else:
_offsets=[12.2,5.3,0,6,12.4,612.91]
_factors=[1.2,1.14,1.214,0.9921,1,1.092]
for _i,_offset in enumerate(_offsets):
q[_i]=(math.sin(time.time()*_factors[_i]+_offset)/2.0+0.5)*(joint_max[_i]-joint_min[_i])+joint_min[_i]
for c in clients: for c in clients:
for jointi in range(6): for jointi in range(6):
c.send_message("/j"+str(jointi), toDeg(q[jointi])) c.send_message("/j"+str(jointi), toDeg(q[jointi]))
#print("sent "+str(jointi)+" = "+str(q[jointi]))
#time.sleep(0.1)
#print(q)
#sys.stdout.write("\r")
#sys.stdout.write("{:3d} samples.".format(i))
#sys.stdout.flush()
stdscr.addstr(0, 0, "test "+str(i), curses.color_pair(1)) stdscr.addstr(0, 0, "loopcounter="+str(loopcounter), curses.color_pair(1))
stdscr.addstr(1, 0, "loop usage="+str(round(np.max(loopdurations)/dt,3)), curses.color_pair(1))
startbarsx=10
endbarsx=width-1
for j in range(6): for j in range(6):
posx=mapFromTo(q[j],joint_min[j],joint_max[j],0,width) #scale to screen width posx=mapFromTo(q[j],joint_min[j],joint_max[j],startbarsx,endbarsx) #scale to screen width
for px in range(width): for px in range(width):
diff=abs(posx-px) diff=abs(posx-px)
poschar='' poschar=''
if diff<3: if diff<3:
poschar='.' poschar='-'
if diff<2:
poschar='i'
if diff<0.5: if diff<0.5:
poschar='|' poschar='|'
if px==startbarsx:
poschar="["
if px==endbarsx:
poschar="]"
if (len(poschar)>0): if (len(poschar)>0):
stdscr.addstr(2+j, px, poschar, curses.color_pair(1)) stdscr.addstr(2+j, px, poschar, curses.color_pair(1))
valuetext=str(round(q[j],2)) valuetext=str(round(toDeg(q[j]),2))
if q[j]>=0: #has no negative sign if q[j]>=0: #has no negative sign
valuetext=" "+valuetext valuetext=" "+valuetext
stdscr.addstr(2+j, 0, valuetext, curses.color_pair(1)) stdscr.addstr(2+j, 0, valuetext, curses.color_pair(1))
@ -150,11 +169,14 @@ def main(stdscr):
end = time.time() end = time.time()
duration = end - start loopduration = end - start
loopdurations[loopdurations_pos]=loopduration
loopdurations_pos+=1
loopdurations_pos%=np.size(loopdurations)
if duration < dt: if loopduration < dt:
time.sleep(dt - duration) time.sleep(dt - loopduration)
i += 1 loopcounter += 1
except KeyboardInterrupt: except KeyboardInterrupt:
@ -178,6 +200,9 @@ if __name__ == "__main__":
parser.add_argument("--robotip", parser.add_argument("--robotip",
default="127.0.0.1", help="The ip to connect to") default="127.0.0.1", help="The ip to connect to")
parser.add_argument("--test", action='store_true', default=False, help="Disable network and robot communication")
parser.add_argument( parser.add_argument(
"-f", "-f",
"--frequency", "--frequency",
@ -190,19 +215,19 @@ if __name__ == "__main__":
clientips=[args.clientip] clientips=[args.clientip]
server,server_thread=setupOSC(args.listenip, args.listenport)
clients=setupClients(clientips,args.port) if args.test is not True:
server,server_thread=setupOSC(args.listenip, args.listenport)
rtde_r=setupRTDE(args.robotip,args.frequency) clients=setupClients(clientips,args.port)
rtde_r=setupRTDE(args.robotip,args.frequency)
curses.wrapper(main) curses.wrapper(main)
if server is not None:
input("Ended. Press Enter to shutdown") input("Ended. Press Enter to shutdown")
server.shutdown()
server_thread.join()
server.shutdown()
server_thread.join()