Skip to content

Commit

Permalink
Add check to ensure archival bucket exists (cadence-workflow#1704)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Apr 15, 2019
1 parent 84b6916 commit 6ce4bc3
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 16 deletions.
1 change: 0 additions & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
)

const (
publicCaller = "cadence-public-client"
frontendCaller = "cadence-frontend-client"
historyCaller = "history-service-client"
matchingCaller = "matching-service-client"
Expand Down
12 changes: 12 additions & 0 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ func (c *client) BucketMetadata(_ context.Context, bucket string) (*blobstore.Bu
}, nil
}

func (c *client) BucketExists(_ context.Context, bucket string) (bool, error) {
c.Lock()
defer c.Unlock()

bd := bucketDirectory(c.storeDirectory, bucket)
exists, err := directoryExists(bd)
if err != nil {
return false, ErrCheckBucketExists
}
return exists, nil
}

func (c *client) IsRetryableError(err error) bool {
return false
}
Expand Down
15 changes: 15 additions & 0 deletions common/blobstore/filestore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,21 @@ func (s *ClientSuite) TestBucketMetadata_Success() {
s.Equal(defaultBucketOwner, metadata.Owner)
}

func (s *ClientSuite) TestBucketExists() {
dir, err := ioutil.TempDir("", "TestBucketExists")
s.NoError(err)
defer os.RemoveAll(dir)
client := s.constructClient(dir)

exists, err := client.BucketExists(context.Background(), "bucket-not-exists")
s.NoError(err)
s.False(exists)

exists, err = client.BucketExists(context.Background(), defaultBucketName)
s.NoError(err)
s.True(exists)
}

func (s *ClientSuite) constructClient(storeDir string) blobstore.Client {
cfg := s.constructConfig(storeDir)
client, err := NewClient(cfg)
Expand Down
1 change: 1 addition & 0 deletions common/blobstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Client interface {
Delete(ctx context.Context, bucket string, key blob.Key) (bool, error)
ListByPrefix(ctx context.Context, bucket string, prefix string) ([]blob.Key, error)
BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error)
BucketExists(ctx context.Context, bucket string) (bool, error)

IsRetryableError(err error) bool
GetRetryPolicy() backoff.RetryPolicy
Expand Down
13 changes: 13 additions & 0 deletions common/blobstore/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ func (c *metricClient) BucketMetadata(ctx context.Context, bucket string) (*Buck
return resp, err
}

func (c *metricClient) BucketExists(ctx context.Context, bucket string) (bool, error) {
c.metricsClient.IncCounter(metrics.BlobstoreClientBucketExistsScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.BlobstoreClientBucketExistsScope, metrics.CadenceClientLatency)
resp, err := c.client.BucketExists(ctx, bucket)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.BlobstoreClientBucketExistsScope, metrics.CadenceClientFailures)
}
return resp, err
}

func (c *metricClient) IsRetryableError(err error) bool {
return c.client.IsRetryableError(err)
}
Expand Down
11 changes: 11 additions & 0 deletions common/blobstore/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ func (c *retryableClient) BucketMetadata(ctx context.Context, bucket string) (*B
return resp, err
}

