diff --git a/common/domain/failover_watcher.go b/common/domain/failover_watcher.go index 1abac30ec94..45ec8f8a5ea 100644 --- a/common/domain/failover_watcher.go +++ b/common/domain/failover_watcher.go @@ -153,16 +153,16 @@ func (p *failoverWatcherImpl) handleFailoverTimeout( failoverEndTime := domain.GetDomainFailoverEndTime() if failoverEndTime != nil && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) { - domainName := domain.GetInfo().Name + domainID := domain.GetInfo().ID // force failover the domain without setting the failover timeout if err := CleanPendingActiveState( p.metadataMgr, - domainName, + domainID, domain.GetFailoverVersion(), p.retryPolicy, ); err != nil { p.metrics.IncCounter(metrics.DomainFailoverScope, metrics.CadenceFailures) - p.logger.Error("Failed to update pending-active domain to active.", tag.WorkflowDomainID(domainName), tag.Error(err)) + p.logger.Error("Failed to update pending-active domain to active.", tag.WorkflowDomainID(domainID), tag.Error(err)) } } } @@ -170,7 +170,7 @@ func (p *failoverWatcherImpl) handleFailoverTimeout( // CleanPendingActiveState removes the pending active state from the domain func CleanPendingActiveState( metadataMgr persistence.MetadataManager, - domainName string, + domainID string, failoverVersion int64, policy backoff.RetryPolicy, ) error { @@ -184,7 +184,7 @@ func CleanPendingActiveState( return err } notificationVersion := metadata.NotificationVersion - getResponse, err := metadataMgr.GetDomain(&persistence.GetDomainRequest{Name: domainName}) + getResponse, err := metadataMgr.GetDomain(&persistence.GetDomainRequest{ID: domainID}) if err != nil { return err } diff --git a/common/domain/failover_watcher_test.go b/common/domain/failover_watcher_test.go index 58cd7eb87a7..f6441d2099d 100644 --- a/common/domain/failover_watcher_test.go +++ b/common/domain/failover_watcher_test.go @@ -34,10 +34,9 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" - "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" - persistencetests "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service/dynamicconfig" ) @@ -45,7 +44,6 @@ import ( type ( failoverWatcherSuite struct { suite.Suite - persistencetests.TestBase *require.Assertions controller *gomock.Controller @@ -53,7 +51,7 @@ type ( mockResource *resource.Test mockDomainCache *cache.MockDomainCache timeSource clock.TimeSource - metadataMgr persistence.MetadataManager + mockMetadataMgr *mocks.MetadataManager watcher *failoverWatcherImpl } ) @@ -67,15 +65,9 @@ func (s *failoverWatcherSuite) SetupSuite() { if testing.Verbose() { log.SetOutput(os.Stdout) } - - s.TestBase = persistencetests.NewTestBaseWithCassandra(&persistencetests.TestBaseOptions{ - ClusterMetadata: cluster.GetTestClusterMetadata(true, true), - }) - s.TestBase.Setup() } func (s *failoverWatcherSuite) TearDownSuite() { - s.TestBase.TearDownWorkflowStore() } func (s *failoverWatcherSuite) SetupTest() { @@ -84,11 +76,16 @@ func (s *failoverWatcherSuite) SetupTest() { s.mockResource = resource.NewTest(s.controller, metrics.DomainFailoverScope) s.mockDomainCache = s.mockResource.DomainCache - s.metadataMgr = s.TestBase.MetadataManager s.timeSource = s.mockResource.GetTimeSource() + s.mockMetadataMgr = s.mockResource.MetadataMgr + + 
s.mockMetadataMgr.On("GetMetadata").Return(&persistence.GetMetadataResponse{ + NotificationVersion: 1, + }, nil) + s.watcher = NewFailoverWatcher( s.mockDomainCache, - s.metadataMgr, + s.mockMetadataMgr, s.timeSource, dynamicconfig.GetDurationPropertyFn(10*time.Second), dynamicconfig.GetFloatPropertyFn(0.2), @@ -118,55 +115,76 @@ func (s *failoverWatcherSuite) TestCleanPendingActiveState() { EmitMetric: true, } replicationConfig := &persistence.DomainReplicationConfig{ - ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + ActiveClusterName: "active", Clusters: []*persistence.ClusterReplicationConfig{ { - s.ClusterMetadata.GetCurrentClusterName(), + "active", }, }, } - _, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{ - Info: info, - Config: domainConfig, - ReplicationConfig: replicationConfig, - IsGlobalDomain: true, - ConfigVersion: 1, - FailoverVersion: 1, - }) - s.NoError(err) + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainName, + }).Return(&persistence.GetDomainResponse{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 1, + FailoverNotificationVersion: 1, + FailoverEndTime: nil, + NotificationVersion: 1, + }, nil).Times(1) // does not have failover end time - err = CleanPendingActiveState(s.metadataMgr, domainName, 1, s.watcher.retryPolicy) + err := CleanPendingActiveState(s.mockMetadataMgr, domainName, 1, s.watcher.retryPolicy) s.NoError(err) - metadata, err := s.metadataMgr.GetMetadata() - s.NoError(err) - notificationVersion := metadata.NotificationVersion - err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{ + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainName, + }).Return(&persistence.GetDomainResponse{ Info: info, Config: domainConfig, ReplicationConfig: replicationConfig, + IsGlobalDomain: true, ConfigVersion: 1, - FailoverVersion: 2, - FailoverNotificationVersion: notificationVersion, - FailoverEndTime: common.Int64Ptr(2), - NotificationVersion: notificationVersion, - }) + FailoverVersion: 1, + FailoverNotificationVersion: 1, + FailoverEndTime: common.Int64Ptr(1), + NotificationVersion: 1, + }, nil).Times(1) + + // does not match failover versions + err = CleanPendingActiveState(s.mockMetadataMgr, domainName, 5, s.watcher.retryPolicy) s.NoError(err) - // does not have failover version - err = CleanPendingActiveState(s.metadataMgr, domainName, 5, s.watcher.retryPolicy) - s.NoError(err) - - err = CleanPendingActiveState(s.metadataMgr, domainName, 2, s.watcher.retryPolicy) - s.NoError(err) + s.mockMetadataMgr.On("UpdateDomain", &persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: nil, + NotificationVersion: 1, + }).Return(nil).Times(1) + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainName, + }).Return(&persistence.GetDomainResponse{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: common.Int64Ptr(1), + NotificationVersion: 1, + }, nil).Times(1) - resp, err := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{ - Name: domainName, - }) + err = CleanPendingActiveState(s.mockMetadataMgr, domainName, 2, s.watcher.retryPolicy) s.NoError(err) - 
s.True(resp.FailoverEndTime == nil) } func (s *failoverWatcherSuite) TestHandleFailoverTimeout() { @@ -184,54 +202,47 @@ func (s *failoverWatcherSuite) TestHandleFailoverTimeout() { EmitMetric: true, } replicationConfig := &persistence.DomainReplicationConfig{ - ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + ActiveClusterName: "active", Clusters: []*persistence.ClusterReplicationConfig{ { - s.ClusterMetadata.GetCurrentClusterName(), + "active", }, }, } - - _, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{ - Info: info, - Config: domainConfig, - ReplicationConfig: replicationConfig, - IsGlobalDomain: true, - ConfigVersion: 1, - FailoverVersion: 1, - }) - s.NoError(err) - - metadata, err := s.metadataMgr.GetMetadata() - s.NoError(err) - notificationVersion := metadata.NotificationVersion - endtime := common.Int64Ptr(s.timeSource.Now().UnixNano() - 1) - err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{ + + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainName, + }).Return(&persistence.GetDomainResponse{ Info: info, Config: domainConfig, ReplicationConfig: replicationConfig, + IsGlobalDomain: true, ConfigVersion: 1, - FailoverVersion: 2, - FailoverNotificationVersion: notificationVersion, + FailoverVersion: 1, + FailoverNotificationVersion: 1, FailoverEndTime: endtime, - NotificationVersion: notificationVersion, - }) - s.NoError(err) + NotificationVersion: 1, + }, nil).Times(1) + s.mockMetadataMgr.On("UpdateDomain", &persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 1, + FailoverNotificationVersion: 1, + FailoverEndTime: nil, + NotificationVersion: 1, + }).Return(nil).Times(1) + domainEntry := cache.NewDomainCacheEntryForTest( info, domainConfig, true, replicationConfig, - 2, + 1, endtime, - s.ClusterMetadata, + nil, ) s.watcher.handleFailoverTimeout(domainEntry) - - resp, err := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{ - Name: domainName, - }) - s.NoError(err) - s.True(resp.FailoverEndTime == nil) } diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 5c4f1f79df7..4e69df3ffb1 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -258,6 +258,8 @@ var keys = map[Key]string{ MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability", MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore", ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster", + NotifyFailoverMarkerInterval: "history.NotifyFailoverMarkerInterval", + NotifyFailoverMarkerTimerJitterCoefficient: "history.NotifyFailoverMarkerTimerJitterCoefficient", WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS", @@ -810,9 +812,14 @@ const ( // MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded MutableStateChecksumInvalidateBefore - //ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster + // ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster ReplicationEventsFromCurrentCluster + // NotifyFailoverMarkerInterval determines the frequency to notify failover marker + NotifyFailoverMarkerInterval + // 
NotifyFailoverMarkerTimerJitterCoefficient is the jitter for failover marker notifier timer + NotifyFailoverMarkerTimerJitterCoefficient + // lastKeyForTest must be the last one in this const group for testing purpose lastKeyForTest ) diff --git a/service/history/config/config.go b/service/history/config/config.go index 0c4591d1dcb..0eb5105bb9b 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -228,8 +228,13 @@ type Config struct { MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn - //Crocess DC Replication configuration + //Cross DC Replication configuration ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithDomainFilter + + //Failover marker heartbeat + NotifyFailoverMarkerInterval dynamicconfig.DurationPropertyFn + NotifyFailoverMarkerTimerJitterCoefficient dynamicconfig.FloatPropertyFn + EnableGracefulFailover dynamicconfig.BoolPropertyFn } const ( @@ -402,6 +407,10 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0), ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false), + + NotifyFailoverMarkerInterval: dc.GetDurationProperty(dynamicconfig.NotifyFailoverMarkerInterval, 5*time.Second), + NotifyFailoverMarkerTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.NotifyFailoverMarkerTimerJitterCoefficient, 0.15), + EnableGracefulFailover: dc.GetBoolProperty(dynamicconfig.EnableGracefulFailover, false), } return cfg diff --git a/service/history/failover/coordinator.go b/service/history/failover/coordinator.go new file mode 100644 index 00000000000..dcc9699a2b4 --- /dev/null +++ b/service/history/failover/coordinator.go @@ -0,0 +1,333 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination coordinator_mock.go -self_package github.com/uber/cadence/service/history/failover + +package failover + +import ( + ctx "context" + "sync/atomic" + "time" + + workflow "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/client/history" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/domain" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/service/history/config" +) + +const ( + notificationChanBufferSize = 800 + receiveChanBufferSize = 400 + cleanupMarkerInterval = 30 * time.Minute + invalidMarkerDuration = 1 * time.Hour + updateDomainRetryInitialInterval = 50 * time.Millisecond + updateDomainRetryCoefficient = 2.0 + updateDomainMaxRetry = 2 +) + +type ( + // Coordinator manages the failover markers on sending and receiving + Coordinator interface { + common.Daemon + + NotifyFailoverMarkers(shardID int32, markers []*replicator.FailoverMarkerAttributes) <-chan error + ReceiveFailoverMarkers(shardIDs []int32, marker *replicator.FailoverMarkerAttributes) + } + + coordinatorImpl struct { + status int32 + recorder map[string]*failoverRecord + notificationChan chan *notificationRequest + receiveChan chan *receiveRequest + shutdownChan chan struct{} + retryPolicy backoff.RetryPolicy + + metadataMgr persistence.MetadataManager + historyClient history.Client + config *config.Config + timeSource clock.TimeSource + metrics metrics.Client + logger log.Logger + } + + notificationRequest struct { + shardID int32 + markers []*replicator.FailoverMarkerAttributes + respCh chan error + } + + receiveRequest struct { + shardIDs []int32 + marker *replicator.FailoverMarkerAttributes + } + + failoverRecord struct { + failoverVersion int64 + shards map[int32]struct{} + lastUpdatedTime time.Time + } +) + +// NewCoordinator initialize a failover coordinator +func NewCoordinator( + metadataMgr persistence.MetadataManager, + historyClient history.Client, + timeSource clock.TimeSource, + config *config.Config, + metrics metrics.Client, + logger log.Logger, +) Coordinator { + + retryPolicy := backoff.NewExponentialRetryPolicy(updateDomainRetryInitialInterval) + retryPolicy.SetBackoffCoefficient(updateDomainRetryCoefficient) + retryPolicy.SetMaximumAttempts(updateDomainMaxRetry) + + return &coordinatorImpl{ + status: common.DaemonStatusInitialized, + recorder: make(map[string]*failoverRecord), + notificationChan: make(chan *notificationRequest, notificationChanBufferSize), + receiveChan: make(chan *receiveRequest, receiveChanBufferSize), + shutdownChan: make(chan struct{}), + retryPolicy: retryPolicy, + metadataMgr: metadataMgr, + historyClient: historyClient, + timeSource: timeSource, + config: config, + metrics: metrics, + logger: logger, + } +} + +func (c *coordinatorImpl) Start() { + + if !atomic.CompareAndSwapInt32(&c.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return + } + + go c.receiveFailoverMarkersLoop() + go c.notifyFailoverMarkerLoop() + + c.logger.Info("Failover coordinator started.") +} + +func (c *coordinatorImpl) Stop() { + + if !atomic.CompareAndSwapInt32(&c.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return + } + + 
close(c.shutdownChan) + c.logger.Info("Failover coordinator stopped.") +} + +func (c *coordinatorImpl) NotifyFailoverMarkers( + shardID int32, + markers []*replicator.FailoverMarkerAttributes, +) <-chan error { + + respCh := make(chan error, 1) + c.notificationChan <- ¬ificationRequest{ + shardID: shardID, + markers: markers, + respCh: respCh, + } + + return respCh +} + +func (c *coordinatorImpl) ReceiveFailoverMarkers( + shardIDs []int32, + marker *replicator.FailoverMarkerAttributes, +) { + + c.receiveChan <- &receiveRequest{ + shardIDs: shardIDs, + marker: marker, + } +} + +func (c *coordinatorImpl) receiveFailoverMarkersLoop() { + + ticker := time.NewTicker(cleanupMarkerInterval) + defer ticker.Stop() + + for { + select { + case <-c.shutdownChan: + return + case <-ticker.C: + c.cleanupInvalidMarkers() + case request := <-c.receiveChan: + c.handleFailoverMarkers(request) + } + } +} + +func (c *coordinatorImpl) notifyFailoverMarkerLoop() { + + timer := time.NewTimer(backoff.JitDuration( + c.config.NotifyFailoverMarkerInterval(), + c.config.NotifyFailoverMarkerTimerJitterCoefficient(), + )) + defer timer.Stop() + requestByMarker := make(map[*replicator.FailoverMarkerAttributes]*receiveRequest) + channelsByMarker := make(map[*replicator.FailoverMarkerAttributes][]chan error) + + for { + select { + case <-c.shutdownChan: + return + case notificationReq := <-c.notificationChan: + // if there is a shard movement happen, it is fine to have duplicate shard ID in the request + // The receiver side will de-dup the shard IDs. See: handleFailoverMarkers + aggregateNotificationRequests(notificationReq, requestByMarker, channelsByMarker) + case <-timer.C: + c.notifyRemoteCoordinator(requestByMarker, channelsByMarker) + timer.Reset(backoff.JitDuration( + c.config.NotifyFailoverMarkerInterval(), + c.config.NotifyFailoverMarkerTimerJitterCoefficient(), + )) + } + } +} + +func (c *coordinatorImpl) handleFailoverMarkers( + request *receiveRequest, +) { + + marker := request.marker + domainID := marker.GetDomainID() + + if record, ok := c.recorder[domainID]; ok { + // if the local failover version is smaller than the new received marker, + // it means there is another failover happened and the local one should be invalid. 
+ if record.failoverVersion < marker.GetFailoverVersion() { + delete(c.recorder, domainID) + } + + // if the local failover version is larger than the new received marker, + // ignore the incoming marker + if record.failoverVersion > marker.GetFailoverVersion() { + return + } + } + + if _, ok := c.recorder[domainID]; !ok { + // initialize the failover record + c.recorder[marker.GetDomainID()] = &failoverRecord{ + failoverVersion: marker.GetFailoverVersion(), + shards: make(map[int32]struct{}), + } + } + + record := c.recorder[domainID] + record.lastUpdatedTime = c.timeSource.Now() + for _, shardID := range request.shardIDs { + record.shards[shardID] = struct{}{} + } + + if len(record.shards) == c.config.NumberOfShards { + if err := domain.CleanPendingActiveState( + c.metadataMgr, + domainID, + record.failoverVersion, + c.retryPolicy, + ); err != nil { + c.logger.Error("Coordinator failed to update domain after receiving all failover markers", + tag.WorkflowDomainID(domainID)) + c.metrics.IncCounter(metrics.DomainFailoverScope, metrics.CadenceFailures) + return + } + delete(c.recorder, domainID) + } +} + +func (c *coordinatorImpl) cleanupInvalidMarkers() { + for domainID, record := range c.recorder { + if c.timeSource.Now().Sub(record.lastUpdatedTime) > invalidMarkerDuration { + delete(c.recorder, domainID) + } + } +} + +func (c *coordinatorImpl) notifyRemoteCoordinator( + requestByMarker map[*replicator.FailoverMarkerAttributes]*receiveRequest, + channelsByMarker map[*replicator.FailoverMarkerAttributes][]chan error, +) { + + if len(requestByMarker) > 0 { + var tokens []*workflow.FailoverMarkerToken + for _, request := range requestByMarker { + tokens = append(tokens, &workflow.FailoverMarkerToken{ + ShardIDs: request.shardIDs, + FailoverMarker: request.marker, + }) + } + + err := c.historyClient.NotifyFailoverMarkers( + ctx.Background(), + &workflow.NotifyFailoverMarkersRequest{ + FailoverMarkerTokens: tokens, + }, + ) + + for marker := range requestByMarker { + for _, respCh := range channelsByMarker[marker] { + respCh <- err + close(respCh) + delete(channelsByMarker, marker) + } + delete(requestByMarker, marker) + } + } +} + +func aggregateNotificationRequests( + request *notificationRequest, + requestByMarker map[*replicator.FailoverMarkerAttributes]*receiveRequest, + channelsByMarker map[*replicator.FailoverMarkerAttributes][]chan error, +) { + + for _, marker := range request.markers { + if _, ok := requestByMarker[marker]; !ok { + requestByMarker[marker] = &receiveRequest{ + shardIDs: []int32{}, + marker: marker, + } + channelsByMarker[marker] = []chan error{} + } + req := requestByMarker[marker] + req.shardIDs = append(req.shardIDs, request.shardID) + channels := channelsByMarker[marker] + channels = append(channels, request.respCh) + channelsByMarker[marker] = channels + } +} diff --git a/service/history/failover/coordinator_mock.go b/service/history/failover/coordinator_mock.go new file mode 100644 index 00000000000..fcb8ab6d4b6 --- /dev/null +++ b/service/history/failover/coordinator_mock.go @@ -0,0 +1,109 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: coordinator.go + +// Package failover is a generated GoMock package. +package failover + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + replicator "github.com/uber/cadence/.gen/go/replicator" +) + +// MockCoordinator is a mock of Coordinator interface +type MockCoordinator struct { + ctrl *gomock.Controller + recorder *MockCoordinatorMockRecorder +} + +// MockCoordinatorMockRecorder is the mock recorder for MockCoordinator +type MockCoordinatorMockRecorder struct { + mock *MockCoordinator +} + +// NewMockCoordinator creates a new mock instance +func NewMockCoordinator(ctrl *gomock.Controller) *MockCoordinator { + mock := &MockCoordinator{ctrl: ctrl} + mock.recorder = &MockCoordinatorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCoordinator) EXPECT() *MockCoordinatorMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockCoordinator) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockCoordinatorMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockCoordinator)(nil).Start)) +} + +// Stop mocks base method +func (m *MockCoordinator) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop +func (mr *MockCoordinatorMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCoordinator)(nil).Stop)) +} + +// NotifyFailoverMarkers mocks base method +func (m *MockCoordinator) NotifyFailoverMarkers(shardID int32, markers []*replicator.FailoverMarkerAttributes) <-chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NotifyFailoverMarkers", shardID, markers) + ret0, _ := ret[0].(<-chan error) + return ret0 +} + +// NotifyFailoverMarkers indicates an expected call of NotifyFailoverMarkers +func (mr *MockCoordinatorMockRecorder) NotifyFailoverMarkers(shardID, markers interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyFailoverMarkers", reflect.TypeOf((*MockCoordinator)(nil).NotifyFailoverMarkers), shardID, markers) +} + +// ReceiveFailoverMarkers mocks base method +func 
(m *MockCoordinator) ReceiveFailoverMarkers(shardIDs []int32, marker *replicator.FailoverMarkerAttributes) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReceiveFailoverMarkers", shardIDs, marker) +} + +// ReceiveFailoverMarkers indicates an expected call of ReceiveFailoverMarkers +func (mr *MockCoordinatorMockRecorder) ReceiveFailoverMarkers(shardIDs, marker interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveFailoverMarkers", reflect.TypeOf((*MockCoordinator)(nil).ReceiveFailoverMarkers), shardIDs, marker) +} diff --git a/service/history/failover/coordinator_test.go b/service/history/failover/coordinator_test.go new file mode 100644 index 00000000000..5dcdce05e27 --- /dev/null +++ b/service/history/failover/coordinator_test.go @@ -0,0 +1,404 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package failover + +import ( + ctx "context" + "fmt" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/history/historyservicetest" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/metrics" + mmocks "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service/dynamicconfig" + "github.com/uber/cadence/service/history/config" +) + +type ( + coordinatorSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + mockResource *resource.Test + mockMetadataManager *mmocks.MetadataManager + historyClient *historyservicetest.MockClient + config *config.Config + coordinator *coordinatorImpl + } +) + +func TestCoordinatorSuite(t *testing.T) { + s := new(coordinatorSuite) + suite.Run(t, s) +} + +func (s *coordinatorSuite) SetupTest() { + s.Assertions = require.New(s.T()) + s.controller = gomock.NewController(s.T()) + s.mockResource = resource.NewTest(s.controller, metrics.History) + s.mockMetadataManager = s.mockResource.MetadataMgr + s.historyClient = s.mockResource.HistoryClient + s.config = config.NewForTest() + s.config.NumberOfShards = 2 + s.config.NotifyFailoverMarkerInterval = dynamicconfig.GetDurationPropertyFn(10 * time.Millisecond) + s.config.NotifyFailoverMarkerTimerJitterCoefficient = dynamicconfig.GetFloatPropertyFn(0.01) + + s.coordinator = NewCoordinator( + s.mockMetadataManager, + s.historyClient, + s.mockResource.GetTimeSource(), + s.config, + s.mockResource.GetMetricsClient(), + s.mockResource.GetLogger(), + ).(*coordinatorImpl) +} + +func (s *coordinatorSuite) TearDownTest() { + s.controller.Finish() + s.mockResource.Finish(s.T()) + s.coordinator.Stop() +} + +func (s *coordinatorSuite) TestNotifyFailoverMarkers() { + attributes := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(uuid.New()), + FailoverVersion: common.Int64Ptr(1), + CreationTime: common.Int64Ptr(1), + } + s.historyClient.EXPECT().NotifyFailoverMarkers( + ctx.Background(), &history.NotifyFailoverMarkersRequest{ + FailoverMarkerTokens: []*history.FailoverMarkerToken{ + { + ShardIDs: []int32{1, 2}, + FailoverMarker: attributes, + }, + }, + }, + ).Return(nil).Times(1) + respCh1 := s.coordinator.NotifyFailoverMarkers( + 1, + []*replicator.FailoverMarkerAttributes{attributes}, + ) + respCh2 := s.coordinator.NotifyFailoverMarkers( + 2, + []*replicator.FailoverMarkerAttributes{attributes}, + ) + s.coordinator.Start() + err1 := <-respCh1 + err2 := <-respCh2 + s.NoError(err1) + s.NoError(err2) +} + +func (s *coordinatorSuite) TestNotifyRemoteCoordinator_Empty() { + requestByMarker := make(map[*replicator.FailoverMarkerAttributes]*receiveRequest) + channelsByMarker := make(map[*replicator.FailoverMarkerAttributes][]chan error) + s.historyClient.EXPECT().NotifyFailoverMarkers(ctx.Background(), gomock.Any()).Times(0) + s.coordinator.notifyRemoteCoordinator(requestByMarker, channelsByMarker) +} + +func (s *coordinatorSuite) TestNotifyRemoteCoordinator() { + + requestByMarker := make(map[*replicator.FailoverMarkerAttributes]*receiveRequest) + channelsByMarker := make(map[*replicator.FailoverMarkerAttributes][]chan error) + attributes := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(uuid.New()), + 
FailoverVersion: common.Int64Ptr(1), + CreationTime: common.Int64Ptr(1), + } + requestByMarker[attributes] = &receiveRequest{ + shardIDs: []int32{1, 2, 3}, + marker: attributes, + } + chan1 := make(chan error, 1) + chan2 := make(chan error, 1) + chan3 := make(chan error, 1) + channelsByMarker[attributes] = []chan error{chan1, chan2, chan3} + + s.historyClient.EXPECT().NotifyFailoverMarkers( + ctx.Background(), &history.NotifyFailoverMarkersRequest{ + FailoverMarkerTokens: []*history.FailoverMarkerToken{ + { + ShardIDs: []int32{1, 2, 3}, + FailoverMarker: attributes, + }, + }, + }, + ).Return(nil).Times(1) + s.coordinator.notifyRemoteCoordinator(requestByMarker, channelsByMarker) + err1 := <-chan1 + s.NoError(err1) + err2 := <-chan2 + s.NoError(err2) + err3 := <-chan3 + s.NoError(err3) + s.Equal(0, len(requestByMarker)) + s.Equal(0, len(channelsByMarker)) +} + +func (s *coordinatorSuite) TestAggregateNotificationRequests() { + requestByMarker := make(map[*replicator.FailoverMarkerAttributes]*receiveRequest) + channelsByMarker := make(map[*replicator.FailoverMarkerAttributes][]chan error) + attributes1 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(uuid.New()), + FailoverVersion: common.Int64Ptr(1), + CreationTime: common.Int64Ptr(1), + } + attributes2 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(uuid.New()), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(2), + } + request1 := ¬ificationRequest{ + shardID: 1, + markers: []*replicator.FailoverMarkerAttributes{attributes1}, + respCh: make(chan error, 1), + } + aggregateNotificationRequests(request1, requestByMarker, channelsByMarker) + request2 := ¬ificationRequest{ + shardID: 2, + markers: []*replicator.FailoverMarkerAttributes{attributes1}, + respCh: make(chan error, 1), + } + aggregateNotificationRequests(request2, requestByMarker, channelsByMarker) + request3 := ¬ificationRequest{ + shardID: 3, + markers: []*replicator.FailoverMarkerAttributes{attributes1, attributes2}, + respCh: make(chan error, 1), + } + aggregateNotificationRequests(request3, requestByMarker, channelsByMarker) + s.Equal(3, len(channelsByMarker[attributes1])) + s.Equal(1, len(channelsByMarker[attributes2])) + s.Equal([]int32{1, 2, 3}, requestByMarker[attributes1].shardIDs) + s.Equal([]int32{3}, requestByMarker[attributes2].shardIDs) +} + +func (s *coordinatorSuite) TestHandleFailoverMarkers_DeleteExpiredFailoverMarker() { + domainID := uuid.New() + attributes1 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(1), + CreationTime: common.Int64Ptr(1), + } + attributes2 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + request1 := &receiveRequest{ + shardIDs: []int32{1}, + marker: attributes1, + } + request2 := &receiveRequest{ + shardIDs: []int32{2}, + marker: attributes2, + } + + s.coordinator.handleFailoverMarkers(request1) + s.coordinator.handleFailoverMarkers(request2) + s.Equal(1, len(s.coordinator.recorder)) +} + +func (s *coordinatorSuite) TestHandleFailoverMarkers_IgnoreExpiredFailoverMarker() { + domainID := uuid.New() + attributes1 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(1), + CreationTime: common.Int64Ptr(1), + } + attributes2 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: 
common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + request1 := &receiveRequest{ + shardIDs: []int32{1}, + marker: attributes1, + } + request2 := &receiveRequest{ + shardIDs: []int32{2}, + marker: attributes2, + } + + s.coordinator.handleFailoverMarkers(request2) + s.coordinator.handleFailoverMarkers(request1) + s.Equal(1, len(s.coordinator.recorder)) +} + +func (s *coordinatorSuite) TestHandleFailoverMarkers_CleanPendingActiveState_Success() { + domainID := uuid.New() + attributes1 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + attributes2 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + request1 := &receiveRequest{ + shardIDs: []int32{1}, + marker: attributes1, + } + request2 := &receiveRequest{ + shardIDs: []int32{2}, + marker: attributes2, + } + info := &persistence.DomainInfo{ + ID: domainID, + Name: uuid.New(), + Status: persistence.DomainStatusRegistered, + Description: "some random description", + OwnerEmail: "some random email", + Data: nil, + } + domainConfig := &persistence.DomainConfig{ + Retention: 1, + EmitMetric: true, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: "active", + Clusters: []*persistence.ClusterReplicationConfig{ + { + "active", + }, + }, + } + + s.mockMetadataManager.On("GetMetadata").Return(&persistence.GetMetadataResponse{ + NotificationVersion: 1, + }, nil) + s.mockMetadataManager.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainID, + }).Return(&persistence.GetDomainResponse{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: common.Int64Ptr(1), + NotificationVersion: 1, + }, nil).Times(1) + s.mockMetadataManager.On("UpdateDomain", &persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: nil, + NotificationVersion: 1, + }).Return(nil).Times(1) + + s.coordinator.handleFailoverMarkers(request1) + s.coordinator.handleFailoverMarkers(request2) + s.Equal(0, len(s.coordinator.recorder)) +} + +func (s *coordinatorSuite) TestHandleFailoverMarkers_CleanPendingActiveState_Error() { + domainID := uuid.New() + attributes1 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + attributes2 := &replicator.FailoverMarkerAttributes{ + DomainID: common.StringPtr(domainID), + FailoverVersion: common.Int64Ptr(2), + CreationTime: common.Int64Ptr(1), + } + request1 := &receiveRequest{ + shardIDs: []int32{1}, + marker: attributes1, + } + request2 := &receiveRequest{ + shardIDs: []int32{2}, + marker: attributes2, + } + info := &persistence.DomainInfo{ + ID: domainID, + Name: uuid.New(), + Status: persistence.DomainStatusRegistered, + Description: "some random description", + OwnerEmail: "some random email", + Data: nil, + } + domainConfig := &persistence.DomainConfig{ + Retention: 1, + EmitMetric: true, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: "active", + Clusters: []*persistence.ClusterReplicationConfig{ + { + "active", + }, + }, + } + + 
s.mockMetadataManager.On("GetMetadata").Return(&persistence.GetMetadataResponse{ + NotificationVersion: 1, + }, nil) + s.mockMetadataManager.On("GetDomain", &persistence.GetDomainRequest{ + ID: domainID, + }).Return(&persistence.GetDomainResponse{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: common.Int64Ptr(1), + NotificationVersion: 1, + }, nil).Times(1) + s.mockMetadataManager.On("UpdateDomain", &persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: 2, + FailoverEndTime: nil, + NotificationVersion: 1, + }).Return(fmt.Errorf("test error")).Times(3) + + s.coordinator.handleFailoverMarkers(request1) + s.coordinator.handleFailoverMarkers(request2) + s.Equal(1, len(s.coordinator.recorder)) +} diff --git a/service/history/handler.go b/service/history/handler.go index b8e43ed9440..09553e2a387 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -228,6 +228,7 @@ func (h *Handler) CreateEngine( h.replicationTaskFetchers, h.GetMatchingRawClient(), h.queueTaskProcessor, + h.GetFailoverCoordinator(), ) } @@ -1889,8 +1890,12 @@ func (h *Handler) NotifyFailoverMarkers( sw := h.GetMetricsClient().StartTimer(scope, metrics.CadenceLatency) defer sw.Stop() - //TODO: wire up the function with failover coordinator - return &gen.BadRequestError{Message: "This method has not been implemented."} + for _, token := range request.GetFailoverMarkerTokens() { + marker := token.GetFailoverMarker() + h.GetLogger().Debug("Handling failover maker", tag.WorkflowDomainID(marker.GetDomainID())) + h.GetFailoverCoordinator().ReceiveFailoverMarkers(token.GetShardIDs(), token.GetFailoverMarker()) + } + return nil } // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 1707a58ac23..33ae05ddf86 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -56,6 +56,7 @@ import ( "github.com/uber/cadence/service/history/engine" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/execution" + "github.com/uber/cadence/service/history/failover" "github.com/uber/cadence/service/history/ndc" "github.com/uber/cadence/service/history/query" "github.com/uber/cadence/service/history/queue" @@ -114,6 +115,7 @@ type ( rawMatchingClient matching.Client clientChecker client.VersionChecker replicationDLQHandler replication.DLQHandler + failoverCoordinator failover.Coordinator } ) @@ -172,6 +174,7 @@ func NewEngineWithShardContext( replicationTaskFetchers replication.TaskFetchers, rawMatchingClient matching.Client, queueTaskProcessor task.Processor, + failoverCoordinator failover.Coordinator, ) engine.Engine { currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() @@ -207,11 +210,12 @@ func NewEngineWithShardContext( executionCache, logger, ), - publicClient: publicClient, - matchingClient: matching, - rawMatchingClient: rawMatchingClient, - queueTaskProcessor: queueTaskProcessor, - clientChecker: client.NewVersionChecker(), + publicClient: publicClient, + matchingClient: matching, + rawMatchingClient: rawMatchingClient, + queueTaskProcessor: queueTaskProcessor, + clientChecker: client.NewVersionChecker(), + 
failoverCoordinator: failoverCoordinator, } historyEngImpl.resetor = newWorkflowResetor(historyEngImpl) historyEngImpl.decisionHandler = newDecisionHandler(historyEngImpl) diff --git a/service/history/resource/resource.go b/service/history/resource/resource.go index b733a42efa4..29c0cac02bc 100644 --- a/service/history/resource/resource.go +++ b/service/history/resource/resource.go @@ -21,36 +21,84 @@ package resource import ( + "sync/atomic" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/events" + "github.com/uber/cadence/service/history/failover" ) // Resource is the interface which expose common history resources type Resource interface { resource.Resource GetEventCache() events.Cache + GetFailoverCoordinator() failover.Coordinator } -// Impl contains all common resources shared across history -type Impl struct { +type resourceImpl struct { + status int32 + resource.Resource - eventCache events.Cache + config *config.Config + eventCache events.Cache + failoverCoordinator failover.Coordinator +} + +// Start starts all resources +func (h *resourceImpl) Start() { + + if !atomic.CompareAndSwapInt32( + &h.status, + common.DaemonStatusInitialized, + common.DaemonStatusStarted, + ) { + return + } + + h.Resource.Start() + if h.config.EnableGracefulFailover() { + h.failoverCoordinator.Start() + } + h.GetLogger().Info("history resource started", tag.LifeCycleStarted) +} + +// Stop stops all resources +func (h *resourceImpl) Stop() { + + if !atomic.CompareAndSwapInt32( + &h.status, + common.DaemonStatusStarted, + common.DaemonStatusStopped, + ) { + return + } + + h.Resource.Stop() + h.failoverCoordinator.Stop() + h.GetLogger().Info("history resource stopped", tag.LifeCycleStopped) } // GetEventCache return event cache -func (h *Impl) GetEventCache() events.Cache { +func (h *resourceImpl) GetEventCache() events.Cache { return h.eventCache } +// GetFailoverCoordinator return failover coordinator +func (h *resourceImpl) GetFailoverCoordinator() failover.Coordinator { + return h.failoverCoordinator +} + // New create a new resource containing common history dependencies func New( params *service.BootstrapParams, serviceName string, config *config.Config, visibilityManagerInitializer resource.VisibilityManagerInitializer, -) (impl *Impl, retError error) { +) (historyResource Resource, retError error) { serviceResource, err := resource.New( params, serviceName, @@ -63,17 +111,29 @@ func New( return nil, err } - impl = &Impl{ - Resource: serviceResource, - eventCache: events.NewGlobalCache( - config.EventsCacheGlobalInitialCount(), - config.EventsCacheGlobalMaxCount(), - config.EventsCacheTTL(), - serviceResource.GetHistoryManager(), - params.Logger, - params.MetricsClient, - uint64(config.EventsCacheMaxSize()), - ), + eventCache := events.NewGlobalCache( + config.EventsCacheGlobalInitialCount(), + config.EventsCacheGlobalMaxCount(), + config.EventsCacheTTL(), + serviceResource.GetHistoryManager(), + params.Logger, + params.MetricsClient, + uint64(config.EventsCacheMaxSize()), + ) + coordinator := failover.NewCoordinator( + serviceResource.GetMetadataManager(), + serviceResource.GetHistoryClient(), + serviceResource.GetTimeSource(), + config, + serviceResource.GetMetricsClient(), + serviceResource.GetLogger(), + ) + + historyResource = &resourceImpl{ + Resource: serviceResource, + config: config, + 
eventCache: eventCache, + failoverCoordinator: coordinator, } - return impl, nil + return } diff --git a/service/history/resource/resourceTest.go b/service/history/resource/resourceTest.go index 2bdbb1da83d..da308abaf9e 100644 --- a/service/history/resource/resourceTest.go +++ b/service/history/resource/resourceTest.go @@ -26,13 +26,15 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/service/history/events" + "github.com/uber/cadence/service/history/failover" ) type ( // Test is the test implementation used for testing Test struct { *resource.Test - EventCache *events.MockCache + EventCache *events.MockCache + FailoverCoordinator *failover.MockCoordinator } ) @@ -44,8 +46,9 @@ func NewTest( serviceMetricsIndex metrics.ServiceIdx, ) *Test { return &Test{ - Test: resource.NewTest(controller, serviceMetricsIndex), - EventCache: events.NewMockCache(controller), + Test: resource.NewTest(controller, serviceMetricsIndex), + EventCache: events.NewMockCache(controller), + FailoverCoordinator: failover.NewMockCoordinator(controller), } } @@ -53,3 +56,8 @@ func NewTest( func (s *Test) GetEventCache() events.Cache { return s.EventCache } + +// GetFailoverCoordinator return failover coordinator +func (s *Test) GetFailoverCoordinator() failover.Coordinator { + return s.FailoverCoordinator +} diff --git a/service/history/service.go b/service/history/service.go index cca27adce4c..bcef6f86e3c 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -52,7 +52,8 @@ type Service struct { func NewService( params *service.BootstrapParams, ) (resource.Resource, error) { - serviceConfig := config.New(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), + serviceConfig := config.New( + dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, params.PersistenceConfig.DefaultStoreType(), params.PersistenceConfig.IsAdvancedVisibilityConfigExist()) diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index a9a70c8024d..2ba80a2165e 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -45,7 +45,7 @@ import ( ) type ( - shardControllerSuite struct { + controllerSuite struct { suite.Suite *require.Assertions @@ -65,12 +65,12 @@ type ( } ) -func TestShardControllerSuite(t *testing.T) { - s := new(shardControllerSuite) +func TestControllerSuite(t *testing.T) { + s := new(controllerSuite) suite.Run(t, s) } -func (s *shardControllerSuite) SetupTest() { +func (s *controllerSuite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) @@ -90,12 +90,12 @@ func (s *shardControllerSuite) SetupTest() { s.shardController = NewShardController(s.mockResource, s.mockEngineFactory, s.config).(*controller) } -func (s *shardControllerSuite) TearDownTest() { +func (s *controllerSuite) TearDownTest() { s.controller.Finish() s.mockResource.Finish(s.T()) } -func (s *shardControllerSuite) TestAcquireShardSuccess() { +func (s *controllerSuite) TestAcquireShardSuccess() { numShards := 10 s.config.NumberOfShards = numShards @@ -174,7 +174,7 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() { s.Equal(3, count) } -func (s *shardControllerSuite) TestAcquireShardsConcurrently() { +func (s *controllerSuite) TestAcquireShardsConcurrently() { numShards := 10 s.config.NumberOfShards = numShards s.config.AcquireShardConcurrency = func(opts 
...dynamicconfig.FilterOption) int { @@ -256,7 +256,7 @@ func (s *shardControllerSuite) TestAcquireShardsConcurrently() { s.Equal(3, count) } -func (s *shardControllerSuite) TestAcquireShardLookupFailure() { +func (s *controllerSuite) TestAcquireShardLookupFailure() { numShards := 2 s.config.NumberOfShards = numShards for shardID := 0; shardID < numShards; shardID++ { @@ -270,7 +270,7 @@ func (s *shardControllerSuite) TestAcquireShardLookupFailure() { } } -func (s *shardControllerSuite) TestAcquireShardRenewSuccess() { +func (s *controllerSuite) TestAcquireShardRenewSuccess() { numShards := 2 s.config.NumberOfShards = numShards @@ -344,7 +344,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() { } } -func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() { +func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { numShards := 2 s.config.NumberOfShards = numShards @@ -418,7 +418,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() { } } -func (s *shardControllerSuite) TestHistoryEngineClosed() { +func (s *controllerSuite) TestHistoryEngineClosed() { numShards := 4 s.config.NumberOfShards = numShards s.shardController = NewShardController(s.mockResource, s.mockEngineFactory, s.config).(*controller) @@ -506,7 +506,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() { s.shardController.Stop() } -func (s *shardControllerSuite) TestShardControllerClosed() { +func (s *controllerSuite) TestShardControllerClosed() { numShards := 4 s.config.NumberOfShards = numShards s.shardController = NewShardController(s.mockResource, s.mockEngineFactory, s.config).(*controller) @@ -554,7 +554,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() { workerWG.Wait() } -func (s *shardControllerSuite) setupMocksForAcquireShard(shardID int, mockEngine *engine.MockEngine, currentRangeID, +func (s *controllerSuite) setupMocksForAcquireShard(shardID int, mockEngine *engine.MockEngine, currentRangeID, newRangeID int64) { replicationAck := int64(201)
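
Note on the send side of the new failover coordinator: `notifyFailoverMarkerLoop` plus `aggregateNotificationRequests` and `notifyRemoteCoordinator` amount to a batch-and-fan-out pattern — shards push (shardID, markers) requests, the loop aggregates shard IDs per marker, and on each timer tick one notification is sent and its result is fanned out to every waiting caller's channel. The standalone sketch below illustrates only that pattern; it is not the patch's API. A string stands in for `*replicator.FailoverMarkerAttributes`, a plain function stands in for `historyClient.NotifyFailoverMarkers`, and it flushes per marker instead of batching all markers into one RPC.

```go
package main

import (
	"fmt"
	"time"
)

type notification struct {
	shardID int32
	marker  string // stands in for *replicator.FailoverMarkerAttributes
	respCh  chan error
}

type batcher struct {
	in     chan notification
	notify func(marker string, shardIDs []int32) error // stands in for the NotifyFailoverMarkers RPC
}

func (b *batcher) loop(interval time.Duration, done <-chan struct{}) {
	shardsByMarker := make(map[string][]int32)
	chansByMarker := make(map[string][]chan error)
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-done:
			return
		case n := <-b.in:
			// Aggregate shard IDs and response channels per marker; duplicate
			// shard IDs are fine, the receiver de-dups them.
			shardsByMarker[n.marker] = append(shardsByMarker[n.marker], n.shardID)
			chansByMarker[n.marker] = append(chansByMarker[n.marker], n.respCh)
		case <-timer.C:
			for marker, shards := range shardsByMarker {
				err := b.notify(marker, shards)
				// Fan the single call's result out to every waiting caller.
				for _, ch := range chansByMarker[marker] {
					ch <- err
					close(ch)
				}
				delete(shardsByMarker, marker)
				delete(chansByMarker, marker)
			}
			timer.Reset(interval)
		}
	}
}

func main() {
	b := &batcher{
		in: make(chan notification, 16),
		notify: func(marker string, shardIDs []int32) error {
			fmt.Println("notify remote coordinator:", marker, shardIDs)
			return nil
		},
	}
	done := make(chan struct{})
	go b.loop(10*time.Millisecond, done)

	resp := make(chan error, 1)
	b.in <- notification{shardID: 1, marker: "failover-v2", respCh: resp}
	fmt.Println("caller got:", <-resp)
	close(done)
}
```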
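
Note on the receive side: `handleFailoverMarkers` keeps one record per domain, lets a marker with a higher failover version supersede the record, ignores stale markers, de-dups shard IDs via a set, and triggers `domain.CleanPendingActiveState` once all `NumberOfShards` shards have reported. The sketch below mirrors that logic in isolation under simplified assumptions: the `cleanup` callback stands in for the pending-active cleanup, and there is no retry policy or lastUpdatedTime-based expiry.

```go
package main

import "fmt"

type record struct {
	version int64
	shards  map[int32]struct{}
}

type receiver struct {
	numShards int
	records   map[string]*record // keyed by domain ID
	cleanup   func(domainID string, version int64) error
}

func (r *receiver) handle(domainID string, version int64, shardIDs []int32) {
	if rec, ok := r.records[domainID]; ok {
		switch {
		case rec.version < version:
			// A newer failover supersedes the local record.
			delete(r.records, domainID)
		case rec.version > version:
			// Stale marker from an older failover: ignore it.
			return
		}
	}
	if _, ok := r.records[domainID]; !ok {
		r.records[domainID] = &record{version: version, shards: make(map[int32]struct{})}
	}
	rec := r.records[domainID]
	for _, id := range shardIDs {
		rec.shards[id] = struct{}{} // set semantics de-dup shard IDs
	}
	if len(rec.shards) == r.numShards {
		if err := r.cleanup(domainID, version); err != nil {
			// Keep the record so a later marker can retry the cleanup.
			return
		}
		delete(r.records, domainID)
	}
}

func main() {
	r := &receiver{
		numShards: 2,
		records:   make(map[string]*record),
		cleanup: func(domainID string, version int64) error {
			fmt.Println("clean pending-active state:", domainID, version)
			return nil
		},
	}
	r.handle("domain-a", 2, []int32{1})
	r.handle("domain-a", 2, []int32{2}) // all shards reported -> cleanup fires
}
```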