Skip to content

Commit

Permalink
[FLINK-18783] RpcSystem extends AutoCloseable
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jul 13, 2021
1 parent 9cf4c48 commit 548d70a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* This interface serves as a factory interface for RPC services, with some additional utilities
* that are reliant on implementation details of the RPC service.
*/
public interface RpcSystem extends RpcSystemUtils {
public interface RpcSystem extends RpcSystemUtils, AutoCloseable {

/**
* Returns a builder for an {@link RpcService} that is only reachable from the local machine.
Expand All @@ -51,6 +51,10 @@ RpcServiceBuilder remoteServiceBuilder(
@Nullable String externalAddress,
String externalPortRange);

/** Hook to cleanup resources, like common thread pools or classloaders. */
@Override
default void close() {}

/** Builder for {@link RpcService}. */
interface RpcServiceBuilder {
RpcServiceBuilder withComponentName(String name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
private ExecutionGraphInfoStore executionGraphInfoStore;

private final Thread shutDownHook;
private RpcSystem rpcSystem;

protected ClusterEntrypoint(Configuration configuration) {
this.configuration = generateClusterConfiguration(configuration);
Expand Down Expand Up @@ -293,7 +294,7 @@ protected void initializeServices(Configuration configuration, PluginManager plu
LOG.info("Initializing cluster services.");

synchronized (lock) {
final RpcSystem rpcSystem = RpcSystem.load();
rpcSystem = RpcSystem.load();

commonRpcService =
RpcUtils.createRemoteRpcService(
Expand Down Expand Up @@ -499,8 +500,12 @@ private CompletableFuture<ApplicationStatus> shutDownAsync(
FutureUtils.composeAfterwards(
shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData));

final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);

final CompletableFuture<Void> cleanupDirectoriesFuture =
FutureUtils.runAfterwards(serviceShutdownFuture, this::cleanupDirectories);
FutureUtils.runAfterwards(
rpcSystemClassLoaderCloseFuture, this::cleanupDirectories);

cleanupDirectoriesFuture.whenComplete(
(Void ignored2, Throwable serviceThrowable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ public class MiniCluster implements AutoCloseableAsync {
/** Flag marking the mini cluster as started/running. */
private volatile boolean running;

@GuardedBy("lock")
private RpcSystem rpcSystem;

// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -273,7 +276,7 @@ public void start() throws Exception {
try {
initializeIOFormatClasses(configuration);

final RpcSystem rpcSystem = RpcSystem.load();
rpcSystem = RpcSystem.load();

LOG.info("Starting Metrics Registry");
metricRegistry =
Expand Down Expand Up @@ -1064,6 +1067,12 @@ private void terminateMiniClusterServices() throws Exception {
haServices = null;
}

try {
rpcSystem.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class TaskManagerRunner implements FatalErrorHandler {

private final CompletableFuture<Result> terminationFuture;

private final RpcSystem rpcSystem;

private boolean shutdown;

public TaskManagerRunner(
Expand All @@ -137,7 +139,7 @@ public TaskManagerRunner(
throws Exception {
this.configuration = checkNotNull(configuration);

final RpcSystem rpcSystem = RpcSystem.load();
rpcSystem = RpcSystem.load();

timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));

Expand Down Expand Up @@ -252,7 +254,10 @@ private CompletableFuture<Result> closeAsync(Result terminationResult) {
FutureUtils.composeAfterwards(
taskManagerTerminationFuture, this::shutDownServices);

serviceTerminationFuture.whenComplete(
final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
FutureUtils.runAfterwards(serviceTerminationFuture, rpcSystem::close);

rpcSystemClassLoaderCloseFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
Expand Down

0 comments on commit 548d70a

Please sign in to comment.