Skip to content

Commit

Permalink
LIVY-110, LIVY-93. Simplify RSC driver class hierarchy.
Browse files Browse the repository at this point in the history
The separation between "client-mode" driver and REPL driver was a little
hacky after recent changes. This patch aims at simplifying things.

Now, there is a single entry point to the RSC (RSCDriverBootstrapper),
and the actual driver implementation (which must extend RSCDriver, the
new name of RemoteDriver) can be configured.

The driver is responsible for handling RPC messages, so it must provide
"handle()" methods for any messages that might arrive. RSCDriver handles
all messages directed at the client-mode driver. The REPL driver extends
it and adds handlers to REPL-specific messages, and re-uses the client-mode
logic for running the PingJob used to detect when the session is ready.
This simplified the code a lot.

On top of that, I made some other cleanup changes:

- the code that launches the driver in-process and out-of-process was
  slightly modified to share more setup code.
- when jobs send results back to the client, all clients now receive
  the message. This code path is mostly unused in Livy (most uses are in
  the unit tests), since the Livy API uses bypass jobs which have different
  logic. In any case, there should be very few, if any, situations where
  there are two clients connected to a remote driver.
- fixed some sketchy error handling code, especially in the shutdown
  path of the driver.

Closes apache#101
  • Loading branch information
Marcelo Vanzin committed Apr 11, 2016
1 parent 8b63520 commit 8cdc459
Show file tree
Hide file tree
Showing 19 changed files with 623 additions and 727 deletions.
6 changes: 5 additions & 1 deletion api/src/main/java/com/cloudera/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ public LivyClientBuilder setURI(URI uri) {
}

