urcontrol/urosc.py

209 lines
5.1 KiB
Python
Raw Normal View History

2024-05-09 15:12:12 +00:00
import argparse
from pythonosc import dispatcher
from pythonosc import osc_server
from pythonosc import udp_client
import threading
import time
import math
import curses
from rtde_receive import RTDEReceiveInterface as RTDEReceive
clients=None
rtde_r=None
def toRad(d):
return d/360*2*math.pi
def toDeg(r):
return r*360/2/math.pi
def mapFromTo(x,a,b,c,d):
y=(x-a)/(b-a)*(d-c)+c
return y
def handle_received_osc(address, *args):
print(f"Received OSC message on {address}")
for arg in args:
print(f"Data: {arg}")
def setupOSC(ip, port):
# Set up the dispatcher for the server
disp = dispatcher.Dispatcher()
disp.map("/filter", handle_received_osc) # You can change '/filter' to any address pattern you expect
# Set up OSC server
server = osc_server.ThreadingOSCUDPServer((ip, port), disp)
print(f"Serving on {server.server_address}")
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
return server,server_thread
def setupClients(clientips,port):
clients = []
# Set up OSC client
for cip in clientips:
print("connecting to "+str(cip))
client = udp_client.SimpleUDPClient(cip, port)
clients.append(client)
return clients
def setupRTDE(robotip,frequency):
dt = 1 / frequency
rtde_r = RTDEReceive(robotip, frequency)
return rtde_r
joint_min=[toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(-360),toRad(0)]
joint_max=[toRad(360),toRad(360),toRad(360),toRad(360),toRad(360),toRad(360)]
def main(stdscr):
"""Main entry point allowing external calls
Args:
args ([str]): command line parameter list
"""
# Clear and refresh the screen for a blank canvas
stdscr.clear()
stdscr.refresh()
curses.cbreak()
stdscr.keypad(1)
stdscr.nodelay(1)
# Start colors in curses
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
height, width = stdscr.getmaxyx()
dt = 1 / args.frequency
key = ''
i = 0
try:
while key != ord('q'):
key = stdscr.getch()
stdscr.clear()
start = time.time()
q=rtde_r.getActualQ()
for c in clients:
for jointi in range(6):
c.send_message("/j"+str(jointi), toDeg(q[jointi]))
#print("sent "+str(jointi)+" = "+str(q[jointi]))
#time.sleep(0.1)
#print(q)
#sys.stdout.write("\r")
#sys.stdout.write("{:3d} samples.".format(i))
#sys.stdout.flush()
stdscr.addstr(0, 0, "test "+str(i), curses.color_pair(1))
for j in range(6):
posx=mapFromTo(q[j],joint_min[j],joint_max[j],0,width) #scale to screen width
for px in range(width):
diff=abs(posx-px)
poschar=''
if diff<3:
poschar='.'
if diff<2:
poschar='i'
if diff<0.5:
poschar='|'
if (len(poschar)>0):
stdscr.addstr(2+j, px, poschar, curses.color_pair(1))
valuetext=str(round(q[j],2))
if q[j]>=0: #has no negative sign
valuetext=" "+valuetext
stdscr.addstr(2+j, 0, valuetext, curses.color_pair(1))
# Refresh the screen
stdscr.refresh()
end = time.time()
duration = end - start
if duration < dt:
time.sleep(dt - duration)
i += 1
except KeyboardInterrupt:
print("\nData recording stopped.")
curses.endwin()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--listenip",
default="127.0.0.1", help="The ip to listen on")
parser.add_argument("--listenport",
type=int, default=5005, help="The port to listen on")
parser.add_argument("--clientip",
default="127.0.0.1", help="The ip to connect to")
parser.add_argument("--port",
type=int, default=5005, help="The port to connect to")
parser.add_argument("--robotip",
default="127.0.0.1", help="The ip to connect to")
parser.add_argument(
"-f",
"--frequency",
dest="frequency",
help="the frequency at which the data is recorded (default is 500Hz)",
type=float,
default=500.0,
metavar="<frequency>")
args = parser.parse_args()
clientips=[args.clientip]
server,server_thread=setupOSC(args.listenip, args.listenport)
clients=setupClients(clientips,args.port)
rtde_r=setupRTDE(args.robotip,args.frequency)
curses.wrapper(main)
input("Ended. Press Enter to shutdown")
server.shutdown()
server_thread.join()