Skip to content

Commit

Permalink
Add feature flags for RPC replication migration (cadence-workflow#3216)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Apr 24, 2020
1 parent d6cbc1a commit a8f7c49
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 4 deletions.
12 changes: 12 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ var keys = map[Key]string{
EnableClientVersionCheck: "frontend.enableClientVersionCheck",
ValidSearchAttributes: "frontend.validSearchAttributes",
SendRawWorkflowHistory: "frontend.sendRawWorkflowHistory",
FrontendEnableRPCReplication: "frontend.enableRPCReplication",
SearchAttributesNumberOfKeysLimit: "frontend.searchAttributesNumberOfKeysLimit",
SearchAttributesSizeOfValueLimit: "frontend.searchAttributesSizeOfValueLimit",
SearchAttributesTotalSizeLimit: "frontend.searchAttributesTotalSizeLimit",
Expand Down Expand Up @@ -232,6 +233,8 @@ var keys = map[Key]string{
ReplicationTaskProcessorNoTaskInitialWait: "history.ReplicationTaskProcessorNoTaskInitialWait",
ReplicationTaskProcessorCleanupInterval: "history.ReplicationTaskProcessorCleanupInterval",
ReplicationTaskProcessorCleanupJitterCoefficient: "history.ReplicationTaskProcessorCleanupJitterCoefficient",
HistoryEnableRPCReplication: "history.EnableRPCReplication",
HistoryEnableKafkaReplication: "history.EnableKafkaReplication",
EnableConsistentQuery: "history.EnableConsistentQuery",
EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain",
MaxBufferedQueryCount: "history.MaxBufferedQueryCount",
Expand All @@ -251,6 +254,7 @@ var keys = map[Key]string{
WorkerReplicationTaskMaxRetryDuration: "worker.replicationTaskMaxRetryDuration",
WorkerReplicationTaskContextDuration: "worker.replicationTaskContextDuration",
WorkerReReplicationContextTimeout: "worker.workerReReplicationContextTimeout",
WorkerEnableRPCReplication: "worker.enableWorkerRPCReplication",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers",
WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions",
Expand Down Expand Up @@ -382,6 +386,8 @@ const (
ValidSearchAttributes
// SendRawWorkflowHistory is whether to enable raw history retrieving
SendRawWorkflowHistory
// FrontendEnableRPCReplication is the feature flag for RPC replication
FrontendEnableRPCReplication
// SearchAttributesNumberOfKeysLimit is the limit of number of keys
SearchAttributesNumberOfKeysLimit
// SearchAttributesSizeOfValueLimit is the size limit of each value
Expand Down Expand Up @@ -657,6 +663,8 @@ const (
WorkerReplicationTaskContextDuration
// WorkerReReplicationContextTimeout is the context timeout for end to end re-replication process
WorkerReReplicationContextTimeout
// WorkerEnableRPCReplication is the feature flag for RPC replication
WorkerEnableRPCReplication
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down Expand Up @@ -718,6 +726,10 @@ const (
ReplicationTaskProcessorCleanupInterval
// ReplicationTaskProcessorCleanupJitterCoefficient is the jitter for cleanup timer
ReplicationTaskProcessorCleanupJitterCoefficient
// HistoryEnableRPCReplication is the feature flag for RPC replication
HistoryEnableRPCReplication
// HistoryEnableKafkaReplication is the migration flag for Kafka replication
HistoryEnableKafkaReplication
// EnableConsistentQuery indicates if consistent query is enabled for the cluster
EnableConsistentQuery
// EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain
Expand Down
4 changes: 4 additions & 0 deletions host/dynamicconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ var (
dynamicconfig.ReplicationTaskFetcherErrorRetryWait: 50 * time.Millisecond,
dynamicconfig.ReplicationTaskProcessorErrorRetryWait: time.Millisecond,
dynamicconfig.EnableConsistentQueryByDomain: true,
dynamicconfig.FrontendEnableRPCReplication: true,
dynamicconfig.HistoryEnableRPCReplication: true,
dynamicconfig.HistoryEnableKafkaReplication: false,
dynamicconfig.WorkerEnableRPCReplication: true,
}
)

Expand Down
7 changes: 6 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn

SendRawWorkflowHistory dynamicconfig.BoolPropertyFnWithDomainFilter

EnableRPCReplication dynamicconfig.BoolPropertyFn
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -132,6 +134,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
DisallowQuery: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisallowQuery, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.SendRawWorkflowHistory, false),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
}
}

Expand Down Expand Up @@ -222,7 +225,9 @@ func (s *Service) Start() {
clusterMetadata := s.GetClusterMetadata()
if clusterMetadata.IsGlobalDomainEnabled() {
consumerConfig := clusterMetadata.GetReplicationConsumerConfig()
if consumerConfig != nil && consumerConfig.Type == config.ReplicationConsumerTypeRPC {
if consumerConfig != nil &&
consumerConfig.Type == config.ReplicationConsumerTypeRPC &&
s.config.EnableRPCReplication() {
replicationMessageSink = s.GetDomainReplicationQueue()
} else {
var err error
Expand Down
5 changes: 5 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ type Config struct {
ReplicationTaskProcessorNoTaskRetryWait dynamicconfig.DurationPropertyFn
ReplicationTaskProcessorCleanupInterval dynamicconfig.DurationPropertyFn
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFn
// TODO: these two flags are for migration. Consider removing them after the migration is complete.
EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableKafkaReplication dynamicconfig.BoolPropertyFn

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -366,6 +369,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second),
ReplicationTaskProcessorCleanupInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorCleanupInterval, 1*time.Minute),
ReplicationTaskProcessorCleanupJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorCleanupJitterCoefficient, 0.15),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.HistoryEnableRPCReplication, false),
EnableKafkaReplication: dc.GetBoolProperty(dynamicconfig.HistoryEnableKafkaReplication, true),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableConsistentQueryByDomain, false),
Expand Down
4 changes: 3 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ func (e *historyEngineImpl) Start() {
e.timerProcessor.Start()

clusterMetadata := e.shard.GetClusterMetadata()
if e.replicatorProcessor != nil && clusterMetadata.GetReplicationConsumerConfig().Type != sconfig.ReplicationConsumerTypeRPC {
if e.replicatorProcessor != nil &&
clusterMetadata.GetReplicationConsumerConfig().Type != sconfig.ReplicationConsumerTypeRPC &&
e.config.EnableKafkaReplication() {
e.replicatorProcessor.Start()
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewTaskFetchers(
) TaskFetchers {

var fetchers []TaskFetcher
if consumerConfig.Type == serviceConfig.ReplicationConsumerTypeRPC {
if consumerConfig.Type == serviceConfig.ReplicationConsumerTypeRPC && config.EnableRPCReplication() {
for clusterName, info := range clusterMetadata.GetAllClusterInfo() {
if !info.Enabled {
continue
Expand Down
3 changes: 2 additions & 1 deletion service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type (
ReplicationTaskMaxRetryDuration dynamicconfig.DurationPropertyFn
ReplicationTaskContextTimeout dynamicconfig.DurationPropertyFn
ReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableRPCReplication dynamicconfig.BoolPropertyFn
}
)

Expand Down Expand Up @@ -123,7 +124,7 @@ func (r *Replicator) Start() error {
}

if clusterName != currentClusterName {
if replicationConsumerConfig.Type == config.ReplicationConsumerTypeRPC {
if replicationConsumerConfig.Type == config.ReplicationConsumerTypeRPC && r.config.EnableRPCReplication() {
processor := newDomainReplicationMessageProcessor(
clusterName,
r.logger.WithTags(tag.ComponentReplicationTaskProcessor, tag.SourceCluster(clusterName)),
Expand Down
1 change: 1 addition & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func NewConfig(params *service.BootstrapParams) *Config {
ReplicationTaskMaxRetryDuration: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskMaxRetryDuration, 15*time.Minute),
ReplicationTaskContextTimeout: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskContextDuration, 30*time.Second),
ReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.WorkerReReplicationContextTimeout, 0*time.Second),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableRPCReplication, false),
},
ArchiverConfig: &archiver.Config{
ArchiverConcurrency: dc.GetIntProperty(dynamicconfig.WorkerArchiverConcurrency, 50),
Expand Down

0 comments on commit a8f7c49

Please sign in to comment.