Skip to content

Commit

Permalink
Add feature flag for scheduling cross-cluster operations (cadence-wor…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 30, 2021
1 parent 482f478 commit fb8e782
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 26 deletions.
13 changes: 10 additions & 3 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,19 +1383,25 @@ const (
// Default value: FALSE
// Allowed filters: DomainID
EnableDropStuckTaskByDomainID
// EnableConsistentQuery is indicates if consistent query is enabled for the cluster
// EnableConsistentQuery indicates if consistent query is enabled for the cluster
// KeyName: history.EnableConsistentQuery
// Value type: Bool
// Default value: TRUE
// Allowed filters: N/A
EnableConsistentQuery
// EnableConsistentQueryByDomain is indicates if consistent query is enabled for a domain
// EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain
// KeyName: history.EnableConsistentQueryByDomain
// Value type: Bool
// Default value: FALSE
// Allowed filters: DomainName
EnableConsistentQueryByDomain
// MaxBufferedQueryCount is indicates the maximum number of queries which can be buffered at a given time for a single workflow
// EnableCrossClusterOperations indicates if cross cluster operations can be scheduled for a domain
// KeyName: history.enableCrossClusterOperations
// Value type: Bool
// Default value: FALSE
// Allowed filters: DomainName
EnableCrossClusterOperations
// MaxBufferedQueryCount indicates the maximum number of queries which can be buffered at a given time for a single workflow
// KeyName: history.MaxBufferedQueryCount
// Value type: Int
// Default value: 1
Expand Down Expand Up @@ -2208,6 +2214,7 @@ var Keys = map[Key]string{
ReplicationTaskGenerationQPS: "history.ReplicationTaskGenerationQPS",
EnableConsistentQuery: "history.EnableConsistentQuery",
EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain",
EnableCrossClusterOperations: "history.enableCrossClusterOperations",
MaxBufferedQueryCount: "history.MaxBufferedQueryCount",
MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability",
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ type Config struct {
EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
MaxBufferedQueryCount dynamicconfig.IntPropertyFn

EnableCrossClusterOperations dynamicconfig.BoolPropertyFnWithDomainFilter

// Data integrity check related config knobs
MutableStateChecksumGenProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter
Expand Down Expand Up @@ -518,6 +520,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain, false),
EnableCrossClusterOperations: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperations, false),
MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount, 1),
MutableStateChecksumGenProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumGenProbability, 0),
MutableStateChecksumVerifyProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumVerifyProbability, 0),
Expand Down Expand Up @@ -556,6 +559,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
config.ReplicationTaskProcessorShardQPS = dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 10000)
config.ReplicationTaskProcessorStartWait = dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond)
config.EnableActivityLocalDispatchByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableActivityLocalDispatchByDomain, true)
config.EnableCrossClusterOperations = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperations, true)
return config
}

Expand Down
51 changes: 35 additions & 16 deletions service/history/decision/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,16 +772,16 @@ func (v *attrValidator) validatedTaskList(
}

