Skip to content

Commit

Permalink
增加"流程撤销"事件
Browse files Browse the repository at this point in the history
  • Loading branch information
兔老三 committed Sep 18, 2023
1 parent 48abc71 commit 1375328
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 254 deletions.
10 changes: 10 additions & 0 deletions example/event/ExampleEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,13 @@ func (e *MyEvent) MyEvent_TaskForceNodePass(TaskID int, CurrentNode *Node, PrevN

return nil
}

func (e *MyEvent) MyEvent_Revoke(ProcessInstanceID int, RevokeUserID string) error {
processName, err := GetProcessNameByInstanceID(ProcessInstanceID)
if err != nil {
return err
}
log.Printf("--------流程[%s],由[%s]发起撤销--------", processName, RevokeUserID)

return nil
}
6 changes: 4 additions & 2 deletions example/process/ExampleProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ func CreateProcessJson() (string, error) {
Nodelist = append(Nodelist, Node8)
Nodelist = append(Nodelist, Node9)

process:=Process{ProcessName: "员工请假",Source: "办公系统",RevokeEvents: []string{"MyEvent_Revoke"},Nodes: Nodelist}

//转化为json
j, err := JSONMarshal(Nodelist, false)
j, err := JSONMarshal(process, false)

return string(j), err
}
Expand All @@ -113,7 +115,7 @@ func CreateExampleProcess(){
}

//保存流程
id, err := ProcessSave("员工请假", string(j), "001", "办公系统")
id, err := ProcessSave(j,"system")
if err != nil {
log.Fatal(err)
}
Expand Down
52 changes: 23 additions & 29 deletions workflow/engine/ProcessDefine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,34 @@ import (
)

//流程定义解析(json->struct)
func ProcessParse(Resource string) ([]Node, error) {
var nodes []Node
err := util.Json2Struct(Resource, &nodes)
func ProcessParse(Resource string) (Process, error) {
var process Process
err := util.Json2Struct(Resource, &process)
if err != nil {
return nil, err
return Process{}, err
}
return nodes, nil
return process, nil
}

//todo:这里要写一个func,检查解析后的node结构,比如是否只有一个开始和结束节点

//流程定义保存,返回 流程ID、error
func ProcessSave(ProcessName string, Resource string, CreateUserID string, Source string) (int, error) {
if ProcessName == "" || Source == "" || CreateUserID == "" || Resource == "" {
return 0, errors.New("流程名称、资源定义、来源、创建人ID不能为空")
}

//解析传入的json,获得node列表
nodes, err := ProcessParse(Resource)
func ProcessSave(Resource string, CreateUserID string) (int, error) {
//解析传入的json,获得process数据结构
process, err := ProcessParse(Resource)
if err != nil {
return 0, err
}

if process.ProcessName == "" || process.Source == "" || CreateUserID == "" {
return 0, errors.New("流程名称、来源、创建人ID不能为空")
}

//解析node之间的关系,流程节点执行关系定义记录
Execution := Nodes2Execution(nodes)
Execution := Nodes2Execution(process.Nodes)

//首先判断此工作流是否已定义
ProcID, Version, err := GetProcessIDByProcessName(dao.DB, ProcessName, Source)
ProcID, Version, err := GetProcessIDByProcessName(dao.DB, process.ProcessName, process.Source)
if err != nil {
return 0, err
}
Expand All @@ -51,30 +51,30 @@ func ProcessSave(ProcessName string, Resource string, CreateUserID string, Sourc
//需要将老版本移到历史表中
result := tx.Exec("INSERT INTO hist_proc_def(proc_id,NAME,`version`,resource,user_id,source,create_time)\n "+
"SELECT id,NAME,`version`,resource,user_id,source,create_time\n"+
"FROM proc_def WHERE NAME=? AND source=?;", ProcessName, Source)
"FROM proc_def WHERE NAME=? AND source=?;", process.ProcessName, process.Source)
if result.Error != nil {
tx.Rollback()
return 0, result.Error
}
//而后更新现有定义
result = tx.Model(&database.ProcDef{}).
Where("name=? AND source=?", ProcessName, Source).
Where("name=? AND source=?", process.ProcessName, process.Source).
Updates(database.ProcDef{Version: Version + 1, Resource: Resource, UserID: CreateUserID, CreatTime: time.Now()})
if result.Error != nil {
tx.Rollback()
return 0, result.Error
}
} else {
//若没有老版本,则直接插入
procDef := database.ProcDef{Name: ProcessName, Resource: Resource, UserID: CreateUserID, Source: Source}
procDef := database.ProcDef{Name: process.ProcessName, Resource: Resource, UserID: CreateUserID, Source: process.Source}
result := tx.Create(&procDef)
if result.Error != nil {
tx.Rollback()
return 0, result.Error
}
}
//重新获得流程ID、版本号,此时因为是在事务中,所以需要传入tx
ProcID, Version, err = GetProcessIDByProcessName(tx, ProcessName, Source)
ProcID, Version, err = GetProcessIDByProcessName(tx, process.ProcessName, process.Source)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -157,12 +157,6 @@ func Nodes2Execution(nodes []Node) []Execution {
}
}
return executions
////转为json
//json, err := json.Marshal(executions)
//if err != nil {
// return "", err
//}
//return string(json), nil
}

