Skip to content

Commit

Permalink
dev: fix uart driver; optimize some module
Browse files Browse the repository at this point in the history
  • Loading branch information
wwhai committed Aug 7, 2022
1 parent f39f4d7 commit 882c0c3
Show file tree
Hide file tree
Showing 23 changed files with 302 additions and 205 deletions.
16 changes: 16 additions & 0 deletions common/common_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,19 @@ type MqttConfig struct {
PubTopic string `json:"pubTopic" title:"上报TOPIC" info:"上报TOPIC"` // 上报数据的 Topic
SubTopic string `json:"subTopic" title:"订阅TOPIC" info:"订阅TOPIC"` // 上报数据的 Topic
}

/*
*
* 通用串口配置
*
*/
type GenericUartConfig struct {
Tag string `json:"tag" validate:"required" title:"数据Tag" info:"给数据打标签"`
Uart string `json:"uart" validate:"required" title:"串口路径" info:"本地系统的串口路径"`
BaudRate int `json:"baudRate" validate:"required" title:"波特率" info:"串口通信波特率"`
DataBits int `json:"dataBits" validate:"required" title:"数据位" info:"串口通信数据位"`
Frequency int64 `json:"frequency" validate:"required" title:"采集频率" info:""`
Timeout int `json:"timeout" validate:"required" title:"连接超时" info:""`
Parity string `json:"parity" validate:"required" title:"奇偶校验" info:"奇偶校验"`
StopBits int `json:"stopBits" validate:"required" title:"停止位" info:"串口通信停止位"`
}
4 changes: 2 additions & 2 deletions device/generic_modbus_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ func (mdev *generic_modbus_device) Start(cctx typex.CCTX) error {

go func(ctx context.Context, Driver typex.XExternalDriver) {
ticker := time.NewTicker(time.Duration(5) * time.Second)
defer ticker.Stop()
buffer := make([]byte, common.T_64KB)
mdev.driver.Read(buffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
{
mdev.status = typex.DEV_STOP
ticker.Stop()
return
}
default:
Expand Down
160 changes: 160 additions & 0 deletions device/generic_uart_device.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package device

import (
"context"
"encoding/json"
"errors"
"sync"
"time"

"github.com/i4de/rulex/common"
"github.com/i4de/rulex/driver"
"github.com/i4de/rulex/glogger"
"github.com/i4de/rulex/typex"
"github.com/i4de/rulex/utils"
serial "github.com/wwhai/goserial"
)

type genericUartDevice struct {
typex.XStatus
status typex.DeviceState
RuleEngine typex.RuleX
driver typex.XExternalDriver
mainConfig common.GenericUartConfig
locker sync.Locker
}

/*
*
* 通用串口透传
*
*/
func NewGenericUartDevice(e typex.RuleX) typex.XDevice {
uart := new(genericUartDevice)
uart.locker = &sync.Mutex{}
uart.mainConfig = common.GenericUartConfig{}
uart.RuleEngine = e
return uart
}

//  初始化
func (uart *genericUartDevice) Init(devId string, configMap map[string]interface{}) error {
uart.PointId = devId
if err := utils.BindSourceConfig(configMap, &uart.mainConfig); err != nil {
glogger.GLogger.Error(err)
return err
}
if !contains([]string{"N", "E", "O"}, uart.mainConfig.Parity) {
return errors.New("parity value only one of 'N','O','E'")
}
return nil
}

// 启动
func (uart *genericUartDevice) Start(cctx typex.CCTX) error {
uart.Ctx = cctx.Ctx
uart.CancelCTX = cctx.CancelCTX

// 串口配置固定写法
// 下面的参数是传感器固定写法
config := serial.Config{
Address: uart.mainConfig.Uart,
BaudRate: uart.mainConfig.BaudRate,
DataBits: uart.mainConfig.DataBits,
Parity: uart.mainConfig.Parity,
StopBits: uart.mainConfig.StopBits,
Timeout: time.Duration(uart.mainConfig.Frequency) * time.Second,
}
serialPort, err := serial.Open(&config)
if err != nil {
glogger.GLogger.Error("rawUartDriver start failed:", err)
return err
}
uart.driver = driver.NewRawUartDriver(uart.Ctx, uart.RuleEngine, uart.Details(), serialPort)
go func(ctx context.Context) {
ticker := time.NewTicker(time.Duration(uart.mainConfig.Frequency) * time.Second)
buffer := make([]byte, common.T_64KB)
uart.driver.Read(buffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
ticker.Stop()
return
default:
uart.locker.Lock()
n, err := uart.driver.Read(buffer)
uart.locker.Unlock()
if err != nil {
glogger.GLogger.Error(err)
} else {
mapV := map[string]interface{}{
"tag": uart.mainConfig.Tag,
"value": string(buffer[:n]),
}
bytes, _ := json.Marshal(mapV)
uart.RuleEngine.WorkDevice(uart.Details(), string(bytes))
}
}
}

}(uart.Ctx)
return nil
}

// 从设备里面读数据出来
func (uart *genericUartDevice) OnRead(data []byte) (int, error) {
return 0, nil
}

// 把数据写入设备
func (uart *genericUartDevice) OnWrite(b []byte) (int, error) {
return uart.driver.Write(b)
}

// 设备当前状态
func (uart *genericUartDevice) Status() typex.DeviceState {
return typex.DEV_UP
}

// 停止设备
func (uart *genericUartDevice) Stop() {
if uart.driver != nil {
uart.driver.Stop()
}
uart.CancelCTX()
uart.status = typex.DEV_STOP
}

// 设备属性,是一系列属性描述
func (uart *genericUartDevice) Property() []typex.DeviceProperty {
return []typex.DeviceProperty{}
}

// 真实设备
func (uart *genericUartDevice) Details() *typex.Device {
return uart.RuleEngine.GetDevice(uart.PointId)
}

// 状态
func (uart *genericUartDevice) SetState(status typex.DeviceState) {
uart.status = status

}

// 驱动
func (uart *genericUartDevice) Driver() typex.XExternalDriver {
return uart.driver
}

//--------------------------------------------------------------------------------------------------
//
//--------------------------------------------------------------------------------------------------
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
3 changes: 2 additions & 1 deletion device/rtu485_ther_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ func (ther *rtu485_ther) Start(cctx typex.CCTX) error {
ther.status = typex.DEV_UP
go func(ctx context.Context, Driver typex.XExternalDriver) {
ticker := time.NewTicker(time.Duration(ther.mainConfig.Frequency) * time.Second)
defer ticker.Stop()
buffer := make([]byte, common.T_64KB)
ther.driver.Read(buffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
{
ther.status = typex.DEV_STOP
ticker.Stop()
return
}
default:
Expand Down
4 changes: 3 additions & 1 deletion device/s1200plc_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ func (s1200 *s1200plc) Start(cctx typex.CCTX) error {
ticker := time.NewTicker(time.Duration(*s1200.mainConfig.ReadFrequency) * time.Second)
// 数据缓冲区,最大4KB
dataBuffer := make([]byte, common.T_4KB)
s1200.driver.Read(dataBuffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
{
s1200.status = typex.DEV_STOP
ticker.Stop()
return
}
default:
Expand Down Expand Up @@ -145,7 +147,7 @@ func (s1200 *s1200plc) OnWrite(data []byte) (int, error) {

// 设备当前状态
func (s1200 *s1200plc) Status() typex.DeviceState {
if s1200.driver.State() == typex.DRIVER_RUNNING {
if s1200.driver.State() == typex.DRIVER_UP {
return typex.DEV_UP
}
return typex.DEV_DOWN
Expand Down
2 changes: 2 additions & 0 deletions device/tss200_v_0_2_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ func (tss *tss200V2) Start(cctx typex.CCTX) error {
ticker := time.NewTicker(time.Duration(tss.mainConfig.Frequency) * time.Second)
defer ticker.Stop()
buffer := make([]byte, common.T_64KB)
tss.driver.Read(buffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
{
tss.status = typex.DEV_STOP
ticker.Stop()
return
}
default:
Expand Down
2 changes: 2 additions & 0 deletions device/yk8_controller_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@ func (yk8 *YK8Controller) Start(cctx typex.CCTX) error {
go func(ctx context.Context, Driver typex.XExternalDriver) {
ticker := time.NewTicker(time.Duration(yk8.mainConfig.Frequency) * time.Second)
buffer := make([]byte, common.T_64KB)
yk8.driver.Read(buffer) //清理缓存
for {
<-ticker.C
select {
case <-ctx.Done():
{
yk8.status = typex.DEV_STOP
ticker.Stop()
return
}
default:
Expand Down
4 changes: 2 additions & 2 deletions driver/modbus_rtu_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"encoding/binary"
"encoding/json"

"github.com/wwhai/gomodbus"
"github.com/i4de/rulex/common"
"github.com/i4de/rulex/typex"
"github.com/wwhai/gomodbus"
)

/*
Expand All @@ -30,7 +30,7 @@ func NewModBusRtuDriver(
handler *modbus.RTUClientHandler,
client modbus.Client) typex.XExternalDriver {
return &modBusRtuDriver{
state: typex.DRIVER_RUNNING,
state: typex.DRIVER_UP,
device: d,
RuleEngine: e,
client: client,
Expand Down
2 changes: 1 addition & 1 deletion driver/modbus_tcp_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewModBusTCPDriver(
handler *modbus.TCPClientHandler,
client modbus.Client) typex.XExternalDriver {
return &modBusTCPDriver{
state: typex.DRIVER_RUNNING,
state: typex.DRIVER_UP,
device: d,
RuleEngine: e,
client: client,
Expand Down
30 changes: 12 additions & 18 deletions driver/raw_uart_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,47 @@ import (
"context"
"errors"

"github.com/goburrow/serial"
"github.com/i4de/rulex/glogger"
"github.com/i4de/rulex/typex"
serial "github.com/wwhai/goserial"
)

type rawUartDriver struct {
state typex.DriverState
serialPort serial.Port
config serial.Config
serialPort serial.Port
ctx context.Context
In *typex.InEnd
RuleEngine typex.RuleX
device *typex.Device
}

//
// 初始化一个驱动
//
func NewRawUartDriver(
ctx context.Context,
config serial.Config,
in *typex.InEnd,
e typex.RuleX,
onRead func([]byte)) (typex.XExternalDriver, error) {

device *typex.Device,
serialPort serial.Port,
) typex.XExternalDriver {
return &rawUartDriver{
In: in,
RuleEngine: e,
config: config,
ctx: ctx,
}, nil
serialPort: serialPort,
device: device,
}
}

//
//
//
func (a *rawUartDriver) Init(map[string]string) error {
a.state = typex.DRIVER_RUNNING
a.state = typex.DRIVER_UP

return nil
}

func (a *rawUartDriver) Work() error {
serialPort, err := serial.Open(&a.config)
a.serialPort = serialPort
if err != nil {
glogger.GLogger.Error("uartModuleSource start failed:", err)
return err
}

return nil

}
Expand Down
4 changes: 2 additions & 2 deletions driver/rtu485_ther_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package driver
import (
"encoding/json"

"github.com/wwhai/gomodbus"
"github.com/i4de/rulex/common"
"github.com/i4de/rulex/glogger"
"github.com/i4de/rulex/typex"
"github.com/i4de/rulex/utils"
modbus "github.com/wwhai/gomodbus"
)

// Example: 0x02 0x92 0xFF 0x98
Expand Down Expand Up @@ -69,7 +69,7 @@ func (rtu485 *rtu485_THer_Driver) Work() error {
}

func (rtu485 *rtu485_THer_Driver) State() typex.DriverState {
return typex.DRIVER_RUNNING
return typex.DRIVER_UP
}

func (rtu485 *rtu485_THer_Driver) Read(data []byte) (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion driver/sidecar_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (sc *SideCarDriver) State() typex.DriverState {
if sc.t() != nil {
return typex.DRIVER_STOP
}
return typex.DRIVER_RUNNING
return typex.DRIVER_UP
}

/*
Expand Down
Loading

0 comments on commit 882c0c3

Please sign in to comment.