Skip to content

Commit

Permalink
[FLINK-25085][runtime] Support schedule task in MainThreadExecutor lo…
Browse files Browse the repository at this point in the history
…cal thread pool

This closes apache#18303.
  • Loading branch information
FangYongs authored and KarmaGYZ committed Mar 18, 2022
1 parent 7bf97d5 commit f5cdeca
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.rpc;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -56,10 +57,12 @@ protected FencedRpcEndpoint(
this.fencingToken = fencingToken;
this.unfencedMainThreadExecutor =
new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer);
this.fencedMainThreadExecutor =

MainThreadExecutable mainThreadExecutable =
getRpcService().fenceRpcServer(rpcServer, fencingToken);
setFencedMainThreadExecutor(
new MainThreadExecutor(
getRpcService().fenceRpcServer(rpcServer, fencingToken),
this::validateRunsInMainThread);
mainThreadExecutable, this::validateRunsInMainThread, endpointId));
}

protected FencedRpcEndpoint(RpcService rpcService, @Nullable F fencingToken) {
Expand All @@ -80,9 +83,23 @@ protected void setFencingToken(@Nullable F newFencingToken) {
// which is bound to the new fencing token
MainThreadExecutable mainThreadExecutable =
getRpcService().fenceRpcServer(rpcServer, newFencingToken);
setFencedMainThreadExecutor(
new MainThreadExecutor(
mainThreadExecutable, this::validateRunsInMainThread, getEndpointId()));
}

this.fencedMainThreadExecutor =
new MainThreadExecutor(mainThreadExecutable, this::validateRunsInMainThread);
/**
* Set fenced main thread executor and register it to closeable register.
*
* @param fencedMainThreadExecutor the given fenced main thread executor
*/
private void setFencedMainThreadExecutor(MainThreadExecutor fencedMainThreadExecutor) {
if (this.fencedMainThreadExecutor != null) {
this.fencedMainThreadExecutor.close();
unregisterResource(this.fencedMainThreadExecutor);
}
this.fencedMainThreadExecutor = fencedMainThreadExecutor;
registerResource(this.fencedMainThreadExecutor);
}

/**
Expand Down Expand Up @@ -142,6 +159,13 @@ protected <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable,
}
}

@VisibleForTesting
public boolean validateResourceClosed() {
return super.validateResourceClosed()
&& (fencedMainThreadExecutor == null
|| fencedMainThreadExecutor.validateScheduledExecutorClosed());
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@
package org.apache.flink.runtime.rpc;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -106,6 +113,12 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
*/
private final MainThreadExecutor mainThreadExecutor;

/**
* Register endpoint closeable resource to the registry and close them when the server is
* stopped.
*/
private final CloseableRegistry resourceRegistry;

/**
* Indicates whether the RPC endpoint is started and not stopped or being stopped.
*
Expand All @@ -125,8 +138,11 @@ protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.endpointId = checkNotNull(endpointId, "endpointId");

this.rpcServer = rpcService.startServer(this);
this.resourceRegistry = new CloseableRegistry();

this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
this.mainThreadExecutor =
new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);
registerResource(this.mainThreadExecutor);
}

/**
Expand Down Expand Up @@ -211,11 +227,43 @@ protected final void stop() {
*/
public final CompletableFuture<Void> internalCallOnStop() {
validateRunsInMainThread();
CompletableFuture<Void> stopFuture = onStop();
CompletableFuture<Void> stopFuture = new CompletableFuture<>();
try {
resourceRegistry.close();
stopFuture.complete(null);
} catch (IOException e) {
stopFuture.completeExceptionally(
new RuntimeException("Close resource registry fail", e));
}
stopFuture = CompletableFuture.allOf(stopFuture, onStop());
isRunning = false;
return stopFuture;
}

/**
* Register the given closeable resource to {@link CloseableRegistry}.
*
* @param closeableResource the given closeable resource
*/
protected void registerResource(Closeable closeableResource) {
try {
resourceRegistry.registerCloseable(closeableResource);
} catch (IOException e) {
throw new RuntimeException(
"Registry closeable resource " + closeableResource + " fail", e);
}
}

/**
* Unregister the given closeable resource from {@link CloseableRegistry}.
*
* @param closeableResource the given closeable resource
* @return true if the given resource unregister successful, otherwise false
*/
protected boolean unregisterResource(Closeable closeableResource) {
return resourceRegistry.unregisterCloseable(closeableResource);
}

/**
* User overridable callback which is called from {@link #internalCallOnStop()}.
*
Expand Down Expand Up @@ -395,43 +443,91 @@ public void validateRunsInMainThread() {
assert MainThreadValidatorUtil.isRunningInExpectedThread(currentMainThread.get());
}

/**
* Validate whether all the resources are closed.
*
* @return true if all the resources are closed, otherwise false
*/
boolean validateResourceClosed() {
return mainThreadExecutor.validateScheduledExecutorClosed() && resourceRegistry.isClosed();
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/** Executor which executes runnables in the main thread context. */
protected static class MainThreadExecutor implements ComponentMainThreadExecutor {
protected static class MainThreadExecutor implements ComponentMainThreadExecutor, Closeable {
private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);

private final MainThreadExecutable gateway;
private final Runnable mainThreadCheck;

MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
/**
* The main scheduled executor manages the scheduled tasks and send them to gateway when
* they should be executed.
*/
private final ScheduledExecutorService mainScheduledExecutor;

MainThreadExecutor(
MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
}

private void scheduleRunAsync(Runnable runnable, long delayMillis) {
gateway.scheduleRunAsync(runnable, delayMillis);
this.mainScheduledExecutor =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(endpointId + "-main-scheduler"));
}

@Override
public void execute(@Nonnull Runnable command) {
gateway.runAsync(command);
}

/**
* The mainScheduledExecutor manages the task and sends it to the gateway after the given
* delay.
*
* @param command the task to execute in the future
* @param delay the time from now to delay the execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture representing the completion of the scheduled task
*/
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
FutureTask<Void> ft = new FutureTask<>(command, null);
scheduleRunAsync(ft, delayMillis);
if (mainScheduledExecutor.isShutdown()) {
log.warn(
"The scheduled executor service is shutdown and ignores the command {}",
command);
} else {
mainScheduledExecutor.schedule(
() -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
}
return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
}

/**
* The mainScheduledExecutor manages the given callable and sends it to the gateway after
* the given delay.
*
* @param callable the callable to execute
* @param delay the time from now to delay the execution
* @param unit the time unit of the delay parameter
* @param <V> result type of the callable
* @return a ScheduledFuture which holds the future value of the given callable
*/
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
FutureTask<V> ft = new FutureTask<>(callable);
scheduleRunAsync(ft, delayMillis);
if (mainScheduledExecutor.isShutdown()) {
log.warn(
"The scheduled executor service is shutdown and ignores the callable {}",
callable);
} else {
mainScheduledExecutor.schedule(
() -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
}
return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
}

Expand All @@ -453,5 +549,22 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
public void assertRunningInMainThread() {
mainThreadCheck.run();
}

/** Shutdown the {@link ScheduledThreadPoolExecutor} and remove all the pending tasks. */
@Override
public void close() {
if (!mainScheduledExecutor.isShutdown()) {
mainScheduledExecutor.shutdownNow();
}
}

/**
* Validate whether the scheduled executor is closed.
*
* @return true if the scheduled executor is shutdown, otherwise false
*/
final boolean validateScheduledExecutorClosed() {
return mainScheduledExecutor.isShutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void testFencingTokenSetting() throws Exception {
assertEquals(newFencingToken, fencedTestingEndpoint.getFencingToken());
} finally {
RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
fencedTestingEndpoint.validateResourceClosed();
}
}

Expand Down Expand Up @@ -178,6 +179,7 @@ public void testFencing() throws Exception {

} finally {
RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
fencedTestingEndpoint.validateResourceClosed();
}
}

Expand Down Expand Up @@ -245,6 +247,7 @@ public void testRemoteAndSelfGateways() throws Exception {
}
} finally {
RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
fencedTestingEndpoint.validateResourceClosed();
}
}

Expand Down Expand Up @@ -294,6 +297,7 @@ public void testMainThreadExecutorUnderChangingFencingToken() throws Exception {

} finally {
RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
fencedTestingEndpoint.validateResourceClosed();
}
}

Expand Down Expand Up @@ -335,6 +339,7 @@ public void testUnfencedRemoteGateway() throws Exception {
}
} finally {
RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
fencedTestingEndpoint.validateResourceClosed();
}
}

Expand Down
Loading

0 comments on commit f5cdeca

Please sign in to comment.