-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscs_simulator_mosaik.py
81 lines (61 loc) · 2.11 KB
/
scs_simulator_mosaik.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# scs_simulator_mosaik.py
"""
Mosaik interface for the example simulator.
"""
import mosaik_api
import scs_simulator
# Simulator meta data handed to mosaik: the models this simulator offers,
# the parameters accepted when creating them and the attributes they expose.
META = {
    'models': {
        'ScsModel': {
            'public': True,
            # Each parameter name must be its own list entry; these match
            # the keyword arguments of ``ScsSim.create``.
            'params': ['device_addr', 'datatype'],
            'attrs': ['SD', 'val'],
        },
    },
}
class ScsSim(mosaik_api.Simulator):
    """Mosaik adapter around ``scs_simulator.Simulator``.

    Every entity created through mosaik corresponds to one model held by the
    wrapped simulator; ``self.entities`` maps each entity ID to that model's
    index.
    """

    def __init__(self):
        super().__init__(META)
        # The backing module is imported as ``scs_simulator``.
        self.simulator = scs_simulator.Simulator()
        self.eid_prefix = 'Model_'
        self.entities = {}  # Maps EIDs to model indices in self.simulator

    def init(self, sid, eid_prefix=None):
        """Apply an optional entity-ID prefix and return the meta data.

        Args:
            sid: Simulator ID assigned by the caller (not used here).
            eid_prefix: If not ``None``, replaces the default ``'Model_'``
                prefix used when building entity IDs.

        Returns:
            ``self.meta``.
        """
        if eid_prefix is not None:
            self.eid_prefix = eid_prefix
        return self.meta

    def create(self, num, model, device_addr, datatype):
        """Create ``num`` entities of type ``model``.

        Args:
            num: Number of entities to create.
            model: Model name stored as each entity's ``'type'``.
            device_addr: Indexable collection of device addresses.
            datatype: Indexable collection of data types.

        Returns:
            A list of ``{'eid': ..., 'type': model}`` dicts, one per entity.
        """
        next_eid = len(self.entities)
        entities = []

        for i in range(next_eid, next_eid + num):
            eid = '%s%d' % (self.eid_prefix, i)
            # NOTE(review): ``i`` is the global entity index, not the offset
            # within this call, so ``device_addr``/``datatype`` must be
            # indexable up to ``next_eid + num - 1``. Confirm this is what
            # callers pass when ``create`` is invoked more than once.
            self.simulator.add_model(device_addr[i], datatype[i])
            self.entities[eid] = i
            entities.append({'eid': eid, 'type': model})

        return entities

    def step(self, time, inputs):
        """Feed the summed inputs to the simulator and advance one step.

        Args:
            time: Current simulation time.
            inputs: Mapping ``eid -> {attr -> {source -> value}}``.

        Returns:
            The time of the next step (``time + 60``).
        """
        # Get inputs
        deltas = {}
        for eid, attrs in inputs.items():
            model_idx = self.entities[eid]
            for values in attrs.values():
                # One delta per model: if several attributes carry input for
                # the same entity, the last attribute processed wins.
                deltas[model_idx] = sum(values.values())

        # Perform simulation step
        self.simulator.step(deltas)

        return time + 60  # Step size is 1 minute

    def get_data(self, outputs):
        """Return the requested attribute values of the requested entities.

        Args:
            outputs: Mapping ``eid -> list of attribute names``.

        Returns:
            Mapping ``eid -> {attr -> value}`` read from the simulator's
            models.

        Raises:
            ValueError: If a requested attribute is not declared for
                ``'ScsModel'`` in the meta data.
        """
        models = self.simulator.models
        # META declares the model as 'ScsModel'.
        known_attrs = self.meta['models']['ScsModel']['attrs']
        data = {}
        for eid, attrs in outputs.items():
            model_idx = self.entities[eid]
            data[eid] = {}
            for attr in attrs:
                if attr not in known_attrs:
                    raise ValueError('Unknown output attribute: %s' % attr)

                # Read the attribute (e.g. SD or val) straight off the model.
                data[eid][attr] = getattr(models[model_idx], attr)

        return data
def main():
    """Start the mosaik simulation API with a ``ScsSim`` instance.

    Returns:
        Whatever ``mosaik_api.start_simulation`` returns.
    """
    # The simulator class defined in this module is ``ScsSim``.
    return mosaik_api.start_simulation(ScsSim())


if __name__ == '__main__':
    main()