Skip to content

Commit

Permalink
Implement xcluster source task executor (cadence-workflow#4445)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Sep 9, 2021
1 parent b22df41 commit bd7072c
Show file tree
Hide file tree
Showing 5 changed files with 1,357 additions and 5 deletions.
97 changes: 97 additions & 0 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type (
transferTask *persistence.TransferTaskInfo,
targetCluster string,
) error
GenerateFromCrossClusterTask(
crossClusterTask *persistence.CrossClusterTaskInfo,
) error

// these 2 APIs should only be called when mutable state transaction is being closed
GenerateActivityTimerTasks(
Expand Down Expand Up @@ -652,6 +655,100 @@ func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask
return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateFromCrossClusterTask(
task *persistence.CrossClusterTaskInfo,
) error {
generateTransferTask := false
var targetCluster string

sourceDomainEntry := r.mutableState.GetDomainEntry()
if !sourceDomainEntry.IsDomainActive() && !sourceDomainEntry.IsDomainPendingActive() {
// domain is passive, generate (passive) transfer task
generateTransferTask = true
}

if !generateTransferTask {
targetDomainEntry, err := r.domainCache.GetDomainByID(task.TargetDomainID)
if err != nil {
return err
}
targetCluster = targetDomainEntry.GetReplicationConfig().ActiveClusterName
if targetCluster == r.clusterMetadata.GetCurrentClusterName() {
generateTransferTask = true
}
}

var newTask persistence.Task
switch task.GetTaskType() {
case persistence.CrossClusterTaskTypeCancelExecution:
cancelExecutionTask := &persistence.CancelExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
InitiatedID: task.ScheduleID,
Version: task.Version,
}
if generateTransferTask {
newTask = cancelExecutionTask
} else {
newTask = &persistence.CrossClusterCancelExecutionTask{
TargetCluster: targetCluster,
CancelExecutionTask: *cancelExecutionTask,
}
}
case persistence.CrossClusterTaskTypeSignalExecution:
signalExecutionTask := &persistence.SignalExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
InitiatedID: task.ScheduleID,
Version: task.Version,
}
if generateTransferTask {
newTask = signalExecutionTask
} else {
newTask = &persistence.CrossClusterSignalExecutionTask{
TargetCluster: targetCluster,
SignalExecutionTask: *signalExecutionTask,
}
}
case persistence.CrossClusterTaskTypeStartChildExecution:
startChildExecutionTask := &persistence.StartChildExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
InitiatedID: task.ScheduleID,
Version: task.Version,
}
if generateTransferTask {
newTask = startChildExecutionTask
} else {
newTask = &persistence.CrossClusterStartChildExecutionTask{
TargetCluster: targetCluster,
StartChildExecutionTask: *startChildExecutionTask,
}
}
// TODO: add the case for CrossClusterTaskTypeRecordChildComplete and ApplyParentClosePolicy
default:
return fmt.Errorf("unable to convert cross-cluster task of type %v", task.TaskType)
}

// set visibility timestamp here so we the metric for task latency
// can include the latency for the original transfer task.
newTask.SetVisibilityTimestamp(task.VisibilityTimestamp)
if generateTransferTask {
r.mutableState.AddTransferTasks(newTask)
} else {
r.mutableState.AddCrossClusterTasks(newTask)
}

return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateActivityTimerTasks(
now time.Time,
) error {
Expand Down
14 changes: 14 additions & 0 deletions service/history/execution/mutable_state_task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 87 additions & 0 deletions service/history/execution/mutable_state_task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,93 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateCrossClusterTaskFromTransfe
}
}

func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() {
testCases := []struct {
sourceActive bool
crossClusterTask *persistence.CrossClusterTaskInfo
generatedTask persistence.Task
}{
{
sourceActive: true,
crossClusterTask: &persistence.CrossClusterTaskInfo{
TaskType: persistence.CrossClusterTaskTypeStartChildExecution,
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
ScheduleID: int64(123),
},
generatedTask: &persistence.StartChildExecutionTask{
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
InitiatedID: int64(123),
},
},
{
sourceActive: true,
crossClusterTask: &persistence.CrossClusterTaskInfo{
TaskType: persistence.CrossClusterTaskTypeSignalExecution,
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
ScheduleID: int64(123),
},
generatedTask: &persistence.CrossClusterSignalExecutionTask{
TargetCluster: cluster.TestAlternativeClusterName,
SignalExecutionTask: persistence.SignalExecutionTask{
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
InitiatedID: int64(123),
},
},
},
{
sourceActive: false,
crossClusterTask: &persistence.CrossClusterTaskInfo{
TaskType: persistence.CrossClusterTaskTypeCancelExecution,
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
ScheduleID: int64(123),
},
generatedTask: &persistence.CancelExecutionTask{
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
InitiatedID: int64(123),
},
},
}

for _, tc := range testCases {
if tc.sourceActive {
tc.crossClusterTask.DomainID = constants.TestDomainID
s.mockMutableState.EXPECT().GetDomainEntry().Return(constants.TestGlobalDomainEntry).Times(1)
} else {
tc.crossClusterTask.DomainID = constants.TestRemoteTargetDomainID
s.mockMutableState.EXPECT().GetDomainEntry().Return(constants.TestGlobalRemoteTargetDomainEntry).Times(1)
}
targetActive := tc.crossClusterTask.TargetDomainID == constants.TestTargetDomainID

var actualGeneratedTask persistence.Task
mockDoFn := func(tasks ...persistence.Task) {
actualGeneratedTask = tasks[0]
}
if !tc.sourceActive || targetActive {
s.mockMutableState.EXPECT().AddTransferTasks(gomock.Any()).Do(mockDoFn).Times(1)
} else {
s.mockMutableState.EXPECT().AddCrossClusterTasks(gomock.Any()).Do(mockDoFn).Times(1)
}

err := s.taskGenerator.GenerateFromCrossClusterTask(tc.crossClusterTask)
s.NoError(err)
s.Equal(tc.generatedTask, actualGeneratedTask)
}
}

func (s *mutableStateTaskGeneratorSuite) TestGetNextDecisionTimeout() {
defaultStartToCloseTimeout := 10 * time.Second
expectedResult := []time.Duration{
Expand Down
Loading

0 comments on commit bd7072c

Please sign in to comment.