Skip to content

Commit

Permalink
Notify queue processor about cross cluster tasks (cadence-workflow#4328)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 28, 2021
1 parent ffbfdb7 commit adbffa4
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 0 deletions.
1 change: 1 addition & 0 deletions service/history/engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,6 @@ type (
NotifyNewHistoryEvent(event *events.Notification)
NotifyNewTransferTasks(executionInfo *persistence.WorkflowExecutionInfo, tasks []persistence.Task)
NotifyNewTimerTasks(executionInfo *persistence.WorkflowExecutionInfo, tasks []persistence.Task)
NotifyNewCrossClusterTasks(executionInfo *persistence.WorkflowExecutionInfo, tasks []persistence.Task)
}
)
12 changes: 12 additions & 0 deletions service/history/engine/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func (c *contextImpl) notifyTasksFromWorkflowSnapshot(
workflowSnapShot.ExecutionInfo,
workflowSnapShot.TransferTasks,
workflowSnapShot.TimerTasks,
workflowSnapShot.CrossClusterTasks,
)
}

Expand All @@ -856,16 +857,19 @@ func (c *contextImpl) notifyTasksFromWorkflowMutation(
workflowMutation.ExecutionInfo,
workflowMutation.TransferTasks,
workflowMutation.TimerTasks,
workflowMutation.CrossClusterTasks,
)
}

func (c *contextImpl) notifyTasks(
executionInfo *persistence.WorkflowExecutionInfo,
transferTasks []persistence.Task,
timerTasks []persistence.Task,
crossClusterTasks []persistence.Task,
) {
c.shard.GetEngine().NotifyNewTransferTasks(executionInfo, transferTasks)
c.shard.GetEngine().NotifyNewTimerTasks(executionInfo, timerTasks)
c.shard.GetEngine().NotifyNewCrossClusterTasks(executionInfo, crossClusterTasks)
}

func (c *contextImpl) mergeContinueAsNewReplicationTasks(
Expand Down
32 changes: 32 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2670,6 +2670,38 @@ func (e *historyEngineImpl) NotifyNewTimerTasks(
}
}

func (e *historyEngineImpl) NotifyNewCrossClusterTasks(
executionInfo *persistence.WorkflowExecutionInfo,
tasks []persistence.Task,
) {
if e.crossClusterProcessor == nil {
// TODO: remove this check when crossClusterProcessor is wired up and initialized
return
}

taskByTargetCluster := make(map[string][]persistence.Task)
for _, task := range tasks {
// TODO: consider defining a new interface in persistence package
// for cross cluster tasks and add a method for returning the target cluster
var targetCluster string
switch crossClusterTask := task.(type) {
case *persistence.CrossClusterStartChildExecutionTask:
targetCluster = crossClusterTask.TargetCluster
case *persistence.CrossClusterCancelExecutionTask:
targetCluster = crossClusterTask.TargetCluster
case *persistence.CrossClusterSignalExecutionTask:
targetCluster = crossClusterTask.TargetCluster
default:
panic("encountered unknown cross cluster task type")
}
taskByTargetCluster[targetCluster] = append(taskByTargetCluster[targetCluster], task)
}

for targetCluster, tasks := range taskByTargetCluster {
e.crossClusterProcessor.NotifyNewTask(targetCluster, executionInfo, tasks)
}
}

func (e *historyEngineImpl) ResetTransferQueue(
ctx context.Context,
clusterName string,
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/activity_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *activityReplicatorSuite) SetupTest() {
s.mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockShard.SetEngine(s.mockEngine)

s.activityReplicator = NewActivityReplicator(
Expand Down
1 change: 1 addition & 0 deletions service/history/task/timer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *timerActiveTaskExecutorSuite) SetupTest() {
s.mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockShard.SetEngine(s.mockEngine)

s.mockDomainCache = s.mockShard.Resource.DomainCache
Expand Down
1 change: 1 addition & 0 deletions service/history/task/timer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (s *timerStandbyTaskExecutorSuite) SetupTest() {
s.mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockShard.SetEngine(s.mockEngine)
s.mockNDCHistoryResender = ndc.NewMockHistoryResender(s.controller)

Expand Down
1 change: 1 addition & 0 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
s.mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockShard.SetEngine(s.mockEngine)

s.mockParentClosePolicyClient = &parentclosepolicy.ClientMock{}
Expand Down

0 comments on commit adbffa4

Please sign in to comment.