From 131563b03d65741bbc1955395d77196e73d5726e Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 11 Feb 2022 15:42:16 -0800 Subject: [PATCH] Queue refactor part 3: task notification (#2489) --- service/history/historyEngine.go | 107 ++++++++---------- service/history/historyEngine2_test.go | 37 +++--- .../history/historyEngine3_eventsv2_test.go | 38 ++++--- service/history/historyEngine_test.go | 47 +++++--- service/history/nDCActivityReplicator_test.go | 22 ++-- service/history/replicatorQueueProcessor.go | 2 + service/history/shard/context_impl.go | 5 +- service/history/shard/context_test.go | 22 ++-- service/history/shard/engine.go | 5 +- service/history/shard/engine_mock.go | 48 +------- service/history/tasks/fake_task.go | 89 +++++++++++++++ .../timerQueueActiveTaskExecutor_test.go | 54 +++++---- service/history/timerQueueProcessor.go | 19 ++-- service/history/timerQueueProcessor_mock.go | 102 ----------------- .../timerQueueStandbyTaskExecutor_test.go | 58 +++++----- .../transferQueueActiveTaskExecutor_test.go | 26 +++-- service/history/transferQueueProcessor.go | 20 ++-- .../history/transferQueueProcessor_mock.go | 102 ----------------- service/history/visibilityQueueProcessor.go | 32 ++++-- service/history/workflow/transaction_impl.go | 33 +----- service/history/workflow/transaction_test.go | 5 +- .../history/workflowTaskHandlerCallbacks.go | 4 - 22 files changed, 365 insertions(+), 512 deletions(-) create mode 100644 service/history/tasks/fake_task.go diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 555b03c3f27..aadbb629f7a 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/client" "go.temporal.io/server/common/archiver/provider" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/tasks" enumsspb "go.temporal.io/server/api/enums/v1" @@ -95,12 +96,10 @@ type ( workflowTaskHandler workflowTaskHandlerCallbacks clusterMetadata cluster.Metadata executionManager persistence.ExecutionManager - txProcessor transferQueueProcessor - timerProcessor timerQueueProcessor - visibilityProcessor visibilityQueueProcessor + queueProcessors map[tasks.Category]queues.Processor + replicatorProcessor *replicatorQueueProcessorImpl nDCReplicator nDCHistoryReplicator nDCActivityReplicator nDCActivityReplicator - replicatorProcessor *replicatorQueueProcessorImpl eventNotifier events.Notifier tokenSerializer common.TaskTokenSerializer historyCache workflow.Cache @@ -184,12 +183,17 @@ func NewEngineWithShardContext( workflowDeleteManager: workflowDeleteManager, } - historyEngImpl.txProcessor = newTransferQueueProcessor(shard, historyEngImpl, + txProcessor := newTransferQueueProcessor(shard, historyEngImpl, matchingClient, historyClient, logger, clientBean, registry) - historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, + timerProcessor := newTimerQueueProcessor(shard, historyEngImpl, matchingClient, logger, clientBean) - historyEngImpl.visibilityProcessor = newVisibilityQueueProcessor(shard, historyEngImpl, visibilityMgr, + visibilityProcessor := newVisibilityQueueProcessor(shard, historyEngImpl, visibilityMgr, matchingClient, historyClient, logger) + historyEngImpl.queueProcessors = map[tasks.Category]queues.Processor{ + txProcessor.Category(): txProcessor, + timerProcessor.Category(): timerProcessor, + visibilityProcessor.Category(): visibilityProcessor, + } historyEngImpl.eventsReapplier = newNDCEventsReapplier(shard.GetMetricsClient(), logger) if shard.GetClusterMetadata().IsGlobalNamespaceEnabled() { @@ -246,10 +250,8 @@ func (e *historyEngineImpl) Start() { e.logger.Info("", tag.LifeCycleStarting) defer e.logger.Info("", tag.LifeCycleStarted) - e.txProcessor.Start() - e.timerProcessor.Start() - if e.visibilityProcessor != nil { - e.visibilityProcessor.Start() + for _, queueProcessor := range e.queueProcessors { + queueProcessor.Start() } // failover callback will try to create a failover queue processor to scan all inflight tasks @@ -276,11 +278,10 @@ func (e *historyEngineImpl) Stop() { e.logger.Info("", tag.LifeCycleStopping) defer e.logger.Info("", tag.LifeCycleStopped) - e.txProcessor.Stop() - e.timerProcessor.Stop() - if e.visibilityProcessor != nil { - e.visibilityProcessor.Stop() + for _, queueProcessor := range e.queueProcessors { + queueProcessor.Stop() } + callbackID := getMetadataChangeCallbackID(common.HistoryServiceName, e.shard.GetShardID()) e.clusterMetadata.UnRegisterMetadataChangeCallback(callbackID) e.replicationTaskProcessorsLock.Lock() @@ -330,13 +331,15 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() { e.shard.GetShardID(), 0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */ func() { - e.txProcessor.LockTaskProcessing() - e.timerProcessor.LockTaskProcessing() + for _, queueProcessor := range e.queueProcessors { + queueProcessor.LockTaskProcessing() + } }, func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) { defer func() { - e.txProcessor.UnlockTaskProcessing() - e.timerProcessor.UnlockTaskProcessing() + for _, queueProcessor := range e.queueProcessors { + queueProcessor.UnlockTaskProcessing() + } }() if len(nextNamespaces) == 0 { @@ -365,16 +368,18 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() { if len(failoverNamespaceIDs) > 0 { e.logger.Info("Namespace Failover Start.", tag.WorkflowNamespaceIDs(failoverNamespaceIDs)) - e.txProcessor.FailoverNamespace(failoverNamespaceIDs) - e.timerProcessor.FailoverNamespace(failoverNamespaceIDs) + for _, queueProcessor := range e.queueProcessors { + queueProcessor.FailoverNamespace(failoverNamespaceIDs) + } - now := e.shard.GetTimeSource().Now() // the fake tasks will not be actually used, we just need to make sure // its length > 0 and has correct timestamp, to trigger a db scan - fakeWorkflowTask := []tasks.Task{&tasks.WorkflowTask{}} - fakeWorkflowTaskTimeoutTask := []tasks.Task{&tasks.WorkflowTaskTimeoutTask{VisibilityTimestamp: now}} - e.txProcessor.NotifyNewTask(e.currentClusterName, fakeWorkflowTask) - e.timerProcessor.NotifyNewTimers(e.currentClusterName, fakeWorkflowTaskTimeoutTask) + now := e.shard.GetTimeSource().Now() + fakeTasks := make(map[tasks.Category][]tasks.Task) + for category := range e.queueProcessors { + fakeTasks[category] = []tasks.Task{tasks.NewFakeTask(category, now)} + } + e.NotifyNewTasks(e.currentClusterName, fakeTasks) } // nolint:errcheck @@ -2380,8 +2385,9 @@ func (e *historyEngineImpl) SyncShardStatus( // 2. notify the timer gate in the timer queue standby processor // 3, notify the transfer (essentially a no op, just put it here so it looks symmetric) e.shard.SetCurrentTime(clusterName, now) - e.txProcessor.NotifyNewTask(clusterName, []tasks.Task{}) - e.timerProcessor.NotifyNewTimers(clusterName, []tasks.Task{}) + for _, processor := range e.queueProcessors { + processor.NotifyNewTasks(clusterName, []tasks.Task{}) + } return nil } @@ -2666,40 +2672,23 @@ func (e *historyEngineImpl) NotifyNewHistoryEvent( e.eventNotifier.NotifyNewHistoryEvent(notification) } -func (e *historyEngineImpl) NotifyNewTransferTasks( - clusterName string, - tasks []tasks.Task, -) { - if len(tasks) > 0 { - e.txProcessor.NotifyNewTask(clusterName, tasks) - } -} - -func (e *historyEngineImpl) NotifyNewTimerTasks( +func (e *historyEngineImpl) NotifyNewTasks( clusterName string, - tasks []tasks.Task, -) { - - if len(tasks) > 0 { - e.timerProcessor.NotifyNewTimers(clusterName, tasks) - } -} - -func (e *historyEngineImpl) NotifyNewReplicationTasks( - tasks []tasks.Task, -) { - - if len(tasks) > 0 && e.replicatorProcessor != nil { - e.replicatorProcessor.NotifyNewTasks(tasks) - } -} - -func (e *historyEngineImpl) NotifyNewVisibilityTasks( - tasks []tasks.Task, + newTasks map[tasks.Category][]tasks.Task, ) { + for category, tasksByCategory := range newTasks { + // TODO: make replicatorProcessor part of queueProcessors list + // and get rid of the special case here. + if category == tasks.CategoryReplication { + if e.replicatorProcessor != nil { + e.replicatorProcessor.NotifyNewTasks(tasksByCategory) + } + continue + } - if len(tasks) > 0 && e.visibilityProcessor != nil { - e.visibilityProcessor.NotifyNewTask(tasks) + if len(tasksByCategory) > 0 { + e.queueProcessors[category].NotifyNewTasks(clusterName, tasksByCategory) + } } } diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index a1769e4f3a0..a0591002867 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -55,7 +55,9 @@ import ( "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" @@ -76,13 +78,14 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor - mockEventsCache *events.MockCache - mockNamespaceCache *namespace.MockRegistry - mockClusterMetadata *cluster.MockMetadata + controller *gomock.Controller + mockShard *shard.ContextTest + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor + mockVisibilityProcessor *queues.MockProcessor + mockEventsCache *events.MockCache + mockNamespaceCache *namespace.MockRegistry + mockClusterMetadata *cluster.MockMetadata historyEngine *historyEngineImpl mockExecutionMgr *persistence.MockExecutionManager @@ -109,10 +112,15 @@ func (s *engine2Suite) SetupTest() { s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() s.config = tests.NewDynamicConfig() mockShard := shard.NewTestContext( @@ -155,8 +163,11 @@ func (s *engine2Suite) SetupTest() { config: s.config, timeSource: s.mockShard.GetTimeSource(), eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor, + }, searchAttributesValidator: searchattribute.NewValidator( searchattribute.NewTestProvider(), s.mockShard.Resource.SearchAttributesMapper, diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index 8db12786ec4..206eda16eaa 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -54,7 +54,9 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" ) @@ -64,13 +66,14 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor - mockEventsCache *events.MockCache - mockNamespaceCache *namespace.MockRegistry - mockClusterMetadata *cluster.MockMetadata + controller *gomock.Controller + mockShard *shard.ContextTest + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor + mockVisibilityProcessor *queues.MockProcessor + mockEventsCache *events.MockCache + mockNamespaceCache *namespace.MockRegistry + mockClusterMetadata *cluster.MockMetadata historyEngine *historyEngineImpl mockExecutionMgr *persistence.MockExecutionManager @@ -96,10 +99,16 @@ func (s *engine3Suite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() s.mockShard = shard.NewTestContext( s.controller, @@ -137,8 +146,11 @@ func (s *engine3Suite) SetupTest() { config: s.config, timeSource: s.mockShard.GetTimeSource(), eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor, + }, } s.mockShard.SetEngineForTesting(h) h.workflowTaskHandler = newWorkflowTaskHandlerCallback(h) diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index fee3c6b86c1..397b38f863e 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -71,7 +71,9 @@ import ( "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" ) @@ -81,16 +83,17 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor - mockNamespaceCache *namespace.MockRegistry - mockMatchingClient *matchingservicemock.MockMatchingServiceClient - mockHistoryClient *historyservicemock.MockHistoryServiceClient - mockClusterMetadata *cluster.MockMetadata - mockEventsReapplier *MocknDCEventsReapplier - mockWorkflowResetter *MockworkflowResetter + controller *gomock.Controller + mockShard *shard.ContextTest + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor + mockVisibilityProcessor *queues.MockProcessor + mockNamespaceCache *namespace.MockRegistry + mockMatchingClient *matchingservicemock.MockMatchingServiceClient + mockHistoryClient *historyservicemock.MockHistoryServiceClient + mockClusterMetadata *cluster.MockMetadata + mockEventsReapplier *MocknDCEventsReapplier + mockWorkflowResetter *MockworkflowResetter mockHistoryEngine *historyEngineImpl mockExecutionMgr *persistence.MockExecutionManager @@ -117,12 +120,17 @@ func (s *engineSuite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) s.mockEventsReapplier = NewMocknDCEventsReapplier(s.controller) s.mockWorkflowResetter = NewMockworkflowResetter(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() s.config = tests.NewDynamicConfig() s.mockShard = shard.NewTestContext( @@ -181,10 +189,13 @@ func (s *engineSuite) SetupTest() { tokenSerializer: common.NewProtoTaskTokenSerializer(), eventNotifier: eventNotifier, config: s.config, - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, - eventsReapplier: s.mockEventsReapplier, - workflowResetter: s.mockWorkflowResetter, + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor, + }, + eventsReapplier: s.mockEventsReapplier, + workflowResetter: s.mockWorkflowResetter, } s.mockShard.SetEngineForTesting(h) h.workflowTaskHandler = newWorkflowTaskHandlerCallback(h) diff --git a/service/history/nDCActivityReplicator_test.go b/service/history/nDCActivityReplicator_test.go index 7cf2fae1b3e..833134588d0 100644 --- a/service/history/nDCActivityReplicator_test.go +++ b/service/history/nDCActivityReplicator_test.go @@ -51,7 +51,9 @@ import ( "go.temporal.io/server/common/primitives/timestamp" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" ) @@ -63,8 +65,8 @@ type ( controller *gomock.Controller mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor mockNamespaceCache *namespace.MockRegistry mockClusterMetadata *cluster.MockMetadata mockMutableState *workflow.MockMutableState @@ -96,10 +98,12 @@ func (s *activityReplicatorSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockMutableState = workflow.NewMockMutableState(s.controller) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() s.mockShard = shard.NewTestContext( s.controller, @@ -137,8 +141,10 @@ func (s *activityReplicatorSuite) SetupTest() { metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }, ), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + }, } s.mockShard.SetEngineForTesting(engine) diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index d746a5c3065..af4aa106033 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -54,6 +54,8 @@ import ( ) type ( + // TODO: define a new interface for replication queue processor in queues/queue.go + replicatorQueueProcessorImpl struct { currentClusterName string shard shard.Context diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 17bf281e094..cdb0596aa85 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -700,10 +700,7 @@ func (s *ContextImpl) addTasksLocked( if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil { return err } - s.engine.NotifyNewTransferTasks(namespaceEntry.ActiveClusterName(), request.Tasks[tasks.CategoryTransfer]) - s.engine.NotifyNewTimerTasks(namespaceEntry.ActiveClusterName(), request.Tasks[tasks.CategoryTimer]) - s.engine.NotifyNewVisibilityTasks(request.Tasks[tasks.CategoryVisibility]) - s.engine.NotifyNewReplicationTasks(request.Tasks[tasks.CategoryReplication]) + s.engine.NotifyNewTasks(namespaceEntry.ActiveClusterName(), request.Tasks) return nil } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index b6f85d24c12..bd745116607 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -116,10 +116,12 @@ func (s *contextSuite) TestAddTasks_Success() { VisibilityTime: timestamp.TimeNowPtrUtc(), } - transferTasks := []tasks.Task{&tasks.ActivityTask{}} // Just for testing purpose. In the real code ActivityTask can't be passed to shardContext.AddTasks. - timerTasks := []tasks.Task{&tasks.ActivityRetryTimerTask{}} // Just for testing purpose. In the real code ActivityRetryTimerTask can't be passed to shardContext.AddTasks. - replicationTasks := []tasks.Task{&tasks.HistoryReplicationTask{}} // Just for testing purpose. In the real code HistoryReplicationTask can't be passed to shardContext.AddTasks. - visibilityTasks := []tasks.Task{&tasks.DeleteExecutionVisibilityTask{}} + tasks := map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {&tasks.ActivityTask{}}, // Just for testing purpose. In the real code ActivityTask can't be passed to shardContext.AddTasks. + tasks.CategoryTimer: {&tasks.ActivityRetryTimerTask{}}, // Just for testing purpose. In the real code ActivityRetryTimerTask can't be passed to shardContext.AddTasks. + tasks.CategoryReplication: {&tasks.HistoryReplicationTask{}}, // Just for testing purpose. In the real code HistoryReplicationTask can't be passed to shardContext.AddTasks. + tasks.CategoryVisibility: {&tasks.DeleteExecutionVisibilityTask{}}, + } addTasksRequest := &persistence.AddTasksRequest{ ShardID: s.shardContext.GetShardID(), @@ -127,21 +129,13 @@ func (s *contextSuite) TestAddTasks_Success() { WorkflowID: task.GetWorkflowId(), RunID: task.GetRunId(), - Tasks: map[tasks.Category][]tasks.Task{ - tasks.CategoryTransfer: transferTasks, - tasks.CategoryTimer: timerTasks, - tasks.CategoryReplication: replicationTasks, - tasks.CategoryVisibility: visibilityTasks, - }, + Tasks: tasks, } s.mockNamespaceCache.EXPECT().GetNamespaceByID(s.namespaceID).Return(s.namespaceEntry, nil) s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName) s.mockExecutionManager.EXPECT().AddTasks(addTasksRequest).Return(nil) - s.mockHistoryEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), transferTasks) - s.mockHistoryEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), timerTasks) - s.mockHistoryEngine.EXPECT().NotifyNewVisibilityTasks(visibilityTasks) - s.mockHistoryEngine.EXPECT().NotifyNewReplicationTasks(replicationTasks) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), tasks) err := s.shardContext.AddTasks(addTasksRequest) s.NoError(err) diff --git a/service/history/shard/engine.go b/service/history/shard/engine.go index 063be380e45..4a7e4f164bc 100644 --- a/service/history/shard/engine.go +++ b/service/history/shard/engine.go @@ -84,9 +84,6 @@ type ( GetReplicationStatus(ctx context.Context, request *historyservice.GetReplicationStatusRequest) (*historyservice.ShardReplicationStatus, error) NotifyNewHistoryEvent(event *events.Notification) - NotifyNewTransferTasks(clusterName string, tasks []tasks.Task) - NotifyNewTimerTasks(clusterName string, tasks []tasks.Task) - NotifyNewVisibilityTasks(tasks []tasks.Task) - NotifyNewReplicationTasks(tasks []tasks.Task) + NotifyNewTasks(clusterName string, tasks map[tasks.Category][]tasks.Task) } ) diff --git a/service/history/shard/engine_mock.go b/service/history/shard/engine_mock.go index 7bee11523d8..50ccd68462f 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/shard/engine_mock.go @@ -227,52 +227,16 @@ func (mr *MockEngineMockRecorder) NotifyNewHistoryEvent(event interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewHistoryEvent", reflect.TypeOf((*MockEngine)(nil).NotifyNewHistoryEvent), event) } -// NotifyNewReplicationTasks mocks base method. -func (m *MockEngine) NotifyNewReplicationTasks(tasks []tasks.Task) { +// NotifyNewTasks mocks base method. +func (m *MockEngine) NotifyNewTasks(clusterName string, tasks map[tasks.Category][]tasks.Task) { m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewReplicationTasks", tasks) + m.ctrl.Call(m, "NotifyNewTasks", clusterName, tasks) } -// NotifyNewReplicationTasks indicates an expected call of NotifyNewReplicationTasks. -func (mr *MockEngineMockRecorder) NotifyNewReplicationTasks(tasks interface{}) *gomock.Call { +// NotifyNewTasks indicates an expected call of NotifyNewTasks. +func (mr *MockEngineMockRecorder) NotifyNewTasks(clusterName, tasks interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewReplicationTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewReplicationTasks), tasks) -} - -// NotifyNewTimerTasks mocks base method. -func (m *MockEngine) NotifyNewTimerTasks(clusterName string, tasks []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTimerTasks", clusterName, tasks) -} - -// NotifyNewTimerTasks indicates an expected call of NotifyNewTimerTasks. -func (mr *MockEngineMockRecorder) NotifyNewTimerTasks(clusterName, tasks interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTimerTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTimerTasks), clusterName, tasks) -} - -// NotifyNewTransferTasks mocks base method. -func (m *MockEngine) NotifyNewTransferTasks(clusterName string, tasks []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTransferTasks", clusterName, tasks) -} - -// NotifyNewTransferTasks indicates an expected call of NotifyNewTransferTasks. -func (mr *MockEngineMockRecorder) NotifyNewTransferTasks(clusterName, tasks interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTransferTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTransferTasks), clusterName, tasks) -} - -// NotifyNewVisibilityTasks mocks base method. -func (m *MockEngine) NotifyNewVisibilityTasks(tasks []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewVisibilityTasks", tasks) -} - -// NotifyNewVisibilityTasks indicates an expected call of NotifyNewVisibilityTasks. -func (mr *MockEngineMockRecorder) NotifyNewVisibilityTasks(tasks interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewVisibilityTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewVisibilityTasks), tasks) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTasks), clusterName, tasks) } // PollMutableState mocks base method. diff --git a/service/history/tasks/fake_task.go b/service/history/tasks/fake_task.go new file mode 100644 index 00000000000..37a6cabd0eb --- /dev/null +++ b/service/history/tasks/fake_task.go @@ -0,0 +1,89 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tasks + +import ( + "time" + + "go.temporal.io/server/common" + "go.temporal.io/server/common/definition" +) + +type ( + fakeTask struct { + definition.WorkflowKey + VisibilityTimestamp time.Time + TaskID int64 + Version int64 + Category Category + } +) + +func NewFakeTask( + category Category, + visibilityTimestamp time.Time, +) Task { + return &fakeTask{ + TaskID: common.EmptyEventTaskID, + Version: common.EmptyVersion, + VisibilityTimestamp: visibilityTimestamp, + Category: category, + } +} + +func (f *fakeTask) GetKey() Key { + return Key{ + FireTime: f.VisibilityTimestamp, + TaskID: f.TaskID, + } +} + +func (f *fakeTask) GetVersion() int64 { + return f.Version +} + +func (f *fakeTask) SetVersion(version int64) { + f.Version = version +} + +func (f *fakeTask) GetTaskID() int64 { + return f.TaskID +} + +func (f *fakeTask) SetTaskID(id int64) { + f.TaskID = id +} + +func (f *fakeTask) GetVisibilityTime() time.Time { + return f.VisibilityTimestamp +} + +func (f *fakeTask) SetVisibilityTime(t time.Time) { + f.VisibilityTimestamp = t +} + +func (f *fakeTask) GetCategory() Category { + return f.Category +} diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index 9f63f4a2eea..6646813a1ad 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -65,13 +66,14 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor - mockNamespaceCache *namespace.MockRegistry - mockMatchingClient *matchingservicemock.MockMatchingServiceClient - mockClusterMetadata *cluster.MockMetadata + controller *gomock.Controller + mockShard *shard.ContextTest + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor + mockVisibilityProcessor *queues.MockProcessor + mockNamespaceCache *namespace.MockRegistry + mockMatchingClient *matchingservicemock.MockMatchingServiceClient + mockClusterMetadata *cluster.MockMetadata mockHistoryEngine *historyEngineImpl mockDeleteManager *workflow.MockDeleteManager @@ -106,10 +108,15 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() { s.timeSource = clock.NewEventTimeSource().Update(s.now) s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() config := tests.NewDynamicConfig() s.mockShard = shard.NewTestContextWithTimeSource( @@ -150,17 +157,20 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() { historyCache := workflow.NewCache(s.mockShard) s.mockDeleteManager = workflow.NewMockDeleteManager(s.controller) h := &historyEngineImpl{ - currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), - shard: s.mockShard, - clusterMetadata: s.mockClusterMetadata, - executionManager: s.mockExecutionMgr, - historyCache: historyCache, - logger: s.logger, - tokenSerializer: common.NewProtoTaskTokenSerializer(), - metricsClient: s.mockShard.GetMetricsClient(), - eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, + currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), + shard: s.mockShard, + clusterMetadata: s.mockClusterMetadata, + executionManager: s.mockExecutionMgr, + historyCache: historyCache, + logger: s.logger, + tokenSerializer: common.NewProtoTaskTokenSerializer(), + metricsClient: s.mockShard.GetMetricsClient(), + eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor, + }, workflowDeleteManager: s.mockDeleteManager, } s.mockShard.SetEngineForTesting(h) diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 3afd82ad76d..b01c47e257d 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" ) @@ -56,14 +57,6 @@ var ( ) type ( - timerQueueProcessor interface { - common.Daemon - FailoverNamespace(namespaceIDs map[string]struct{}) - NotifyNewTimers(clusterName string, timerTask []tasks.Task) - LockTaskProcessing() - UnlockTaskProcessing() - } - timeNow func() time.Time updateTimerAckLevel func(timerKey) error timerQueueShutdown func() error @@ -94,7 +87,7 @@ func newTimerQueueProcessor( matchingClient matchingservice.MatchingServiceClient, logger log.Logger, clientBean client.Bean, -) timerQueueProcessor { +) queues.Processor { currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() config := shard.GetConfig() @@ -157,9 +150,9 @@ func (t *timerQueueProcessorImpl) Stop() { common.AwaitWaitGroup(&t.shutdownWG, time.Minute) } -// NotifyNewTimers - Notify the processor about the new active / standby timer arrival. +// NotifyNewTasks - Notify the processor about the new active / standby timer arrival. // This should be called each time new timer arrives, otherwise timers maybe fired unexpected. -func (t *timerQueueProcessorImpl) NotifyNewTimers( +func (t *timerQueueProcessorImpl) NotifyNewTasks( clusterName string, timerTasks []tasks.Task, ) { @@ -245,6 +238,10 @@ func (t *timerQueueProcessorImpl) UnlockTaskProcessing() { t.taskAllocator.unlock() } +func (t *timerQueueProcessorImpl) Category() tasks.Category { + return tasks.CategoryTimer +} + func (t *timerQueueProcessorImpl) completeTimersLoop() { defer t.shutdownWG.Done() diff --git a/service/history/timerQueueProcessor_mock.go b/service/history/timerQueueProcessor_mock.go index 6f0f1eaa851..df0ff846158 100644 --- a/service/history/timerQueueProcessor_mock.go +++ b/service/history/timerQueueProcessor_mock.go @@ -27,105 +27,3 @@ // Package history is a generated GoMock package. package history - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - tasks "go.temporal.io/server/service/history/tasks" -) - -// MocktimerQueueProcessor is a mock of timerQueueProcessor interface. -type MocktimerQueueProcessor struct { - ctrl *gomock.Controller - recorder *MocktimerQueueProcessorMockRecorder -} - -// MocktimerQueueProcessorMockRecorder is the mock recorder for MocktimerQueueProcessor. -type MocktimerQueueProcessorMockRecorder struct { - mock *MocktimerQueueProcessor -} - -// NewMocktimerQueueProcessor creates a new mock instance. -func NewMocktimerQueueProcessor(ctrl *gomock.Controller) *MocktimerQueueProcessor { - mock := &MocktimerQueueProcessor{ctrl: ctrl} - mock.recorder = &MocktimerQueueProcessorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocktimerQueueProcessor) EXPECT() *MocktimerQueueProcessorMockRecorder { - return m.recorder -} - -// FailoverNamespace mocks base method. -func (m *MocktimerQueueProcessor) FailoverNamespace(namespaceIDs map[string]struct{}) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "FailoverNamespace", namespaceIDs) -} - -// FailoverNamespace indicates an expected call of FailoverNamespace. -func (mr *MocktimerQueueProcessorMockRecorder) FailoverNamespace(namespaceIDs interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FailoverNamespace", reflect.TypeOf((*MocktimerQueueProcessor)(nil).FailoverNamespace), namespaceIDs) -} - -// LockTaskProcessing mocks base method. -func (m *MocktimerQueueProcessor) LockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "LockTaskProcessing") -} - -// LockTaskProcessing indicates an expected call of LockTaskProcessing. -func (mr *MocktimerQueueProcessorMockRecorder) LockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockTaskProcessing", reflect.TypeOf((*MocktimerQueueProcessor)(nil).LockTaskProcessing)) -} - -// NotifyNewTimers mocks base method. -func (m *MocktimerQueueProcessor) NotifyNewTimers(clusterName string, timerTask []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTimers", clusterName, timerTask) -} - -// NotifyNewTimers indicates an expected call of NotifyNewTimers. -func (mr *MocktimerQueueProcessorMockRecorder) NotifyNewTimers(clusterName, timerTask interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTimers", reflect.TypeOf((*MocktimerQueueProcessor)(nil).NotifyNewTimers), clusterName, timerTask) -} - -// Start mocks base method. -func (m *MocktimerQueueProcessor) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MocktimerQueueProcessorMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MocktimerQueueProcessor)(nil).Start)) -} - -// Stop mocks base method. -func (m *MocktimerQueueProcessor) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MocktimerQueueProcessorMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MocktimerQueueProcessor)(nil).Stop)) -} - -// UnlockTaskProcessing mocks base method. -func (m *MocktimerQueueProcessor) UnlockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UnlockTaskProcessing") -} - -// UnlockTaskProcessing indicates an expected call of UnlockTaskProcessing. -func (mr *MocktimerQueueProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnlockTaskProcessing", reflect.TypeOf((*MocktimerQueueProcessor)(nil).UnlockTaskProcessing)) -} diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index dd810329bf7..1e3e081334b 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -57,6 +57,7 @@ import ( "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -68,17 +69,16 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockExecutionMgr *persistence.MockExecutionManager - mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockReplicationProcessor *MockReplicatorQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor - mockNamespaceCache *namespace.MockRegistry - mockClusterMetadata *cluster.MockMetadata - mockAdminClient *adminservicemock.MockAdminServiceClient - mockNDCHistoryResender *xdc.MockNDCHistoryResender - mockDeleteManager *workflow.MockDeleteManager + controller *gomock.Controller + mockExecutionMgr *persistence.MockExecutionManager + mockShard *shard.ContextTest + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor + mockNamespaceCache *namespace.MockRegistry + mockClusterMetadata *cluster.MockMetadata + mockAdminClient *adminservicemock.MockAdminServiceClient + mockNDCHistoryResender *xdc.MockNDCHistoryResender + mockDeleteManager *workflow.MockDeleteManager logger log.Logger namespaceID namespace.ID @@ -119,13 +119,13 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2 s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() s.mockShard = shard.NewTestContextWithTimeSource( s.controller, @@ -165,17 +165,19 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { historyCache := workflow.NewCache(s.mockShard) s.mockDeleteManager = workflow.NewMockDeleteManager(s.controller) h := &historyEngineImpl{ - currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), - shard: s.mockShard, - clusterMetadata: s.mockClusterMetadata, - executionManager: s.mockExecutionMgr, - historyCache: historyCache, - logger: s.logger, - tokenSerializer: common.NewProtoTaskTokenSerializer(), - metricsClient: s.mockShard.GetMetricsClient(), - eventNotifier: events.NewNotifier(s.timeSource, metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, + currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), + shard: s.mockShard, + clusterMetadata: s.mockClusterMetadata, + executionManager: s.mockExecutionMgr, + historyCache: historyCache, + logger: s.logger, + tokenSerializer: common.NewProtoTaskTokenSerializer(), + metricsClient: s.mockShard.GetMetricsClient(), + eventNotifier: events.NewNotifier(s.timeSource, metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + }, workflowDeleteManager: s.mockDeleteManager, } s.mockShard.SetEngineForTesting(h) diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 57842a0fbcf..e8ebe7bb6db 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/common/definition" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/common/searchattribute" @@ -87,9 +88,8 @@ type ( controller *gomock.Controller mockShard *shard.ContextTest - mockTxProcessor *MocktransferQueueProcessor - mockReplicationProcessor *MockReplicatorQueueProcessor - mockTimerProcessor *MocktimerQueueProcessor + mockTxProcessor *queues.MockProcessor + mockTimerProcessor *queues.MockProcessor mockNamespaceCache *namespace.MockRegistry mockMatchingClient *matchingservicemock.MockMatchingServiceClient mockHistoryClient *historyservicemock.MockHistoryServiceClient @@ -149,12 +149,12 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { s.timeSource = clock.NewEventTimeSource().Update(s.now) s.controller = gomock.NewController(s.T()) - s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller) - s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller) - s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller) - s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes() - s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes() - s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTxProcessor = queues.NewMockProcessor(s.controller) + s.mockTimerProcessor = queues.NewMockProcessor(s.controller) + s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() + s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() + s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() + s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes() config := tests.NewDynamicConfig() s.mockShard = shard.NewTestContextWithTimeSource( @@ -215,9 +215,11 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { tokenSerializer: common.NewProtoTaskTokenSerializer(), metricsClient: s.mockShard.GetMetricsClient(), eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }), - txProcessor: s.mockTxProcessor, - timerProcessor: s.mockTimerProcessor, - archivalClient: s.mockArchivalClient, + queueProcessors: map[tasks.Category]queues.Processor{ + s.mockTxProcessor.Category(): s.mockTxProcessor, + s.mockTimerProcessor.Category(): s.mockTimerProcessor, + }, + archivalClient: s.mockArchivalClient, } s.mockShard.SetEngineForTesting(h) diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index c4a7f61fea4..11a6206143c 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -39,7 +39,6 @@ import ( "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -48,6 +47,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" ) @@ -58,14 +58,6 @@ var ( ) type ( - transferQueueProcessor interface { - common.Daemon - FailoverNamespace(namespaceIDs map[string]struct{}) - NotifyNewTask(clusterName string, transferTasks []tasks.Task) - LockTaskProcessing() - UnlockTaskProcessing() - } - taskFilter func(task tasks.Task) (bool, error) transferQueueProcessorImpl struct { @@ -99,7 +91,7 @@ func newTransferQueueProcessor( logger log.Logger, clientBean client.Bean, registry namespace.Registry, -) *transferQueueProcessorImpl { +) queues.Processor { logger = log.With(logger, tag.ComponentTransferQueue) currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() @@ -163,9 +155,9 @@ func (t *transferQueueProcessorImpl) Stop() { close(t.shutdownChan) } -// NotifyNewTask - Notify the processor about the new active / standby transfer task arrival. +// NotifyNewTasks - Notify the processor about the new active / standby transfer task arrival. // This should be called each time new transfer task arrives, otherwise tasks maybe delayed. -func (t *transferQueueProcessorImpl) NotifyNewTask( +func (t *transferQueueProcessorImpl) NotifyNewTasks( clusterName string, transferTasks []tasks.Task, ) { @@ -248,6 +240,10 @@ func (t *transferQueueProcessorImpl) UnlockTaskProcessing() { t.taskAllocator.unlock() } +func (t *transferQueueProcessorImpl) Category() tasks.Category { + return tasks.CategoryTransfer +} + func (t *transferQueueProcessorImpl) completeTransferLoop() { timer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval()) defer timer.Stop() diff --git a/service/history/transferQueueProcessor_mock.go b/service/history/transferQueueProcessor_mock.go index 9a78af2f769..7967999dcb7 100644 --- a/service/history/transferQueueProcessor_mock.go +++ b/service/history/transferQueueProcessor_mock.go @@ -27,105 +27,3 @@ // Package history is a generated GoMock package. package history - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - tasks "go.temporal.io/server/service/history/tasks" -) - -// MocktransferQueueProcessor is a mock of transferQueueProcessor interface. -type MocktransferQueueProcessor struct { - ctrl *gomock.Controller - recorder *MocktransferQueueProcessorMockRecorder -} - -// MocktransferQueueProcessorMockRecorder is the mock recorder for MocktransferQueueProcessor. -type MocktransferQueueProcessorMockRecorder struct { - mock *MocktransferQueueProcessor -} - -// NewMocktransferQueueProcessor creates a new mock instance. -func NewMocktransferQueueProcessor(ctrl *gomock.Controller) *MocktransferQueueProcessor { - mock := &MocktransferQueueProcessor{ctrl: ctrl} - mock.recorder = &MocktransferQueueProcessorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocktransferQueueProcessor) EXPECT() *MocktransferQueueProcessorMockRecorder { - return m.recorder -} - -// FailoverNamespace mocks base method. -func (m *MocktransferQueueProcessor) FailoverNamespace(namespaceIDs map[string]struct{}) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "FailoverNamespace", namespaceIDs) -} - -// FailoverNamespace indicates an expected call of FailoverNamespace. -func (mr *MocktransferQueueProcessorMockRecorder) FailoverNamespace(namespaceIDs interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FailoverNamespace", reflect.TypeOf((*MocktransferQueueProcessor)(nil).FailoverNamespace), namespaceIDs) -} - -// LockTaskProcessing mocks base method. -func (m *MocktransferQueueProcessor) LockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "LockTaskProcessing") -} - -// LockTaskProcessing indicates an expected call of LockTaskProcessing. -func (mr *MocktransferQueueProcessorMockRecorder) LockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockTaskProcessing", reflect.TypeOf((*MocktransferQueueProcessor)(nil).LockTaskProcessing)) -} - -// NotifyNewTask mocks base method. -func (m *MocktransferQueueProcessor) NotifyNewTask(clusterName string, transferTasks []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTask", clusterName, transferTasks) -} - -// NotifyNewTask indicates an expected call of NotifyNewTask. -func (mr *MocktransferQueueProcessorMockRecorder) NotifyNewTask(clusterName, transferTasks interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTask", reflect.TypeOf((*MocktransferQueueProcessor)(nil).NotifyNewTask), clusterName, transferTasks) -} - -// Start mocks base method. -func (m *MocktransferQueueProcessor) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MocktransferQueueProcessorMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MocktransferQueueProcessor)(nil).Start)) -} - -// Stop mocks base method. -func (m *MocktransferQueueProcessor) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MocktransferQueueProcessorMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MocktransferQueueProcessor)(nil).Stop)) -} - -// UnlockTaskProcessing mocks base method. -func (m *MocktransferQueueProcessor) UnlockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UnlockTaskProcessing") -} - -// UnlockTaskProcessing indicates an expected call of UnlockTaskProcessing. -func (mr *MocktransferQueueProcessorMockRecorder) UnlockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnlockTaskProcessing", reflect.TypeOf((*MocktransferQueueProcessor)(nil).UnlockTaskProcessing)) -} diff --git a/service/history/visibilityQueueProcessor.go b/service/history/visibilityQueueProcessor.go index 276e0cdfcc1..9fde0379494 100644 --- a/service/history/visibilityQueueProcessor.go +++ b/service/history/visibilityQueueProcessor.go @@ -32,23 +32,18 @@ import ( "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" ) type ( - visibilityQueueProcessor interface { - common.Daemon - NotifyNewTask(visibilityTasks []tasks.Task) - } - updateVisibilityAckLevel func(ackLevel int64) error visibilityQueueShutdown func() error @@ -84,7 +79,7 @@ func newVisibilityQueueProcessor( matchingClient matchingservice.MatchingServiceClient, historyClient historyservice.HistoryServiceClient, logger log.Logger, -) *visibilityQueueProcessorImpl { +) queues.Processor { config := shard.GetConfig() logger = log.With(logger, tag.ComponentVisibilityQueue) @@ -187,9 +182,10 @@ func (t *visibilityQueueProcessorImpl) Stop() { close(t.shutdownChan) } -// NotifyNewTask - Notify the processor about the new visibility task arrival. +// NotifyNewTasks - Notify the processor about the new visibility task arrival. // This should be called each time new visibility task arrives, otherwise tasks maybe delayed. -func (t *visibilityQueueProcessorImpl) NotifyNewTask( +func (t *visibilityQueueProcessorImpl) NotifyNewTasks( + _ string, visibilityTasks []tasks.Task, ) { if len(visibilityTasks) != 0 { @@ -197,6 +193,24 @@ func (t *visibilityQueueProcessorImpl) NotifyNewTask( } } +func (t *visibilityQueueProcessorImpl) FailoverNamespace( + namespaceIDs map[string]struct{}, +) { + // no-op +} + +func (t *visibilityQueueProcessorImpl) LockTaskProcessing() { + // no-op +} + +func (t *visibilityQueueProcessorImpl) UnlockTaskProcessing() { + // no-op +} + +func (t *visibilityQueueProcessorImpl) Category() tasks.Category { + return tasks.CategoryVisibility +} + func (t *visibilityQueueProcessorImpl) completeTaskLoop() { timer := time.NewTimer(t.config.VisibilityProcessorCompleteTaskInterval()) defer timer.Stop() diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index ba6f56bec6d..e60be0c4389 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -40,7 +40,6 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" - "go.temporal.io/server/service/history/tasks" ) type ( @@ -520,14 +519,7 @@ func NotifyWorkflowSnapshotTasks( if workflowSnapshot == nil { return } - notifyTasks( - engine, - workflowSnapshot.Tasks[tasks.CategoryTransfer], - workflowSnapshot.Tasks[tasks.CategoryTimer], - workflowSnapshot.Tasks[tasks.CategoryReplication], - workflowSnapshot.Tasks[tasks.CategoryVisibility], - clusterName, - ) + engine.NotifyNewTasks(clusterName, workflowSnapshot.Tasks) } func NotifyWorkflowMutationTasks( @@ -538,28 +530,7 @@ func NotifyWorkflowMutationTasks( if workflowMutation == nil { return } - notifyTasks( - engine, - workflowMutation.Tasks[tasks.CategoryTransfer], - workflowMutation.Tasks[tasks.CategoryTimer], - workflowMutation.Tasks[tasks.CategoryReplication], - workflowMutation.Tasks[tasks.CategoryVisibility], - clusterName, - ) -} - -func notifyTasks( - engine shard.Engine, - transferTasks []tasks.Task, - timerTasks []tasks.Task, - replicationTasks []tasks.Task, - visibilityTasks []tasks.Task, - clusterName string, -) { - engine.NotifyNewTransferTasks(clusterName, transferTasks) - engine.NotifyNewTimerTasks(clusterName, timerTasks) - engine.NotifyNewVisibilityTasks(visibilityTasks) - engine.NotifyNewReplicationTasks(replicationTasks) + engine.NotifyNewTasks(clusterName, workflowMutation.Tasks) } func NotifyNewHistorySnapshotEvent( diff --git a/service/history/workflow/transaction_test.go b/service/history/workflow/transaction_test.go index 74d017c7c35..7eac056ae70 100644 --- a/service/history/workflow/transaction_test.go +++ b/service/history/workflow/transaction_test.go @@ -189,8 +189,5 @@ func (s *transactionSuite) TestConflictResolveWorkflowExecution_NotifyTaskWhenFa } func (s *transactionSuite) setupMockForTaskNotification() { - s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).Times(1) - s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).Times(1) - s.mockEngine.EXPECT().NotifyNewVisibilityTasks(gomock.Any()).Times(1) - s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).Times(1) + s.mockEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).Times(1) } diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 70c3fb0fc53..c630750d9bb 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -76,8 +76,6 @@ type ( historyEngine *historyEngineImpl namespaceRegistry namespace.Registry historyCache workflow.Cache - txProcessor transferQueueProcessor - timerProcessor timerQueueProcessor tokenSerializer common.TaskTokenSerializer metricsClient metrics.Client logger log.Logger @@ -96,8 +94,6 @@ func newWorkflowTaskHandlerCallback(historyEngine *historyEngineImpl) *workflowT historyEngine: historyEngine, namespaceRegistry: historyEngine.shard.GetNamespaceRegistry(), historyCache: historyEngine.historyCache, - txProcessor: historyEngine.txProcessor, - timerProcessor: historyEngine.timerProcessor, tokenSerializer: historyEngine.tokenSerializer, metricsClient: historyEngine.metricsClient, logger: historyEngine.logger,