move get key to ui class

This commit is contained in:
Philipp Kramer 2024-05-10 10:35:53 +02:00
parent a3f034cc49
commit bf5a5a8509
3 changed files with 7 additions and 310 deletions

307
urosc.py
View file

@ -1,307 +0,0 @@
import argparse
from pythonosc import dispatcher
from pythonosc import osc_server
from pythonosc import udp_client
import threading
import time
import math
import numpy as np
import curses
import rtde_receive
import rtde_control
clients = []
rtde_r=None
rtde_c=None
server=None
#Looptimings
frequency_loop=500
frequency_oscsend=50
frequency_rtde=None
frequency_scr=25
#Robot Control Parameters
# Parameters
velocity = 0.1
acceleration = 0.1
lookahead_time = 0.1
gain = 300
joint_q_aim=[0,0,0,0,0,0] #joint rotations aiming for
joint_q = [0,0,0,0,0,0] #joint rotation to be send
velocitylimit=[1,1,1,3,3,3] #joint velocity limits
#osc receive statistics
oscreceiveintervals = np.zeros(100)
oscreceiveintervals_pos=0
last_oscreceive=0
def toRad(d):
return d/360*2*math.pi
def toDeg(r):
return r*360/2/math.pi
def mapFromTo(x,a,b,c,d):
y=(x-a)/(b-a)*(d-c)+c
return y
def constrain(v,_min,_max):
return min(_max,max(_min,v))
def handle_received_osc_joint(address, *args):
oscreceiveintervals[oscreceiveintervals_pos]=time.time()-last_oscreceive
last_oscreceive=time.time()
oscreceiveintervals_pos+=1
oscreceiveintervals_pos%=np.size(oscreceiveintervals)
print(f"Received OSC message on {address}")
for iarg, arg in enumerate(args):
#print(f"Data: {arg}")
joint_q_aim[iarg]=float(arg)
def setupOSC(ip, port):
# Set up the dispatcher for the server
disp = dispatcher.Dispatcher()
disp.map("/joint", handle_received_osc_joint) # You can change '/filter' to any address pattern you expect
# Set up OSC server
server = osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {server.server_address}")
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
return server,server_thread
def setupClients(clientips,port):
clients = []
# Set up OSC client
for cip in clientips:
print("connecting to "+str(cip))
client = udp_client.SimpleUDPClient(cip, port)
clients.append(client)
return clients
def setupRTDE(robotip,frequency):
dt = 1 / frequency
rtde_r = rtde_receive.RTDEReceiveInterface(robotip, frequency)
rtde_c = rtde_control.RTDEControlInterface(robotip)
return rtde_r
joint_min=[toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(0)]
joint_max=[toRad(360),toRad(360),toRad(360),toRad(360),toRad(360),toRad(360)]
def main(stdscr):
# Clear and refresh the screen for a blank canvas
stdscr.clear()
stdscr.refresh()
curses.cbreak()
stdscr.keypad(1)
stdscr.nodelay(1)
# Start colors in curses
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
height, width = stdscr.getmaxyx()
loop_dt = 1 / frequency_loop
key = ''
loopcounter = 0
loopdurations = np.zeros(100)
loopdurations_pos=0
last_sendosc=0
last_receivertde=0
last_sendrtde=0
last_updatescr=0
q=[0,0,0,0,0,0]
try:
while key != ord('q'):
looptime = time.time()
start = time.time()
#Get robot joint angled
if looptime-last_receivertde>=1.0/frequency_receivertde:
last_receivertde=looptime
if rtde_r is not None:
q=rtde_r.getActualQ()
else:
_offsets=[12.2,5.3,0,6,12.4,612.91]
_factors=[1.2,1.14,1.214,0.9921,1,1.092]
for _i,_offset in enumerate(_offsets):
q[_i]=(math.sin(time.time()*_factors[_i]+_offset)/2.0+0.5)*(joint_max[_i]-joint_min[_i])+joint_min[_i]
#send data to all osc clients
if looptime-last_sendosc>=1.0/frequency_oscsend:
last_sendosc=looptime
for c in clients:
for jointi in range(6):
c.send_message("/j"+str(jointi), toDeg(q[jointi]))
if looptime-last_sendrtde>=1.0/frequency_sendrtde:
last_sendrtde=looptime
for i in range(6): #update joint_q with max speeds
joint_q[i] += constrain(joint_q_aim[i]-joint_q[i], -velocitylimit[i]/frequency_sendrtde,velocitylimit[i]/frequency_sendrtde) #constrain joint speeds
if rtde_c is not None:
rtde_c.servoJ(joint_q, velocity, acceleration, 1.0/frequency_sendrtde, lookahead_time, gain)
#Display
if looptime-last_updatescr>=1.0/frequency_scr:
last_updatescr=looptime
key = stdscr.getch()
stdscr.clear()
stdscr.addstr(0, 0, "loopcounter="+str(loopcounter), curses.color_pair(1))
stdscr.addstr(1, 0, "loop usage="+str(round(np.mean(loopdurations)/loop_dt,3)), curses.color_pair(1))
stdscr.addstr(2, 0, "osc receive frequency="+str(round(1.0/np.mean(oscreceiveintervals),0))+" \t pos="+str(oscreceiveintervals_pos), curses.color_pair(1))
startbarsx=10
endbarsx=width-1
for j in range(6):
posx=mapFromTo(q[j],joint_min[j],joint_max[j],startbarsx,endbarsx) #scale to screen width
for px in range(width):
diff=abs(posx-px)
poschar=''
if diff<3:
poschar='-'
if diff<0.5:
poschar='|'
if px==startbarsx:
poschar="["
if px==endbarsx:
poschar="]"
if (len(poschar)>0):
stdscr.addstr(3+j, px, poschar, curses.color_pair(1))
valuetext=str(round(toDeg(q[j]),2))
if q[j]>=0: #has no negative sign
valuetext=" "+valuetext
stdscr.addstr(3+j, 0, valuetext, curses.color_pair(1))
# Refresh the screen
stdscr.refresh()
#Looptiming
end = time.time()
loopduration = end - start
loopdurations[loopdurations_pos]=loopduration
loopdurations_pos+=1
loopdurations_pos%=np.size(loopdurations)
if loopduration < loop_dt:
time.sleep(loop_dt - loopduration)
loopcounter += 1
except KeyboardInterrupt:
print("\nKeyboardInterrupt")
curses.endwin()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--listenip",
default="127.0.0.1", help="The ip to listen on")
parser.add_argument("--listenport",
type=int, default=5005, help="The port to listen on")
parser.add_argument("--clientip",
default="127.0.0.1", help="The ip to connect to")
parser.add_argument("--port",
type=int, default=5005, help="The port to connect to")
parser.add_argument("--robotip",
default="127.0.0.1", help="The ip to connect to")
parser.add_argument("--test", action='store_true', default=False, help="Disable network and robot communication")
parser.add_argument(
"-fs",
"--frequencysend",
dest="frequencysend",
help="the frequency at which the robot control data is send",
type=float,
default=500.0,
metavar="<frequency>")
parser.add_argument(
"-fr",
"--frequencyreceive",
dest="frequencyreceive",
help="the frequency at which the robot actualq data is received",
type=float,
default=500.0,
metavar="<frequency>")
args = parser.parse_args()
frequency_receivertde=args.frequencyreceive
frequency_sendrtde=args.frequencysend
clientips=[args.clientip]
if args.test is not True:
server,server_thread=setupOSC(args.listenip, args.listenport)
clients=setupClients(clientips,args.port)
rtde_r=setupRTDE(args.robotip,frequency_rtde)
curses.wrapper(main)
if server is not None:
input("Ended. Press Enter to shutdown")
server.shutdown()
server_thread.join()
if rtde_c is not None:
rtde_c.servoStop()
rtde_c.stopScript()

