From b9075be0b011996722a2ae9f762b4ce81a2a17d6 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 3 Jun 2019 17:46:57 -0700 Subject: [PATCH] Add retention period check (#1939) --- host/integrationbase.go | 43 +++++++++++++- host/util.go | 5 +- host/xdc/elasticsearch_test.go | 9 +-- ...r_test.go => integration_failover_test.go} | 9 +-- service/frontend/domainHandler.go | 14 +++++ ...domainHandler_GlobalDomainDisabled_test.go | 9 ++- ..._GlobalDomainEnabled_MasterCluster_test.go | 30 ++++++---- ...obalDomainEnabled_NotMasterCluster_test.go | 13 +++-- service/frontend/domainHandler_test.go | 57 +++++++++++++++++++ service/frontend/workflowHandler.go | 1 + 10 files changed, 157 insertions(+), 33 deletions(-) rename host/xdc/{integration_domain_failover_test.go => integration_failover_test.go} (99%) diff --git a/host/integrationbase.go b/host/integrationbase.go index 843016f6f52..d49d792df69 100644 --- a/host/integrationbase.go +++ b/host/integrationbase.go @@ -35,6 +35,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/environment" "go.uber.org/yarpc" "go.uber.org/yarpc/transport/tchannel" @@ -101,9 +102,7 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) { s.Require().NoError( s.registerDomain(s.foreignDomainName, 1, workflow.ArchivalStatusDisabled, "")) - s.archivalDomainName = s.randomizeStr("integration-archival-enabled-domain") - s.Require().NoError( - s.registerDomain(s.archivalDomainName, 0, workflow.ArchivalStatusEnabled, s.testCluster.blobstore.bucketName)) + s.Require().NoError(s.registerArchivalDomain()) // this sleep is necessary because domainv2 cache gets refreshed in the // background only every domainCacheRefreshInterval period @@ -207,3 +206,41 @@ func (s *IntegrationBase) getHistory(domain string, execution *workflow.Workflow return events } + +// To register archival domain we can't use frontend API as the retention period is set to 0 for testing, +// and request will be rejected by frontend. Here we make a call directly to persistence to register +// the domain. +func (s *IntegrationBase) registerArchivalDomain() error { + s.archivalDomainName = s.randomizeStr("integration-archival-enabled-domain") + currentClusterName := s.testCluster.testBase.ClusterMetadata.GetCurrentClusterName() + domainRequest := &persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: s.archivalDomainName, + Status: persistence.DomainStatusRegistered, + }, + Config: &persistence.DomainConfig{ + Retention: 0, + ArchivalBucket: s.testCluster.blobstore.bucketName, + ArchivalStatus: workflow.ArchivalStatusEnabled, + BadBinaries: workflow.BadBinaries{Binaries: map[string]*workflow.BadBinaryInfo{}}, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: currentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ + ClusterName: currentClusterName, + }, + }, + }, + IsGlobalDomain: false, + FailoverVersion: 0, + } + response, err := s.testCluster.testBase.MetadataProxy.CreateDomain(domainRequest) + + s.Logger.Info("Register domain succeeded", + tag.WorkflowDomainName(s.archivalDomainName), + tag.WorkflowDomainID(response.ID), + ) + return err +} diff --git a/host/util.go b/host/util.go index 3a197ae7082..1a3273f8158 100644 --- a/host/util.go +++ b/host/util.go @@ -21,10 +21,11 @@ package host import ( - "github.com/olivere/elastic" - "github.com/stretchr/testify/suite" "io/ioutil" "time" + + "github.com/olivere/elastic" + "github.com/stretchr/testify/suite" ) // CreateESClient create ElasticSearch client for test diff --git a/host/xdc/elasticsearch_test.go b/host/xdc/elasticsearch_test.go index a2400ba49dc..be74e83bc88 100644 --- a/host/xdc/elasticsearch_test.go +++ b/host/xdc/elasticsearch_test.go @@ -141,10 +141,11 @@ func (s *esCrossDCTestSuite) TestSearchAttributes() { domainName := "test-xdc-search-attr-" + common.GenerateRandomString(5) client1 := s.cluster1.GetFrontendClient() // active regReq := &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Clusters: clusterReplicationConfigES, - ActiveClusterName: common.StringPtr(clusterNameES[0]), - IsGlobalDomain: common.BoolPtr(true), + Name: common.StringPtr(domainName), + Clusters: clusterReplicationConfigES, + ActiveClusterName: common.StringPtr(clusterNameES[0]), + IsGlobalDomain: common.BoolPtr(true), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1), } err := client1.RegisterDomain(createContext(), regReq) s.NoError(err) diff --git a/host/xdc/integration_domain_failover_test.go b/host/xdc/integration_failover_test.go similarity index 99% rename from host/xdc/integration_domain_failover_test.go rename to host/xdc/integration_failover_test.go index 9545639393c..b00220d543e 100644 --- a/host/xdc/integration_domain_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -132,10 +132,11 @@ func (s *integrationClustersTestSuite) TestDomainFailover() { domainName := "test-domain-for-fail-over-" + common.GenerateRandomString(5) client1 := s.cluster1.GetFrontendClient() // active regReq := &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(true), - Clusters: clusterReplicationConfig, - ActiveClusterName: common.StringPtr(clusterName[0]), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(true), + Clusters: clusterReplicationConfig, + ActiveClusterName: common.StringPtr(clusterName[0]), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(7), } err := client1.RegisterDomain(createContext(), regReq) s.NoError(err) diff --git a/service/frontend/domainHandler.go b/service/frontend/domainHandler.go index dac023ec0bc..a91792c7565 100644 --- a/service/frontend/domainHandler.go +++ b/service/frontend/domainHandler.go @@ -158,6 +158,10 @@ func (d *domainHandlerImpl) registerDomain(ctx context.Context, } } + if err := d.validateRetentionPeriod(registerRequest.GetWorkflowExecutionRetentionPeriodInDays()); err != nil { + return err + } + failoverVersion := common.EmptyVersion if registerRequest.GetIsGlobalDomain() { failoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeClusterName, 0) @@ -435,6 +439,9 @@ func (d *domainHandlerImpl) updateDomain(ctx context.Context, if updatedConfig.WorkflowExecutionRetentionPeriodInDays != nil { configurationChanged = true config.Retention = updatedConfig.GetWorkflowExecutionRetentionPeriodInDays() + if err := d.validateRetentionPeriod(config.Retention); err != nil { + return nil, err + } } if archivalConfigChanged { configurationChanged = true @@ -683,6 +690,13 @@ func (d *domainHandlerImpl) validateClusterName(clusterName string) error { return nil } +func (d *domainHandlerImpl) validateRetentionPeriod(retentionDays int32) error { + if retentionDays <= 0 { + return errInvalidRetentionPeriod + } + return nil +} + func (d *domainHandlerImpl) checkPermission(securityToken *string) error { if d.config.EnableAdminProtection() { if securityToken == nil { diff --git a/service/frontend/domainHandler_GlobalDomainDisabled_test.go b/service/frontend/domainHandler_GlobalDomainDisabled_test.go index 51f2143cab0..35645cb6d24 100644 --- a/service/frontend/domainHandler_GlobalDomainDisabled_test.go +++ b/service/frontend/domainHandler_GlobalDomainDisabled_test.go @@ -132,8 +132,10 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_AllDefaul }) } + retention := int32(1) err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), + Name: common.StringPtr(domainName), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), }) s.Nil(err) @@ -153,7 +155,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_AllDefaul UUID: common.StringPtr(""), }, resp.DomainInfo) s.Equal(&shared.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), EmitMetric: common.BoolPtr(false), ArchivalBucketName: common.StringPtr(""), ArchivalRetentionPeriodInDays: nil, @@ -307,7 +309,8 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_NoAttrSet() func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_AllAttrSet() { domainName := s.getRandomDomainName() err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), + Name: common.StringPtr(domainName), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1), }) s.Nil(err) diff --git a/service/frontend/domainHandler_GlobalDomainEnabled_MasterCluster_test.go b/service/frontend/domainHandler_GlobalDomainEnabled_MasterCluster_test.go index 91aa0cdbc71..b7836ca4900 100644 --- a/service/frontend/domainHandler_GlobalDomainEnabled_MasterCluster_test.go +++ b/service/frontend/domainHandler_GlobalDomainEnabled_MasterCluster_test.go @@ -105,9 +105,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma }) } + retention := int32(1) err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), }) s.Nil(err) @@ -127,7 +129,7 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma UUID: common.StringPtr(""), }, resp.DomainInfo) s.Equal(&shared.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), EmitMetric: common.BoolPtr(false), ArchivalBucketName: common.StringPtr(""), ArchivalRetentionPeriodInDays: nil, @@ -284,8 +286,9 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestUpdateGetDomain domainName := s.getRandomDomainName() isGlobalDomain := false err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1), }) s.Nil(err) @@ -373,9 +376,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma s.mockProducer.On("Publish", mock.Anything).Return(nil).Once() + retention := int32(1) err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), }) s.Nil(err) @@ -395,7 +400,7 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma UUID: common.StringPtr(""), }, resp.DomainInfo) s.Equal(&shared.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), EmitMetric: common.BoolPtr(false), ArchivalBucketName: common.StringPtr(""), ArchivalRetentionPeriodInDays: nil, @@ -576,10 +581,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestUpdateGetDomain s.mockProducer.On("Publish", mock.Anything).Return(nil).Twice() err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), - Clusters: clusters, - ActiveClusterName: common.StringPtr(activeClusterName), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Clusters: clusters, + ActiveClusterName: common.StringPtr(activeClusterName), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1), }) s.Nil(err) diff --git a/service/frontend/domainHandler_GlobalDomainEnabled_NotMasterCluster_test.go b/service/frontend/domainHandler_GlobalDomainEnabled_NotMasterCluster_test.go index 945549e118c..3cf1bc7d106 100644 --- a/service/frontend/domainHandler_GlobalDomainEnabled_NotMasterCluster_test.go +++ b/service/frontend/domainHandler_GlobalDomainEnabled_NotMasterCluster_test.go @@ -105,9 +105,11 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestRegisterGetD }) } + retention := int32(1) err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), }) s.Nil(err) @@ -127,7 +129,7 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestRegisterGetD UUID: common.StringPtr(""), }, resp.DomainInfo) s.Equal(&shared.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), EmitMetric: common.BoolPtr(false), ArchivalBucketName: common.StringPtr(""), ArchivalRetentionPeriodInDays: nil, @@ -284,8 +286,9 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestUpdateGetDom domainName := s.getRandomDomainName() isGlobalDomain := false err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - IsGlobalDomain: common.BoolPtr(isGlobalDomain), + Name: common.StringPtr(domainName), + IsGlobalDomain: common.BoolPtr(isGlobalDomain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1), }) s.Nil(err) diff --git a/service/frontend/domainHandler_test.go b/service/frontend/domainHandler_test.go index 09d6fe7f651..aa2a9599894 100644 --- a/service/frontend/domainHandler_test.go +++ b/service/frontend/domainHandler_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/uber/cadence/.gen/go/shared" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/log/loggerimpl" @@ -361,6 +362,62 @@ func (s *domainHandlerCommonSuite) TestListDomain() { }, domains) } +func (s *domainHandlerCommonSuite) TestValidateRetentionPeriod() { + testCases := []struct { + retentionPeriod int32 + expectedErr error + }{ + { + retentionPeriod: 10, + expectedErr: nil, + }, + { + retentionPeriod: 0, + expectedErr: errInvalidRetentionPeriod, + }, + { + retentionPeriod: -3, + expectedErr: errInvalidRetentionPeriod, + }, + } + for _, tc := range testCases { + actualErr := s.handler.validateRetentionPeriod(tc.retentionPeriod) + s.Equal(tc.expectedErr, actualErr) + } +} + +func (s *domainHandlerCommonSuite) TestRegisterDomain_InvalidRetentionPeriod() { + registerRequest := &workflow.RegisterDomainRequest{ + Name: common.StringPtr("random domain name"), + Description: common.StringPtr("random domain name"), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(0)), + IsGlobalDomain: common.BoolPtr(false), + } + err := s.handler.registerDomain(context.Background(), registerRequest) + s.Equal(errInvalidRetentionPeriod, err) +} + +func (s *domainHandlerCommonSuite) TestUpdateDomain_InvalidRetentionPeriod() { + domain := "random domain name" + registerRequest := &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domain), + Description: common.StringPtr(domain), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(10)), + IsGlobalDomain: common.BoolPtr(false), + } + err := s.handler.registerDomain(context.Background(), registerRequest) + s.NoError(err) + + updateRequest := &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domain), + Configuration: &workflow.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(-1)), + }, + } + _, err = s.handler.updateDomain(context.Background(), updateRequest) + s.Equal(errInvalidRetentionPeriod, err) +} + func (s *domainHandlerCommonSuite) getRandomDomainName() string { return "domain" + uuid.New() } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 877a1f266a3..d56ef718ea2 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -127,6 +127,7 @@ var ( errInvalidExecutionStartToCloseTimeoutSeconds = &gen.BadRequestError{Message: "A valid ExecutionStartToCloseTimeoutSeconds is not set on request."} errInvalidTaskStartToCloseTimeoutSeconds = &gen.BadRequestError{Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."} errClientVersionNotSet = &gen.BadRequestError{Message: "Client version is not set on request."} + errInvalidRetentionPeriod = &gen.BadRequestError{Message: "Retention period is not set to a positive number on request."} // err for archival errDomainHasNeverBeenEnabledForArchival = &gen.BadRequestError{Message: "Attempted to fetch history from archival, but domain has never been enabled for archival."}