Skip to content

Commit

Permalink
[FLINK-27737][rpc] Remove legacy support for unfenced executor in Fen…
Browse files Browse the repository at this point in the history
…cedRpcEndpoint

This closes apache#19938
  • Loading branch information
Aitozi authored and xintongsong committed Jun 13, 2022
1 parent aa693b5 commit 41ac1ba
Show file tree
Hide file tree
Showing 14 changed files with 19 additions and 768 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,6 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
recoveredWorkers.size(),
++currentMaxAttemptId);

// Should not invoke resource event handler on the main thread executor.
// We are in the initializing thread. The main thread executor is not yet ready.
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
Expand Down Expand Up @@ -296,8 +295,6 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks,
flinkClassLoader);

implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
akkaInvocationHandler =
new AkkaInvocationHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,24 @@

package org.apache.flink.runtime.rpc.akka;

import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.RemoteFencedMessage;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorRef;
import akka.pattern.Patterns;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Fenced extension of the {@link AkkaInvocationHandler}. This invocation handler will be used in
* combination with the {@link FencedRpcEndpoint}. The fencing is done by wrapping all messages in a
Expand All @@ -52,7 +44,7 @@
* @param <F> type of the fencing token
*/
public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler
implements FencedMainThreadExecutable, FencedRpcGateway<F> {
implements MainThreadExecutable, FencedRpcGateway<F> {

private final Supplier<F> fencingTokenSupplier;

Expand Down Expand Up @@ -85,54 +77,14 @@ public FencedAkkaInvocationHandler(
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();

if (declaringClass.equals(FencedMainThreadExecutable.class)
if (declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(FencedRpcGateway.class)) {
return method.invoke(this, args);
} else {
return super.invoke(proxy, method, args);
}
}

@Override
public void runAsyncWithoutFencing(Runnable runnable) {
checkNotNull(runnable, "runnable");

if (isLocal) {
getActorRef()
.tell(new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender());
} else {
throw new RuntimeException(
"Trying to send a Runnable to a remote actor at "
+ getActorRef().path()
+ ". This is not supported.");
}
}

@Override
public <V> CompletableFuture<V> callAsyncWithoutFencing(
Callable<V> callable, Duration timeout) {
checkNotNull(callable, "callable");
checkNotNull(timeout, "timeout");

if (isLocal) {
@SuppressWarnings("unchecked")
CompletableFuture<V> resultFuture =
(CompletableFuture<V>)
AkkaFutureUtils.toJava(
Patterns.ask(
getActorRef(),
new UnfencedMessage<>(new CallAsync(callable)),
timeout.toMillis()));

return resultFuture;
} else {
throw new RuntimeException(
"Trying to send a Runnable to a remote actor at "
+ getActorRef().path()
+ ". This is not supported.");
}
}

@Override
public void tell(Object message) {
super.tell(fenceMessage(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;

import java.io.Serializable;
import java.util.Objects;
Expand Down Expand Up @@ -97,15 +96,12 @@ protected void handleRpcMessage(Object message) {
+ '.'));
}
}
} else if (message instanceof UnfencedMessage) {
super.handleRpcMessage(((UnfencedMessage<?>) message).getPayload());
} else {
if (log.isDebugEnabled()) {
log.debug(
"Unknown message type: Ignoring message {} because it is neither of type {} nor {}.",
"Unknown message type: Ignoring message {} because it is not of type {}.",
message,
FencedMessage.class.getSimpleName(),
UnfencedMessage.class.getSimpleName());
FencedMessage.class.getSimpleName());
}

sendErrorIfSender(
Expand All @@ -114,11 +110,9 @@ protected void handleRpcMessage(Object message) {
+ message
+ " of type "
+ message.getClass().getSimpleName()
+ " because it is neither of type "
+ " because it is not of type "
+ FencedMessage.class.getSimpleName()
+ " nor "
+ UnfencedMessage.class.getSimpleName()
+ '.'));
+ "."));
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,10 @@

package org.apache.flink.runtime.rpc;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Base class for fenced {@link RpcEndpoint}. A fenced rpc endpoint expects all rpc messages being
Expand All @@ -40,149 +32,22 @@
*/
public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {

private final UnfencedMainThreadExecutor unfencedMainThreadExecutor;
private volatile F fencingToken;
private volatile MainThreadExecutor fencedMainThreadExecutor;
private final F fencingToken;

protected FencedRpcEndpoint(
RpcService rpcService, String endpointId, @Nullable F fencingToken) {
protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingToken) {
super(rpcService, endpointId);

Preconditions.checkArgument(
rpcServer instanceof FencedMainThreadExecutable,
"The rpcServer must be of type %s.",
FencedMainThreadExecutable.class.getSimpleName());
Preconditions.checkNotNull(fencingToken, "The fence token should be null");
Preconditions.checkNotNull(rpcServer, "The rpc server should be null");

// no fencing token == no leadership
this.fencingToken = fencingToken;
this.unfencedMainThreadExecutor =
new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer);

MainThreadExecutable mainThreadExecutable =
getRpcService().fenceRpcServer(rpcServer, fencingToken);
setFencedMainThreadExecutor(
new MainThreadExecutor(
mainThreadExecutable, this::validateRunsInMainThread, endpointId));
}

