Skip to content

Commit

Permalink
Shutdown Broker gracefully, but forcefully after brokerShutdownTimeou…
Browse files Browse the repository at this point in the history
…tMs (apache#10199)

* Wait for shutdown of BrokerService event loops

* Move CompletableFutureCancellationHandler to pulsar-common util

* Prevent misusage of CompletableFutureCancellationHandler

* Clear cancelAction field after the future completes

- make cancelAction eligible for GC after the future completes (or
  gets cancelled)

* Support cancel signalling when executing multiple futures

* Shutdown BrokerService gracefully which using closeAsync

- shutdown forcefully when closing times out

* Set 100ms to brokerShutdownTimeoutMs used in tests

* Revert changes in MessagingServiceShutdownHook

* Handle CancellationException since it's used in timeouts in BrokerService

* Set shutdown timeout to 0 ms in tests

* Ignore TimeoutException and CancellationException when broker shutdown timeout is 0

* Extract GracefulExecutorServicesShutdown and use it in PulsarService

- handle graceful / forcefully shutdown also for PulsarService executors

* Fix some unclosed PulsarServices

* Set shutdown timeout to 0 in some more tests

* Do some class and method renamings to clarify the code

* Revisit the logic to use awaitTermination

* Use shutdownNow to shutdown the scheduler used for future timeout handling
  • Loading branch information
lhotari authored Apr 16, 2021
1 parent 3d6e480 commit 152d1e6
Show file tree
Hide file tree
Showing 50 changed files with 841 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void run() {
future.get(service.getConfiguration().getBrokerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);

LOG.info("Completed graceful shutdown. Exiting");
} catch (TimeoutException e) {
} catch (TimeoutException | CancellationException e) {
LOG.warn("Graceful shutdown timeout expired. Closing now");
} catch (Exception e) {
LOG.error("Failed to perform graceful shutdown, Exiting anyway", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -89,6 +92,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -164,6 +168,7 @@
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
Expand Down Expand Up @@ -236,7 +241,7 @@ public class PulsarService implements AutoCloseable {
private PulsarResources pulsarResources;

public enum State {
Init, Started, Closed
Init, Started, Closing, Closed
}

private volatile State state;
Expand Down Expand Up @@ -308,10 +313,15 @@ public void close() throws PulsarServerException {
try {
closeAsync().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof PulsarServerException) {
throw (PulsarServerException) e.getCause();
Throwable cause = e.getCause();
if (cause instanceof PulsarServerException) {
throw (PulsarServerException) cause;
} else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
&& (cause instanceof TimeoutException || cause instanceof CancellationException)) {
// ignore shutdown timeout when timeout is 0, which is primarily used in tests
// to forcefully shutdown the broker
} else {
throw new PulsarServerException(e.getCause());
throw new PulsarServerException(cause);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -327,6 +337,7 @@ public CompletableFuture<Void> closeAsync() {
if (closeFuture != null) {
return closeFuture;
}
state = State.Closing;

// close the service in reverse order v.s. in which they are started
if (this.webService != null) {
Expand All @@ -345,6 +356,15 @@ public CompletableFuture<Void> closeAsync() {
this.webSocketService.close();
}

GracefulExecutorServicesShutdown executorServicesShutdown =
GracefulExecutorServicesShutdown
.initiate()
.timeout(
Duration.ofMillis(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
asyncCloseFutures.add(this.brokerService.closeAsync());
Expand All @@ -366,7 +386,7 @@ public CompletableFuture<Void> closeAsync() {
this.leaderElectionService = null;
}

loadManagerExecutor.shutdown();
executorServicesShutdown.shutdown(loadManagerExecutor);

if (globalZkCache != null) {
globalZkCache.close();
Expand Down Expand Up @@ -402,24 +422,11 @@ public CompletableFuture<Void> closeAsync() {
nsService = null;
}

if (compactorExecutor != null) {
compactorExecutor.shutdown();
}

if (offloaderScheduler != null) {
offloaderScheduler.shutdown();
}

// executor is not initialized in mocks even when real close method is called
// guard against null executors
if (executor != null) {
executor.shutdown();
}

if (orderedExecutor != null) {
orderedExecutor.shutdown();
}
cacheExecutor.shutdown();
executorServicesShutdown.shutdown(compactorExecutor);
executorServicesShutdown.shutdown(offloaderScheduler);
executorServicesShutdown.shutdown(executor);
executorServicesShutdown.shutdown(orderedExecutor);
executorServicesShutdown.shutdown(cacheExecutor);

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
Expand All @@ -441,10 +448,7 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (transactionExecutor != null) {
transactionExecutor.shutdown();
transactionExecutor = null;
}
executorServicesShutdown.shutdown(transactionExecutor);

if (coordinationService != null) {
coordinationService.close();
Expand All @@ -457,20 +461,23 @@ public CompletableFuture<Void> closeAsync() {
configurationMetadataStore.close();
}

state = State.Closed;
isClosedCondition.signalAll();
// add timeout handling for closing executors
asyncCloseFutures.add(executorServicesShutdown.handle());

CompletableFuture<Void> shutdownFuture =
CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
closeFuture = shutdownFuture;
return shutdownFuture;
closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
closeFuture.handle((v, t) -> {
state = State.Closed;
isClosedCondition.signalAll();
return null;
});
return closeFuture;
} catch (Exception e) {
PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
pse = new PulsarServerException(MetadataStoreException.unwrap(e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
} else {
pse = new PulsarServerException(e);
}
Expand All @@ -480,6 +487,20 @@ public CompletableFuture<Void> closeAsync() {
}
}

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
// shutdown the shutdown executor
shutdownExecutor.shutdownNow();
return null;
});
return future;
}

/**
* Get the current service configuration.
*
Expand Down
Loading

0 comments on commit 152d1e6

Please sign in to comment.