-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patharm_control_client.py
162 lines (140 loc) · 4.25 KB
/
arm_control_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# arm_control_client.py
import asyncio
import websockets
import json
import serial
import time
# Serial connection used for every command sent to the arm.
# Change the port name to match the machine this runs on.
ser = serial.Serial(
    port='COM8',
    baudrate=115200,
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1,
)
async def receive_classification():
    """Relay classifications from the websocket server to the serial port.

    Connects to ``ws://localhost:5000`` and then loops without end: each
    received message is decoded as JSON, turned into a command string by
    ``convert_to_arm_command`` and written to ``ser``.
    """
    async with websockets.connect("ws://localhost:5000") as websocket:
        print("Connected to EEG processing server")
        while True:
            raw_message = await websocket.recv()
            print("here")
            classification = json.loads(raw_message)
            command = convert_to_arm_command(classification)
            print(f"Received classification: {classification}")
            payload = command.encode()
            ser.write(payload)
            print(f"Sent command: {command}")
# Tracked position for each steerN channel plus the hand flag.
# These are bound at import time because a module-level ``global`` statement
# defines nothing: without real assignments here the names only exist when the
# file is executed as a script, and convert_to_arm_command() raises NameError
# when the module is imported instead.
steer0 = 1500
steer1 = 1500
steer2 = 1500
steer3 = 1500
steer4 = 1500
hand_open = False
def move_left(steer0):
    """Build the ``#000`` command that lowers the position by one step.

    Args:
        steer0: Current position value for the ``#000`` channel.

    Returns:
        A newline-terminated ``#000P<pos>T1500!`` command where ``<pos>`` is
        ``steer0 - 100``, never lower than 500.
    """
    # Clamp rather than test ``== 500``: a value that is off the 100-step grid
    # or already below the stop must not be stepped past the limit.
    target = max(500, steer0 - 100)
    return f"#000P{target}T1500!\n"
def move_right(steer0):
    """Build the ``#000`` command that raises the position by one step.

    Args:
        steer0: Current position value for the ``#000`` channel.

    Returns:
        A newline-terminated ``#000P<pos>T1500!`` command where ``<pos>`` is
        ``steer0 + 100``, never higher than 2000.
    """
    # Clamp rather than test ``== 2000``: a value that is off the 100-step
    # grid or already above the stop must not be stepped past the limit.
    target = min(2000, steer0 + 100)
    return f"#000P{target}T1500!\n"
def move_down(steer1):
    """Build the ``#001`` command that lowers the position by one step.

    Args:
        steer1: Current position value for the ``#001`` channel.

    Returns:
        A newline-terminated ``#001P<pos>T1500!`` command where ``<pos>`` is
        ``steer1 - 100``, never lower than 1300.
    """
    # Clamp rather than test ``== 1300``: a value that is off the 100-step
    # grid or already below the stop must not be stepped past the limit.
    target = max(1300, steer1 - 100)
    return f"#001P{target}T1500!\n"
def move_up(steer1):
    """Build the ``#001`` command that raises the position by one step.

    Args:
        steer1: Current position value for the ``#001`` channel.

    Returns:
        A newline-terminated ``#001P<pos>T1500!`` command where ``<pos>`` is
        ``steer1 + 100``, never higher than 1700.
    """
    # Clamp rather than test ``== 1700``: a value that is off the 100-step
    # grid or already above the stop must not be stepped past the limit.
    target = min(1700, steer1 + 100)
    return f"#001P{target}T1500!\n"
def move_forward(steer2):
    """Build the ``#002`` command that lowers the position by one step.

    Args:
        steer2: Current position value for the ``#002`` channel.

    Returns:
        A newline-terminated ``#002P<pos>T1500!`` command where ``<pos>`` is
        ``steer2 - 100``, never lower than 1300.
    """
    # Clamp rather than test ``== 1300``: a value that is off the 100-step
    # grid or already below the stop must not be stepped past the limit.
    target = max(1300, steer2 - 100)
    return f"#002P{target}T1500!\n"
def move_backward(steer2):
    """Build the ``#002`` command that raises the position by one step.

    Args:
        steer2: Current position value for the ``#002`` channel.

    Returns:
        A newline-terminated ``#002P<pos>T1500!`` command where ``<pos>`` is
        ``steer2 + 100``, never higher than 1700.
    """
    # Clamp rather than test ``== 1700``: a value that is off the 100-step
    # grid or already above the stop must not be stepped past the limit.
    target = min(1700, steer2 + 100)
    return f"#002P{target}T1500!\n"
def move_forward3(steer3):
    """Build the ``#003`` command that raises the position by one step.

    Args:
        steer3: Current position value for the ``#003`` channel.

    Returns:
        A newline-terminated ``#003P<pos>T1500!`` command where ``<pos>`` is
        ``steer3 + 100``, never higher than 2000.
    """
    # At the upper stop the command must hold 2000. The previous at-limit
    # branch sent 1000, i.e. the opposite end of the range. Clamping also
    # covers values that are off the 100-step grid or already above the stop.
    target = min(2000, steer3 + 100)
    return f"#003P{target}T1500!\n"
