diff --git a/device/rtu485_ther_device.go b/device/rtu485_ther_device.go index eb721040f..905d8c3bf 100644 --- a/device/rtu485_ther_device.go +++ b/device/rtu485_ther_device.go @@ -2,6 +2,9 @@ package device import ( "context" + "encoding/json" + golog "log" + "os" "rulex/driver" "rulex/typex" "rulex/utils" @@ -20,6 +23,14 @@ type rtu485_ther struct { slaverIds []byte } +var __debug bool = false + +// Example: 0x02 0x92 0xFF 0x98 +type __sensor_data struct { + TEMP float32 `json:"temp"` //系数: 0.1 + HUM float32 `json:"hum"` //系数: 0.1 +} + /* * * 温湿度传感器 @@ -55,17 +66,19 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error { // 串口配置固定写法 handler := modbus.NewRTUClientHandler(rtuConfig.Uart) - handler.BaudRate = 9600 + handler.BaudRate = 4800 handler.DataBits = 8 handler.Parity = "N" handler.StopBits = 1 handler.Timeout = time.Duration(*mainConfig.Timeout) * time.Second - // handler.Logger = golog.New(os.Stdout, "485THerSource: ", log.LstdFlags) + if __debug { + handler.Logger = golog.New(os.Stdout, "485THerSource: ", log.LstdFlags) + } if err := handler.Connect(); err != nil { return err } client := modbus.NewClient(handler) - tss.driver = driver.NewRtu485_THer_Driver(tss.Details(), tss.RuleEngine, client) + tss.driver = driver.NewRtu485THerDriver(tss.Details(), tss.RuleEngine, client) tss.slaverIds = append(tss.slaverIds, mainConfig.SlaverIds...) //--------------------------------------------------------------------------------- // Start @@ -75,7 +88,8 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error { go func(ctx context.Context, slaverId byte, rtuDriver typex.XExternalDriver, handler *modbus.RTUClientHandler) { ticker := time.NewTicker(time.Duration(5) * time.Second) defer ticker.Stop() - buffer := make([]byte, 32) //32字节数据 + // {"SlaveId":1,"Data":"{\"temp\":28.7,\"hum\":66.1}} + buffer := make([]byte, 64) //32字节数据 for { <-ticker.C select { @@ -93,7 +107,14 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error { if err != nil { log.Error(err) } else { - tss.RuleEngine.WorkDevice(tss.RuleEngine.GetDevice(tss.PointId), string(buffer[:n])) + Device := tss.RuleEngine.GetDevice(tss.PointId) + sdata := __sensor_data{} + json.Unmarshal(buffer[:n], &sdata) + bytes, _ := json.Marshal(map[string]interface{}{ + "slaveId": handler.SlaveId, + "data": sdata, + }) + tss.RuleEngine.WorkDevice(Device, string(bytes)) } } diff --git a/device/tss200_v_0_2_device.go b/device/tss200_v_0_2_device.go index 03a0c1a00..97bd7fb85 100644 --- a/device/tss200_v_0_2_device.go +++ b/device/tss200_v_0_2_device.go @@ -64,7 +64,7 @@ func (tss *tss200_v_0_2_sensor) Start(cctx typex.CCTX) error { return err } client := modbus.NewClient(handler) - tss.driver = driver.NewRtu485_THer_Driver(tss.Details(), tss.RuleEngine, client) + tss.driver = driver.NewTSS200_v_0_2_Driver(tss.Details(), tss.RuleEngine, client) tss.slaverIds = append(tss.slaverIds, mainConfig.SlaverIds...) //--------------------------------------------------------------------------------- // Start diff --git a/driver/rtu485_ther_driver.go b/driver/rtu485_ther_driver.go index 2a94cac64..63e5022c1 100644 --- a/driver/rtu485_ther_driver.go +++ b/driver/rtu485_ther_driver.go @@ -10,8 +10,15 @@ import ( "rulex/utils" "github.com/goburrow/modbus" + "github.com/ngaut/log" ) +// Example: 0x02 0x92 0xFF 0x98 +type sensor_data struct { + TEMP float32 `json:"temp"` //系数: 0.1 + HUM float32 `json:"hum"` //系数: 0.1 +} + // 协议:UART:485 baud=4800 无校验 数据位1 停止位1 // 功能码为: 3(ReadHoldingRegisters) // 站号为:1 @@ -30,7 +37,7 @@ type rtu485_THer_Driver struct { RuleEngine typex.RuleX } -func NewRtu485_THer_Driver(d *typex.Device, e typex.RuleX, +func NewRtu485THerDriver(d *typex.Device, e typex.RuleX, client modbus.Client) typex.XExternalDriver { return &rtu485_THer_Driver{ state: typex.DRIVER_STOP, @@ -41,10 +48,12 @@ func NewRtu485_THer_Driver(d *typex.Device, e typex.RuleX, } func (rtu485 *rtu485_THer_Driver) Test() error { - return nil + _, err := rtu485.client.ReadHoldingRegisters(0x00, 2) + return err } func (rtu485 *rtu485_THer_Driver) Init(map[string]string) error { + return nil } @@ -57,21 +66,22 @@ func (rtu485 *rtu485_THer_Driver) State() typex.DriverState { } func (rtu485 *rtu485_THer_Driver) Read(data []byte) (int, error) { - // Example: 0x02 0x92 0xFF 0x98 - type sensor_data struct { - TEMP float32 `json:"temp"` //系数: 0.1 - HUM float32 `json:"hum"` //系数: 0.1 - } + results, err := rtu485.client.ReadHoldingRegisters(0x00, 2) - if len(results) == 2 { + var length = 0 + if len(results) == 4 { sdata := sensor_data{ - HUM: float32(utils.BToU16(results, 0, 1)) * 0.01, - TEMP: float32(utils.BToU16(results, 1, 2)) * 0.01, + HUM: float32(utils.BToU16(results, 0, 2)) * 0.1, + TEMP: float32(utils.BToU16(results, 2, 4)) * 0.1, + } + bytes, err := json.Marshal(sdata) + if err != nil { + log.Error(err) } - bytes, _ := json.Marshal(sdata) copy(data, bytes) + length = len(bytes) } - return len(data), err + return length, err } func (rtu485 *rtu485_THer_Driver) Write(_ []byte) (int, error) { diff --git a/engine/engine.go b/engine/engine.go index 419567311..1e1ceafe9 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -116,6 +116,7 @@ func (e *RuleEngine) PushDeviceQueue(Device *typex.Device, data string) error { func (e *RuleEngine) PushOutQueue(out *typex.OutEnd, data string) error { qd := typex.QueueData{ E: e, + D: nil, I: nil, O: out, Data: data, diff --git a/engine/absdevice.go b/engine/load_device.go similarity index 100% rename from engine/absdevice.go rename to engine/load_device.go diff --git a/release_pkg.sh b/release_pkg.sh index fec8fd7bd..9d181f574 100644 --- a/release_pkg.sh +++ b/release_pkg.sh @@ -8,11 +8,11 @@ create_pkg() { VERSION=$(git describe --tags --always --abbrev=0) echo "Create package: ${rulex-$1-${VERSION}}" if [ "$1" == "x64windows" ]; then - cd rulexc + cd rulex-cli go get CGO_ENABLED=1 GOOS=windows GOARCH=amd64 go build -v -o rulex-cli.exe main.go cd ../ - cp ./rulexc/rulex-cli.exe ./ + cp ./rulex-cli/rulex-cli.exe ./ zip -r _release/rulex-$1-${VERSION}.zip \ ./rulex-$1.exe \ ./rulex-cli.exe \ @@ -20,11 +20,11 @@ create_pkg() { rm -rf ./rulex-cli.exe rm -rf ./rulex-$1.exe else - cd rulexc + cd rulex-cli go get go build -v -o rulex-cli main.go cd ../ - cp ./rulexc/rulex-cli ./ + cp ./rulex-cli/rulex-cli ./ zip -r _release/rulex-$1-${VERSION}.zip \ ./rulex-$1 \ ./rulex-cli \ @@ -138,8 +138,8 @@ cross_compile() { # # # -build_rulexcli() { - git clone ${RESPOSITORY}/rulex-cli.git rulexc +build_rulex_cli() { + git clone ${RESPOSITORY}/rulex-cli.git } # # fetch dashboard @@ -199,7 +199,7 @@ check_cmd init_env cp -r $(ls | egrep -v '^_build$') ./_build/ cd ./_build/ -build_rulexcli +build_rulex_cli fetch_dashboard cross_compile gen_changelog diff --git a/rulexlib/common.go b/rulexlib/common.go index 751b362f8..9a3b186c4 100644 --- a/rulexlib/common.go +++ b/rulexlib/common.go @@ -2,9 +2,15 @@ package rulexlib import ( "rulex/typex" + + "github.com/ngaut/log" ) func handleDataFormat(e typex.RuleX, uuid string, incoming string) { outEnd := e.GetOutEnd(uuid) - e.PushOutQueue(outEnd, incoming) + if outEnd != nil { + e.PushOutQueue(outEnd, incoming) + }else { + log.Error() + } } diff --git a/target/mqtt_target.go b/target/mqtt_target.go index 8afd4a0d9..487050b17 100644 --- a/target/mqtt_target.go +++ b/target/mqtt_target.go @@ -63,7 +63,7 @@ func (mm *mqttOutEndTarget) Start(cctx typex.CCTX) error { mm.DataTopic = mainConfig.DataTopic opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler - opts.SetPingTimeout(3 * time.Second) + opts.SetPingTimeout(5 * time.Second) opts.SetAutoReconnect(true) opts.SetMaxReconnectInterval(5 * time.Second) mm.client = mqtt.NewClient(opts) @@ -132,7 +132,7 @@ func (mm *mqttOutEndTarget) Details() *typex.OutEnd { // func (mm *mqttOutEndTarget) To(data interface{}) (interface{}, error) { if mm.client != nil { - return mm.client.Publish(mm.DataTopic, 2, false, data).Error(), nil + return mm.client.Publish(mm.DataTopic, 1, false, data).Error(), nil } return nil, errors.New("mqtt client is nil") } @@ -143,5 +143,5 @@ func (mm *mqttOutEndTarget) To(data interface{}) (interface{}, error) { * */ func (*mqttOutEndTarget) Configs() *typex.XConfig { - return core.GenOutConfig(typex.MQTT_TARGET, "MQTT_TARGET", httpConfig{}) + return core.GenOutConfig(typex.MQTT_TARGET, "MQTT", httpConfig{}) } diff --git a/test/485ther_gw_test.go b/test/485ther_gw_test.go index c73b89c73..f2c881b13 100644 --- a/test/485ther_gw_test.go +++ b/test/485ther_gw_test.go @@ -1,14 +1,18 @@ package test import ( + "os" + "os/signal" "rulex/core" "rulex/engine" httpserver "rulex/plugin/http_server" "rulex/rulexlib" + "syscall" "rulex/typex" "testing" - "time" + + "github.com/ngaut/log" ) /* @@ -23,56 +27,65 @@ func Test_modbus_485_sensor_gateway(t *testing.T) { rulexlib.StartLuaLogger(core.GlobalConfig.LuaLogPath) core.SetLogLevel() core.SetPerformance() + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM) engine := engine.NewRuleEngine(mainConfig) engine.Start() - hh := httpserver.NewHttpApiServer(2580, "../rulex-test_"+time.Now().Format("2006-01-02-15_04_05")+".db", engine) + hh := httpserver.NewHttpApiServer(2580, "./rulex.db", engine) // HttpApiServer loaded default if err := engine.LoadPlugin("plugin.http_server", hh); err != nil { t.Fatal("Rule load failed:", err) } // RTU485_THER Inend - RTU485_THERInend := typex.NewDevice("RTU485_THER", "RTU485_THER", "RTU485_THER", "", map[string]interface{}{ - "slaverIds": []int{1}, - "timeout": 5, - "frequency": 5, - "config": map[string]interface{}{ - "uart": "COM3", - "baudRate": 115200, - "dataBits": 8, - "parity": "N", - "stopBits": 1, + RTU485Device := typex.NewDevice("RTU485_THER", + "温湿度采集器", "温湿度采集器", "", map[string]interface{}{ + "slaverIds": []int{1}, + "timeout": 5, + "frequency": 5, + "config": map[string]interface{}{ + "uart": "COM6", + "baudRate": 4800, + "dataBits": 8, + "parity": "N", + "stopBits": 1, + }, + }) + RTU485Device.UUID = "RTU485Device1" + if err := engine.LoadDevice(RTU485Device); err != nil { + t.Error("RTU485Device load failed:", err) + } + mqttOutEnd := typex.NewOutEnd( + "MQTT", + "MQTT桥接", + "MQTT桥接", map[string]interface{}{ + "Host": "127.0.0.1", + "Port": 1883, + "DataTopic": "iothub/upstream/IGW00000001", + "ClientId": "IGW00000001", + "Username": "IGW00000001", + "Password": "IGW00000001", }, - }) - - if err := engine.LoadDevice(RTU485_THERInend); err != nil { - t.Error("grpcInend load failed:", err) + ) + mqttOutEnd.UUID = "mqttOutEnd" + if err := engine.LoadOutEnd(mqttOutEnd); err != nil { + t.Error("mqttOutEnd load failed:", err) } rule := typex.NewRule(engine, "uuid", - "Just a test", - "Just a test", + "数据推送至IOTHUB", + "数据推送至IOTHUB", []string{}, - []string{RTU485_THERInend.UUID}, // 数据来自设备 + []string{RTU485Device.UUID}, // 数据来自设备 `function Success() print("[LUA Success Callback]=> OK") end`, ` Actions = { function(data) - local table = rulexlib:J2T(data) - local value = table['value'] - local t = rulexlib:HsubToN(value, 5, 8) - local h = rulexlib:HsubToN(value, 0, 4) - local t1 = rulexlib:HToN(string.sub(value, 5, 8)) - local h2 = rulexlib:HToN(string.sub(value, 0, 4)) - print('Data ========> ', rulexlib:T2J({ - Device = "TH00000001", - Ts = rulexlib:TsUnix(), - T = t, - H = h, - T1 = t1, - H2 = h2 - })) + local t = rulexlib:J2T(data) + t['type'] = 'sub_device' + t['sn'] = 'IGW00000001' + rulexlib:DataToMqtt('mqttOutEnd', rulexlib:T2J(t)) return true, data end }`, @@ -80,6 +93,15 @@ func Test_modbus_485_sensor_gateway(t *testing.T) { if err := engine.LoadRule(rule); err != nil { t.Error(err) } - time.Sleep(3 * time.Second) + s := <-c + log.Warn("Received stop signal:", s) engine.Stop() + + if err := typex.GLOBAL_LOGGER.Close(); err != nil { + return + } + if err := typex.LUA_LOGGER.Close(); err != nil { + return + } + os.Exit(0) } diff --git a/test/data/485gw_stl.json b/test/data/485gw_stl.json new file mode 100644 index 000000000..89e874aa2 --- /dev/null +++ b/test/data/485gw_stl.json @@ -0,0 +1,83 @@ +{ + "configurations": [ + { + "identifier": "能力唯一标识符", + "is_required": false, + "data_type": { + "type": "int(整型)、float(浮点)、double(浮点)、text(字符串)、date(数字UTC时间,单位毫秒)、enum(枚举)、bool(布尔)、struct(结构体)、array(数组)", + "specs": { + "min": "参数最小值(int、float、double类型特有)", + "max": "参数最大值(int、float、double类型特有)", + "unit": "属性单位(int、float、double类型特有)", + "unitName": "属性单位名称(int、float、double类型特有)", + "length": "文本长度,范围为1~10240(text类型特有)", + "0": "0值文字描述(bool类型必须包含该值)", + "1": "1值文字描述(bool类型必须包含该值)", + "size": "数组元素个数,范围为1~50(array类型特有)", + "item": { + "type": "数组元素的类型(array类型特有)" + } + } + } + } + ], + "properties": [ + { + "identifier": "能力唯一标识符", + "is_required": false, + "data_type": { + "type": "int(整型)、float(浮点)、double(浮点)、text(字符串)、date(数字UTC时间,单位毫秒)、enum(枚举)、bool(布尔)、struct(结构体)、array(数组)", + "specs": { + "min": "参数最小值(int、float、double类型特有)", + "max": "参数最大值(int、float、double类型特有)", + "unit": "属性单位(int、float、double类型特有)", + "unitName": "属性单位名称(int、float、double类型特有)", + "length": "文本长度,范围为1~10240(text类型特有)", + "0": "0值文字描述(bool类型必须包含该值)", + "1": "1值文字描述(bool类型必须包含该值)", + "size": "数组元素个数,范围为1~50(array类型特有)", + "item": { + "type": "数组元素的类型(array类型特有)" + } + } + } + } + ], + "events": [ + { + "identifier": "事件唯一标识符", + "is_required": false, + "event_type": "事件类型(EVENT_TYPE_INFO、EVENT_TYPE_ALERT、EVENT_TYPE_ERROR)", + "output_data": [ + { + "identifier": "事件输出参数的标识符,事件内保持唯一即可", + "data_type": { + "type": "事件输出参数的类型" + } + } + ] + } + ], + "actions": [ + { + "identifier": "方法标识符", + "is_required": false, + "input_data_param": [ + { + "identifier": "方法输入参数标识符,输入参数内保证唯一", + "data_type": { + "type": "入参类型" + } + } + ], + "output_data_param": [ + { + "identifier": "方法输出参数标识符,输出参数内保证唯一", + "data_type": { + "type": "出参类型" + } + } + ] + } + ] +} \ No newline at end of file diff --git a/typex/xqueue.go b/typex/xqueue.go index c7f9ca39f..24f8a6d9d 100644 --- a/typex/xqueue.go +++ b/typex/xqueue.go @@ -116,12 +116,12 @@ func StartQueue(maxQueueSize int) { v, ok := qd.E.AllOutEnd().Load(qd.O.UUID) if ok { if _, err := v.(*OutEnd).Target.To(qd.Data); err != nil { - statistics.IncOut() - } else { - log.Error(err.Error()) + log.Error(err) statistics.IncOutFailed() - } + } else { + statistics.IncOut() + } } } }