From c16ac3a3832ccd8da493e134bdfdc264654ff619 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 10 May 2018 11:54:10 -0700 Subject: [PATCH] bugfix: related to failover version (#726) * bugfix: when creating workflow, the timer / transfer tasks should be updated with corresponding version --- config/development_active.yaml | 4 +-- config/development_standby.yaml | 4 +-- service/history/historyEngine.go | 12 ++++++- service/history/historyReplicator.go | 1 + service/history/mutableStateBuilder.go | 39 +++++++++++---------- service/history/workflowExecutionContext.go | 9 +---- 6 files changed, 38 insertions(+), 31 deletions(-) diff --git a/config/development_active.yaml b/config/development_active.yaml index bd5dd9eefde..03f73a5d86b 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -62,8 +62,8 @@ clustersInfo: masterClusterName: "active" currentClusterName: "active" clusterInitialFailoverVersion: - active: 0 - standby: 1 + active: 1 + standby: 0 kafka: clusters: diff --git a/config/development_standby.yaml b/config/development_standby.yaml index db258c90b25..528dfa3666e 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -62,8 +62,8 @@ clustersInfo: masterClusterName: "active" currentClusterName: "standby" clusterInitialFailoverVersion: - active: 0 - standby: 1 + active: 1 + standby: 0 kafka: clusters: diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 1f38455f92e..43be04a91ea 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -318,9 +318,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow } replicationTasks = append(replicationTasks, replicationTask) } + setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { - _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ RequestID: common.StringDefault(request.RequestId), DomainID: domainID, @@ -1866,6 +1866,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ return nil, err } msBuilder.executionInfo.LastFirstEventID = startedEvent.GetEventId() + setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -2573,3 +2574,12 @@ func getStartRequest(domainID string, } return startRequest } + +func setTaskVersion(version int64, transferTasks []persistence.Task, timerTasks []persistence.Task) { + for _, task := range transferTasks { + task.SetVersion(version) + } + for _, task := range timerTasks { + task.SetVersion(version) + } +} diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 91a451494dc..a3d5634f007 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -259,6 +259,7 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte decisionStartID = di.StartedID decisionTimeout = di.DecisionTimeout } + setTaskVersion(msBuilder.GetCurrentVersion(), sBuilder.transferTasks, sBuilder.timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 795511e413f..3fd786eedda 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -2134,25 +2134,28 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, di.ScheduleID) } + newTransferTasks := []persistence.Task{&persistence.DecisionTask{ + DomainID: domainID, + TaskList: newStateBuilder.executionInfo.TaskList, + ScheduleID: di.ScheduleID, + }} + setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil) + e.continueAsNew = &persistence.CreateWorkflowExecutionRequest{ - RequestID: uuid.New(), - DomainID: domainID, - Execution: newExecution, - ParentDomainID: parentDomainID, - ParentExecution: parentExecution, - InitiatedID: initiatedID, - TaskList: newStateBuilder.executionInfo.TaskList, - WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName, - WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout, - DecisionTimeoutValue: newStateBuilder.executionInfo.DecisionTimeoutValue, - ExecutionContext: nil, - NextEventID: newStateBuilder.GetNextEventID(), - LastProcessedEvent: emptyEventID, - TransferTasks: []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: newStateBuilder.executionInfo.TaskList, - ScheduleID: di.ScheduleID, - }}, + RequestID: uuid.New(), + DomainID: domainID, + Execution: newExecution, + ParentDomainID: parentDomainID, + ParentExecution: parentExecution, + InitiatedID: initiatedID, + TaskList: newStateBuilder.executionInfo.TaskList, + WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName, + WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout, + DecisionTimeoutValue: newStateBuilder.executionInfo.DecisionTimeoutValue, + ExecutionContext: nil, + NextEventID: newStateBuilder.GetNextEventID(), + LastProcessedEvent: emptyEventID, + TransferTasks: newTransferTasks, DecisionVersion: di.Version, DecisionScheduleID: di.ScheduleID, DecisionStartedID: di.StartedID, diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index e5cdcbbc379..ff9639f7ba8 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -249,14 +249,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe replicationTasks = append(replicationTasks, c.msBuilder.createReplicationTask()) } - // this is the current failover version - version := c.msBuilder.GetCurrentVersion() - for _, task := range transferTasks { - task.SetVersion(version) - } - for _, task := range timerTasks { - task.SetVersion(version) - } + setTaskVersion(c.msBuilder.GetCurrentVersion(), transferTasks, timerTasks) if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{ ExecutionInfo: c.msBuilder.executionInfo,