Skip to content

Commit

Permalink
[Performance] Optimize CompletableFuture timeout handling (apache#10065)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Mar 29, 2021
1 parent 907fcb5 commit 5fcd0d1
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -60,6 +58,7 @@
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,6 +67,7 @@
*/
public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10);

@GET
@Path("/{cluster}")
Expand Down Expand Up @@ -346,13 +346,10 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception
healthcheckReadLoop(readerFuture, completePromise, messageStr);

// timeout read loop after 10 seconds
ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> {
completePromise.completeExceptionally(new TimeoutException("Timed out reading"));
}, 10, TimeUnit.SECONDS);
// don't leave timeout dangling
completePromise.whenComplete((ignore2, exception2) -> {
timeout.cancel(false);
});
FutureUtil.addTimeoutHandling(completePromise,
HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
() -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
"healthcheck(...)"));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -179,6 +180,13 @@
@Setter(AccessLevel.PROTECTED)
public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> {
private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60);
private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class,
"futureWithDeadline(...)");
private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
"futureWithDeadline(...)");

private final PulsarService pulsar;
private final ManagedLedgerFactory managedLedgerFactory;
Expand Down Expand Up @@ -847,7 +855,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
return failedFuture(e);
return FutureUtil.failedFuture(e);
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause instanceof ServiceUnitNotReadyException) {
Expand All @@ -856,7 +864,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
}

return failedFuture(cause);
return FutureUtil.failedFuture(cause);
}
}

Expand Down Expand Up @@ -964,25 +972,9 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
return topicFuture;
}

/**
 * Creates a new {@link CompletableFuture} and completes it exceptionally right away.
 *
 * @param t the throwable the returned future is completed with
 * @param <T> result type of the returned future
 * @return a future that is already completed exceptionally with {@code t}
 */
private static <T> CompletableFuture<T> failedFuture(Throwable t) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(t);
return future;
}

/**
 * Creates a new, uncompleted {@link CompletableFuture} and schedules a task on {@code executor()}
 * that fails it with {@code exp} once the delay has elapsed, unless it is already done by then.
 *
 * @param delay how long to wait before failing the future
 * @param unit time unit of {@code delay}
 * @param exp exception used to complete the future exceptionally on expiry
 * @param <T> result type of the returned future
 * @return the newly created future
 */
private <T> CompletableFuture<T> futureWithDeadline(Long delay, TimeUnit unit, Exception exp) {
CompletableFuture<T> future = new CompletableFuture<T>();
// The handle returned by schedule(...) is discarded, so the task is not cancelled here
// when the future completes before the deadline; it runs and finds the future done.
executor().schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(exp);
}
}, delay, unit);
return future;
}

private <T> CompletableFuture<T> futureWithDeadline() {
return futureWithDeadline(60000L, TimeUnit.MILLISECONDS,
new TimeoutException("Future didn't finish within deadline"));
return FutureUtil.createFutureWithTimeout(FUTURE_DEADLINE_TIMEOUT_DURATION, executor(),
() -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
}

public PulsarClient getReplicationClient(String cluster) {
Expand Down Expand Up @@ -1093,9 +1085,9 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster) {
*/
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
boolean createIfMissing) throws RuntimeException {
final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline(
pulsar.getConfiguration().getTopicLoadTimeoutSeconds(),
TimeUnit.SECONDS, new TimeoutException("Failed to load topic within timeout"));
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
Expand Down Expand Up @@ -62,6 +60,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
Expand Down Expand Up @@ -116,7 +115,9 @@ private void phaseOneLoop(RawReader reader,
return;
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
scheduleTimeout(future);
FutureUtil.addTimeoutHandling(future,
PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));

future.thenAcceptAsync(m -> {
try {
Expand Down Expand Up @@ -172,15 +173,6 @@ private void phaseOneLoop(RawReader reader,
});
}

/**
 * Arms a fixed 10 second timeout on the given read future using {@code scheduler}.
 * When the timer fires, the future is completed exceptionally with a new
 * {@link TimeoutException} whose message is "Timeout".
 *
 * @param future the read future to guard with a timeout
 */
private void scheduleTimeout(CompletableFuture<RawMessage> future) {
Future<?> timeout = scheduler.schedule(() -> {
future.completeExceptionally(new TimeoutException("Timeout"));
}, 10, TimeUnit.SECONDS);
// Once the future completes (normally or exceptionally), cancel the pending timer task;
// cancel(true) requests interruption if the task is already running.
future.whenComplete((res, exception) -> {
timeout.cancel(true);
});
}

private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.asynchttpclient.AsyncHttpClient;
Expand All @@ -71,10 +72,11 @@
*/
@Slf4j
public class AsyncHttpConnector implements Connector {

// Single preallocated timeout exception reused for every read timeout raised by this connector.
// The synthetic stack frame names the method that raises it, retryOrTimeOut(...), so the
// frame shown in logs matches a method that actually exists in this class.
private static final TimeoutException READ_TIMEOUT_EXCEPTION =
        FutureUtil.createTimeoutException("Read timeout", AsyncHttpConnector.class, "retryOrTimeOut(...)");
@Getter
private final AsyncHttpClient httpClient;
private final int readTimeout;
private final Duration readTimeout;
private final int maxRetries;
private final PulsarServiceNameResolver serviceNameResolver;
private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
Expand Down Expand Up @@ -156,7 +158,7 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
}
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
this.readTimeout = readTimeoutMs;
this.readTimeout = Duration.ofMillis(readTimeoutMs);
this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
}

Expand Down Expand Up @@ -216,7 +218,8 @@ public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callb
private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
CompletableFuture<Response> timeoutAfter = FutureUtil.createFutureWithTimeout(readTimeout, delayer,
() -> READ_TIMEOUT_EXCEPTION);
return resultFuture.applyToEither(timeoutAfter, Function.identity());
}

Expand Down Expand Up @@ -297,12 +300,6 @@ private CompletableFuture<Response> oneShot(InetSocketAddress host, ClientReques
return builder.execute().toCompletableFuture();
}

/**
 * Creates a new future that is only ever completed by a timer: after the given delay the
 * {@code delayer} completes it exceptionally with a message-less {@link TimeoutException}.
 *
 * @param timeout delay before the returned future fails
 * @param unit time unit of {@code timeout}
 * @param <T> result type of the returned future
 * @return a future that fails with a {@link TimeoutException} once the delay elapses
 */
public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<>();
// The scheduled task's handle is not kept, so nothing in this method cancels the timer.
delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
return result;
}

