Skip to content

Commit

Permalink
fix: fix restart driver error
Browse files Browse the repository at this point in the history
  • Loading branch information
wwhai committed Nov 2, 2021
1 parent f7a009a commit 56ca88b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 63 deletions.
54 changes: 21 additions & 33 deletions driver/uart_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,48 +62,36 @@ func (a *UartDriver) Work() error {

go func(ctx context.Context) {
acc := 0
data := make([]byte, 1)
ticker := time.NewTicker(time.Duration(time.Microsecond * 400))
for {
for a.state == typex.RUNNING {
<-ticker.C
select {
case <-ctx.Done():
{
break
}
default:
{
}
}
data := make([]byte, 1)
size, err0 := a.serialPort.Read(data)
if err0 != nil {
if _, err0 := a.serialPort.Read(data); err0 != nil {
a.Stop()
return
}
if size == 1 {
// # 分隔符
if data[0] == '#' {
// log.Info("bytes => ", string(buffer[:acc]), buffer[:acc], acc)
a.RuleEngine.PushQueue(typex.QueueData{
In: a.In,
Out: nil,
E: a.RuleEngine,
Data: string(buffer[1:acc]),
})
// 重新初始化缓冲区
for i := 0; i < acc-1; i++ {
buffer[i] = 0
}
acc = 0
// # 分隔符
if data[0] == '#' {
// log.Info("bytes => ", string(buffer[:acc]), buffer[:acc], acc)
a.RuleEngine.PushQueue(typex.QueueData{
In: a.In,
Out: nil,
E: a.RuleEngine,
Data: string(buffer[1:acc]),
})
// 重新初始化缓冲区
for i := 0; i < acc-1; i++ {
buffer[i] = 0
}
data[0] = 0
acc = 0
}

if (data[0] != 0) && (data[0] != '\r') && (data[0] != '\n') {
buffer[acc] = data[0]
acc += 1
}
if (data[0] != 0) && (data[0] != '\r') && (data[0] != '\n') {
buffer[acc] = data[0]
acc += 1
}
}

}(a.ctx)
return nil

Expand Down
56 changes: 26 additions & 30 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (e *RuleEngine) Start() map[string]interface{} {
go func(ctx context.Context, xQueue typex.XQueue) {
for {
select {
case <-ctx.Done():
return
case qd := <-xQueue.GetQueue():
{
//
Expand Down Expand Up @@ -192,26 +194,18 @@ func startResources(resource typex.XResource, in *typex.InEnd, e *RuleEngine) er
startResource(resource, e, in.Id)
go func(ctx context.Context) {
// 5 seconds
//ticker := time.NewTicker(time.Duration(time.Second * 5))
ticker := time.NewTicker(time.Duration(time.Second * 5))
defer resource.Stop()
for {
//<-ticker.C
select {
case <-ctx.Done():
{
return
}
case <-time.After(time.Second * 3):
{
//------------------------------------
// 驱动挂了资源也挂了,因此检查驱动状态在先
//------------------------------------
testResourceState(resource, e, in.Id)
testDriverState(resource, e, in.Id)
//------------------------------------
}
for resource.Status() == typex.UP {
<-ticker.C
{
//------------------------------------
// 驱动挂了资源也挂了,因此检查驱动状态在先
//------------------------------------
testResourceState(resource, e, in.Id)
testDriverState(resource, e, in.Id)
//------------------------------------
}

}

}(context.Background())
Expand Down Expand Up @@ -247,7 +241,7 @@ func testResourceState(resource typex.XResource, e *RuleEngine, id string) {
//
func startResource(resource typex.XResource, e *RuleEngine, id string) {
if err := resource.Start(); err != nil {
log.Error("Resource start error:", err, resource.Status())
log.Error("Resource start error:", err)
if resource.Status() == typex.UP {
resource.Stop()
}
Expand Down Expand Up @@ -282,18 +276,20 @@ func startResource(resource typex.XResource, e *RuleEngine, id string) {
}

func testDriverState(resource typex.XResource, e *RuleEngine, id string) {
if resource.Driver() != nil {
// 只有资源启动状态才拉起驱动
if resource.Status() == typex.UP {
// 必须资源启动, 驱动才有重启意义
if resource.Driver().State() == typex.STOP {
log.Warn("Driver stopped:", resource.Driver().DriverDetail().Name)
// 只需要把资源给拉闸, 就会触发重启
startResource(resource, e, id)
if resource.Status() == typex.UP {
if resource.Driver() != nil {
// 只有资源启动状态才拉起驱动
if resource.Status() == typex.UP {
// 必须资源启动, 驱动才有重启意义
if resource.Driver().State() == typex.STOP {
log.Warn("Driver stopped:", resource.Driver().DriverDetail().Name)
// 只需要把资源给拉闸, 就会触发重启
startResource(resource, e, id)
}
}
}

}

}

//
Expand Down Expand Up @@ -465,8 +461,7 @@ func (e *RuleEngine) AllRule() map[string]*typex.Rule {
// Stop
//
func (e *RuleEngine) Stop() {
log.Info("Stopping Rulex......")

log.Info("Ready to stop rulex")
for _, inEnd := range e.InEnds {
if inEnd.Resource != nil {
log.Info("Stop InEnd:", inEnd.Name, inEnd.Id)
Expand All @@ -487,6 +482,7 @@ func (e *RuleEngine) Stop() {
}

for _, plugin := range e.Plugins {
log.Info("Uninstall plugin:", plugin.XPluginMetaInfo().Name)
plugin.Uninstall()
plugin.Clean()

Expand Down

0 comments on commit 56ca88b

Please sign in to comment.