Skip to content

Commit

Permalink
Add failover marker coordinator (cadence-workflow#3288)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Jun 10, 2020
1 parent dd9cb49 commit 009d5c3
Show file tree
Hide file tree
Showing 13 changed files with 1,073 additions and 122 deletions.
10 changes: 5 additions & 5 deletions common/domain/failover_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,24 +153,24 @@ func (p *failoverWatcherImpl) handleFailoverTimeout(

failoverEndTime := domain.GetDomainFailoverEndTime()
if failoverEndTime != nil && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) {
domainName := domain.GetInfo().Name
domainID := domain.GetInfo().ID
// force failover the domain without setting the failover timeout
if err := CleanPendingActiveState(
p.metadataMgr,
domainName,
domainID,
domain.GetFailoverVersion(),
p.retryPolicy,
); err != nil {
p.metrics.IncCounter(metrics.DomainFailoverScope, metrics.CadenceFailures)
p.logger.Error("Failed to update pending-active domain to active.", tag.WorkflowDomainID(domainName), tag.Error(err))
p.logger.Error("Failed to update pending-active domain to active.", tag.WorkflowDomainID(domainID), tag.Error(err))
}
}
}

// CleanPendingActiveState removes the pending active state from the domain
func CleanPendingActiveState(
metadataMgr persistence.MetadataManager,
domainName string,
domainID string,
failoverVersion int64,
policy backoff.RetryPolicy,
) error {
Expand All @@ -184,7 +184,7 @@ func CleanPendingActiveState(
return err
}
notificationVersion := metadata.NotificationVersion
getResponse, err := metadataMgr.GetDomain(&persistence.GetDomainRequest{Name: domainName})
getResponse, err := metadataMgr.GetDomain(&persistence.GetDomainRequest{ID: domainID})
if err != nil {
return err
}
Expand Down
159 changes: 85 additions & 74 deletions common/domain/failover_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,24 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/service/dynamicconfig"
)

type (
failoverWatcherSuite struct {
suite.Suite
persistencetests.TestBase

*require.Assertions
controller *gomock.Controller

mockResource *resource.Test
mockDomainCache *cache.MockDomainCache
timeSource clock.TimeSource
metadataMgr persistence.MetadataManager
mockMetadataMgr *mocks.MetadataManager
watcher *failoverWatcherImpl
}
)
Expand All @@ -67,15 +65,9 @@ func (s *failoverWatcherSuite) SetupSuite() {
if testing.Verbose() {
log.SetOutput(os.Stdout)
}

s.TestBase = persistencetests.NewTestBaseWithCassandra(&persistencetests.TestBaseOptions{
ClusterMetadata: cluster.GetTestClusterMetadata(true, true),
})
s.TestBase.Setup()
}

func (s *failoverWatcherSuite) TearDownSuite() {
s.TestBase.TearDownWorkflowStore()
}

func (s *failoverWatcherSuite) SetupTest() {
Expand All @@ -84,11 +76,16 @@ func (s *failoverWatcherSuite) SetupTest() {

s.mockResource = resource.NewTest(s.controller, metrics.DomainFailoverScope)
s.mockDomainCache = s.mockResource.DomainCache
s.metadataMgr = s.TestBase.MetadataManager
s.timeSource = s.mockResource.GetTimeSource()
s.mockMetadataMgr = s.mockResource.MetadataMgr

s.mockMetadataMgr.On("GetMetadata").Return(&persistence.GetMetadataResponse{
NotificationVersion: 1,
}, nil)

s.watcher = NewFailoverWatcher(
s.mockDomainCache,
s.metadataMgr,
s.mockMetadataMgr,
s.timeSource,
dynamicconfig.GetDurationPropertyFn(10*time.Second),
dynamicconfig.GetFloatPropertyFn(0.2),
Expand Down Expand Up @@ -118,55 +115,76 @@ func (s *failoverWatcherSuite) TestCleanPendingActiveState() {
EmitMetric: true,
}
replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(),
ActiveClusterName: "active",
Clusters: []*persistence.ClusterReplicationConfig{
{
s.ClusterMetadata.GetCurrentClusterName(),
"active",
},
},
}

_, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 1,
})
s.NoError(err)
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{
ID: domainName,
}).Return(&persistence.GetDomainResponse{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: nil,
NotificationVersion: 1,
}, nil).Times(1)

// does not have failover end time
err = CleanPendingActiveState(s.metadataMgr, domainName, 1, s.watcher.retryPolicy)
err := CleanPendingActiveState(s.mockMetadataMgr, domainName, 1, s.watcher.retryPolicy)
s.NoError(err)

metadata, err := s.metadataMgr.GetMetadata()
s.NoError(err)
notificationVersion := metadata.NotificationVersion
err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{
ID: domainName,
}).Return(&persistence.GetDomainResponse{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 2,
FailoverNotificationVersion: notificationVersion,
FailoverEndTime: common.Int64Ptr(2),
NotificationVersion: notificationVersion,
})
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: common.Int64Ptr(1),
NotificationVersion: 1,
}, nil).Times(1)

