Skip to content

Commit

Permalink
Add retention period check (cadence-workflow#1939)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 4, 2019
1 parent efcc67b commit b9075be
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 33 deletions.
43 changes: 40 additions & 3 deletions host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/environment"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -101,9 +102,7 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) {
s.Require().NoError(
s.registerDomain(s.foreignDomainName, 1, workflow.ArchivalStatusDisabled, ""))

s.archivalDomainName = s.randomizeStr("integration-archival-enabled-domain")
s.Require().NoError(
s.registerDomain(s.archivalDomainName, 0, workflow.ArchivalStatusEnabled, s.testCluster.blobstore.bucketName))
s.Require().NoError(s.registerArchivalDomain())

// this sleep is necessary because domainv2 cache gets refreshed in the
// background only every domainCacheRefreshInterval period
Expand Down Expand Up @@ -207,3 +206,41 @@ func (s *IntegrationBase) getHistory(domain string, execution *workflow.Workflow

return events
}

// To register archival domain we can't use frontend API as the retention period is set to 0 for testing,
// and request will be rejected by frontend. Here we make a call directly to persistence to register
// the domain.
func (s *IntegrationBase) registerArchivalDomain() error {
s.archivalDomainName = s.randomizeStr("integration-archival-enabled-domain")
currentClusterName := s.testCluster.testBase.ClusterMetadata.GetCurrentClusterName()
domainRequest := &persistence.CreateDomainRequest{
Info: &persistence.DomainInfo{
ID: uuid.New(),
Name: s.archivalDomainName,
Status: persistence.DomainStatusRegistered,
},
Config: &persistence.DomainConfig{
Retention: 0,
ArchivalBucket: s.testCluster.blobstore.bucketName,
ArchivalStatus: workflow.ArchivalStatusEnabled,
BadBinaries: workflow.BadBinaries{Binaries: map[string]*workflow.BadBinaryInfo{}},
},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: currentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{
ClusterName: currentClusterName,
},
},
},
IsGlobalDomain: false,
FailoverVersion: 0,
}
response, err := s.testCluster.testBase.MetadataProxy.CreateDomain(domainRequest)

s.Logger.Info("Register domain succeeded",
tag.WorkflowDomainName(s.archivalDomainName),
tag.WorkflowDomainID(response.ID),
)
return err
}
5 changes: 3 additions & 2 deletions host/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
package host

import (
"github.com/olivere/elastic"
"github.com/stretchr/testify/suite"
"io/ioutil"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/suite"
)

// CreateESClient create ElasticSearch client for test
Expand Down
9 changes: 5 additions & 4 deletions host/xdc/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ func (s *esCrossDCTestSuite) TestSearchAttributes() {
domainName := "test-xdc-search-attr-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflow.RegisterDomainRequest{
Name: common.StringPtr(domainName),
Clusters: clusterReplicationConfigES,
ActiveClusterName: common.StringPtr(clusterNameES[0]),
IsGlobalDomain: common.BoolPtr(true),
Name: common.StringPtr(domainName),
Clusters: clusterReplicationConfigES,
ActiveClusterName: common.StringPtr(clusterNameES[0]),
IsGlobalDomain: common.BoolPtr(true),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
}
err := client1.RegisterDomain(createContext(), regReq)
s.NoError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ func (s *integrationClustersTestSuite) TestDomainFailover() {
domainName := "test-domain-for-fail-over-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflow.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(true),
Clusters: clusterReplicationConfig,
ActiveClusterName: common.StringPtr(clusterName[0]),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(true),
Clusters: clusterReplicationConfig,
ActiveClusterName: common.StringPtr(clusterName[0]),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(7),
}
err := client1.RegisterDomain(createContext(), regReq)
s.NoError(err)
Expand Down
14 changes: 14 additions & 0 deletions service/frontend/domainHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (d *domainHandlerImpl) registerDomain(ctx context.Context,
}
}

if err := d.validateRetentionPeriod(registerRequest.GetWorkflowExecutionRetentionPeriodInDays()); err != nil {
return err
}

failoverVersion := common.EmptyVersion
if registerRequest.GetIsGlobalDomain() {
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeClusterName, 0)
Expand Down Expand Up @@ -435,6 +439,9 @@ func (d *domainHandlerImpl) updateDomain(ctx context.Context,
if updatedConfig.WorkflowExecutionRetentionPeriodInDays != nil {
configurationChanged = true
config.Retention = updatedConfig.GetWorkflowExecutionRetentionPeriodInDays()
if err := d.validateRetentionPeriod(config.Retention); err != nil {
return nil, err
}
}
if archivalConfigChanged {
configurationChanged = true
Expand Down Expand Up @@ -683,6 +690,13 @@ func (d *domainHandlerImpl) validateClusterName(clusterName string) error {
return nil
}

func (d *domainHandlerImpl) validateRetentionPeriod(retentionDays int32) error {
if retentionDays <= 0 {
return errInvalidRetentionPeriod
}
return nil
}

func (d *domainHandlerImpl) checkPermission(securityToken *string) error {
if d.config.EnableAdminProtection() {
if securityToken == nil {
Expand Down
9 changes: 6 additions & 3 deletions service/frontend/domainHandler_GlobalDomainDisabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_AllDefaul
})
}

retention := int32(1)
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
Name: common.StringPtr(domainName),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
})
s.Nil(err)

