Skip to content

Commit

Permalink
修复部分bug,优化代码
Browse files Browse the repository at this point in the history
  • Loading branch information
兔老三 committed Oct 8, 2023
1 parent 1197986 commit 62db969
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 87 deletions.
4 changes: 2 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
)

func DBConfig() {
DBConnect.DBConnectString = "goeasy:sNd%sLDjd*12@tcp(172.16.18.18:3306)/easy_workflow?charset=utf8mb4&parseTime=True&loc=Local"
DBConnect.DBConnectString = "goeasy:sNd%sLDjd*12@tcp(172.16.18.18:3306)/test_workflow?charset=utf8mb4&parseTime=True&loc=Local"
DBlog.LogLevel = 4 //日志级别(默认3) 1:Silent 2:Error 3:Warn 4:Info
}

func main() {
//----------------------------开启流程引擎----------------------------
StartWorkFlow(DBConfig,true,&MyEvent{})
StartWorkFlow(DBConfig,false,&MyEvent{})

//----------------------------生成一个示例流程----------------------------
CreateExampleProcess()
Expand Down
4 changes: 2 additions & 2 deletions workflow/database/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type ProcExecution struct {
ProcVersion int `gorm:"column:proc_version;type:INT UNSIGNED NOT NULL;comment:流程版本号"`
NodeID string `gorm:"column:node_id;type:VARCHAR(250) NOT NULL;comment:节点ID"`
NodeName string `gorm:"column:node_name;type:VARCHAR(250) NOT NULL;comment:节点名称"`
PrevNodeID *string `gorm:"column:prev_node_id;type:VARCHAR(250) DEFAULT NULL;default NULL;comment:上级节点ID"`
PrevNodeID string `gorm:"column:prev_node_id;type:VARCHAR(250) DEFAULT NULL;default NULL;comment:上级节点ID"`
NodeType int `gorm:"column:node_type;type:TINYINT NOT NULL;comment:流程类型 0:开始节点 1:任务节点 2:网关节点 3:结束节点"`
IsCosigned int `gorm:"column:is_cosigned;type:TINYINT NOT NULL;comment:是否会签"`
CreateTime LocalTime `gorm:"column:create_time;type:DATETIME DEFAULT NOW();default NOW();comment:创建时间"`
Expand All @@ -115,7 +115,7 @@ type HistProcExecution struct {
ProcVersion int `gorm:"column:proc_version;type:INT UNSIGNED NOT NULL;comment:流程版本号"`
NodeID string `gorm:"column:node_id;type:VARCHAR(250) NOT NULL;comment:节点ID"`
NodeName string `gorm:"column:node_name;type:VARCHAR(250) NOT NULL;comment:节点名称"`
PrevNodeID *string `gorm:"column:prev_node_id;type:VARCHAR(250) DEFAULT NULL;default NULL;comment:上级节点ID"`
PrevNodeID string `gorm:"column:prev_node_id;type:VARCHAR(250) DEFAULT NULL;default NULL;comment:上级节点ID"`
NodeType int `gorm:"column:node_type;type:TINYINT NOT NULL;comment:流程类型 0:开始节点 1:任务节点 2:网关节点 3:结束节点"`
IsCosigned int `gorm:"column:is_cosigned;type:TINYINT NOT NULL;comment:是否会签"`
CreateTime LocalTime `gorm:"column:create_time;type:DATETIME DEFAULT NOW();default NOW();comment:创建时间"`
Expand Down
4 changes: 2 additions & 2 deletions workflow/engine/ProcessDefine.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func nodes2Execution(ProcID int, ProcVersion int, nodes []Node) []database.ProcE
ProcVersion: ProcVersion,
NodeID: n.NodeID,
NodeName: n.NodeName,
PrevNodeID: &PrevNodeID,
PrevNodeID: PrevNodeID,
NodeType: int(n.NodeType),
IsCosigned: int(n.IsCosigned),
CreateTime: database.LTime.Now(),
Expand All @@ -140,7 +140,7 @@ func nodes2Execution(ProcID int, ProcVersion int, nodes []Node) []database.ProcE
ProcVersion: ProcVersion,
NodeID: n.NodeID,
NodeName: n.NodeName,
PrevNodeID: &prev,
PrevNodeID: prev,
NodeType: int(n.NodeType),
IsCosigned: int(n.IsCosigned),
CreateTime: database.LTime.Now(),
Expand Down
3 changes: 2 additions & 1 deletion workflow/engine/ProcessInstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ func InstanceStart(ProcessID int, BusinessID string, Comment string, VariablesJs
//开始节点处理
err = startNodeHandle(InstanceID, &StartNode, Comment, VariablesJson)
if err != nil {
//需要删除刚才已经建立的实例记录和变量记录
//需要删除刚才已经建立的实例记录、变量记录、任务记录
//这里已经没有必要对数据库执行做错误判断了,能删就删,删不掉也没多大关系
dao.DB.Where("id=?",InstanceID).Delete(database.ProcInst{})
dao.DB.Where("proc_inst_id=?",InstanceID).Delete(database.ProcInstVariable{})
dao.DB.Where("proc_inst_id=?",InstanceID).Delete(database.ProcTask{})

return InstanceID, err
}
Expand Down
166 changes: 86 additions & 80 deletions workflow/engine/ProcessTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,45 @@ func CreateTask(ProcessInstanceID int, NodeID string, PrevNodeID string, UserIDs
//***需要注意的事,此功能只有在A是非会签节点时才能使用,否则试想,若节点中有甲乙两人,一人使用普通的pass,一人使用此时使用DirectlyToWhoRejectedMe,
//此时出现分歧,难道要打一架解决?
type taskOption struct {
Status int //任务状态 1:通过 2:驳回
DirectlyToWhoRejectedMe bool //任务通过(pass)时直接返回到上一个驳回我的节点
}

//完成任务,在本节点处理完毕的情况下会自动处理下一个节点
func TaskPass(TaskID int, Comment string, VariableJson string, DirectlyToWhoRejectedMe bool) error {
//获取节点信息
err := processTask(TaskID, Comment, VariableJson, taskOption{Status: 1, DirectlyToWhoRejectedMe: DirectlyToWhoRejectedMe})
return err
}

//驳回任务,在本节点处理完毕的情况下会自动处理下一个节点
func TaskReject(TaskID int, Comment string, VariableJson string) error {
err := processTask(TaskID, Comment, VariableJson, taskOption{Status: 2})
return err
}

//任务处理(通过/驳回)
func processTask(TaskID int, Comment string, VariableJson string, option taskOption) error {
////获取节点信息
taskInfo, err := GetTaskInfo(TaskID)
if err != nil {
return err
}

//获取task所在的node
CurrentNode, err := GetInstanceNode(taskInfo.ProcInstID, taskInfo.NodeID)
if err != nil {
return err
}

//判断节点是否已处理
if taskInfo.IsFinished == 1 {
return fmt.Errorf("节点ID%d已处理,无需操作", TaskID)
}

//------------------------ DirectlyToWhoRejectedMe 功能前置验证 ------------------------
//------------------------如果是通过,且DirectlyToWhoRejectedMe为true,则需做功能前置验证 ------------------------
//1、是否是会签节点
//2、是否存在上一个任务节点?上一个节点是否做的是驳回
if DirectlyToWhoRejectedMe {
if option.Status == 1 && option.DirectlyToWhoRejectedMe {
//会签节点无法使用此功能,因为会签节点没有“统一意志”
if taskInfo.IsCosigned == 1 {
return errors.New("会签节点无法使用【DirectlyToWhoRejectedMe】功能!")
Expand All @@ -143,68 +163,12 @@ func TaskPass(TaskID int, Comment string, VariableJson string, DirectlyToWhoReje
}
}

//任务提交数据保存
err = taskSubmitSave(TaskID, Comment, VariableJson, 1)
if err != nil {
return err
}

//完成任务后的后继处理
err = processAfterTaskFinished(TaskID, taskOption{DirectlyToWhoRejectedMe: DirectlyToWhoRejectedMe})
if err != nil {
return err
}

return nil
}

//驳回任务,在本节点处理完毕的情况下会自动处理下一个节点
func TaskReject(TaskID int, Comment string, VariableJson string) error {
//获取节点信息
taskInfo, err := GetTaskInfo(TaskID)
if err != nil {
return err
}
//判断节点是否已处理
if taskInfo.IsFinished == 1 {
return fmt.Errorf("节点ID%d已处理,无需操作", TaskID)
}

//获取task所在的node
taskNode, err := GetInstanceNode(taskInfo.ProcInstID, taskInfo.NodeID)
if err != nil {
return err
}
//起始节点不能做驳回
if taskNode.NodeType == RootNode {
//如果是驳回,则验证是否起始节点(起始节点不能做驳回)
if option.Status == 2 && CurrentNode.NodeType == RootNode {
return errors.New("起始节点无法驳回!")
}

//任务提交数据保存
err = taskSubmitSave(TaskID, Comment, VariableJson, 2)
if err != nil {
return err
}

//完成任务后的后继处理
err = processAfterTaskFinished(TaskID, taskOption{})
if err != nil {
return err
}

return nil
}

//任务完成后的处理
func processAfterTaskFinished(TaskID int, option taskOption) error {
////获取节点信息
taskInfo, err := GetTaskInfo(TaskID)
if err != nil {
return err
}

//当前task所在节点
CurrentNode, err := GetInstanceNode(taskInfo.ProcInstID, taskInfo.NodeID)
err = taskSubmit(taskInfo, Comment, VariableJson, option.Status)
if err != nil {
return err
}
Expand All @@ -216,27 +180,31 @@ func processAfterTaskFinished(TaskID int, option taskOption) error {
} else {
PrevNode, err = GetInstanceNode(taskInfo.ProcInstID, taskInfo.PrevNodeID)
if err != nil {
taskRevoke(TaskID)
return err
}
}

//--------------------------这里处理[任务结束]事件--------------------------
err = RunNodeEvents(CurrentNode.TaskFinishEvents, taskInfo.TaskID, &CurrentNode, PrevNode)
if err != nil {
taskRevoke(TaskID)
return err
}

//获取任务执行完毕后下一个节点
var NextNode Node
//如果任务动作是“pass” and 开启 DirectlyToWhoRejectedMe,直接使用任务的PrevNodeID
if taskInfo.Status == 1 && option.DirectlyToWhoRejectedMe {
if option.Status == 1 && option.DirectlyToWhoRejectedMe {
NextNode, err = GetInstanceNode(taskInfo.ProcInstID, taskInfo.PrevNodeID)
if err != nil {
taskRevoke(TaskID)
return err
}
} else { //否则就通过计算得出下一个节点是谁
NextNode, err = TaskNextNode(taskInfo.TaskID)
if err != nil {
taskRevoke(TaskID)
return err
}
}
Expand All @@ -253,12 +221,14 @@ func processAfterTaskFinished(TaskID int, option taskOption) error {
//--------------------------这里处理节点结束事件--------------------------
err = RunNodeEvents(CurrentNode.NodeEndEvents, taskInfo.ProcInstID, &CurrentNode, PrevNode)
if err != nil {
taskRevoke(TaskID)
return err
}

//--------------------------开始处理下一个节点--------------------------
err = ProcessNode(taskInfo.ProcInstID, &NextNode, CurrentNode)
if err != nil {
taskRevoke(TaskID)
return err
}

Expand Down Expand Up @@ -424,12 +394,17 @@ func TaskFreeRejectToUpstreamNode(TaskID int, NodeID string, Comment string, Var
}

//保存数据
taskSubmitSave(TaskID, Comment, VariableJson, 2)
err = taskSubmit(taskInfo, Comment, VariableJson, 2)
if err != nil {
return err
}

err = ProcessNode(taskInfo.ProcInstID, &RejectToNode, CurrentNode)
if err != nil {
taskRevoke(TaskID)
return err
}

return nil
}

Expand Down Expand Up @@ -564,27 +539,21 @@ func TaskNodeStatus(TaskID int) (int, int, int, error) {
}

//将任务提交数据(通过、驳回、变量)保存到数据库
func taskSubmitSave(TaskID int, Comment string, VariableJson string, Status int) error {
taskInfo, err := GetTaskInfo(TaskID)
if err != nil {
return err
}
func taskSubmit(TaskInfo Task, Comment string, VariableJson string, Status int) error {
//taskInfo, err := GetTaskInfo(TaskID)
//if err != nil {
// return err
//}
//判断节点是否已处理
if taskInfo.IsFinished == 1 {
return fmt.Errorf("节点ID%d已处理,无需操作", TaskID)
}

//设置实例变量
err = InstanceVariablesSave(taskInfo.ProcInstID, VariableJson)
if err != nil {
return err
if TaskInfo.IsFinished == 1 {
return fmt.Errorf("节点ID%d已处理,无需操作", TaskInfo.TaskID)
}

//------------------------------------开始事务------------------------------------
tx := DB.Begin()

//更新task表记录
result := tx.Model(&database.ProcTask{}).Where("id=?", taskInfo.TaskID).
result := tx.Model(&database.ProcTask{}).Where("id=?", TaskInfo.TaskID).
Updates(database.ProcTask{Status: Status, IsFinished: 1, Comment: Comment, FinishedTime: database.LTime.Now()})
if result.Error != nil {
tx.Rollback()
Expand All @@ -593,21 +562,58 @@ func taskSubmitSave(TaskID int, Comment string, VariableJson string, Status int)

//1、非会签节点,一人通过即通过,所以要把其他人的任务finish掉
//2、不论是否会签,都是一人驳回即驳回,所以需要把同一批次task的isfinish设置为1,让其他人不用再处理
if (taskInfo.IsCosigned == 0 && Status == 1) || Status == 2 {
result = tx.Model(&database.ProcTask{}).Where("batch_code=?", taskInfo.BatchCode).
if (TaskInfo.IsCosigned == 0 && Status == 1) || Status == 2 {
result = tx.Model(&database.ProcTask{}).Where("batch_code=?", TaskInfo.BatchCode).
Updates(database.ProcTask{IsFinished: 1, FinishedTime: database.LTime.Now()})
if result.Error != nil {
tx.Rollback()
return result.Error
}
}

//设置实例变量
err := InstanceVariablesSave(TaskInfo.ProcInstID, VariableJson)
if err != nil {
tx.Rollback()
return err
}

//事务提交
tx.Commit()

return nil
}

//任务提交之后需要处理后继工作:任务结束事件、节点结束事件等.
//如果这些事件出错,则之前已提交的任务就成为了死任务,整个流程就被挂起.
//所以,要在出错后对之前的任务做初始化恢复.
func taskRevoke(TaskID int) error {
tx := DB.Begin()

//首先,将task状态做初始化
result := tx.Exec("update proc_task \n"+
"set `status`=0,is_finished=0,finished_time=NULL,comment=NULL \n"+
"WHERE id=?", TaskID)
if result.Error != nil {
tx.Rollback()
return result.Error
}

//其次,对那些没有做通过或驳回,被自动被设置为finish的task做初始化
tx.Exec("UPDATE proc_task SET is_finished=0 \n"+
"WHERE `status`=0 AND batch_code IN (SELECT batch_code FROM proc_task WHERE id=?)", TaskID)
if result.Error != nil {
tx.Rollback()
return result.Error
}

//至于之前执行InstanceVariablesSave而存入的变量,不必理会。因为会在下次执行taskSubmit的时候被覆盖掉

tx.Commit()

return nil
}

//任务的上一个节点是不是做了驳回
func taskPrevNodeIsReject(TaskInfo Task) (error, bool) {
//获得实际执行过程中上一个节点的BatchCode
Expand Down

0 comments on commit 62db969

Please sign in to comment.