func (v *attrValidator) validateCrossDomainCall(
domainID string,
sourceDomainID string,
targetDomainID string,
) error {

// same name, no check needed
if domainID == targetDomainID {
if sourceDomainID == targetDomainID {
return nil
}

domainEntry, err := v.domainCache.GetDomainByID(domainID)
sourceDomainEntry, err := v.domainCache.GetDomainByID(sourceDomainID)
if err != nil {
return err
}
Expand All @@ -791,23 +791,42 @@ func (v *attrValidator) validateCrossDomainCall(
return err
}

// both local domain
if !domainEntry.IsGlobalDomain() && !targetDomainEntry.IsGlobalDomain() {
return nil
}
sourceClusters := sourceDomainEntry.GetReplicationConfig().Clusters
targetClusters := targetDomainEntry.GetReplicationConfig().Clusters

domainClusters := domainEntry.GetReplicationConfig().Clusters
targetDomainClusters := targetDomainEntry.GetReplicationConfig().Clusters

// one is local domain, another one is global domain or both global domain
// treat global domain with one replication cluster as local domain
if len(domainClusters) == 1 && len(targetDomainClusters) == 1 {
if *domainClusters[0] == *targetDomainClusters[0] {
// both "local domain"
// here a domain is "local domain" when:
// - IsGlobalDomain() returns false
// - domainCluster contains only one cluster
// case 1 can actually be combined with this case
if len(sourceClusters) == 1 && len(targetClusters) == 1 {
if sourceClusters[0].ClusterName == targetClusters[0].ClusterName {
return nil
}
return v.createCrossDomainCallError(domainEntry, targetDomainEntry)
return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry)
}

// both global domain with > 1 replication cluster
// when code reaches here, at least one domain has more than one cluster
if len(sourceClusters) == len(targetClusters) &&
v.config.EnableCrossClusterOperations(sourceDomainEntry.GetInfo().Name) {
// check if the source domain cluster matches those for the target domain
for _, sourceCluster := range sourceClusters {
found := false
for _, targetCluster := range targetClusters {
if sourceCluster.ClusterName == targetCluster.ClusterName {
found = true
break
}
}
if !found {
return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry)
}
}
return nil
}
return v.createCrossDomainCallError(domainEntry, targetDomainEntry)

return v.createCrossDomainCallError(sourceDomainEntry, targetDomainEntry)
}

func (v *attrValidator) createCrossDomainCallError(
Expand Down
56 changes: 49 additions & 7 deletions service/history/decision/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *attrValidatorSuite) SetupTest() {
ActivityMaxScheduleToStartTimeoutForRetry: dynamicconfig.GetDurationPropertyFnFilteredByDomain(
time.Duration(s.testActivityMaxScheduleToStartTimeoutForRetryInSeconds) * time.Second,
),
EnableCrossClusterOperations: dynamicconfig.GetBoolPropertyFnFilteredByDomain(false),
}
s.validator = newAttrValidator(
s.mockDomainCache,
Expand Down Expand Up @@ -478,7 +479,14 @@ func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToEffectiveLocal(
s.IsType(&types.BadRequestError{}, err)
}

func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain() {
// TestValidateCrossDomainCall_GlobalToGlobal_SameDomain verifies that
// validateCrossDomainCall returns a nil error when the source and target
// domain IDs are identical.
func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_SameDomain() {
// Use the suite's own domain ID as the target so both arguments are equal.
targetDomainID := s.testDomainID

// No domain cache expectations are registered in this test.
err := s.validator.validateCrossDomainCall(s.testDomainID, targetDomainID)
s.Nil(err)
}

func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain_SameCluster() {
domainEntry := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{Name: s.testDomainID},
nil,
Expand Down Expand Up @@ -506,18 +514,52 @@ func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDoma
nil,
)

s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(1)
s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(1)
s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(2)
s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(2)

err := s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID)
s.IsType(&types.BadRequestError{}, err)

s.validator.config.EnableCrossClusterOperations = dynamicconfig.GetBoolPropertyFnFilteredByDomain(true)
err = s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID)
s.Nil(err)
}

func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_SameDomain() {
targetDomainID := s.testDomainID
func (s *attrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_DiffDomain_DiffCluster() {
domainEntry := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{Name: s.testDomainID},
nil,
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestAlternativeClusterName},
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: "cluster name for s.testDomainID"},
},
},
1234,
nil,
)
targetDomainEntry := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{Name: s.testTargetDomainID},
nil,
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
{ClusterName: "cluster name for s.testTargetDomainID"},
},
},
1234,
nil,
)

err := s.validator.validateCrossDomainCall(s.testDomainID, targetDomainID)
s.Nil(err)
s.mockDomainCache.EXPECT().GetDomainByID(s.testDomainID).Return(domainEntry, nil).Times(1)
s.mockDomainCache.EXPECT().GetDomainByID(s.testTargetDomainID).Return(targetDomainEntry, nil).Times(1)

err := s.validator.validateCrossDomainCall(s.testDomainID, s.testTargetDomainID)
s.IsType(&types.BadRequestError{}, err)
}

func (s *attrValidatorSuite) TestValidateTaskListName() {
Expand Down

0 comments on commit fb8e782

Please sign in to comment.