Skip to content

Commit

Permalink
Limit frequent domain config updates (cadence-workflow#3575)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Oct 12, 2020
1 parent f8134f5 commit 396af91
Show file tree
Hide file tree
Showing 29 changed files with 312 additions and 128 deletions.
52 changes: 48 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/domain/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@

package domain

import "time"

const (
// MinRetentionDays is the minimal retention days for any domain
MinRetentionDays = 1

// MaxBadBinaries is the maximal number of bad client binaries stored in a domain
MaxBadBinaries = 10

// FailoverCoolDown is the duration between two failovers
FailoverCoolDown = 1 * time.Minute
)
88 changes: 55 additions & 33 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
)

var (
errFailoverTooFrequent = &shared.ServiceBusyError{Message: "The domain failovers too frequent."}
)

type (
// Handler is the domain operation handler
Handler interface {
Expand All @@ -66,46 +71,55 @@ type (
) (*shared.UpdateDomainResponse, error)
}

// HandlerImpl is the domain operation handler implementation
HandlerImpl struct {
maxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter
logger log.Logger
// handlerImpl is the domain operation handler implementation
handlerImpl struct {
metadataMgr persistence.MetadataManager
clusterMetadata cluster.Metadata
domainReplicator Replicator
domainAttrValidator *AttrValidatorImpl
archivalMetadata archiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
timeSource clock.TimeSource
config Config
logger log.Logger
}

// Config is the domain config for domain handler
Config struct {
MinRetentionDays dynamicconfig.IntPropertyFn
MaxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter
FailoverCoolDown dynamicconfig.DurationPropertyFnWithDomainFilter
}
)

var _ Handler = (*HandlerImpl)(nil)
var _ Handler = (*handlerImpl)(nil)

// NewHandler create a new domain handler
func NewHandler(
minRetentionDays int,
maxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter,
config Config,
logger log.Logger,
metadataMgr persistence.MetadataManager,
clusterMetadata cluster.Metadata,
domainReplicator Replicator,
archivalMetadata archiver.ArchivalMetadata,
archiverProvider provider.ArchiverProvider,
) *HandlerImpl {
return &HandlerImpl{
maxBadBinaryCount: maxBadBinaryCount,
timeSource clock.TimeSource,
) Handler {
return &handlerImpl{
logger: logger,
metadataMgr: metadataMgr,
clusterMetadata: clusterMetadata,
domainReplicator: domainReplicator,
domainAttrValidator: newAttrValidator(clusterMetadata, int32(minRetentionDays)),
domainAttrValidator: newAttrValidator(clusterMetadata, int32(config.MinRetentionDays())),
archivalMetadata: archivalMetadata,
archiverProvider: archiverProvider,
timeSource: timeSource,
config: config,
}
}

// RegisterDomain register a new domain
func (d *HandlerImpl) RegisterDomain(
func (d *handlerImpl) RegisterDomain(
ctx context.Context,
registerRequest *shared.RegisterDomainRequest,
) error {
Expand Down Expand Up @@ -243,6 +257,7 @@ func (d *HandlerImpl) RegisterDomain(
IsGlobalDomain: isGlobalDomain,
ConfigVersion: 0,
FailoverVersion: failoverVersion,
LastUpdatedTime: d.timeSource.Now().UnixNano(),
}

domainResponse, err := d.metadataMgr.CreateDomain(ctx, domainRequest)
Expand Down Expand Up @@ -276,7 +291,7 @@ func (d *HandlerImpl) RegisterDomain(
}

// ListDomains list all domains
func (d *HandlerImpl) ListDomains(
func (d *handlerImpl) ListDomains(
ctx context.Context,
listRequest *shared.ListDomainsRequest,
) (*shared.ListDomainsResponse, error) {
Expand Down Expand Up @@ -314,7 +329,7 @@ func (d *HandlerImpl) ListDomains(
}

// DescribeDomain describe the domain
func (d *HandlerImpl) DescribeDomain(
func (d *handlerImpl) DescribeDomain(
ctx context.Context,
describeRequest *shared.DescribeDomainRequest,
) (*shared.DescribeDomainResponse, error) {
Expand All @@ -338,7 +353,7 @@ func (d *HandlerImpl) DescribeDomain(
}

// UpdateDomain update the domain
func (d *HandlerImpl) UpdateDomain(
func (d *handlerImpl) UpdateDomain(
ctx context.Context,
updateRequest *shared.UpdateDomainRequest,
) (*shared.UpdateDomainResponse, error) {
Expand Down Expand Up @@ -367,6 +382,7 @@ func (d *HandlerImpl) UpdateDomain(
gracefulFailoverEndTime := getResponse.FailoverEndTime
currentActiveCluster := replicationConfig.ActiveClusterName
previousFailoverVersion := getResponse.PreviousFailoverVersion
lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime)

// whether history archival config changed
historyArchivalConfigChanged := false
Expand Down Expand Up @@ -453,7 +469,7 @@ func (d *HandlerImpl) UpdateDomain(
if gracefulFailoverEndTime != nil {
return nil, errOngoingGracefulFailover
}
endTime := time.Now().UTC().Add(time.Duration(updateRequest.GetFailoverTimeoutInSeconds()) * time.Second).UnixNano()
endTime := d.timeSource.Now().Add(time.Duration(updateRequest.GetFailoverTimeoutInSeconds()) * time.Second).UnixNano()
gracefulFailoverEndTime = &endTime
previousFailoverVersion = failoverVersion
}
Expand Down Expand Up @@ -486,11 +502,16 @@ func (d *HandlerImpl) UpdateDomain(
}

if configurationChanged || activeClusterChanged {
now := d.timeSource.Now()
// Check the failover cool down time
if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) {
return nil, errFailoverTooFrequent
}

// set the versions
if configurationChanged {
configVersion++
}

if activeClusterChanged && isGlobalDomain {
// Force failover cleans graceful failover state
if !updateRequest.IsSetFailoverTimeoutInSeconds() {
Expand All @@ -504,7 +525,7 @@ func (d *HandlerImpl) UpdateDomain(
)
failoverNotificationVersion = notificationVersion
}

lastUpdatedTime = now
updateReq := &persistence.UpdateDomainRequest{
Info: info,
Config: config,
Expand All @@ -514,6 +535,7 @@ func (d *HandlerImpl) UpdateDomain(
FailoverNotificationVersion: failoverNotificationVersion,
FailoverEndTime: gracefulFailoverEndTime,
PreviousFailoverVersion: previousFailoverVersion,
LastUpdatedTime: lastUpdatedTime.UnixNano(),
NotificationVersion: notificationVersion,
}
err = d.metadataMgr.UpdateDomain(ctx, updateReq)
Expand Down Expand Up @@ -552,7 +574,7 @@ func (d *HandlerImpl) UpdateDomain(
}

// DeprecateDomain deprecates a domain
func (d *HandlerImpl) DeprecateDomain(
func (d *handlerImpl) DeprecateDomain(
ctx context.Context,
deprecateRequest *shared.DeprecateDomainRequest,
) error {
Expand Down Expand Up @@ -595,7 +617,7 @@ func (d *HandlerImpl) DeprecateDomain(
return nil
}

func (d *HandlerImpl) createResponse(
func (d *handlerImpl) createResponse(
info *persistence.DomainInfo,
config *persistence.DomainConfig,
replicationConfig *persistence.DomainReplicationConfig,
Expand Down Expand Up @@ -635,7 +657,7 @@ func (d *HandlerImpl) createResponse(
return infoResult, configResult, replicationConfigResult
}

func (d *HandlerImpl) mergeBadBinaries(
func (d *handlerImpl) mergeBadBinaries(
old map[string]*shared.BadBinaryInfo,
new map[string]*shared.BadBinaryInfo,
createTimeNano int64,
Expand All @@ -653,7 +675,7 @@ func (d *HandlerImpl) mergeBadBinaries(
}
}

func (d *HandlerImpl) mergeDomainData(
func (d *handlerImpl) mergeDomainData(
old map[string]string,
new map[string]string,
) map[string]string {
Expand All @@ -667,7 +689,7 @@ func (d *HandlerImpl) mergeDomainData(
return old
}

func (d *HandlerImpl) toArchivalRegisterEvent(
func (d *handlerImpl) toArchivalRegisterEvent(
status *shared.ArchivalStatus,
URI string,
defaultStatus shared.ArchivalStatus,
Expand All @@ -688,7 +710,7 @@ func (d *HandlerImpl) toArchivalRegisterEvent(
return event, nil
}

func (d *HandlerImpl) toArchivalUpdateEvent(
func (d *handlerImpl) toArchivalUpdateEvent(
status *shared.ArchivalStatus,
URI string,
defaultURI string,
Expand All @@ -705,7 +727,7 @@ func (d *HandlerImpl) toArchivalUpdateEvent(
return event, nil
}

func (d *HandlerImpl) validateHistoryArchivalURI(URIString string) error {
func (d *handlerImpl) validateHistoryArchivalURI(URIString string) error {
URI, err := archiver.NewURI(URIString)
if err != nil {
return err
Expand All @@ -719,7 +741,7 @@ func (d *HandlerImpl) validateHistoryArchivalURI(URIString string) error {
return archiver.ValidateURI(URI)
}

func (d *HandlerImpl) validateVisibilityArchivalURI(URIString string) error {
func (d *handlerImpl) validateVisibilityArchivalURI(URIString string) error {
URI, err := archiver.NewURI(URIString)
if err != nil {
return err
Expand All @@ -733,7 +755,7 @@ func (d *HandlerImpl) validateVisibilityArchivalURI(URIString string) error {
return archiver.ValidateURI(URI)
}

func (d *HandlerImpl) getHistoryArchivalState(
func (d *handlerImpl) getHistoryArchivalState(
config *persistence.DomainConfig,
requestedConfig *shared.DomainConfiguration,
) (*ArchivalState, bool, error) {
Expand All @@ -758,7 +780,7 @@ func (d *HandlerImpl) getHistoryArchivalState(
return currentHistoryArchivalState, false, nil
}

func (d *HandlerImpl) getVisibilityArchivalState(
func (d *handlerImpl) getVisibilityArchivalState(
config *persistence.DomainConfig,
requestedConfig *shared.DomainConfiguration,
) (*ArchivalState, bool, error) {
Expand All @@ -781,7 +803,7 @@ func (d *HandlerImpl) getVisibilityArchivalState(
return currentVisibilityArchivalState, false, nil
}

func (d *HandlerImpl) updateDomainInfo(
func (d *handlerImpl) updateDomainInfo(
updatedDomainInfo *shared.UpdateDomainInfo,
currentDomainInfo *persistence.DomainInfo,
) (*persistence.DomainInfo, bool) {
Expand All @@ -805,7 +827,7 @@ func (d *HandlerImpl) updateDomainInfo(
return currentDomainInfo, isDomainUpdated
}

func (d *HandlerImpl) updateDomainConfiguration(
func (d *handlerImpl) updateDomainConfiguration(
domainName string,
config *persistence.DomainConfig,
domainConfig *shared.DomainConfiguration,
Expand All @@ -822,7 +844,7 @@ func (d *HandlerImpl) updateDomainConfiguration(
config.Retention = domainConfig.GetWorkflowExecutionRetentionPeriodInDays()
}
if domainConfig.BadBinaries != nil {
maxLength := d.maxBadBinaryCount(domainName)
maxLength := d.config.MaxBadBinaryCount(domainName)
// only do merging
config.BadBinaries = d.mergeBadBinaries(config.BadBinaries.Binaries, domainConfig.BadBinaries.Binaries, time.Now().UnixNano())
if len(config.BadBinaries.Binaries) > maxLength {
Expand All @@ -835,7 +857,7 @@ func (d *HandlerImpl) updateDomainConfiguration(
return config, isConfigChanged, nil
}

func (d *HandlerImpl) updateDeleteBadBinary(
func (d *handlerImpl) updateDeleteBadBinary(
config *persistence.DomainConfig,
deleteBadBinary *string,
) (*persistence.DomainConfig, bool, error) {
Expand All @@ -853,7 +875,7 @@ func (d *HandlerImpl) updateDeleteBadBinary(
return config, false, nil
}

func (d *HandlerImpl) updateReplicationConfig(
func (d *handlerImpl) updateReplicationConfig(
config *persistence.DomainReplicationConfig,
replicationConfig *shared.DomainReplicationConfiguration,
) (*persistence.DomainReplicationConfig, bool, bool, error) {
Expand Down
15 changes: 11 additions & 4 deletions common/domain/handler_GlobalDomainDisabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"log"
"os"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/suite"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/mocks"
Expand All @@ -55,7 +57,7 @@ type (
archivalMetadata archiver.ArchivalMetadata
mockArchiverProvider *provider.MockArchiverProvider

handler *HandlerImpl
handler *handlerImpl
}
)

Expand Down Expand Up @@ -95,17 +97,22 @@ func (s *domainHandlerGlobalDomainDisabledSuite) SetupTest() {
false,
&config.ArchivalDomainDefaults{},
)
domainConfig := Config{
MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
}
s.mockArchiverProvider = &provider.MockArchiverProvider{}
s.handler = NewHandler(
s.minRetentionDays,
dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
domainConfig,
logger,
s.metadataMgr,
s.ClusterMetadata,
s.mockDomainReplicator,
s.archivalMetadata,
s.mockArchiverProvider,
)
clock.NewRealTimeSource(),
).(*handlerImpl)
}

func (s *domainHandlerGlobalDomainDisabledSuite) TearDownTest() {
Expand Down
Loading

0 comments on commit 396af91

Please sign in to comment.