-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqtt_reader.go
102 lines (90 loc) · 2.19 KB
/
mqtt_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"bytes"
"encoding/json"
"github.com/AlexxIT/gw3/mqtt"
proto "github.com/huin/mqtt"
"github.com/rs/zerolog/log"
"net"
"strings"
"time"
)
// mqttClient is the currently active MQTT client connection, or nil while
// no connection is established. It is assigned and reset by mqttReader and
// read by mqttPublish and mqttLogWriter.Write.
// NOTE(review): no synchronization around this variable is visible in this
// file — confirm whether readers and the writer run on different goroutines.
var mqttClient *mqtt.ClientConn
// mqttReader maintains the connection to the local MQTT broker forever.
// Each iteration runs one session (dial, connect, subscribe, dispatch) and
// then waits one second before trying again, regardless of whether the
// session failed to start or ended after running.
func mqttReader() {
	for {
		mqttRunSession()
		time.Sleep(time.Second)
	}
}

// mqttRunSession runs a single broker session: it dials 127.0.0.1:1883,
// performs the MQTT connect handshake (registering a retained "offline" will
// on gw3/<MAC>/state), subscribes to gw3/+/set and forwards every received
// payload to the matching device until the Incoming channel is closed.
// It returns as soon as any setup step fails or the session ends; errors are
// logged, not returned.
func mqttRunSession() {
	conn, err := net.Dial("tcp", "127.0.0.1:1883")
	if err != nil {
		log.Error().Caller().Err(err).Send()
		return
	}
	// Release the socket on every exit path (handshake failure or end of
	// session). A redundant Close on an already closed connection only
	// returns an error, which is intentionally ignored here.
	defer conn.Close()

	// The global is published before the handshake, as callers such as
	// gw.updateInfo rely on it being set once the session is up.
	mqttClient = mqtt.NewClientConn(conn)
	// Registered after conn.Close, so it runs first: publishers stop seeing
	// the client before its socket is torn down.
	defer func() { mqttClient = nil }()

	if err = mqttClient.Connect(&proto.Connect{
		ClientId:    "gw3",
		WillRetain:  true,
		WillTopic:   "gw3/" + gw.WiFi.MAC + "/state",
		WillMessage: `{"state":"offline"}`,
	}); err != nil {
		log.Error().Caller().Err(err).Send()
		return
	}

	gw.updateInfo()

	mqttClient.Subscribe([]proto.TopicQos{
		{Topic: "gw3/+/set"},
	})

	for m := range mqttClient.Incoming {
		buf := bytes.Buffer{}
		if err = m.Payload.WritePayload(&buf); err != nil {
			log.Error().Caller().Err(err).Send()
			continue
		}

		// Expected topic layout: gw3/<mac>/set
		items := strings.Split(m.TopicName, "/")
		if len(items) != 3 || items[2] != "set" {
			continue
		}

		mac := items[1]
		device, ok := devices[mac]
		if !ok {
			continue
		}

		// The topic comes from the broker, so it may name a device that
		// cannot be controlled; a checked assertion avoids a panic that
		// would take the whole process down.
		target, ok := device.(DeviceGetSet)
		if !ok {
			log.Warn().Str("mac", mac).Msg("device does not support set")
			continue
		}
		target.setState(buf.Bytes())
	}
}
// mqttPublish publishes data to topic on the current MQTT connection.
//
// data may be a []byte or a string, which are sent as-is; any other value is
// encoded with json.Marshal. retain sets the MQTT retain flag on the message.
// The call is a silent no-op while no connection is available, and a value
// that cannot be marshalled is logged as a warning and dropped.
func mqttPublish(topic string, data interface{}, retain bool) {
	// Read the global exactly once: mqttReader resets it to nil when the
	// session ends, so checking it and then reading it again for Publish
	// could dereference nil. This removes the check-then-use gap but is not
	// a substitute for real synchronization.
	client := mqttClient
	if client == nil {
		return
	}

	var payload []byte
	switch v := data.(type) {
	case []byte:
		payload = v
	case string:
		payload = []byte(v)
	default:
		var err error
		if payload, err = json.Marshal(v); err != nil {
			log.Warn().Err(err).Send()
			return
		}
	}

	// Disabled: masks the last three octets of MAC addresses in the topic
	// and the payload.
	//var re = regexp.MustCompile(`([0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}):[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}`)
	//topic = re.ReplaceAllString(topic, `$1:FF:FF:FF`)
	//payload = re.ReplaceAll(payload, []byte(`$1:FF:FF:FF`))

	client.Publish(&proto.Publish{
		Header:    proto.Header{Retain: retain},
		TopicName: topic,
		Payload:   proto.BytesPayload(payload),
	})
}
// mqttLogWriter is an io.Writer that forwards everything written to it to
// the "gw3/stdout" MQTT topic.
type mqttLogWriter struct{}

// Write publishes p as a single non-retained message on "gw3/stdout".
// It always reports len(p) bytes written and a nil error, even when no MQTT
// connection is available and the data is dropped, so that a missing broker
// never causes the caller's write to fail.
func (m mqttLogWriter) Write(p []byte) (n int, err error) {
	// Read the global exactly once so that a concurrent reset to nil by
	// mqttReader cannot happen between the nil check and the Publish call.
	client := mqttClient
	if client == nil {
		return len(p), nil
	}

	// io.Writer implementations must not retain p, and the caller may reuse
	// the buffer as soon as Write returns. Publish presumably hands the
	// message to another goroutine (TODO confirm against gw3/mqtt), so send
	// a private copy.
	payload := append([]byte(nil), p...)

	client.Publish(&proto.Publish{
		Header:    proto.Header{},
		TopicName: "gw3/stdout",
		Payload:   proto.BytesPayload(payload),
	})
	return len(p), nil
}