-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSerialHandler.py
109 lines (84 loc) · 3.69 KB
/
SerialHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import serial.tools.list_ports
import time
import json
import logging
# Configure the root logger so that only records of level ERROR and above are emitted.
# NOTE(review): `encoding` is presumably only honoured together with `filename` — confirm.
logging.basicConfig(encoding='utf-8', level=logging.ERROR)
class SerialHandler:
    """Discover USB-serial devices and exchange JSON commands with them.

    On construction every serial port whose description matches one of the
    known adapter strings is probed with an ``{"command": "info"}`` request.
    The ``name`` field of the reply is stored so that later calls can address
    a device by that name.

    Attributes:
        devices: List of dicts, one per matching port, with the keys
            ``port`` (device path of the port), ``raw`` (the port object
            returned by ``serial.tools.list_ports.comports()``), ``name``
            (name reported by the device, or ``None`` if the probe produced
            no usable answer) and ``status`` (initialised to ``None`` and not
            updated by this class).
    """

    # Substrings of the port description that identify a supported adapter
    # (ESP D1 Minis are recognised this way). Further systems can be added.
    _PORT_DESCRIPTIONS = ("USB2.0-Ser!", "USB Serial")

    # Pause between two polls of the input buffer, in seconds, so that
    # waiting for a response does not spin at full CPU load.
    _POLL_INTERVAL = 0.01

    def __init__(self, baudrate=19200):
        """Store the baud rate and probe all matching serial ports.

        Args:
            baudrate: Baud rate used for every connection opened by this
                handler.
        """
        self.__baudrate = baudrate
        self.devices = self.__find_devices()

    def __send_command(self, port, command: dict, timeout=1.0):
        """Open ``port``, write ``command`` as a JSON string and close it.

        Args:
            port: Device path of the serial port.
            command: Data to send; serialised with ``json.dumps``.
            timeout: Timeout in seconds passed to ``serial.Serial``.
        """
        # Serialise before opening the port so a non-serialisable command
        # cannot leave an open port behind.
        payload = json.dumps(command).encode()
        # The context manager closes the port even if the write fails.
        with serial.Serial(port, self.__baudrate, timeout=timeout) as ser:
            ser.write(payload)

    def __read_response(self, port, timeout=1.0):
        """Open ``port`` and read one line, parsed as JSON.

        Waits until input is available, but not longer than ``timeout``.

        NOTE(review): the port is reopened here after ``__send_command``
        closed it; whether a reply arriving in between is retained depends
        on the driver — confirm on the target platform.

        Args:
            port: Device path of the serial port.
            timeout: Maximum time in seconds to wait for the first byte; also
                used as read timeout of the port.

        Returns:
            The decoded JSON value, or ``None`` on timeout or when the
            received line is not valid JSON (an error is logged in both
            cases).
        """
        with serial.Serial(port, self.__baudrate, timeout=timeout) as ser:
            # A monotonic clock keeps the timeout immune to system clock
            # adjustments.
            deadline = time.monotonic() + timeout
            while ser.in_waiting == 0:
                if time.monotonic() > deadline:
                    logging.error(
                        "Timeout beim Warten auf Antwort von %s. Prüfe ob eine überhaupt eine Antwort erwartet wird.",
                        port,
                    )
                    return None
                time.sleep(self._POLL_INTERVAL)
            raw_response = ser.readline()
        return self._parse_response(raw_response, port)

    @staticmethod
    def _parse_response(raw_response: bytes, port):
        """Decode a received line and parse it as JSON.

        Args:
            raw_response: Bytes as read from the serial port.
            port: Device path, used only for the log message.

        Returns:
            The parsed JSON value, or ``None`` if the bytes cannot be decoded
            or are not valid JSON (an error is logged).
        """
        try:
            text = raw_response.decode().strip()
        except UnicodeDecodeError:
            # Line noise is reported like any other invalid response instead
            # of crashing the caller.
            logging.error(
                "Response von %s ist kein gültiger JSON-String: %r",
                port,
                raw_response,
            )
            return None
        try:
            return json.loads(text)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            logging.error(
                "Response von %s ist kein gültiger JSON-String: %s", port, text
            )
            return None

    def __find_devices(self):
        """Collect all matching serial ports and ask each one for its name.

        Returns:
            List of device dicts as described on the ``devices`` attribute.
            A device that does not answer the ``info`` command with a JSON
            object containing ``name`` keeps ``name = None``.
        """
        devices = [
            {"port": info.device, "raw": info, "name": None, "status": None}
            for info in serial.tools.list_ports.comports()
            if any(marker in info.description for marker in self._PORT_DESCRIPTIONS)
        ]
        for device in devices:
            self.__send_command(device["port"], {"command": "info"})
            response = self.__read_response(device["port"])
            # A silent or misbehaving device must not abort the discovery of
            # the remaining ones.
            if isinstance(response, dict) and "name" in response:
                device["name"] = response["name"]
            else:
                logging.error(
                    "Gerät an %s hat keinen Namen gemeldet (Antwort: %r).",
                    device["port"],
                    response,
                )
        return devices

    def check_for_device(self, deviceName: str) -> bool:
        """Return ``True`` if a discovered device reported ``deviceName``."""
        return any(device["name"] == deviceName for device in self.devices)

    def send_dict(self, deviceName: str, command: dict, readResponse: bool = False):
        """Send ``command`` to the device called ``deviceName``.

        Args:
            deviceName: Name the device reported during discovery.
            command: Data to send; serialised as JSON.
            readResponse: If true, wait for one response line and return it.

        Returns:
            The parsed JSON response if ``readResponse`` is true (``None`` on
            timeout or invalid JSON), otherwise ``None``.

        Raises:
            LookupError: No discovered device has the given name. This is a
                subclass of ``Exception``, so existing ``except Exception``
                handlers keep working.
        """
        # Find the first device with the given name.
        device = next((d for d in self.devices if d["name"] == deviceName), None)
        if device is None:
            raise LookupError(f"Device mit dem Namen {deviceName} nicht gefunden.")
        self.__send_command(device["port"], command)
        if readResponse:
            return self.__read_response(device["port"])
        return None
if __name__ == "__main__":
    # Example run when executed as a script: discover the attached devices
    # and send a "setPump" command for pumps 3, 2 and 1 to the device that
    # reported the name below.
    device_name = "Düngeranlage"
    serial_handler1 = SerialHandler()

    # Stop with a clear message instead of a traceback from send_dict when
    # the device is not attached.
    if not serial_handler1.check_for_device(device_name):
        raise SystemExit(f"Device mit dem Namen {device_name} nicht gefunden.")

    result = None
    for pump in (3, 2, 1):
        # "runtime" is interpreted by the device firmware — presumably
        # milliseconds; TODO confirm.
        result = serial_handler1.send_dict(
            device_name,
            {"command": "setPump", "pump": pump, "runtime": 1000},
            readResponse=False,
        )
    # With readResponse=False the result of the last call is None.
    print(result)
    print("done")