Skip to content

Commit

Permalink
Queue refactor part 3: task notification (temporalio#2489)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 11, 2022
1 parent 855ef8e commit 131563b
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 512 deletions.
107 changes: 48 additions & 59 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/server/client"
"go.temporal.io/server/common/archiver/provider"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/tasks"

enumsspb "go.temporal.io/server/api/enums/v1"
Expand Down Expand Up @@ -95,12 +96,10 @@ type (
workflowTaskHandler workflowTaskHandlerCallbacks
clusterMetadata cluster.Metadata
executionManager persistence.ExecutionManager
txProcessor transferQueueProcessor
timerProcessor timerQueueProcessor
visibilityProcessor visibilityQueueProcessor
queueProcessors map[tasks.Category]queues.Processor
replicatorProcessor *replicatorQueueProcessorImpl
nDCReplicator nDCHistoryReplicator
nDCActivityReplicator nDCActivityReplicator
replicatorProcessor *replicatorQueueProcessorImpl
eventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
historyCache workflow.Cache
Expand Down Expand Up @@ -184,12 +183,17 @@ func NewEngineWithShardContext(
workflowDeleteManager: workflowDeleteManager,
}

historyEngImpl.txProcessor = newTransferQueueProcessor(shard, historyEngImpl,
txProcessor := newTransferQueueProcessor(shard, historyEngImpl,
matchingClient, historyClient, logger, clientBean, registry)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl,
timerProcessor := newTimerQueueProcessor(shard, historyEngImpl,
matchingClient, logger, clientBean)
historyEngImpl.visibilityProcessor = newVisibilityQueueProcessor(shard, historyEngImpl, visibilityMgr,
visibilityProcessor := newVisibilityQueueProcessor(shard, historyEngImpl, visibilityMgr,
matchingClient, historyClient, logger)
historyEngImpl.queueProcessors = map[tasks.Category]queues.Processor{
txProcessor.Category(): txProcessor,
timerProcessor.Category(): timerProcessor,
visibilityProcessor.Category(): visibilityProcessor,
}
historyEngImpl.eventsReapplier = newNDCEventsReapplier(shard.GetMetricsClient(), logger)

if shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
Expand Down Expand Up @@ -246,10 +250,8 @@ func (e *historyEngineImpl) Start() {
e.logger.Info("", tag.LifeCycleStarting)
defer e.logger.Info("", tag.LifeCycleStarted)

e.txProcessor.Start()
e.timerProcessor.Start()
if e.visibilityProcessor != nil {
e.visibilityProcessor.Start()
for _, queueProcessor := range e.queueProcessors {
queueProcessor.Start()
}

// failover callback will try to create a failover queue processor to scan all inflight tasks
Expand All @@ -276,11 +278,10 @@ func (e *historyEngineImpl) Stop() {
e.logger.Info("", tag.LifeCycleStopping)
defer e.logger.Info("", tag.LifeCycleStopped)

e.txProcessor.Stop()
e.timerProcessor.Stop()
if e.visibilityProcessor != nil {
e.visibilityProcessor.Stop()
for _, queueProcessor := range e.queueProcessors {
queueProcessor.Stop()
}

callbackID := getMetadataChangeCallbackID(common.HistoryServiceName, e.shard.GetShardID())
e.clusterMetadata.UnRegisterMetadataChangeCallback(callbackID)
e.replicationTaskProcessorsLock.Lock()
Expand Down Expand Up @@ -330,13 +331,15 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
e.shard.GetShardID(),
0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */
func() {
e.txProcessor.LockTaskProcessing()
e.timerProcessor.LockTaskProcessing()
for _, queueProcessor := range e.queueProcessors {
queueProcessor.LockTaskProcessing()
}
},
func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) {
defer func() {
e.txProcessor.UnlockTaskProcessing()
e.timerProcessor.UnlockTaskProcessing()
for _, queueProcessor := range e.queueProcessors {
queueProcessor.UnlockTaskProcessing()
}
}()

if len(nextNamespaces) == 0 {
Expand Down Expand Up @@ -365,16 +368,18 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
if len(failoverNamespaceIDs) > 0 {
e.logger.Info("Namespace Failover Start.", tag.WorkflowNamespaceIDs(failoverNamespaceIDs))

e.txProcessor.FailoverNamespace(failoverNamespaceIDs)
e.timerProcessor.FailoverNamespace(failoverNamespaceIDs)
for _, queueProcessor := range e.queueProcessors {
queueProcessor.FailoverNamespace(failoverNamespaceIDs)
}

now := e.shard.GetTimeSource().Now()
// the fake tasks will not be actually used, we just need to make sure
// its length > 0 and has correct timestamp, to trigger a db scan
fakeWorkflowTask := []tasks.Task{&tasks.WorkflowTask{}}
fakeWorkflowTaskTimeoutTask := []tasks.Task{&tasks.WorkflowTaskTimeoutTask{VisibilityTimestamp: now}}
e.txProcessor.NotifyNewTask(e.currentClusterName, fakeWorkflowTask)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, fakeWorkflowTaskTimeoutTask)
now := e.shard.GetTimeSource().Now()
fakeTasks := make(map[tasks.Category][]tasks.Task)
for category := range e.queueProcessors {
fakeTasks[category] = []tasks.Task{tasks.NewFakeTask(category, now)}
}
e.NotifyNewTasks(e.currentClusterName, fakeTasks)
}

// nolint:errcheck
Expand Down Expand Up @@ -2380,8 +2385,9 @@ func (e *historyEngineImpl) SyncShardStatus(
// 2. notify the timer gate in the timer queue standby processor
// 3, notify the transfer (essentially a no op, just put it here so it looks symmetric)
e.shard.SetCurrentTime(clusterName, now)
e.txProcessor.NotifyNewTask(clusterName, []tasks.Task{})
e.timerProcessor.NotifyNewTimers(clusterName, []tasks.Task{})
for _, processor := range e.queueProcessors {
processor.NotifyNewTasks(clusterName, []tasks.Task{})
}
return nil
}

Expand Down Expand Up @@ -2666,40 +2672,23 @@ func (e *historyEngineImpl) NotifyNewHistoryEvent(
e.eventNotifier.NotifyNewHistoryEvent(notification)
}

func (e *historyEngineImpl) NotifyNewTransferTasks(
clusterName string,
tasks []tasks.Task,
) {
if len(tasks) > 0 {
e.txProcessor.NotifyNewTask(clusterName, tasks)
}
}

func (e *historyEngineImpl) NotifyNewTimerTasks(
func (e *historyEngineImpl) NotifyNewTasks(
clusterName string,
tasks []tasks.Task,
) {

if len(tasks) > 0 {
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
}
}

func (e *historyEngineImpl) NotifyNewReplicationTasks(
tasks []tasks.Task,
) {

if len(tasks) > 0 && e.replicatorProcessor != nil {
e.replicatorProcessor.NotifyNewTasks(tasks)
}
}

func (e *historyEngineImpl) NotifyNewVisibilityTasks(
tasks []tasks.Task,
newTasks map[tasks.Category][]tasks.Task,
) {
for category, tasksByCategory := range newTasks {
// TODO: make replicatorProcessor part of queueProcessors list
// and get rid of the special case here.
if category == tasks.CategoryReplication {
if e.replicatorProcessor != nil {
e.replicatorProcessor.NotifyNewTasks(tasksByCategory)
}
continue
}

if len(tasks) > 0 && e.visibilityProcessor != nil {
e.visibilityProcessor.NotifyNewTask(tasks)
if len(tasksByCategory) > 0 {
e.queueProcessors[category].NotifyNewTasks(clusterName, tasksByCategory)
}
}
}

Expand Down
37 changes: 24 additions & 13 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"

Expand All @@ -76,13 +78,14 @@ type (
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *MocktransferQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockClusterMetadata *cluster.MockMetadata
controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *queues.MockProcessor
mockTimerProcessor *queues.MockProcessor
mockVisibilityProcessor *queues.MockProcessor
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockClusterMetadata *cluster.MockMetadata

historyEngine *historyEngineImpl
mockExecutionMgr *persistence.MockExecutionManager
Expand All @@ -109,10 +112,15 @@ func (s *engine2Suite) SetupTest() {

s.controller = gomock.NewController(s.T())

s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTxProcessor = queues.NewMockProcessor(s.controller)
s.mockTimerProcessor = queues.NewMockProcessor(s.controller)
s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller)
s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes()
s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes()
s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes()
s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()

s.config = tests.NewDynamicConfig()
mockShard := shard.NewTestContext(
Expand Down Expand Up @@ -155,8 +163,11 @@ func (s *engine2Suite) SetupTest() {
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
queueProcessors: map[tasks.Category]queues.Processor{
s.mockTxProcessor.Category(): s.mockTxProcessor,
s.mockTimerProcessor.Category(): s.mockTimerProcessor,
s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor,
},
searchAttributesValidator: searchattribute.NewValidator(
searchattribute.NewTestProvider(),
s.mockShard.Resource.SearchAttributesMapper,
Expand Down
38 changes: 25 additions & 13 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -64,13 +66,14 @@ type (
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *MocktransferQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockClusterMetadata *cluster.MockMetadata
controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *queues.MockProcessor
mockTimerProcessor *queues.MockProcessor
mockVisibilityProcessor *queues.MockProcessor
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockClusterMetadata *cluster.MockMetadata

historyEngine *historyEngineImpl
mockExecutionMgr *persistence.MockExecutionManager
Expand All @@ -96,10 +99,16 @@ func (s *engine3Suite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockTxProcessor = queues.NewMockProcessor(s.controller)
s.mockTimerProcessor = queues.NewMockProcessor(s.controller)
s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller)
s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes()
s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes()
s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes()
s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down Expand Up @@ -137,8 +146,11 @@ func (s *engine3Suite) SetupTest() {
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
queueProcessors: map[tasks.Category]queues.Processor{
s.mockTxProcessor.Category(): s.mockTxProcessor,
s.mockTimerProcessor.Category(): s.mockTimerProcessor,
s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor,
},
}
s.mockShard.SetEngineForTesting(h)
h.workflowTaskHandler = newWorkflowTaskHandlerCallback(h)
Expand Down
47 changes: 29 additions & 18 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -81,16 +83,17 @@ type (
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *MocktransferQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockNamespaceCache *namespace.MockRegistry
mockMatchingClient *matchingservicemock.MockMatchingServiceClient
mockHistoryClient *historyservicemock.MockHistoryServiceClient
mockClusterMetadata *cluster.MockMetadata
mockEventsReapplier *MocknDCEventsReapplier
mockWorkflowResetter *MockworkflowResetter
controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *queues.MockProcessor
mockTimerProcessor *queues.MockProcessor
mockVisibilityProcessor *queues.MockProcessor
mockNamespaceCache *namespace.MockRegistry
mockMatchingClient *matchingservicemock.MockMatchingServiceClient
mockHistoryClient *historyservicemock.MockHistoryServiceClient
mockClusterMetadata *cluster.MockMetadata
mockEventsReapplier *MocknDCEventsReapplier
mockWorkflowResetter *MockworkflowResetter

mockHistoryEngine *historyEngineImpl
mockExecutionMgr *persistence.MockExecutionManager
Expand All @@ -117,12 +120,17 @@ func (s *engineSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTxProcessor = NewMocktransferQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockEventsReapplier = NewMocknDCEventsReapplier(s.controller)
s.mockWorkflowResetter = NewMockworkflowResetter(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTxProcessor = queues.NewMockProcessor(s.controller)
s.mockTimerProcessor = queues.NewMockProcessor(s.controller)
s.mockVisibilityProcessor = queues.NewMockProcessor(s.controller)
s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes()
s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes()
s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes()
s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()

s.config = tests.NewDynamicConfig()
s.mockShard = shard.NewTestContext(
Expand Down Expand Up @@ -181,10 +189,13 @@ func (s *engineSuite) SetupTest() {
tokenSerializer: common.NewProtoTaskTokenSerializer(),
eventNotifier: eventNotifier,
config: s.config,
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
eventsReapplier: s.mockEventsReapplier,
workflowResetter: s.mockWorkflowResetter,
queueProcessors: map[tasks.Category]queues.Processor{
s.mockTxProcessor.Category(): s.mockTxProcessor,
s.mockTimerProcessor.Category(): s.mockTimerProcessor,
s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor,
},
eventsReapplier: s.mockEventsReapplier,
workflowResetter: s.mockWorkflowResetter,
}
s.mockShard.SetEngineForTesting(h)
h.workflowTaskHandler = newWorkflowTaskHandlerCallback(h)
Expand Down
Loading

0 comments on commit 131563b

Please sign in to comment.