From feb54ff6f99420ee3f70bbbce225682ef506e294 Mon Sep 17 00:00:00 2001 From: taylanisikdemir Date: Thu, 21 Dec 2023 09:30:00 -0800 Subject: [PATCH] Minor changes to improve readability of history's queue folder (#5517) --- service/history/queue/constants.go | 25 ++ service/history/queue/processing_queue.go | 113 +------ .../queue/processing_queue_collection.go | 47 ++- .../history/queue/processing_queue_state.go | 104 +++++++ service/history/queue/processor_base.go | 77 +---- service/history/queue/processor_options.go | 57 ++++ service/history/queue/queue_processor_util.go | 29 +- service/history/queue/split_policy.go | 24 +- service/history/queue/task_allocator.go | 6 +- .../queue/timer_queue_active_processor.go | 92 ++++++ .../queue/timer_queue_failover_processor.go | 118 ++++++++ .../history/queue/timer_queue_processor.go | 279 ++---------------- .../queue/timer_queue_standby_processor.go | 113 +++++++ .../history/queue/transfer_queue_processor.go | 40 ++- .../queue/transfer_queue_processor_base.go | 4 +- .../history/queue/transfer_queue_validator.go | 4 +- 16 files changed, 607 insertions(+), 525 deletions(-) create mode 100644 service/history/queue/constants.go create mode 100644 service/history/queue/processing_queue_state.go create mode 100644 service/history/queue/processor_options.go create mode 100644 service/history/queue/timer_queue_active_processor.go create mode 100644 service/history/queue/timer_queue_failover_processor.go create mode 100644 service/history/queue/timer_queue_standby_processor.go diff --git a/service/history/queue/constants.go b/service/history/queue/constants.go new file mode 100644 index 00000000000..71e16c9d373 --- /dev/null +++ b/service/history/queue/constants.go @@ -0,0 +1,25 @@ +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
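+
+// Editorial note, an assumption based on usage elsewhere in this package:
+// queues are created at defaultProcessingQueueLevel, and the split policies in
+// split_policy.go may produce new queues at higher levels. This constant was
+// previously declared in transfer_queue_processor.go and is now shared by the
+// transfer and timer processors.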
+ +package queue + +const ( + defaultProcessingQueueLevel = 0 +) diff --git a/service/history/queue/processing_queue.go b/service/history/queue/processing_queue.go index 8bf8d69f36f..5ced012bd4c 100644 --- a/service/history/queue/processing_queue.go +++ b/service/history/queue/processing_queue.go @@ -37,14 +37,6 @@ var ( ) type ( - processingQueueStateImpl struct { - level int - ackLevel task.Key - readLevel task.Key - maxLevel task.Key - domainFilter DomainFilter - } - processingQueueImpl struct { state *processingQueueStateImpl outstandingTasks map[task.Key]task.Task @@ -54,39 +46,6 @@ type ( } ) -// NewProcessingQueueState creates a new state instance for processing queue -// readLevel will be set to the same value as ackLevel -func NewProcessingQueueState( - level int, - ackLevel task.Key, - maxLevel task.Key, - domainFilter DomainFilter, -) ProcessingQueueState { - return newProcessingQueueState( - level, - ackLevel, - ackLevel, - maxLevel, - domainFilter, - ) -} - -func newProcessingQueueState( - level int, - ackLevel task.Key, - readLevel task.Key, - maxLevel task.Key, - domainFilter DomainFilter, -) *processingQueueStateImpl { - return &processingQueueStateImpl{ - level: level, - ackLevel: ackLevel, - readLevel: readLevel, - maxLevel: maxLevel, - domainFilter: domainFilter, - } -} - // NewProcessingQueue creates a new processing queue based on its state func NewProcessingQueue( state ProcessingQueueState, @@ -134,39 +93,11 @@ func newProcessingQueue( return queue } -func (s *processingQueueStateImpl) Level() int { - return s.level -} - -func (s *processingQueueStateImpl) MaxLevel() task.Key { - return s.maxLevel -} - -func (s *processingQueueStateImpl) AckLevel() task.Key { - return s.ackLevel -} - -func (s *processingQueueStateImpl) ReadLevel() task.Key { - return s.readLevel -} - -func (s *processingQueueStateImpl) DomainFilter() DomainFilter { - return s.domainFilter -} - -func (s *processingQueueStateImpl) String() string { - return fmt.Sprintf("&{level: %+v, ackLevel: %+v, readLevel: %+v, maxLevel: %+v, domainFilter: %+v}", - s.level, s.ackLevel, s.readLevel, s.maxLevel, s.domainFilter, - ) -} - func (q *processingQueueImpl) State() ProcessingQueueState { return q.state } -func (q *processingQueueImpl) Split( - policy ProcessingQueueSplitPolicy, -) []ProcessingQueue { +func (q *processingQueueImpl) Split(policy ProcessingQueueSplitPolicy) []ProcessingQueue { newQueueStates := policy.Evaluate(q) if len(newQueueStates) == 0 { // no need to split, return self @@ -176,9 +107,7 @@ func (q *processingQueueImpl) Split( return splitProcessingQueue([]*processingQueueImpl{q}, newQueueStates, q.logger, q.metricsClient) } -func (q *processingQueueImpl) Merge( - queue ProcessingQueue, -) []ProcessingQueue { +func (q *processingQueueImpl) Merge(queue ProcessingQueue) []ProcessingQueue { q1, q2 := q, queue.(*processingQueueImpl) if q1.State().Level() != q2.State().Level() { @@ -246,10 +175,7 @@ func (q *processingQueueImpl) Merge( return splitProcessingQueue([]*processingQueueImpl{q1, q2}, newQueueStates, q.logger, q.metricsClient) } -func (q *processingQueueImpl) AddTasks( - tasks map[task.Key]task.Task, - newReadLevel task.Key, -) { +func (q *processingQueueImpl) AddTasks(tasks map[task.Key]task.Task, newReadLevel task.Key) { if newReadLevel.Less(q.state.readLevel) { q.logger.Fatal("processing queue read level moved backward", tag.Error( fmt.Errorf("current read level: %v, new read level: %v", q.state.readLevel, newReadLevel), @@ -411,51 +337,26 @@ func splitProcessingQueue( return 
newQueues } -func taskBelongsToProcessQueue( - state ProcessingQueueState, - key task.Key, - task task.Task, -) bool { +func taskBelongsToProcessQueue(state ProcessingQueueState, key task.Key, task task.Task) bool { return state.DomainFilter().Filter(task.GetDomainID()) && state.AckLevel().Less(key) && !state.MaxLevel().Less(key) } -func taskKeyEquals( - key1 task.Key, - key2 task.Key, -) bool { +func taskKeyEquals(key1 task.Key, key2 task.Key) bool { return !key1.Less(key2) && !key2.Less(key1) } -func minTaskKey( - key1 task.Key, - key2 task.Key, -) task.Key { +func minTaskKey(key1 task.Key, key2 task.Key) task.Key { if key1.Less(key2) { return key1 } return key2 } -func maxTaskKey( - key1 task.Key, - key2 task.Key, -) task.Key { +func maxTaskKey(key1 task.Key, key2 task.Key) task.Key { if key1.Less(key2) { return key2 } return key1 } - -func copyQueueState( - state ProcessingQueueState, -) *processingQueueStateImpl { - return newProcessingQueueState( - state.Level(), - state.AckLevel(), - state.ReadLevel(), - state.MaxLevel(), - state.DomainFilter(), - ) -} diff --git a/service/history/queue/processing_queue_collection.go b/service/history/queue/processing_queue_collection.go index 4403f68da35..739cfd575e6 100644 --- a/service/history/queue/processing_queue_collection.go +++ b/service/history/queue/processing_queue_collection.go @@ -24,16 +24,16 @@ import ( "fmt" "sort" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/history/task" ) -type ( - processingQueueCollection struct { - level int - queues []ProcessingQueue - activeQueue ProcessingQueue - } -) +type processingQueueCollection struct { + level int + queues []ProcessingQueue + activeQueue ProcessingQueue +} // NewProcessingQueueCollection creates a new collection for non-overlapping queues func NewProcessingQueueCollection(level int, queues []ProcessingQueue) ProcessingQueueCollection { @@ -43,10 +43,33 @@ func NewProcessingQueueCollection(level int, queues []ProcessingQueue) Processin queues: queues, } queueCollection.resetActiveQueue() - return queueCollection } +func newProcessingQueueCollections( + processingQueueStates []ProcessingQueueState, + logger log.Logger, + metricsClient metrics.Client, +) []ProcessingQueueCollection { + processingQueuesMap := make(map[int][]ProcessingQueue) // level -> state + for _, queueState := range processingQueueStates { + processingQueuesMap[queueState.Level()] = append(processingQueuesMap[queueState.Level()], NewProcessingQueue( + queueState, + logger, + metricsClient, + )) + } + processingQueueCollections := make([]ProcessingQueueCollection, 0, len(processingQueuesMap)) + for level, queues := range processingQueuesMap { + processingQueueCollections = append(processingQueueCollections, NewProcessingQueueCollection( + level, + queues, + )) + } + + return processingQueueCollections +} + func (c *processingQueueCollection) Level() int { return c.level } @@ -113,9 +136,7 @@ func (c *processingQueueCollection) UpdateAckLevels() (task.Key, int) { return minAckLevel, totalPendingTasks } -func (c *processingQueueCollection) Split( - policy ProcessingQueueSplitPolicy, -) []ProcessingQueue { +func (c *processingQueueCollection) Split(policy ProcessingQueueSplitPolicy) []ProcessingQueue { if len(c.queues) == 0 { return nil } @@ -142,9 +163,7 @@ func (c *processingQueueCollection) Split( return nextLevelQueues } -func (c *processingQueueCollection) Merge( - incomingQueues []ProcessingQueue, -) { +func (c *processingQueueCollection) 
Merge(incomingQueues []ProcessingQueue) { sortProcessingQueue(incomingQueues) newQueues := make([]ProcessingQueue, 0, len(c.queues)+len(incomingQueues)) diff --git a/service/history/queue/processing_queue_state.go b/service/history/queue/processing_queue_state.go new file mode 100644 index 00000000000..5cfb8e50569 --- /dev/null +++ b/service/history/queue/processing_queue_state.go @@ -0,0 +1,104 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queue + +import ( + "fmt" + + "github.com/uber/cadence/service/history/task" +) + +type processingQueueStateImpl struct { + level int + ackLevel task.Key + readLevel task.Key + maxLevel task.Key + domainFilter DomainFilter +} + +// NewProcessingQueueState creates a new state instance for processing queue +// readLevel will be set to the same value as ackLevel +func NewProcessingQueueState( + level int, + ackLevel task.Key, + maxLevel task.Key, + domainFilter DomainFilter, +) ProcessingQueueState { + return newProcessingQueueState( + level, + ackLevel, + ackLevel, + maxLevel, + domainFilter, + ) +} + +func newProcessingQueueState( + level int, + ackLevel task.Key, + readLevel task.Key, + maxLevel task.Key, + domainFilter DomainFilter, +) *processingQueueStateImpl { + return &processingQueueStateImpl{ + level: level, + ackLevel: ackLevel, + readLevel: readLevel, + maxLevel: maxLevel, + domainFilter: domainFilter, + } +} + +func (s *processingQueueStateImpl) Level() int { + return s.level +} + +func (s *processingQueueStateImpl) MaxLevel() task.Key { + return s.maxLevel +} + +func (s *processingQueueStateImpl) AckLevel() task.Key { + return s.ackLevel +} + +func (s *processingQueueStateImpl) ReadLevel() task.Key { + return s.readLevel +} + +func (s *processingQueueStateImpl) DomainFilter() DomainFilter { + return s.domainFilter +} + +func (s *processingQueueStateImpl) String() string { + return fmt.Sprintf("&{level: %+v, ackLevel: %+v, readLevel: %+v, maxLevel: %+v, domainFilter: %+v}", + s.level, s.ackLevel, s.readLevel, s.maxLevel, s.domainFilter, + ) +} + +func copyQueueState(state ProcessingQueueState) *processingQueueStateImpl { + return newProcessingQueueState( + state.Level(), + state.AckLevel(), + state.ReadLevel(), + state.MaxLevel(), + state.DomainFilter(), + ) +} diff --git a/service/history/queue/processor_base.go b/service/history/queue/processor_base.go index afb76a88416..a9dbc8578a5 100644 --- a/service/history/queue/processor_base.go +++ 
b/service/history/queue/processor_base.go @@ -27,7 +27,6 @@ import ( "time" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -46,40 +45,6 @@ type ( updateProcessingQueueStatesFn func([]ProcessingQueueState) error queueShutdownFn func() error - queueProcessorOptions struct { - BatchSize dynamicconfig.IntPropertyFn - DeleteBatchSize dynamicconfig.IntPropertyFn - MaxPollRPS dynamicconfig.IntPropertyFn - MaxPollInterval dynamicconfig.DurationPropertyFn - MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - UpdateAckInterval dynamicconfig.DurationPropertyFn - UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - RedispatchInterval dynamicconfig.DurationPropertyFn - RedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - MaxRedispatchQueueSize dynamicconfig.IntPropertyFn - MaxStartJitterInterval dynamicconfig.DurationPropertyFn - SplitQueueInterval dynamicconfig.DurationPropertyFn - SplitQueueIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - EnableSplit dynamicconfig.BoolPropertyFn - SplitMaxLevel dynamicconfig.IntPropertyFn - EnableRandomSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter - RandomSplitProbability dynamicconfig.FloatPropertyFn - EnablePendingTaskSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter - PendingTaskSplitThreshold dynamicconfig.MapPropertyFn - EnableStuckTaskSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter - StuckTaskSplitThreshold dynamicconfig.MapPropertyFn - SplitLookAheadDurationByDomainID dynamicconfig.DurationPropertyFnWithDomainIDFilter - PollBackoffInterval dynamicconfig.DurationPropertyFn - PollBackoffIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - EnablePersistQueueStates dynamicconfig.BoolPropertyFn - EnableLoadQueueStates dynamicconfig.BoolPropertyFn - EnableValidator dynamicconfig.BoolPropertyFn - ValidationInterval dynamicconfig.DurationPropertyFn - // MaxPendingTaskSize is used in cross cluster queue to limit the pending task count - MaxPendingTaskSize dynamicconfig.IntPropertyFn - MetricScope int - } - actionNotification struct { ctx context.Context action *Action @@ -223,9 +188,7 @@ func (p *processorBase) updateAckLevel() (bool, task.Key, error) { return false, minAckLevel, nil } -func (p *processorBase) initializeSplitPolicy( - lookAheadFunc lookAheadFunc, -) ProcessingQueueSplitPolicy { +func (p *processorBase) initializeSplitPolicy(lookAheadFunc lookAheadFunc) ProcessingQueueSplitPolicy { if !p.options.EnableSplit() { return nil } @@ -280,10 +243,7 @@ func (p *processorBase) initializeSplitPolicy( return NewAggregatedSplitPolicy(policies...) 
} -func (p *processorBase) splitProcessingQueueCollection( - splitPolicy ProcessingQueueSplitPolicy, - upsertPollTimeFn func(int, time.Time), -) { +func (p *processorBase) splitProcessingQueueCollection(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time)) { defer p.emitProcessingQueueMetrics() if splitPolicy == nil { @@ -366,10 +326,7 @@ func (p *processorBase) addAction(ctx context.Context, action *Action) (chan act } } -func (p *processorBase) handleActionNotification( - notification actionNotification, - postActionFn func(), -) { +func (p *processorBase) handleActionNotification(notification actionNotification, postActionFn func()) { var result *ActionResult var err error switch notification.action.ActionType { @@ -481,33 +438,7 @@ func (p *processorBase) submitTask(task task.Task) (bool, error) { return true, nil } -func newProcessingQueueCollections( - processingQueueStates []ProcessingQueueState, - logger log.Logger, - metricsClient metrics.Client, -) []ProcessingQueueCollection { - processingQueuesMap := make(map[int][]ProcessingQueue) // level -> state - for _, queueState := range processingQueueStates { - processingQueuesMap[queueState.Level()] = append(processingQueuesMap[queueState.Level()], NewProcessingQueue( - queueState, - logger, - metricsClient, - )) - } - processingQueueCollections := make([]ProcessingQueueCollection, 0, len(processingQueuesMap)) - for level, queues := range processingQueuesMap { - processingQueueCollections = append(processingQueueCollections, NewProcessingQueueCollection( - level, - queues, - )) - } - - return processingQueueCollections -} - -func getPendingTasksMetricIdx( - scopeIdx int, -) int { +func getPendingTasksMetricIdx(scopeIdx int) int { switch scopeIdx { case metrics.TimerActiveQueueProcessorScope: return metrics.ShardInfoTimerActivePendingTasksTimer diff --git a/service/history/queue/processor_options.go b/service/history/queue/processor_options.go new file mode 100644 index 00000000000..7321afd6497 --- /dev/null +++ b/service/history/queue/processor_options.go @@ -0,0 +1,57 @@ +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
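+
+// Editorial note: every field of queueProcessorOptions below is a
+// dynamicconfig property function rather than a plain value, so each read
+// (for example options.BatchSize()) re-evaluates dynamic config and can pick
+// up changes without a restart.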
+ +package queue + +import "github.com/uber/cadence/common/dynamicconfig" + +type queueProcessorOptions struct { + BatchSize dynamicconfig.IntPropertyFn + DeleteBatchSize dynamicconfig.IntPropertyFn + MaxPollRPS dynamicconfig.IntPropertyFn + MaxPollInterval dynamicconfig.DurationPropertyFn + MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + UpdateAckInterval dynamicconfig.DurationPropertyFn + UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + RedispatchInterval dynamicconfig.DurationPropertyFn + RedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + MaxRedispatchQueueSize dynamicconfig.IntPropertyFn + MaxStartJitterInterval dynamicconfig.DurationPropertyFn + SplitQueueInterval dynamicconfig.DurationPropertyFn + SplitQueueIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + EnableSplit dynamicconfig.BoolPropertyFn + SplitMaxLevel dynamicconfig.IntPropertyFn + EnableRandomSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter + RandomSplitProbability dynamicconfig.FloatPropertyFn + EnablePendingTaskSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter + PendingTaskSplitThreshold dynamicconfig.MapPropertyFn + EnableStuckTaskSplitByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter + StuckTaskSplitThreshold dynamicconfig.MapPropertyFn + SplitLookAheadDurationByDomainID dynamicconfig.DurationPropertyFnWithDomainIDFilter + PollBackoffInterval dynamicconfig.DurationPropertyFn + PollBackoffIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + EnablePersistQueueStates dynamicconfig.BoolPropertyFn + EnableLoadQueueStates dynamicconfig.BoolPropertyFn + EnableValidator dynamicconfig.BoolPropertyFn + ValidationInterval dynamicconfig.DurationPropertyFn + // MaxPendingTaskSize is used in cross cluster queue to limit the pending task count + MaxPendingTaskSize dynamicconfig.IntPropertyFn + MetricScope int +} diff --git a/service/history/queue/queue_processor_util.go b/service/history/queue/queue_processor_util.go index 3b3fc7a79b0..9f03fd070d3 100644 --- a/service/history/queue/queue_processor_util.go +++ b/service/history/queue/queue_processor_util.go @@ -27,9 +27,7 @@ import ( "github.com/uber/cadence/common/types" ) -func convertToPersistenceTransferProcessingQueueStates( - states []ProcessingQueueState, -) []*types.ProcessingQueueState { +func convertToPersistenceTransferProcessingQueueStates(states []ProcessingQueueState) []*types.ProcessingQueueState { pStates := make([]*types.ProcessingQueueState, 0, len(states)) for _, state := range states { pStates = append(pStates, &types.ProcessingQueueState{ @@ -43,9 +41,7 @@ func convertToPersistenceTransferProcessingQueueStates( return pStates } -func convertFromPersistenceTransferProcessingQueueStates( - pStates []*types.ProcessingQueueState, -) []ProcessingQueueState { +func convertFromPersistenceTransferProcessingQueueStates(pStates []*types.ProcessingQueueState) []ProcessingQueueState { states := make([]ProcessingQueueState, 0, len(pStates)) for _, pState := range pStates { states = append(states, NewProcessingQueueState( @@ -59,9 +55,7 @@ func convertFromPersistenceTransferProcessingQueueStates( return states } -func convertToPersistenceTimerProcessingQueueStates( - states []ProcessingQueueState, -) []*types.ProcessingQueueState { +func convertToPersistenceTimerProcessingQueueStates(states []ProcessingQueueState) []*types.ProcessingQueueState { pStates := make([]*types.ProcessingQueueState, 0, len(states)) for _, state := range states { pStates = append(pStates, 
&types.ProcessingQueueState{ @@ -75,9 +69,7 @@ func convertToPersistenceTimerProcessingQueueStates( return pStates } -func convertFromPersistenceTimerProcessingQueueStates( - pStates []*types.ProcessingQueueState, -) []ProcessingQueueState { +func convertFromPersistenceTimerProcessingQueueStates(pStates []*types.ProcessingQueueState) []ProcessingQueueState { states := make([]ProcessingQueueState, 0, len(pStates)) for _, pState := range pStates { states = append(states, NewProcessingQueueState( @@ -91,9 +83,7 @@ func convertFromPersistenceTimerProcessingQueueStates( return states } -func convertToPersistenceDomainFilter( - domainFilter DomainFilter, -) *types.DomainFilter { +func convertToPersistenceDomainFilter(domainFilter DomainFilter) *types.DomainFilter { domainIDs := make([]string, 0, len(domainFilter.DomainIDs)) for domainID := range domainFilter.DomainIDs { domainIDs = append(domainIDs, domainID) @@ -105,9 +95,7 @@ func convertToPersistenceDomainFilter( } } -func convertFromPersistenceDomainFilter( - domainFilter *types.DomainFilter, -) DomainFilter { +func convertFromPersistenceDomainFilter(domainFilter *types.DomainFilter) DomainFilter { domainIDs := make(map[string]struct{}) for _, domainID := range domainFilter.DomainIDs { domainIDs[domainID] = struct{}{} @@ -116,10 +104,7 @@ func convertFromPersistenceDomainFilter( return NewDomainFilter(domainIDs, domainFilter.GetReverseMatch()) } -func validateProcessingQueueStates( - pStates []*types.ProcessingQueueState, - ackLevel interface{}, -) bool { +func validateProcessingQueueStates(pStates []*types.ProcessingQueueState, ackLevel interface{}) bool { if len(pStates) == 0 { return false } diff --git a/service/history/queue/split_policy.go b/service/history/queue/split_policy.go index 83a1573db81..73d641fdd66 100644 --- a/service/history/queue/split_policy.go +++ b/service/history/queue/split_policy.go @@ -162,17 +162,13 @@ func NewRandomSplitPolicy( // that which combines other policies. 
Policies are evaluated in the order
// they are passed in, and if one policy returns a non-empty result, that result
// will be returned as is and the remaining policies will not be evaluated
-func NewAggregatedSplitPolicy(
-	policies ...ProcessingQueueSplitPolicy,
-) ProcessingQueueSplitPolicy {
+func NewAggregatedSplitPolicy(policies ...ProcessingQueueSplitPolicy) ProcessingQueueSplitPolicy {
	return &aggregatedSplitPolicy{
		policies: policies,
	}
}

-func (p *pendingTaskSplitPolicy) Evaluate(
-	queue ProcessingQueue,
-) []ProcessingQueueState {
+func (p *pendingTaskSplitPolicy) Evaluate(queue ProcessingQueue) []ProcessingQueueState {
	queueImpl := queue.(*processingQueueImpl)

	if queueImpl.state.level == p.maxNewQueueLevel {
@@ -221,9 +217,7 @@ func (p *pendingTaskSplitPolicy) Evaluate(
	)
}

-func (p *stuckTaskSplitPolicy) Evaluate(
-	queue ProcessingQueue,
-) []ProcessingQueueState {
+func (p *stuckTaskSplitPolicy) Evaluate(queue ProcessingQueue) []ProcessingQueueState {
	queueImpl := queue.(*processingQueueImpl)

	if queueImpl.state.level == p.maxNewQueueLevel {
@@ -267,9 +261,7 @@ func (p *stuckTaskSplitPolicy) Evaluate(
	)
}

-func (p *selectedDomainSplitPolicy) Evaluate(
-	queue ProcessingQueue,
-) []ProcessingQueueState {
+func (p *selectedDomainSplitPolicy) Evaluate(queue ProcessingQueue) []ProcessingQueueState {
	domainBelongsToQueue := false
	currentQueueState := queue.State()
	currentDomainFilter := currentQueueState.DomainFilter()
@@ -312,9 +304,7 @@ func (p *selectedDomainSplitPolicy) Evaluate(
	}
}

-func (p *randomSplitPolicy) Evaluate(
-	queue ProcessingQueue,
-) []ProcessingQueueState {
+func (p *randomSplitPolicy) Evaluate(queue ProcessingQueue) []ProcessingQueueState {
	queueImpl := queue.(*processingQueueImpl)

	if queueImpl.state.level == p.maxNewQueueLevel {
@@ -364,9 +354,7 @@ func (p *randomSplitPolicy) Evaluate(
	)
}

-func (p *aggregatedSplitPolicy) Evaluate(
-	queue ProcessingQueue,
-) []ProcessingQueueState {
+func (p *aggregatedSplitPolicy) Evaluate(queue ProcessingQueue) []ProcessingQueueState {
	for _, policy := range p.policies {
		newStates := policy.Evaluate(queue)
		if len(newStates) != 0 {
diff --git a/service/history/queue/task_allocator.go b/service/history/queue/task_allocator.go
index 422e8684712..e636d958cb9 100644
--- a/service/history/queue/task_allocator.go
+++ b/service/history/queue/task_allocator.go
@@ -168,11 +168,7 @@ func (t *taskAllocatorImpl) VerifyStandbyTask(standbyCluster string, taskDomainI
	return true, nil
}

-func (t *taskAllocatorImpl) checkDomainPendingActive(
-	domainEntry *cache.DomainCacheEntry,
-	taskDomainID string,
-	task interface{},
-) error {
+func (t *taskAllocatorImpl) checkDomainPendingActive(domainEntry *cache.DomainCacheEntry, taskDomainID string, task interface{}) error {
	if domainEntry.IsGlobalDomain() && domainEntry.GetFailoverEndTime() != nil {
		// the domain is pending active, pause on processing this task
diff --git a/service/history/queue/timer_queue_active_processor.go b/service/history/queue/timer_queue_active_processor.go
new file mode 100644
index 00000000000..be681f4f889
--- /dev/null
+++ b/service/history/queue/timer_queue_active_processor.go
@@ -0,0 +1,92 @@
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package queue + +import ( + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/service/history/engine" + "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/task" +) + +func newTimerQueueActiveProcessor( + clusterName string, + shard shard.Context, + historyEngine engine.Engine, + taskProcessor task.Processor, + taskAllocator TaskAllocator, + taskExecutor task.Executor, + logger log.Logger, +) *timerQueueProcessorBase { + config := shard.GetConfig() + options := newTimerQueueProcessorOptions(config, true, false) + + logger = logger.WithTags(tag.ClusterName(clusterName)) + + taskFilter := func(taskInfo task.Info) (bool, error) { + timer, ok := taskInfo.(*persistence.TimerTaskInfo) + if !ok { + return false, errUnexpectedQueueTask + } + if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil { + logger.Info("Domain is not in registered status, skip task in active timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) + return false, nil + } + + return taskAllocator.VerifyActiveTask(timer.DomainID, timer) + } + + updateMaxReadLevel := func() task.Key { + return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0) + } + + updateClusterAckLevel := func(ackLevel task.Key) error { + return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp) + } + + updateProcessingQueueStates := func(states []ProcessingQueueState) error { + pStates := convertToPersistenceTimerProcessingQueueStates(states) + return shard.UpdateTimerProcessingQueueStates(clusterName, pStates) + } + + queueShutdown := func() error { + return nil + } + + return newTimerQueueProcessorBase( + clusterName, + shard, + loadTimerProcessingQueueStates(clusterName, shard, options, logger), + taskProcessor, + NewLocalTimerGate(shard.GetTimeSource()), + options, + updateMaxReadLevel, + updateClusterAckLevel, + updateProcessingQueueStates, + queueShutdown, + taskFilter, + taskExecutor, + logger, + shard.GetMetricsClient(), + ) +} diff --git a/service/history/queue/timer_queue_failover_processor.go b/service/history/queue/timer_queue_failover_processor.go new file mode 100644 index 00000000000..78f52e26f45 --- /dev/null +++ b/service/history/queue/timer_queue_failover_processor.go @@ -0,0 +1,118 @@ +// Copyright (c) 2017-2020 
Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package queue + +import ( + "time" + + "github.com/pborman/uuid" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/service/history/engine" + "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/task" +) + +func newTimerQueueFailoverProcessor( + standbyClusterName string, + shardContext shard.Context, + historyEngine engine.Engine, + taskProcessor task.Processor, + taskAllocator TaskAllocator, + taskExecutor task.Executor, + logger log.Logger, + minLevel, maxLevel time.Time, + domainIDs map[string]struct{}, +) (updateClusterAckLevelFn, *timerQueueProcessorBase) { + config := shardContext.GetConfig() + options := newTimerQueueProcessorOptions(config, true, true) + + currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName() + failoverStartTime := shardContext.GetTimeSource().Now() + failoverUUID := uuid.New() + logger = logger.WithTags( + tag.ClusterName(currentClusterName), + tag.WorkflowDomainIDs(domainIDs), + tag.FailoverMsg("from: "+standbyClusterName), + ) + + taskFilter := func(taskInfo task.Info) (bool, error) { + timer, ok := taskInfo.(*persistence.TimerTaskInfo) + if !ok { + return false, errUnexpectedQueueTask + } + if notRegistered, err := isDomainNotRegistered(shardContext, timer.DomainID); notRegistered && err == nil { + logger.Info("Domain is not in registered status, skip task in failover timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) + return false, nil + } + return taskAllocator.VerifyFailoverActiveTask(domainIDs, timer.DomainID, timer) + } + + maxReadLevelTaskKey := newTimerTaskKey(maxLevel, 0) + updateMaxReadLevel := func() task.Key { + return maxReadLevelTaskKey // this is a const + } + + updateClusterAckLevel := func(ackLevel task.Key) error { + return shardContext.UpdateTimerFailoverLevel( + failoverUUID, + shard.TimerFailoverLevel{ + StartTime: failoverStartTime, + MinLevel: minLevel, + CurrentLevel: ackLevel.(timerTaskKey).visibilityTimestamp, + MaxLevel: maxLevel, + DomainIDs: domainIDs, + }, + ) + } + + queueShutdown := func() error { + return shardContext.DeleteTimerFailoverLevel(failoverUUID) + } + + processingQueueStates := []ProcessingQueueState{ + NewProcessingQueueState( + defaultProcessingQueueLevel, + newTimerTaskKey(minLevel, 0), + maxReadLevelTaskKey, + 
NewDomainFilter(domainIDs, false), + ), + } + + return updateClusterAckLevel, newTimerQueueProcessorBase( + currentClusterName, // should use current cluster's time when doing domain failover + shardContext, + processingQueueStates, + taskProcessor, + NewLocalTimerGate(shardContext.GetTimeSource()), + options, + updateMaxReadLevel, + updateClusterAckLevel, + nil, + queueShutdown, + taskFilter, + taskExecutor, + logger, + shardContext.GetMetricsClient(), + ) +} diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index 58728c9d2b7..f5062c2387e 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -27,8 +27,6 @@ import ( "sync/atomic" "time" - "github.com/pborman/uuid" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -46,30 +44,28 @@ import ( "github.com/uber/cadence/service/worker/archiver" ) -type ( - timerQueueProcessor struct { - shard shard.Context - historyEngine engine.Engine - taskProcessor task.Processor +type timerQueueProcessor struct { + shard shard.Context + historyEngine engine.Engine + taskProcessor task.Processor - config *config.Config - currentClusterName string + config *config.Config + currentClusterName string - metricsClient metrics.Client - logger log.Logger + metricsClient metrics.Client + logger log.Logger - status int32 - shutdownChan chan struct{} - shutdownWG sync.WaitGroup + status int32 + shutdownChan chan struct{} + shutdownWG sync.WaitGroup - ackLevel time.Time - taskAllocator TaskAllocator - activeTaskExecutor task.Executor - activeQueueProcessor *timerQueueProcessorBase - standbyQueueProcessors map[string]*timerQueueProcessorBase - standbyQueueTimerGates map[string]RemoteTimerGate - } -) + ackLevel time.Time + taskAllocator TaskAllocator + activeTaskExecutor task.Executor + activeQueueProcessor *timerQueueProcessorBase + standbyQueueProcessors map[string]*timerQueueProcessorBase + standbyQueueTimerGates map[string]RemoteTimerGate +} // NewTimerQueueProcessor creates a new timer QueueProcessor func NewTimerQueueProcessor( @@ -211,9 +207,7 @@ func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.No standbyQueueProcessor.notifyNewTimers(info.Tasks) } -func (t *timerQueueProcessor) FailoverDomain( - domainIDs map[string]struct{}, -) { +func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) { // Failover queue is used to scan all inflight tasks, if queue processor is not // started, there's no inflight task and we don't need to create a failover processor. // Also the HandleAction will be blocked if queue processor processing loop is not running. @@ -248,6 +242,7 @@ func (t *timerQueueProcessor) FailoverDomain( maxReadLevel = queueReadLevel } } + // TODO: Below Add call has no effect, understand the underlying intent and fix it. 
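+	// Editorial note on the TODO: time.Time is immutable and Add returns a
+	// new value, so the call below discards its result. If the intent is to
+	// move the failover max read level past the highest queue read level, the
+	// fix would presumably be:
+	//   maxReadLevel = maxReadLevel.Add(1 * time.Millisecond)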
maxReadLevel.Add(1 * time.Millisecond) t.logger.Info("Timer Failover Triggered", @@ -278,11 +273,7 @@ func (t *timerQueueProcessor) FailoverDomain( failoverQueueProcessor.Start() } -func (t *timerQueueProcessor) HandleAction( - ctx context.Context, - clusterName string, - action *Action, -) (*ActionResult, error) { +func (t *timerQueueProcessor) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error) { var resultNotificationCh chan actionResultNotification var added bool if clusterName == t.currentClusterName { @@ -425,234 +416,6 @@ func (t *timerQueueProcessor) completeTimer() error { return t.shard.UpdateTimerAckLevel(t.ackLevel) } -func newTimerQueueActiveProcessor( - clusterName string, - shard shard.Context, - historyEngine engine.Engine, - taskProcessor task.Processor, - taskAllocator TaskAllocator, - taskExecutor task.Executor, - logger log.Logger, -) *timerQueueProcessorBase { - config := shard.GetConfig() - options := newTimerQueueProcessorOptions(config, true, false) - - logger = logger.WithTags(tag.ClusterName(clusterName)) - - taskFilter := func(taskInfo task.Info) (bool, error) { - timer, ok := taskInfo.(*persistence.TimerTaskInfo) - if !ok { - return false, errUnexpectedQueueTask - } - if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil { - logger.Info("Domain is not in registered status, skip task in active timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) - return false, nil - } - - return taskAllocator.VerifyActiveTask(timer.DomainID, timer) - } - - updateMaxReadLevel := func() task.Key { - return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0) - } - - updateClusterAckLevel := func(ackLevel task.Key) error { - return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp) - } - - updateProcessingQueueStates := func(states []ProcessingQueueState) error { - pStates := convertToPersistenceTimerProcessingQueueStates(states) - return shard.UpdateTimerProcessingQueueStates(clusterName, pStates) - } - - queueShutdown := func() error { - return nil - } - - return newTimerQueueProcessorBase( - clusterName, - shard, - loadTimerProcessingQueueStates(clusterName, shard, options, logger), - taskProcessor, - NewLocalTimerGate(shard.GetTimeSource()), - options, - updateMaxReadLevel, - updateClusterAckLevel, - updateProcessingQueueStates, - queueShutdown, - taskFilter, - taskExecutor, - logger, - shard.GetMetricsClient(), - ) -} - -func newTimerQueueStandbyProcessor( - clusterName string, - shard shard.Context, - historyEngine engine.Engine, - taskProcessor task.Processor, - taskAllocator TaskAllocator, - taskExecutor task.Executor, - logger log.Logger, -) (*timerQueueProcessorBase, RemoteTimerGate) { - config := shard.GetConfig() - options := newTimerQueueProcessorOptions(config, false, false) - - logger = logger.WithTags(tag.ClusterName(clusterName)) - - taskFilter := func(taskInfo task.Info) (bool, error) { - timer, ok := taskInfo.(*persistence.TimerTaskInfo) - if !ok { - return false, errUnexpectedQueueTask - } - if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil { - logger.Info("Domain is not in registered status, skip task in standby timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) - return false, nil - } - if timer.TaskType == persistence.TaskTypeWorkflowTimeout || - timer.TaskType == persistence.TaskTypeDeleteHistoryEvent { - domainEntry, err := 
shard.GetDomainCache().GetDomainByID(timer.DomainID) - if err == nil { - if domainEntry.HasReplicationCluster(clusterName) { - // guarantee the processing of workflow execution history deletion - return true, nil - } - } else { - if _, ok := err.(*types.EntityNotExistsError); !ok { - // retry the task if failed to find the domain - logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID)) - return false, err - } - logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(timer.DomainID), tag.Value(timer)) - return false, nil - } - } - return taskAllocator.VerifyStandbyTask(clusterName, timer.DomainID, timer) - } - - updateMaxReadLevel := func() task.Key { - return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0) - } - - updateClusterAckLevel := func(ackLevel task.Key) error { - return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp) - } - - updateProcessingQueueStates := func(states []ProcessingQueueState) error { - pStates := convertToPersistenceTimerProcessingQueueStates(states) - return shard.UpdateTimerProcessingQueueStates(clusterName, pStates) - } - - queueShutdown := func() error { - return nil - } - - remoteTimerGate := NewRemoteTimerGate() - remoteTimerGate.SetCurrentTime(shard.GetCurrentTime(clusterName)) - - return newTimerQueueProcessorBase( - clusterName, - shard, - loadTimerProcessingQueueStates(clusterName, shard, options, logger), - taskProcessor, - remoteTimerGate, - options, - updateMaxReadLevel, - updateClusterAckLevel, - updateProcessingQueueStates, - queueShutdown, - taskFilter, - taskExecutor, - logger, - shard.GetMetricsClient(), - ), remoteTimerGate -} - -func newTimerQueueFailoverProcessor( - standbyClusterName string, - shardContext shard.Context, - historyEngine engine.Engine, - taskProcessor task.Processor, - taskAllocator TaskAllocator, - taskExecutor task.Executor, - logger log.Logger, - minLevel, maxLevel time.Time, - domainIDs map[string]struct{}, -) (updateClusterAckLevelFn, *timerQueueProcessorBase) { - config := shardContext.GetConfig() - options := newTimerQueueProcessorOptions(config, true, true) - - currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName() - failoverStartTime := shardContext.GetTimeSource().Now() - failoverUUID := uuid.New() - logger = logger.WithTags( - tag.ClusterName(currentClusterName), - tag.WorkflowDomainIDs(domainIDs), - tag.FailoverMsg("from: "+standbyClusterName), - ) - - taskFilter := func(taskInfo task.Info) (bool, error) { - timer, ok := taskInfo.(*persistence.TimerTaskInfo) - if !ok { - return false, errUnexpectedQueueTask - } - if notRegistered, err := isDomainNotRegistered(shardContext, timer.DomainID); notRegistered && err == nil { - logger.Info("Domain is not in registered status, skip task in failover timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) - return false, nil - } - return taskAllocator.VerifyFailoverActiveTask(domainIDs, timer.DomainID, timer) - } - - maxReadLevelTaskKey := newTimerTaskKey(maxLevel, 0) - updateMaxReadLevel := func() task.Key { - return maxReadLevelTaskKey // this is a const - } - - updateClusterAckLevel := func(ackLevel task.Key) error { - return shardContext.UpdateTimerFailoverLevel( - failoverUUID, - shard.TimerFailoverLevel{ - StartTime: failoverStartTime, - MinLevel: minLevel, - CurrentLevel: ackLevel.(timerTaskKey).visibilityTimestamp, - MaxLevel: maxLevel, - DomainIDs: domainIDs, - }, - ) - } - - queueShutdown := func() error { - 
return shardContext.DeleteTimerFailoverLevel(failoverUUID) - } - - processingQueueStates := []ProcessingQueueState{ - NewProcessingQueueState( - defaultProcessingQueueLevel, - newTimerTaskKey(minLevel, 0), - maxReadLevelTaskKey, - NewDomainFilter(domainIDs, false), - ), - } - - return updateClusterAckLevel, newTimerQueueProcessorBase( - currentClusterName, // should use current cluster's time when doing domain failover - shardContext, - processingQueueStates, - taskProcessor, - NewLocalTimerGate(shardContext.GetTimeSource()), - options, - updateMaxReadLevel, - updateClusterAckLevel, - nil, - queueShutdown, - taskFilter, - taskExecutor, - logger, - shardContext.GetMetricsClient(), - ) -} - func loadTimerProcessingQueueStates( clusterName string, shard shard.Context, diff --git a/service/history/queue/timer_queue_standby_processor.go b/service/history/queue/timer_queue_standby_processor.go new file mode 100644 index 00000000000..3ed59858925 --- /dev/null +++ b/service/history/queue/timer_queue_standby_processor.go @@ -0,0 +1,113 @@ +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package queue + +import ( + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/engine" + "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/task" +) + +func newTimerQueueStandbyProcessor( + clusterName string, + shard shard.Context, + historyEngine engine.Engine, + taskProcessor task.Processor, + taskAllocator TaskAllocator, + taskExecutor task.Executor, + logger log.Logger, +) (*timerQueueProcessorBase, RemoteTimerGate) { + config := shard.GetConfig() + options := newTimerQueueProcessorOptions(config, false, false) + + logger = logger.WithTags(tag.ClusterName(clusterName)) + + taskFilter := func(taskInfo task.Info) (bool, error) { + timer, ok := taskInfo.(*persistence.TimerTaskInfo) + if !ok { + return false, errUnexpectedQueueTask + } + if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil { + logger.Info("Domain is not in registered status, skip task in standby timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo)) + return false, nil + } + if timer.TaskType == persistence.TaskTypeWorkflowTimeout || + timer.TaskType == persistence.TaskTypeDeleteHistoryEvent { + domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) + if err == nil { + if domainEntry.HasReplicationCluster(clusterName) { + // guarantee the processing of workflow execution history deletion + return true, nil + } + } else { + if _, ok := err.(*types.EntityNotExistsError); !ok { + // retry the task if failed to find the domain + logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID)) + return false, err + } + logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(timer.DomainID), tag.Value(timer)) + return false, nil + } + } + return taskAllocator.VerifyStandbyTask(clusterName, timer.DomainID, timer) + } + + updateMaxReadLevel := func() task.Key { + return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0) + } + + updateClusterAckLevel := func(ackLevel task.Key) error { + return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp) + } + + updateProcessingQueueStates := func(states []ProcessingQueueState) error { + pStates := convertToPersistenceTimerProcessingQueueStates(states) + return shard.UpdateTimerProcessingQueueStates(clusterName, pStates) + } + + queueShutdown := func() error { + return nil + } + + remoteTimerGate := NewRemoteTimerGate() + remoteTimerGate.SetCurrentTime(shard.GetCurrentTime(clusterName)) + + return newTimerQueueProcessorBase( + clusterName, + shard, + loadTimerProcessingQueueStates(clusterName, shard, options, logger), + taskProcessor, + remoteTimerGate, + options, + updateMaxReadLevel, + updateClusterAckLevel, + updateProcessingQueueStates, + queueShutdown, + taskFilter, + taskExecutor, + logger, + shard.GetMetricsClient(), + ), remoteTimerGate +} diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 9eff59ae87f..f6827f7b47e 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -49,10 +49,6 @@ import ( "github.com/uber/cadence/service/worker/archiver" ) -const ( - defaultProcessingQueueLevel = 0 -) - var ( errUnexpectedQueueTask = errors.New("unexpected queue task") errProcessorShutdown = 
errors.New("queue processor has been shutdown") @@ -60,29 +56,27 @@ var ( maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64) ) -type ( - transferQueueProcessor struct { - shard shard.Context - historyEngine engine.Engine - taskProcessor task.Processor +type transferQueueProcessor struct { + shard shard.Context + historyEngine engine.Engine + taskProcessor task.Processor - config *config.Config - currentClusterName string + config *config.Config + currentClusterName string - metricsClient metrics.Client - logger log.Logger + metricsClient metrics.Client + logger log.Logger - status int32 - shutdownChan chan struct{} - shutdownWG sync.WaitGroup + status int32 + shutdownChan chan struct{} + shutdownWG sync.WaitGroup - ackLevel int64 - taskAllocator TaskAllocator - activeTaskExecutor task.Executor - activeQueueProcessor *transferQueueProcessorBase - standbyQueueProcessors map[string]*transferQueueProcessorBase - } -) + ackLevel int64 + taskAllocator TaskAllocator + activeTaskExecutor task.Executor + activeQueueProcessor *transferQueueProcessorBase + standbyQueueProcessors map[string]*transferQueueProcessorBase +} // NewTransferQueueProcessor creates a new transfer QueueProcessor func NewTransferQueueProcessor( diff --git a/service/history/queue/transfer_queue_processor_base.go b/service/history/queue/transfer_queue_processor_base.go index 6d335f76f1b..598a5c3c06f 100644 --- a/service/history/queue/transfer_queue_processor_base.go +++ b/service/history/queue/transfer_queue_processor_base.go @@ -543,9 +543,7 @@ func (t *transferQueueProcessorBase) readTasks( return response.Tasks, len(response.NextPageToken) != 0, nil } -func newTransferTaskKey( - taskID int64, -) task.Key { +func newTransferTaskKey(taskID int64) task.Key { return transferTaskKey{ taskID: taskID, } diff --git a/service/history/queue/transfer_queue_validator.go b/service/history/queue/transfer_queue_validator.go index 1b6044682e3..fd6b646780f 100644 --- a/service/history/queue/transfer_queue_validator.go +++ b/service/history/queue/transfer_queue_validator.go @@ -84,9 +84,7 @@ func newTransferQueueValidator( } } -func (v *transferQueueValidator) addTasks( - info *hcommon.NotifyTaskInfo, -) { +func (v *transferQueueValidator) addTasks(info *hcommon.NotifyTaskInfo) { v.Lock() defer v.Unlock()
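
Illustrative sketch (editorial addition, not part of the patch): how the
relocated pieces compose. It mirrors the calls shown above in
timer_queue_failover_processor.go; logger, metricsClient, minLevel, maxLevel,
and domainIDs are assumed to be in scope.

	// readLevel starts out equal to ackLevel; see NewProcessingQueueState in
	// processing_queue_state.go.
	state := NewProcessingQueueState(
		defaultProcessingQueueLevel,       // from constants.go
		newTimerTaskKey(minLevel, 0),      // ackLevel, and the initial readLevel
		newTimerTaskKey(maxLevel, 0),      // maxLevel
		NewDomainFilter(domainIDs, false), // match only the listed domains
	)

	// newProcessingQueueCollections (processing_queue_collection.go) groups
	// states into one ProcessingQueueCollection per level.
	collections := newProcessingQueueCollections(
		[]ProcessingQueueState{state},
		logger,
		metricsClient,
	)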