-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathActuator.py
86 lines (67 loc) · 2.15 KB
/
Actuator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import sys
from abc import ABC, abstractmethod
from queue import Queue
sys.path.insert(0, '../')
from Handler.DataHandler import DataHandler
import logging
from datetime import datetime
from Utils.Status import Status
class Actuator(ABC):
    """Abstract base class for actuators whose states are persisted via DataHandler.

    Subclasses must provide the static methods ``getConfigDesc`` and
    ``getInputDesc``. The base class stores the actuator's metadata, remembers
    the most recently saved state, and forwards reads/writes for its collection
    to a ``DataHandler`` instance.
    """

    __name: str
    __collection: str
    # Set to Status.READY at the end of __init__.
    status: "Status"

    def __init__(self,
                 name: str,
                 collection: str,
                 config: dict,
                 dataStructure: dict,
                 description: str = "",
                 active: bool = True
                 ):
        """Store the actuator metadata and register its data stack.

        Args:
            name: Name reported by ``name`` and ``getInfos``.
            collection: Name of the data stack used for saving and reading
                this actuator's records.
            config: Configuration reported unchanged by ``getInfos``.
            dataStructure: Passed as ``structure`` to
                ``DataHandler.setupDataStack``.
            description: Free-text description reported by ``getInfos``.
            active: Flag reported by ``active`` and ``getInfos``.
        """
        self.__dataHandler = DataHandler()
        self.__name = name
        self.__collection = collection
        self.__config = config
        self.__active = active
        self.__description = description
        self.__dataHandler.setupDataStack(name=collection, structure=dataStructure)
        # No state has been saved yet.
        self.__lastState = None
        self.status = Status.READY

    def safeToMemory(self, data: dict) -> dict:
        """Timestamp a copy of ``data``, remember it and hand it to the data handler.

        The caller's ``data`` is not modified; a shallow copy receives the
        additional ``"time"`` key (naive ``datetime.now()``).

        Args:
            data: State record to save; must support ``copy()`` and item
                assignment.

        Returns:
            The timestamped copy, which is also kept as the last known state.
        """
        retDict = data.copy()
        retDict["time"] = datetime.now()
        self.__lastState = retDict
        self.__dataHandler.safeData(self.__collection, data=retDict)
        return retDict

    def getInfos(self) -> dict:
        """Return a summary of this actuator.

        Returns:
            A dict with the keys ``active``, ``name``, ``collection``,
            ``class`` (name of the concrete class), ``description``,
            ``config``, ``state`` (last saved record or ``None``) and
            ``inputDesc`` (the input description with every ``"type"`` value
            converted with ``str``).

        Raises:
            KeyError: If an entry of the input description has no ``"type"``.
        """
        # Build a fresh mapping instead of rewriting the dict returned by
        # getInputDesc(): a subclass may return a shared (e.g. module- or
        # class-level) description, and stringifying it in place would replace
        # its type objects for every later consumer.
        inputDesc = {}
        for key, spec in self.getInputDesc().items():
            entry = dict(spec)
            # NOTE(review): str(bool) yields "<class 'bool'>", not "bool" as the
            # original comment suggested; the format is kept unchanged here.
            entry["type"] = str(entry["type"])
            inputDesc[key] = entry
        return {
            "active": self.__active,
            "name": self.__name,
            "collection": self.__collection,
            "class": self.__class__.__name__,
            "description": self.__description,
            "config": self.__config,
            "state": self.__lastState,
            "inputDesc": inputDesc
        }

    def getHistory(self, length):
        """Return the result of ``DataHandler.readData`` for this collection.

        Args:
            length: Forwarded unchanged to ``readData``; presumably the number
                of records to fetch - TODO confirm against DataHandler.
        """
        result = self.__dataHandler.readData(self.__collection, length)
        return result

    @property
    def active(self):
        """The ``active`` flag given at construction."""
        return self.__active

    @property
    def name(self):
        """The actuator name given at construction."""
        return self.__name

    @staticmethod
    @abstractmethod
    def getConfigDesc():
        """Describe the configuration of the concrete actuator (subclass hook)."""
        pass

    @staticmethod
    @abstractmethod
    def getInputDesc():
        """Describe the inputs of the concrete actuator (subclass hook).

        ``getInfos`` expects a mapping whose values are mappings containing a
        ``"type"`` entry.
        """
        pass