Skip to content

Commit

Permalink
bugfix: related to failover version (cadence-workflow#726)
Browse files Browse the repository at this point in the history
* bugfix: when creating workflow, the timer / transfer tasks should be updated with corresponding version
  • Loading branch information
wxing1292 authored May 10, 2018
1 parent dbe4abb commit c16ac3a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 31 deletions.
4 changes: 2 additions & 2 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ clustersInfo:
masterClusterName: "active"
currentClusterName: "active"
clusterInitialFailoverVersion:
active: 0
standby: 1
active: 1
standby: 0

kafka:
clusters:
Expand Down
4 changes: 2 additions & 2 deletions config/development_standby.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ clustersInfo:
masterClusterName: "active"
currentClusterName: "standby"
clusterInitialFailoverVersion:
active: 0
standby: 1
active: 1
standby: 0

kafka:
clusters:
Expand Down
12 changes: 11 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}
replicationTasks = append(replicationTasks, replicationTask)
}
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {

_, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
RequestID: common.StringDefault(request.RequestId),
DomainID: domainID,
Expand Down Expand Up @@ -1866,6 +1866,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ
return nil, err
}
msBuilder.executionInfo.LastFirstEventID = startedEvent.GetEventId()
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
_, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
Expand Down Expand Up @@ -2573,3 +2574,12 @@ func getStartRequest(domainID string,
}
return startRequest
}

func setTaskVersion(version int64, transferTasks []persistence.Task, timerTasks []persistence.Task) {
for _, task := range transferTasks {
task.SetVersion(version)
}
for _, task := range timerTasks {
task.SetVersion(version)
}
}
1 change: 1 addition & 0 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
decisionStartID = di.StartedID
decisionTimeout = di.DecisionTimeout
}
setTaskVersion(msBuilder.GetCurrentVersion(), sBuilder.transferTasks, sBuilder.timerTasks)

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
_, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
Expand Down
39 changes: 21 additions & 18 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,25 +2134,28 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour
newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, di.ScheduleID)
}

newTransferTasks := []persistence.Task{&persistence.DecisionTask{
DomainID: domainID,
TaskList: newStateBuilder.executionInfo.TaskList,
ScheduleID: di.ScheduleID,
}}
setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil)

e.continueAsNew = &persistence.CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
DomainID: domainID,
Execution: newExecution,
ParentDomainID: parentDomainID,
ParentExecution: parentExecution,
InitiatedID: initiatedID,
TaskList: newStateBuilder.executionInfo.TaskList,
WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName,
WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout,
DecisionTimeoutValue: newStateBuilder.executionInfo.DecisionTimeoutValue,
ExecutionContext: nil,
NextEventID: newStateBuilder.GetNextEventID(),
LastProcessedEvent: emptyEventID,
TransferTasks: []persistence.Task{&persistence.DecisionTask{
DomainID: domainID,
TaskList: newStateBuilder.executionInfo.TaskList,
ScheduleID: di.ScheduleID,
}},
RequestID: uuid.New(),
DomainID: domainID,
Execution: newExecution,
ParentDomainID: parentDomainID,
ParentExecution: parentExecution,
InitiatedID: initiatedID,
TaskList: newStateBuilder.executionInfo.TaskList,
WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName,
WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout,
DecisionTimeoutValue: newStateBuilder.executionInfo.DecisionTimeoutValue,
ExecutionContext: nil,
NextEventID: newStateBuilder.GetNextEventID(),
LastProcessedEvent: emptyEventID,
TransferTasks: newTransferTasks,
DecisionVersion: di.Version,
DecisionScheduleID: di.ScheduleID,
DecisionStartedID: di.StartedID,
Expand Down
9 changes: 1 addition & 8 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
replicationTasks = append(replicationTasks, c.msBuilder.createReplicationTask())
}

// this is the current failover version
version := c.msBuilder.GetCurrentVersion()
for _, task := range transferTasks {
task.SetVersion(version)
}
for _, task := range timerTasks {
task.SetVersion(version)
}
setTaskVersion(c.msBuilder.GetCurrentVersion(), transferTasks, timerTasks)

if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{
ExecutionInfo: c.msBuilder.executionInfo,
Expand Down

0 comments on commit c16ac3a

Please sign in to comment.