Skip to content

Commit

Permalink
Add blobstore interface and plumb into frontend (cadence-workflow#1340)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Dec 18, 2018
1 parent 4676004 commit c0c38f7
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 12 deletions.
2 changes: 2 additions & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package main

import (
"github.com/uber/cadence/common/archival"
"github.com/uber/cadence/common/blobstore"
"log"
"time"

Expand Down Expand Up @@ -112,6 +113,7 @@ func (s *server) startService() common.Daemon {
dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger)

params.ArchivalClient = archival.NewNopClient()
params.BlobstoreClient = blobstore.NewNopClient()

svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope()
Expand Down
74 changes: 74 additions & 0 deletions common/blobstore/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package blobstore

import (
"context"
"errors"
"io"
)

// CompressionType defines the type of compression used for a blob
type CompressionType int

const (
// NoCompression indicates that blob is not compressed
NoCompression CompressionType = iota
)

// Blob defines a blob
type Blob struct {
Body io.Reader
CompressionType CompressionType
Tags map[string]string
}

// BucketMetadataResponse contains information relating to a bucket's configuration
type BucketMetadataResponse struct {
Owner string
RetentionDays int
}

// Client is used to operate on blobs in a blobstore
type Client interface {
UploadBlob(ctx context.Context, bucket string, path string, blob *Blob) error
DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error)
BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error)
}

type nopClient struct{}

func (c *nopClient) UploadBlob(ctx context.Context, bucket string, path string, blob *Blob) error {
return errors.New("not implemented")
}

func (c *nopClient) DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error) {
return nil, errors.New("not implemented")
}

func (c *nopClient) BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error) {
return nil, errors.New("not implemented")
}

// NewNopClient creates a nop client
func NewNopClient() Client {
return &nopClient{}
}
92 changes: 92 additions & 0 deletions common/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package service

import (
"github.com/uber/cadence/common/archival"
"github.com/uber/cadence/common/blobstore"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -66,6 +67,7 @@ type (
DynamicConfig dynamicconfig.Client
DispatcherProvider client.DispatcherProvider
ArchivalClient archival.Client
BlobstoreClient blobstore.Client
}

// RingpopFactory provides a bootstrapped ringpop
Expand Down
5 changes: 4 additions & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/uber/cadence/common/blobstore"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -289,6 +290,7 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) {
}

params.DynamicConfig = dynamicconfig.NewNopClient()
params.BlobstoreClient = blobstore.NewNopClient()

// TODO when cross DC is public, remove this temporary override
var kafkaProducer messaging.Producer
Expand All @@ -305,7 +307,8 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) {

c.frontEndService = service.New(params)
c.frontendHandler = frontend.NewWorkflowHandler(
c.frontEndService, frontend.NewConfig(dynamicconfig.NewNopCollection()), c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer)
c.frontEndService, frontend.NewConfig(dynamicconfig.NewNopCollection()),
c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer, params.BlobstoreClient)
err = c.frontendHandler.Start()
if err != nil {
c.logger.WithField("error", err).Fatal("Failed to start frontend")
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *Service) Start() {
kafkaProducer = &mocks.KafkaProducer{}
}

wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer)
wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer, params.BlobstoreClient)
wfHandler.Start()

adminHandler := NewAdminHandler(base, pConfig.NumHistoryShards, metadata)
Expand Down
5 changes: 4 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/uber/cadence/common/blobstore"
"sync"
"time"

Expand Down Expand Up @@ -71,6 +72,7 @@ type (
rateLimiter common.TokenBucket
config *Config
domainReplicator DomainReplicator
blobstoreClient blobstore.Client
service.Service
}

Expand Down Expand Up @@ -122,7 +124,7 @@ var (
// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager, visibilityMgr persistence.VisibilityManager,
kafkaProducer messaging.Producer) *WorkflowHandler {
kafkaProducer messaging.Producer, blobstoreClient blobstore.Client) *WorkflowHandler {
handler := &WorkflowHandler{
Service: sVice,
config: config,
Expand All @@ -134,6 +136,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()),
rateLimiter: common.NewTokenBucket(config.RPS(), common.NewRealTimeSource()),
domainReplicator: NewDomainReplicator(kafkaProducer, sVice.GetLogger()),
blobstoreClient: blobstoreClient,
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
Expand Down
29 changes: 20 additions & 9 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
mockVisibilityMgr *mocks.VisibilityManager
mockDomainCache *cache.DomainCacheMock
mockService cs.Service
mockBlobstoreClient *mocks.Client
}
)

Expand Down Expand Up @@ -87,6 +88,7 @@ func (s *workflowHandlerSuite) SetupTest() {
s.mockHistoryV2Mgr = &mocks.HistoryV2Manager{}
s.mockVisibilityMgr = &mocks.VisibilityManager{}
s.mockService = cs.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.logger)
s.mockBlobstoreClient = &mocks.Client{}
}

func (s *workflowHandlerSuite) TestMergeDomainData_Overriding() {
Expand Down Expand Up @@ -162,7 +164,8 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.DisableListVisibilityByFilter = dc.GetBoolPropertyFnFilteredByDomain(true)

wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr,
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
mockDomainCache := &cache.DomainCacheMock{}
wh.metricsClient = wh.Service.GetMetricsClient()
wh.domainCache = mockDomainCache
Expand Down Expand Up @@ -230,7 +233,8 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() {
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr,
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -261,7 +265,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_StartRequestNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr,
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand All @@ -273,7 +278,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_StartRequestNot
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_DomainNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr,
s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -304,7 +310,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_DomainNotSet()
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowIdNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -335,7 +342,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowIdNotSe
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowTypeNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -367,7 +375,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowTypeNot
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_TaskListNotSet() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -399,7 +408,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_TaskListNotSet(
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidExecutionStartToCloseTimeout() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down Expand Up @@ -431,7 +441,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidExecutio
func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidTaskStartToCloseTimeout() {
config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger))
config.RPS = dc.GetIntPropertyFn(10)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer)
wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

Expand Down

0 comments on commit c0c38f7

Please sign in to comment.