Skip to content

Commit

Permalink
Refactor and improve the retry logic to avoid throttling dependencies (
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Oct 15, 2021
1 parent 87b2eae commit 13c6a2b
Show file tree
Hide file tree
Showing 36 changed files with 653 additions and 335 deletions.
61 changes: 31 additions & 30 deletions client/admin/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ import (
var _ Client = (*retryableClient)(nil)

type retryableClient struct {
client Client
policy backoff.RetryPolicy
isRetryable backoff.IsRetryable
client Client
throttleRetry *backoff.ThrottleRetry
}

// NewRetryableClient creates a new instance of Client with retry policy
func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable backoff.IsRetryable) Client {
return &retryableClient{
client: client,
policy: policy,
isRetryable: isRetryable,
client: client,
throttleRetry: backoff.NewThrottleRetry(
backoff.WithRetryPolicy(policy),
backoff.WithRetryableError(isRetryable),
),
}
}

Expand All @@ -55,7 +56,7 @@ func (c *retryableClient) AddSearchAttribute(
op := func() error {
return c.client.AddSearchAttribute(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) DescribeShardDistribution(
Expand All @@ -70,7 +71,7 @@ func (c *retryableClient) DescribeShardDistribution(
resp, err = c.client.DescribeShardDistribution(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -86,7 +87,7 @@ func (c *retryableClient) DescribeHistoryHost(
resp, err = c.client.DescribeHistoryHost(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -99,7 +100,7 @@ func (c *retryableClient) RemoveTask(
op := func() error {
return c.client.RemoveTask(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) CloseShard(
Expand All @@ -111,7 +112,7 @@ func (c *retryableClient) CloseShard(
op := func() error {
return c.client.CloseShard(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) ResetQueue(
Expand All @@ -123,7 +124,7 @@ func (c *retryableClient) ResetQueue(
op := func() error {
return c.client.ResetQueue(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) DescribeQueue(
Expand All @@ -138,7 +139,7 @@ func (c *retryableClient) DescribeQueue(
resp, err = c.client.DescribeQueue(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -154,7 +155,7 @@ func (c *retryableClient) DescribeWorkflowExecution(
resp, err = c.client.DescribeWorkflowExecution(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -170,7 +171,7 @@ func (c *retryableClient) GetWorkflowExecutionRawHistoryV2(
resp, err = c.client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -185,7 +186,7 @@ func (c *retryableClient) DescribeCluster(
resp, err = c.client.DescribeCluster(ctx, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -200,7 +201,7 @@ func (c *retryableClient) GetReplicationMessages(
resp, err = c.client.GetReplicationMessages(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -215,7 +216,7 @@ func (c *retryableClient) GetDomainReplicationMessages(
resp, err = c.client.GetDomainReplicationMessages(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -230,7 +231,7 @@ func (c *retryableClient) GetDLQReplicationMessages(
resp, err = c.client.GetDLQReplicationMessages(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -243,7 +244,7 @@ func (c *retryableClient) ReapplyEvents(
op := func() error {
return c.client.ReapplyEvents(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) ReadDLQMessages(
Expand All @@ -258,7 +259,7 @@ func (c *retryableClient) ReadDLQMessages(
resp, err = c.client.ReadDLQMessages(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -271,7 +272,7 @@ func (c *retryableClient) PurgeDLQMessages(
op := func() error {
return c.client.PurgeDLQMessages(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) MergeDLQMessages(
Expand All @@ -286,7 +287,7 @@ func (c *retryableClient) MergeDLQMessages(
resp, err = c.client.MergeDLQMessages(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -299,7 +300,7 @@ func (c *retryableClient) RefreshWorkflowTasks(
op := func() error {
return c.client.RefreshWorkflowTasks(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) ResendReplicationTasks(
Expand All @@ -311,7 +312,7 @@ func (c *retryableClient) ResendReplicationTasks(
op := func() error {
return c.client.ResendReplicationTasks(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) GetCrossClusterTasks(
Expand All @@ -325,7 +326,7 @@ func (c *retryableClient) GetCrossClusterTasks(
resp, err = c.client.GetCrossClusterTasks(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -340,7 +341,7 @@ func (c *retryableClient) GetDynamicConfig(
resp, err = c.client.GetDynamicConfig(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}

Expand All @@ -352,7 +353,7 @@ func (c *retryableClient) UpdateDynamicConfig(
op := func() error {
return c.client.UpdateDynamicConfig(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) RestoreDynamicConfig(
Expand All @@ -363,7 +364,7 @@ func (c *retryableClient) RestoreDynamicConfig(
op := func() error {
return c.client.RestoreDynamicConfig(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
return c.throttleRetry.Do(ctx, op)
}

func (c *retryableClient) ListDynamicConfig(
Expand All @@ -377,6 +378,6 @@ func (c *retryableClient) ListDynamicConfig(
resp, err = c.client.ListDynamicConfig(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
err := c.throttleRetry.Do(ctx, op)
return resp, err
}
Loading

0 comments on commit 13c6a2b

Please sign in to comment.