Skip to content

Commit

Permalink
Deprecate clustersInfo configuration in favor of new clusterMetadata… (
Browse files Browse the repository at this point in the history
…cadence-workflow#1809)

Deprecate clustersInfo configuration in favor of new clusterMetadata, which provides better flexibility

* Add conversion function which turn clustersInfo into clusterMetadata
* State builder in history service will always set the replication state whenever applying an event.
* Fix & update workflow execution reset test code.

New config change allows a cluster to be disabled, which can be useful for cluster migration using cross DC

Before:
```
clustersInfo:
  enableGlobalDomain: false
  failoverVersionIncrement: 10
  masterClusterName: "active"
  currentClusterName: "active"
  clusterInitialFailoverVersion:
    active: 0
  clusterAddress:
    active:
      rpcName: "cadence-frontend"
      rpcAddress: "127.0.0.1:7933"
```

After:
```
clusterMetadata:
  enableGlobalDomain: false
  failoverVersionIncrement: 10
  masterClusterName: "active"
  currentClusterName: "active"
  clusterInformation:
    active:
      enabled: true
      initialFailoverVersion: 0
      rpcName: "cadence-frontend"
      rpcAddress: "127.0.0.1:7933"

```
  • Loading branch information
wxing1292 authored May 8, 2019
1 parent 0a94cb1 commit 66a9a25
Show file tree
Hide file tree
Showing 37 changed files with 565 additions and 410 deletions.
8 changes: 4 additions & 4 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust

remoteAdminClients := map[string]admin.Client{}
remoteFrontendClients := map[string]frontend.Client{}
for cluster, address := range clusterMetadata.GetAllClientAddress() {
dispatcher, err := dispatcherProvider.Get(address.RPCName, address.RPCAddress)
for cluster, info := range clusterMetadata.GetAllClusterInfo() {
dispatcher, err := dispatcherProvider.Get(info.RPCName, info.RPCAddress)
if err != nil {
return nil, err
}

adminClient, err := factory.NewAdminClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
admin.DefaultTimeout,
dispatcher,
)
Expand All @@ -102,7 +102,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
}

frontendclient, err := factory.NewFrontendClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
dispatcher,
Expand Down
25 changes: 17 additions & 8 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"log"
"time"

"github.com/uber/cadence/common/cluster"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -123,22 +124,30 @@ func (s *server) startService() common.Daemon {
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain)

archivalStatus := dc.GetStringProperty(dynamicconfig.ArchivalStatus, s.cfg.Archival.Status)
enableReadFromArchival := dc.GetBoolProperty(dynamicconfig.EnableReadFromArchival, s.cfg.Archival.EnableReadFromArchival)

params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

clusterMetadata := s.cfg.ClusterMetadata
// TODO remove when ClustersInfo is fully deprecated
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 && len(s.cfg.ClusterMetadata.CurrentClusterName) != 0 {
log.Fatalf("cannot config both clustersInfo and clusterMetadata")
}
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 {
clusterMetadata = s.cfg.ClustersInfo.ToClusterMetadata()
}
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
params.MetricsClient,
enableGlobalDomain,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.MasterClusterName,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clusterMetadata.EnableGlobalDomain),
clusterMetadata.FailoverVersionIncrement,
clusterMetadata.MasterClusterName,
clusterMetadata.CurrentClusterName,
clusterMetadata.ClusterInformation,
archivalStatus,
s.cfg.Archival.DefaultBucket,
enableReadFromArchival,
Expand Down
7 changes: 6 additions & 1 deletion common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,17 @@ func newDomainCacheEntry(clusterMetadata cluster.Metadata) *DomainCacheEntry {
}

// NewDomainCacheEntryWithReplicationForTest returns an entry with test data
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo, config *persistence.DomainConfig, repConfig *persistence.DomainReplicationConfig, clusterMetadata cluster.Metadata) *DomainCacheEntry {
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo,
config *persistence.DomainConfig,
repConfig *persistence.DomainReplicationConfig,
failoverVersion int64,
clusterMetadata cluster.Metadata) *DomainCacheEntry {
return &DomainCacheEntry{
info: info,
config: config,
isGlobalDomain: true,
replicationConfig: repConfig,
failoverVersion: failoverVersion,
clusterMetadata: clusterMetadata,
}
}
Expand Down
106 changes: 49 additions & 57 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package cluster

