Skip to content

Commit

Permalink
Add archival config and support file-based blobstore (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Jan 2, 2019
1 parent 4c1f246 commit 4814b92
Show file tree
Hide file tree
Showing 40 changed files with 1,514 additions and 334 deletions.
18 changes: 12 additions & 6 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
package main

import (
"github.com/uber/cadence/common/blobstore/filestore"
"log"
"time"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archival"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service"
Expand Down Expand Up @@ -113,22 +112,22 @@ func (s *server) startService() common.Daemon {
params.DynamicConfig = dynamicconfig.NewNopClient()
dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger)

params.ArchivalClient = archival.NewNopClient()
params.BlobstoreClient = blobstore.NewNopClient()

svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope()
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain)
enableArchival := dc.GetBoolProperty(dynamicconfig.EnableArchival, s.cfg.Archival.Enabled)

params.ClusterMetadata = cluster.NewMetadata(
enableGlobalDomain,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.MasterClusterName,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
s.cfg.ClustersInfo.DeploymentGroup,
enableArchival,
s.cfg.Archival.Filestore.DefaultBucket.Name,
)
params.DispatcherProvider = client.NewIPYarpcDispatcherProvider()
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
Expand All @@ -143,6 +142,13 @@ func (s *server) startService() common.Daemon {
params.MessagingClient = nil
}

if params.ClusterMetadata.IsArchivalEnabled() {
params.BlobstoreClient, err = filestore.NewClient(&s.cfg.Archival.Filestore)
if err != nil {
log.Fatalf("error creating blobstore: %v", err)
}
}

params.Logger.Info("Starting service " + s.name)

var daemon common.Daemon
Expand Down
80 changes: 0 additions & 80 deletions common/archival/client.go

This file was deleted.

180 changes: 180 additions & 0 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package filestore

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/uber/cadence/common/blobstore"
)

const (
	// metadataFilename is the name of the per-bucket file that stores the
	// serialized bucket configuration (owner, retention days).
	metadataFilename = "metadata"
)

// client is a blobstore.Client implementation backed by the local file
// system. Each bucket is a subdirectory of storeDirectory and each blob is
// a file within its bucket's directory. The embedded mutex serializes all
// operations; because of it, client values must not be copied — use the
// *client returned by NewClient.
type client struct {
	sync.Mutex
	storeDirectory string
}

// NewClient returns a new Client backed by file system. It validates cfg,
// creates the store and bucket directories, and writes each bucket's
// metadata file before returning a usable client.
func NewClient(cfg *Config) (blobstore.Client, error) {
	err := cfg.Validate()
	if err == nil {
		err = setupDirectories(cfg)
	}
	if err == nil {
		err = writeMetadataFiles(cfg)
	}
	if err != nil {
		return nil, err
	}
	return &client{storeDirectory: cfg.StoreDirectory}, nil
}

// UploadBlob serializes blob and writes it to the file named filename inside
// the bucket's directory, overwriting any existing blob with that name. It
// returns blobstore.ErrBucketNotExists when the bucket directory is missing.
func (c *client) UploadBlob(_ context.Context, bucket string, filename string, blob *blobstore.Blob) error {
	c.Lock()
	defer c.Unlock()

	bucketDir := bucketDirectory(c.storeDirectory, bucket)
	exists, err := directoryExists(bucketDir)
	if err != nil {
		return err
	}
	if !exists {
		return blobstore.ErrBucketNotExists
	}

	data, err := serializeBlob(blob)
	if err != nil {
		return err
	}
	return writeFile(bucketItemPath(c.storeDirectory, bucket, filename), data)
}