Expand All @@ -153,7 +155,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_AllDefaul
UUID: common.StringPtr(""),
}, resp.DomainInfo)
s.Equal(&shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(false),
ArchivalBucketName: common.StringPtr(""),
ArchivalRetentionPeriodInDays: nil,
Expand Down Expand Up @@ -307,7 +309,8 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_NoAttrSet()
func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_AllAttrSet() {
domainName := s.getRandomDomainName()
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
Name: common.StringPtr(domainName),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
})
s.Nil(err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma
})
}

retention := int32(1)
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
})
s.Nil(err)

Expand All @@ -127,7 +129,7 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma
UUID: common.StringPtr(""),
}, resp.DomainInfo)
s.Equal(&shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(false),
ArchivalBucketName: common.StringPtr(""),
ArchivalRetentionPeriodInDays: nil,
Expand Down Expand Up @@ -284,8 +286,9 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestUpdateGetDomain
domainName := s.getRandomDomainName()
isGlobalDomain := false
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
})
s.Nil(err)

Expand Down Expand Up @@ -373,9 +376,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma

s.mockProducer.On("Publish", mock.Anything).Return(nil).Once()

retention := int32(1)
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
})
s.Nil(err)

Expand All @@ -395,7 +400,7 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestRegisterGetDoma
UUID: common.StringPtr(""),
}, resp.DomainInfo)
s.Equal(&shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(false),
ArchivalBucketName: common.StringPtr(""),
ArchivalRetentionPeriodInDays: nil,
Expand Down Expand Up @@ -576,10 +581,11 @@ func (s *domainHandlerGlobalDomainEnabledMasterClusterSuite) TestUpdateGetDomain
s.mockProducer.On("Publish", mock.Anything).Return(nil).Twice()

err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Clusters: clusters,
ActiveClusterName: common.StringPtr(activeClusterName),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Clusters: clusters,
ActiveClusterName: common.StringPtr(activeClusterName),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
})
s.Nil(err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestRegisterGetD
})
}

retention := int32(1)
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
})
s.Nil(err)

Expand All @@ -127,7 +129,7 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestRegisterGetD
UUID: common.StringPtr(""),
}, resp.DomainInfo)
s.Equal(&shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(0),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(false),
ArchivalBucketName: common.StringPtr(""),
ArchivalRetentionPeriodInDays: nil,
Expand Down Expand Up @@ -284,8 +286,9 @@ func (s *domainHandlerGlobalDomainEnabledNotMasterClusterSuite) TestUpdateGetDom
domainName := s.getRandomDomainName()
isGlobalDomain := false
err := s.handler.registerDomain(context.Background(), &shared.RegisterDomainRequest{
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
Name: common.StringPtr(domainName),
IsGlobalDomain: common.BoolPtr(isGlobalDomain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
})
s.Nil(err)

Expand Down
57 changes: 57 additions & 0 deletions service/frontend/domainHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/.gen/go/shared"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log/loggerimpl"
Expand Down Expand Up @@ -361,6 +362,62 @@ func (s *domainHandlerCommonSuite) TestListDomain() {
}, domains)
}

func (s *domainHandlerCommonSuite) TestValidateRetentionPeriod() {
testCases := []struct {
retentionPeriod int32
expectedErr error
}{
{
retentionPeriod: 10,
expectedErr: nil,
},
{
retentionPeriod: 0,
expectedErr: errInvalidRetentionPeriod,
},
{
retentionPeriod: -3,
expectedErr: errInvalidRetentionPeriod,
},
}
for _, tc := range testCases {
actualErr := s.handler.validateRetentionPeriod(tc.retentionPeriod)
s.Equal(tc.expectedErr, actualErr)
}
}

func (s *domainHandlerCommonSuite) TestRegisterDomain_InvalidRetentionPeriod() {
registerRequest := &workflow.RegisterDomainRequest{
Name: common.StringPtr("random domain name"),
Description: common.StringPtr("random domain name"),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(0)),
IsGlobalDomain: common.BoolPtr(false),
}
err := s.handler.registerDomain(context.Background(), registerRequest)
s.Equal(errInvalidRetentionPeriod, err)
}

func (s *domainHandlerCommonSuite) TestUpdateDomain_InvalidRetentionPeriod() {
domain := "random domain name"
registerRequest := &workflow.RegisterDomainRequest{
Name: common.StringPtr(domain),
Description: common.StringPtr(domain),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(10)),
IsGlobalDomain: common.BoolPtr(false),
}
err := s.handler.registerDomain(context.Background(), registerRequest)
s.NoError(err)

updateRequest := &workflow.UpdateDomainRequest{
Name: common.StringPtr(domain),
Configuration: &workflow.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(int32(-1)),
},
}
_, err = s.handler.updateDomain(context.Background(), updateRequest)
s.Equal(errInvalidRetentionPeriod, err)
}

func (s *domainHandlerCommonSuite) getRandomDomainName() string {
return "domain" + uuid.New()
}
1 change: 1 addition & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var (
errInvalidExecutionStartToCloseTimeoutSeconds = &gen.BadRequestError{Message: "A valid ExecutionStartToCloseTimeoutSeconds is not set on request."}
errInvalidTaskStartToCloseTimeoutSeconds = &gen.BadRequestError{Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}
errClientVersionNotSet = &gen.BadRequestError{Message: "Client version is not set on request."}
errInvalidRetentionPeriod = &gen.BadRequestError{Message: "Retention period is not set to a positive number on request."}

// err for archival
errDomainHasNeverBeenEnabledForArchival = &gen.BadRequestError{Message: "Attempted to fetch history from archival, but domain has never been enabled for archival."}
Expand Down

0 comments on commit b9075be

Please sign in to comment.