Skip to content

Commit

Permalink
Move blobstore retry logic into specific implementations (cadence-wor…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Mar 25, 2019
1 parent 2f5e5ef commit 243874b
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 45 deletions.
20 changes: 16 additions & 4 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ package filestore
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"

"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/blobstore/blob"
"github.com/uber/cadence/common/logging"
"os"
"path/filepath"
"strings"
"sync"
)

const (
Expand Down Expand Up @@ -325,6 +327,16 @@ func (c *client) BucketMetadata(_ context.Context, bucket string) (*blobstore.Bu
}, nil
}

// IsRetryableError reports whether err should be retried by callers of the
// filestore client. This implementation never inspects err and always
// returns false, so no filestore error is treated as retryable.
func (c *client) IsRetryableError(err error) bool {
return false
}

// GetRetryPolicy returns the retry policy callers should use with the
// filestore client: an exponential policy constructed with a zero initial
// interval and its maximum attempts set to 1.
func (c *client) GetRetryPolicy() backoff.RetryPolicy {
	// NOTE(review): together with IsRetryableError always returning false this
	// presumably means "never retry" — confirm how backoff counts attempts.
	retryPolicy := backoff.NewExponentialRetryPolicy(0)
	retryPolicy.SetMaximumAttempts(1)
	return retryPolicy
}

func setupDirectories(cfg *Config) error {
if err := mkdirAll(cfg.StoreDirectory); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions common/blobstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package blobstore

import (
"context"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/blobstore/blob"
)

Expand Down Expand Up @@ -51,4 +53,7 @@ type Client interface {
Delete(ctx context.Context, bucket string, key blob.Key) (bool, error)
ListByPrefix(ctx context.Context, bucket string, prefix string) ([]blob.Key, error)
BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error)

IsRetryableError(err error) bool
GetRetryPolicy() backoff.RetryPolicy
}
9 changes: 9 additions & 0 deletions common/blobstore/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package blobstore

import (
"context"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/blobstore/blob"
"github.com/uber/cadence/common/metrics"
)
Expand Down Expand Up @@ -118,3 +119,11 @@ func (c *metricClient) BucketMetadata(ctx context.Context, bucket string) (*Buck
}
return resp, err
}

// IsRetryableError forwards err to the wrapped client's IsRetryableError and
// returns its answer unchanged. Nothing else is done in this method.
func (c *metricClient) IsRetryableError(err error) bool {
return c.client.IsRetryableError(err)
}

