Skip to content

Commit

Permalink
Don't use BackoffThrottler in multithread contexts (temporalio#1467)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Oct 13, 2022
1 parent f55f610 commit cea3e73
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@

public final class GenericWorkflowClientImpl implements GenericWorkflowClient {

private static final ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "generic-wf-client-async-throttler"));

private final WorkflowServiceStubs service;
private final Scope metricsScope;
private final GrpcRetryer grpcRetryer;
Expand Down Expand Up @@ -160,6 +163,7 @@ public GetWorkflowExecutionHistoryResponse longPollHistory(
public CompletableFuture<GetWorkflowExecutionHistoryResponse> longPollHistoryAsync(
@Nonnull GetWorkflowExecutionHistoryRequest request, @Nonnull Deadline deadline) {
return grpcRetryer.retryWithResultAsync(
executor,
() -> {
CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();
ListenableFuture<GetWorkflowExecutionHistoryResponse> resultFuture =
Expand Down
21 changes: 13 additions & 8 deletions temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ interface ThrowingRunnable {

private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();

private BackoffThrottler pollBackoffThrottler;
private Throttler pollRateThrottler;

private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
Expand Down Expand Up @@ -113,11 +112,6 @@ public void start() {
new ExecutorThreadFactory(
pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));

pollBackoffThrottler =
new BackoffThrottler(
pollerOptions.getPollBackoffInitialInterval(),
pollerOptions.getPollBackoffMaximumInterval(),
pollerOptions.getPollBackoffCoefficient());
for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
workerMetricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
Expand Down Expand Up @@ -206,15 +200,24 @@ public String toString() {
private class PollLoopTask implements Runnable {

private final Poller.ThrowingRunnable task;
private final BackoffThrottler pollBackoffThrottler;

PollLoopTask(Poller.ThrowingRunnable task) {
this.task = task;
this.pollBackoffThrottler =
new BackoffThrottler(
pollerOptions.getPollBackoffInitialInterval(),
pollerOptions.getPollBackoffMaximumInterval(),
pollerOptions.getPollBackoffCoefficient());
}

@Override
public void run() {
try {
pollBackoffThrottler.throttle();
long throttleMs = pollBackoffThrottler.getSleepTime();
if (throttleMs > 0) {
Thread.sleep(throttleMs);
}
if (pollRateThrottler != null) {
pollRateThrottler.throttle();
}
Expand All @@ -237,8 +240,10 @@ public void run() {
if (e instanceof InterruptedException) {
// we restore the flag here, so it can be checked and processed (with exit) in finally.
Thread.currentThread().interrupt();
} else {
// Don't increase throttle on InterruptedException
pollBackoffThrottler.failure();
}
pollBackoffThrottler.failure();
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
} finally {
if (!shouldTerminate()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
* Used to throttle code execution in presence of failures using exponential backoff logic. The
Expand All @@ -43,7 +43,10 @@
* BackoffThrottler throttler = new BackoffThrottler(1000, 60000, 2);
* while(!stopped) {
* try {
* throttler.throttle();
* long throttleMs = throttler.getSleepTime();
* if (throttleMs > 0) {
* Thread.sleep(throttleMs);
* }
* // some code that can fail and should be throttled
* ...
* throttler.success();
Expand All @@ -56,6 +59,7 @@
*
* @author fateev
*/
@NotThreadSafe
public final class BackoffThrottler {

private final Duration initialSleep;
Expand All @@ -64,7 +68,7 @@ public final class BackoffThrottler {

private final double backoffCoefficient;

private final AtomicLong failureCount = new AtomicLong();
private int failureCount = 0;

/**
* Construct an instance of the throttler.
Expand All @@ -81,33 +85,26 @@ public BackoffThrottler(
this.backoffCoefficient = backoffCoefficient;
}

private long calculateSleepTime() {
double sleepMillis =
Math.pow(backoffCoefficient, failureCount.get() - 1) * initialSleep.toMillis();
public long getSleepTime() {
if (failureCount == 0) return 0;
double sleepMillis = Math.pow(backoffCoefficient, failureCount - 1) * initialSleep.toMillis();
if (maxSleep != null) {
return Math.min((long) sleepMillis, maxSleep.toMillis());
}
return (long) sleepMillis;
}

/**
* Sleep if there were failures since the last success call.
*
* @throws InterruptedException
*/
public void throttle() throws InterruptedException {
if (failureCount.get() > 0) {
Thread.sleep(calculateSleepTime());
}
public int getAttemptCount() {
return failureCount;
}

/** Resent failure count to 0. */
/** Reset failure count to 0. */
public void success() {
failureCount.set(0);
failureCount = 0;
}

/** Increment failure count. */
public void failure() {
failureCount.incrementAndGet();
failureCount++;
}
}
Loading

0 comments on commit cea3e73

Please sign in to comment.