//获取流程ID、Version by 流程名、来源
Expand Down Expand Up @@ -206,23 +200,23 @@ func GetProcessNameByInstanceID(ProcessInstanceID int) (string, error) {
return r.Name, nil
}

//获取流程定义(返回流程中所有节点) by 流程ID
func GetProcessDefine(ProcessID int) ([]Node, error) {
//获取流程定义 by 流程ID
func GetProcessDefine(ProcessID int) (Process, error) {
type result struct {
Resource string
}
var r result
_, err := dao.ExecSQL("SELECT resource FROM proc_def WHERE ID=?", &r, ProcessID)
if err != nil {
return nil, err
return Process{}, err
}

return ProcessParse(r.Resource)
}

//获得某个source下所有流程信息
func GetProcessList(Source string) ([]ProcessDefine, error) {
var ProcessDefine []ProcessDefine
func GetProcessList(Source string) ([]database.ProcDef, error) {
var ProcessDefine []database.ProcDef
_, err := dao.ExecSQL("select * from proc_def where source=?", &ProcessDefine, Source)
return ProcessDefine, err
}
168 changes: 130 additions & 38 deletions workflow/engine/ProcessEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
. "github.com/Bunny3th/easy-workflow/workflow/model"
. "github.com/Bunny3th/easy-workflow/workflow/util"
"log"
"reflect"
)

Expand All @@ -20,65 +19,120 @@ var EventPool = make(map[string]method)
var IgnoreEventError bool

//注册一个struct中的所有func
//注意:
//1、节点事件 func签名必须是func(struct *interface{}, ProcessInstanceID int, CurrentNode *Node, PrevNode Node) error
//2、Task完成事件 func签名必须是func(struct *interface{}, TaskID int, CurrentNode *Node, PrevNode Node) error
//注意,此时不会验证事件方法参数是否正确,因为此时不知道事件到底是“节点事件”还是“流程事件”
func RegisterEvents(Struct any) {
StructValue := reflect.ValueOf(Struct)
StructType := StructValue.Type()

for i := 0; i < StructType.NumMethod(); i++ {
m := StructType.Method(i)
//自定义函数必须是4个参数,参数0:*struct{} 1:int 2:Node 3:Node
if m.Type.NumIn() != 4 || m.Type.NumOut() != 1 {
log.Printf("warning:事件方法 %s 入参、出参数量不匹配,此函数不会被导入", m.Name)
continue
}
var method = method{Struct, m}
EventPool[m.Name] = method
}
}

if m.Type.In(1).Kind().String() != "int" {
log.Printf("warning:事件方法 %s 参数1不是int类型,此函数不会被导入", m.Name)
continue
}
//验证流程事件(目前只有流程撤销事件)参数是否正确
//流程撤销事件 func签名必须是func(struct *interface{}, ProcessInstanceID int,RevokeUserID string) error
func verifyProcEventParameters(m reflect.Method) error {
//自定义函数必须是3个参数,参数0:*struct{} 1:int 2:String
if m.Type.NumIn() != 3 || m.Type.NumOut() != 1 {
fmt.Errorf("warning:事件方法 %s 入参、出参数量不匹配,此函数无法运行", m.Name)
}

if m.Type.In(2).ConvertibleTo(reflect.TypeOf(&Node{})) != true {
log.Printf("warning:事件方法 %s 参数2不是*Node类型,此函数不会被导入", m.Name)
continue
}
if m.Type.In(1).Kind().String() != "int" {
fmt.Errorf("warning:事件方法 %s 参数1不是int类型,此函数无法运行", m.Name)
}

if m.Type.In(3).ConvertibleTo(reflect.TypeOf(Node{})) != true {
log.Printf("warning:事件方法 %s 参数3不是Node类型,此函数不会被导入", m.Name)
continue
}
if m.Type.In(2).Kind().String() != "string" {
fmt.Errorf("warning:事件方法 %s 参数2不是string类型,此函数无法运行", m.Name)
}

if !TypeIsError(m.Type.Out(0)) {
log.Printf("warning:事件方法 %s 返回参数不是error类型,此函数不会被导入", m.Name)
continue
}
if !TypeIsError(m.Type.Out(0)) {
fmt.Errorf("warning:事件方法 %s 返回参数不是error类型,此函数无法运行", m.Name)
}
return nil
}

var method = method{Struct, m}

EventPool[m.Name] = method
//验证节点事件(1、节点开始 2、节点结束 3、任务结束)参数是否正确
//1、节点开始、结束事件 func签名必须是func(struct *interface{}, ProcessInstanceID int, CurrentNode *Node, PrevNode Node) error
//2、任务完成事件 func签名必须是func(struct *interface{}, TaskID int, CurrentNode *Node, PrevNode Node) error
func verifyNodeEventParameters(m reflect.Method) error {
//自定义函数必须是4个参数,参数0:*struct{} 1:int 2:Node 3:Node
if m.Type.NumIn() != 4 || m.Type.NumOut() != 1 {
fmt.Errorf("warning:事件方法 %s 入参、出参数量不匹配,此函数无法运行", m.Name)
}

if m.Type.In(1).Kind().String() != "int" {
fmt.Errorf("warning:事件方法 %s 参数1不是int类型,此函数无法运行", m.Name)
}

if m.Type.In(2).ConvertibleTo(reflect.TypeOf(&Node{})) != true {
fmt.Errorf("warning:事件方法 %s 参数2不是*Node类型,此函数无法运行", m.Name)

}

if m.Type.In(3).ConvertibleTo(reflect.TypeOf(Node{})) != true {
fmt.Errorf("warning:事件方法 %s 参数3不是Node类型,此函数无法运行", m.Name)
}

if !TypeIsError(m.Type.Out(0)) {
fmt.Errorf("warning:事件方法 %s 返回参数不是error类型,此函数无法运行", m.Name)
}
return nil
}

//检查流程节点中事件是否已经被注册
func CheckIfEventRegistered(ProcessNode Node) error {
//首先合并节点的开始和结束事件
var events []string
events = append(events, ProcessNode.NodeStartEvents...)
events = append(events, ProcessNode.NodeEndEvents...)
//判断该节点中是否所有事件都已经被注册
for _, event := range events {
if _, ok := EventPool[event]; !ok {
//检查流程:
//1、是否注册
//2、参数是否正确
func VerifyEvents(ProcessID int, Nodes ProcNodes) error {
//获取流程定义
process, err := GetProcessDefine(ProcessID)
if err != nil {
return err
}

//验证流程事件(目前只有撤销事件)
for _, event := range process.RevokeEvents {
if e, ok := EventPool[event]; !ok {
return fmt.Errorf("事件%s尚未导入", event)
}else{
if err:=verifyProcEventParameters(e.M);err!=nil{
return err
}
}
}

//各个节点中开始、结束事件 and 任务完成事件,先放入一个数组
var nodeEvents []string
for _, node := range Nodes {
nodeEvents = append(nodeEvents, node.NodeStartEvents...)
nodeEvents = append(nodeEvents, node.NodeEndEvents...)
nodeEvents = append(nodeEvents, node.TaskFinishEvents...)
}

//各个节点中事件可能有重复的,使用map去重
nodeEventsSet := make(map[string]string)
for _, event := range nodeEvents {
nodeEventsSet[event] = event
}

//验证节点事件
for _, event := range nodeEventsSet {
if e, ok := EventPool[event]; !ok {
return fmt.Errorf("事件%s尚未导入", event)
} else {
if err := verifyNodeEventParameters(e.M); err != nil {
return err
}
}
}

return nil
}

//运行事件
func RunEvents(EventNames []string, ID int, CurrentNode *Node, PrevNode Node) error {
//运行节点事件(1、节点开始 2、节点结束 3、任务结束)
func RunNodeEvents(EventNames []string, ID int, CurrentNode *Node, PrevNode Node) error {
for _, e := range EventNames {
//log.Printf("正在处理节点[%s]中事件[%s]", CurrentNode.NodeName, e)
//判断是否可以在事件池中获取事件
Expand Down Expand Up @@ -109,3 +163,41 @@ func RunEvents(EventNames []string, ID int, CurrentNode *Node, PrevNode Node) er

return nil
}

////运行流程事件(目前只有撤销事件)
func RunProcEvents(EventNames []string, ProcessInstanceID int, RevokeUserID string) error {
for _, e := range EventNames {
//log.Printf("正在处理节点[%s]中事件[%s]", CurrentNode.NodeName, e)
//判断是否可以在事件池中获取事件
event, ok := EventPool[e]
if !ok {
return fmt.Errorf("事件%s未注册", e)
}

//拼装参数
arg := []reflect.Value{
reflect.ValueOf(event.S),
reflect.ValueOf(ProcessInstanceID),
reflect.ValueOf(RevokeUserID),
}

//获取流程名
processName,err:=GetProcessNameByInstanceID(ProcessInstanceID)
if err!=nil{
return err
}

//运行func
result := event.M.Func.Call(arg)

//如果选项IgnoreEventError为false,则说明需要验证事件是否出错
if IgnoreEventError == false {
//判断第一个返回参数是否为nil,若不是,则说明事件出错
if !result[0].IsNil() {
return fmt.Errorf("流程[%s]撤销事件[%s]执行出错:%v", processName, event.M.Name, result[0])
}
}
}

return nil
}
Loading

0 comments on commit 1375328

Please sign in to comment.