Skip to content

Commit

Permalink
Make worker config dynamic (cadence-workflow#1253)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Nov 15, 2018
1 parent 5100907 commit 272acd8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
8 changes: 7 additions & 1 deletion common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ var keys = map[Key]string{
AdminOperationToken: "history.adminOperationToken",
EnableEventsV2: "history.enableEventsV2",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
}

const (
Expand Down Expand Up @@ -331,6 +333,10 @@ const (

// WorkerPersistenceMaxQPS is the max qps worker host can query DB
WorkerPersistenceMaxQPS
// WorkerReplicatorConcurrency is the max concurrenct tasks to be processed at any given time
WorkerReplicatorConcurrency
// WorkerReplicationTaskMaxRetry is the max retry for any task
WorkerReplicationTaskMaxRetry

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
Expand Down
5 changes: 2 additions & 3 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
package host

import (
"errors"
"flag"
"fmt"
"reflect"
"sync"
"time"

"errors"

"github.com/stretchr/testify/mock"
"github.com/uber-common/bark"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -395,7 +394,7 @@ func (c *cadenceImpl) startWorker(rpHosts []string, startWG *sync.WaitGroup) {
metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgrV2, service.GetMetricsClient(), c.logger)

workerConfig := worker.NewConfig(dynamicconfig.NewNopCollection())
workerConfig.ReplicatorConcurrency = 10
workerConfig.ReplicatorConcurrency = dynamicconfig.GetIntPropertyFn(10)
c.replicator = worker.NewReplicator(c.clusterMetadata, metadataManager, historyClient,
workerConfig, c.messagingClient, c.logger, service.GetMetricsClient())
if err := c.replicator.Start(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *replicationTaskProcessor) Start() error {
}

logging.LogReplicationTaskProcessorStartingEvent(p.logger)
consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency)
consumer, err := p.client.NewConsumer(p.currentCluster, p.sourceCluster, p.consumerName, p.config.ReplicatorConcurrency())
if err != nil {
logging.LogReplicationTaskProcessorStartFailedEvent(p.logger, err)
return err
Expand Down Expand Up @@ -159,7 +159,7 @@ func (p *replicationTaskProcessor) processorPump() {
defer p.shutdownWG.Done()

var workerWG sync.WaitGroup
for workerID := 0; workerID < p.config.ReplicatorConcurrency; workerID++ {
for workerID := 0; workerID < p.config.ReplicatorConcurrency(); workerID++ {
workerWG.Add(1)
go p.messageProcessLoop(&workerWG, workerID)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (p *replicationTaskProcessor) processWithRetry(msg messaging.Message, worke
})

forceBuffer := false
remainingRetryCount := p.config.ReplicationTaskMaxRetry
remainingRetryCount := p.config.ReplicationTaskMaxRetry()

attempt := 0
op := func() error {
Expand Down
8 changes: 4 additions & 4 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type (
Config struct {
// Replicator settings
PersistenceMaxQPS dynamicconfig.IntPropertyFn
ReplicatorConcurrency int
ReplicatorConcurrency dynamicconfig.IntPropertyFn
ReplicatorBufferRetryCount int
ReplicationTaskMaxRetry int
ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn
}
)

Expand All @@ -74,9 +74,9 @@ func NewService(params *service.BootstrapParams) common.Daemon {
func NewConfig(dc *dynamicconfig.Collection) *Config {
return &Config{
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500),
ReplicatorConcurrency: 1000,
ReplicatorConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorConcurrency, 1000),
ReplicatorBufferRetryCount: 8,
ReplicationTaskMaxRetry: 50,
ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 50),
}
}

Expand Down

0 comments on commit 272acd8

Please sign in to comment.