// DownloadBlob reads and deserializes the blob stored under filename in the
// given bucket. It returns blobstore.ErrBucketNotExists when the bucket
// directory is missing and blobstore.ErrBlobNotExists when the blob file
// does not exist.
func (c *client) DownloadBlob(_ context.Context, bucket string, filename string) (*blobstore.Blob, error) {
	c.Lock()
	defer c.Unlock()

	exists, err := directoryExists(bucketDirectory(c.storeDirectory, bucket))
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, blobstore.ErrBucketNotExists
	}

	data, err := readFile(bucketItemPath(c.storeDirectory, bucket, filename))
	switch {
	case err == nil:
		return deserializeBlob(data)
	case os.IsNotExist(err):
		// A missing file means the blob was never uploaded (or was removed).
		return nil, blobstore.ErrBlobNotExists
	default:
		return nil, err
	}
}

// BucketMetadata returns the owner and retention configuration recorded in
// the bucket's metadata file. It returns blobstore.ErrBucketNotExists when
// the bucket directory is missing, and a descriptive error when the metadata
// file itself is absent.
func (c *client) BucketMetadata(_ context.Context, bucket string) (*blobstore.BucketMetadataResponse, error) {
	c.Lock()
	defer c.Unlock()

	exists, err := directoryExists(bucketDirectory(c.storeDirectory, bucket))
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, blobstore.ErrBucketNotExists
	}

	// Read the metadata file directly and map os.IsNotExist, rather than
	// stat-ing with fileExists first. This matches DownloadBlob's pattern
	// and avoids a redundant filesystem call.
	metadataFilepath := bucketItemPath(c.storeDirectory, bucket, metadataFilename)
	data, err := readFile(metadataFilepath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("failed to get metadata, file at %v does not exist", metadataFilepath)
		}
		return nil, err
	}

	bucketCfg, err := deserializeBucketConfig(data)
	if err != nil {
		return nil, err
	}

	return &blobstore.BucketMetadataResponse{
		Owner:         bucketCfg.Owner,
		RetentionDays: bucketCfg.RetentionDays,
	}, nil
}

// setupDirectories creates the store root directory plus one subdirectory
// per configured bucket (the default bucket and every custom bucket).
// Directories that already exist are left untouched.
func setupDirectories(cfg *Config) error {
	dirs := []string{
		cfg.StoreDirectory,
		bucketDirectory(cfg.StoreDirectory, cfg.DefaultBucket.Name),
	}
	for _, b := range cfg.CustomBuckets {
		dirs = append(dirs, bucketDirectory(cfg.StoreDirectory, b.Name))
	}
	for _, dir := range dirs {
		if err := mkdirAll(dir); err != nil {
			return err
		}
	}
	return nil
}

// writeMetadataFiles serializes each bucket's configuration and writes it to
// that bucket's metadata file, covering the default bucket and all custom
// buckets. The first failure aborts the process and is returned.
func writeMetadataFiles(cfg *Config) error {
	buckets := append([]BucketConfig{cfg.DefaultBucket}, cfg.CustomBuckets...)
	for _, bucketCfg := range buckets {
		path := bucketItemPath(cfg.StoreDirectory, bucketCfg.Name, metadataFilename)
		data, err := serializeBucketConfig(&bucketCfg)
		if err != nil {
			return fmt.Errorf("failed to write metadata file for bucket %v: %v", bucketCfg.Name, err)
		}
		if err := writeFile(path, data); err != nil {
			return fmt.Errorf("failed to write metadata file for bucket %v: %v", bucketCfg.Name, err)
		}
	}
	return nil
}

// bucketDirectory returns the path of the directory that backs the named
// bucket: a direct child of the store root.
func bucketDirectory(storeDirectory string, bucketName string) string {
	dir := filepath.Join(storeDirectory, bucketName)
	return dir
}

// bucketItemPath returns the path of the named item (blob or metadata file)
// inside the given bucket's directory under the store root.
func bucketItemPath(storeDirectory string, bucketName string, filename string) string {
	// Equivalent to filepath.Join(bucketDirectory(...), filename); Join is
	// variadic, so the three components can be joined in one call.
	return filepath.Join(storeDirectory, bucketName, filename)
}
Loading

0 comments on commit 4814b92

Please sign in to comment.