protected FencedRpcEndpoint(RpcService rpcService, @Nullable F fencingToken) {
protected FencedRpcEndpoint(RpcService rpcService, F fencingToken) {
this(rpcService, UUID.randomUUID().toString(), fencingToken);
}

public F getFencingToken() {
return fencingToken;
}

protected void setFencingToken(@Nullable F newFencingToken) {
// this method should only be called from within the main thread
validateRunsInMainThread();

this.fencingToken = newFencingToken;

// setting a new fencing token entails that we need a new MainThreadExecutor
// which is bound to the new fencing token
MainThreadExecutable mainThreadExecutable =
getRpcService().fenceRpcServer(rpcServer, newFencingToken);
setFencedMainThreadExecutor(
new MainThreadExecutor(
mainThreadExecutable, this::validateRunsInMainThread, getEndpointId()));
}

/**
* Set fenced main thread executor and register it to closeable register.
*
* @param fencedMainThreadExecutor the given fenced main thread executor
*/
private void setFencedMainThreadExecutor(MainThreadExecutor fencedMainThreadExecutor) {
if (this.fencedMainThreadExecutor != null) {
this.fencedMainThreadExecutor.close();
unregisterResource(this.fencedMainThreadExecutor);
}
this.fencedMainThreadExecutor = fencedMainThreadExecutor;
registerResource(this.fencedMainThreadExecutor);
}

/**
* Returns a main thread executor which is bound to the currently valid fencing token. This
* means that runnables which are executed with this executor fail after the fencing token has
* changed. This allows to scope operations by the fencing token.
*
* @return MainThreadExecutor bound to the current fencing token
*/
@Override
protected MainThreadExecutor getMainThreadExecutor() {
return fencedMainThreadExecutor;
}

/**
* Returns a main thread executor which is not bound to the fencing token. This means that
* {@link Runnable} which are executed with this executor will always be executed.
*
* @return MainThreadExecutor which is not bound to the fencing token
*/
protected Executor getUnfencedMainThreadExecutor() {
return unfencedMainThreadExecutor;
}

/**
* Run the given runnable in the main thread of the RpcEndpoint without checking the fencing
* token. This allows to run operations outside of the fencing token scope.
*
* @param runnable to execute in the main thread of the rpc endpoint without checking the
* fencing token.
*/
protected void runAsyncWithoutFencing(Runnable runnable) {
if (rpcServer instanceof FencedMainThreadExecutable) {
((FencedMainThreadExecutable) rpcServer).runAsyncWithoutFencing(runnable);
} else {
throw new RuntimeException(
"FencedRpcEndpoint has not been started with a FencedMainThreadExecutable RpcServer.");
}
}

/**
* Run the given callable in the main thread of the RpcEndpoint without checking the fencing
* token. This allows to run operations outside of the fencing token scope.
*
* @param callable to run in the main thread of the rpc endpoint without checking the fencing
* token.
* @param timeout for the operation.
* @return Future containing the callable result.
*/
protected <V> CompletableFuture<V> callAsyncWithoutFencing(
Callable<V> callable, Duration timeout) {
if (rpcServer instanceof FencedMainThreadExecutable) {
return ((FencedMainThreadExecutable) rpcServer)
.callAsyncWithoutFencing(callable, timeout);
} else {
throw new RuntimeException(
"FencedRpcEndpoint has not been started with a FencedMainThreadExecutable RpcServer.");
}
}

@VisibleForTesting
public boolean validateResourceClosed() {
return super.validateResourceClosed()
&& (fencedMainThreadExecutor == null
|| fencedMainThreadExecutor.validateScheduledExecutorClosed());
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/** Executor which executes {@link Runnable} in the main thread context without fencing. */
private static class UnfencedMainThreadExecutor implements Executor {

private final FencedMainThreadExecutable gateway;

UnfencedMainThreadExecutor(FencedMainThreadExecutable gateway) {
this.gateway = Preconditions.checkNotNull(gateway);
}

@Override
public void execute(@Nonnull Runnable runnable) {
gateway.runAsyncWithoutFencing(runnable);
}
}
}
Loading

0 comments on commit 41ac1ba

Please sign in to comment.