def move_backward3(steer3):
    """Build the ``#003`` command that lowers the position by one step.

    Args:
        steer3: Current position value for the ``#003`` channel.

    Returns:
        A newline-terminated ``#003P<pos>T1500!`` command where ``<pos>`` is
        ``steer3 - 100``, never lower than 1000.
    """
    # At the lower stop the command must hold 1000. The previous at-limit
    # branch sent 2000, i.e. the opposite end of the range. Clamping also
    # covers values that are off the 100-step grid or already below the stop.
    target = max(1000, steer3 - 100)
    return f"#003P{target}T1500!\n"
def move_left4(steer4):
    """Build the ``#004`` command that lowers the position by one step.

    Args:
        steer4: Current position value for the ``#004`` channel.

    Returns:
        A newline-terminated ``#004P<pos>T1500!`` command where ``<pos>`` is
        ``steer4 - 100``, never lower than 1000.
    """
    # Clamp rather than test ``== 1000``: a value that is off the 100-step
    # grid or already below the stop must not be stepped past the limit.
    target = max(1000, steer4 - 100)
    return f"#004P{target}T1500!\n"
def move_right4(steer4):
    """Build the ``#004`` command that raises the position by one step.

    Args:
        steer4: Current position value for the ``#004`` channel.

    Returns:
        A newline-terminated ``#004P<pos>T1500!`` command where ``<pos>`` is
        ``steer4 + 100``, never higher than 2000.
    """
    # Clamp rather than test ``== 2000``: a value that is off the 100-step
    # grid or already above the stop must not be stepped past the limit.
    target = min(2000, steer4 + 100)
    return f"#004P{target}T1500!\n"
def _commanded_position(command):
    """Return the integer found between the ``P`` and ``T`` markers of *command*."""
    return int(command[command.index("P") + 1:command.index("T")])


def convert_to_arm_command(classification_data):
    """Translate a classification result into a serial command string.

    Codes 0-7 are routed to the matching ``move_*`` helper together with the
    tracked position of its channel. The string ``"blink"`` alternates the
    ``#005`` channel between 500 and 1500 according to ``hand_open``. Any
    other value yields ``"$DST!"`` (presumably a stop command; confirm
    against the controller's protocol).

    The tracked state (``steer0``..``steer3`` and ``hand_open``) is updated to
    match the command that is returned. Without that write-back every call
    starts again from the initial position, so the arm could never advance
    more than one step and "blink" would always send the same command.
    ``steer4`` has no classification code wired to it yet.

    Args:
        classification_data: Decoded classification value; compared with
            ``==`` against the integers 0-7 and the string ``"blink"``.

    Returns:
        The command string to write to the serial port.
    """
    global steer0, steer1, steer2, steer3, hand_open

    if classification_data == 0:
        command = move_left(steer0)
        steer0 = _commanded_position(command)
    elif classification_data == 1:
        command = move_right(steer0)
        steer0 = _commanded_position(command)
    elif classification_data == 2:
        command = move_down(steer1)
        steer1 = _commanded_position(command)
    elif classification_data == 3:
        command = move_up(steer1)
        steer1 = _commanded_position(command)
    elif classification_data == 4:
        command = move_forward(steer2)
        steer2 = _commanded_position(command)
    elif classification_data == 5:
        command = move_backward(steer2)
        steer2 = _commanded_position(command)
    elif classification_data == 6:
        command = move_forward3(steer3)
        steer3 = _commanded_position(command)
    elif classification_data == 7:
        command = move_backward3(steer3)
        steer3 = _commanded_position(command)
    elif classification_data == "blink":
        command = "#005P1500T1500!\n" if hand_open else "#005P500T1500!\n"
        # Flip the flag so the next blink sends the other position.
        hand_open = not hand_open
    else:
        command = "$DST!"
    return command
if __name__ == '__main__':
    # Starting value for every tracked channel position and the hand flag.
    steer0 = 1500
    steer1 = 1500
    steer2 = 1500
    steer3 = 1500
    steer4 = 1500
    hand_open = False
    try:
        # Drive channel #005 to 500, wait three seconds, then back to 1500
        # before any classification is processed.
        ser.write("#005P500T1500!\n".encode())
        time.sleep(3)
        ser.write("#005P1500T1500!\n".encode())
        print("Arm control client started")
        # asyncio.run() creates and tears down its own event loop. Calling
        # asyncio.get_event_loop() without a running loop is deprecated, and
        # the former run_forever() call could never be reached because
        # receive_classification() only exits by raising.
        asyncio.run(receive_classification())
    finally:
        # Release the serial port however the client stops.
        ser.close()