// does not match failover versions
err = CleanPendingActiveState(s.mockMetadataMgr, domainName, 5, s.watcher.retryPolicy)
s.NoError(err)

// does not have failover version
err = CleanPendingActiveState(s.metadataMgr, domainName, 5, s.watcher.retryPolicy)
s.NoError(err)

err = CleanPendingActiveState(s.metadataMgr, domainName, 2, s.watcher.retryPolicy)
s.NoError(err)
s.mockMetadataMgr.On("UpdateDomain", &persistence.UpdateDomainRequest{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
ConfigVersion: 1,
FailoverVersion: 2,
FailoverNotificationVersion: 2,
FailoverEndTime: nil,
NotificationVersion: 1,
}).Return(nil).Times(1)
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{
ID: domainName,
}).Return(&persistence.GetDomainResponse{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 2,
FailoverNotificationVersion: 2,
FailoverEndTime: common.Int64Ptr(1),
NotificationVersion: 1,
}, nil).Times(1)

resp, err := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: domainName,
})
err = CleanPendingActiveState(s.mockMetadataMgr, domainName, 2, s.watcher.retryPolicy)
s.NoError(err)
s.True(resp.FailoverEndTime == nil)
}

func (s *failoverWatcherSuite) TestHandleFailoverTimeout() {
Expand All @@ -184,54 +202,47 @@ func (s *failoverWatcherSuite) TestHandleFailoverTimeout() {
EmitMetric: true,
}
replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(),
ActiveClusterName: "active",
Clusters: []*persistence.ClusterReplicationConfig{
{
s.ClusterMetadata.GetCurrentClusterName(),
"active",
},
},
}

_, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 1,
})
s.NoError(err)

metadata, err := s.metadataMgr.GetMetadata()
s.NoError(err)
notificationVersion := metadata.NotificationVersion

endtime := common.Int64Ptr(s.timeSource.Now().UnixNano() - 1)
err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{

s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{
ID: domainName,
}).Return(&persistence.GetDomainResponse{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 2,
FailoverNotificationVersion: notificationVersion,
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: endtime,
NotificationVersion: notificationVersion,
})
s.NoError(err)
NotificationVersion: 1,
}, nil).Times(1)
s.mockMetadataMgr.On("UpdateDomain", &persistence.UpdateDomainRequest{
Info: info,
Config: domainConfig,
ReplicationConfig: replicationConfig,
ConfigVersion: 1,
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: nil,
NotificationVersion: 1,
}).Return(nil).Times(1)

domainEntry := cache.NewDomainCacheEntryForTest(
info,
domainConfig,
true,
replicationConfig,
2,
1,
endtime,
s.ClusterMetadata,
nil,
)
s.watcher.handleFailoverTimeout(domainEntry)

resp, err := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: domainName,
})
s.NoError(err)
s.True(resp.FailoverEndTime == nil)
}
9 changes: 8 additions & 1 deletion common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ var keys = map[Key]string{
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",
NotifyFailoverMarkerInterval: "history.NotifyFailoverMarkerInterval",
NotifyFailoverMarkerTimerJitterCoefficient: "history.NotifyFailoverMarkerTimerJitterCoefficient",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
Expand Down Expand Up @@ -810,9 +812,14 @@ const (
// MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded
MutableStateChecksumInvalidateBefore

//ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster
// ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster
ReplicationEventsFromCurrentCluster

// NotifyFailoverMarkerInterval determines the frequency to notify failover marker
NotifyFailoverMarkerInterval
// NotifyFailoverMarkerTimerJitterCoefficient is the jitter for failover marker notifier timer
NotifyFailoverMarkerTimerJitterCoefficient

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
)
Expand Down
11 changes: 10 additions & 1 deletion service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,13 @@ type Config struct {
MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn

//Crocess DC Replication configuration
//Cross DC Replication configuration
ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithDomainFilter

//Failover marker heartbeat
NotifyFailoverMarkerInterval dynamicconfig.DurationPropertyFn
NotifyFailoverMarkerTimerJitterCoefficient dynamicconfig.FloatPropertyFn
EnableGracefulFailover dynamicconfig.BoolPropertyFn
}

const (
Expand Down Expand Up @@ -402,6 +407,10 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0),

ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false),

NotifyFailoverMarkerInterval: dc.GetDurationProperty(dynamicconfig.NotifyFailoverMarkerInterval, 5*time.Second),
NotifyFailoverMarkerTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.NotifyFailoverMarkerTimerJitterCoefficient, 0.15),
EnableGracefulFailover: dc.GetBoolProperty(dynamicconfig.EnableGracefulFailover, false),
}

return cfg
Expand Down
Loading

0 comments on commit 009d5c3

Please sign in to comment.