import (
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -45,12 +46,10 @@ type (
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
GetAllClusterFailoverVersions() map[string]int64
// GetAllClusterInfo return the all cluster name -> corresponding info
GetAllClusterInfo() map[string]config.ClusterInformation
// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
ClusterNameForFailoverVersion(failoverVersion int64) string
// GetAllClientAddress return the frontend address for each cluster name
GetAllClientAddress() map[string]config.Address

// ArchivalConfig returns the archival config of the cluster
ArchivalConfig() *ArchivalConfig
Expand All @@ -62,17 +61,17 @@ type (
// EnableGlobalDomain whether the global domain is enabled,
// this attr should be discarded when cross DC is made public
enableGlobalDomain dynamicconfig.BoolPropertyFn
// failoverVersionIncrement is the increment of each cluster failover version
// failoverVersionIncrement is the increment of each cluster's version when failover happen
failoverVersionIncrement int64
// masterClusterName is the name of the master cluster, only the master cluster can register / update domain
// all clusters can do domain failover
masterClusterName string
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterInitialFailoverVersions contains all cluster name -> corresponding initial failover version
clusterInitialFailoverVersions map[string]int64
// clusterInitialFailoverVersions contains all initial failover version -> corresponding cluster name
initialFailoverVersionClusters map[int64]string
// clusterInfo contains all cluster name -> corresponding information
clusterInfo map[string]config.ClusterInformation
// versionToClusterName contains all initial version -> corresponding cluster name
versionToClusterName map[int64]string
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address

Expand All @@ -93,50 +92,49 @@ func NewMetadata(
failoverVersionIncrement int64,
masterClusterName string,
currentClusterName string,
clusterInitialFailoverVersions map[string]int64,
clusterToAddress map[string]config.Address,
clusterInfo map[string]config.ClusterInformation,
archivalStatus dynamicconfig.StringPropertyFn,
defaultBucket string,
enableReadFromArchival dynamicconfig.BoolPropertyFn,
) Metadata {

if len(clusterInitialFailoverVersions) == 0 {
panic("Empty initial failover versions for cluster")
if len(clusterInfo) == 0 {
panic("Empty cluster information")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if failoverVersionIncrement == 0 {
panic("Version increment is 0")
}
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
if failoverVersionIncrement <= initialFailoverVersion {

versionToClusterName := make(map[int64]string)
for clusterName, info := range clusterInfo {
if failoverVersionIncrement <= info.InitialFailoverVersion || info.InitialFailoverVersion < 0 {
panic(fmt.Sprintf(
"Failover version increment %v is smaller than initial value: %v.",
"Version increment %v is smaller than initial version: %v.",
failoverVersionIncrement,
clusterInitialFailoverVersions,
info.InitialFailoverVersion,
))
}
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
initialFailoverVersionClusters[initialFailoverVersion] = clusterName
versionToClusterName[info.InitialFailoverVersion] = clusterName

if info.Enabled && (len(info.RPCName) == 0 || len(info.RPCAddress) == 0) {
panic(fmt.Sprintf("Cluster %v: rpc name / address is empty", clusterName))
}
}

if _, ok := clusterInitialFailoverVersions[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
if _, ok := clusterInfo[currentClusterName]; !ok {
panic("Current cluster is not specified in cluster info")
}
if _, ok := clusterInitialFailoverVersions[masterClusterName]; !ok {
panic("Master cluster is not specified in all cluster names")
if _, ok := clusterInfo[masterClusterName]; !ok {
panic("Master cluster is not specified in cluster info")
}
if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) {
panic("Cluster to initial failover versions have duplicate initial versions")
}

// only check whether a cluster in cluster -> initial failover versions exists in cluster -> address
for clusterName := range clusterInitialFailoverVersions {
if _, ok := clusterToAddress[clusterName]; !ok {
panic("Cluster -> initial failover version does not have an address")
}
if len(versionToClusterName) != len(clusterInfo) {
panic("Cluster info initial versions have duplicates")
}

status, err := getArchivalStatus(archivalStatus())
Expand All @@ -149,18 +147,17 @@ func NewMetadata(
}

return &metadataImpl{
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
clusterToAddress: clusterToAddress,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInfo: clusterInfo,
versionToClusterName: versionToClusterName,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
}
}

Expand All @@ -172,15 +169,15 @@ func (metadata *metadataImpl) IsGlobalDomainEnabled() bool {

// GetNextFailoverVersion return the next failover version based on input
func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFailoverVersion int64) int64 {
initialFailoverVersion, ok := metadata.clusterInitialFailoverVersions[cluster]
info, ok := metadata.clusterInfo[cluster]
if !ok {
panic(fmt.Sprintf(
"Unknown cluster name: %v with given cluster initial failover version map: %v.",
cluster,
metadata.clusterInitialFailoverVersions,
metadata.clusterInfo,
))
}
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + initialFailoverVersion
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + info.InitialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
Expand All @@ -206,31 +203,26 @@ func (metadata *metadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
func (metadata *metadataImpl) GetAllClusterFailoverVersions() map[string]int64 {
return metadata.clusterInitialFailoverVersions
// GetAllClusterInfo return the all cluster name -> corresponding information
func (metadata *metadataImpl) GetAllClusterInfo() map[string]config.ClusterInformation {
return metadata.clusterInfo
}

// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int64) string {
initialFailoverVersion := failoverVersion % metadata.failoverVersionIncrement
clusterName, ok := metadata.initialFailoverVersionClusters[initialFailoverVersion]
clusterName, ok := metadata.versionToClusterName[initialFailoverVersion]
if !ok {
panic(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
metadata.clusterInitialFailoverVersions,
metadata.clusterInfo,
metadata.failoverVersionIncrement,
))
}
return clusterName
}

// GetAllClientAddress return the frontend address for each cluster name
func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address {
return metadata.clusterToAddress
}

// ArchivalConfig returns the archival config of the cluster.
// This method always return a well formed ArchivalConfig (this means ArchivalConfig().IsValid always returns true).
func (metadata *metadataImpl) ArchivalConfig() (retCfg *ArchivalConfig) {
Expand Down
44 changes: 24 additions & 20 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,32 @@ const (
var (
// TestAllClusterNames is the all cluster names used for test
TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName}
// TestAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
TestAlternativeClusterName: TestAlternativeClusterInitialFailoverVersion,
}
// TestAllClusterAddress is the same as above, juse convinent for test mocking
TestAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress},
// TestAllClusterInfo is the same as above, just convenient for test mocking
TestAllClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
TestAlternativeClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestAlternativeClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestAlternativeClusterFrontendAddress,
},
}

// TestSingleDCAllClusterNames is the all cluster names used for test
TestSingleDCAllClusterNames = []string{TestCurrentClusterName}
// TestSingleDCAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestSingleDCAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
}
// TestSingleDCAllClusterAddress is the same as above, juse convinent for test mocking
TestSingleDCAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
// TestSingleDCClusterInfo is the same as above, just convenient for test mocking
TestSingleDCClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
}
)

Expand All @@ -92,8 +98,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
TestFailoverVersionIncrement,
masterClusterName,
TestCurrentClusterName,
TestAllClusterFailoverVersions,
TestAllClusterAddress,
TestAllClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
Expand All @@ -107,8 +112,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
TestFailoverVersionIncrement,
TestCurrentClusterName,
TestCurrentClusterName,
TestSingleDCAllClusterFailoverVersions,
TestSingleDCAllClusterAddress,
TestSingleDCClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
Expand Down
1 change: 0 additions & 1 deletion common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool) {
}
for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
}
}
Expand Down
Loading

0 comments on commit 66a9a25

Please sign in to comment.