Skip to content

Commit

Permalink
add workflow eventmest queue type
Browse files Browse the repository at this point in the history
  • Loading branch information
horoc committed Nov 14, 2022
1 parent 791b197 commit a9af1a3
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
2 changes: 2 additions & 0 deletions eventmesh-workflow-go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
Flow struct {
Queue struct {
Store string `yaml:"store"`
Topic string `yaml:"topic"`
} `yaml:"queue"`
Scheduler struct {
Type string `yaml:"type"`
Expand All @@ -50,6 +51,7 @@ type Config struct {
UserName string `yaml:"username"`
Password string `yaml:"password"`
ProducerGroup string `yaml:"producer_group"`
ConsumerGroup string `yaml:"consumer_group"`
TTL int `yaml:"ttl"`
} `yaml:"eventmesh"`
}
Expand Down
2 changes: 2 additions & 0 deletions eventmesh-workflow-go/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type QueueType int
const (
// QueueTypeInMemory in memory queue type
QueueTypeInMemory = "in-memory"
// QueueTypeEventMesh EventMesh queue
QueueTypeEventMesh = "eventmesh"
)

const (
Expand Down
174 changes: 174 additions & 0 deletions eventmesh-workflow-go/internal/queue/eventmesh_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
"context"
"encoding/json"
sdk "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
sdk_conf "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
sdk_pb "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/gogf/gf/util/gconv"
"github.com/google/uuid"

conf "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"

"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
)

func init() {
cfg := conf.Get()
// init and register EventMesh task queue only when config corresponding queue type
if cfg != nil && cfg.Flow.Queue.Store == constants.QueueTypeEventMesh {
RegisterQueue(newEventMeshQueue(cfg))
}
}

type eventMeshQueue struct {
// EventMesh go sdk grpc client
grpcClient sdk.Interface
// EventMesh go sdk grpc config
grpcConfig *sdk_conf.GRPCConfig

workflowConfig *conf.Config
workflowDAL dal.WorkflowDAL
observeTopic string
}

func newEventMeshQueue(workflowConfig *conf.Config) ObserveQueue {
eventMeshConfig := workflowConfig.EventMesh
grpcConfig := &sdk_conf.GRPCConfig{
Host: eventMeshConfig.Host,
Port: eventMeshConfig.GRPC.Port,
ENV: eventMeshConfig.Env,
IDC: eventMeshConfig.IDC,
SYS: eventMeshConfig.Sys,
Username: eventMeshConfig.UserName,
Password: eventMeshConfig.Password,
ProtocolType: sdk.EventmeshMessage,
ConsumerConfig: sdk_conf.ConsumerConfig{
Enabled: true,
ConsumerGroup: eventMeshConfig.ConsumerGroup,
},
}
client, err := sdk.New(grpcConfig)
if err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to init EventMesh client , error=%v", err)
panic(err)
}
return &eventMeshQueue{
grpcClient: client,
grpcConfig: grpcConfig,
workflowConfig: workflowConfig,
observeTopic: workflowConfig.Flow.Queue.Topic,
workflowDAL: dal.NewWorkflowDAL(),
}
}

func (q *eventMeshQueue) Name() string {
return constants.QueueTypeEventMesh
}

// Publish send task to EventMesh queue, store task info in message content with json structure
func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
if len(tasks) == 0 {
return nil
}
for _, task := range tasks {
if task == nil {
continue
}
message, err := q.toEventMeshMessage(task)
if err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err)
return err
}
_, err = q.grpcClient.Publish(context.Background(), message)
if err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err)
return err
}
}
return nil
}

// Ack do nothing
func (q *eventMeshQueue) Ack(tasks *model.WorkflowTaskInstance) error {
return nil
}

// Observe consume task by EventMesh subscription api
func (q *eventMeshQueue) Observe() {
err := q.grpcClient.SubscribeStream(sdk_conf.SubscribeItem{
Topic: q.observeTopic,
SubscribeMode: sdk_conf.CLUSTERING,
SubscribeType: sdk_conf.SYNC,
}, q.handler)
if err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue observe error=%v", err)
panic(err)
}
}

func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} {
workflowTask, err := q.toWorkflowTask(message)
if err != nil {
return err
}
log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask))
if workflowTask.ID != 0 {
if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask); err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", err)
}
return err
}
// new task
if err := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); err != nil {
log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", err)
}
return nil
}

func (q *eventMeshQueue) toEventMeshMessage(task *model.WorkflowTaskInstance) (*sdk_pb.SimpleMessage, error) {
taskJsonBytes, err := json.Marshal(task)
if err != nil {
return nil, err
}

message := &sdk_pb.SimpleMessage{
Header: sdk.CreateHeader(q.grpcConfig),
ProducerGroup: q.workflowConfig.EventMesh.ProducerGroup,
Topic: q.observeTopic,
Content: string(taskJsonBytes),
Ttl: gconv.String(q.workflowConfig.EventMesh.TTL),
UniqueId: uuid.New().String(),
SeqNum: uuid.New().String(),
}
return message, nil
}

func (q *eventMeshQueue) toWorkflowTask(message *sdk_pb.SimpleMessage) (*model.WorkflowTaskInstance, error) {
taskJsonBytes := []byte(message.Content)
task := &model.WorkflowTaskInstance{}
err := json.Unmarshal(taskJsonBytes, task)
if err != nil {
return nil, err
}
return task, nil
}

0 comments on commit a9af1a3

Please sign in to comment.