Skip to content

Commit

Permalink
LIVY-149. Add idle timeout to RSC server.
Browse files Browse the repository at this point in the history
The RSC server currently listens indefinitely waiting for a Livy server
to connect to it. That can result in sessions that never go away and need
to be manually cleaned up when the Livy server crashes or is otherwise
uncleanly shut down.

So add a timeout, default 10 minutes, that allows sessions to shut themselves
down if the Livy server goes away and doesn't come back.

Closes apache#137
  • Loading branch information
Marcelo Vanzin committed May 17, 2016
1 parent 386663d commit 816976f
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 24 deletions.
3 changes: 3 additions & 0 deletions conf/spark-blacklist.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ spark.submit.deployMode
spark.yarn.jar
spark.yarn.jars
spark.yarn.archive

# Don't allow users to override the RSC timeout.
livy.rsc.server.idle_timeout
9 changes: 6 additions & 3 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ public static enum Entry implements ConfEntry {
LAUNCHER_ADDRESS("launcher.address", null),
LAUNCHER_PORT("launcher.port", -1),

// How long will the RSC wait for a connection for a Livy server before shutting itself down.
SERVER_IDLE_TIMEOUT("server.idle_timeout", "10m"),

PROXY_USER("proxy_user", null),

RPC_SERVER_ADDRESS("rpc.server.address", null),
RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90000ms"),
RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10000ms"),
RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
RPC_MAX_THREADS("rpc.threads", 8),
Expand All @@ -72,7 +75,7 @@ public static enum Entry implements ConfEntry {
private final Object dflt;

private Entry(String key, Object dflt) {
this.key = "livy.local." + key;
this.key = "livy.rsc." + key;
this.dflt = dflt;
}

Expand Down
78 changes: 66 additions & 12 deletions rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
Expand Down Expand Up @@ -83,6 +86,8 @@ public class RSCDriver extends BaseProtocol {
protected final SparkConf conf;
protected final RSCConf livyConf;

private final AtomicReference<ScheduledFuture<?>> idleTimeout;

public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
this.localTmpDir = Files.createTempDirectory("rsc-tmp",
Expand All @@ -99,6 +104,7 @@ public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {

this.activeJobs = new ConcurrentHashMap<>();
this.bypassJobs = new ConcurrentLinkedDeque<>();
this.idleTimeout = new AtomicReference<>();
}

private synchronized void shutdown() {
Expand Down Expand Up @@ -156,15 +162,8 @@ private void initializeServer() throws Exception {
this.server = new RpcServer(livyConf);
server.registerClient(clientId, secret, new RpcServer.ClientCallback() {
@Override
public RpcDispatcher onNewClient(final Rpc client) {
clients.add(client);
Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
@Override
public void onSuccess(Void unused) {
clients.remove(client);
}
});
LOG.debug("Registered new connection from {}.", client.getChannel());
public RpcDispatcher onNewClient(Rpc client) {
registerClient(client);
return RSCDriver.this;
}
});
Expand All @@ -173,12 +172,67 @@ public void onSuccess(Void unused) {
Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(),
launcherAddress, launcherPort, clientId, secret, this).get();
try {
// There's no timeout here because we expect the launching side to kill the underlying
// application if it takes too long to connect back and send its address.
callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get();
callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get(
livyConf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("Timed out sending address to Livy server, shutting down.");
throw te;
} finally {
callbackRpc.close();
}

// At this point we install the idle timeout handler, in case the Livy server fails to connect
// back.
setupIdleTimeout();
}

private void registerClient(final Rpc client) {
clients.add(client);
stopIdleTimeout();

Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
@Override
public void onSuccess(Void unused) {
clients.remove(client);
setupIdleTimeout();
}
});
LOG.debug("Registered new connection from {}.", client.getChannel());
}

private void setupIdleTimeout() {
if (clients.size() > 0) {
return;
}

Runnable timeoutTask = new Runnable() {
@Override
public void run() {
LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT));
shutdown();
}
};
ScheduledFuture<?> timeout = server.getEventLoopGroup().schedule(timeoutTask,
livyConf.getTimeAsMs(SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS);

// If there's already an idle task registered, then cancel the new one.
if (!this.idleTimeout.compareAndSet(null, timeout)) {
LOG.debug("Timeout task already registered.");
timeout.cancel(false);
}

// If a new client connected while the idle task was being set up, then stop the task.
if (clients.size() > 0) {
stopIdleTimeout();
}
}

private void stopIdleTimeout() {
ScheduledFuture<?> idleTimeout = this.idleTimeout.getAndSet(null);
if (idleTimeout != null) {
LOG.debug("Cancelling idle timeout since new client connected.");
idleTimeout.cancel(false);
}
}

/**
Expand Down
58 changes: 49 additions & 9 deletions rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;

Expand Down Expand Up @@ -59,6 +60,7 @@
import com.cloudera.livy.test.jobs.FileReader;
import com.cloudera.livy.test.jobs.GetCurrentUser;
import com.cloudera.livy.test.jobs.SQLGetTweets;
import com.cloudera.livy.test.jobs.Sleeper;
import com.cloudera.livy.test.jobs.SmallCount;
import static com.cloudera.livy.rsc.RSCConf.Entry.*;

Expand Down Expand Up @@ -287,12 +289,7 @@ public void testConnectToRunningContext() throws Exception {
runTest(false, new TestFunction() {
@Override
void call(LivyClient client) throws Exception {
ContextInfo ctx = ((RSCClient) client).getContextInfo();
URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret,
ctx.remoteAddress, ctx.remotePort));

// Close the old client to make sure the driver doesn't go away when it disconnects.
client.stop(false);
URI uri = disconnectClient(client);

// If this tries to create a new context, it will fail because it's missing the
// needed configuration from createConf().
Expand All @@ -306,12 +303,45 @@ void call(LivyClient client) throws Exception {
assertEquals("hello", result);
} finally {
newClient.stop(true);
}
}
});
}

@Test
public void testServerIdleTimeout() throws Exception {
runTest(true, new TestFunction() {
@Override
void call(LivyClient client) throws Exception {
// Close the old client and wait a couple of seconds for the timeout to trigger.
URI uri = disconnectClient(client);
TimeUnit.SECONDS.sleep(2);

// Make sure the underlying ContextLauncher is cleaned up properly, since we did
// a "stop(false)" above.
// ((RSCClient) client).getContextInfo().dispose(true);
// Try to connect back with a new client, it should fail. Since there's no API to monitor
// the connection state, we try to enqueue a long-running job and make sure that it fails,
// in case the connection actually goes through.
try {
LivyClient newClient = new LivyClientBuilder()
.setURI(uri)
.build();

try {
newClient.submit(new Sleeper(TimeUnit.SECONDS.toMillis(TIMEOUT)))
.get(TIMEOUT, TimeUnit.SECONDS);
} catch (TimeoutException te) {
// Shouldn't have gotten here, but catch this so that we stop the client.
newClient.stop(true);
}
fail("Should have failed to contact RSC after idle timeout.");
} catch (Exception e) {
// Expected.
}
}

@Override
void config(Properties conf) {
conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s");
}
});
}

Expand Down Expand Up @@ -379,6 +409,16 @@ private <T> JobHandle.Listener<T> newListener() {
return listener;
}

private URI disconnectClient(LivyClient client) throws Exception {
ContextInfo ctx = ((RSCClient) client).getContextInfo();
URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret,
ctx.remoteAddress, ctx.remotePort));

// Close the old client and wait a couple of seconds for the timeout to trigger.
client.stop(false);
return uri;
}

private void runTest(boolean local, TestFunction test) throws Exception {
Properties conf = createConf(local);
LivyClient client = null;
Expand Down
38 changes: 38 additions & 0 deletions test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cloudera.livy.test.jobs;

import com.cloudera.livy.Job;
import com.cloudera.livy.JobContext;

public class Sleeper implements Job<Void> {

private final long millis;

public Sleeper(long millis) {
this.millis = millis;
}

@Override
public Void call(JobContext jc) throws Exception {
Thread.sleep(millis);
return null;
}

}

0 comments on commit 816976f

Please sign in to comment.