public LivyClientBuilder setConf(String key, String value) {
config.setProperty(key, value);
if (value != null) {
config.setProperty(key, value);
} else {
config.remove(key);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,18 @@ public static class GetReplState {

}

/**
 * Message that carries the textual stack trace of an error.
 *
 * <p>The trace is stored as a plain string and is exposed through the
 * immutable {@link #stackTrace} field.
 */
public static class InitializationError {

  /** Stack trace text; {@code null} when the no-arg constructor is used. */
  public final String stackTrace;

  /**
   * Creates a message without a stack trace.
   * NOTE(review): presumably required by the RPC serializer - confirm.
   */
  public InitializationError() {
    this(null);
  }

  /**
   * Creates a message wrapping the given stack trace.
   *
   * @param stackTrace the stack trace text, may be {@code null}
   */
  public InitializationError(String stackTrace) {
    this.stackTrace = stackTrace;
  }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.livy.client.local.driver.RemoteDriver;
import com.cloudera.livy.client.local.driver.RSCDriverBootstrapper;
import com.cloudera.livy.client.local.rpc.Rpc;
import com.cloudera.livy.client.local.rpc.RpcDispatcher;
import com.cloudera.livy.client.local.rpc.RpcServer;
Expand Down Expand Up @@ -84,13 +84,7 @@ class ContextLauncher implements ContextInfo {
factory.getServer().registerClient(clientId, secret, handler);
String replMode = conf.get("repl");
boolean repl = replMode != null && replMode.equals("true");
String className;
if (conf.getBoolean(CLIENT_REPL_MODE)) {
className = "com.cloudera.livy.repl.ReplDriver";
} else {
className = RemoteDriver.class.getName();
}
this.child = startDriver(factory.getServer(), conf, clientId, secret, className);
this.child = startDriver(factory.getServer(), conf, clientId, secret);

// Wait for the handler to receive the driver information. Wait a little at a time so
// that we can check whether the child process is still alive, and throw an error if the
Expand Down Expand Up @@ -155,42 +149,63 @@ private static ChildProcess startDriver(
final RpcServer rpcServer,
final LocalConf conf,
final String clientId,
final String secret,
final String className) throws IOException {
final String serverAddress = rpcServer.getAddress();
final String serverPort = String.valueOf(rpcServer.getPort());
final String secret) throws IOException {
// Write out the config file used by the remote context.
conf.set(LAUNCHER_ADDRESS, rpcServer.getAddress());
conf.set(LAUNCHER_PORT, rpcServer.getPort());
conf.set(CLIENT_ID, clientId);
conf.set(CLIENT_SECRET, secret);

String livyJars = conf.get(LIVY_JARS);
if (livyJars == null) {
String livyHome = System.getenv("LIVY_HOME");
Preconditions.checkState(livyHome != null,
"Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
File clientJars = new File(livyHome, "client-jars");
Preconditions.checkState(clientJars.isDirectory(),
"Cannot find 'client-jars' directory under LIVY_HOME.");
List<String> jars = new ArrayList<>();
for (File f : clientJars.listFiles()) {
jars.add(f.getAbsolutePath());
}
livyJars = Joiner.on(",").join(jars);
}
merge(conf, SPARK_JARS_KEY, livyJars, ",");

if ("sparkr".equals(conf.get("session.kind"))) {
merge(conf, SPARK_ARCHIVES_KEY, conf.get(LocalConf.Entry.SPARKR_PACKAGE), ",");
}

// Disable multiple attempts since the RPC server doesn't yet support multiple
// connections for the same registered app.
conf.set("spark.yarn.maxAppAttempts", "1");

// For testing; propagate jacoco settings so that we also do coverage analysis
// on the launched driver. We replace the name of the main file ("main.exec")
// so that we don't end up fighting with the main test launcher.
String jacocoArgs = System.getProperty("jacoco.args");
if (jacocoArgs != null) {
jacocoArgs = jacocoArgs.replace("main.exec", "child.exec");
merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
}

final File confFile = writeConfToFile(conf);

if (conf.get(CLIENT_IN_PROCESS) != null) {
// Mostly for testing things quickly. Do not do this in production.
LOG.warn("!!!! Running remote driver in-process. !!!!");
Runnable child = new Runnable() {
@Override
public void run() {
List<String> args = new ArrayList<>();
args.add("--remote-host");
args.add(serverAddress);
args.add("--remote-port");
args.add(serverPort);
args.add("--client-id");
args.add(clientId);
args.add("--secret");
args.add(secret);

for (Map.Entry<String, String> e : conf) {
args.add("--conf");
args.add(String.format("%s=%s", e.getKey(), e.getValue()));
}
try {
RemoteDriver.main(args.toArray(new String[args.size()]));
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
} catch (Exception e) {
LOG.error("Error running driver.", e);
throw Throwables.propagate(e);
}
}
};
return new ChildProcess(conf, child);
return new ChildProcess(conf, child, confFile);
} else {
// If a Spark installation is provided, use the spark-submit script. Otherwise, call the
// SparkSubmit class directly, which has some caveats (like having to provide a proper
// version of Guava on the classpath depending on the deploy mode).
final SparkLauncher launcher = new SparkLauncher();
String sparkHome = conf.get(SPARK_HOME_KEY);
if (sparkHome == null) {
Expand All @@ -201,85 +216,30 @@ public void run() {
sparkHome = System.getProperty(SPARK_HOME_KEY);
}
launcher.setSparkHome(sparkHome);
conf.set(CLIENT_ID, clientId);
conf.set(CLIENT_SECRET, secret);

launcher.setAppResource("spark-internal");

String livyJars = conf.get(LIVY_JARS);
if (livyJars == null) {
String livyHome = System.getenv("LIVY_HOME");
Preconditions.checkState(livyHome != null,
"Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
File clientJars = new File(livyHome, "client-jars");
Preconditions.checkState(clientJars.isDirectory(),
"Cannot find 'client-jars' directory under LIVY_HOME.");
List<String> jars = new ArrayList<>();
for (File f : clientJars.listFiles()) {
jars.add(f.getAbsolutePath());
}
livyJars = Joiner.on(",").join(jars);
}

String userJars = conf.get(SPARK_JARS_KEY);
String userArchives = conf.get(SPARK_ARCHIVES_KEY);

if ("sparkr".equals(conf.get("session.kind"))) {
String sparkRArchives = conf.get(LocalConf.Entry.SPARKR_PACKAGE);
if (userArchives != null) {
String archives = Joiner.on(",").join(sparkRArchives, userArchives);
conf.set(SPARK_ARCHIVES_KEY, archives);
} else {
conf.set(SPARK_ARCHIVES_KEY, sparkRArchives);
}
}

if (userJars != null) {
String allJars = Joiner.on(",").join(livyJars, userJars);
conf.set(SPARK_JARS_KEY, allJars);
} else {
conf.set(SPARK_JARS_KEY, livyJars);
}

// For testing; propagate jacoco settings so that we also do coverage analysis
// on the launched driver. We replace the name of the main file ("main.exec")
// so that we don't end up fighting with the main test launcher.
String jacocoArgs = System.getProperty("jacoco.args");
if (jacocoArgs != null) {
jacocoArgs = jacocoArgs.replace("main.exec", "child.exec");
String userArgs = conf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
if (userArgs != null) {
userArgs = userArgs + " " + jacocoArgs;
} else {
userArgs = jacocoArgs;
}
conf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, userArgs);
}

// Disable multiple attempts since the RPC server doesn't yet support multiple
// connections for the same registered app.
conf.set("spark.yarn.maxAppAttempts", "1");

File confFile = writeConfToFile(conf);

// Define how to pass options to the child process. If launching in client (or local)
// mode, the driver options need to be passed directly on the command line. Otherwise,
// SparkSubmit will take care of that for us.
String master = conf.get("spark.master");
Preconditions.checkArgument(master != null, "spark.master is not defined.");
launcher.setMaster(master);
launcher.setPropertiesFile(confFile.getAbsolutePath());
launcher.setMainClass(className);
launcher.setMainClass(RSCDriverBootstrapper.class.getName());
if (conf.get(PROXY_USER) != null) {
launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
}
launcher.addAppArgs("--remote-host", serverAddress);
launcher.addAppArgs("--remote-port", serverPort);

return new ChildProcess(conf, launcher.launch());
return new ChildProcess(conf, launcher.launch(), confFile);
}
}

/**
 * Prepends a Livy-provided value to whatever is currently stored under {@code key}
 * and writes the combined value back to the configuration.
 *
 * <p>Null values are skipped when joining, so if the key is currently unset the
 * result is just {@code livyConf}.
 *
 * @param conf the configuration to read from and update
 * @param key the configuration key whose value is being extended
 * @param livyConf the value to place first in the joined result, may be null
 * @param sep the separator inserted between the two values
 */
private static void merge(LocalConf conf, String key, String livyConf, String sep) {
  Joiner nullSkippingJoiner = Joiner.on(sep).skipNulls();
  String existing = conf.get(key);
  String combined = nullSkippingJoiner.join(livyConf, existing);
  conf.set(key, combined);
}

/**
* Write the configuration to a file readable only by the process's owner. Livy properties
* are written with an added prefix so that they can be loaded using SparkConf on the driver
Expand Down Expand Up @@ -375,23 +335,26 @@ private static class ChildProcess {
private final Thread monitor;
private final Thread stdout;
private final Thread stderr;
private final File confFile;
private volatile boolean childFailed;

public ChildProcess(LocalConf conf, Runnable child) {
public ChildProcess(LocalConf conf, Runnable child, File confFile) {
this.conf = conf;
this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
this.child = null;
this.stdout = null;
this.stderr = null;
this.confFile = confFile;
this.childFailed = false;
}

public ChildProcess(LocalConf conf, final Process childProc) {
public ChildProcess(LocalConf conf, final Process childProc, File confFile) {
int childId = CHILD_IDS.incrementAndGet();
this.conf = conf;
this.child = childProc;
this.stdout = redirect("stdout-redir-" + childId, child.getInputStream());
this.stderr = redirect("stderr-redir-" + childId, child.getErrorStream());
this.confFile = confFile;
this.childFailed = false;

Runnable monitorTask = new Runnable() {
Expand Down Expand Up @@ -470,8 +433,18 @@ private Thread redirect(String name, InputStream in) {
return thread;
}

private Thread monitor(Runnable task, int childId) {
Thread thread = new Thread(task);
private Thread monitor(final Runnable task, int childId) {
Runnable wrappedTask = new Runnable() {
@Override
public void run() {
try {
task.run();
} finally {
confFile.delete();
}
}
};
Thread thread = new Thread(wrappedTask);
thread.setDaemon(true);
thread.setName("ContextLauncher-" + childId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ Future<?> endSession() {
return driverRpc.call(new EndSession());
}

private void handle(ChannelHandlerContext ctx, java.lang.Error msg) {
LOG.warn("Error reported from remote driver.", msg.getCause());
/**
 * Handles an {@code InitializationError} message by logging the stack trace it carries
 * at WARN level.
 *
 * @param ctx the channel context the message arrived on (unused here)
 * @param msg the error message; its {@code stackTrace} field is logged
 */
private void handle(ChannelHandlerContext ctx, InitializationError msg) {
  // The logger substitutes "{}" placeholders, not printf-style "%s"; with "%s" the
  // literal text was logged and the stack trace argument was silently dropped.
  LOG.warn("Error reported from remote driver: {}", msg.stackTrace);
}

private void handle(ChannelHandlerContext ctx, JobResult msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ public static enum Entry implements ConfEntry {
CLIENT_SECRET("client.auth.secret", null),
CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", null),
CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "10s"),
CLIENT_REPL_MODE("repl", false),
DRIVER_CLASS("driver_class", null),

LIVY_JARS("jars", null),
SPARKR_PACKAGE("sparkr.package", null),

// Address for the RSC driver to connect back with its connection info.
LAUNCHER_ADDRESS("launcher.address", null),
LAUNCHER_PORT("launcher.port", -1),

PROXY_USER("proxy_user", null),

RPC_SERVER_ADDRESS("rpc.server.address", null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class BypassJobWrapper extends JobWrapper<byte[]> {
private volatile JobHandle.State state;
private volatile List<Integer> newSparkJobs;

BypassJobWrapper(Driver driver, DriverProtocol client, String jobId, byte[] serializedJob) {
super(driver, client, jobId, new BypassJob(driver.serializer, serializedJob));
BypassJobWrapper(RSCDriver driver, String jobId, byte[] serializedJob) {
super(driver, jobId, new BypassJob(driver.serializer(), serializedJob));
state = JobHandle.State.QUEUED;
}

Expand Down
Loading

0 comments on commit 8cdc459

Please sign in to comment.