diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index e4e8f518ebb..23e9e96cdf6 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -136,7 +136,9 @@ var keys = map[Key]string{ AdminOperationToken: "history.adminOperationToken", EnableEventsV2: "history.enableEventsV2", - WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", + WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", + WorkerReplicatorConcurrency: "worker.replicatorConcurrency", + WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry", } const ( @@ -331,6 +333,10 @@ const ( // WorkerPersistenceMaxQPS is the max qps worker host can query DB WorkerPersistenceMaxQPS + // WorkerReplicatorConcurrency is the max concurrenct tasks to be processed at any given time + WorkerReplicatorConcurrency + // WorkerReplicationTaskMaxRetry is the max retry for any task + WorkerReplicationTaskMaxRetry // lastKeyForTest must be the last one in this const group for testing purpose lastKeyForTest diff --git a/host/onebox.go b/host/onebox.go index ae904090ccc..9c84a885527 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -21,14 +21,13 @@ package host import ( + "errors" "flag" "fmt" "reflect" "sync" "time" - "errors" - "github.com/stretchr/testify/mock" "github.com/uber-common/bark" "github.com/uber-go/tally" @@ -395,7 +394,7 @@ func (c *cadenceImpl) startWorker(rpHosts []string, startWG *sync.WaitGroup) { metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgrV2, service.GetMetricsClient(), c.logger) workerConfig := worker.NewConfig(dynamicconfig.NewNopCollection()) - workerConfig.ReplicatorConcurrency = 10 + workerConfig.ReplicatorConcurrency = dynamicconfig.GetIntPropertyFn(10) c.replicator = worker.NewReplicator(c.clusterMetadata, metadataManager, historyClient, workerConfig, c.messagingClient, c.logger, service.GetMetricsClient()) if err := c.replicator.Start(); err != nil { diff --git a/service/worker/processor.go b/service/worker/processor.go index 63b3fc24fa3..d6d5597e56b 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -119,7 +119,7 @@ func (p *replicationTaskProcessor) Start() error { } logging.LogReplicationTaskProcessorStartingEvent(p.logger) - consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency) + consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency()) if err != nil { logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err) return err @@ -159,7 +159,7 @@ func (p *replicationTaskProcessor) processorPump() { defer p.shutdownWG.Done() var workerWG sync.WaitGroup - for workerID := 0; workerID < p.config.ReplicatorConcurrency; workerID++ { + for workerID := 0; workerID < p.config.ReplicatorConcurrency(); workerID++ { workerWG.Add(1) go p.messageProcessLoop(&workerWG, workerID) } @@ -200,7 +200,7 @@ func (p *replicationTaskProcessor) processWithRetry(msg messaging.Message, worke }) forceBuffer := false - remainingRetryCount := p.config.ReplicationTaskMaxRetry + remainingRetryCount := p.config.ReplicationTaskMaxRetry() attempt := 0 op := func() error { diff --git a/service/worker/service.go b/service/worker/service.go index 014aac0fe52..5f56e6dc670 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -54,9 +54,9 @@ type ( Config struct { // Replicator settings PersistenceMaxQPS dynamicconfig.IntPropertyFn - ReplicatorConcurrency int + ReplicatorConcurrency dynamicconfig.IntPropertyFn ReplicatorBufferRetryCount int - ReplicationTaskMaxRetry int + ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn } ) @@ -74,9 +74,9 @@ func NewService(params *service.BootstrapParams) common.Daemon { func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500), - ReplicatorConcurrency: 1000, + ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000), ReplicatorBufferRetryCount: 8, - ReplicationTaskMaxRetry: 50, + ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50), } }