func (c *retryableClient) BucketExists(ctx context.Context, bucket string) (bool, error) {
var resp bool
op := func() error {
var err error
resp, err = c.client.BucketExists(ctx, bucket)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) IsRetryableError(err error) bool {
return c.client.IsRetryableError(err)
}
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ const (
BlobstoreClientListByPrefixScope
// BlobstoreClientBucketMetadataScope tracks BucketMetadata calls to blobstore
BlobstoreClientBucketMetadataScope
// BlobstoreClientBucketExistsScope tracks BucketExists calls to blobstore
BlobstoreClientBucketExistsScope

// ClusterMetadataArchivalConfigScope tracks ArchivalConfig calls to ClusterMetadata
ClusterMetadataArchivalConfigScope
Expand Down Expand Up @@ -816,6 +818,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
BlobstoreClientDeleteScope: {operation: "BlobstoreClientDelete", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},
BlobstoreClientListByPrefixScope: {operation: "BlobstoreClientListByPrefix", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},
BlobstoreClientBucketMetadataScope: {operation: "BlobstoreClientBucketMetadata", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},
BlobstoreClientBucketExistsScope: {operation: "BlobstoreClientBucketExists", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},

ClusterMetadataArchivalConfigScope: {operation: "ArchivalConfig"},

Expand Down
21 changes: 21 additions & 0 deletions common/mocks/BlobstoreClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion service/frontend/domainArchivalConfigStateMachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
package frontend

import (
"context"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/blobstore"
)

// domainArchivalConfigStateMachine is only used by workflowHandler.
Expand Down Expand Up @@ -60,6 +63,7 @@ var (
var (
errDisallowedBucketMetadata = &shared.BadRequestError{Message: "Cannot set bucket owner or bucket retention (must update bucket manually)"}
errBucketNameUpdate = &shared.BadRequestError{Message: "Cannot update existing bucket name"}
errBucketDoesNotExist = &shared.BadRequestError{Message: "Bucket does not exist"}
)

func neverEnabledState() *archivalState {
Expand Down Expand Up @@ -113,7 +117,7 @@ func (s *archivalState) validate() error {
return nil
}

func (s *archivalState) getNextState(e *archivalEvent) (nextState *archivalState, changed bool, err error) {
func (s *archivalState) getNextState(ctx context.Context, blobstoreClient blobstore.Client, e *archivalEvent) (nextState *archivalState, changed bool, err error) {
defer func() {
// ensure that any existing bucket name was not mutated
if nextState != nil && len(s.bucket) != 0 && s.bucket != nextState.bucket {
Expand All @@ -130,6 +134,20 @@ func (s *archivalState) getNextState(e *archivalEvent) (nextState *archivalState
err = nextStateErr
}
}

// ensure the bucket exists
if nextState != nil && nextState.bucket != "" {
exists, bucketExistsErr := blobstoreClient.BucketExists(ctx, nextState.bucket)
if bucketExistsErr != nil {
nextState = nil
changed = false
err = bucketExistsErr
} else if !exists {
nextState = nil
changed = false
err = errBucketDoesNotExist
}
}
}()

if s == nil || e == nil {
Expand Down
24 changes: 12 additions & 12 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
if err != nil {
return wh.error(err, scope)
}
nextArchivalState, _, err = currentArchivalState.getNextState(archivalEvent)
nextArchivalState, _, err = currentArchivalState.getNextState(ctx, wh.blobstoreClient, archivalEvent)
if err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func (wh *WorkflowHandler) ListDomains(ctx context.Context,
IsGlobalDomain: common.BoolPtr(d.IsGlobalDomain),
FailoverVersion: common.Int64Ptr(d.FailoverVersion),
}
desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = wh.createDomainResponse(d.Info, d.Config, d.ReplicationConfig)
desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = wh.createDomainResponse(ctx, d.Info, d.Config, d.ReplicationConfig)
domains = append(domains, desc)
}

Expand Down Expand Up @@ -422,9 +422,7 @@ func (wh *WorkflowHandler) DescribeDomain(ctx context.Context,
IsGlobalDomain: common.BoolPtr(resp.IsGlobalDomain),
FailoverVersion: common.Int64Ptr(resp.FailoverVersion),
}
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = wh.createDomainResponse(
resp.Info, resp.Config, resp.ReplicationConfig)

response.DomainInfo, response.Configuration, response.ReplicationConfiguration = wh.createDomainResponse(ctx, resp.Info, resp.Config, resp.ReplicationConfig)
return response, nil
}

Expand Down Expand Up @@ -491,7 +489,7 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context,
if err != nil {
return nil, wh.error(err, scope)
}
nextArchivalState, archivalConfigChanged, err = currentArchivalState.getNextState(archivalEvent)
nextArchivalState, archivalConfigChanged, err = currentArchivalState.getNextState(ctx, wh.blobstoreClient, archivalEvent)
if err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -666,8 +664,7 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context,
IsGlobalDomain: common.BoolPtr(getResponse.IsGlobalDomain),
FailoverVersion: common.Int64Ptr(failoverVersion),
}
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = wh.createDomainResponse(
info, config, replicationConfig)
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = wh.createDomainResponse(ctx, info, config, replicationConfig)
return response, nil
}

Expand Down Expand Up @@ -3205,9 +3202,12 @@ func getDomainStatus(info *persistence.DomainInfo) *gen.DomainStatus {
return nil
}

func (wh *WorkflowHandler) createDomainResponse(info *persistence.DomainInfo, config *persistence.DomainConfig,
replicationConfig *persistence.DomainReplicationConfig) (*gen.DomainInfo,
*gen.DomainConfiguration, *gen.DomainReplicationConfiguration) {
func (wh *WorkflowHandler) createDomainResponse(
ctx context.Context,
info *persistence.DomainInfo,
config *persistence.DomainConfig,
replicationConfig *persistence.DomainReplicationConfig,
) (*gen.DomainInfo, *gen.DomainConfiguration, *gen.DomainReplicationConfiguration) {

infoResult := &gen.DomainInfo{
Name: common.StringPtr(info.Name),
Expand All @@ -3225,7 +3225,7 @@ func (wh *WorkflowHandler) createDomainResponse(info *persistence.DomainInfo, co
ArchivalBucketName: common.StringPtr(config.ArchivalBucket),
}
if wh.GetClusterMetadata().ArchivalConfig().ConfiguredForArchival() && config.ArchivalBucket != "" {
metadata, err := wh.blobstoreClient.BucketMetadata(context.Background(), config.ArchivalBucket)
metadata, err := wh.blobstoreClient.BucketMetadata(ctx, config.ArchivalBucket)
if err == nil {
configResult.ArchivalRetentionPeriodInDays = common.Int32Ptr(int32(metadata.RetentionDays))
configResult.ArchivalBucketOwner = common.StringPtr(metadata.Owner)
Expand Down
61 changes: 59 additions & 2 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,32 @@ func (s *workflowHandlerSuite) getWorkflowHandlerWithParams(mService cs.Service,
s.mockVisibilityMgr, s.mockProducer, blobStore)
}

func (s *workflowHandlerSuite) TestRegisterDomain_Failure_BucketNotExists() {
config := s.newConfig()
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
clusterMetadata.On("GetCurrentClusterName").Return("active")
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
clusterMetadata.On("GetNextFailoverVersion", mock.Anything, mock.Anything).Return(int64(0))
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(nil, &shared.EntityNotExistsError{})
mMetadataManager.On("CreateDomain", mock.Anything).Return(&persistence.CreateDomainResponse{
ID: "test-id",
}, nil)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(false, nil)

mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

req := registerDomainRequest(common.ArchivalStatusPtr(shared.ArchivalStatusEnabled), common.StringPtr("not-exists-bucket"))
err := wh.RegisterDomain(context.Background(), req)
assert.Error(s.T(), err)
assert.Equal(s.T(), err, errBucketDoesNotExist)
}

func (s *workflowHandlerSuite) TestRegisterDomain_Success_EnabledWithNoBucket() {
config := s.newConfig()
clusterMetadata := &mocks.ClusterMetadata{}
Expand All @@ -537,9 +563,11 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_EnabledWithNoBucket()
mMetadataManager.On("CreateDomain", mock.Anything).Return(&persistence.CreateDomainResponse{
ID: "test-id",
}, nil)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)

mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand All @@ -560,9 +588,11 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_EnabledWithBucket() {
mMetadataManager.On("CreateDomain", mock.Anything).Return(&persistence.CreateDomainResponse{
ID: "test-id",
}, nil)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)

mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -749,6 +779,29 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Failure_UpdateExistingBucketName
assert.Equal(s.T(), errBucketNameUpdate, err)
}

func (s *workflowHandlerSuite) TestUpdateDomain_Failure_BucketNotExists() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetMetadata").Return(&persistence.GetMetadataResponse{
NotificationVersion: int64(0),
}, nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("", shared.ArchivalStatusDisabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(false, nil)
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

updateReq := updateRequest(common.StringPtr("bucket-not-exists"), common.ArchivalStatusPtr(shared.ArchivalStatusEnabled), nil, nil)
_, err := wh.UpdateDomain(context.Background(), updateReq)
assert.Error(s.T(), err)
assert.Equal(s.T(), errBucketDoesNotExist, err)
}

func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalEnabledToArchivalDisabledWithoutSettingBucket() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
Expand All @@ -762,6 +815,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalEnabledToArchiva
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)
mBlobstore.On("BucketMetadata", mock.Anything, mock.Anything).Return(bucketMetadataResponse("test-owner", 10), nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
Expand Down Expand Up @@ -818,6 +872,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalEnabledToArchiva
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)
mBlobstore.On("BucketMetadata", mock.Anything, mock.Anything).Return(bucketMetadataResponse("test-owner", 10), nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
Expand Down Expand Up @@ -847,6 +902,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalEnabledToEnabled
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)
mBlobstore.On("BucketMetadata", mock.Anything, mock.Anything).Return(bucketMetadataResponse("test-owner", 10), nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
Expand Down Expand Up @@ -876,6 +932,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn
clusterMetadata.On("ArchivalConfig").Return(cluster.NewArchivalConfig(cluster.ArchivalEnabled, "test-archival-bucket", true))
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean, s.logger)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("BucketExists", mock.Anything, mock.Anything).Return(true, nil)
mBlobstore.On("BucketMetadata", mock.Anything, mock.Anything).Return(bucketMetadataResponse("test-owner", 10), nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
Expand Down

0 comments on commit 6ce4bc3

Please sign in to comment.