View file

@ -16,6 +16,7 @@ class UI:
self.joint_max=_joint_max self.joint_max=_joint_max
self.frequency_scr=_frequency_scr self.frequency_scr=_frequency_scr
self.last_updatescr=0 self.last_updatescr=0
self.key = ''
def setDebugtext(self,line,text): def setDebugtext(self,line,text):
@ -42,6 +43,9 @@ class UI:
self.height, self.width = self.stdscr.getmaxyx() self.height, self.width = self.stdscr.getmaxyx()
def getKey(self):
return self.key
def update(self,q,looptime): def update(self,q,looptime):
@ -50,7 +54,7 @@ class UI:
self.last_updatescr=looptime self.last_updatescr=looptime
key = self.stdscr.getch() self.key = self.stdscr.getch()
self.stdscr.clear() self.stdscr.clear()
for i in range(3): #Print Debug text for i in range(3): #Print Debug text

View file

@ -99,7 +99,7 @@ def main(stdscr):
key = ''
loopcounter = 0 loopcounter = 0
loopdurations = np.zeros(100) loopdurations = np.zeros(100)
loopdurations_pos=0 loopdurations_pos=0
@ -111,7 +111,7 @@ def main(stdscr):
try: try:
while key != ord('q'): while ui.getKey() != ord('q'):
looptime = time.time() looptime = time.time()
start = time.time() start = time.time()