From 121ef1e61de156ad8b9a5954e229210dc115c9a4 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Fri, 31 Jul 2020 04:21:24 -0700 Subject: [PATCH] Bug fixes for failover (#3415) --- common/cache/domainCache.go | 5 ----- common/domain/failover_watcher.go | 4 ++-- common/errors/domainNotActiveError.go | 1 + .../cassandra/cassandraMetadataPersistenceV2.go | 8 +++++--- common/persistence/metadataStore.go | 1 + service/frontend/workflowHandler.go | 2 +- service/history/failover/coordinator.go | 4 ++++ service/history/historyEngine.go | 2 +- service/history/shard/context.go | 3 +++ 9 files changed, 18 insertions(+), 12 deletions(-) diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index cdb234589f9..7efc5d95894 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -805,11 +805,6 @@ func (entry *DomainCacheEntry) IsDomainPendingActive() bool { return entry.failoverEndTime != nil } -// GetDomainFailoverEndTime returns domain failover end time if it exists -func (entry *DomainCacheEntry) GetDomainFailoverEndTime() *int64 { - return entry.failoverEndTime -} - // GetReplicationPolicy return the derived workflow replication policy func (entry *DomainCacheEntry) GetReplicationPolicy() ReplicationPolicy { // frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1 diff --git a/common/domain/failover_watcher.go b/common/domain/failover_watcher.go index 45ec8f8a5ea..288359148f7 100644 --- a/common/domain/failover_watcher.go +++ b/common/domain/failover_watcher.go @@ -151,8 +151,8 @@ func (p *failoverWatcherImpl) handleFailoverTimeout( domain *cache.DomainCacheEntry, ) { - failoverEndTime := domain.GetDomainFailoverEndTime() - if failoverEndTime != nil && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) { + failoverEndTime := domain.GetFailoverEndTime() + if domain.IsDomainPendingActive() && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) { domainID := domain.GetInfo().ID // force failover the domain without setting the failover timeout if err := CleanPendingActiveState( diff --git a/common/errors/domainNotActiveError.go b/common/errors/domainNotActiveError.go index 839e2e6fece..34ced873025 100644 --- a/common/errors/domainNotActiveError.go +++ b/common/errors/domainNotActiveError.go @@ -51,5 +51,6 @@ func NewDomainPendingActiveError(domainName string, currentCluster string) *work ), DomainName: domainName, CurrentCluster: currentCluster, + ActiveCluster: currentCluster, } } diff --git a/common/persistence/cassandra/cassandraMetadataPersistenceV2.go b/common/persistence/cassandra/cassandraMetadataPersistenceV2.go index 63a296eaba1..3e311715450 100644 --- a/common/persistence/cassandra/cassandraMetadataPersistenceV2.go +++ b/common/persistence/cassandra/cassandraMetadataPersistenceV2.go @@ -39,7 +39,7 @@ import ( const ( constDomainPartition = 0 domainMetadataRecordName = "cadence-domain-metadata" - emptyFailoverEndTime = int64(-1) + emptyFailoverEndTime = int64(0) ) const ( @@ -435,7 +435,8 @@ func (m *cassandraMetadataPersistenceV2) GetDomain(request *p.GetDomainRequest) var responseFailoverEndTime *int64 if failoverEndTime != emptyFailoverEndTime { - responseFailoverEndTime = &failoverEndTime + domainFailoverEndTime := failoverEndTime + responseFailoverEndTime = common.Int64Ptr(domainFailoverEndTime) } return &p.InternalGetDomainResponse{ @@ -515,7 +516,8 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque domain.ReplicationConfig.Clusters = p.GetOrUseDefaultClusters(m.currentClusterName, domain.ReplicationConfig.Clusters) if failoverEndTime != emptyFailoverEndTime { - domain.FailoverEndTime = &failoverEndTime + domainFailoverEndTime := failoverEndTime + domain.FailoverEndTime = common.Int64Ptr(domainFailoverEndTime) } response.Domains = append(response.Domains, domain) } diff --git a/common/persistence/metadataStore.go b/common/persistence/metadataStore.go index b92833575e6..0a5d12ad718 100644 --- a/common/persistence/metadataStore.go +++ b/common/persistence/metadataStore.go @@ -136,6 +136,7 @@ func (m *metadataManagerImpl) ListDomains(request *ListDomainsRequest) (*ListDom ConfigVersion: d.ConfigVersion, FailoverVersion: d.FailoverVersion, FailoverNotificationVersion: d.FailoverNotificationVersion, + FailoverEndTime: d.FailoverEndTime, PreviousFailoverVersion: d.PreviousFailoverVersion, NotificationVersion: d.NotificationVersion, }) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 6d76348a24c..72957d15a2f 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -3787,7 +3787,7 @@ func (wh *WorkflowHandler) checkOngoingFailover( if failoverVersion == nil { failoverVersion = resp.FailoverVersion } - if failoverVersion != resp.FailoverVersion { + if *failoverVersion != resp.GetFailoverVersion() { return &gen.BadRequestError{ Message: "Concurrent failover is not allow.", } diff --git a/service/history/failover/coordinator.go b/service/history/failover/coordinator.go index f686bede19a..19af9c97c88 100644 --- a/service/history/failover/coordinator.go +++ b/service/history/failover/coordinator.go @@ -276,6 +276,10 @@ func (c *coordinatorImpl) handleFailoverMarkers( metrics.GracefulFailoverLatency, now.Sub(time.Unix(0, marker.GetCreationTime())), ) + c.logger.Info("Updated domain from pending-active to active", + tag.WorkflowDomainID(domainID), + tag.FailoverVersion(*marker.FailoverVersion), + ) } } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 3ef96790545..6fb1300288b 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -495,7 +495,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() { e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion) == e.currentClusterName { failoverMarkerTasks = append(failoverMarkerTasks, &persistence.FailoverMarkerTask{ VisibilityTimestamp: e.timeSource.Now(), - Version: shardNotificationVersion, + Version: nextDomain.GetFailoverVersion(), DomainID: nextDomain.GetInfo().ID, }) } diff --git a/service/history/shard/context.go b/service/history/shard/context.go index aed5f80ddb1..5709203ef17 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -1306,6 +1306,7 @@ func (s *contextImpl) AddingPendingFailoverMarker( } // domain is active, the marker is expired if domainEntry.IsDomainActive() || domainEntry.GetFailoverVersion() > marker.GetFailoverVersion() { + s.logger.Info("Skipped out-of-date failover marker", tag.WorkflowDomainName(domainEntry.GetInfo().Name)) return nil } @@ -1314,6 +1315,7 @@ func (s *contextImpl) AddingPendingFailoverMarker( s.pendingFailoverMarkers = append(s.pendingFailoverMarkers, marker) if err := s.updateFailoverMarkersInShardInfoLocked(); err != nil { + s.logger.Error("Failed to add failover marker.", tag.Error(err)) return err } return s.updateShardInfoLocked() @@ -1352,6 +1354,7 @@ func (s *contextImpl) ValidateAndUpdateFailoverMarkers() ([]*replicator.Failover } } if err := s.updateFailoverMarkersInShardInfoLocked(); err != nil { + s.logger.Error("Failed to update failover marker in shard.", tag.Error(err)) return nil, err } if err := s.updateShardInfoLocked(); err != nil {