// GetRetryPolicy returns the retry policy supplied by the wrapped client,
// unchanged. Nothing else is done in this method.
func (c *metricClient) GetRetryPolicy() backoff.RetryPolicy {
return c.client.GetRetryPolicy()
}
8 changes: 8 additions & 0 deletions common/blobstore/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ func (c *retryableClient) BucketMetadata(ctx context.Context, bucket string) (*B
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

// IsRetryableError forwards err to the wrapped client's IsRetryableError and
// returns its answer unchanged.
// NOTE(review): this consults the wrapped client rather than the
// c.isRetryable predicate used by the retrying operations — confirm the two
// are meant to agree.
func (c *retryableClient) IsRetryableError(err error) bool {
return c.client.IsRetryableError(err)
}

// GetRetryPolicy returns the retry policy supplied by the wrapped client,
// unchanged.
// NOTE(review): this returns the wrapped client's policy rather than the
// c.policy used by the retrying operations — confirm the two are meant to
// agree.
func (c *retryableClient) GetRetryPolicy() backoff.RetryPolicy {
return c.client.GetRetryPolicy()
}
31 changes: 31 additions & 0 deletions common/mocks/BlobstoreClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 0 additions & 29 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ const (
retryKafkaOperationMaxInterval = 10 * time.Second
retryKafkaOperationExpirationInterval = 30 * time.Second

retryBlobstoreClientInitialInterval = time.Second
retryBlobstoreClientMaxInterval = 10 * time.Second
retryBlobstoreClientExpirationInterval = time.Minute

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT"
// FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
Expand Down Expand Up @@ -187,15 +183,6 @@ func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateBlobstoreClientRetryPolicy builds the exponential retry policy used
// for blobstore client operations. The policy starts from
// retryBlobstoreClientInitialInterval, caps the interval at
// retryBlobstoreClientMaxInterval, and expires after
// retryBlobstoreClientExpirationInterval.
func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy {
	retryPolicy := backoff.NewExponentialRetryPolicy(retryBlobstoreClientInitialInterval)
	retryPolicy.SetMaximumInterval(retryBlobstoreClientMaxInterval)
	retryPolicy.SetExpirationInterval(retryBlobstoreClientExpirationInterval)
	return retryPolicy
}

// IsPersistenceTransientError checks if the error is a transient persistence error
func IsPersistenceTransientError(err error) bool {
switch err.(type) {
Expand All @@ -211,22 +198,6 @@ func IsKafkaTransientError(err error) bool {
return true
}

// IsBlobstoreTransientError reports whether err is a retryable blobstore
// error. It is the exact negation of IsBlobstoreNonRetryableError: every
// error that function does not classify as non-retryable is treated as
// transient.
func IsBlobstoreTransientError(err error) bool {
return !IsBlobstoreNonRetryableError(err)
}

// IsBlobstoreNonRetryableError reports whether err must not be retried.
// Only errors whose dynamic type is *workflow.BadRequestError or
// *workflow.EntityNotExistsError are non-retryable; anything else yields
// false.
func IsBlobstoreNonRetryableError(err error) bool {
	switch err.(type) {
	case *workflow.BadRequestError, *workflow.EntityNotExistsError:
		return true
	default:
		return false
	}
}

// IsServiceTransientError checks if the error is a retryable error.
func IsServiceTransientError(err error) bool {
return !IsServiceNonRetryableError(err)
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, s
)
blobstoreClient := blobstore.NewRetryableClient(
blobstore.NewMetricClient(c.blobstoreClient, service.GetMetricsClient()),
common.CreateBlobstoreClientRetryPolicy(),
common.IsBlobstoreTransientError)
c.blobstoreClient.GetRetryPolicy(),
c.blobstoreClient.IsRetryableError)
workerConfig := worker.NewConfig(dynamicconfig.NewNopCollection())
workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)
bc := &archiver.BootstrapContainer{
Expand Down
4 changes: 2 additions & 2 deletions service/worker/archiver/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func blobExists(ctx context.Context, blobstoreClient blobstore.Client, bucket st
cancel()
for err != nil {
activity.RecordHeartbeat(ctx)
if !common.IsBlobstoreTransientError(err) {
if !blobstoreClient.IsRetryableError(err) {
return false, cadence.NewCustomError(errBlobExists)
}
if contextExpired(ctx) {
Expand All @@ -231,7 +231,7 @@ func uploadBlob(ctx context.Context, blobstoreClient blobstore.Client, bucket st
cancel()
for err != nil {
activity.RecordHeartbeat(ctx)
if !common.IsBlobstoreTransientError(err) {
if !blobstoreClient.IsRetryableError(err) {
return cadence.NewCustomError(errUploadBlob)
}
if contextExpired(ctx) {
Expand Down
14 changes: 8 additions & 6 deletions service/worker/archiver/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ const (
var (
errPersistenceNonRetryable = errors.New("persistence non-retryable error")
errPersistenceRetryable = &shared.InternalServiceError{}
errBlobstoreNonRetryable = &shared.BadRequestError{}
errBlobstoreRetryable = errors.New("blobstore retryable error")
)

type activitiesSuite struct {
Expand Down Expand Up @@ -303,7 +301,8 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Fail_BlobExistsNonRetryableE
}
mockHistoryBlobIterator.On("Next").Return(historyBlob, nil)
mockBlobstore := &mocks.BlobstoreClient{}
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, errBlobstoreNonRetryable)
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, errors.New("some error"))
mockBlobstore.On("IsRetryableError", mock.Anything).Return(false)
container := &BootstrapContainer{
Logger: s.logger,
MetricsClient: s.metricsClient,
Expand Down Expand Up @@ -340,7 +339,8 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Fail_TimeoutOnBlobExists() {
}
mockHistoryBlobIterator.On("Next").Return(historyBlob, nil)
mockBlobstore := &mocks.BlobstoreClient{}
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, errBlobstoreRetryable)
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, errors.New("some error"))
mockBlobstore.On("IsRetryableError", mock.Anything).Return(true)
container := &BootstrapContainer{
Logger: s.logger,
MetricsClient: s.metricsClient,
Expand Down Expand Up @@ -418,7 +418,8 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Fail_UploadBlobNonRetryableE
mockHistoryBlobIterator.On("Next").Return(historyBlob, nil)
mockBlobstore := &mocks.BlobstoreClient{}
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, nil)
mockBlobstore.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errBlobstoreNonRetryable)
mockBlobstore.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("some error"))
mockBlobstore.On("IsRetryableError", mock.Anything).Return(false)
container := &BootstrapContainer{
Logger: s.logger,
MetricsClient: s.metricsClient,
Expand Down Expand Up @@ -458,7 +459,8 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Fail_TimeoutOnUploadBlob() {
mockHistoryBlobIterator.On("Next").Return(historyBlob, nil)
mockBlobstore := &mocks.BlobstoreClient{}
mockBlobstore.On("Exists", mock.Anything, mock.Anything, mock.Anything).Return(false, nil)
mockBlobstore.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errBlobstoreRetryable)
mockBlobstore.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("some error"))
mockBlobstore.On("IsRetryableError", mock.Anything).Return(true)
container := &BootstrapContainer{
Logger: s.logger,
MetricsClient: s.metricsClient,
Expand Down
4 changes: 2 additions & 2 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func (s *Service) startArchiver(base service.Service, pFactory persistencefactor

blobstoreClient := blobstore.NewRetryableClient(
blobstore.NewMetricClient(s.params.BlobstoreClient, s.metricsClient),
common.CreateBlobstoreClientRetryPolicy(),
common.IsBlobstoreTransientError)
s.params.BlobstoreClient.GetRetryPolicy(),
s.params.BlobstoreClient.IsRetryableError)

bc := &archiver.BootstrapContainer{
PublicClient: publicClient,
Expand Down

0 comments on commit 243874b

Please sign in to comment.