Skip to content

Commit

Permalink
Replication task generation delay (cadence-workflow#3465)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Aug 20, 2020
1 parent 0325ee5 commit 4ac545c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ var keys = map[Key]string{
ReplicationTaskProcessorStartWaitJitterCoefficient: "history.ReplicationTaskProcessorStartWaitJitterCoefficient",
ReplicationTaskProcessorHostQPS: "history.ReplicationTaskProcessorHostQPS",
ReplicationTaskProcessorShardQPS: "history.ReplicationTaskProcessorShardQPS",
ReplicationTaskGenerationQPS: "history.ReplicationTaskGenerationQPS",
EnableConsistentQuery: "history.EnableConsistentQuery",
EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain",
MaxBufferedQueryCount: "history.MaxBufferedQueryCount",
Expand Down Expand Up @@ -868,6 +869,8 @@ const (
ReplicationTaskProcessorHostQPS
// ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level
ReplicationTaskProcessorShardQPS
//ReplicationTaskGenerationQPS is the wait time between each replication task generation qps
ReplicationTaskGenerationQPS
// EnableConsistentQuery indicates if consistent query is enabled for the cluster
EnableConsistentQuery
// EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain
Expand Down
8 changes: 4 additions & 4 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (s *adminHandlerSuite) SetupTest() {
},
}
config := &Config{
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false),
EnableGracefulFailover: dynamicconfig.GetBoolPropertyFn(false),
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false),
EnableGracefulFailover: dynamicconfig.GetBoolPropertyFn(false),
}
s.handler = NewAdminHandler(s.mockResource, params, config)
s.handler.Start()
Expand Down Expand Up @@ -553,8 +553,8 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Permission() {
ctx := context.Background()
handler := s.handler
handler.config = &Config{
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(true),
AdminOperationToken: dynamicconfig.GetStringPropertyFn(common.DefaultAdminOperationToken),
EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(true),
AdminOperationToken: dynamicconfig.GetStringPropertyFn(common.DefaultAdminOperationToken),
}

type test struct {
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ type Config struct {
ReplicationTaskProcessorStartWaitJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
ReplicationTaskGenerationQPS dynamicconfig.FloatPropertyFn

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -445,6 +446,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ReplicationTaskProcessorStartWaitJitterCoefficient: dc.GetFloat64PropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWaitJitterCoefficient, 0.9),
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 5),
ReplicationTaskGenerationQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskGenerationQPS, 100),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain, false),
Expand Down
7 changes: 6 additions & 1 deletion service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
Expand All @@ -63,6 +64,7 @@ type (
queueAckMgr

lastShardSyncTimestamp time.Time
rateLimiter *quotas.DynamicRateLimiter
}
)

Expand Down Expand Up @@ -142,7 +144,9 @@ func newReplicatorQueueProcessor(
)
processor.queueAckMgr = queueAckMgr
processor.queueProcessorBase = queueProcessorBase

processor.rateLimiter = quotas.NewDynamicRateLimiter(func() float64 {
return config.ReplicationTaskGenerationQPS()
})
return processor
}

Expand Down Expand Up @@ -451,6 +455,7 @@ func (p *replicatorQueueProcessorImpl) getTasks(
var replicationTasks []*replicator.ReplicationTask
readLevel := lastReadTaskID
for _, taskInfo := range taskInfoList {
_ = p.rateLimiter.Wait(ctx)
var replicationTask *replicator.ReplicationTask
op := func() error {
var err error
Expand Down

0 comments on commit 4ac545c

Please sign in to comment.