-
Notifications
You must be signed in to change notification settings - Fork 2
/
__init__.py
139 lines (102 loc) · 3.99 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding: utf-8 -*-
import os
from subprocess import Popen, PIPE, call
from modules import cbpi, app
from modules.core.hardware import SensorPassive
import json
import os, re, threading, time
from flask import Blueprint, render_template, request
from modules.core.props import Property
from .pyowfs import Connection
# Flask blueprint carrying this plugin's HTTP route (set_temp); it is
# registered on the application in init() at the bottom of this file.
blueprint = Blueprint('one_wire_owfs', __name__)
# Module-level value overwritten by the set_temp route. Nothing in the
# visible code reads it back.
temp = 22
# Shared pyowfs connection, created once at import time and used both for
# sensor discovery and for the periodic temperature reads.
# NOTE(review): presumably a local owserver listening on port 4304 - confirm.
root = Connection('localhost:4304')
# NOTE(review): getSensors() builds its own local list with the same name, so
# this module-level list is never filled by the visible code.
arr = []
def getSensors():
    """Return the addresses of the thermometers visible through OWFS.

    Devices of 1-Wire family "28" are queried first, then family "10", using
    the module-level ``root`` connection. Every newly seen device is logged;
    an address that was already collected is skipped silently.

    Returns:
        list: Sensor addresses in discovery order, or an empty list when the
        lookup fails for any reason (partial results are discarded).
    """
    addresses = []
    try:
        for family in ("28", "10"):
            for s in root.find(family=family):
                # Read the address once and reuse it for the duplicate check,
                # the log line and the result list.
                address = s.get("address")
                if address in addresses:
                    continue
                cbpi.app.logger.info("Device %s Found (Family: %s, ID: %s, Type: %s, Thermometer on owfs)" % (address, s.get("family"), s.get("id"), s.get("type")))
                addresses.append(address)
    except Exception as e:
        # Discovery stays best effort (this runs while the sensor class is
        # being defined), but the reason for an empty list is now recorded.
        cbpi.app.logger.error("OWFS sensor discovery failed: %s" % (e,))
        return []
    return addresses
class myThread (threading.Thread):
    """Background thread that polls one OWFS temperature sensor.

    The most recent reading is stored in ``value``. Polling repeats every
    ``POLL_INTERVAL`` seconds until ``stop()`` is called.
    """

    # Class-level default so ``value`` can be read before the first poll.
    value = 0

    # Pause between two polls, in seconds.
    POLL_INTERVAL = 4

    def __init__(self, sensor_name):
        """Prepare a poller for one sensor.

        Args:
            sensor_name: Address handed to ``root.find(address=...)``.
                ``None`` means no sensor is selected; ``run`` then returns
                without polling.
        """
        threading.Thread.__init__(self)
        self.value = 0
        self.sensor_name = sensor_name
        # The misspelt attribute name is kept: it is the existing loop flag
        # and may be set from outside this class.
        self.runnig = True
        # Lets stop() cut the pause between polls short.
        self._stop_event = threading.Event()

    def shutdown(self):
        """Do nothing; kept so existing ``shutdown()`` callers keep working."""
        pass

    def stop(self):
        """Ask the polling loop to end and wake it if it is pausing."""
        self.runnig = False
        self._stop_event.set()

    def run(self):
        """Poll the sensor until stopped, publishing each reading in ``value``.

        A failed read is logged and leaves the previous ``value`` in place;
        the loop then carries on with the next poll.
        """
        # Test mode: without a sensor address there is nothing to poll.
        if self.sensor_name is None:
            self.value = 0
            app.logger.info("READ OWFS TEMP %s; self.sensor_name is None" % (self.sensor_name))
            return

        while self.runnig:
            try:
                x = root.find(address=self.sensor_name)[0]
                # NOTE(review): presumably turns pyowfs caching off so every
                # poll is a fresh reading - confirm against pyowfs.
                x.use_cache(0)
                self.value = float(x.get("temperature"))
                app.logger.info("READ OWFS TEMP %s; temp: %04f" % (self.sensor_name, self.value))
            except Exception as e:
                # Keep polling, but make the failure visible in the log.
                app.logger.warning("READ OWFS TEMP %s failed: %s" % (self.sensor_name, e))
            # Returns immediately once stop() has been called.
            self._stop_event.wait(self.POLL_INTERVAL)
@cbpi.sensor
class ONE_WIRE_OWFS_SENSOR(SensorPassive):
    """Passive sensor that reports the temperature of one OWFS thermometer.

    A ``myThread`` instance polls the device in the background; ``read``
    publishes that thread's latest value through ``data_received``.
    """

    # Address of the thermometer to read. The choices are collected by
    # getSensors() once, when this class body is executed.
    sensor_name = Property.Select("Sensor", getSensors(), description="The OneWire OWFS sensor address.")
    # Correction added to every published reading.
    offset = Property.Number("Offset", True, 0, description="Offset which is added to the received sensor data. Positive and negative values are both allowed.")

    def init(self):
        """Create and start the polling thread for the selected sensor."""
        self.t = myThread(self.sensor_name)
        self.t.start()

    def stop(self):
        """Ask the polling thread to finish, if one was ever created."""
        # init() may not have run (or may have failed) before stop() is
        # called, in which case there is no thread to stop.
        poller = getattr(self, "t", None)
        if poller is not None:
            poller.stop()

    def read(self):
        """Publish the latest reading, rounded to two decimals.

        The value is sent unchanged when the "unit" config parameter is "C"
        and converted to Fahrenheit otherwise. The offset is added after the
        conversion.
        """
        unit = self.get_config_parameter("unit", "C")
        temperature = self.t.value
        if unit != "C":
            temperature = 9.0 / 5.0 * temperature + 32
        self.data_received(round(temperature + self.offset_value(), 2))

    @cbpi.try_catch(0)
    def offset_value(self):
        """Return the configured offset as a float.

        NOTE(review): ``cbpi.try_catch(0)`` presumably substitutes 0 when the
        conversion raises - confirm against the cbpi implementation.
        """
        return float(self.offset)

    @classmethod
    def init_global(cls):
        """Perform no global setup.

        The ``modprobe w1-gpio`` / ``modprobe w1-therm`` calls that used to
        live here are disabled, so nothing is executed.
        """
@blueprint.route('/<int:t>', methods=['GET'])
def set_temp(t):
    """Store the integer from the URL in the module-level ``temp``.

    Args:
        t: Integer taken from the request path.

    Returns:
        tuple: An empty body with HTTP status 204.
    """
    global temp
    temp = t
    empty_body, no_content = '', 204
    return (empty_body, no_content)
@cbpi.initalizer()
def init(cbpi):
    """Register this plugin's blueprint under ``/api/one_wire_owfs``.

    Args:
        cbpi: Object whose ``app`` attribute receives the blueprint. The
            parameter shadows the module-level ``cbpi`` import inside this
            function.
    """
    application = cbpi.app
    application.register_blueprint(
        blueprint,
        url_prefix='/api/one_wire_owfs',
    )