@Override
public String getName() {
return "Pulsar-Admin";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
*/
package org.apache.pulsar.common.util;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
* This class is aimed at simplifying work with {@code CompletableFuture}.
Expand Down Expand Up @@ -50,4 +56,71 @@ public static Throwable unwrapCompletionException(Throwable t) {
return t;
}
}

/**
 * Builds a fresh, not-yet-completed {@link CompletableFuture} and attaches timeout handling
 * to it via {@link #addTimeoutHandling(CompletableFuture, Duration, ScheduledExecutorService, Supplier)}.
 *
 * @param timeout how long to wait before the future is failed
 * @param executor scheduler on which the timeout task is registered
 * @param exceptionSupplier produces the exception used to fail the future on timeout
 * @param <T> result type of the future
 * @return the newly created future with timeout handling attached
 */
public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout,
                                                               ScheduledExecutorService executor,
                                                               Supplier<Throwable> exceptionSupplier) {
    final CompletableFuture<T> pending = new CompletableFuture<>();
    return addTimeoutHandling(pending, timeout, executor, exceptionSupplier);
}

/**
 * Attaches a timeout to an already existing {@link CompletableFuture}.
 *
 * <p>A task is scheduled on the executor that, once the timeout (converted to milliseconds)
 * has elapsed, completes the future exceptionally with the supplied exception unless the
 * future is already done. When the future completes by any means, the scheduled task is
 * cancelled without interruption.
 *
 * @param future the future to guard with a timeout
 * @param timeout how long to wait before failing the future
 * @param executor scheduler on which the timeout task is registered
 * @param exceptionSupplier produces the exception used to fail the future on timeout;
 *                          only invoked if the timeout actually fires
 * @param <T> result type of the future
 * @return the same {@code future} instance that was passed in
 */
public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout,
                                                          ScheduledExecutorService executor,
                                                          Supplier<Throwable> exceptionSupplier) {
    final long delayMillis = timeout.toMillis();
    final Runnable failOnTimeout = () -> {
        if (future.isDone()) {
            return;
        }
        future.completeExceptionally(exceptionSupplier.get());
    };
    final ScheduledFuture<?> timeoutTask =
            executor.schedule(failOnTimeout, delayMillis, TimeUnit.MILLISECONDS);
    // Release the timer as soon as the outcome is known, whatever that outcome is.
    future.whenComplete((ignoredResult, ignoredError) -> timeoutTask.cancel(false));
    return future;
}

/**
 * Factory for a cheap {@link TimeoutException}: no stack walk is performed when it is created.
 * Instead, its stack trace is a single hand-built frame naming the given source class and
 * source method. Because creation does not depend on the calling stack, an instance can be
 * kept in a constant and reused.
 *
 * @param message exception message
 * @param sourceClass class reported in the synthetic stack frame
 * @param sourceMethod method name reported in the synthetic stack frame
 * @return a new low-overhead {@link TimeoutException}
 */
public static TimeoutException createTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
    final TimeoutException lightweight = new LowOverheadTimeoutException(message, sourceClass, sourceMethod);
    return lightweight;
}

/**
 * {@link TimeoutException} variant that skips stack trace capture and carries one
 * manually supplied stack frame instead.
 */
private static class LowOverheadTimeoutException extends TimeoutException {
    private static final long serialVersionUID = 1L;

    LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
        super(message);
        // No file name (null) and no line number (-1): only class and method are reported.
        final StackTraceElement syntheticFrame =
                new StackTraceElement(sourceClass.getName(), sourceMethod, null, -1);
        setStackTrace(new StackTraceElement[] {syntheticFrame});
    }

    /**
     * Deliberately does nothing so that no stack trace is captured.
     *
     * @return this exception, unchanged
     */
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }
}
}
Loading

0 comments on commit 5fcd0d1

Please sign in to comment.