Skip to content

Commit

Permalink
Convert transfer to cross cluster task if target domain is active in …
Browse files Browse the repository at this point in the history
…remote cluster (cadence-workflow#4268)

- Convert transfer to cross cluster task if target domain is active in remote cluster
- Refactor unit test for transfer active task executor
  • Loading branch information
yycptt authored Jun 22, 2021
1 parent ffe4e2f commit ddfc127
Show file tree
Hide file tree
Showing 6 changed files with 644 additions and 719 deletions.
66 changes: 66 additions & 0 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package execution

import (
"errors"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -77,6 +78,10 @@ type (
) error
GenerateWorkflowSearchAttrTasks() error
GenerateWorkflowResetTasks() error
GenerateCrossClusterTaskFromTransferTask(
transferTask *persistence.TransferTaskInfo,
targetCluster string,
) error

// these 2 APIs should only be called when mutable state transaction is being closed
GenerateActivityTimerTasks(
Expand Down Expand Up @@ -547,6 +552,67 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowResetTasks() error {
return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask(
task *persistence.TransferTaskInfo,
targetCluster string,
) error {
if targetCluster == r.clusterMetadata.GetCurrentClusterName() {
// this should not happen
return errors.New("unable to create cross-cluster task for current cluster")
}

var crossClusterTask persistence.Task
switch task.TaskType {
case persistence.TransferTaskTypeCancelExecution:
crossClusterTask = &persistence.CrossClusterCancelExecutionTask{
TargetCluster: targetCluster,
CancelExecutionTask: persistence.CancelExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
InitiatedID: task.ScheduleID,
Version: task.Version,
},
}
case persistence.TransferTaskTypeSignalExecution:
crossClusterTask = &persistence.CrossClusterSignalExecutionTask{
TargetCluster: targetCluster,
SignalExecutionTask: persistence.SignalExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
InitiatedID: task.ScheduleID,
Version: task.Version,
},
}
case persistence.TransferTaskTypeStartChildExecution:
crossClusterTask = &persistence.CrossClusterStartChildExecutionTask{
TargetCluster: targetCluster,
StartChildExecutionTask: persistence.StartChildExecutionTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
InitiatedID: task.ScheduleID,
Version: task.Version,
},
}
// TODO: add the case for TransferTaskTypeCloseExecution
default:
return fmt.Errorf("unable to convert transfer task of type %v to cross-cluster task", task.TaskType)
}

// set visibility timestamp here so we the metric for task latency
// can include the latency for the original transfer task.
crossClusterTask.SetVisibilityTimestamp(task.VisibilityTimestamp)
r.mutableState.AddCrossClusterTasks(crossClusterTask)

return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateActivityTimerTasks(
now time.Time,
) error {
Expand Down
15 changes: 15 additions & 0 deletions service/history/execution/mutable_state_task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 99 additions & 0 deletions service/history/execution/mutable_state_task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,105 @@ func (s *mutableStateTaskGeneratorSuite) TestIsCrossClusterTask() {
}
}

func (s *mutableStateTaskGeneratorSuite) TestGenerateCrossClusterTaskFromTransferTask() {
targetCluster := cluster.TestAlternativeClusterName
now := time.Now()
testCases := []struct {
tranferTask *persistence.TransferTaskInfo
expectError bool
expectedCrossClusterTask persistence.Task
}{
{
tranferTask: &persistence.TransferTaskInfo{
TaskType: persistence.TransferTaskTypeActivityTask,
},
expectError: true,
},
{
tranferTask: &persistence.TransferTaskInfo{
TaskType: persistence.TransferTaskTypeCancelExecution,
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
ScheduleID: int64(123),
},
expectError: false,
expectedCrossClusterTask: &persistence.CrossClusterCancelExecutionTask{
TargetCluster: targetCluster,
CancelExecutionTask: persistence.CancelExecutionTask{
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
InitiatedID: int64(123),
},
},
},
{
tranferTask: &persistence.TransferTaskInfo{
TaskType: persistence.TransferTaskTypeSignalExecution,
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
ScheduleID: int64(123),
},
expectError: false,
expectedCrossClusterTask: &persistence.CrossClusterSignalExecutionTask{
TargetCluster: targetCluster,
SignalExecutionTask: persistence.SignalExecutionTask{
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
TargetChildWorkflowOnly: false,
InitiatedID: int64(123),
},
},
},
{
tranferTask: &persistence.TransferTaskInfo{
TaskType: persistence.TransferTaskTypeStartChildExecution,
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
ScheduleID: int64(123),
},
expectError: false,
expectedCrossClusterTask: &persistence.CrossClusterStartChildExecutionTask{
TargetCluster: targetCluster,
StartChildExecutionTask: persistence.StartChildExecutionTask{
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
InitiatedID: int64(123),
},
},
},
}

for _, tc := range testCases {
var actualCrossClusterTask persistence.Task
if !tc.expectError {
tc.tranferTask.Version = int64(101)
tc.expectedCrossClusterTask.SetVersion(int64(101))
tc.tranferTask.VisibilityTimestamp = now
tc.expectedCrossClusterTask.SetVisibilityTimestamp(now)

s.mockMutableState.EXPECT().AddCrossClusterTasks(gomock.Any()).Do(
func(crossClusterTasks ...persistence.Task) {
actualCrossClusterTask = crossClusterTasks[0]
},
).MaxTimes(1)
}

err := s.taskGenerator.GenerateCrossClusterTaskFromTransferTask(tc.tranferTask, targetCluster)
if tc.expectError {
s.Error(err)
} else {
s.Equal(tc.expectedCrossClusterTask, actualCrossClusterTask)
}
}
}

func (s *mutableStateTaskGeneratorSuite) TestGetNextDecisionTimeout() {
defaultStartToCloseTimeout := 10 * time.Second
expectedResult := []time.Duration{
Expand Down
5 changes: 4 additions & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,10 @@ func (s *contextImpl) allocateTransferIDsLocked(
}
s.logger.Debug(fmt.Sprintf("Assigning task ID: %v", id))
task.SetTaskID(id)
task.SetVisibilityTimestamp(now)
// only set task visibility timestamp if it's not set
if task.GetVisibilityTimestamp().IsZero() {
task.SetVisibilityTimestamp(now)
}
*transferMaxReadLevel = id
}
return nil
Expand Down
84 changes: 72 additions & 12 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -403,11 +404,18 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
return err
}

targetDomainName, err := t.shard.GetDomainCache().GetDomainName(task.TargetDomainID)
targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID)
if err != nil {
// TODO: handle the case where target domain does not exist
return err
}

if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName := targetDomainEntry.GetInfo().Name

// handle workflow cancel itself
if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
Expand Down Expand Up @@ -495,11 +503,18 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
return err
}

