add socket tests
This commit is contained in:
parent
486b470f6c
commit
501e13f707
3 changed files with 127 additions and 0 deletions
84
tests/pantilt.py
Normal file
84
tests/pantilt.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
|
||||
import rtde_control
|
||||
import time
|
||||
import math
|
||||
import socket
|
||||
|
||||
rtde_c = rtde_control.RTDEControlInterface("192.168.1.101")
|
||||
|
||||
|
||||
import rtde_receive
|
||||
rtde_r = rtde_receive.RTDEReceiveInterface("192.168.1.101")
|
||||
|
||||
def toRad(d):
|
||||
return d/360*2*math.pi
|
||||
|
||||
|
||||
'''
|
||||
actual_q = rtde_r.getActualQ() #current joint rotations in radians.
|
||||
print("Actual Q:"+str(actual_q))
|
||||
fw_actual_q = rtde_c.getForwardKinematics(actual_q)
|
||||
print("forward Actual Q:"+str(fw_actual_q))
|
||||
joint_q=rtde_c.getInverseKinematics(fw_actual_q)
|
||||
print("Final Q:"+str(joint_q))
|
||||
'''
|
||||
|
||||
|
||||
pan=0 #-=left, +=right, degrees
|
||||
tilt=0 #-=down, +=up, degrees
|
||||
|
||||
joint_q=[0,toRad(-90),toRad(90),toRad(-180),toRad(-90),toRad(0)] #start position
|
||||
|
||||
#rtde_c.moveJ([0,toRad(-90),toRad(90-tilt/2),toRad(-180-tilt/2),toRad(-90-pan),toRad(0)],0.1,0.1)
|
||||
|
||||
rtde_c.moveJ(joint_q,0.1,0.1) #move to initial position
|
||||
|
||||
|
||||
# Parameters
|
||||
duration = 20 #in seconds
|
||||
|
||||
velocity = 0.1 #0.5
|
||||
acceleration = 0.1 #0.5
|
||||
frequency= 100
|
||||
dt = 1.0/frequency # 2ms
|
||||
lookahead_time = 0.1
|
||||
gain = 300
|
||||
|
||||
speed=0.1/frequency
|
||||
|
||||
|
||||
def constrain(v,_min,_max):
|
||||
return min(_max,max(_min,v))
|
||||
|
||||
print("Starting Servo")
|
||||
for i in range(frequency*duration):
|
||||
start = time.time()
|
||||
|
||||
joint_q_aim = [0,toRad(-90),toRad(90-tilt/2),toRad(-180-tilt/2),toRad(-90-pan),toRad(0)]
|
||||
|
||||
if (round(i/(frequency*5))%2==1):
|
||||
tilt=20
|
||||
else:
|
||||
tilt=-20
|
||||
|
||||
for i in range(6):
|
||||
joint_q[i] += constrain(joint_q_aim[i]-joint_q[i], -speed,speed)
|
||||
|
||||
|
||||
print(joint_q)
|
||||
|
||||
rtde_c.servoJ(joint_q, velocity, acceleration, dt, lookahead_time, gain)
|
||||
|
||||
end = time.time()
|
||||
stepduration = end - start
|
||||
|
||||
if stepduration < dt:
|
||||
time.sleep(dt - stepduration)
|
||||
|
||||
print("Finished")
|
||||
|
||||
rtde_c.servoStop()
|
||||
rtde_c.stopScript()
|
||||
|
||||
|
||||
|
28
tests/socketreceivetest.py
Normal file
28
tests/socketreceivetest.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import socket
|
||||
import math
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
host =""
|
||||
port =30002
|
||||
s.bind((host,port))
|
||||
s.listen(1) # Number of connections
|
||||
s.setblocking(False)
|
||||
|
||||
|
||||
while True:
|
||||
try:
|
||||
client, address = s.accept()
|
||||
print("Connected to", address)
|
||||
|
||||
|
||||
data = client.recv( 1024 ).decode( 'utf-8' )
|
||||
splitdata=data.split(",")
|
||||
|
||||
print("Received :", repr(data))
|
||||
print(splitdata)
|
||||
except socket.error:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
s.close()
|
15
tests/socketsendtest.py
Normal file
15
tests/socketsendtest.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
import socket
|
||||
|
||||
|
||||
TCP_IP = '127.0.0.1'
|
||||
TCP_PORT = 30002
|
||||
|
||||
BUFFER_SIZE = 1024
|
||||
message = "12.3,20.15"
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect((TCP_IP, TCP_PORT))
|
||||
s.send(message.encode('utf-8'))
|
||||
|
||||
s.close()
|
||||
|
Loading…
Reference in a new issue