Skip to content

Commit

Permalink
[LIVY-735][RSC] Fix rpc channel closed when multi clients connect to …
Browse files Browse the repository at this point in the history
…one driver

## What changes were proposed in this pull request?

Currently, the driver tries to support communicating with multi-clients, by registering each client at https://github.com/apache/incubator-livy/blob/master/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java#L220.

But actually, if multiple clients connect to one driver, the rpc channel will close. The reasons are as follows.

1.  In every communication, client sends two packages to driver: header{type, id}, and payload at https://github.com/apache/incubator-livy/blob/master/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcDispatcher.java#L144.

2. Suppose client1 sends header1, payload1, and client2 sends header2, payload2 at the same time.
  The driver may then receive the packages in the order: header1, header2, payload1, payload2.

3. When driver receives header1, driver assigns lastHeader at https://github.com/apache/incubator-livy/blob/master/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcDispatcher.java#L73.

4. Then the driver receives header2 and processes it as a payload at https://github.com/apache/incubator-livy/blob/master/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcDispatcher.java#L78, which causes an exception and closes the rpc channel.

In the multi-active HA mode (design doc: https://docs.google.com/document/d/1bD3qYZpw14_NuCcSGUOfqQ0pqvSbCQsOLFuZp26Ohjc/edit?usp=sharing), sessions are allocated among servers by consistent hashing. If a new livy joins, some sessions will be migrated from the old livy to the new livy. If the session client in the new livy connects to the driver before the session client in the old livy is stopped, then two session clients will both be connected to the driver, and the rpc channel closes. In this case, it's hard to ensure that only one client is connected to one driver at any time. So it's better to support multiple clients connecting to one driver, which has no side effects.

How to fix:
1. Move the code of processing client message from `RpcDispatcher` to each `Rpc`.
2. Each `Rpc` registers itself to `channelRpc` in RpcDispatcher.
3. `RpcDispatcher` dispatches each message to `Rpc` according to  `ctx.channel()`.

## How was this patch tested?

Existing UT and IT

Author: runzhiwang <[email protected]>

Closes apache#268 from runzhiwang/multi-client-one-driver.
  • Loading branch information
runzhiwang authored and jerryshao committed Jan 8, 2020
1 parent 40ea8cc commit 66b5833
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private void registerClient(final Rpc client) {
@Override
public void onSuccess(Void unused) {
clients.remove(client);
client.unRegisterRpc();
if (!inShutdown.get()) {
setupIdleTimeout();
}
Expand Down
185 changes: 179 additions & 6 deletions rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -208,6 +209,7 @@ static Rpc createEmbedded(RpcDispatcher dispatcher) {
dispatcher);
Rpc rpc = new Rpc(new RSCConf(null), c, ImmediateEventExecutor.INSTANCE);
rpc.dispatcher = dispatcher;
dispatcher.registerRpc(c, rpc);
return rpc;
}

Expand All @@ -218,6 +220,10 @@ static Rpc createEmbedded(RpcDispatcher dispatcher) {
private final EventExecutorGroup egroup;
private volatile RpcDispatcher dispatcher;

private final Map<Class<?>, Method> handlers = new ConcurrentHashMap<>();
private final Collection<OutstandingRpc> rpcCalls = new ConcurrentLinkedQueue<OutstandingRpc>();
private volatile Rpc.MessageHeader lastHeader;

private Rpc(RSCConf config, Channel channel, EventExecutorGroup egroup) {
Utils.checkArgument(channel != null);
Utils.checkArgument(egroup != null);
Expand All @@ -238,6 +244,166 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
});
}

/**
* For debugging purposes.
* @return The name of this Class.
*/
protected String name() {
return getClass().getSimpleName();
}

public void handleMsg(ChannelHandlerContext ctx, Object msg, Class<?> handleClass, Object obj)
throws Exception {
if (lastHeader == null) {
if (!(msg instanceof MessageHeader)) {
LOG.warn("[{}] Expected RPC header, got {} instead.", name(),
msg != null ? msg.getClass().getName() : null);
throw new IllegalArgumentException();
}
lastHeader = (MessageHeader) msg;
} else {
LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(),
lastHeader.type, lastHeader.id, msg != null ? msg.getClass().getName() : null);
try {
switch (lastHeader.type) {
case CALL:
handleCall(ctx, msg, handleClass, obj);
break;
case REPLY:
handleReply(ctx, msg, findRpcCall(lastHeader.id));
break;
case ERROR:
handleError(ctx, msg, findRpcCall(lastHeader.id));
break;
default:
throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type);
}
} finally {
lastHeader = null;
}
}
}

