diff --git a/cmd/server/server.go b/cmd/server/server.go index 6ef11a90295..e37d896acb0 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -21,13 +21,12 @@ package main import ( + "github.com/uber/cadence/common/blobstore/filestore" "log" "time" "github.com/uber/cadence/client" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/archival" - "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service" @@ -113,14 +112,13 @@ func (s *server) startService() common.Daemon { params.DynamicConfig = dynamicconfig.NewNopClient() dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger) - params.ArchivalClient = archival.NewNopClient() - params.BlobstoreClient = blobstore.NewNopClient() - svcCfg := s.cfg.Services[s.name] params.MetricScope = svcCfg.Metrics.NewScope() params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger) params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger) enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain) + enableArchival := dc.GetBoolProperty(dynamicconfig.EnableArchival, s.cfg.Archival.Enabled) + params.ClusterMetadata = cluster.NewMetadata( enableGlobalDomain, s.cfg.ClustersInfo.FailoverVersionIncrement, @@ -128,7 +126,8 @@ func (s *server) startService() common.Daemon { s.cfg.ClustersInfo.CurrentClusterName, s.cfg.ClustersInfo.ClusterInitialFailoverVersions, s.cfg.ClustersInfo.ClusterAddress, - s.cfg.ClustersInfo.DeploymentGroup, + enableArchival, + s.cfg.Archival.Filestore.DefaultBucket.Name, ) params.DispatcherProvider = client.NewIPYarpcDispatcherProvider() // TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop @@ -143,6 +142,13 @@ func (s *server) startService() common.Daemon { params.MessagingClient = nil } + if params.ClusterMetadata.IsArchivalEnabled() { + params.BlobstoreClient, err = filestore.NewClient(&s.cfg.Archival.Filestore) + if err != nil { + log.Fatalf("error creating blobstore: %v", err) + } + } + params.Logger.Info("Starting service " + s.name) var daemon common.Daemon diff --git a/common/archival/client.go b/common/archival/client.go deleted file mode 100644 index bc833c0611c..00000000000 --- a/common/archival/client.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
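The server wiring above reads the new system.enableArchival dynamic-config flag before building cluster metadata because, as the common/cluster/metadata.go hunk later in this diff shows, NewMetadata panics when the flag and the default bucket name disagree. A condensed sketch of that agreement check, rewritten as an error-returning helper purely for illustration (the helper name is an assumption, not part of this change):

package main

import "fmt"

// validateArchivalConfig mirrors the startup check added to cluster.NewMetadata:
// the enableArchival flag and the default archival bucket name must agree.
func validateArchivalConfig(enableArchival bool, defaultBucket string) error {
	bucketSet := len(defaultBucket) != 0
	switch {
	case enableArchival && !bucketSet:
		return fmt.Errorf("archival enabled but no default bucket set")
	case !enableArchival && bucketSet:
		return fmt.Errorf("archival not enabled but default bucket set")
	}
	return nil
}

func main() {
	fmt.Println(validateArchivalConfig(true, ""))                     // fails: flag on, bucket missing
	fmt.Println(validateArchivalConfig(true, "cadence-development")) // ok
}

The diff itself panics rather than returning an error, which surfaces the misconfiguration before any service starts.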
- -package archival - -import ( - "context" - "errors" - "github.com/uber/cadence/.gen/go/shared" - "time" -) - -// PutRequest is request for Archive -type PutRequest struct { - DomainName string - DomainID string - WorkflowID string - RunID string - WorkflowType string - Status string - CloseTime time.Time - History *shared.History -} - -// Client is used to store and retrieve blobs -type Client interface { - PutWorkflow(ctx context.Context, request *PutRequest) error - - GetWorkflowExecutionHistory( - ctx context.Context, - request *shared.GetWorkflowExecutionHistoryRequest, - ) (*shared.GetWorkflowExecutionHistoryResponse, error) - - ListClosedWorkflowExecutions( - ctx context.Context, - ListRequest *shared.ListClosedWorkflowExecutionsRequest, - ) (*shared.ListClosedWorkflowExecutionsResponse, error) -} - -type nopClient struct{} - -func (c *nopClient) PutWorkflow(ctx context.Context, request *PutRequest) error { - return errors.New("not implemented") -} - -func (c *nopClient) GetWorkflowExecutionHistory( - ctx context.Context, - request *shared.GetWorkflowExecutionHistoryRequest, -) (*shared.GetWorkflowExecutionHistoryResponse, error) { - return nil, errors.New("not implemented") -} - -func (c *nopClient) ListClosedWorkflowExecutions( - ctx context.Context, - ListRequest *shared.ListClosedWorkflowExecutionsRequest, -) (*shared.ListClosedWorkflowExecutionsResponse, error) { - return nil, errors.New("not implemented") -} - -// NewNopClient creates a nop client -func NewNopClient() Client { - return &nopClient{} -} diff --git a/common/blobstore/filestore/client.go b/common/blobstore/filestore/client.go new file mode 100644 index 00000000000..7c095f737be --- /dev/null +++ b/common/blobstore/filestore/client.go @@ -0,0 +1,180 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package filestore + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/uber/cadence/common/blobstore" +) + +const ( + metadataFilename = "metadata" +) + +type client struct { + sync.Mutex + storeDirectory string +} + +// NewClient returns a new Client backed by file system +func NewClient(cfg *Config) (blobstore.Client, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + if err := setupDirectories(cfg); err != nil { + return nil, err + } + if err := writeMetadataFiles(cfg); err != nil { + return nil, err + } + return &client{ + storeDirectory: cfg.StoreDirectory, + }, nil +} + +func (c *client) UploadBlob(_ context.Context, bucket string, filename string, blob *blobstore.Blob) error { + c.Lock() + defer c.Unlock() + + exists, err := directoryExists(bucketDirectory(c.storeDirectory, bucket)) + if err != nil { + return err + } + if !exists { + return blobstore.ErrBucketNotExists + } + data, err := serializeBlob(blob) + if err != nil { + return err + } + blobPath := bucketItemPath(c.storeDirectory, bucket, filename) + return writeFile(blobPath, data) +} + +func (c *client) DownloadBlob(_ context.Context, bucket string, filename string) (*blobstore.Blob, error) { + c.Lock() + defer c.Unlock() + + exists, err := directoryExists(bucketDirectory(c.storeDirectory, bucket)) + if err != nil { + return nil, err + } + if !exists { + return nil, blobstore.ErrBucketNotExists + } + blobPath := bucketItemPath(c.storeDirectory, bucket, filename) + data, err := readFile(blobPath) + if err != nil { + if os.IsNotExist(err) { + return nil, blobstore.ErrBlobNotExists + } + return nil, err + } + return deserializeBlob(data) +} + +func (c *client) BucketMetadata(_ context.Context, bucket string) (*blobstore.BucketMetadataResponse, error) { + c.Lock() + defer c.Unlock() + + exists, err := directoryExists(bucketDirectory(c.storeDirectory, bucket)) + if err != nil { + return nil, err + } + if !exists { + return nil, blobstore.ErrBucketNotExists + } + + metadataFilepath := bucketItemPath(c.storeDirectory, bucket, metadataFilename) + exists, err = fileExists(metadataFilepath) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("failed to get metadata, file at %v does not exist", metadataFilepath) + } + + data, err := readFile(metadataFilepath) + if err != nil { + return nil, err + } + bucketCfg, err := deserializeBucketConfig(data) + if err != nil { + return nil, err + } + + return &blobstore.BucketMetadataResponse{ + Owner: bucketCfg.Owner, + RetentionDays: bucketCfg.RetentionDays, + }, nil +} + +func setupDirectories(cfg *Config) error { + if err := mkdirAll(cfg.StoreDirectory); err != nil { + return err + } + if err := mkdirAll(bucketDirectory(cfg.StoreDirectory, cfg.DefaultBucket.Name)); err != nil { + return err + } + for _, b := range cfg.CustomBuckets { + if err := mkdirAll(bucketDirectory(cfg.StoreDirectory, b.Name)); err != nil { + return err + } + } + return nil +} + +func writeMetadataFiles(cfg *Config) error { + writeMetadataFile := func(bucketConfig BucketConfig) error { + path := bucketItemPath(cfg.StoreDirectory, bucketConfig.Name, metadataFilename) + bytes, err := serializeBucketConfig(&bucketConfig) + if err != nil { + return fmt.Errorf("failed to write metadata file for bucket %v: %v", bucketConfig.Name, err) + } + if err := writeFile(path, bytes); err != nil { + return fmt.Errorf("failed to write metadata file for bucket %v: %v", bucketConfig.Name, err) + } + return nil + } + + if err := 
writeMetadataFile(cfg.DefaultBucket); err != nil { + return err + } + for _, b := range cfg.CustomBuckets { + if err := writeMetadataFile(b); err != nil { + return err + } + } + return nil +} + +func bucketDirectory(storeDirectory string, bucketName string) string { + return filepath.Join(storeDirectory, bucketName) +} + +func bucketItemPath(storeDirectory string, bucketName string, filename string) string { + return filepath.Join(bucketDirectory(storeDirectory, bucketName), filename) +} diff --git a/common/blobstore/filestore/client_test.go b/common/blobstore/filestore/client_test.go new file mode 100644 index 00000000000..3ec11fa76db --- /dev/null +++ b/common/blobstore/filestore/client_test.go @@ -0,0 +1,315 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
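The filestore client above is constructed from a Config and then used only through the generic blobstore.Client interface. A minimal end-to-end sketch, assuming an illustrative store directory, bucket name, and blob (none of these literals come from the change):

package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/uber/cadence/common/blobstore"
	"github.com/uber/cadence/common/blobstore/filestore"
)

func main() {
	// Hypothetical config; the shape matches what Config.Validate accepts.
	cfg := &filestore.Config{
		StoreDirectory: "/tmp/blobstore-example",
		DefaultBucket: filestore.BucketConfig{
			Name:          "example-bucket",
			Owner:         "example-owner",
			RetentionDays: 10,
		},
	}

	// NewClient validates the config, creates one directory per bucket,
	// and writes a metadata file into each bucket directory.
	client, err := filestore.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	blob := &blobstore.Blob{
		Body:            bytes.NewReader([]byte("hello")),
		Tags:            map[string]string{"k": "v"},
		CompressionType: blobstore.NoCompression,
	}
	if err := client.UploadBlob(ctx, "example-bucket", "greeting.blob", blob); err != nil {
		log.Fatal(err)
	}

	out, err := client.DownloadBlob(ctx, "example-bucket", "greeting.blob")
	if err != nil {
		log.Fatal(err)
	}
	body, err := ioutil.ReadAll(out.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body)) // hello
}

DownloadBlob distinguishes a missing bucket (blobstore.ErrBucketNotExists) from a missing blob (blobstore.ErrBlobNotExists), so callers can branch on those sentinel errors instead of string-matching.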
+ +package filestore + +import ( + "bytes" + "context" + "fmt" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber/cadence/common/blobstore" + "io/ioutil" + "os" + "path" + "path/filepath" + "testing" +) + +const ( + defaultBucketName = "default-bucket-name" + defaultBucketOwner = "default-bucket-owner" + defaultBucketRetentionDays = 10 + customBucketNamePrefix = "custom-bucket-name" + customBucketOwner = "custom-bucket-owner" + customBucketRetentionDays = 100 + numberOfCustomBuckets = 5 +) + +type ClientSuite struct { + *require.Assertions + suite.Suite +} + +func TestClientSuite(t *testing.T) { + suite.Run(t, new(ClientSuite)) +} + +func (s *ClientSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *ClientSuite) TestNewClientInvalidConfig() { + invalidCfg := &Config{ + StoreDirectory: "/test/store/dir", + DefaultBucket: BucketConfig{ + Name: "default-bucket-name", + }, + } + + client, err := NewClient(invalidCfg) + s.Error(err) + s.Nil(client) +} + +func (s *ClientSuite) TestSetupDirectoryFailure() { + dir, err := ioutil.TempDir("", "test.setup.directory.failure") + s.NoError(err) + defer os.RemoveAll(dir) + os.Chmod(dir, os.FileMode(0600)) + + cfg := s.constructConfig(dir) + client, err := NewClient(cfg) + s.Error(err) + s.Nil(client) +} + +func (s *ClientSuite) TestWriteMetadataFilesFailure() { + dir, err := ioutil.TempDir("", "test.write.metadata.files.failure") + s.NoError(err) + defer os.RemoveAll(dir) + s.NoError(mkdirAll(filepath.Join(dir, defaultBucketName, metadataFilename, "foo"))) + + cfg := s.constructConfig(dir) + client, err := NewClient(cfg) + s.Error(err) + s.Nil(client) +} + +func (s *ClientSuite) TestUploadBlobBucketNotExists() { + dir, err := ioutil.TempDir("", "test.upload.blob.bucket.not.exists") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + blob := s.constructBlob("blob body", map[string]string{"tagKey": "tagValue"}) + blobFilename := "blob.blob" + s.Equal(blobstore.ErrBucketNotExists, client.UploadBlob(context.Background(), "bucket-not-exists", blobFilename, blob)) +} + +func (s *ClientSuite) TestUploadBlobErrorOnWrite() { + dir, err := ioutil.TempDir("", "test.upload.blob.error.on.write") + s.NoError(err) + defer os.RemoveAll(dir) + + blobFilename := "blob.blob" + s.NoError(mkdirAll(path.Join(dir, defaultBucketName, blobFilename, "foo"))) + client := s.constructClient(dir) + + blob := s.constructBlob("blob body", map[string]string{"tagKey": "tagValue"}) + s.Error(client.UploadBlob(context.Background(), defaultBucketName, blobFilename, blob)) +} + +func (s *ClientSuite) TestDownloadBlobBucketNotExists() { + dir, err := ioutil.TempDir("", "test.download.blob.bucket.not.exists") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + blob, err := client.DownloadBlob(context.Background(), "bucket-not-exists", "blobname") + s.Equal(blobstore.ErrBucketNotExists, err) + s.Nil(blob) +} + +func (s *ClientSuite) TestDownloadBlobBlobNotExists() { + dir, err := ioutil.TempDir("", "test.download.blob.blob.not.exists") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + blob, err := client.DownloadBlob(context.Background(), defaultBucketName, "blobname") + s.Equal(blobstore.ErrBlobNotExists, err) + s.Nil(blob) +} + +func (s *ClientSuite) TestDownloadBlobNoPermissions() { + dir, err := ioutil.TempDir("", "test.download.blob.no.permissions") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + blob 
:= s.constructBlob("blob body", map[string]string{"tagKey": "tagValue"}) + blobFilename := "blob.blob" + s.NoError(client.UploadBlob(context.Background(), defaultBucketName, blobFilename, blob)) + os.Chmod(bucketItemPath(dir, defaultBucketName, blobFilename), os.FileMode(0000)) + + blob, err = client.DownloadBlob(context.Background(), defaultBucketName, blobFilename) + s.NotEqual(blobstore.ErrBlobNotExists, err) + s.Error(err) + s.Nil(blob) +} + +func (s *ClientSuite) TestDownloadBlobInvalidFormat() { + dir, err := ioutil.TempDir("", "test.download.blob.invalid.format") + s.NoError(err) + defer os.RemoveAll(dir) + + client := s.constructClient(dir) + blobFilename := "blob.blob" + s.NoError(writeFile(filepath.Join(dir, defaultBucketName, blobFilename), []byte("invalid"))) + + blob, err := client.DownloadBlob(context.Background(), defaultBucketName, blobFilename) + s.NotEqual(blobstore.ErrBlobNotExists, err) + s.Error(err) + s.Nil(blob) +} + +func (s *ClientSuite) TestUploadDownloadBlob() { + dir, err := ioutil.TempDir("", "test.upload.download.blob") + s.NoError(err) + defer os.RemoveAll(dir) + + client := s.constructClient(dir) + blob := s.constructBlob("body version 1", map[string]string{}) + blobFilename := "blob.blob" + s.NoError(client.UploadBlob(context.Background(), defaultBucketName, blobFilename, blob)) + downloadBlob, err := client.DownloadBlob(context.Background(), defaultBucketName, blobFilename) + s.NoError(err) + s.NotNil(downloadBlob) + s.assertBlobEquals(map[string]string{}, "body version 1", downloadBlob) + + blob = s.constructBlob("body version 2", map[string]string{"key": "value"}) + s.NoError(client.UploadBlob(context.Background(), defaultBucketName, blobFilename, blob)) + downloadBlob, err = client.DownloadBlob(context.Background(), defaultBucketName, blobFilename) + s.NoError(err) + s.NotNil(downloadBlob) + s.assertBlobEquals(map[string]string{"key": "value"}, "body version 2", downloadBlob) +} + +func (s *ClientSuite) TestUploadDownloadBlobCustomBucket() { + dir, err := ioutil.TempDir("", "test.upload.download.blob.custom.bucket") + s.NoError(err) + defer os.RemoveAll(dir) + + client := s.constructClient(dir) + blob := s.constructBlob("blob body", map[string]string{}) + blobFilename := "blob.blob" + customBucketName := fmt.Sprintf("%v-%v", customBucketNamePrefix, 3) + s.NoError(client.UploadBlob(context.Background(), customBucketName, blobFilename, blob)) + downloadBlob, err := client.DownloadBlob(context.Background(), customBucketName, blobFilename) + s.NoError(err) + s.NotNil(downloadBlob) + s.assertBlobEquals(map[string]string{}, "blob body", downloadBlob) +} + +func (s *ClientSuite) TestBucketMetadataBucketNotExists() { + dir, err := ioutil.TempDir("", "test.bucket.metadata.bucket.not.exists") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + metadata, err := client.BucketMetadata(context.Background(), "bucket-not-exists") + s.Equal(blobstore.ErrBucketNotExists, err) + s.Nil(metadata) +} + +func (s *ClientSuite) TestBucketMetadataCheckFileExistsError() { + dir, err := ioutil.TempDir("", "test.bucket.metadata.check.file.exists.error") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + s.NoError(os.Chmod(bucketDirectory(dir, defaultBucketName), os.FileMode(0000))) + + metadata, err := client.BucketMetadata(context.Background(), defaultBucketName) + s.Error(err) + s.Nil(metadata) +} + +func (s *ClientSuite) TestBucketMetadataFileNotExistsError() { + dir, err := ioutil.TempDir("", 
"test.bucket.metadata.file.not.exists.error") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + s.NoError(os.Remove(bucketItemPath(dir, defaultBucketName, metadataFilename))) + + metadata, err := client.BucketMetadata(context.Background(), defaultBucketName) + s.Error(err) + s.Nil(metadata) +} + +func (s *ClientSuite) TestBucketMetadataFileInvalidForm() { + dir, err := ioutil.TempDir("", "test.bucket.metadata.file.invalid.form") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + s.NoError(writeFile(bucketItemPath(dir, defaultBucketName, metadataFilename), []byte("invalid"))) + + metadata, err := client.BucketMetadata(context.Background(), defaultBucketName) + s.Error(err) + s.Nil(metadata) +} + +func (s *ClientSuite) TestBucketMetadataSuccess() { + dir, err := ioutil.TempDir("", "test.bucket.metadata.success") + s.NoError(err) + defer os.RemoveAll(dir) + client := s.constructClient(dir) + + metadata, err := client.BucketMetadata(context.Background(), defaultBucketName) + s.NoError(err) + s.NotNil(metadata) + s.Equal(defaultBucketRetentionDays, metadata.RetentionDays) + s.Equal(defaultBucketOwner, metadata.Owner) +} + +func (s *ClientSuite) constructBlob(body string, tags map[string]string) *blobstore.Blob { + return &blobstore.Blob{ + Body: bytes.NewReader([]byte(body)), + Tags: tags, + CompressionType: blobstore.NoCompression, + } +} + +func (s *ClientSuite) constructClient(storeDir string) blobstore.Client { + cfg := s.constructConfig(storeDir) + client, err := NewClient(cfg) + s.NoError(err) + s.NotNil(client) + return client +} + +func (s *ClientSuite) constructConfig(storeDir string) *Config { + cfg := &Config{ + StoreDirectory: storeDir, + } + cfg.DefaultBucket = BucketConfig{ + Name: defaultBucketName, + Owner: defaultBucketOwner, + RetentionDays: defaultBucketRetentionDays, + } + + for i := 0; i < numberOfCustomBuckets; i++ { + cfg.CustomBuckets = append(cfg.CustomBuckets, BucketConfig{ + Name: fmt.Sprintf("%v-%v", customBucketNamePrefix, i), + Owner: customBucketOwner, + RetentionDays: customBucketRetentionDays, + }) + } + return cfg +} + +func (s *ClientSuite) assertBlobEquals(expectedTags map[string]string, expectedBody string, actual *blobstore.Blob) { + s.Equal(blobstore.NoCompression, actual.CompressionType) + s.Equal(expectedTags, actual.Tags) + actualBody, err := ioutil.ReadAll(actual.Body) + s.NoError(err) + s.Equal(expectedBody, string(actualBody)) +} diff --git a/common/blobstore/filestore/config.go b/common/blobstore/filestore/config.go new file mode 100644 index 00000000000..210d1a8d7a9 --- /dev/null +++ b/common/blobstore/filestore/config.go @@ -0,0 +1,70 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package filestore + +import ( + "errors" +) + +type ( + // Config describes the configuration needed to construct a blobstore client backed by file system + Config struct { + StoreDirectory string `yaml:"storeDirectory"` + DefaultBucket BucketConfig `yaml:"defaultBucket"` + CustomBuckets []BucketConfig `yaml:"customBuckets"` + } + + // BucketConfig describes the config for a bucket + BucketConfig struct { + Name string `yaml:"name"` + Owner string `yaml:"owner"` + RetentionDays int `yaml:"retentionDays"` + } +) + +// Validate validates config +func (c *Config) Validate() error { + validateBucketConfig := func(b BucketConfig) error { + if len(b.Name) == 0 { + return errors.New("empty bucket name") + } + if len(b.Owner) == 0 { + return errors.New("empty bucket owner") + } + if b.RetentionDays < 0 { + return errors.New("negative retention days") + } + return nil + } + + if len(c.StoreDirectory) == 0 { + return errors.New("empty store directory") + } + if err := validateBucketConfig(c.DefaultBucket); err != nil { + return err + } + for _, b := range c.CustomBuckets { + if err := validateBucketConfig(b); err != nil { + return err + } + } + return nil +} diff --git a/common/blobstore/filestore/config_test.go b/common/blobstore/filestore/config_test.go new file mode 100644 index 00000000000..e1fe8125d77 --- /dev/null +++ b/common/blobstore/filestore/config_test.go @@ -0,0 +1,142 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
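One consequence of the Config above, combined with bucketDirectory and bucketItemPath in client.go: each bucket is a directory named after the bucket directly under StoreDirectory, holding one file per blob plus the reserved metadata file. A path-only sketch (the directory and blob names are illustrative):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Mirrors bucketDirectory/bucketItemPath from client.go; paths are invented.
	store := "/tmp/blobstore-example"
	bucketDir := filepath.Join(store, "example-bucket")    // <storeDirectory>/<bucketName>
	fmt.Println(filepath.Join(bucketDir, "metadata"))      // reserved bucket metadata file
	fmt.Println(filepath.Join(bucketDir, "greeting.blob")) // one file per uploaded blob
}

Because metadata lives alongside blobs in the same directory, a blob uploaded under the filename "metadata" would overwrite the bucket's metadata file; nothing in this diff guards against that collision.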
+ +package filestore + +import ( + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" +) + +type ConfigSuite struct { + *require.Assertions + suite.Suite +} + +func TestConfigSuite(t *testing.T) { + suite.Run(t, new(ConfigSuite)) +} + +func (s *ConfigSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *ConfigSuite) TestValidate() { + testCases := []struct { + config *Config + isValid bool + }{ + { + config: &Config{ + StoreDirectory: "", + }, + isValid: false, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{}, + }, + isValid: false, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + }, + }, + isValid: false, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + Owner: "test-default-bucket-owner", + }, + }, + isValid: true, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + Owner: "test-default-bucket-owner", + RetentionDays: -1, + }, + }, + isValid: false, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + Owner: "test-default-bucket-owner", + RetentionDays: 10, + }, + }, + isValid: true, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + Owner: "test-default-bucket-owner", + RetentionDays: 10, + }, + CustomBuckets: []BucketConfig{ + {}, + }, + }, + isValid: false, + }, + { + config: &Config{ + StoreDirectory: "test-store-directory", + DefaultBucket: BucketConfig{ + Name: "test-default-bucket-name", + Owner: "test-default-bucket-owner", + RetentionDays: 10, + }, + CustomBuckets: []BucketConfig{ + { + Name: "test-custom-bucket-name", + Owner: "test-custom-bucket-owner", + RetentionDays: 10, + }, + }, + }, + isValid: true, + }, + } + + for _, tc := range testCases { + if tc.isValid { + s.NoError(tc.config.Validate()) + } else { + s.Error(tc.config.Validate()) + } + } +} diff --git a/common/blobstore/filestore/util.go b/common/blobstore/filestore/util.go new file mode 100644 index 00000000000..113928f0369 --- /dev/null +++ b/common/blobstore/filestore/util.go @@ -0,0 +1,141 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package filestore + +import ( + "bytes" + "encoding/gob" + "errors" + "github.com/uber/cadence/common/blobstore" + "gopkg.in/yaml.v2" + "io/ioutil" + "os" +) + +const ( + dirMode = os.FileMode(0700) + fileMode = os.FileMode(0600) +) + +func fileExists(filepath string) (bool, error) { + info, err := os.Stat(filepath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + if info.IsDir() { + return false, errors.New("specified directory not file") + } + return true, nil +} + +func directoryExists(path string) (bool, error) { + info, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + if !info.IsDir() { + return false, errors.New("specified file not directory") + } + return true, nil +} + +func mkdirAll(path string) error { + return os.MkdirAll(path, dirMode) +} + +func writeFile(filepath string, data []byte) error { + if err := os.Remove(filepath); err != nil && !os.IsNotExist(err) { + return err + } + f, err := os.Create(filepath) + if err != nil { + return err + } + defer f.Close() + if err = f.Chmod(fileMode); err != nil { + return err + } + if _, err = f.Write(data); err != nil { + return err + } + return nil +} + +func readFile(filepath string) ([]byte, error) { + return ioutil.ReadFile(filepath) +} + +func serializeBucketConfig(bucketCfg *BucketConfig) ([]byte, error) { + return yaml.Marshal(bucketCfg) +} + +func deserializeBucketConfig(data []byte) (*BucketConfig, error) { + bucketCfg := &BucketConfig{} + if err := yaml.Unmarshal(data, bucketCfg); err != nil { + return nil, err + } + return bucketCfg, nil +} + +type serializedBlob struct { + Body []byte + Tags map[string]string +} + +func serializeBlob(blob *blobstore.Blob) ([]byte, error) { + buf := bytes.Buffer{} + encoder := gob.NewEncoder(&buf) + body, err := ioutil.ReadAll(blob.Body) + if err != nil { + return nil, err + } + serBlob := serializedBlob{ + Body: body, + Tags: blob.Tags, + } + if err := encoder.Encode(serBlob); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func deserializeBlob(data []byte) (*blobstore.Blob, error) { + serBlob := &serializedBlob{} + dataReader := bytes.NewReader(data) + decoder := gob.NewDecoder(dataReader) + if err := decoder.Decode(serBlob); err != nil { + return nil, err + } + + return &blobstore.Blob{ + Body: bytes.NewReader(serBlob.Body), + Tags: serBlob.Tags, + CompressionType: blobstore.NoCompression, + }, nil +} diff --git a/common/blobstore/filestore/util_test.go b/common/blobstore/filestore/util_test.go new file mode 100644 index 00000000000..52b8ababc4d --- /dev/null +++ b/common/blobstore/filestore/util_test.go @@ -0,0 +1,212 @@ +// Copyright (c) 2017 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package filestore + +import ( + "bytes" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber/cadence/common/blobstore" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +type UtilSuite struct { + *require.Assertions + suite.Suite +} + +func TestUtilSuite(t *testing.T) { + suite.Run(t, new(UtilSuite)) +} + +func (s *UtilSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *UtilSuite) TestFileExists() { + dir, err := ioutil.TempDir("", "test.file.exists") + s.NoError(err) + defer os.RemoveAll(dir) + + exists, err := fileExists(dir) + s.Error(err) + s.False(exists) + + filename := "test-file-name" + exists, err = fileExists(filepath.Join(dir, filename)) + s.NoError(err) + s.False(exists) + + s.createFile(dir, filename) + exists, err = fileExists(filepath.Join(dir, filename)) + s.NoError(err) + s.True(exists) +} + +func (s *UtilSuite) TestDirectoryExists() { + dir, err := ioutil.TempDir("", "test.directory.exists") + s.NoError(err) + defer os.RemoveAll(dir) + + exists, err := directoryExists(dir) + s.NoError(err) + s.True(exists) + + subdir := "subdir" + exists, err = directoryExists(filepath.Join(dir, subdir)) + s.NoError(err) + s.False(exists) + + filename := "test-file-name" + s.createFile(dir, filename) + fpath := filepath.Join(dir, filename) + exists, err = directoryExists(fpath) + s.Error(err) + s.False(exists) +} + +func (s *UtilSuite) TestMkdirAll() { + dir, err := ioutil.TempDir("", "test.mkdir.all") + s.NoError(err) + defer os.RemoveAll(dir) + + s.assertDirectoryExists(dir) + s.NoError(mkdirAll(dir)) + s.assertDirectoryExists(dir) + + subdirPath := filepath.Join(dir, "subdir_1", "subdir_2", "subdir_3") + s.assertDirectoryNotExists(subdirPath) + s.NoError(mkdirAll(subdirPath)) + s.assertDirectoryExists(subdirPath) + s.assertCorrectFileMode(subdirPath) + + filename := "test-file-name" + s.createFile(dir, filename) + fpath := filepath.Join(dir, filename) + s.Error(mkdirAll(fpath)) +} + +func (s *UtilSuite) TestWriteFile() { + dir, err := ioutil.TempDir("", "test.write.file") + s.NoError(err) + defer os.RemoveAll(dir) + + s.assertDirectoryExists(dir) + + filename := "test-file-name" + fpath := filepath.Join(dir, filename) + s.NoError(writeFile(fpath, []byte("file body 1"))) + s.assertFileExists(fpath) + s.assertCorrectFileMode(fpath) + + s.NoError(writeFile(fpath, []byte("file body 2"))) + s.assertFileExists(fpath) + 
s.assertCorrectFileMode(fpath) + + s.Error(writeFile(dir, []byte(""))) + s.assertFileExists(fpath) +} + +func (s *UtilSuite) TestReadFile() { + dir, err := ioutil.TempDir("", "test.read.file") + s.NoError(err) + defer os.RemoveAll(dir) + + s.assertDirectoryExists(dir) + + filename := "test-file-name" + fpath := filepath.Join(dir, filename) + data, err := readFile(fpath) + s.Error(err) + s.Empty(data) + + writeFile(fpath, []byte("file contents")) + data, err = readFile(fpath) + s.NoError(err) + s.Equal("file contents", string(data)) +} + +func (s *UtilSuite) TestSerializationBucketConfig() { + inCfg := &BucketConfig{ + Name: "test-custom-bucket-name", + Owner: "test-custom-bucket-owner", + RetentionDays: 10, + } + bytes, err := serializeBucketConfig(inCfg) + s.NoError(err) + + outCfg, err := deserializeBucketConfig(bytes) + s.NoError(err) + s.Equal(inCfg, outCfg) +} + +func (s *UtilSuite) TestSerializationBlob() { + inBlob := &blobstore.Blob{ + Body: bytes.NewReader([]byte("file contents")), + Tags: map[string]string{"key1": "value1", "key2": "value2"}, + CompressionType: blobstore.NoCompression, + } + data, err := serializeBlob(inBlob) + s.NoError(err) + + outBlob, err := deserializeBlob(data) + s.NoError(err) + s.Equal(inBlob.Tags, outBlob.Tags) + s.Equal(inBlob.CompressionType, outBlob.CompressionType) + outBody, err := ioutil.ReadAll(outBlob.Body) + s.Equal("file contents", string(outBody)) +} + +func (s *UtilSuite) createFile(dir string, filename string) { + err := ioutil.WriteFile(filepath.Join(dir, filename), []byte("file contents"), fileMode) + s.Nil(err) +} + +func (s *UtilSuite) assertFileExists(filepath string) { + exists, err := fileExists(filepath) + s.NoError(err) + s.True(exists) +} + +func (s *UtilSuite) assertDirectoryExists(path string) { + exists, err := directoryExists(path) + s.NoError(err) + s.True(exists) +} + +func (s *UtilSuite) assertDirectoryNotExists(path string) { + exists, err := directoryExists(path) + s.NoError(err) + s.False(exists) +} + +func (s *UtilSuite) assertCorrectFileMode(path string) { + info, err := os.Stat(path) + s.NoError(err) + mode := fileMode + if info.IsDir() { + mode = dirMode | os.ModeDir + } + s.Equal(mode, info.Mode()) +} diff --git a/common/blobstore/client.go b/common/blobstore/interface.go similarity index 72% rename from common/blobstore/client.go rename to common/blobstore/interface.go index de83eb8b461..85576fc436d 100644 --- a/common/blobstore/client.go +++ b/common/blobstore/interface.go @@ -34,6 +34,13 @@ const ( NoCompression CompressionType = iota ) +var ( + // ErrBlobNotExists indicates that requested blob does not exist + ErrBlobNotExists = errors.New("requested blob does not exist") + // ErrBucketNotExists indicates that requested bucket does not exist + ErrBucketNotExists = errors.New("requested bucket does not exist") +) + // Blob defines a blob type Blob struct { Body io.Reader @@ -49,26 +56,7 @@ type BucketMetadataResponse struct { // Client is used to operate on blobs in a blobstore type Client interface { - UploadBlob(ctx context.Context, bucket string, path string, blob *Blob) error - DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error) + UploadBlob(ctx context.Context, bucket string, filename string, blob *Blob) error + DownloadBlob(ctx context.Context, bucket string, filename string) (*Blob, error) BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error) } - -type nopClient struct{} - -func (c *nopClient) UploadBlob(ctx context.Context, bucket string, path string, 
blob *Blob) error { - return errors.New("not implemented") -} - -func (c *nopClient) DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error) { - return nil, errors.New("not implemented") -} - -func (c *nopClient) BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error) { - return nil, errors.New("not implemented") -} - -// NewNopClient creates a nop client -func NewNopClient() Client { - return &nopClient{} -} diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index b8c939a3e1d..b4ee417228c 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -319,7 +319,7 @@ func (c *domainCache) refreshDomains() error { prevEntries := []*DomainCacheEntry{} nextEntries := []*DomainCacheEntry{} - // make a copy of the existing domain cache, so we can calaulate diff and do compare and swap + // make a copy of the existing domain cache, so we can calculate diff and do compare and swap newCacheNameToID := newDomainCache() newCacheByID := newDomainCache() for _, domain := range c.GetAllDomain() { @@ -539,8 +539,10 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry { result.info.Data[k] = v } result.config = &persistence.DomainConfig{ - Retention: entry.config.Retention, - EmitMetric: entry.config.EmitMetric, + Retention: entry.config.Retention, + EmitMetric: entry.config.EmitMetric, + ArchivalBucket: entry.config.ArchivalBucket, + ArchivalStatus: entry.config.ArchivalStatus, } result.replicationConfig = &persistence.DomainReplicationConfig{ ActiveClusterName: entry.replicationConfig.ActiveClusterName, diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go index faad1760274..83c92c84808 100644 --- a/common/cluster/metadata.go +++ b/common/cluster/metadata.go @@ -49,8 +49,11 @@ type ( ClusterNameForFailoverVersion(failoverVersion int64) string // GetAllClientAddress return the frontend address for each cluster name GetAllClientAddress() map[string]config.Address - // GetDeploymentGroup returns the deployment group of cluster - GetDeploymentGroup() string + + // IsArchivalEnabled whether archival is enabled + IsArchivalEnabled() bool + // GetDefaultArchivalBucket returns the default archival bucket name + GetDefaultArchivalBucket() string } metadataImpl struct { @@ -70,16 +73,25 @@ type ( initialFailoverVersionClusters map[int64]string // clusterToAddress contains the cluster name to corresponding frontend client clusterToAddress map[string]config.Address - // deploymentGroup is the deployment group name of cluster - deploymentGroup string + + // enableArchival whether archival is enabled + enableArchival dynamicconfig.BoolPropertyFn + // defaultArchivalBucket is the default archival bucket name used for this cluster + defaultArchivalBucket string } ) // NewMetadata create a new instance of Metadata -func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersionIncrement int64, - masterClusterName string, currentClusterName string, +func NewMetadata( + enableGlobalDomain dynamicconfig.BoolPropertyFn, + failoverVersionIncrement int64, + masterClusterName string, + currentClusterName string, clusterInitialFailoverVersions map[string]int64, - clusterToAddress map[string]config.Address, deploymentGroup string) Metadata { + clusterToAddress map[string]config.Address, + enableArchival dynamicconfig.BoolPropertyFn, + defaultArchivalBucket string, +) Metadata { if len(clusterInitialFailoverVersions) < 0 { panic("Empty initial failover versions for cluster") @@ -116,6 +128,13 @@ func 
NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersio panic("Cluster to address size is different than Cluster to initial failover versions") } + defaultArchivalBucketSet := len(defaultArchivalBucket) != 0 + if enableArchival() && !defaultArchivalBucketSet { + panic("Archival enabled but no default bucket set") + } else if !enableArchival() && defaultArchivalBucketSet { + panic("Archival not enabled but default bucket set") + } + return &metadataImpl{ enableGlobalDomain: enableGlobalDomain, failoverVersionIncrement: failoverVersionIncrement, @@ -124,7 +143,8 @@ func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersio clusterInitialFailoverVersions: clusterInitialFailoverVersions, initialFailoverVersionClusters: initialFailoverVersionClusters, clusterToAddress: clusterToAddress, - deploymentGroup: deploymentGroup, + enableArchival: enableArchival, + defaultArchivalBucket: defaultArchivalBucket, } } @@ -195,7 +215,12 @@ func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address { return metadata.clusterToAddress } -// GetDeploymentGroup returns the deployment group name for cluster -func (metadata *metadataImpl) GetDeploymentGroup() string { - return metadata.deploymentGroup +// IsArchivalEnabled whether archival is enabled +func (metadata *metadataImpl) IsArchivalEnabled() bool { + return metadata.enableArchival() +} + +// GetDefaultArchivalBucket returns the default archival bucket name +func (metadata *metadataImpl) GetDefaultArchivalBucket() string { + return metadata.defaultArchivalBucket } diff --git a/common/cluster/metadataTestBase.go b/common/cluster/metadataTestBase.go index 012433c4217..4e4200686d6 100644 --- a/common/cluster/metadataTestBase.go +++ b/common/cluster/metadataTestBase.go @@ -41,8 +41,6 @@ const ( TestCurrentClusterFrontendAddress = "127.0.0.1:7104" // TestAlternativeClusterFrontendAddress is the ip port address of alternative cluster TestAlternativeClusterFrontendAddress = "127.0.0.1:8104" - // TestDeploymentGroup is alternative deployment group used for test - TestDeploymentGroup = "test" ) var ( @@ -71,6 +69,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress}, TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress}, }, - TestDeploymentGroup, + dynamicconfig.GetBoolPropertyFn(false), + "", ) } diff --git a/common/mocks/ArchivalClient.go b/common/mocks/ArchivalClient.go new file mode 100644 index 00000000000..1d63ea4f594 --- /dev/null +++ b/common/mocks/ArchivalClient.go @@ -0,0 +1,59 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import sysworkflow "github.com/uber/cadence/service/worker/sysworkflow" + +// ArchivalClient is an autogenerated mock type for the ArchivalClient type +type ArchivalClient struct { + mock.Mock +} + +// Archive provides a mock function with given fields: _a0 +func (_m *ArchivalClient) Archive(_a0 *sysworkflow.ArchiveRequest) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*sysworkflow.ArchiveRequest) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Backfill provides a mock function with given fields: _a0 +func (_m *ArchivalClient) Backfill(_a0 *sysworkflow.BackfillRequest) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*sysworkflow.BackfillRequest) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/common/mocks/ClusterMetadata.go b/common/mocks/ClusterMetadata.go index 45136a5edd5..7cb08eea132 100644 --- a/common/mocks/ClusterMetadata.go +++ b/common/mocks/ClusterMetadata.go @@ -171,3 +171,32 @@ func (_m *ClusterMetadata) GetAllClientAddress() map[string]config.Address { return r0 } + +// IsArchivalEnabled provides a mock function with given fields: +func (_m *ClusterMetadata) IsArchivalEnabled() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// GetDefaultArchivalBucket provides a mock function with given fields: +func (_m *ClusterMetadata) GetDefaultArchivalBucket() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index a268a21fe58..74e68b12940 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -62,7 +62,7 @@ type ( DriverName string } // TODO this is used for global domain test - // when crtoss DC is public, remove EnableGlobalDomain + // when cross DC is public, remove EnableGlobalDomain EnableGlobalDomain bool // is global domain enabled IsMasterCluster bool // is master cluster ClusterMetadata cluster.Metadata diff --git a/common/service/config/config.go b/common/service/config/config.go index 08882d57b1c..dfb8d614981 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -22,6 +22,7 @@ package config import ( "encoding/json" + "github.com/uber/cadence/common/blobstore/filestore" "time" "github.com/uber-go/tally/m3" @@ -45,6 +46,8 @@ type ( Services map[string]Service `yaml:"services"` // Kafka is the config for connecting to kafka Kafka messaging.KafkaConfig `yaml:"kafka"` + // Archival is the config for archival + Archival Archival `yaml:"archival"` } // 
Service contains the service specific config items @@ -170,8 +173,7 @@ type ( } // Replicator describes the configuration of replicator - Replicator struct { - } + Replicator struct{} // Logger contains the config items for logger Logger struct { @@ -199,8 +201,6 @@ type ( ClusterInitialFailoverVersions map[string]int64 `yaml:"clusterInitialFailoverVersion"` // ClusterAddress contains all cluster names to corresponding address ClusterAddress map[string]Address `yaml:"clusterAddress"` - // DeploymentGroup contains the deployment group name - DeploymentGroup string `yaml:"deploymentGroup"` } // Address indicate the remote cluster's service name and address @@ -237,6 +237,14 @@ type ( FlushBytes int `yaml:"flushBytes"` } + // Archival contains the config for archival + Archival struct { + // Enabled whether archival is enabled + Enabled bool `yaml:"enabled"` + // Filestore the configuration for file based blobstore + Filestore filestore.Config `yaml:"filestore"` + } + // BootstrapMode is an enum type for ringpop bootstrap mode BootstrapMode int ) diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 50df790fb08..337e9c5d234 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -52,6 +52,7 @@ var keys = map[Key]string{ EnableNewKafkaClient: "system.enableNewKafkaClient", EnableVisibilitySampling: "system.enableVisibilitySampling", EnableVisibilityToKafka: "system.enableVisibilityToKafka", + EnableArchival: "system.enableArchival", // size limit BlobSizeLimitError: "limit.blobSize.error", @@ -147,7 +148,6 @@ var keys = map[Key]string{ EnableAdminProtection: "history.enableAdminProtection", AdminOperationToken: "history.adminOperationToken", EnableEventsV2: "history.enableEventsV2", - EnableArchival: "history.enableArchival", NumSystemWorkflows: "history.numSystemWorkflows", WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", @@ -180,6 +180,8 @@ const ( EnableVisibilityToKafka // DisableListVisibilityByFilter is config to disable list open/close workflow using filter DisableListVisibilityByFilter + // EnableArchival is key for enable archival + EnableArchival // BlobSizeLimitError is the per event blob size limit BlobSizeLimitError @@ -358,8 +360,6 @@ const ( ShardSyncMinInterval // DefaultEventEncoding is the encoding type for history events DefaultEventEncoding - // EnableArchival is key for enable archival of workflows in a domain - EnableArchival // NumSystemWorkflows is key for number of system workflows running in total NumSystemWorkflows diff --git a/common/service/service.go b/common/service/service.go index cdbb20a493d..fb699b290ab 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -26,7 +26,6 @@ import ( "sync/atomic" "time" - "github.com/uber/cadence/common/archival" "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/client" @@ -69,7 +68,6 @@ type ( MessagingClient messaging.Client DynamicConfig dynamicconfig.Client DispatcherProvider client.DispatcherProvider - ArchivalClient archival.Client BlobstoreClient blobstore.Client } diff --git a/common/util.go b/common/util.go index 94d6199702d..e8b09890ad9 100644 --- a/common/util.go +++ b/common/util.go @@ -25,11 +25,10 @@ import ( "github.com/robfig/cron" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" - "sync" - "time" - "go.uber.org/yarpc/yarpcerrors" "golang.org/x/net/context" + "sync" + "time" "github.com/dgryski/go-farm" "github.com/uber-common/bark" diff 
--git a/config/development.yaml b/config/development.yaml index de2c9f1623f..8352a2c0065 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -76,7 +76,22 @@ clustersInfo: active: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" - deploymentGroup: "development" + +archival: + enabled: true + filestore: + storeDirectory: "/tmp/development/blobstore/" + defaultBucket: + name: "cadence-development" + owner: "cadence" + retentionDays: 10 + customBuckets: + - name: "custom-bucket-1" + owner: "custom-owner-1" + retentionDays: 10 + - name: "custom-bucket-2" + owner: "custom-owner-2" + retentionDays: 5 kafka: clusters: diff --git a/config/development_active.yaml b/config/development_active.yaml index 30de97f7ad2..f3f600e7f8e 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -80,7 +80,6 @@ clustersInfo: standby: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" - deploymentGroup: "development" kafka: clusters: @@ -109,3 +108,19 @@ kafka: topic: standby retry-topic: standby-retry dlq-topic: standby-dlq + +archival: + enabled: true + filestore: + storeDirectory: "/tmp/dev_active/blobstore/" + defaultBucket: + name: "cadence-development" + owner: "cadence" + retentionDays: 10 + customBuckets: + - name: "custom-bucket-1" + owner: "custom-owner-1" + retentionDays: 10 + - name: "custom-bucket-2" + owner: "custom-owner-2" + retentionDays: 5 diff --git a/config/development_standby.yaml b/config/development_standby.yaml index 5c96a59c3aa..d2a7e5be321 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -80,7 +80,6 @@ clustersInfo: standby: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" - deploymentGroup: "development" kafka: clusters: @@ -109,3 +108,19 @@ kafka: topic: standby retry-topic: standby-retry dlq-topic: standby-dlq + +archival: + enabled: true + filestore: + storeDirectory: "/tmp/dev_standby/blobstore/" + defaultBucket: + name: "cadence-development" + owner: "cadence" + retentionDays: 10 + customBuckets: + - name: "custom-bucket-1" + owner: "custom-owner-1" + retentionDays: 10 + - name: "custom-bucket-2" + owner: "custom-owner-2" + retentionDays: 5 diff --git a/docker/config_template.yaml b/docker/config_template.yaml index 06a3dc1a7ff..7954c9f6401 100644 --- a/docker/config_template.yaml +++ b/docker/config_template.yaml @@ -72,4 +72,19 @@ clustersInfo: active: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" - deploymentGroup: "development" + +archival: + enabled: true + filestore: + storeDirectory: "/tmp/blobstore/" + defaultBucket: + name: "cadence-development" + owner: "cadence" + retentionDays: 10 + customBuckets: + - name: "custom-bucket-1" + owner: "custom-owner-1" + retentionDays: 10 + - name: "custom-bucket-2" + owner: "custom-owner-2" + retentionDays: 5 diff --git a/host/onebox.go b/host/onebox.go index 18745ade7d7..b94387758ef 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -28,7 +28,6 @@ import ( "sync" "time" - "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/metrics" "github.com/stretchr/testify/mock" @@ -301,7 +300,6 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { } params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) params.DynamicConfig = dynamicconfig.NewNopClient() - params.BlobstoreClient = blobstore.NewNopClient() // TODO when cross DC is public, remove this temporary override var kafkaProducer messaging.Producer diff --git
a/hostxdc/Integration_domain_failover_test.go b/hostxdc/Integration_domain_failover_test.go index 5e27e694822..e0bb287a08f 100644 --- a/hostxdc/Integration_domain_failover_test.go +++ b/hostxdc/Integration_domain_failover_test.go @@ -100,7 +100,6 @@ var ( clusterName[0]: config.Address{RPCName: common.FrontendServiceName, RPCAddress: clusterAddress[0]}, clusterName[1]: config.Address{RPCName: common.FrontendServiceName, RPCAddress: clusterAddress[1]}, }, - DeploymentGroup: "test", }, { EnableGlobalDomain: true, @@ -112,7 +111,6 @@ var ( clusterName[0]: config.Address{RPCName: common.FrontendServiceName, RPCAddress: clusterAddress[0]}, clusterName[1]: config.Address{RPCName: common.FrontendServiceName, RPCAddress: clusterAddress[1]}, }, - DeploymentGroup: "test", }, } clusterReplicationConfig = []*workflow.ClusterReplicationConfiguration{ @@ -142,7 +140,8 @@ func (s *testCluster) setupCluster(no int, enableEventsV2 bool) { clusterInfo.CurrentClusterName, clusterInfo.ClusterInitialFailoverVersions, clusterInfo.ClusterAddress, - clusterInfo.DeploymentGroup, + dynamicconfig.GetBoolPropertyFn(false), + "", ) s.TestBase = persistencetests.NewTestBaseWithCassandra(&options) s.TestBase.Setup() diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 8e6e7b75a17..f9c62f43780 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -3074,7 +3074,7 @@ func (wh *WorkflowHandler) bucketName(customBucketName *string) string { if wh.customBucketNameProvided(customBucketName) { return *customBucketName } - return fmt.Sprintf("cadence_%v", wh.Service.GetClusterMetadata().GetDeploymentGroup()) + return wh.Service.GetClusterMetadata().GetDefaultArchivalBucket() } func (wh *WorkflowHandler) customBucketNameProvided(customBucketName *string) bool { diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 83895da104c..25cc2ca46a7 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -73,7 +73,6 @@ func (s *workflowHandlerSuite) SetupSuite() { if testing.Verbose() { log.SetOutput(os.Stdout) } - } func (s *workflowHandlerSuite) TearDownSuite() { @@ -510,7 +509,7 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_CustomBucketAndArchiva err := wh.RegisterDomain(context.Background(), req) assert.NoError(s.T(), err) mMetadataManager.AssertCalled(s.T(), "CreateDomain", mock.Anything) - clusterMetadata.AssertNotCalled(s.T(), "GetDeploymentGroup") + clusterMetadata.AssertNotCalled(s.T(), "GetDefaultArchivalBucket") } func (s *workflowHandlerSuite) TestRegisterDomain_Success_ArchivalEnabledWithoutCustomBucket() { @@ -519,7 +518,7 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_ArchivalEnabledWithout clusterMetadata.On("IsGlobalDomainEnabled").Return(false) clusterMetadata.On("GetCurrentClusterName").Return("active") clusterMetadata.On("GetNextFailoverVersion", mock.Anything, mock.Anything).Return(int64(0)) - clusterMetadata.On("GetDeploymentGroup").Return("test-deployment-group") + clusterMetadata.On("GetDefaultArchivalBucket").Return("test-archival-bucket") mMetadataManager := &mocks.MetadataManager{} mMetadataManager.On("GetDomain", mock.Anything).Return(nil, &shared.EntityNotExistsError{}) mMetadataManager.On("CreateDomain", mock.Anything).Return(&persistence.CreateDomainResponse{ @@ -536,7 +535,7 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_ArchivalEnabledWithout err := wh.RegisterDomain(context.Background(), 
req) assert.NoError(s.T(), err) mMetadataManager.AssertCalled(s.T(), "CreateDomain", mock.Anything) - clusterMetadata.AssertCalled(s.T(), "GetDeploymentGroup") + clusterMetadata.AssertCalled(s.T(), "GetDefaultArchivalBucket") } func (s *workflowHandlerSuite) TestRegisterDomain_Success_ArchivalNotEnabled() { @@ -561,7 +560,7 @@ func (s *workflowHandlerSuite) TestRegisterDomain_Success_ArchivalNotEnabled() { err := wh.RegisterDomain(context.Background(), req) assert.NoError(s.T(), err) mMetadataManager.AssertCalled(s.T(), "CreateDomain", mock.Anything) - clusterMetadata.AssertNotCalled(s.T(), "GetDeploymentGroup") + clusterMetadata.AssertNotCalled(s.T(), "GetDefaultArchivalBucket") } func (s *workflowHandlerSuite) TestDescribeDomain_Success_ArchivalNeverEnabled() { @@ -864,7 +863,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("", shared.ArchivalStatusNeverEnabled), nil) clusterMetadata := &mocks.ClusterMetadata{} clusterMetadata.On("IsGlobalDomainEnabled").Return(false) - clusterMetadata.On("GetDeploymentGroup").Return("test-deployment-group") + clusterMetadata.On("GetDefaultArchivalBucket").Return("test-archival-bucket") mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.logger) mBlobstore := &mocks.Client{} mBlobstore.On("BucketMetadata", mock.Anything, mock.Anything).Return(bucketMetadataResponse("test-owner", 10), nil) @@ -879,7 +878,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn assert.NotNil(s.T(), result) assert.NotNil(s.T(), result.Configuration) assert.Equal(s.T(), result.Configuration.GetArchivalStatus(), shared.ArchivalStatusEnabled) - assert.Equal(s.T(), result.Configuration.GetArchivalBucketName(), "cadence_test-deployment-group") + assert.Equal(s.T(), result.Configuration.GetArchivalBucketName(), "test-archival-bucket") assert.Equal(s.T(), result.Configuration.GetArchivalBucketOwner(), "test-owner") assert.Equal(s.T(), result.Configuration.GetArchivalRetentionPeriodInDays(), int32(10)) } @@ -910,7 +909,7 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn assert.Equal(s.T(), result.Configuration.GetArchivalBucketName(), "custom-bucket") assert.Equal(s.T(), result.Configuration.GetArchivalBucketOwner(), "test-owner") assert.Equal(s.T(), result.Configuration.GetArchivalRetentionPeriodInDays(), int32(10)) - clusterMetadata.AssertNotCalled(s.T(), "GetDeploymentGroup") + clusterMetadata.AssertNotCalled(s.T(), "GetDefaultArchivalBucket") } func bucketMetadataResponse(owner string, retentionDays int) *blobstore.BucketMetadataResponse { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 56ed4ce0659..be1a131aa6e 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -25,6 +25,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/uber/cadence/service/worker/sysworkflow" "time" "github.com/pborman/uuid" @@ -71,6 +72,7 @@ type ( metricsClient metrics.Client logger bark.Logger config *Config + archivalClient sysworkflow.ArchivalClient } // shardContextWrapper wraps ShardContext to notify transferQueueProcessor on new tasks. 
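
The hunks above and below give the history engine a sysworkflow.ArchivalClient, constructed in NewEngineWithShardContext. Because the engine depends only on that two-method interface (Archive and Backfill, defined later in this diff), tests can substitute any implementation. The test diffs below use a generated mocks.ArchivalClient; a hand-rolled stub — purely illustrative, all names here are hypothetical — is enough to show how small the seam is:

package history_test // hypothetical test file, for illustration only

import (
	"errors"

	"github.com/uber/cadence/service/worker/sysworkflow"
)

// stubArchivalClient satisfies sysworkflow.ArchivalClient and records the
// requests it sees, so a test can assert on what the engine tried to archive.
type stubArchivalClient struct {
	archived []*sysworkflow.ArchiveRequest
}

func (s *stubArchivalClient) Archive(req *sysworkflow.ArchiveRequest) error {
	s.archived = append(s.archived, req)
	return nil
}

func (s *stubArchivalClient) Backfill(*sysworkflow.BackfillRequest) error {
	return errors.New("not implemented")
}
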
@@ -163,6 +165,7 @@ func NewEngineWithShardContext( metricsClient: shard.GetMetricsClient(), historyEventNotifier: historyEventNotifier, config: config, + archivalClient: sysworkflow.NewArchivalClient(frontendClient, shard.GetConfig().NumSysWorkflows), } var visibilityProducer messaging.Producer if config.EnableVisibilityToKafka() { @@ -1708,6 +1711,22 @@ Update_History_Loop: } transferTasks = append(transferTasks, tranT) timerTasks = append(timerTasks, timerT) + + domainCfg := domainEntry.GetConfig() + if e.shard.GetService().GetClusterMetadata().IsArchivalEnabled() && domainCfg.ArchivalStatus == workflow.ArchivalStatusEnabled { + request := &sysworkflow.ArchiveRequest{ + DomainName: domainEntry.GetInfo().Name, + DomainID: domainEntry.GetInfo().ID, + WorkflowID: workflowExecution.GetWorkflowId(), + RunID: workflowExecution.GetRunId(), + Bucket: domainCfg.ArchivalBucket, + } + + // TODO: this will actually be scheduling a timer to do archival + if err := e.archivalClient.Archive(request); err != nil { + return nil, err + } + } } // Generate a transaction ID for appending events to history diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index e77e5186bb4..3f3c158a24b 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -54,7 +54,7 @@ type ( *require.Assertions historyEngine *historyEngineImpl mockMatchingClient *mocks.MatchingClient - mockInitiator *mocks.Initiator + mockArchivalClient *mocks.ArchivalClient mockHistoryClient *mocks.HistoryClient mockMetadataMgr *mocks.MetadataManager mockVisibilityMgr *mocks.VisibilityManager @@ -98,7 +98,7 @@ func (s *engine2Suite) SetupTest() { shardID := 0 s.mockMatchingClient = &mocks.MatchingClient{} - s.mockInitiator = &mocks.Initiator{} + s.mockArchivalClient = &mocks.ArchivalClient{} s.mockHistoryClient = &mocks.HistoryClient{} s.mockMetadataMgr = &mocks.MetadataManager{} s.mockVisibilityMgr = &mocks.VisibilityManager{} @@ -116,7 +116,6 @@ func (s *engine2Suite) SetupTest() { s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockDomainCache = &cache.DomainCacheMock{} s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.NewDomainCacheEntryWithInfo(&p.DomainInfo{ID: validDomainID}), nil) - s.mockInitiator.On("Archive", mock.Anything).Return(nil) mockShard := &shardContextImpl{ service: s.mockService, @@ -144,6 +143,7 @@ func (s *engine2Suite) SetupTest() { metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), tokenSerializer: common.NewJSONTaskTokenSerializer(), config: s.config, + archivalClient: s.mockArchivalClient, } h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockMatchingClient, s.logger) @@ -158,6 +158,7 @@ func (s *engine2Suite) TearDownTest() { s.mockVisibilityMgr.AssertExpectations(s.T()) s.mockClusterMetadata.AssertExpectations(s.T()) s.mockProducer.AssertExpectations(s.T()) + s.mockArchivalClient.AssertExpectations(s.T()) } func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index a7bd106df51..015406f91a8 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -64,6 +64,7 @@ type ( mockMessagingClient messaging.Client mockService service.Service 
mockDomainCache *cache.DomainCacheMock + mockArchivalClient *mocks.ArchivalClient shardClosedCh chan int config *Config @@ -114,6 +115,7 @@ func (s *engine3Suite) SetupTest() { s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockDomainCache = &cache.DomainCacheMock{} s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.NewDomainCacheEntryWithInfo(&p.DomainInfo{ID: validDomainID}), nil) + s.mockArchivalClient = &mocks.ArchivalClient{} mockShard := &shardContextImpl{ service: s.mockService, @@ -143,6 +145,7 @@ func (s *engine3Suite) SetupTest() { metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), tokenSerializer: common.NewJSONTaskTokenSerializer(), config: s.config, + archivalClient: s.mockArchivalClient, } h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockMatchingClient, s.logger) @@ -158,6 +161,7 @@ func (s *engine3Suite) TearDownTest() { s.mockVisibilityMgr.AssertExpectations(s.T()) s.mockClusterMetadata.AssertExpectations(s.T()) s.mockProducer.AssertExpectations(s.T()) + s.mockArchivalClient.AssertExpectations(s.T()) } func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index fbdf33834af..9e7e4396637 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -57,7 +57,7 @@ type ( *require.Assertions mockHistoryEngine *historyEngineImpl mockMatchingClient *mocks.MatchingClient - mockInitiator *mocks.Initiator + mockArchivalClient *mocks.ArchivalClient mockHistoryClient *mocks.HistoryClient mockMetadataMgr *mocks.MetadataManager mockVisibilityMgr *mocks.VisibilityManager @@ -102,7 +102,7 @@ func (s *engineSuite) SetupTest() { shardID := 0 s.mockMatchingClient = &mocks.MatchingClient{} - s.mockInitiator = &mocks.Initiator{} + s.mockArchivalClient = &mocks.ArchivalClient{} s.mockHistoryClient = &mocks.HistoryClient{} s.mockMetadataMgr = &mocks.MetadataManager{} s.mockVisibilityMgr = &mocks.VisibilityManager{} @@ -151,7 +151,6 @@ func (s *engineSuite) SetupTest() { s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) - s.mockInitiator.On("Archive", mock.Anything).Return(nil) h := &historyEngineImpl{ currentClusterName: currentClusterName, shard: shardContextWrapper, @@ -163,6 +162,7 @@ func (s *engineSuite) SetupTest() { tokenSerializer: common.NewJSONTaskTokenSerializer(), historyEventNotifier: historyEventNotifier, config: NewDynamicConfigForTest(), + archivalClient: s.mockArchivalClient, } h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockProducer, s.mockMatchingClient, s.mockHistoryClient, s.logger) h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockMatchingClient, s.logger) @@ -180,6 +180,7 @@ func (s *engineSuite) TearDownTest() { s.mockVisibilityMgr.AssertExpectations(s.T()) s.mockClusterMetadata.AssertExpectations(s.T()) s.mockProducer.AssertExpectations(s.T()) + s.mockArchivalClient.AssertExpectations(s.T()) } func (s *engineSuite) TestGetMutableStateSync() { @@ -1348,8 +1349,11 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowSuccess() 
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &persistence.GetDomainResponse{ - Info: &persistence.DomainInfo{ID: domainID}, - Config: &persistence.DomainConfig{Retention: 1}, + Info: &persistence.DomainInfo{ID: domainID}, + Config: &persistence.DomainConfig{ + Retention: 1, + ArchivalStatus: workflow.ArchivalStatusEnabled, + }, ReplicationConfig: &persistence.DomainReplicationConfig{ ActiveClusterName: cluster.TestCurrentClusterName, Clusters: []*persistence.ClusterReplicationConfig{ @@ -1360,6 +1364,9 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowSuccess() }, nil, ) + + s.mockClusterMetadata.On("IsArchivalEnabled").Return(true) + s.mockArchivalClient.On("Archive", mock.Anything).Return(nil) _, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ DomainUUID: common.StringPtr(domainID), CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{ @@ -1416,8 +1423,11 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowSuccess() { s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &persistence.GetDomainResponse{ - Info: &persistence.DomainInfo{ID: domainID}, - Config: &persistence.DomainConfig{Retention: 1}, + Info: &persistence.DomainInfo{ID: domainID}, + Config: &persistence.DomainConfig{ + Retention: 1, + ArchivalStatus: workflow.ArchivalStatusEnabled, + }, ReplicationConfig: &persistence.DomainReplicationConfig{ ActiveClusterName: cluster.TestCurrentClusterName, Clusters: []*persistence.ClusterReplicationConfig{ @@ -1428,6 +1438,8 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowSuccess() { }, nil, ) + s.mockClusterMetadata.On("IsArchivalEnabled").Return(true) + s.mockArchivalClient.On("Archive", mock.Anything).Return(nil) _, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ DomainUUID: common.StringPtr(domainID), CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{ diff --git a/service/history/service.go b/service/history/service.go index 4df77558631..4343365d30e 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -121,7 +121,6 @@ type Config struct { // whether or not using eventsV2 EnableEventsV2 dynamicconfig.BoolPropertyFnWithDomainFilter - EnableArchival dynamicconfig.BoolPropertyFnWithDomainFilter NumSysWorkflows dynamicconfig.IntPropertyFn BlobSizeLimitError dynamicconfig.IntPropertyFnWithDomainFilter @@ -199,7 +198,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeJSON)), EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, false), - EnableArchival: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableArchival, false), NumSysWorkflows: dc.GetIntProperty(dynamicconfig.NumSystemWorkflows, 1000), BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024), diff --git a/service/worker/service.go b/service/worker/service.go index 
0d0bee469cd..cba62057d4a 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -117,6 +117,10 @@ func (s *Service) Start() { s.startReplicator(params, base, log) } + if params.ClusterMetadata.IsArchivalEnabled() { + s.startSysWorker(base, log, params.MetricScope) + } + log.Infof("%v started", common.WorkerServiceName) <-s.stopC base.Stop() @@ -132,7 +136,6 @@ func (s *Service) Stop() { } func (s *Service) startReplicator(params *service.BootstrapParams, base service.Service, log bark.Logger) { - history, err := base.GetClientFactory().NewHistoryClient() if err != nil { log.Fatalf("failed to create history service client: %v", err) } @@ -155,7 +158,7 @@ func (s *Service) startSysWorker(base service.Service, log bark.Logger, scope ta common.IsWhitelistServiceTransientError) s.waitForFrontendStart(frontendClient, log) - sysWorker := sysworkflow.NewSysWorker(frontendClient, scope, s.params.ArchivalClient) + sysWorker := sysworkflow.NewSysWorker(frontendClient, scope, s.params.BlobstoreClient) if err := sysWorker.Start(); err != nil { sysWorker.Stop() log.Fatalf("failed to start sysworker: %v", err) } diff --git a/service/worker/sysworkflow/initiator.go b/service/worker/sysworkflow/archival_client.go similarity index 67% rename from service/worker/sysworkflow/initiator.go rename to service/worker/sysworkflow/archival_client.go index dd807b2e35f..9e1ee3c0291 100644 --- a/service/worker/sysworkflow/initiator.go +++ b/service/worker/sysworkflow/archival_client.go @@ -22,47 +22,61 @@ package sysworkflow import ( "context" + "errors" "fmt" "github.com/uber/cadence/client/frontend" - "github.com/uber/cadence/common/archival" "github.com/uber/cadence/common/service/dynamicconfig" "go.uber.org/cadence/client" "math/rand" ) type ( + // ArchiveRequest is a request to Archive + ArchiveRequest struct { + DomainName string + DomainID string + WorkflowID string + RunID string + Bucket string + } + + // BackfillRequest is a request to Backfill + BackfillRequest struct { + // TODO: fill out any fields needed for backfill + } - - // Initiator is used to trigger system tasks - Initiator interface { - Archive(request *archival.PutRequest) error + // ArchivalClient is used to archive workflow histories + ArchivalClient interface { + Archive(*ArchiveRequest) error + Backfill(*BackfillRequest) error } - initiator struct { + archivalClient struct { cadenceClient client.Client numSWFn dynamicconfig.IntPropertyFn } - // Signal is the data sent to system tasks - Signal struct { + signal struct { RequestType RequestType - ArchiveRequest *archival.PutRequest + ArchiveRequest *ArchiveRequest + BackfillRequest *BackfillRequest } ) -// NewInitiator creates a new Initiator -func NewInitiator(frontendClient frontend.Client, numSWFn dynamicconfig.IntPropertyFn) Initiator { - return &initiator{ +// NewArchivalClient creates a new ArchivalClient +func NewArchivalClient(frontendClient frontend.Client, numSWFn dynamicconfig.IntPropertyFn) ArchivalClient { + return &archivalClient{ cadenceClient: client.NewClient(frontendClient, Domain, &client.Options{}), numSWFn: numSWFn, } } // Archive starts an archival task -func (i *initiator) Archive(request *archival.PutRequest) error { +func (c *archivalClient) Archive(request *ArchiveRequest) error { if request.DomainName == Domain { return nil } - workflowID := fmt.Sprintf("%v-%v", WorkflowIDPrefix, rand.Intn(i.numSWFn())) + workflowID := fmt.Sprintf("%v-%v", WorkflowIDPrefix, rand.Intn(c.numSWFn())) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, // TODO: once we 
have higher load, this should select one of X task lists at random to do load balancing @@ -71,12 +85,12 @@ func (i *initiator) Archive(request *archival.PutRequest) error { DecisionTaskStartToCloseTimeout: DecisionTaskStartToCloseTimeout, WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate, } - signal := Signal{ - RequestType: ArchivalRequest, + signal := signal{ + RequestType: archivalRequest, ArchiveRequest: request, } - _, err := i.cadenceClient.SignalWithStartWorkflow( + _, err := c.cadenceClient.SignalWithStartWorkflow( context.Background(), workflowID, SignalName, @@ -87,3 +101,9 @@ func (i *initiator) Archive(request *archival.PutRequest) error { return err } + +// Backfill starts a backfill task +func (c *archivalClient) Backfill(request *BackfillRequest) error { + // TODO: implement this once backfill is supported + return errors.New("not implemented") +} diff --git a/service/worker/sysworkflow/constants.go b/service/worker/sysworkflow/constants.go index 94c30e0fa36..e055a242976 100644 --- a/service/worker/sysworkflow/constants.go +++ b/service/worker/sysworkflow/constants.go @@ -27,61 +27,59 @@ import ( const ( // Domain is the cadence system workflows domain Domain = "cadence-system" - // DecisionTaskList is the task list that all system workflows share DecisionTaskList = "cadsys-decision-tl" - // SignalName is the name of the cadence signal that system tasks are sent on SignalName = "cadsys-signal-sig" - // WorkflowIDPrefix is the prefix of all system workflow ids WorkflowIDPrefix = "cadsys-wf" - // SignalsUntilContinueAsNew is the number of signals system workflow must receive before continuing as new SignalsUntilContinueAsNew = 1000 - // SystemWorkflowScope scope for all metrics emitted by system workflow SystemWorkflowScope = "system-workflow" - // SystemWorkflowIDTag tag for system workflowID SystemWorkflowIDTag = "system-workflow-id" - // HandledSignalCount counter of number of signals processed by system workflow HandledSignalCount = "handled-signal" - // UnknownSignalTypeErr counter of number of unknown signals received by system workflow UnknownSignalTypeErr = "unknown-signal-err" - // ArchivalFailureErr counter of number of archival activity failures ArchivalFailureErr = "archival-failure" - + // BackfillFailureErr counter of number of backfill activity failures + BackfillFailureErr = "backfill-failure" // ChannelClosedUnexpectedlyError counter of number of unexpected channel closes in system workflow ChannelClosedUnexpectedlyError = "channel-closed-unexpectedly-err" - // ArchivalActivityFnName name of archival activity function ArchivalActivityFnName = "ArchivalActivity" - + // BackfillActivityFnName name of backfill activity function + BackfillActivityFnName = "BackfillActivity" // SystemWorkflowFnName name of system workflow function SystemWorkflowFnName = "SystemWorkflow" - // WorkflowStartToCloseTimeout is the time for the workflow to finish WorkflowStartToCloseTimeout = time.Hour * 24 * 30 - // DecisionTaskStartToCloseTimeout is the time for decision to finish DecisionTaskStartToCloseTimeout = time.Minute + // DomainIDTag tag which identifies the domainID of an archived history + DomainIDTag = "domainID" + // WorkflowIDTag tag which identifies the workflowID of an archived history + WorkflowIDTag = "workflowID" + // RunIDTag tag which identifies the runID of an archived history + RunIDTag = "runID" + // BucketNameTag tag which identifies the bucket name + BucketNameTag = "bucket-name" ) // RequestType is the type for signals that can be sent to system 
workflows type RequestType int const ( - // ArchivalRequest is the archive signal identifier - ArchivalRequest RequestType = iota + archivalRequest RequestType = iota + backfillRequest ) type contextKey int const ( - archivalClientKey contextKey = iota + blobstoreClientKey contextKey = iota frontendClientKey ) diff --git a/service/worker/sysworkflow/system_workflow.go b/service/worker/sysworkflow/system_workflow.go index 9b1c9aa1f23..199070f7f09 100644 --- a/service/worker/sysworkflow/system_workflow.go +++ b/service/worker/sysworkflow/system_workflow.go @@ -21,18 +21,18 @@ package sysworkflow import ( + "bytes" "context" + "errors" + "fmt" "github.com/uber-go/tally" - "github.com/uber/cadence/client/frontend" - "github.com/uber/cadence/common" - "github.com/uber/cadence/common/archival" + "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/logging" "go.uber.org/cadence" - "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/activity" "go.uber.org/cadence/workflow" "go.uber.org/zap" - "math/rand" + "io/ioutil" "time" ) @@ -46,7 +46,7 @@ func SystemWorkflow(ctx workflow.Context) error { logger.Info("started new system workflow") signalsHandled := 0 for ; signalsHandled < SignalsUntilContinueAsNew; signalsHandled++ { - var signal Signal + var signal signal if more := ch.Receive(ctx, &signal); !more { scope.Counter(ChannelClosedUnexpectedlyError).Inc(1) logger.Error("cadence channel was unexpectedly closed") @@ -56,7 +56,7 @@ func SystemWorkflow(ctx workflow.Context) error { } for { - var signal Signal + var signal signal if ok := ch.ReceiveAsync(&signal); !ok { break } @@ -72,7 +72,7 @@ func SystemWorkflow(ctx workflow.Context) error { return workflow.NewContinueAsNewError(ctx, SystemWorkflowFnName) } -func selectSystemTask(scope tally.Scope, signal Signal, ctx workflow.Context, logger *zap.Logger) { +func selectSystemTask(scope tally.Scope, signal signal, ctx workflow.Context, logger *zap.Logger) { scope.Counter(HandledSignalCount).Inc(1) ao := workflow.ActivityOptions{ @@ -85,13 +85,13 @@ func selectSystemTask(scope tally.Scope, signal Signal, ctx workflow.Context, lo MaximumInterval: time.Minute, ExpirationInterval: time.Hour * 24 * 30, MaximumAttempts: 0, - NonRetriableErrorReasons: []string{"bad-error"}, + NonRetriableErrorReasons: []string{}, }, } actCtx := workflow.WithActivityOptions(ctx, ao) switch signal.RequestType { - case ArchivalRequest: + case archivalRequest: if err := workflow.ExecuteActivity( actCtx, ArchivalActivityFnName, @@ -100,6 +100,15 @@ func selectSystemTask(scope tally.Scope, signal Signal, ctx workflow.Context, lo scope.Counter(ArchivalFailureErr) logger.Error("failed to execute archival activity", zap.Error(err)) } + case backfillRequest: + if err := workflow.ExecuteActivity( + actCtx, + BackfillActivityFnName, + *signal.BackfillRequest, + ).Get(ctx, nil); err != nil { + scope.Counter(BackfillFailureErr).Inc(1) + logger.Error("failed to backfill", zap.Error(err)) + } default: scope.Counter(UnknownSignalTypeErr).Inc(1) logger.Error("received unknown request type") @@ -107,81 +116,56 @@ func selectSystemTask(scope tally.Scope, signal Signal, ctx workflow.Context, lo } // ArchivalActivity is the archival activity code -func ArchivalActivity( - ctx context.Context, - request archival.PutRequest, -) error { - userWorkflowID := request.WorkflowID - userRunID := request.RunID - - logger := activity.GetLogger(ctx) - logger.Info("starting archival", - zap.String(logging.TagUserWorkflowID, userWorkflowID), - zap.String(logging.TagUserRunID, userRunID)) 
- - // TODO: do not actually access history until timer to purge history is moved here - //his, err := history(ctx, domainName, userWorkflowID, userRunID) - //if err != nil { - // logger.Error("failed to get history") - // return err - //} - - archivalClient := ctx.Value(archivalClientKey).(archival.Client) - err := archivalClient.PutWorkflow(ctx, &request) - logger.Info("called archive", zap.Error(err)) - - for i := 0; i < 20; i++ { - time.Sleep(100 * time.Millisecond) - activity.RecordHeartbeat(ctx, i) - - // if activity failure occurs restart the activity from the beginning - if 0 == rand.Intn(40) { - logger.Info("activity failed, will retry...") - return cadence.NewCustomError("some-retryable-error") - } +func ArchivalActivity(ctx context.Context, request ArchiveRequest) error { + fields := zap.Fields( + zap.String(DomainIDTag, request.DomainID), + zap.String(WorkflowIDTag, request.WorkflowID), + zap.String(RunIDTag, request.RunID), + zap.String(BucketNameTag, request.Bucket)) + logger := activity.GetLogger(ctx).WithOptions(fields) + logger.Info("called archival activity") + + blobstoreClient := ctx.Value(blobstoreClientKey).(blobstore.Client) + + // TODO: the rest of this method is temporary (a follow-up diff will archive real histories and delete them from cassandra) + body := fmt.Sprintf("DomainID: %v\n WorkflowID: %v\n RunID: %v", request.DomainID, request.WorkflowID, request.RunID) + blobFilename := HistoryBlobFilename(request.DomainID, request.WorkflowID, request.RunID) + blob := blobstore.Blob{ + Body: bytes.NewReader([]byte(body)), + CompressionType: blobstore.NoCompression, + Tags: map[string]string{ + DomainIDTag: request.DomainID, + WorkflowIDTag: request.WorkflowID, + RunIDTag: request.RunID, + }, } - - logger.Info("finished archival", - zap.String(logging.TagUserWorkflowID, userWorkflowID), - zap.String(logging.TagUserRunID, userRunID)) - return nil -} - -func history( - ctx context.Context, - domainName string, - workflowID string, - runID string, -) (*shared.History, error) { - - frontendClient := ctx.Value(frontendClientKey).(frontend.Client) - execution := &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(runID), + if err := blobstoreClient.UploadBlob(ctx, request.Bucket, blobFilename, &blob); err != nil { + logger.Error("archival failed, could not upload blob", zap.String("blobname", blobFilename), zap.Error(err)) + return err } - historyResponse, err := frontendClient.GetWorkflowExecutionHistory(ctx, &shared.GetWorkflowExecutionHistoryRequest{ - Domain: common.StringPtr(domainName), - Execution: execution, - }) + // TODO: only downloading the blob to show that the blobstore works; the final impl will not download blobs after upload + downloadedBlob, err := blobstoreClient.DownloadBlob(ctx, request.Bucket, blobFilename) if err != nil { - return nil, err + logger.Error("archival failed, could not download blob", zap.String("blobname", blobFilename), zap.Error(err)) + return err } - events := historyResponse.History.Events - for historyResponse.NextPageToken != nil { - historyResponse, err = frontendClient.GetWorkflowExecutionHistory(ctx, &shared.GetWorkflowExecutionHistoryRequest{ - Domain: common.StringPtr(domainName), - Execution: execution, - NextPageToken: historyResponse.NextPageToken, - }) - if err != nil { - return nil, err - } - events = append(events, historyResponse.History.Events...) 
+ bodyBytes, err := ioutil.ReadAll(downloadedBlob.Body) + if err != nil { + logger.Error("archival failed, could not read body of downloaded blob", zap.String("blobname", blobFilename), zap.Error(err)) + return err } + if len(downloadedBlob.Tags) != len(blob.Tags) { + logger.Error("archival failed, did not read correct blob back", zap.String("blobname", blobFilename)) + return errors.New("downloaded blob is not valid") + } + logger.Info("archival successful", zap.String("blobname", blobFilename), zap.Int("body-size", len(bodyBytes))) + return nil +} - - return &shared.History{ - Events: events, - }, nil +// BackfillActivity is the backfill activity code +func BackfillActivity(_ context.Context, _ BackfillRequest) error { + // TODO: write this activity + return nil } diff --git a/service/worker/sysworkflow/sysworker.go b/service/worker/sysworkflow/sysworker.go index 6b4f2ce2e59..fe65a238ec8 100644 --- a/service/worker/sysworkflow/sysworker.go +++ b/service/worker/sysworkflow/sysworker.go @@ -24,7 +24,7 @@ import ( "context" "github.com/uber-go/tally" "github.com/uber/cadence/client/frontend" - "github.com/uber/cadence/common/archival" + "github.com/uber/cadence/common/blobstore" "go.uber.org/cadence/activity" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" @@ -43,13 +43,14 @@ type ( func init() { workflow.RegisterWithOptions(SystemWorkflow, workflow.RegisterOptions{Name: SystemWorkflowFnName}) activity.RegisterWithOptions(ArchivalActivity, activity.RegisterOptions{Name: ArchivalActivityFnName}) + activity.RegisterWithOptions(BackfillActivity, activity.RegisterOptions{Name: BackfillActivityFnName}) } // NewSysWorker returns a new SysWorker -func NewSysWorker(frontendClient frontend.Client, scope tally.Scope, archivalClient archival.Client) *SysWorker { +func NewSysWorker(frontendClient frontend.Client, scope tally.Scope, blobstoreClient blobstore.Client) *SysWorker { logger, _ := zap.NewProduction() - actCtx := context.WithValue(context.Background(), archivalClientKey, archivalClient) - actCtx = context.WithValue(context.Background(), frontendClientKey, frontendClient) + actCtx := context.WithValue(context.Background(), blobstoreClientKey, blobstoreClient) + actCtx = context.WithValue(actCtx, frontendClientKey, frontendClient) wo := worker.Options{ Logger: logger, MetricsScope: scope.SubScope(SystemWorkflowScope), diff --git a/common/mocks/Initiator.go b/service/worker/sysworkflow/util.go similarity index 66% rename from common/mocks/Initiator.go rename to service/worker/sysworkflow/util.go index 52a8b0d04a0..544e1a257c6 100644 --- a/common/mocks/Initiator.go +++ b/service/worker/sysworkflow/util.go @@ -18,28 +18,21 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by mockery v1.0.0. DO NOT EDIT. 
+package sysworkflow -package mocks +import ( + "fmt" + "github.com/dgryski/go-farm" + "strings" +) -import archival "github.com/uber/cadence/common/archival" -import mock "github.com/stretchr/testify/mock" +const ( + historyBlobFilenameExt = ".history" +) -// Initiator is an autogenerated mock type for the Initiator type -type Initiator struct { - mock.Mock -} - -// Archive provides a mock function with given fields: request -func (_m *Initiator) Archive(request *archival.PutRequest) error { - ret := _m.Called(request) - - var r0 error - if rf, ok := ret.Get(0).(func(*archival.PutRequest) error); ok { - r0 = rf(request) - } else { - r0 = ret.Error(0) - } - - return r0 +// HistoryBlobFilename constructs name of history file from domainID, workflowID and runID +func HistoryBlobFilename(domainID string, workflowID string, runID string) string { + hashInput := strings.Join([]string{domainID, workflowID, runID}, "") + hash := farm.Fingerprint64([]byte(hashInput)) + return fmt.Sprintf("%v%v", hash, historyBlobFilenameExt) } diff --git a/tools/cli/adminDomainCommands.go b/tools/cli/adminDomainCommands.go index 0b63acf29ec..66f30e99441 100644 --- a/tools/cli/adminDomainCommands.go +++ b/tools/cli/adminDomainCommands.go @@ -280,36 +280,33 @@ func AdminDescribeDomain(c *cli.Context) { if err != nil { if _, ok := err.(*shared.EntityNotExistsError); !ok { ErrorAndExit("Operation DescribeDomain failed.", err) - } else { - ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domain), err) - } - } else { - archivalStatus := resp.Configuration.GetArchivalStatus() - bucketName := "" - retention := "" - owner := "" - if archivalStatus != shared.ArchivalStatusNeverEnabled { - bucketName = resp.Configuration.GetArchivalBucketName() - retention = fmt.Sprintf("%v", resp.Configuration.GetArchivalRetentionPeriodInDays()) - owner = resp.Configuration.GetArchivalBucketOwner() } - fmt.Printf("Name: %v\nDescription: %v\nOwnerEmail: %v\nDomainData: %v\nStatus: %v\nRetentionInDays: %v\n"+ - "EmitMetrics: %v\nActiveClusterName: %v\nClusters: %v\nArchivalStatus: %v\n"+ - "BucketName: %v\nArchivalRetentionInDays: %v\nBucketOwner: %v\n", - resp.DomainInfo.GetName(), - resp.DomainInfo.GetDescription(), - resp.DomainInfo.GetOwnerEmail(), - resp.DomainInfo.Data, - resp.DomainInfo.GetStatus(), - resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays(), - resp.Configuration.GetEmitMetric(), - resp.ReplicationConfiguration.GetActiveClusterName(), - serverClustersToString(resp.ReplicationConfiguration.Clusters), - archivalStatus.String(), - bucketName, - retention, - owner) + ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domain), err) + } + + archivalStatus := resp.Configuration.GetArchivalStatus() + var formatStr = "Name: %v\nDescription: %v\nOwnerEmail: %v\nDomainData: %v\nStatus: %v\nRetentionInDays: %v\n" + + "EmitMetrics: %v\nActiveClusterName: %v\nClusters: %v\nArchivalStatus: %v\n" + descValues := []interface{}{ + resp.DomainInfo.GetName(), + resp.DomainInfo.GetDescription(), + resp.DomainInfo.GetOwnerEmail(), + resp.DomainInfo.Data, + resp.DomainInfo.GetStatus(), + resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays(), + resp.Configuration.GetEmitMetric(), + resp.ReplicationConfiguration.GetActiveClusterName(), + serverClustersToString(resp.ReplicationConfiguration.Clusters), + archivalStatus.String(), + } + if archivalStatus != shared.ArchivalStatusNeverEnabled { + formatStr = formatStr + "BucketName: %v\nArchivalRetentionInDays: %v\nBucketOwner: %v\n" + descValues = append(descValues, + 
resp.Configuration.GetArchivalBucketName(), + fmt.Sprintf("%v", resp.Configuration.GetArchivalRetentionPeriodInDays()), + resp.Configuration.GetArchivalBucketOwner()) } + fmt.Printf(formatStr, descValues...) } func serverClustersToString(clusters []*shared.ClusterReplicationConfiguration) string { diff --git a/tools/cli/commands.go b/tools/cli/commands.go index 15637df3f42..aac6d8dadd2 100644 --- a/tools/cli/commands.go +++ b/tools/cli/commands.go @@ -180,6 +180,7 @@ const ( showErrorStackEnv = `CADENCE_CLI_SHOW_STACKS` ) +// SetFactory is used to set the ClientFactory global func SetFactory(factory ClientFactory) { cFactory = factory }
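
A closing usage note on the HistoryBlobFilename helper introduced in util.go above: the blob name is a farm.Fingerprint64 hash of domainID+workflowID+runID plus a ".history" extension, so re-archiving the same execution targets the same blob instead of accumulating duplicates. A small illustrative program (the IDs are placeholder values):

package main

import (
	"fmt"

	"github.com/uber/cadence/service/worker/sysworkflow"
)

func main() {
	// Identical inputs always yield the same "<fingerprint>.history" name,
	// which keeps ArchivalActivity's upload/download round trip stable.
	name := sysworkflow.HistoryBlobFilename("some-domain-id", "some-workflow-id", "some-run-id")
	fmt.Println(name)
}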