From fb8e78277655cebea360ec5c3ab14f323db57600 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 30 Aug 2021 15:03:13 -0700 Subject: [PATCH] Add feature flag for scheduling cross-cluster operations (#4424) --- common/dynamicconfig/constants.go | 13 ++++-- service/history/config/config.go | 4 ++ service/history/decision/checker.go | 51 ++++++++++++++------- service/history/decision/checker_test.go | 56 +++++++++++++++++++++--- 4 files changed, 98 insertions(+), 26 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index f6085fbd9ff..c0e6f83ae61 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1383,19 +1383,25 @@ const ( // Default value: FALSE // Allowed filters: DomainID EnableDropStuckTaskByDomainID - // EnableConsistentQuery is indicates if consistent query is enabled for the cluster + // EnableConsistentQuery indicates if consistent query is enabled for the cluster // KeyName: history.EnableConsistentQuery // Value type: Bool // Default value: TRUE // Allowed filters: N/A EnableConsistentQuery - // EnableConsistentQueryByDomain is indicates if consistent query is enabled for a domain + // EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain // KeyName: history.EnableConsistentQueryByDomain // Value type: Bool // Default value: FALSE // Allowed filters: DomainName EnableConsistentQueryByDomain - // MaxBufferedQueryCount is indicates the maximum number of queries which can be buffered at a given time for a single workflow + // EnableCrossClusterOperations indicates if cross cluster operations can be scheduled for a domain + // KeyName: history.enableCrossClusterOperations + // Value type: Bool + // Default value: FALSE + // Allowed filters: DomainName + EnableCrossClusterOperations + // MaxBufferedQueryCount indicates the maximum number of queries which can be buffered at a given time for a single workflow // KeyName: 
history.MaxBufferedQueryCount // Value type: Int // Default value: 1 @@ -2208,6 +2214,7 @@ var Keys = map[Key]string{ ReplicationTaskGenerationQPS: "history.ReplicationTaskGenerationQPS", EnableConsistentQuery: "history.EnableConsistentQuery", EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain", + EnableCrossClusterOperations: "history.enableCrossClusterOperations", MaxBufferedQueryCount: "history.MaxBufferedQueryCount", MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability", MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability", diff --git a/service/history/config/config.go b/service/history/config/config.go index cf8766bc626..d24ef3a24d0 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -270,6 +270,8 @@ type Config struct { EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter MaxBufferedQueryCount dynamicconfig.IntPropertyFn + EnableCrossClusterOperations dynamicconfig.BoolPropertyFnWithDomainFilter + // Data integrity check related config knobs MutableStateChecksumGenProbability dynamicconfig.IntPropertyFnWithDomainFilter MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter @@ -518,6 +520,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true), EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain, false), + EnableCrossClusterOperations: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperations, false), MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount, 1), MutableStateChecksumGenProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumGenProbability, 0), MutableStateChecksumVerifyProbability: 
dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumVerifyProbability, 0), @@ -556,6 +559,7 @@ func NewForTestByShardNumber(shardNumber int) *Config { config.ReplicationTaskProcessorShardQPS = dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 10000) config.ReplicationTaskProcessorStartWait = dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond) config.EnableActivityLocalDispatchByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableActivityLocalDispatchByDomain, true) + config.EnableCrossClusterOperations = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperations, true) return config } diff --git a/service/history/decision/checker.go b/service/history/decision/checker.go index db2886aefba..a9de1ac0065 100644 --- a/service/history/decision/checker.go +++ b/service/history/decision/checker.go @@ -772,16 +772,16 @@ func (v *attrValidator) validatedTaskList( } func (v *attrValidator) validateCrossDomainCall( - domainID string, + sourceDomainID string, targetDomainID string, ) error { // same name, no check needed - if domainID == targetDomainID { + if sourceDomainID == targetDomainID { return nil } - domainEntry, err := v.domainCache.GetDomainByID(domainID) + sourceDomainEntry, err := v.domainCache.GetDomainByID(sourceDomainID) if err != nil { return err } @@ -791,23 +791,42 @@ func (v *attrValidator) validateCrossDomainCall( return err } - // both local domain - if !domainEntry.IsGlobalDomain() && !targetDomainEntry.IsGlobalDomain() { - return nil - } + sourceClusters := sourceDomainEntry.GetReplicationConfig().Clusters + targetClusters := targetDomainEntry.GetReplicationConfig().Clusters - domainClusters := domainEntry.GetReplicationConfig().Clusters - targetDomainClusters := targetDomainEntry.GetReplicationConfig().Clusters - - // one is local domain, another one is global domain or both global domain - // treat global domain with one replication 
cluster as local domain - if len(domainClusters) == 1 && len(targetDomainClusters) == 1 { - if *domainClusters[0] == *targetDomainClusters[0] { + // both "local domain" + // here a domain is "local domain" when: + // - IsGlobalDomain() returns false + // - domainCluster contains only one cluster + // case 1 can actually be combined with this case + if len(sourceClusters) == 1 && len(targetClusters) == 1 { + if sourceClusters[0].ClusterName == targetClusters[0].ClusterName { return nil } - return v.createCrossDomainCallError(domainEntry, targetDomainEntry) + return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry) + } + + // both global domain with > 1 replication cluster + // when code reaches here, at least one domain has more than one cluster + if len(sourceClusters) == len(targetClusters) && + v.config.EnableCrossClusterOperations(sourceDomainEntry.GetInfo().Name) { + // check if the source domain cluster matches those for the target domain + for _, sourceCluster := range sourceClusters { + found := false + for _, targetCluster := range targetClusters { + if sourceCluster.ClusterName == targetCluster.ClusterName { + found = true + break + } + } + if !found { + return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry) + } + } + return nil } - return v.createCrossDomainCallError(domainEntry, targetDomainEntry) + + return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry) } func (v *attrValidator) createCrossDomainCallError( diff --git a/service/history/decision/checker_test.go b/service/history/decision/checker_test.go index 347f11e5ff0..1bec16aaba1 100644 --- a/service/history/decision/checker_test.go +++ b/service/history/decision/checker_test.go @@ -97,6 +97,7 @@ func (s *attrValidatorSuite) SetupTest() { ActivityMaxScheduleToStartTimeoutForRetry: dynamicconfig.GetDurationPropertyFnFilteredByDomain( time.Duration(s.testActivityMaxScheduleToStartTimeoutForRetryInSeconds) * time.Second, ), +
EnableCrossClusterOperations: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false), } s.validator = newAttrValidator( s.mockDomainCache, @@ -478,7 +479,14 @@ func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToEffectiveLocal( s.IsType(&types.BadRequestError{}, err) } -func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain() { +func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_SameDomain() { + targetDomainID := s.testDomainID + + err := s.validator.validateCrossDomainCall(s.testDomainID, targetDomainID) + s.Nil(err) +} + +func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain_SameCluster() { domainEntry := cache.NewGlobalDomainCacheEntryForTest( &persistence.DomainInfo{Name: s.testDomainID}, nil, @@ -506,18 +514,52 @@ func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDoma nil, ) - s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(1) - s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(1) + s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(2) + s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(2) err := s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID) s.IsType(&types.BadRequestError{}, err) + + s.validator.config.EnableCrossClusterOperations = dynamicconfig.GetBoolPropertyFnFilteredByDomain(true) + err = s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID) + s.Nil(err) } -func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_SameDomain() { - targetDomainID := s.testDomainID +func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain_DiffCluster() { + domainEntry := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{Name: s.testDomainID}, + nil, + 
&persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestAlternativeClusterName}, + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: "cluster name for s.testDomainID"}, + }, + }, + 1234, + nil, + ) + targetDomainEntry := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{Name: s.testTargetDomainID}, + nil, + &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + {ClusterName: "cluster name for s.testTargetDomainID"}, + }, + }, + 1234, + nil, + ) - err := s.validator.validateCrossDomainCall(s.testDomainID, targetDomainID) - s.Nil(err) + s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(1) + s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(1) + + err := s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID) + s.IsType(&types.BadRequestError{}, err) } func (s *attrValidatorSuite) TestValidateTaskListName() {