private void handleCall(ChannelHandlerContext ctx, Object msg, Class<?> handleClass, Object obj)
throws Exception {
Method handler = handlers.get(msg.getClass());
if (handler == null) {
// Try both getDeclaredMethod() and getMethod() so that we try both private methods
// of the class, and public methods of parent classes.
try {
handler = handleClass.getDeclaredMethod("handle", ChannelHandlerContext.class,
msg.getClass());
} catch (NoSuchMethodException e) {
try {
handler = handleClass.getMethod("handle", ChannelHandlerContext.class,
msg.getClass());
} catch (NoSuchMethodException e2) {
LOG.warn(String.format("[%s] Failed to find handler for msg '%s'.", name(),
msg.getClass().getName()));
writeMessage(MessageType.ERROR, Utils.stackTraceAsString(e.getCause()));
return;
}
}
handler.setAccessible(true);
handlers.put(msg.getClass(), handler);
}

try {
Object payload = handler.invoke(obj, ctx, msg);
if (payload == null) {
payload = new NullMessage();
}
writeMessage(MessageType.REPLY, payload);
} catch (InvocationTargetException ite) {
LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
writeMessage(MessageType.ERROR, Utils.stackTraceAsString(ite.getCause()));
}
}

private void handleReply(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc) {
rpc.future.setSuccess(msg instanceof NullMessage ? null : msg);
}

private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc) {
if (msg instanceof String) {
LOG.warn("Received error message:{}.", msg);
rpc.future.setFailure(new RpcException((String) msg));
} else {
String error = String.format("Received error with unexpected payload (%s).",
msg != null ? msg.getClass().getName() : null);
LOG.warn(String.format("[%s] %s", name(), error));
rpc.future.setFailure(new IllegalArgumentException(error));
ctx.close();
}
}

private void writeMessage(MessageType replyType, Object payload) {
channel.write(new MessageHeader(lastHeader.id, replyType));
channel.writeAndFlush(payload);
}

private OutstandingRpc findRpcCall(long id) {
for (Iterator<OutstandingRpc> it = rpcCalls.iterator(); it.hasNext();) {
OutstandingRpc rpc = it.next();
if (rpc.id == id) {
it.remove();
return rpc;
}
}
throw new IllegalArgumentException(String.format(
"Received RPC reply for unknown RPC (%d).", id));
}

private void registerRpcCall(long id, Promise<?> promise, String type) {
LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
rpcCalls.add(new OutstandingRpc(id, promise));
}

private void discardRpcCall(long id) {
LOG.debug("[{}] Discarding failed RPC {}.", name(), id);
findRpcCall(id);
}

private static class OutstandingRpc {
final long id;
final Promise<Object> future;

@SuppressWarnings("unchecked")
OutstandingRpc(long id, Promise<?> future) {
this.id = id;
this.future = (Promise<Object>) future;
}
}

public void handleChannelException(ChannelHandlerContext ctx, Throwable cause) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("[%s] Caught exception in channel pipeline.", name()), cause);
} else {
LOG.info(String.format("[%s] Caught exception in channel pipeline.", name()), cause);
}

if (lastHeader != null) {
// There's an RPC waiting for a reply. Exception was most probably caught while processing
// the RPC, so send an error.
channel.write(new MessageHeader(lastHeader.id, MessageType.ERROR));
channel.writeAndFlush(Utils.stackTraceAsString(cause));
lastHeader = null;
}

ctx.close();
}

public void handleChannelInactive() {
if (rpcCalls.size() > 0) {
LOG.warn("[{}] Closing RPC channel with {} outstanding RPCs.", name(), rpcCalls.size());
for (OutstandingRpc rpc : rpcCalls) {
rpc.future.cancel(true);
}
} else {
LOG.debug("Channel {} became inactive.", channel);
}
}

/**
* Send an RPC call to the remote endpoint and returns a future that can be used to monitor the
* operation.
Expand Down Expand Up @@ -269,13 +435,13 @@ public void operationComplete(ChannelFuture cf) {
if (!cf.isSuccess() && !promise.isDone()) {
LOG.warn("Failed to send RPC, closing connection.", cf.cause());
promise.setFailure(cf.cause());
dispatcher.discardRpc(id);
discardRpcCall(id);
close();
}
}
};

dispatcher.registerRpc(id, promise, msg.getClass().getName());
registerRpcCall(id, promise, msg.getClass().getName());
channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
Expand All @@ -294,11 +460,18 @@ public Channel getChannel() {
return channel;
}

public void unRegisterRpc() {
if (dispatcher != null) {
dispatcher.unregisterRpc(channel);
}
}

void setDispatcher(RpcDispatcher dispatcher) {
Utils.checkNotNull(dispatcher);
Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
this.dispatcher = dispatcher;
channel.pipeline().addLast("dispatcher", dispatcher);
dispatcher.registerRpc(channel, this);
}

@Override
Expand Down
Loading

0 comments on commit 66b5833

Please sign in to comment.