Skip to content

Commit

Permalink
Feature/min initial failover version (cadence-workflow#5015)
Browse files Browse the repository at this point in the history
Adds a migration feature for failover version
  • Loading branch information
davidporter-id-au authored Nov 3, 2022
1 parent a668ce1 commit 56d2028
Show file tree
Hide file tree
Showing 19 changed files with 1,133 additions and 44 deletions.
3 changes: 3 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.PrimaryClusterName,
clusterGroupMetadata.CurrentClusterName,
clusterGroupMetadata.ClusterGroup,
dc.GetBoolPropertyFilteredByDomain(dynamicconfig.UseNewInitialFailoverVersion),
params.MetricsClient,
params.Logger,
)

advancedVisMode := dc.GetStringProperty(
Expand Down
113 changes: 88 additions & 25 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)

type (
// Metadata provides information about clusters
Metadata struct {
log log.Logger
metrics metrics.Scope

// failoverVersionIncrement is the increment of each cluster's version when failover happen
failoverVersionIncrement int64
// primaryClusterName is the name of the primary cluster, only the primary cluster can register / update domain
Expand All @@ -45,6 +52,8 @@ type (
remoteClusters map[string]config.ClusterInformation
// versionToClusterName contains all initial version -> corresponding cluster name
versionToClusterName map[int64]string
// allows for a new failover version migration
useNewFailoverVersionOverride dynamicconfig.BoolPropertyFnWithDomainFilter
}
)

Expand All @@ -54,6 +63,9 @@ func NewMetadata(
primaryClusterName string,
currentClusterName string,
clusterGroup map[string]config.ClusterInformation,
useMinFailoverVersionOverrideConfig dynamicconfig.BoolPropertyFnWithDomainFilter,
metricsClient metrics.Client,
logger log.Logger,
) Metadata {
versionToClusterName := make(map[int64]string)
for clusterName, info := range clusterGroup {
Expand All @@ -77,36 +89,44 @@ func NewMetadata(
}

return Metadata{
failoverVersionIncrement: failoverVersionIncrement,
primaryClusterName: primaryClusterName,
currentClusterName: currentClusterName,
allClusters: clusterGroup,
enabledClusters: enabledClusters,
remoteClusters: remoteClusters,
versionToClusterName: versionToClusterName,
log: logger,
metrics: metricsClient.Scope(metrics.ClusterMetadataScope),
failoverVersionIncrement: failoverVersionIncrement,
primaryClusterName: primaryClusterName,
currentClusterName: currentClusterName,
allClusters: clusterGroup,
enabledClusters: enabledClusters,
remoteClusters: remoteClusters,
versionToClusterName: versionToClusterName,
useNewFailoverVersionOverride: useMinFailoverVersionOverrideConfig,
}
}

// GetNextFailoverVersion return the next failover version based on input
func (m Metadata) GetNextFailoverVersion(cluster string, currentFailoverVersion int64) int64 {
info, ok := m.allClusters[cluster]
if !ok {
panic(fmt.Sprintf(
"Unknown cluster name: %v with given cluster initial failover version map: %v.",
cluster,
m.allClusters,
))
}
failoverVersion := currentFailoverVersion/m.failoverVersionIncrement*m.failoverVersionIncrement + info.InitialFailoverVersion
func (m Metadata) GetNextFailoverVersion(cluster string, currentFailoverVersion int64, domainName string) int64 {
initialFailoverVersion := m.getInitialFailoverVersion(cluster, domainName)
failoverVersion := currentFailoverVersion/m.failoverVersionIncrement*m.failoverVersionIncrement + initialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + m.failoverVersionIncrement
}
return failoverVersion
}

// IsVersionFromSameCluster return true if 2 version are used for the same cluster
// IsVersionFromSameCluster return true if the new version is used for the same cluster
func (m Metadata) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
return (version1-version2)%m.failoverVersionIncrement == 0
v1Server, err := m.resolveServerName(version1)
if err != nil {
// preserving old behaviour however, this should never occur
m.metrics.IncCounter(metrics.ClusterMetadataFailureToResolveCounter)
m.log.Error("could not resolve an incoming version", tag.Dynamic("failover-version", version1))
return false
}
v2Server, err := m.resolveServerName(version2)
if err != nil {
m.log.Error("could not resolve an incoming version", tag.Dynamic("failover-version", version2))
return false
}
return v1Server == v2Server
}

func (m Metadata) IsPrimaryCluster() bool {
Expand Down Expand Up @@ -138,16 +158,59 @@ func (m Metadata) ClusterNameForFailoverVersion(failoverVersion int64) string {
if failoverVersion == common.EmptyVersion {
return m.currentClusterName
}
server, err := m.resolveServerName(failoverVersion)
if err != nil {
m.metrics.IncCounter(metrics.ClusterMetadataResolvingFailoverVersionCounter)
panic(fmt.Sprintf("failed to resolve failover version: %v", err))
}
return server
}

initialFailoverVersion := failoverVersion % m.failoverVersionIncrement
clusterName, ok := m.versionToClusterName[initialFailoverVersion]
// gets the initial failover version for a cluster / domain
// along with some helpers for a migration - should it be necessary
func (m Metadata) getInitialFailoverVersion(cluster string, domainName string) int64 {
info, ok := m.allClusters[cluster]
if !ok {
panic(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
"Unknown cluster name: %v with given cluster initial failover version map: %v.",
cluster,
m.allClusters,
m.failoverVersionIncrement,
))
}
return clusterName

// if using the minFailover Version during a cluster config, then return this from config
// (assuming it's safe to do so). This is not the normal state of things and intended only
// for when migrating versions.
usingNewFailoverVersion := m.useNewFailoverVersionOverride(domainName)
if usingNewFailoverVersion && info.NewInitialFailoverVersion != nil {
m.log.Debug("using new failover version for cluster", tag.ClusterName(cluster), tag.WorkflowDomainName(domainName))
m.metrics.IncCounter(metrics.ClusterMetadataGettingMinFailoverVersionCounter)
return *info.NewInitialFailoverVersion
}
// default behaviour - return the initial failover version - a marker to
// identify the cluster for all counters
m.log.Debug("getting failover version for cluster", tag.ClusterName(cluster), tag.WorkflowDomainName(domainName))
m.metrics.IncCounter(metrics.ClusterMetadataGettingFailoverVersionCounter)
return info.InitialFailoverVersion
}

// resolves the server name from a version number. Better to use this
// than to check versionToClusterName directly, as this also falls back to catch
// when there's a migration NewInitialFailoverVersion
func (m Metadata) resolveServerName(version int64) (string, error) {
moddedFoVersion := version % m.failoverVersionIncrement
// attempt a lookup first
server, ok := m.versionToClusterName[moddedFoVersion]
if ok {
return server, nil
}

// else fall back on checking for new failover versions
for name, cluster := range m.allClusters {
if cluster.NewInitialFailoverVersion != nil && *cluster.NewInitialFailoverVersion == moddedFoVersion {
return name, nil
}
}
m.metrics.IncCounter(metrics.ClusterMetadataFailureToResolveCounter)
return "", fmt.Errorf("could not resolve failover version: %d", version)
}
11 changes: 11 additions & 0 deletions common/cluster/metadataTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package cluster

import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/loggerimpl"
commonMetrics "github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -92,6 +94,9 @@ var (
TestCurrentClusterName,
TestCurrentClusterName,
TestAllClusterInfo,
func(d string) bool { return false },
commonMetrics.NewNoopMetricsClient(),
loggerimpl.NewNopLogger(),
)

// TestPassiveClusterMetadata is metadata for a passive cluster
Expand All @@ -100,6 +105,9 @@ var (
TestCurrentClusterName,
TestAlternativeClusterName,
TestAllClusterInfo,
func(d string) bool { return false },
commonMetrics.NewNoopMetricsClient(),
loggerimpl.NewNopLogger(),
)
)

Expand All @@ -115,5 +123,8 @@ func GetTestClusterMetadata(isPrimaryCluster bool) Metadata {
primaryClusterName,
TestCurrentClusterName,
TestAllClusterInfo,
func(d string) bool { return false },
commonMetrics.NewNoopMetricsClient(),
loggerimpl.NewNopLogger(),
)
}
Loading

0 comments on commit 56d2028

Please sign in to comment.