Skip to content

Commit

Permalink
Bug fixes for cross domain operations (cadence-workflow#4623)
Browse files Browse the repository at this point in the history
- Remove InitiatedID field from recordChildCompleteTask
- Fix getRequestForApplyParentPolicy
- Remove errUnexpectedErrorFromTarget
  • Loading branch information
yycptt authored Dec 1, 2021
1 parent b730353 commit 624a1fc
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 38 deletions.
1 change: 0 additions & 1 deletion common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,6 @@ type (
TargetDomainID string
TargetWorkflowID string
TargetRunID string
InitiatedID int64
Version int64
}

Expand Down
2 changes: 0 additions & 2 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
if targetRunID == "" {
targetRunID = p.CrossClusterTaskDefaultTargetRunID
}
scheduleID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).InitiatedID

case p.CrossClusterTaskTypeApplyParentClosePolicy:
targetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster
Expand Down Expand Up @@ -495,7 +494,6 @@ func (d *nosqlExecutionStore) prepareTransferTasksForWorkflowTxn(
if targetRunID == "" {
targetRunID = p.TransferTaskTransferTargetRunID
}
scheduleID = task.(*p.RecordChildExecutionCompletedTask).InitiatedID

case p.TransferTaskTypeApplyParentClosePolicy:
targetDomainIDs = task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,6 @@ func (s *ExecutionManagerSuite) validateCrossClusterTasks(
s.Equal(task.TargetDomainID, loadedTaskInfo[index].TargetDomainID)
s.Equal(task.TargetWorkflowID, loadedTaskInfo[index].TargetWorkflowID)
s.Equal(task.TargetRunID, loadedTaskInfo[index].TargetRunID)
s.Equal(task.InitiatedID, loadedTaskInfo[index].ScheduleID)
case *p.CrossClusterApplyParentClosePolicyTask:
s.Equal(task.TargetDomainIDs, loadedTaskInfo[index].GetTargetDomainIDs())
default:
Expand Down Expand Up @@ -2493,7 +2492,7 @@ func (s *ExecutionManagerSuite) TestTransferTasksComplete() {
&p.SignalExecutionTask{now, currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555},
&p.StartChildExecutionTask{now, currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666},
&p.RecordWorkflowClosedTask{now, currentTransferID + 10007, 777},
&p.RecordChildExecutionCompletedTask{now, currentTransferID + 10008, targetDomainID, targetWorkflowID, targetRunID, scheduleID, 888},
&p.RecordChildExecutionCompletedTask{now, currentTransferID + 10008, targetDomainID, targetWorkflowID, targetRunID, 888},
&p.ApplyParentClosePolicyTask{now, currentTransferID + 10009, map[string]struct{}{targetDomainID: {}}, 999},
}
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
Expand Down
2 changes: 0 additions & 2 deletions common/persistence/sql/sqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,6 @@ func createCrossClusterTasks(
if targetRunID := task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
info.TargetRunID = serialization.MustParseUUID(targetRunID)
}
info.ScheduleID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).InitiatedID

case p.CrossClusterTaskTypeApplyParentClosePolicy:
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster
Expand Down Expand Up @@ -932,7 +931,6 @@ func createTransferTasks(
if targetRunID := task.(*p.RecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
info.TargetRunID = serialization.MustParseUUID(targetRunID)
}
info.ScheduleID = task.(*p.RecordChildExecutionCompletedTask).InitiatedID

case p.TransferTaskTypeApplyParentClosePolicy:
for targetDomainID := range task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs {
Expand Down
3 changes: 0 additions & 3 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
TargetDomainID: executionInfo.ParentDomainID,
TargetWorkflowID: executionInfo.ParentWorkflowID,
TargetRunID: executionInfo.ParentRunID,
InitiatedID: executionInfo.InitiatedID,
Version: closeEvent.GetVersion(),
}

Expand Down Expand Up @@ -642,7 +641,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterRecordChildCompleted
TargetDomainID: parentInfo.DomainUUID,
TargetWorkflowID: parentInfo.GetExecution().GetWorkflowID(),
TargetRunID: parentInfo.GetExecution().GetRunID(),
InitiatedID: parentInfo.GetInitiatedID(),
Version: task.Version,
},
})
Expand Down Expand Up @@ -884,7 +882,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateFromCrossClusterTask(
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
InitiatedID: task.ScheduleID,
}
if generateTransferTask {
newTask = recordChildExecutionCompletedTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() {
TargetDomainID: constants.TestParentDomainID,
TargetWorkflowID: "parent workflowID",
TargetRunID: "parent runID",
InitiatedID: 101,
Version: version,
},
&persistence.ApplyParentClosePolicyTask{
Expand Down Expand Up @@ -284,7 +283,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() {
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: "parent workflowID",
TargetRunID: "parent runID",
InitiatedID: 101,
Version: version,
},
},
Expand Down Expand Up @@ -332,7 +330,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() {
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: "parent workflowID",
TargetRunID: "parent runID",
InitiatedID: 101,
Version: version,
},
},
Expand Down Expand Up @@ -514,7 +511,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateCrossClusterRecordChildComp
TargetDomainID: constants.TestParentDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
InitiatedID: 123,
Version: 101,
},
}
Expand Down Expand Up @@ -651,7 +647,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() {
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
InitiatedID: int64(123),
},
},
},
Expand All @@ -670,7 +665,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() {
TargetDomainID: constants.TestRemoteTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
InitiatedID: int64(123),
},
},
},
Expand All @@ -688,7 +682,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() {
TargetDomainID: constants.TestTargetDomainID,
TargetWorkflowID: constants.TestWorkflowID,
TargetRunID: constants.TestRunID,
InitiatedID: int64(123),
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ func getWorkflowCloseTestCases() []struct {
expectedTaskState: ctask.TaskStateAcked,
willGenerateNewTask: false,
},
// UNEXPECTED ERROR
// UNEXPECTED ERROR for target,
// no error should be returned otherwise task will retry forever,
// task should still in pending state so it can be fetched again
{
targetError: types.CrossClusterTaskFailedCauseWorkflowAlreadyRunning.Ptr(),
// for unexpected errors we return errContinueExecution which is converted to nil
Expand Down
1 change: 0 additions & 1 deletion service/history/task/cross_cluster_target_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ var (
errUnknownTaskProcessingState = errors.New("unknown cross cluster task processing state")
errMissingTaskRequestAttributes = errors.New("request attributes not specified")
errDomainNotExists = errors.New("domain not exists")
errUnexpectedErrorFromTarget = errors.New("unexpected target error")
)

type (
Expand Down
40 changes: 21 additions & 19 deletions service/history/task/cross_cluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,28 +544,28 @@ func (t *crossClusterSourceTask) getRequestForApplyParentPolicy(
return nil, t.processingState, err
}
for _, childInfo := range children {
targetDomainEntry, err := execution.GetChildExecutionDomainEntry(childInfo, t.shard.GetDomainCache(), domainEntry)
// we already filtered the children so that child domainID is in task.TargetDomainIDs
// don't check if child domain is active or not here,
// we need to send the request even if the child domain is not active in target cluster
targetDomainID, err := execution.GetChildExecutionDomainID(childInfo, t.shard.GetDomainCache(), domainEntry)
if err != nil {
return nil, t.processingState, err
}
targetCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName
if targetCluster == t.targetCluster {

attributes.Children = append(
attributes.Children,
&types.ApplyParentClosePolicyRequest{
Child: &types.ApplyParentClosePolicyAttributes{
ChildDomainID: targetDomainEntry.GetInfo().ID,
ChildWorkflowID: childInfo.StartedWorkflowID,
ChildRunID: childInfo.StartedRunID,
ParentClosePolicy: &childInfo.ParentClosePolicy,
},
Status: &types.ApplyParentClosePolicyStatus{
Completed: false,
},

attributes.Children = append(
attributes.Children,
&types.ApplyParentClosePolicyRequest{
Child: &types.ApplyParentClosePolicyAttributes{
ChildDomainID: targetDomainID,
ChildWorkflowID: childInfo.StartedWorkflowID,
ChildRunID: childInfo.StartedRunID,
ParentClosePolicy: &childInfo.ParentClosePolicy,
},
)
}
Status: &types.ApplyParentClosePolicyStatus{
Completed: false,
},
},
)
}
return attributes, t.processingState, nil
}
Expand Down Expand Up @@ -786,14 +786,16 @@ func (t *crossClusterSourceTask) RecordResponse(response *types.CrossClusterTask
case persistence.CrossClusterTaskTypeApplyParentClosePolicy:
taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeApplyParentPolicy
emptyResponse = response.ApplyParentClosePolicyAttributes == nil
default:
return fmt.Errorf("unknown task type: %v", t.GetTaskType())
}

if !taskTypeMatch {
return fmt.Errorf("unexpected task type, expected: %v, actual: %v", t.GetTaskType(), response.GetTaskType())
}

if emptyResponse && response.FailedCause == nil {
return errors.New("empty cross cluster task response")
return fmt.Errorf("empty cross cluster task response, task type: %v", t.GetTaskType())
}

if response.FailedCause != nil {
Expand Down

0 comments on commit 624a1fc

Please sign in to comment.