From 501e13f707ed8f5c57bf30d3de635469e8174c00 Mon Sep 17 00:00:00 2001 From: Philipp Kramer Date: Tue, 31 May 2022 15:05:06 +0200 Subject: [PATCH] add socket tests --- tests/pantilt.py | 84 ++++++++++++++++++++++++++++++++++++++ tests/socketreceivetest.py | 28 +++++++++++++ tests/socketsendtest.py | 15 +++++++ 3 files changed, 127 insertions(+) create mode 100644 tests/pantilt.py create mode 100644 tests/socketreceivetest.py create mode 100644 tests/socketsendtest.py diff --git a/tests/pantilt.py b/tests/pantilt.py new file mode 100644 index 0000000..fdba009 --- /dev/null +++ b/tests/pantilt.py @@ -0,0 +1,84 @@ + +import rtde_control +import time +import math +import socket + +rtde_c = rtde_control.RTDEControlInterface("192.168.1.101") + + +import rtde_receive +rtde_r = rtde_receive.RTDEReceiveInterface("192.168.1.101") + +def toRad(d): + return d/360*2*math.pi + + +''' +actual_q = rtde_r.getActualQ() #current joint rotations in radians. +print("Actual Q:"+str(actual_q)) +fw_actual_q = rtde_c.getForwardKinematics(actual_q) +print("forward Actual Q:"+str(fw_actual_q)) +joint_q=rtde_c.getInverseKinematics(fw_actual_q) +print("Final Q:"+str(joint_q)) +''' + + +pan=0 #-=left, +=right, degrees +tilt=0 #-=down, +=up, degrees + +joint_q=[0,toRad(-90),toRad(90),toRad(-180),toRad(-90),toRad(0)] #start position + +#rtde_c.moveJ([0,toRad(-90),toRad(90-tilt/2),toRad(-180-tilt/2),toRad(-90-pan),toRad(0)],0.1,0.1) + +rtde_c.moveJ(joint_q,0.1,0.1) #move to initial position + + +# Parameters +duration = 20 #in seconds + +velocity = 0.1 #0.5 +acceleration = 0.1 #0.5 +frequency= 100 +dt = 1.0/frequency # 2ms +lookahead_time = 0.1 +gain = 300 + +speed=0.1/frequency + + +def constrain(v,_min,_max): + return min(_max,max(_min,v)) + +print("Starting Servo") +for i in range(frequency*duration): + start = time.time() + + joint_q_aim = [0,toRad(-90),toRad(90-tilt/2),toRad(-180-tilt/2),toRad(-90-pan),toRad(0)] + + if (round(i/(frequency*5))%2==1): + tilt=20 + else: + tilt=-20 + + for i in range(6): + joint_q[i] += constrain(joint_q_aim[i]-joint_q[i], -speed,speed) + + + print(joint_q) + + rtde_c.servoJ(joint_q, velocity, acceleration, dt, lookahead_time, gain) + + end = time.time() + stepduration = end - start + + if stepduration < dt: + time.sleep(dt - stepduration) + +print("Finished") + +rtde_c.servoStop() +rtde_c.stopScript() + + + diff --git a/tests/socketreceivetest.py b/tests/socketreceivetest.py new file mode 100644 index 0000000..00be94d --- /dev/null +++ b/tests/socketreceivetest.py @@ -0,0 +1,28 @@ +import socket +import math + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +host ="" +port =30002 +s.bind((host,port)) +s.listen(1) # Number of connections +s.setblocking(False) + + +while True: + try: + client, address = s.accept() + print("Connected to", address) + + + data = client.recv( 1024 ).decode( 'utf-8' ) + splitdata=data.split(",") + + print("Received :", repr(data)) + print(splitdata) + except socket.error: + pass + + + +s.close() \ No newline at end of file diff --git a/tests/socketsendtest.py b/tests/socketsendtest.py new file mode 100644 index 0000000..cdee30e --- /dev/null +++ b/tests/socketsendtest.py @@ -0,0 +1,15 @@ +import socket + + +TCP_IP = '127.0.0.1' +TCP_PORT = 30002 + +BUFFER_SIZE = 1024 +message = "12.3,20.15" + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.connect((TCP_IP, TCP_PORT)) +s.send(message.encode('utf-8')) + +s.close() +