Skip to content

Commit

Permalink
Enforce context timeout for retry policies in execution context and c…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 8, 2021
1 parent 8d319e2 commit d91e86f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
17 changes: 17 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ func CreatePersistenceRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreatePersistenceRetryPolicyWithContext create a retry policy for persistence layer operations
// which has an expiration interval computed based on the context's deadline
func CreatePersistenceRetryPolicyWithContext(ctx context.Context) backoff.RetryPolicy {
if ctx == nil {
return CreatePersistenceRetryPolicy()
}
deadline, ok := ctx.Deadline()
if !ok {
return CreatePersistenceRetryPolicy()
}

policy := backoff.NewExponentialRetryPolicy(retryPersistenceOperationInitialInterval)
policy.SetMaximumInterval(retryPersistenceOperationMaxInterval)
policy.SetExpirationInterval(deadline.Sub(time.Now()))
return policy
}

// CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(historyServiceOperationInitialInterval)
Expand Down
7 changes: 6 additions & 1 deletion service/history/execution/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pborman/uuid"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/definition"
Expand Down Expand Up @@ -315,7 +316,11 @@ func (c *Cache) getCurrentExecutionWithRetry(
return err
}

err := backoff.Retry(op, persistenceOperationRetryPolicy, persistence.IsTransientError)
err := backoff.Retry(
op,
common.CreatePersistenceRetryPolicyWithContext(ctx),
persistence.IsTransientError,
)
if err != nil {
c.metricsClient.IncCounter(metrics.HistoryCacheGetCurrentExecutionScope, metrics.CacheFailures)
return nil, err
Expand Down
13 changes: 5 additions & 8 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ type (

var _ Context = (*contextImpl)(nil)

var (
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
)

// NewContext creates a new workflow execution context
func NewContext(
domainID string,
Expand Down Expand Up @@ -1003,7 +999,7 @@ func (c *contextImpl) appendHistoryV2EventsWithRetry(

err := backoff.Retry(
op,
persistenceOperationRetryPolicy,
common.CreatePersistenceRetryPolicyWithContext(ctx),
persistence.IsTransientError,
)
return int64(resp), err
Expand Down Expand Up @@ -1033,7 +1029,7 @@ func (c *contextImpl) createWorkflowExecutionWithRetry(

err := backoff.Retry(
op,
persistenceOperationRetryPolicy,
common.CreatePersistenceRetryPolicyWithContext(ctx),
isRetryable,
)
switch err.(type) {
Expand Down Expand Up @@ -1071,7 +1067,7 @@ func (c *contextImpl) getWorkflowExecutionWithRetry(

err := backoff.Retry(
op,
persistenceOperationRetryPolicy,
common.CreatePersistenceRetryPolicyWithContext(ctx),
persistence.IsTransientError,
)
switch err.(type) {
Expand Down Expand Up @@ -1114,7 +1110,8 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
}

err := backoff.Retry(
op, persistenceOperationRetryPolicy,
op,
common.CreatePersistenceRetryPolicyWithContext(ctx),
isRetryable,
)
switch err.(type) {
Expand Down

0 comments on commit d91e86f

Please sign in to comment.