diff --git a/conf/spark-blacklist.conf b/conf/spark-blacklist.conf index af30cb1dc..f0919b0b2 100644 --- a/conf/spark-blacklist.conf +++ b/conf/spark-blacklist.conf @@ -14,3 +14,6 @@ spark.submit.deployMode spark.yarn.jar spark.yarn.jars spark.yarn.archive + +# Don't allow users to override the RSC timeout. +livy.rsc.server.idle_timeout diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java index b8c4ce641..02ce122ca 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java @@ -55,11 +55,14 @@ public static enum Entry implements ConfEntry { LAUNCHER_ADDRESS("launcher.address", null), LAUNCHER_PORT("launcher.port", -1), + // How long will the RSC wait for a connection from a Livy server before shutting itself down. + SERVER_IDLE_TIMEOUT("server.idle_timeout", "10m"), + PROXY_USER("proxy_user", null), RPC_SERVER_ADDRESS("rpc.server.address", null), - RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90000ms"), - RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10000ms"), + RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"), + RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"), RPC_CHANNEL_LOG_LEVEL("channel.log.level", null), RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024), RPC_MAX_THREADS("rpc.threads", 8), @@ -72,7 +75,7 @@ public static enum Entry implements ConfEntry { private final Object dflt; private Entry(String key, Object dflt) { - this.key = "livy.local." + key; + this.key = "livy.rsc." 
+ key; this.dflt = dflt; } diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java index f5ca69779..835d0cd3c 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java @@ -34,9 +34,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.ScheduledFuture; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaFutureAction; @@ -83,6 +86,8 @@ public class RSCDriver extends BaseProtocol { protected final SparkConf conf; protected final RSCConf livyConf; + private final AtomicReference> idleTimeout; + public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception { Set perms = PosixFilePermissions.fromString("rwx------"); this.localTmpDir = Files.createTempDirectory("rsc-tmp", @@ -99,6 +104,7 @@ public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception { this.activeJobs = new ConcurrentHashMap<>(); this.bypassJobs = new ConcurrentLinkedDeque<>(); + this.idleTimeout = new AtomicReference<>(); } private synchronized void shutdown() { @@ -156,15 +162,8 @@ private void initializeServer() throws Exception { this.server = new RpcServer(livyConf); server.registerClient(clientId, secret, new RpcServer.ClientCallback() { @Override - public RpcDispatcher onNewClient(final Rpc client) { - clients.add(client); - Utils.addListener(client.getChannel().closeFuture(), new FutureListener() { - @Override - public void onSuccess(Void unused) { - clients.remove(client); - } - }); - LOG.debug("Registered new connection from {}.", client.getChannel()); + public 
RpcDispatcher onNewClient(Rpc client) { + registerClient(client); return RSCDriver.this; } }); @@ -173,12 +172,67 @@ public void onSuccess(Void unused) { Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(), launcherAddress, launcherPort, clientId, secret, this).get(); try { - // There's no timeout here because we expect the launching side to kill the underlying - // application if it takes too long to connect back and send its address. - callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get(); + callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get( + livyConf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + LOG.warn("Timed out sending address to Livy server, shutting down."); + throw te; } finally { callbackRpc.close(); } + + // At this point we install the idle timeout handler, in case the Livy server fails to connect + // back. + setupIdleTimeout(); + } + + private void registerClient(final Rpc client) { + clients.add(client); + stopIdleTimeout(); + + Utils.addListener(client.getChannel().closeFuture(), new FutureListener() { + @Override + public void onSuccess(Void unused) { + clients.remove(client); + setupIdleTimeout(); + } + }); + LOG.debug("Registered new connection from {}.", client.getChannel()); + } + + private void setupIdleTimeout() { + if (clients.size() > 0) { + return; + } + + Runnable timeoutTask = new Runnable() { + @Override + public void run() { + LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT)); + shutdown(); + } + }; + ScheduledFuture timeout = server.getEventLoopGroup().schedule(timeoutTask, + livyConf.getTimeAsMs(SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS); + + // If there's already an idle task registered, then cancel the new one. 
+ if (!this.idleTimeout.compareAndSet(null, timeout)) { + LOG.debug("Timeout task already registered."); + timeout.cancel(false); + } + + // If a new client connected while the idle task was being set up, then stop the task. + if (clients.size() > 0) { + stopIdleTimeout(); + } + } + + private void stopIdleTimeout() { + ScheduledFuture idleTimeout = this.idleTimeout.getAndSet(null); + if (idleTimeout != null) { + LOG.debug("Cancelling idle timeout since new client connected."); + idleTimeout.cancel(false); + } } /** diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java index 5f84ef759..fab36d752 100644 --- a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -59,6 +60,7 @@ import com.cloudera.livy.test.jobs.FileReader; import com.cloudera.livy.test.jobs.GetCurrentUser; import com.cloudera.livy.test.jobs.SQLGetTweets; +import com.cloudera.livy.test.jobs.Sleeper; import com.cloudera.livy.test.jobs.SmallCount; import static com.cloudera.livy.rsc.RSCConf.Entry.*; @@ -287,12 +289,7 @@ public void testConnectToRunningContext() throws Exception { runTest(false, new TestFunction() { @Override void call(LivyClient client) throws Exception { - ContextInfo ctx = ((RSCClient) client).getContextInfo(); - URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret, - ctx.remoteAddress, ctx.remotePort)); - - // Close the old client to make sure the driver doesn't go away when it disconnects. 
- client.stop(false); + URI uri = disconnectClient(client); // If this tries to create a new context, it will fail because it's missing the // needed configuration from createConf(). @@ -306,12 +303,45 @@ void call(LivyClient client) throws Exception { assertEquals("hello", result); } finally { newClient.stop(true); + } + } + }); + } + + @Test + public void testServerIdleTimeout() throws Exception { + runTest(true, new TestFunction() { + @Override + void call(LivyClient client) throws Exception { + // Close the old client and wait a couple of seconds for the timeout to trigger. + URI uri = disconnectClient(client); + TimeUnit.SECONDS.sleep(2); - // Make sure the underlying ContextLauncher is cleaned up properly, since we did - // a "stop(false)" above. - // ((RSCClient) client).getContextInfo().dispose(true); + // Try to connect back with a new client, it should fail. Since there's no API to monitor + // the connection state, we try to enqueue a long-running job and make sure that it fails, + // in case the connection actually goes through. + try { + LivyClient newClient = new LivyClientBuilder() + .setURI(uri) + .build(); + + try { + newClient.submit(new Sleeper(TimeUnit.SECONDS.toMillis(TIMEOUT))) + .get(TIMEOUT, TimeUnit.SECONDS); + } catch (TimeoutException te) { + // Shouldn't have gotten here, but catch this so that we stop the client. + newClient.stop(true); + } + fail("Should have failed to contact RSC after idle timeout."); + } catch (Exception e) { + // Expected. 
} } + + @Override + void config(Properties conf) { + conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s"); + } }); } @@ -379,6 +409,16 @@ private JobHandle.Listener newListener() { return listener; } + private URI disconnectClient(LivyClient client) throws Exception { + ContextInfo ctx = ((RSCClient) client).getContextInfo(); + URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret, + ctx.remoteAddress, ctx.remotePort)); + + // Close the old client; callers that need the idle timeout to trigger must wait themselves. + client.stop(false); + return uri; + } + private void runTest(boolean local, TestFunction test) throws Exception { Properties conf = createConf(local); LivyClient client = null; diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java new file mode 100644 index 000000000..0b365bfa1 --- /dev/null +++ b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java @@ -0,0 +1,38 @@ +/* + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.cloudera.livy.test.jobs; + +import com.cloudera.livy.Job; +import com.cloudera.livy.JobContext; + +public class Sleeper implements Job { + + private final long millis; + + public Sleeper(long millis) { + this.millis = millis; + } + + @Override + public Void call(JobContext jc) throws Exception { + Thread.sleep(millis); + return null; + } + +}