Skip to content

Commit

Permalink
Synchronous shutdown support for replication task fetcher (cadence-wo…
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Dec 27, 2023
1 parent 629fbec commit 58c0658
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
13 changes: 12 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,8 +1602,14 @@ const (
// KeyName: history.queueProcessorEnableGracefulSyncShutdown
// Value type: Bool
// Default value: false
// Allowed filters: ShardID
// Allowed filters: N/A
QueueProcessorEnableGracefulSyncShutdown
// ReplicationTaskFetcherEnableGracefulSyncShutdown indicates whether the replication task fetcher should be shut down gracefully & synchronously
// KeyName: history.replicationTaskFetcherEnableGracefulSyncShutdown
// Value type: Bool
// Default value: false
// Allowed filters: N/A
ReplicationTaskFetcherEnableGracefulSyncShutdown
// TransferProcessorEnableValidator is whether validator should be enabled for transferQueueProcessor
// KeyName: history.transferProcessorEnableValidator
// Value type: Bool
Expand Down Expand Up @@ -3839,6 +3845,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "QueueProcessorEnableGracefulSyncShutdown indicates whether processing queue should be shutdown gracefully & synchronously",
DefaultValue: false,
},
ReplicationTaskFetcherEnableGracefulSyncShutdown: DynamicBool{
KeyName: "history.replicationTaskFetcherEnableGracefulSyncShutdown",
Description: "ReplicationTaskFetcherEnableGracefulSyncShutdown is whether we should gracefully drain replication task fetcher on shutdown",
DefaultValue: false,
},
TransferProcessorEnableValidator: DynamicBool{
KeyName: "history.transferProcessorEnableValidator",
Description: "TransferProcessorEnableValidator is whether validator should be enabled for transferQueueProcessor",
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type Config struct {
ReplicationTaskFetcherTimerJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicationTaskFetcherErrorRetryWait dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherServiceBusyWait dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherEnableGracefulSyncShutdown dynamicconfig.BoolPropertyFn
ReplicationTaskProcessorErrorRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorErrorRetryMaxAttempts dynamicconfig.IntPropertyFnWithShardIDFilter
ReplicationTaskProcessorErrorSecondRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
Expand Down Expand Up @@ -527,6 +528,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient),
ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait),
ReplicationTaskFetcherServiceBusyWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherServiceBusyWait),
ReplicationTaskFetcherEnableGracefulSyncShutdown: dc.GetBoolProperty(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown),
ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait),
ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts),
ReplicationTaskProcessorErrorSecondRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorSecondRetryWait),
Expand Down Expand Up @@ -611,6 +613,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
"1": 50,
}))
panicIfErr(inMem.UpdateValue(dynamicconfig.QueueProcessorRandomSplitProbability, 0.5))
panicIfErr(inMem.UpdateValue(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true))

dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false, "")
Expand All @@ -630,6 +633,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
config.QueueProcessorPendingTaskSplitThreshold = dc.GetMapProperty(dynamicconfig.QueueProcessorPendingTaskSplitThreshold)
config.QueueProcessorStuckTaskSplitThreshold = dc.GetMapProperty(dynamicconfig.QueueProcessorStuckTaskSplitThreshold)
config.QueueProcessorRandomSplitProbability = dc.GetFloat64Property(dynamicconfig.QueueProcessorRandomSplitProbability)
config.ReplicationTaskFetcherEnableGracefulSyncShutdown = dc.GetBoolProperty(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown)
return config
}

Expand Down
46 changes: 28 additions & 18 deletions service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ type (
remotePeer admin.Client
rateLimiter *quotas.DynamicRateLimiter
requestChan chan *request
done chan struct{}
ctx context.Context
cancelCtx context.CancelFunc
stoppedCh chan struct{}
}

