Update replication path to use lazy retry on service busy error (cade…
yux0 authored Jul 9, 2020
1 parent 2911586 commit 598c507
Showing 10 changed files with 147 additions and 29 deletions.
17 changes: 15 additions & 2 deletions client/history/client.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"

"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc"

h "github.com/uber/cadence/.gen/go/history"
@@ -791,6 +792,7 @@ func (c *clientImpl) GetReplicationMessages(
var wg sync.WaitGroup
wg.Add(len(requestsByClient))
respChan := make(chan *replicator.GetReplicationMessagesResponse, len(requestsByClient))
errChan := make(chan error, 1)
for client, req := range requestsByClient {
go func(client historyserviceclient.Interface, request *replicator.GetReplicationMessagesRequest) {
defer wg.Done()
@@ -800,6 +802,13 @@
resp, err := client.GetReplicationMessages(ctx, request, opts...)
if err != nil {
c.logger.Warn("Failed to get replication tasks from client", tag.Error(err))
// Return the service busy error to let the caller slow down replication
if _, ok := err.(*shared.ServiceBusyError); ok {
select {
case errChan <- err:
default:
}
}
return
}
respChan <- resp
@@ -808,15 +817,19 @@

wg.Wait()
close(respChan)
close(errChan)

response := &replicator.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*replicator.ReplicationMessages)}
for resp := range respChan {
for shardID, tasks := range resp.MessagesByShard {
response.MessagesByShard[shardID] = tasks
}
}

return response, nil
var err error
if len(errChan) > 0 {
err = <-errChan
}
return response, err
}
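The buffered errChan of capacity 1, together with the select/default send, lets any number of goroutines report a ServiceBusyError without ever blocking: the first error is kept and the rest are dropped. A minimal, self-contained sketch of the same pattern, with illustrative names that are not part of this commit:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errBusy = errors.New("service busy")

// fanOut runs n workers and surfaces at most one error, mirroring the
// capacity-1 errChan plus non-blocking send used in GetReplicationMessages.
func fanOut(n int) error {
    var wg sync.WaitGroup
    wg.Add(n)
    errChan := make(chan error, 1) // holds at most one error
    for i := 0; i < n; i++ {
        go func(i int) {
            defer wg.Done()
            if i%2 == 0 { // pretend even-numbered workers hit "service busy"
                select {
                case errChan <- errBusy:
                default: // an error is already recorded; drop this one, never block
                }
            }
        }(i)
    }
    wg.Wait()
    close(errChan)

    var err error
    if len(errChan) > 0 {
        err = <-errChan
    }
    return err
}

func main() {
    fmt.Println(fanOut(8)) // prints "service busy"
}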

func (c *clientImpl) GetDLQReplicationMessages(
4 changes: 3 additions & 1 deletion common/backoff/retrypolicy.go
@@ -210,7 +210,7 @@ func (p *ExponentialRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, num
return time.Duration(nextInterval)
}

// ComputeNextDelay returns the next delay interval. This is used by Retrier to delay calling the operation again
// ComputeNextDelay returns the next delay interval.
func (tp *TwoPhaseRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, numAttempts int) time.Duration {
nextInterval := tp.firstPolicy.ComputeNextDelay(elapsedTime, numAttempts)
if nextInterval == done {
@@ -242,3 +242,5 @@ func (r *retrierImpl) NextBackOff() time.Duration {
func (r *retrierImpl) getElapsedTime() time.Duration {
return r.clock.Now().Sub(r.startTime)
}
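For context, the TwoPhaseRetryPolicy whose ComputeNextDelay comment is trimmed above chains two policies: the first drives the delay until it reports done, then the second takes over. The sketch below is a hypothetical illustration of that kind of chaining with invented names, not the library's implementation:

package main

import (
    "fmt"
    "time"
)

// done is a sentinel meaning "this policy has given up" (hypothetical value).
const done time.Duration = -1

// delayFn stands in for a retry policy's ComputeNextDelay method.
type delayFn func(elapsed time.Duration, attempts int) time.Duration

// twoPhase consults the first policy and falls back to the second once the
// first reports done, echoing the delegation visible in ComputeNextDelay above.
func twoPhase(first, second delayFn) delayFn {
    return func(elapsed time.Duration, attempts int) time.Duration {
        if d := first(elapsed, attempts); d != done {
            return d
        }
        return second(elapsed, attempts)
    }
}

func main() {
    fast := func(_ time.Duration, attempts int) time.Duration {
        if attempts > 3 {
            return done // phase one exhausted after three attempts
        }
        return 50 * time.Millisecond
    }
    slow := func(_ time.Duration, _ int) time.Duration { return 2 * time.Second }

    policy := twoPhase(fast, slow)
    for attempts := 1; attempts <= 5; attempts++ {
        fmt.Printf("attempt %d -> wait %v\n", attempts, policy(0, attempts))
    }
}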


3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -256,6 +256,7 @@ var keys = map[Key]string{
ReplicationTaskFetcherAggregationInterval: "history.ReplicationTaskFetcherAggregationInterval",
ReplicationTaskFetcherTimerJitterCoefficient: "history.ReplicationTaskFetcherTimerJitterCoefficient",
ReplicationTaskFetcherErrorRetryWait: "history.ReplicationTaskFetcherErrorRetryWait",
ReplicationTaskFetcherServiceBusyWait: "history.ReplicationTaskFetcherServiceBusyWait",
ReplicationTaskProcessorErrorRetryWait: "history.ReplicationTaskProcessorErrorRetryWait",
ReplicationTaskProcessorErrorRetryMaxAttempts: "history.ReplicationTaskProcessorErrorRetryMaxAttempts",
ReplicationTaskProcessorNoTaskInitialWait: "history.ReplicationTaskProcessorNoTaskInitialWait",
@@ -825,6 +826,8 @@ const (
ReplicationTaskFetcherTimerJitterCoefficient
// ReplicationTaskFetcherErrorRetryWait is the wait time when fetcher encounters error
ReplicationTaskFetcherErrorRetryWait
// ReplicationTaskFetcherServiceBusyWait is the wait time when fetcher encounters service busy error
ReplicationTaskFetcherServiceBusyWait
// ReplicationTaskProcessorErrorRetryWait is the initial retry wait when we see errors in applying replication tasks
ReplicationTaskProcessorErrorRetryWait
// ReplicationTaskProcessorErrorRetryMaxAttempts is the max retry attempts for applying replication tasks
22 changes: 22 additions & 0 deletions common/util.go
@@ -74,6 +74,10 @@ const (
retryTaskProcessingMaxInterval = 100 * time.Millisecond
retryTaskProcessingMaxAttempts = 3

replicationServiceBusyInitialInterval = 2 * time.Second
replicationServiceBusyMaxInterval = 10 * time.Second
replicationServiceBusyExpirationInterval = 30 * time.Second

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
@@ -190,6 +194,15 @@ func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateReplicationServiceBusyRetryPolicy creates a retry policy for handling replication service busy errors
func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(replicationServiceBusyInitialInterval)
policy.SetMaximumInterval(replicationServiceBusyMaxInterval)
policy.SetExpirationInterval(replicationServiceBusyExpirationInterval)

return policy
}

// IsPersistenceTransientError checks if the error is a transient persistence error
func IsPersistenceTransientError(err error) bool {
switch err.(type) {
@@ -230,6 +243,15 @@ func IsServiceTransientError(err error) bool {
return false
}

// IsServiceBusyError checks if the error is a service busy error.
func IsServiceBusyError(err error) bool {
switch err.(type) {
case *workflow.ServiceBusyError:
return true
}
return false
}
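Together, these two helpers are what the history task processor plugs into backoff.Retry later in this commit: retry only while the error is a ServiceBusyError, backing off from 2s toward 10s and giving up after 30s. A condensed usage sketch, assuming it sits alongside these helpers in package common (callRemote is a hypothetical stand-in for the real operation):

// Sketch only; package common already imports common/backoff.
func replicateWithLazyRetry(callRemote func() error) error {
    return backoff.Retry(
        callRemote,                                // the operation to retry
        CreateReplicationServiceBusyRetryPolicy(), // 2s initial, 10s cap, 30s expiration
        IsServiceBusyError,                        // only service busy errors re-enter the loop
    )
}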

// WorkflowIDToHistoryShard is used to map a workflowID to a shardID
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
hash := farm.Fingerprint32([]byte(workflowID))
2 changes: 2 additions & 0 deletions service/history/config/config.go
@@ -222,6 +222,7 @@ type Config struct {
ReplicationTaskFetcherAggregationInterval dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherTimerJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicationTaskFetcherErrorRetryWait dynamicconfig.DurationPropertyFn
ReplicationTaskFetcherServiceBusyWait dynamicconfig.DurationPropertyFn
ReplicationTaskProcessorErrorRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorErrorRetryMaxAttempts dynamicconfig.IntPropertyFnWithShardIDFilter
ReplicationTaskProcessorNoTaskRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
@@ -430,6 +431,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15),
ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait, time.Second),
ReplicationTaskFetcherServiceBusyWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherServiceBusyWait, 60*time.Second),
ReplicationTaskProcessorErrorRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, 50*time.Millisecond),
ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts, 5),
ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second),
23 changes: 19 additions & 4 deletions service/history/historyEngine.go
@@ -36,6 +36,7 @@ import (
m "github.com/uber/cadence/.gen/go/matching"
r "github.com/uber/cadence/.gen/go/replicator"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/admin"
hc "github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
@@ -287,11 +288,25 @@ func NewEngineWithShardContext(
replicationTaskExecutors := make(map[string]replication.TaskExecutor)
for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
sourceCluster := replicationTaskFetcher.GetSourceCluster()
// Intentionally use the raw client so this path can apply its own retry policy
adminClient := shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster)
adminRetryableClient := admin.NewRetryableClient(
adminClient,
common.CreateReplicationServiceBusyRetryPolicy(),
common.IsServiceBusyError,
)
// Intentionally use the raw client so this path can apply its own retry policy
historyClient := shard.GetService().GetClientBean().GetHistoryClient()
historyRetryableClient := hc.NewRetryableClient(
historyClient,
common.CreateReplicationServiceBusyRetryPolicy(),
common.IsServiceBusyError,
)
nDCHistoryResender := xdc.NewNDCHistoryResender(
shard.GetDomainCache(),
shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster),
adminRetryableClient,
func(ctx context.Context, request *h.ReplicateEventsV2Request) error {
return shard.GetService().GetHistoryClient().ReplicateEventsV2(ctx, request)
return historyRetryableClient.ReplicateEventsV2(ctx, request)
},
shard.GetService().GetPayloadSerializer(),
nil,
@@ -300,9 +315,9 @@
historyRereplicator := xdc.NewHistoryRereplicator(
currentClusterName,
shard.GetDomainCache(),
shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster),
adminRetryableClient,
func(ctx context.Context, request *h.ReplicateRawEventsRequest) error {
return shard.GetService().GetHistoryClient().ReplicateRawEvents(ctx, request)
return historyRetryableClient.ReplicateRawEvents(ctx, request)
},
shard.GetService().GetPayloadSerializer(),
replicationTimeout,
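The two NewRetryableClient wrappers above decorate every RPC on the admin and history clients with the service-busy policy, so downstream code such as the NDC history resender and the history rereplicator never retries explicitly. The sketch below illustrates the general decorator shape with invented types; the real generated wrappers live under client/admin and client/history:

package example

import (
    "context"

    "github.com/uber/cadence/common/backoff"
)

// remoteCall is a hypothetical stand-in for a single RPC method.
type remoteCall func(ctx context.Context) error

// retryableCaller wraps any call with a retry policy and a retryability
// predicate, in the spirit of admin.NewRetryableClient / hc.NewRetryableClient.
type retryableCaller struct {
    policy      backoff.RetryPolicy
    isRetryable func(error) bool
}

// Do re-runs the call until it succeeds, the policy expires, or the error
// stops matching the predicate (here: stops being a ServiceBusyError).
func (r retryableCaller) Do(ctx context.Context, call remoteCall) error {
    return backoff.Retry(
        func() error { return call(ctx) },
        r.policy,
        r.isRetryable,
    )
}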
26 changes: 18 additions & 8 deletions service/history/replication/task_fetcher.go
@@ -28,6 +28,7 @@ import (
"time"

r "github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/common"
@@ -220,10 +221,15 @@ func (f *taskFetcherImpl) fetchTasks() {
// When timer fires, we collect all the requests we have so far and attempt to send them to remote.
err := f.fetchAndDistributeTasks(requestByShard)
if err != nil {
timer.Reset(backoff.JitDuration(
f.config.ReplicationTaskFetcherErrorRetryWait(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
if _, ok := err.(*shared.ServiceBusyError); ok {
// Slow down replication when the source cluster is busy.
timer.Reset(f.config.ReplicationTaskFetcherServiceBusyWait())
} else {
timer.Reset(backoff.JitDuration(
f.config.ReplicationTaskFetcherErrorRetryWait(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
}
} else {
timer.Reset(backoff.JitDuration(
f.config.ReplicationTaskFetcherAggregationInterval(),
@@ -246,8 +252,10 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ

messagesByShard, err := f.getMessages(requestByShard)
if err != nil {
f.logger.Error("Failed to get replication tasks", tag.Error(err))
return err
if _, ok := err.(*shared.ServiceBusyError); !ok {
f.logger.Error("Failed to get replication tasks", tag.Error(err))
return err
}
}

f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(messagesByShard)))
@@ -259,7 +267,7 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ
delete(requestByShard, shardID)
}

return nil
return err
}

func (f *taskFetcherImpl) getMessages(
@@ -279,7 +287,9 @@ func (f *taskFetcherImpl) getMessages(
}
response, err := f.remotePeer.GetReplicationMessages(ctx, request)
if err != nil {
return nil, err
if _, ok := err.(*shared.ServiceBusyError); !ok {
return nil, err
}
}

return response.GetMessagesByShard(), err
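Note the convention the fetcher adopts here: when the only failure is a ServiceBusyError, getMessages still returns whatever messages it received alongside the error, so fetchAndDistributeTasks distributes the partial batch and fetchTasks then slows the timer down. A self-contained sketch of that partial-result-plus-error shape, with hypothetical names:

package main

import (
    "errors"
    "fmt"
    "time"
)

var errServiceBusy = errors.New("service busy")

// fetch pretends to be a remote call that got some data back before the
// source cluster asked it to back off.
func fetch() (map[int32][]string, error) {
    return map[int32][]string{1: {"task-a"}, 2: {"task-b"}}, errServiceBusy
}

func main() {
    const (
        errorRetryWait  = time.Second      // normal error backoff
        serviceBusyWait = 60 * time.Second // much longer wait when the source is busy
    )

    msgs, err := fetch()
    for shardID, tasks := range msgs {
        fmt.Println("distributing shard", shardID, tasks) // consume partial progress first
    }

    wait := errorRetryWait
    if errors.Is(err, errServiceBusy) {
        wait = serviceBusyWait
    }
    fmt.Println("next fetch in", wait)
}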
20 changes: 17 additions & 3 deletions service/history/replication/task_processor.go
@@ -357,9 +357,21 @@ func (p *taskProcessorImpl) handleSyncShardStatus(
}

func (p *taskProcessorImpl) processSingleTask(replicationTask *r.ReplicationTask) error {
err := backoff.Retry(func() error {
return p.processTaskOnce(replicationTask)
}, p.taskRetryPolicy, isTransientRetryableError)
retryTransientError := func() error {
return backoff.Retry(
func() error {
return p.processTaskOnce(replicationTask)
},
p.taskRetryPolicy,
isTransientRetryableError)
}

// Handle service busy errors with the lazy retry policy
err := backoff.Retry(
retryTransientError,
common.CreateReplicationServiceBusyRetryPolicy(),
common.IsServiceBusyError,
)

if err != nil {
p.logger.Error(
@@ -499,6 +511,8 @@ func isTransientRetryableError(err error) bool {
switch err.(type) {
case *shared.BadRequestError:
return false
case *shared.ServiceBusyError:
return false
default:
return true
}
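processSingleTask now nests two retry loops: the inner backoff.Retry keeps the fast taskRetryPolicy for transient errors (and, per the change to isTransientRetryableError, no longer treats ServiceBusyError as transient), while the outer loop re-runs the whole inner attempt under the slow service-busy policy. A compact, self-contained sketch of that nesting with invented errors and a toy retry helper, not the processor's actual code:

package main

import (
    "errors"
    "fmt"
)

var (
    errTransient = errors.New("transient failure")
    errBusy      = errors.New("service busy")
)

// retry is a toy stand-in for backoff.Retry: re-run op while shouldRetry says
// so, up to maxAttempts, without any real sleeping.
func retry(op func() error, shouldRetry func(error) bool, maxAttempts int) error {
    var err error
    for i := 0; i < maxAttempts; i++ {
        if err = op(); err == nil || !shouldRetry(err) {
            return err
        }
    }
    return err
}

// process mirrors the nested structure of processSingleTask above.
func process(task func() error) error {
    inner := func() error {
        // Fast inner loop: retries transient errors, gives up on service busy.
        return retry(task, func(err error) bool { return errors.Is(err, errTransient) }, 3)
    }
    // Lazy outer loop: only the service busy error re-enters, under a much
    // slower policy (CreateReplicationServiceBusyRetryPolicy in the real code).
    return retry(inner, func(err error) bool { return errors.Is(err, errBusy) }, 5)
}

func main() {
    calls := 0
    err := process(func() error {
        calls++
        if calls < 3 {
            return errBusy // the source cluster is busy for the first two calls
        }
        return nil
    })
    fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}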
5 changes: 1 addition & 4 deletions service/worker/replicator/processor.go
@@ -100,9 +100,6 @@ func newReplicationTaskProcessor(
sequentialTaskProcessor task.Processor,
) *replicationTaskProcessor {

retryableHistoryClient := history.NewRetryableClient(historyClient, common.CreateHistoryServiceRetryPolicy(),
common.IsServiceTransientError)

return &replicationTaskProcessor{
currentCluster: currentCluster,
sourceCluster: sourceCluster,
@@ -115,7 +112,7 @@ func newReplicationTaskProcessor(
domainreplicationTaskExecutor: domainreplicationTaskExecutor,
historyRereplicator: historyRereplicator,
nDCHistoryResender: nDCHistoryResender,
historyClient: retryableHistoryClient,
historyClient: historyClient,
msgEncoder: codec.NewThriftRWEncoder(),
timeSource: clock.NewRealTimeSource(),
domainCache: domainCache,