Skip to content

Commit

Permalink
Wire up multi-cursor timer queue implementation (cadence-workflow#3318)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 9, 2020
1 parent fc543b8 commit dd9cb49
Show file tree
Hide file tree
Showing 25 changed files with 855 additions and 276 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ var keys = map[Key]string{
TimerProcessorRedispatchIntervalJitterCoefficient: "history.timerProcessorRedispatchIntervalJitterCoefficient",
TimerProcessorMaxRedispatchQueueSize: "history.timerProcessorMaxRedispatchQueueSize",
TimerProcessorEnablePriorityTaskProcessor: "history.timerProcessorEnablePriorityTaskProcessor",
TimerProcessorEnableMultiCurosrProcessor: "history.timerProcessorEnableMultiCursorProcessor",
TimerProcessorMaxTimeShift: "history.timerProcessorMaxTimeShift",
TimerProcessorHistoryArchivalSizeLimit: "history.timerProcessorHistoryArchivalSizeLimit",
TimerProcessorArchivalTimeLimit: "history.timerProcessorArchivalTimeLimit",
Expand Down Expand Up @@ -574,6 +575,8 @@ const (
TimerProcessorMaxRedispatchQueueSize
// TimerProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for timer processor
TimerProcessorEnablePriorityTaskProcessor
// TimerProcessorEnableMultiCurosrProcessor indicates whether multi-cursor queue processor should be used for timer processor
TimerProcessorEnableMultiCurosrProcessor
// TimerProcessorMaxTimeShift is the max shift timer processor can have
TimerProcessorMaxTimeShift
// TimerProcessorHistoryArchivalSizeLimit is the max history size for inline archival
Expand Down
8 changes: 4 additions & 4 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4031,7 +4031,7 @@ func (s *integrationSuite) startWithMemoHelper(startFn startFunc, id string, tas
s.Equal(memo, descResp.WorkflowExecutionInfo.Memo)

// verify closed visibility
var closdExecutionInfo *workflow.WorkflowExecutionInfo
var closedExecutionInfo *workflow.WorkflowExecutionInfo
for i := 0; i < 10; i++ {
resp, err1 := s.engine.ListClosedWorkflowExecutions(createContext(), &workflow.ListClosedWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
Expand All @@ -4046,14 +4046,14 @@ func (s *integrationSuite) startWithMemoHelper(startFn startFunc, id string, tas
})
s.Nil(err1)
if len(resp.Executions) == 1 {
closdExecutionInfo = resp.Executions[0]
closedExecutionInfo = resp.Executions[0]
break
}
s.Logger.Info("Closed WorkflowExecution is not yet visible")
time.Sleep(100 * time.Millisecond)
}
s.NotNil(closdExecutionInfo)
s.Equal(memo, closdExecutionInfo.Memo)
s.NotNil(closedExecutionInfo)
s.Equal(memo, closedExecutionInfo.Memo)
}

func (s *integrationSuite) sendSignal(domainName string, execution *workflow.WorkflowExecution, signalName string,
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type Config struct {
TimerProcessorRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TimerProcessorMaxRedispatchQueueSize dynamicconfig.IntPropertyFn
TimerProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
TimerProcessorEnableMultiCurosrProcessor dynamicconfig.BoolPropertyFn
TimerProcessorMaxTimeShift dynamicconfig.DurationPropertyFn
TimerProcessorHistoryArchivalSizeLimit dynamicconfig.IntPropertyFn
TimerProcessorArchivalTimeLimit dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -305,6 +306,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TimerProcessorRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorRedispatchIntervalJitterCoefficient, 0.15),
TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxRedispatchQueueSize, 10000),
TimerProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnablePriorityTaskProcessor, false),
TimerProcessorEnableMultiCurosrProcessor: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnableMultiCurosrProcessor, false),
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxTimeShift, 1*time.Second),
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024),
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TimerProcessorArchivalTimeLimit, 1*time.Second),
Expand Down
10 changes: 5 additions & 5 deletions service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ type (

controller *gomock.Controller
mockShard *shard.TestContext
mockTxProcessor *queue.MockTransferQueueProcessor
mockTxProcessor *queue.MockProcessor
mockTimerProcessor *queue.MockProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *events.MockCache
mockDomainCache *cache.MockDomainCache
mockClusterMetadata *cluster.MockMetadata
Expand Down Expand Up @@ -88,12 +88,12 @@ func (s *conflictResolverSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTxProcessor = queue.NewMockTransferQueueProcessor(s.controller)
s.mockTxProcessor = queue.NewMockProcessor(s.controller)
s.mockTimerProcessor = queue.NewMockProcessor(s.controller)
s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down
4 changes: 2 additions & 2 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type (
historyEngine *historyEngineImpl
domainCache cache.DomainCache
executionCache *execution.Cache
txProcessor queue.TransferQueueProcessor
timerProcessor timerQueueProcessor
txProcessor queue.Processor
timerProcessor queue.Processor
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
logger log.Logger
Expand Down
22 changes: 16 additions & 6 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type (
historyV2Mgr persistence.HistoryManager
executionManager persistence.ExecutionManager
visibilityMgr persistence.VisibilityManager
txProcessor queue.TransferQueueProcessor
timerProcessor timerQueueProcessor
txProcessor queue.Processor
timerProcessor queue.Processor
replicator *historyReplicator
nDCReplicator ndc.HistoryReplicator
nDCActivityReplicator ndc.ActivityReplicator
Expand Down Expand Up @@ -229,7 +229,17 @@ func NewEngineWithShardContext(
} else {
historyEngImpl.txProcessor = newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, queueTaskProcessor, logger)
}
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, queueTaskProcessor, logger)
if config.TimerProcessorEnableMultiCurosrProcessor() {
historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
shard,
historyEngImpl,
queueTaskProcessor,
executionCache,
historyEngImpl.archivalClient,
)
} else {
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, queueTaskProcessor, logger)
}
historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)

