Skip to content

Commit

Permalink
Stop orphan failover queue processors when its parent stops (cadence-…
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored May 10, 2024
1 parent b76a4d8 commit 795aec1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
2 changes: 0 additions & 2 deletions service/history/queue/timer_queue_failover_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
)

func newTimerQueueFailoverProcessor(
standbyClusterName string,
shardContext shard.Context,
historyEngine engine.Engine,
taskProcessor task.Processor,
taskAllocator TaskAllocator,
taskExecutor task.Executor,
Expand Down
37 changes: 26 additions & 11 deletions service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ type timerQueueProcessor struct {
shutdownChan chan struct{}
shutdownWG sync.WaitGroup

ackLevel time.Time
taskAllocator TaskAllocator
activeTaskExecutor task.Executor
activeQueueProcessor *timerQueueProcessorBase
standbyQueueProcessors map[string]*timerQueueProcessorBase
standbyTaskExecutors []task.Executor
standbyQueueTimerGates map[string]RemoteTimerGate
ackLevel time.Time
taskAllocator TaskAllocator
activeTaskExecutor task.Executor
activeQueueProcessor *timerQueueProcessorBase
standbyQueueProcessors map[string]*timerQueueProcessorBase
standbyTaskExecutors []task.Executor
standbyQueueTimerGates map[string]RemoteTimerGate
failoverQueueProcessors []*timerQueueProcessorBase
}

// NewTimerQueueProcessor creates a new timer QueueProcessor
Expand Down Expand Up @@ -206,7 +207,7 @@ func (t *timerQueueProcessor) Stop() {
t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
}
t.activeQueueProcessor.Stop()
t.activeTaskExecutor.Stop()

for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}
Expand All @@ -215,6 +216,15 @@ func (t *timerQueueProcessor) Stop() {
for _, standbyTaskExecutor := range t.standbyTaskExecutors {
standbyTaskExecutor.Stop()
}

if len(t.failoverQueueProcessors) > 0 {
t.logger.Info("Shutting down failover timer queues", tag.Counter(len(t.failoverQueueProcessors)))
for _, failoverQueueProcessor := range t.failoverQueueProcessors {
failoverQueueProcessor.Stop()
}
}

t.activeTaskExecutor.Stop()
}

func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
Expand Down Expand Up @@ -300,7 +310,6 @@ func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
updateClusterAckLevelFn, failoverQueueProcessor := newTimerQueueFailoverProcessor(
standbyClusterName,
t.shard,
t.historyEngine,
t.taskProcessor,
t.taskAllocator,
t.activeTaskExecutor,
Expand All @@ -316,6 +325,12 @@ func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
if err != nil {
t.logger.Error("Error update shard ack level", tag.Error(err))
}

// Failover queue processors are started on the fly when domains are failed over.
// Failover queue processors will be stopped when the timer queue instance is stopped (due to restart or shard movement).
// This means the failover queue processor might not finish its job.
// There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
failoverQueueProcessor.Start()
}

Expand Down Expand Up @@ -367,7 +382,7 @@ func (t *timerQueueProcessor) UnlockTaskProcessing() {
func (t *timerQueueProcessor) drain() {
if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
if err := t.completeTimer(context.Background()); err != nil {
t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
}
return
}
Expand All @@ -376,7 +391,7 @@ func (t *timerQueueProcessor) drain() {
ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
defer cancel()
if err := t.completeTimer(ctx); err != nil {
t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
}
}

Expand Down
30 changes: 19 additions & 11 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ type transferQueueProcessor struct {
shutdownChan chan struct{}
shutdownWG sync.WaitGroup

ackLevel int64
taskAllocator TaskAllocator
activeTaskExecutor task.Executor
activeQueueProcessor *transferQueueProcessorBase
standbyQueueProcessors map[string]*transferQueueProcessorBase
ackLevel int64
taskAllocator TaskAllocator
activeTaskExecutor task.Executor
activeQueueProcessor *transferQueueProcessorBase
standbyQueueProcessors map[string]*transferQueueProcessorBase
failoverQueueProcessors []*transferQueueProcessorBase
}

// NewTransferQueueProcessor creates a new transfer QueueProcessor
Expand Down Expand Up @@ -110,7 +111,6 @@ func NewTransferQueueProcessor(

activeQueueProcessor := newTransferQueueActiveProcessor(
shard,
historyEngine,
taskProcessor,
taskAllocator,
activeTaskExecutor,
Expand Down Expand Up @@ -141,7 +141,6 @@ func NewTransferQueueProcessor(
standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
clusterName,
shard,
historyEngine,
taskProcessor,
taskAllocator,
standbyTaskExecutor,
Expand Down Expand Up @@ -206,6 +205,13 @@ func (t *transferQueueProcessor) Stop() {
for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}

if len(t.failoverQueueProcessors) > 0 {
t.logger.Info("Shutting down failover transfer queues", tag.Counter(len(t.failoverQueueProcessors)))
for _, failoverQueueProcessor := range t.failoverQueueProcessors {
failoverQueueProcessor.Stop()
}
}
}

func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
Expand Down Expand Up @@ -270,7 +276,6 @@ func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {

updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
t.shard,
t.historyEngine,
t.taskProcessor,
t.taskAllocator,
t.activeTaskExecutor,
Expand All @@ -287,6 +292,12 @@ func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
if err != nil {
t.logger.Error("Error update shard ack level", tag.Error(err))
}

// Failover queue processors are started on the fly when domains are failed over.
// Failover queue processors will be stopped when the transfer queue instance is stopped (due to restart or shard movement).
// This means the failover queue processor might not finish its job.
// There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
failoverQueueProcessor.Start()
}

Expand Down Expand Up @@ -456,7 +467,6 @@ func (t *transferQueueProcessor) completeTransfer() error {

func newTransferQueueActiveProcessor(
shard shard.Context,
historyEngine engine.Engine,
taskProcessor task.Processor,
taskAllocator TaskAllocator,
taskExecutor task.Executor,
Expand Down Expand Up @@ -517,7 +527,6 @@ func newTransferQueueActiveProcessor(
func newTransferQueueStandbyProcessor(
clusterName string,
shard shard.Context,
historyEngine engine.Engine,
taskProcessor task.Processor,
taskAllocator TaskAllocator,
taskExecutor task.Executor,
Expand Down Expand Up @@ -594,7 +603,6 @@ func newTransferQueueStandbyProcessor(

func newTransferQueueFailoverProcessor(
shardContext shard.Context,
historyEngine engine.Engine,
taskProcessor task.Processor,
taskAllocator TaskAllocator,
taskExecutor task.Executor,
Expand Down

0 comments on commit 795aec1

Please sign in to comment.