From cb87bd7036775dfb8249a0f7bbbff5871b7b4e37 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Tue, 10 Jul 2018 10:38:24 -0700 Subject: [PATCH] Simplify replication worker retry logic (#945) Current retry logic can result in some replication tasks to be retried indifinitely if we have bugs in our history replication code. Updated the logic to make it much simpler. It now uses a retryable history client to retry any history service transient errors before returning the error back to processWithRetry pump. The pump will keep on retrying whiltelisted transient errors for ever as it makes no sense to move those messages to DLQ, otherwise it decrements the remaining retry count and continue applying the message until fixed value of max retry count is hit for that task. --- client/history/retryableClient.go | 355 ++++++++++++++++++ common/metrics/defs.go | 8 +- common/util.go | 10 +- service/frontend/workflowHandler.go | 2 +- service/history/timerQueueActiveProcessor.go | 4 +- .../history/transferQueueActiveProcessor.go | 4 +- service/worker/processor.go | 195 ++++------ service/worker/service.go | 2 + 8 files changed, 441 insertions(+), 139 deletions(-) create mode 100644 client/history/retryableClient.go diff --git a/client/history/retryableClient.go b/client/history/retryableClient.go new file mode 100644 index 00000000000..c280687f523 --- /dev/null +++ b/client/history/retryableClient.go @@ -0,0 +1,355 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + h "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/backoff" + "go.uber.org/yarpc" +) + +var _ Client = (*retryableClient)(nil) + +type retryableClient struct { + client Client + policy backoff.RetryPolicy + isRetryable backoff.IsRetryable +} + +// NewRetryableClient creates a new instance of Client with retry policy +func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable backoff.IsRetryable) Client { + return &retryableClient{ + client: client, + policy: policy, + isRetryable: isRetryable, + } +} + +func (c *retryableClient) StartWorkflowExecution( + ctx context.Context, + request *h.StartWorkflowExecutionRequest, + opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + + var resp *shared.StartWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.StartWorkflowExecution(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) DescribeHistoryHost( + ctx context.Context, + request *shared.DescribeHistoryHostRequest, + opts ...yarpc.CallOption) (*shared.DescribeHistoryHostResponse, error) { + + var resp *shared.DescribeHistoryHostResponse + op := func() error { + var err error + resp, err = c.client.DescribeHistoryHost(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) DescribeMutableState( + ctx context.Context, + request *h.DescribeMutableStateRequest, + opts ...yarpc.CallOption) (*h.DescribeMutableStateResponse, error) { + + var resp *h.DescribeMutableStateResponse + op := func() error { + var err error + resp, err = c.client.DescribeMutableState(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) GetMutableState( + ctx context.Context, + request *h.GetMutableStateRequest, + opts ...yarpc.CallOption) (*h.GetMutableStateResponse, error) { + + var resp *h.GetMutableStateResponse + op := func() error { + var err error + resp, err = c.client.GetMutableState(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) ResetStickyTaskList( + ctx context.Context, + request *h.ResetStickyTaskListRequest, + opts ...yarpc.CallOption) (*h.ResetStickyTaskListResponse, error) { + + var resp *h.ResetStickyTaskListResponse + op := func() error { + var err error + resp, err = c.client.ResetStickyTaskList(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) DescribeWorkflowExecution( + ctx context.Context, + request *h.DescribeWorkflowExecutionRequest, + opts ...yarpc.CallOption) (*shared.DescribeWorkflowExecutionResponse, error) { + + var resp *shared.DescribeWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.DescribeWorkflowExecution(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RecordDecisionTaskStarted( + ctx context.Context, + request *h.RecordDecisionTaskStartedRequest, + opts ...yarpc.CallOption) (*h.RecordDecisionTaskStartedResponse, error) { + + var resp *h.RecordDecisionTaskStartedResponse + op := func() error { + var err error + resp, err = c.client.RecordDecisionTaskStarted(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RecordActivityTaskStarted( + ctx context.Context, + request *h.RecordActivityTaskStartedRequest, + opts ...yarpc.CallOption) (*h.RecordActivityTaskStartedResponse, error) { + + var resp *h.RecordActivityTaskStartedResponse + op := func() error { + var err error + resp, err = c.client.RecordActivityTaskStarted(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RespondDecisionTaskCompleted( + ctx context.Context, + request *h.RespondDecisionTaskCompletedRequest, + opts ...yarpc.CallOption) (*h.RespondDecisionTaskCompletedResponse, error) { + + var resp *h.RespondDecisionTaskCompletedResponse + op := func() error { + var err error + resp, err = c.client.RespondDecisionTaskCompleted(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RespondDecisionTaskFailed( + ctx context.Context, + request *h.RespondDecisionTaskFailedRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RespondDecisionTaskFailed(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskCompleted( + ctx context.Context, + request *h.RespondActivityTaskCompletedRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RespondActivityTaskCompleted(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskFailed( + ctx context.Context, + request *h.RespondActivityTaskFailedRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RespondActivityTaskFailed(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskCanceled( + ctx context.Context, + request *h.RespondActivityTaskCanceledRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RespondActivityTaskCanceled(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RecordActivityTaskHeartbeat( + ctx context.Context, + request *h.RecordActivityTaskHeartbeatRequest, + opts ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + var resp *shared.RecordActivityTaskHeartbeatResponse + op := func() error { + var err error + resp, err = c.client.RecordActivityTaskHeartbeat(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RequestCancelWorkflowExecution( + ctx context.Context, + request *h.RequestCancelWorkflowExecutionRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RequestCancelWorkflowExecution(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) SignalWorkflowExecution( + ctx context.Context, + request *h.SignalWorkflowExecutionRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.SignalWorkflowExecution(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) SignalWithStartWorkflowExecution( + ctx context.Context, + request *h.SignalWithStartWorkflowExecutionRequest, + opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + + var resp *shared.StartWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.SignalWithStartWorkflowExecution(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RemoveSignalMutableState( + ctx context.Context, + request *h.RemoveSignalMutableStateRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RemoveSignalMutableState(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) TerminateWorkflowExecution( + ctx context.Context, + request *h.TerminateWorkflowExecutionRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.TerminateWorkflowExecution(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) ScheduleDecisionTask( + ctx context.Context, + request *h.ScheduleDecisionTaskRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.ScheduleDecisionTask(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RecordChildExecutionCompleted( + ctx context.Context, + request *h.RecordChildExecutionCompletedRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RecordChildExecutionCompleted(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) ReplicateEvents( + ctx context.Context, + request *h.ReplicateEventsRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.ReplicateEvents(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 4e24c90b42e..9707fd21522 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -759,7 +759,6 @@ const ( ReplicatorMessages = iota + NumCommonMetrics ReplicatorFailures ReplicatorLatency - ReplicatorRetryPercentage ) // MetricDefs record the metrics for all services @@ -856,10 +855,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ BufferThrottleCounter: {metricName: "buffer.throttle.count"}, }, Worker: { - ReplicatorMessages: {metricName: "replicator.messages"}, - ReplicatorFailures: {metricName: "replicator.errors"}, - ReplicatorLatency: {metricName: "replicator.latency"}, - ReplicatorRetryPercentage: {metricName: "replicator.retry-percentage", metricType: Gauge}, + ReplicatorMessages: {metricName: "replicator.messages"}, + ReplicatorFailures: {metricName: "replicator.errors"}, + ReplicatorLatency: {metricName: "replicator.latency"}, }, } diff --git a/common/util.go b/common/util.go index 87ddd13db66..43f8dfb87d4 100644 --- a/common/util.go +++ b/common/util.go @@ -169,8 +169,8 @@ func IsServiceNonRetryableError(err error) bool { return false } -// IsMatchingServiceTransientError checks if the error is a transient error. -func IsMatchingServiceTransientError(err error) bool { +// IsWhitelistServiceTransientError checks if the error is a transient error. +func IsWhitelistServiceTransientError(err error) bool { switch err.(type) { case *workflow.InternalServiceError: return true @@ -178,9 +178,11 @@ func IsMatchingServiceTransientError(err error) bool { return true case *workflow.LimitExceededError: return true + case *h.ShardOwnershipLostError: + return true case *yarpcerrors.Status: - rpcErr := err.(*yarpcerrors.Status) - if rpcErr.Code() == yarpcerrors.CodeDeadlineExceeded { + // We only selectively retry the following yarpc errors client can safe retry with a backoff + if yarpcerrors.IsDeadlineExceeded(err) || yarpcerrors.IsUnavailable(err) || yarpcerrors.IsInternal(err) { return true } return false diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index c043ddb3fc8..c7f33e98ae5 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -150,7 +150,7 @@ func (wh *WorkflowHandler) Start() error { return err } wh.matching = matching.NewRetryableClient(wh.matchingRawClient, common.CreateMatchingRetryPolicy(), - common.IsMatchingServiceTransientError) + common.IsWhitelistServiceTransientError) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() return nil diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 626f78851e9..552d1f01bda 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -65,7 +65,7 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng timerQueueAckMgr := newTimerQueueAckMgr(metrics.TimerActiveQueueProcessorScope, shard, historyService.metricsClient, currentClusterName, logger) retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), - common.IsMatchingServiceTransientError) + common.IsWhitelistServiceTransientError) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, @@ -103,7 +103,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, maxLevel, logger) retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), - common.IsMatchingServiceTransientError) + common.IsWhitelistServiceTransientError) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 7c6b0b28903..8753d00e314 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -94,7 +94,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history } retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), - common.IsMatchingServiceTransientError) + common.IsWhitelistServiceTransientError) processor := &transferQueueActiveProcessorImpl{ currentClusterName: currentClusterName, @@ -151,7 +151,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo } retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), - common.IsMatchingServiceTransientError) + common.IsWhitelistServiceTransientError) processor := &transferQueueActiveProcessorImpl{ currentClusterName: currentClusterName, diff --git a/service/worker/processor.go b/service/worker/processor.go index ca8b7cb285f..05d8933f031 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -21,15 +21,12 @@ package worker import ( - "math" + "context" + "encoding/json" "sync" "sync/atomic" "time" - "encoding/json" - - "context" - "github.com/uber-common/bark" "github.com/uber-go/kafka-client/kafka" h "github.com/uber/cadence/.gen/go/history" @@ -44,35 +41,6 @@ import ( "go.uber.org/yarpc/yarpcerrors" ) -type workerStatus int - -var errMaxAttemptReached = &shared.BadRequestError{Message: "Maximum attempts exceeded"} -var errDeserializeReplicationTask = &shared.BadRequestError{Message: "Failed to deserialize replication task"} - -const ( - workerStatusRunning workerStatus = iota - workerStatusPendingRetry -) - -const ( - // below are the max retry count for worker - - // [0, 0.6) percentage max retry count - retryCountInfinity int64 = math.MaxInt64 // using int64 max as infinity - // [0.6, 0.7) percentage max retry count - retryCount70PercentInRetry int64 = 256 - // [0.7, 0.8) percentage max retry count - retryCount80PercentInRetry int64 = 128 - // [0.8, 0.9) percentage max retry count - retryCount90PercentInRetry int64 = 64 - // [0.9, 0.95) percentage max retry count - retryCount95PercentInRetry int64 = 32 - // [0.95, 1] percentage max retry count - retryCount100PercentInRetry int64 = 16 - - retryErrorWaitMillis = 100 -) - type ( // DomainReplicator is the interface which can replicate the domain DomainReplicator interface { @@ -95,14 +63,13 @@ type ( metricsClient metrics.Client domainReplicator DomainReplicator historyClient history.Client - - // worker in retry count is used by underlying processor when doing retry on a task - // this help the replicator / underlying processor understanding the overall - // situation and giveup retrying - workerInRetryCount int32 } ) +const ( + retryErrorWaitMillis = 100 +) + const ( replicationTaskInitialRetryInterval = 100 * time.Millisecond replicationTaskMaxRetryInterval = 2 * time.Second @@ -113,13 +80,20 @@ var ( // ErrEmptyReplicationTask is the error to indicate empty replication task ErrEmptyReplicationTask = &shared.BadRequestError{Message: "empty replication task"} // ErrUnknownReplicationTask is the error to indicate unknown replication task type - ErrUnknownReplicationTask = &shared.BadRequestError{Message: "unknown replication task"} + ErrUnknownReplicationTask = &shared.BadRequestError{Message: "unknown replication task"} + // ErrDeserializeReplicationTask is the error to indicate failure to deserialize replication task + ErrDeserializeReplicationTask = &shared.BadRequestError{Message: "Failed to deserialize replication task"} + replicationTaskRetryPolicy = createReplicatorRetryPolicy() ) func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string, client messaging.Client, config *Config, logger bark.Logger, metricsClient metrics.Client, domainReplicator DomainReplicator, historyClient history.Client) *replicationTaskProcessor { + + retryableHistoryClient := history.NewRetryableClient(historyClient, common.CreateHistoryServiceRetryPolicy(), + common.IsWhitelistServiceTransientError) + return &replicationTaskProcessor{ currentCluster: currentCluster, sourceCluster: sourceCluster, @@ -132,10 +106,9 @@ func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string, logging.TagSourceCluster: sourceCluster, logging.TagConsumerName: consumer, }), - metricsClient: metricsClient, - domainReplicator: domainReplicator, - historyClient: historyClient, - workerInRetryCount: 0, + metricsClient: metricsClient, + domainReplicator: domainReplicator, + historyClient: retryableHistoryClient, } } @@ -181,50 +154,6 @@ func (p *replicationTaskProcessor) Stop() { } } -func (p *replicationTaskProcessor) updateWorkerRetryStatus(isInRetry bool) { - if isInRetry { - atomic.AddInt32(&p.workerInRetryCount, 1) - } else { - atomic.AddInt32(&p.workerInRetryCount, -1) - } -} - -// getRemainingRetryCount returns the max retry count at the moment -func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount int64) int64 { - workerInRetry := float64(atomic.LoadInt32(&p.workerInRetryCount)) - numWorker := float64(p.config.ReplicatorConcurrency) - retryPercentage := workerInRetry / numWorker - - if retryPercentage > 1 || retryPercentage < 0 { - p.logger.Fatal("Worker busy level is out of bound") - } - p.metricsClient.UpdateGauge(metrics.ReplicatorScope, metrics.ReplicatorRetryPercentage, retryPercentage) - - min := func(i int64, j int64) int64 { - if i < j { - return i - } - return j - } - - if retryPercentage < 0.6 { - return min(remainingRetryCount, retryCountInfinity) - } - if retryPercentage < 0.7 { - return min(remainingRetryCount, retryCount70PercentInRetry) - } - if retryPercentage < 0.8 { - return min(remainingRetryCount, retryCount80PercentInRetry) - } - if retryPercentage < 0.9 { - return min(remainingRetryCount, retryCount90PercentInRetry) - } - if retryPercentage < 0.95 { - return min(remainingRetryCount, retryCount95PercentInRetry) - } - return min(remainingRetryCount, retryCount100PercentInRetry) -} - func (p *replicationTaskProcessor) processorPump() { defer p.shutdownWG.Done() @@ -267,54 +196,55 @@ func (p *replicationTaskProcessor) messageProcessLoop(workerWG *sync.WaitGroup, func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID int) { var err error - isInRetry := false - remainingRetryCount := retryCountInfinity - defer func() { - if isInRetry { - p.updateWorkerRetryStatus(false) + forceBuffer := false + remainingRetryCount := p.config.ReplicationTaskMaxRetry + + op := func() error { + processErr := p.process(msg, forceBuffer) + if processErr != nil && p.isRetryTaskError(processErr) { + // Enable buffering of replication tasks for next attempt + forceBuffer = true } - }() + + return processErr + } ProcessRetryLoop: - for { + for attempt := 0; ; attempt++ { select { case <-p.shutdownCh: return default: - op := func() error { - remainingRetryCount-- - if remainingRetryCount <= 0 { - return errMaxAttemptReached - } + // isTransientRetryableError is pretty broad on purpose as we want to retry replication tasks few times before + // moving them to DLQ. + err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError) + if err != nil && p.isTransientRetryableError(err) { + // Any whitelisted transient errors should be retried indefinitely + if common.IsWhitelistServiceTransientError(err) { + // Emit a warning log on every 100 transient error retries of replication task + if attempt%100 == 0 { + p.logger.WithFields(bark.Fields{ + logging.TagErr: err, + logging.TagPartitionKey: msg.Partition(), + logging.TagOffset: msg.Offset(), + }).Warn("Error processing replication task.") + } - errMsg := p.process(msg, isInRetry) - if errMsg != nil && p.isTransientRetryableError(errMsg) { // Keep on retrying transient errors for ever - if !isInRetry { - isInRetry = true - p.updateWorkerRetryStatus(true) - } - remainingRetryCount = p.getRemainingRetryCount(remainingRetryCount) + continue ProcessRetryLoop } - return errMsg - } - - err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError) - if err != nil && p.isTransientRetryableError(err) { - // Emit a warning log on every 1000 attempt to retry a message - if remainingRetryCount%1000 == 0 { - p.logger.WithFields(bark.Fields{ - logging.TagErr: err, - logging.TagPartitionKey: msg.Partition(), - logging.TagOffset: msg.Offset(), - }).Warn("Error processing replication task.") + // Otherwise decrement the remaining retries and check if we have more attempts left. + // This code path is needed to handle RetryTaskError so we can retry such replication tasks with forceBuffer + // enabled. Once all attempts are exhausted then msg will be nack'ed and moved to DLQ + remainingRetryCount-- + if remainingRetryCount > 0 { + continue ProcessRetryLoop } - - // Keep on retrying transient errors for ever - continue ProcessRetryLoop } + } + break ProcessRetryLoop } @@ -345,7 +275,7 @@ func (p *replicationTaskProcessor) process(msg kafka.Message, inRetry bool) erro }).Error("Failed to deserialize replication task.") // return BadRequestError so processWithRetry can nack the message - return errDeserializeReplicationTask + return ErrDeserializeReplicationTask } if task.TaskType == nil { @@ -419,8 +349,15 @@ Loop: RetryLoop: for i := 0; i < p.config.ReplicatorBufferRetryCount; i++ { - err = p.historyClient.ReplicateEvents(context.Background(), req) - if _, ok := err.(*shared.RetryTaskError); ok { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err = p.historyClient.ReplicateEvents(ctx, req) + + // Replication tasks could be slightly out of order for a particular workflow execution + // We first try to apply the events without buffering enabled with a small delay to account for such delays + // Caller should try to apply the event with buffering enabled once we return RetryTaskError after all retries + if p.isRetryTaskError(err) { time.Sleep(retryErrorWaitMillis * time.Millisecond) continue RetryLoop } @@ -456,6 +393,14 @@ func (p *replicationTaskProcessor) updateFailureMetric(scope int, err error) { } } +func (p *replicationTaskProcessor) isRetryTaskError(err error) bool { + if _, ok := err.(*shared.RetryTaskError); ok { + return true + } + + return false +} + func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool { switch err.(type) { case *shared.BadRequestError: diff --git a/service/worker/service.go b/service/worker/service.go index 7c540013797..c9a6d62a01b 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -43,6 +43,7 @@ type ( // Replicator settings ReplicatorConcurrency int ReplicatorBufferRetryCount int + ReplicationTaskMaxRetry int } ) @@ -60,6 +61,7 @@ func NewConfig() *Config { return &Config{ ReplicatorConcurrency: 1000, ReplicatorBufferRetryCount: 8, + ReplicationTaskMaxRetry: 5, } }