// taskFetchersImpl is a group of fetchers, one per source DC.
Expand All @@ -92,9 +94,7 @@ func NewTaskFetchers(
clusterMetadata cluster.Metadata,
clientBean client.Bean,
) TaskFetchers {

currentCluster := clusterMetadata.GetCurrentClusterName()

var fetchers []TaskFetcher
for clusterName := range clusterMetadata.GetRemoteClusterInfo() {
remoteFrontendClient := clientBean.GetRemoteAdminClient(clusterName)
Expand Down Expand Up @@ -124,7 +124,7 @@ func (f *taskFetchersImpl) Start() {
for _, fetcher := range f.fetchers {
fetcher.Start()
}
f.logger.Info("Replication task fetchers started.")
f.logger.Info("Replication task fetchers started.", tag.Counter(len(f.fetchers)))
}

// Stop stops the fetchers
Expand All @@ -136,7 +136,7 @@ func (f *taskFetchersImpl) Stop() {
for _, fetcher := range f.fetchers {
fetcher.Stop()
}
f.logger.Info("Replication task fetchers stopped.")
f.logger.Info("Replication task fetchers stopped.", tag.Counter(len(f.fetchers)))
}

// GetFetchers returns all the fetchers
Expand All @@ -152,7 +152,7 @@ func newReplicationTaskFetcher(
config *config.Config,
sourceFrontend admin.Client,
) TaskFetcher {

ctx, cancel := context.WithCancel(context.Background())
return &taskFetcherImpl{
status: common.DaemonStatusInitialized,
config: config,
Expand All @@ -162,7 +162,9 @@ func newReplicationTaskFetcher(
sourceCluster: sourceCluster,
rateLimiter: quotas.NewDynamicRateLimiter(config.ReplicationTaskProcessorHostQPS.AsFloat64()),
requestChan: make(chan *request, requestChanBufferSize),
done: make(chan struct{}),
ctx: ctx,
cancelCtx: cancel,
stoppedCh: make(chan struct{}),
}
}

Expand All @@ -184,7 +186,11 @@ func (f *taskFetcherImpl) Stop() {
return
}

close(f.done)
f.cancelCtx()
if f.config.ReplicationTaskFetcherEnableGracefulSyncShutdown() {
f.logger.Debug("Replication task fetcher is waiting on stoppedCh before shutting down")
<-f.stoppedCh
}
f.logger.Info("Replication task fetcher stopped.")
}

Expand All @@ -194,9 +200,10 @@ func (f *taskFetcherImpl) fetchTasks() {
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
defer timer.Stop()
defer close(f.stoppedCh)

requestByShard := make(map[int32]*request)

for {
select {
case request := <-f.requestChan:
Expand Down Expand Up @@ -230,8 +237,7 @@ func (f *taskFetcherImpl) fetchTasks() {
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
}
case <-f.done:
timer.Stop()
case <-f.ctx.Done():
return
}
}
Expand All @@ -248,31 +254,35 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ
if err != nil {
if _, ok := err.(*types.ServiceBusyError); !ok {
f.logger.Error("Failed to get replication tasks", tag.Error(err))
return err
} else {
f.logger.Debug("Failed to get replication tasks because service busy")
}

return err
}

f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(messagesByShard)))

for shardID, tasks := range messagesByShard {
request := requestByShard[shardID]
request.respChan <- tasks
close(request.respChan)
delete(requestByShard, shardID)
}

return err
return nil
}

func (f *taskFetcherImpl) getMessages(
requestByShard map[int32]*request,
) (map[int32]*types.ReplicationMessages, error) {
func (f *taskFetcherImpl) getMessages(requestByShard map[int32]*request) (map[int32]*types.ReplicationMessages, error) {
var tokens []*types.ReplicationToken
for _, request := range requestByShard {
tokens = append(tokens, request.token)
}

ctx, cancel := context.WithTimeout(context.Background(), fetchTaskRequestTimeout)
parentCtx := f.ctx
if !f.config.ReplicationTaskFetcherEnableGracefulSyncShutdown() {
parentCtx = context.Background()
}
ctx, cancel := context.WithTimeout(parentCtx, fetchTaskRequestTimeout)
defer cancel()

request := &types.GetReplicationMessagesRequest{
Expand Down

0 comments on commit 58c0658

Please sign in to comment.