Skip to content

Commit

Permalink
dev: add some modules
Browse files Browse the repository at this point in the history
  • Loading branch information
wwhai committed Jun 20, 2022
1 parent 61683ec commit 3b695e5
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 67 deletions.
31 changes: 26 additions & 5 deletions device/rtu485_ther_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package device

import (
"context"
"encoding/json"
golog "log"
"os"
"rulex/driver"
"rulex/typex"
"rulex/utils"
Expand All @@ -20,6 +23,14 @@ type rtu485_ther struct {
slaverIds []byte
}

var __debug bool = false

// Example: 0x02 0x92 0xFF 0x98
type __sensor_data struct {
TEMP float32 `json:"temp"` //系数: 0.1
HUM float32 `json:"hum"` //系数: 0.1
}

/*
*
* 温湿度传感器
Expand Down Expand Up @@ -55,17 +66,19 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error {

// 串口配置固定写法
handler := modbus.NewRTUClientHandler(rtuConfig.Uart)
handler.BaudRate = 9600
handler.BaudRate = 4800
handler.DataBits = 8
handler.Parity = "N"
handler.StopBits = 1
handler.Timeout = time.Duration(*mainConfig.Timeout) * time.Second
// handler.Logger = golog.New(os.Stdout, "485THerSource: ", log.LstdFlags)
if __debug {
handler.Logger = golog.New(os.Stdout, "485THerSource: ", log.LstdFlags)
}
if err := handler.Connect(); err != nil {
return err
}
client := modbus.NewClient(handler)
tss.driver = driver.NewRtu485_THer_Driver(tss.Details(), tss.RuleEngine, client)
tss.driver = driver.NewRtu485THerDriver(tss.Details(), tss.RuleEngine, client)
tss.slaverIds = append(tss.slaverIds, mainConfig.SlaverIds...)
//---------------------------------------------------------------------------------
// Start
Expand All @@ -75,7 +88,8 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error {
go func(ctx context.Context, slaverId byte, rtuDriver typex.XExternalDriver, handler *modbus.RTUClientHandler) {
ticker := time.NewTicker(time.Duration(5) * time.Second)
defer ticker.Stop()
buffer := make([]byte, 32) //32字节数据
// {"SlaveId":1,"Data":"{\"temp\":28.7,\"hum\":66.1}}
buffer := make([]byte, 64) //32字节数据
for {
<-ticker.C
select {
Expand All @@ -93,7 +107,14 @@ func (tss *rtu485_ther) Start(cctx typex.CCTX) error {
if err != nil {
log.Error(err)
} else {
tss.RuleEngine.WorkDevice(tss.RuleEngine.GetDevice(tss.PointId), string(buffer[:n]))
Device := tss.RuleEngine.GetDevice(tss.PointId)
sdata := __sensor_data{}
json.Unmarshal(buffer[:n], &sdata)
bytes, _ := json.Marshal(map[string]interface{}{
"slaveId": handler.SlaveId,
"data": sdata,
})
tss.RuleEngine.WorkDevice(Device, string(bytes))
}
}

Expand Down
2 changes: 1 addition & 1 deletion device/tss200_v_0_2_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (tss *tss200_v_0_2_sensor) Start(cctx typex.CCTX) error {
return err
}
client := modbus.NewClient(handler)
tss.driver = driver.NewRtu485_THer_Driver(tss.Details(), tss.RuleEngine, client)
tss.driver = driver.NewTSS200_v_0_2_Driver(tss.Details(), tss.RuleEngine, client)
tss.slaverIds = append(tss.slaverIds, mainConfig.SlaverIds...)
//---------------------------------------------------------------------------------
// Start
Expand Down
34 changes: 22 additions & 12 deletions driver/rtu485_ther_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
"rulex/utils"

"github.com/goburrow/modbus"
"github.com/ngaut/log"
)

// Example: 0x02 0x92 0xFF 0x98
type sensor_data struct {
TEMP float32 `json:"temp"` //系数: 0.1
HUM float32 `json:"hum"` //系数: 0.1
}

// 协议:UART:485 baud=4800 无校验 数据位1 停止位1
// 功能码为: 3(ReadHoldingRegisters)
// 站号为:1
Expand All @@ -30,7 +37,7 @@ type rtu485_THer_Driver struct {
RuleEngine typex.RuleX
}

func NewRtu485_THer_Driver(d *typex.Device, e typex.RuleX,
func NewRtu485THerDriver(d *typex.Device, e typex.RuleX,
client modbus.Client) typex.XExternalDriver {
return &rtu485_THer_Driver{
state: typex.DRIVER_STOP,
Expand All @@ -41,10 +48,12 @@ func NewRtu485_THer_Driver(d *typex.Device, e typex.RuleX,
}

func (rtu485 *rtu485_THer_Driver) Test() error {
return nil
_, err := rtu485.client.ReadHoldingRegisters(0x00, 2)
return err
}

func (rtu485 *rtu485_THer_Driver) Init(map[string]string) error {

return nil
}

Expand All @@ -57,21 +66,22 @@ func (rtu485 *rtu485_THer_Driver) State() typex.DriverState {
}

func (rtu485 *rtu485_THer_Driver) Read(data []byte) (int, error) {
// Example: 0x02 0x92 0xFF 0x98
type sensor_data struct {
TEMP float32 `json:"temp"` //系数: 0.1
HUM float32 `json:"hum"` //系数: 0.1
}

results, err := rtu485.client.ReadHoldingRegisters(0x00, 2)
if len(results) == 2 {
var length = 0
if len(results) == 4 {
sdata := sensor_data{
HUM: float32(utils.BToU16(results, 0, 1)) * 0.01,
TEMP: float32(utils.BToU16(results, 1, 2)) * 0.01,
HUM: float32(utils.BToU16(results, 0, 2)) * 0.1,
TEMP: float32(utils.BToU16(results, 2, 4)) * 0.1,
}
bytes, err := json.Marshal(sdata)
if err != nil {
log.Error(err)
}
bytes, _ := json.Marshal(sdata)
copy(data, bytes)
length = len(bytes)
}
return len(data), err
return length, err
}

func (rtu485 *rtu485_THer_Driver) Write(_ []byte) (int, error) {
Expand Down
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (e *RuleEngine) PushDeviceQueue(Device *typex.Device, data string) error {
func (e *RuleEngine) PushOutQueue(out *typex.OutEnd, data string) error {
qd := typex.QueueData{
E: e,
D: nil,
I: nil,
O: out,
Data: data,
Expand Down
File renamed without changes.
14 changes: 7 additions & 7 deletions release_pkg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ create_pkg() {
VERSION=$(git describe --tags --always --abbrev=0)
echo "Create package: ${rulex-$1-${VERSION}}"
if [ "$1" == "x64windows" ]; then
cd rulexc
cd rulex-cli
go get
CGO_ENABLED=1 GOOS=windows GOARCH=amd64 go build -v -o rulex-cli.exe main.go
cd ../
cp ./rulexc/rulex-cli.exe ./
cp ./rulex-cli/rulex-cli.exe ./
zip -r _release/rulex-$1-${VERSION}.zip \
./rulex-$1.exe \
./rulex-cli.exe \
./conf/rulex.ini
rm -rf ./rulex-cli.exe
rm -rf ./rulex-$1.exe
else
cd rulexc
cd rulex-cli
go get
go build -v -o rulex-cli main.go
cd ../
cp ./rulexc/rulex-cli ./
cp ./rulex-cli/rulex-cli ./
zip -r _release/rulex-$1-${VERSION}.zip \
./rulex-$1 \
./rulex-cli \
Expand Down Expand Up @@ -138,8 +138,8 @@ cross_compile() {
#
#
#
build_rulexcli() {
git clone ${RESPOSITORY}/rulex-cli.git rulexc
build_rulex_cli() {
git clone ${RESPOSITORY}/rulex-cli.git
}
#
# fetch dashboard
Expand Down Expand Up @@ -199,7 +199,7 @@ check_cmd
init_env
cp -r $(ls | egrep -v '^_build$') ./_build/
cd ./_build/
build_rulexcli
build_rulex_cli
fetch_dashboard
cross_compile
gen_changelog
8 changes: 7 additions & 1 deletion rulexlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package rulexlib

import (
"rulex/typex"

"github.com/ngaut/log"
)

func handleDataFormat(e typex.RuleX, uuid string, incoming string) {
outEnd := e.GetOutEnd(uuid)
e.PushOutQueue(outEnd, incoming)
if outEnd != nil {
e.PushOutQueue(outEnd, incoming)
}else {
log.Error()
}
}
6 changes: 3 additions & 3 deletions target/mqtt_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (mm *mqttOutEndTarget) Start(cctx typex.CCTX) error {
mm.DataTopic = mainConfig.DataTopic
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
opts.SetPingTimeout(3 * time.Second)
opts.SetPingTimeout(5 * time.Second)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(5 * time.Second)
mm.client = mqtt.NewClient(opts)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (mm *mqttOutEndTarget) Details() *typex.OutEnd {
//
func (mm *mqttOutEndTarget) To(data interface{}) (interface{}, error) {
if mm.client != nil {
return mm.client.Publish(mm.DataTopic, 2, false, data).Error(), nil
return mm.client.Publish(mm.DataTopic, 1, false, data).Error(), nil
}
return nil, errors.New("mqtt client is nil")
}
Expand All @@ -143,5 +143,5 @@ func (mm *mqttOutEndTarget) To(data interface{}) (interface{}, error) {
*
*/
func (*mqttOutEndTarget) Configs() *typex.XConfig {
return core.GenOutConfig(typex.MQTT_TARGET, "MQTT_TARGET", httpConfig{})
return core.GenOutConfig(typex.MQTT_TARGET, "MQTT", httpConfig{})
}
90 changes: 56 additions & 34 deletions test/485ther_gw_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package test

import (
"os"
"os/signal"
"rulex/core"
"rulex/engine"
httpserver "rulex/plugin/http_server"
"rulex/rulexlib"
"syscall"

"rulex/typex"
"testing"
"time"

"github.com/ngaut/log"
)

/*
Expand All @@ -23,63 +27,81 @@ func Test_modbus_485_sensor_gateway(t *testing.T) {
rulexlib.StartLuaLogger(core.GlobalConfig.LuaLogPath)
core.SetLogLevel()
core.SetPerformance()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
engine := engine.NewRuleEngine(mainConfig)
engine.Start()

hh := httpserver.NewHttpApiServer(2580, "../rulex-test_"+time.Now().Format("2006-01-02-15_04_05")+".db", engine)
hh := httpserver.NewHttpApiServer(2580, "./rulex.db", engine)

// HttpApiServer loaded default
if err := engine.LoadPlugin("plugin.http_server", hh); err != nil {
t.Fatal("Rule load failed:", err)
}
// RTU485_THER Inend
RTU485_THERInend := typex.NewDevice("RTU485_THER", "RTU485_THER", "RTU485_THER", "", map[string]interface{}{
"slaverIds": []int{1},
"timeout": 5,
"frequency": 5,
"config": map[string]interface{}{
"uart": "COM3",
"baudRate": 115200,
"dataBits": 8,
"parity": "N",
"stopBits": 1,
RTU485Device := typex.NewDevice("RTU485_THER",
"温湿度采集器", "温湿度采集器", "", map[string]interface{}{
"slaverIds": []int{1},
"timeout": 5,
"frequency": 5,
"config": map[string]interface{}{
"uart": "COM6",
"baudRate": 4800,
"dataBits": 8,
"parity": "N",
"stopBits": 1,
},
})
RTU485Device.UUID = "RTU485Device1"
if err := engine.LoadDevice(RTU485Device); err != nil {
t.Error("RTU485Device load failed:", err)
}
mqttOutEnd := typex.NewOutEnd(
"MQTT",
"MQTT桥接",
"MQTT桥接", map[string]interface{}{
"Host": "127.0.0.1",
"Port": 1883,
"DataTopic": "iothub/upstream/IGW00000001",
"ClientId": "IGW00000001",
"Username": "IGW00000001",
"Password": "IGW00000001",
},
})

if err := engine.LoadDevice(RTU485_THERInend); err != nil {
t.Error("grpcInend load failed:", err)
)
mqttOutEnd.UUID = "mqttOutEnd"
if err := engine.LoadOutEnd(mqttOutEnd); err != nil {
t.Error("mqttOutEnd load failed:", err)
}
rule := typex.NewRule(engine,
"uuid",
"Just a test",
"Just a test",
"数据推送至IOTHUB",
"数据推送至IOTHUB",
[]string{},
[]string{RTU485_THERInend.UUID}, // 数据来自设备
[]string{RTU485Device.UUID}, // 数据来自设备
`function Success() print("[LUA Success Callback]=> OK") end`,
`
Actions = {
function(data)
local table = rulexlib:J2T(data)
local value = table['value']
local t = rulexlib:HsubToN(value, 5, 8)
local h = rulexlib:HsubToN(value, 0, 4)
local t1 = rulexlib:HToN(string.sub(value, 5, 8))
local h2 = rulexlib:HToN(string.sub(value, 0, 4))
print('Data ========> ', rulexlib:T2J({
Device = "TH00000001",
Ts = rulexlib:TsUnix(),
T = t,
H = h,
T1 = t1,
H2 = h2
}))
local t = rulexlib:J2T(data)
t['type'] = 'sub_device'
t['sn'] = 'IGW00000001'
rulexlib:DataToMqtt('mqttOutEnd', rulexlib:T2J(t))
return true, data
end
}`,
`function Failed(error) print("[LUA Failed Callback]", error) end`)
if err := engine.LoadRule(rule); err != nil {
t.Error(err)
}
time.Sleep(3 * time.Second)
s := <-c
log.Warn("Received stop signal:", s)
engine.Stop()

if err := typex.GLOBAL_LOGGER.Close(); err != nil {
return
}
if err := typex.LUA_LOGGER.Close(); err != nil {
return
}
os.Exit(0)
}
Loading

0 comments on commit 3b695e5

Please sign in to comment.