-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDuenger_Actuator.py
80 lines (65 loc) · 3.13 KB
/
Duenger_Actuator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import sys
sys.path.insert(0, '../')
import logging
logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
from Actuators.Actuator import Actuator
import smbus
import struct
from Handler.SerialHandler import SerialHandler
class Duenger_Actuator(Actuator):
    """Actuator that runs one fertilizer pump on a serially attached device.

    The pump is started by sending a ``setPump`` command dictionary through a
    ``SerialHandler`` to the device named in the configuration. Every started
    run is also handed to the base class via ``safeToMemory``.

    Expected ``config`` keys read by this class:
        deviceName: Name of the serial device to address.
        pump: Number of the pump to use.
        runtime: Default runtime (logged as milliseconds) used for ``set(True)``.
    """

    def __init__(self, name, collection, config: dict, *args, **kwargs):
        """Initialise the actuator and, if it is active, open the serial link.

        Args:
            name: Forwarded to the ``Actuator`` base class.
            collection: Forwarded to the ``Actuator`` base class.
            config: Configuration dictionary; must contain ``deviceName``,
                ``pump`` and ``runtime``.
            *args: Forwarded unchanged to the base class.
            **kwargs: Forwarded unchanged to the base class.

        Raises:
            KeyError: If a required configuration key is missing.
            Exception: If the actuator is active and the serial handler
                reports that the configured device is not present.
        """
        structure = {"pump": int, "runtime": int}
        super().__init__(name=name, collection=collection, config=config, dataStructure=structure, *args, **kwargs)
        self.__deviceName = config["deviceName"]
        self.__pump = config["pump"]
        self.__runtime = config["runtime"]

        # The serial handler is only created for active actuators; set()
        # checks the same flag before touching it.
        if super().active:
            self.__serialHandler = SerialHandler(baudrate=19200)
            # Check that the required device is present.
            # NOTE(review): the explicit comparison with False is kept on
            # purpose; the exact return contract of check_for_device is not
            # visible here, so other falsy results are not treated as missing.
            if self.__serialHandler.check_for_device(self.__deviceName) == False:
                raise Exception(f"Gerät {self.__deviceName} nicht vom SerialHandler gefunden")

    def __startPump(self, runtime):
        """Send the ``setPump`` command for ``runtime`` and record the run."""
        command = {"command": "setPump", "pump": self.__pump, "runtime": runtime}
        logging.info(f"Starte Duenger_Actuator {super().name} für {runtime} ms")
        self.__serialHandler.send_dict(self.__deviceName, command)
        super().safeToMemory({"pump": self.__pump, "runtime": runtime})

    def set(self, state):
        """Start the pump according to ``state``.

        Args:
            state: ``True`` runs the pump for the configured runtime and
                ``False`` does nothing. Any other value is converted with
                ``int()`` and used as the runtime for this single run.

        Values that cannot be converted to an integer are logged and ignored.
        If the actuator is not active, an error is logged and nothing is sent.
        """
        if not super().active:
            logging.error(f"{super().name} ist nicht aktiv, wurde aber versucht per run() ausgeführt werden. Das sollte nicht passieren.")
            return

        # bool must be handled before the int conversion because bool is a
        # subclass of int and True/False mean "default run" / "no run" here.
        if isinstance(state, bool):
            if state:
                self.__startPump(self.__runtime)
            return

        # HOTFIX TODO: find a type-safe solution for non-bool states.
        try:
            runtime = int(state)
        except (TypeError, ValueError, OverflowError):
            # Stay best-effort (no exception for the caller), but leave a
            # trace instead of swallowing the problem silently.
            logging.warning(f"{super().name}: Zustand {state!r} kann nicht als Laufzeit interpretiert werden, Pumpe wird nicht gestartet.")
            return

        self.__startPump(runtime)

    @staticmethod
    def getInputDesc():
        """Return the description of the input accepted by ``set``."""
        return {
            "runtime": {"type": int, "desc": "Milliseconkunden, die die Pumpe laufen soll"}
        }

    @staticmethod
    def getConfigDesc():
        """Return the description of the supported configuration keys.

        NOTE(review): ``defaultValue`` is described here but not read by this
        class; presumably it is consumed elsewhere - confirm.
        """
        return {
            "deviceName": {"type": str, "desc": "Name, den das Geräte hat, welches seriel angeschlossen ist. Wird vom geräte per command 'info' abgefragt."},
            "pump": {"type": int, "desc": "Pumpe, die benutzt werden soll (1-3)"},
            "runtime": {"type": int, "desc": "Zeit die die Pumpe laufen soll"},
            "defaultValue": {"type": int, "desc": "Standard Wert, der verwendet werden kann, wenn man den Aktor vom UserInterface steuern möchte"}
        }
if __name__ == "__main__":
    # Manual smoke test: build the actuator with a sample configuration and
    # trigger a single pump run on the attached hardware.
    duengerActuator = Duenger_Actuator(
        "DuengerPumpe_Serial",
        "DuengerPumpe_Serial",
        {
            "deviceName": "Düngeranlage",
            "pump": 1,
            "runtime": 1000,
        },
    )
    # set() requires a state argument; calling it without one raises a
    # TypeError. True starts the pump with the configured runtime.
    duengerActuator.set(True)