// Only start the replicator processor if valid publisher is passed in
Expand Down Expand Up @@ -433,7 +443,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{VisibilityTimestamp: now}}
e.txProcessor.NotifyNewTask(e.currentClusterName, fakeDecisionTask)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, fakeDecisionTimeoutTask)
e.timerProcessor.NotifyNewTask(e.currentClusterName, fakeDecisionTimeoutTask)
}

// handle graceful failover on active to passive
Expand Down Expand Up @@ -2409,7 +2419,7 @@ func (e *historyEngineImpl) SyncShardStatus(
// 3, notify the transfer (essentially a no op, just put it here so it looks symmetric)
e.shard.SetCurrentTime(clusterName, now)
e.txProcessor.NotifyNewTask(clusterName, []persistence.Task{})
e.timerProcessor.NotifyNewTimers(clusterName, []persistence.Task{})
e.timerProcessor.NotifyNewTask(clusterName, []persistence.Task{})
return nil
}

Expand Down Expand Up @@ -2743,7 +2753,7 @@ func (e *historyEngineImpl) NotifyNewTimerTasks(
if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
e.timerProcessor.NotifyNewTask(clusterName, tasks)
}
}

Expand Down
10 changes: 5 additions & 5 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type (

controller *gomock.Controller
mockShard *shard.TestContext
mockTxProcessor *queue.MockTransferQueueProcessor
mockTxProcessor *queue.MockProcessor
mockTimerProcessor *queue.MockProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *events.MockCache
mockDomainCache *cache.MockDomainCache
mockClusterMetadata *cluster.MockMetadata
Expand Down Expand Up @@ -96,12 +96,12 @@ func (s *engine2Suite) SetupTest() {

s.controller = gomock.NewController(s.T())

s.mockTxProcessor = queue.NewMockTransferQueueProcessor(s.controller)
s.mockTxProcessor = queue.NewMockProcessor(s.controller)
s.mockTimerProcessor = queue.NewMockProcessor(s.controller)
s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down
10 changes: 5 additions & 5 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type (

controller *gomock.Controller
mockShard *shard.TestContext
mockTxProcessor *queue.MockTransferQueueProcessor
mockTxProcessor *queue.MockProcessor
mockTimerProcessor *queue.MockProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *events.MockCache
mockDomainCache *cache.MockDomainCache
mockClusterMetadata *cluster.MockMetadata
Expand Down Expand Up @@ -91,12 +91,12 @@ func (s *engine3Suite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTxProcessor = queue.NewMockTransferQueueProcessor(s.controller)
s.mockTxProcessor = queue.NewMockProcessor(s.controller)
s.mockTimerProcessor = queue.NewMockProcessor(s.controller)
s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down
10 changes: 5 additions & 5 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ type (

controller *gomock.Controller
mockShard *shard.TestContext
mockTxProcessor *queue.MockTransferQueueProcessor
mockTxProcessor *queue.MockProcessor
mockTimerProcessor *queue.MockProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockDomainCache *cache.MockDomainCache
mockMatchingClient *matchingservicetest.MockClient
mockHistoryClient *historyservicetest.MockClient
Expand Down Expand Up @@ -105,14 +105,14 @@ func (s *engineSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTxProcessor = queue.NewMockTransferQueueProcessor(s.controller)
s.mockTxProcessor = queue.NewMockProcessor(s.controller)
s.mockTimerProcessor = queue.NewMockProcessor(s.controller)
s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockEventsReapplier = ndc.NewMockEventsReapplier(s.controller)
s.mockWorkflowResetter = reset.NewMockWorkflowResetter(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down
10 changes: 5 additions & 5 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ type (
controller *gomock.Controller
mockShard *shard.TestContext
mockWorkflowResetor *reset.MockWorkflowResetor
mockTxProcessor *queue.MockTransferQueueProcessor
mockTxProcessor *queue.MockProcessor
mockTimerProcessor *queue.MockProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockStateBuilder *execution.MockStateBuilder
mockDomainCache *cache.MockDomainCache
mockClusterMetadata *cluster.MockMetadata
Expand Down Expand Up @@ -102,13 +102,13 @@ func (s *historyReplicatorSuite) SetupTest() {

s.controller = gomock.NewController(s.T())
s.mockWorkflowResetor = reset.NewMockWorkflowResetor(s.controller)
s.mockTxProcessor = queue.NewMockTransferQueueProcessor(s.controller)
s.mockTxProcessor = queue.NewMockProcessor(s.controller)
s.mockTimerProcessor = queue.NewMockProcessor(s.controller)
s.mockReplicationProcessor = NewMockReplicatorQueueProcessor(s.controller)
s.mockTimerProcessor = NewMocktimerQueueProcessor(s.controller)
s.mockStateBuilder = execution.NewMockStateBuilder(s.controller)
s.mockTxProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).AnyTimes()
s.mockReplicationProcessor.EXPECT().notifyNewTask().AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTimers(gomock.Any(), gomock.Any()).AnyTimes()

s.mockShard = shard.NewTestContext(
s.controller,
Expand Down
4 changes: 2 additions & 2 deletions service/history/queue/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type (
// TODO: add Offload() method
}

// TransferQueueProcessor is the interface for transfer task queue processor
TransferQueueProcessor interface {
// Processor is the interface for task queue processor
Processor interface {
common.Daemon
FailoverDomain(domainIDs map[string]struct{})
NotifyNewTask(clusterName string, transferTasks []persistence.Task)
Expand Down
Loading

0 comments on commit dd9cb49

Please sign in to comment.