targetDomainName, err := t.shard.GetDomainCache().GetDomainName(task.TargetDomainID)
targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID)
if err != nil {
// TODO: handle the case where target domain does not exist
return err
}

if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName := targetDomainEntry.GetInfo().Name

// handle workflow signal itself
if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
Expand Down Expand Up @@ -610,16 +625,6 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
domainName = task.DomainID
}

// Get target domain name
var targetDomainName string
if targetDomainName, err = t.shard.GetDomainCache().GetDomainName(task.TargetDomainID); err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
return err
}
// it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event
targetDomainName = task.TargetDomainID
}

initiatedEventID := task.ScheduleID
childInfo, ok := mutableState.GetChildExecutionInfo(initiatedEventID)
if !ok {
Expand All @@ -630,6 +635,25 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
return err
}

// Get target domain name
var targetDomainName string
var targetDomainEntry *cache.DomainCacheEntry
if targetDomainEntry, err = t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID); err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
return err
}
// TODO: handle the case where target domain does not exist

// it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event
targetDomainName = task.TargetDomainID
} else {
if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName = targetDomainEntry.GetInfo().Name
}

initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, initiatedEventID)
if err != nil {
return err
Expand Down Expand Up @@ -1171,6 +1195,42 @@ func (t *transferActiveTaskExecutor) signalExternalExecutionFailed(
return err
}

func (t *transferActiveTaskExecutor) isCrossClusterTask(
targetDomainEntry *cache.DomainCacheEntry,
) (string, bool) {
targetCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName
if targetCluster != t.shard.GetClusterMetadata().GetCurrentClusterName() {
return targetCluster, true
}
return "", false
}

func (t *transferActiveTaskExecutor) generateCrossClusterTask(
ctx context.Context,
wfContext execution.Context,
task *persistence.TransferTaskInfo,
targetCluster string,
) error {
return t.updateWorkflowExecution(
ctx,
wfContext,
false,
func(ctx context.Context, mutableState execution.MutableState) error {
if !mutableState.IsWorkflowExecutionRunning() {
return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."}
}

taskGenerator := execution.NewMutableStateTaskGenerator(
t.shard.GetClusterMetadata(),
t.shard.GetDomainCache(),
t.logger,
mutableState,
)
return taskGenerator.GenerateCrossClusterTaskFromTransferTask(task, targetCluster)
},
)
}

func (t *transferActiveTaskExecutor) updateWorkflowExecution(
ctx context.Context,
wfContext execution.Context,
Expand Down
Loading

0 comments on commit ddfc127

Please sign in to comment.