Skip to content

Commit

Permalink
[FLINK-1878] [APIs] Environments accept a flag that controls sysout l…
Browse files Browse the repository at this point in the history
…ogging during execution.
StephanEwen committed Apr 13, 2015
1 parent 0e6c9d4 commit e79813b
Showing 11 changed files with 119 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -55,12 +55,9 @@ public class LocalExecutor extends PlanExecutor {

// ---------------------------------- config options ------------------------------------------


private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;

private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;

private boolean printStatusDuringExecution = true;

// --------------------------------------------------------------------------------------------

@@ -85,12 +82,12 @@ public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
this.defaultOverwriteFiles = defaultOverwriteFiles;
}

public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }

public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
public void setTaskManagerNumSlots(int taskManagerNumSlots) {
this.taskManagerNumSlots = taskManagerNumSlots;
}

public void setPrintStatusDuringExecution(boolean printStatus) {
this.printStatusDuringExecution = printStatus;
public int getTaskManagerNumSlots() {
return this.taskManagerNumSlots;
}

// --------------------------------------------------------------------------------------------
@@ -178,7 +175,8 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);

SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
boolean sysoutPrint = isPrintingStatusDuringExecution();
SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph,sysoutPrint);
return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
}
finally {
@@ -276,16 +274,4 @@ public static String getPlanAsJSON(Plan plan) {
List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
return gen.getPactPlanAsJSON(sinks);
}

/**
* By default, local environments do not overwrite existing files.
*
* NOTE: This method must be called prior to initializing the LocalExecutor or a
* {@link org.apache.flink.api.java.LocalEnvironment}.
*
* @param overwriteByDefault True to overwrite by default, false to not overwrite by default.
*/
public static void setOverwriteFilesByDefault(boolean overwriteByDefault) {
DEFAULT_OVERWRITE = overwriteByDefault;
}
}
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.client;

import java.io.File;
@@ -39,7 +38,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
* and ships it to a remote Flink cluster for execution.
*
* The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
* set of libraries that need to be shipped together with the program.
*
* The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
* remotely execute program parts.
*/
public class RemoteExecutor extends PlanExecutor {

private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);

private final List<String> jarFiles;
@@ -65,22 +75,6 @@ public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
this.jarFiles = jarFiles;
this.address = inet;
}

public static InetSocketAddress getInetFromHostport(String hostport) {
// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port");
}
return new InetSocketAddress(host, port);
}

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
@@ -90,8 +84,10 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(p, -1, true);
if(result instanceof JobExecutionResult) {
if (result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
@@ -104,6 +100,8 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);

Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
if(result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
@@ -122,4 +120,24 @@ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON(op);
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
public static InetSocketAddress getInetFromHostport(String hostport) {
// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port");
}
return new InetSocketAddress(host, port);
}

}
Original file line number Diff line number Diff line change
@@ -76,6 +76,9 @@ public class ExecutionConfig implements Serializable {

private boolean serializeGenericTypesWithAvro = false;

/** If set to true, progress updates are printed to System.out during execution */
private boolean printProgressDuringExecution = true;

// Serializers and types registered with Kryo and the PojoSerializer
// we store them in lists to ensure they are registered in order in all kryo instances.

@@ -281,8 +284,6 @@ public boolean serializeGenericTypesWithAvro() {
return serializeGenericTypesWithAvro;
}



/**
* Enables reusing objects that Flink internally uses for deserialization and passing
* data to user-code functions. Keep in mind that this can lead to bugs when the
@@ -309,6 +310,35 @@ public boolean isObjectReuseEnabled() {
return objectReuse;
}

/**
* Enables the printing of progress update messages to {@code System.out}
*
* @return The ExecutionConfig object, to allow for function chaining.
*/
public ExecutionConfig enableSysoutLogging() {
this.printProgressDuringExecution = true;
return this;
}

/**
* Disables the printing of progress update messages to {@code System.out}
*
* @return The ExecutionConfig object, to allow for function chaining.
*/
public ExecutionConfig disableSysoutLogging() {
this.printProgressDuringExecution = false;
return this;
}

/**
* Gets whether progress update messages should be printed to {@code System.out}
*
* @return True, if progress update messages should be printed, false otherwise.
*/
public boolean isSysoutLoggingEnabled() {
return this.printProgressDuringExecution;
}

// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
Original file line number Diff line number Diff line change
@@ -27,13 +27,34 @@

/**
* A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor
* and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan.
* and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan.
*
* The concrete implementations are loaded dynamically, because they depend on the full set of
* dependencies of all runtime classes.
*/
public abstract class PlanExecutor {

private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";

// ------------------------------------------------------------------------
// Config Options
// ------------------------------------------------------------------------

/** If true, all execution progress updates are not only logged, but also printed to System.out */
private boolean printUpdatesToSysout = true;

public void setPrintStatusDuringExecution(boolean printStatus) {
this.printUpdatesToSysout = printStatus;
}

public boolean isPrintingStatusDuringExecution() {
return this.printUpdatesToSysout;
}

// ------------------------------------------------------------------------
// Program Execution
// ------------------------------------------------------------------------

/**
* Execute the given plan and return the runtime in milliseconds.
@@ -55,7 +76,11 @@ public abstract class PlanExecutor {
* @throws Exception Thrown, if the executor could not connect to the compiler.
*/
public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;



// ------------------------------------------------------------------------
// Executor Factories
// ------------------------------------------------------------------------

/**
* Creates an executor that runs the plan locally in a multi-threaded environment.
@@ -69,7 +94,8 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
return leClass.getConstructor(Configuration.class).newInstance(configuration);
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the local executor (" + LOCAL_EXECUTOR_CLASS + ").", t);
throw new RuntimeException("An error occurred while loading the local executor ("
+ LOCAL_EXECUTOR_CLASS + ").", t);
}
}

@@ -93,13 +119,15 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin

Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);

List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList() : Arrays.asList(jarFiles);
List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList()
: Arrays.asList(jarFiles);

try {
return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor (" + REMOTE_EXECUTOR_CLASS + ").", t);
throw new RuntimeException("An error occurred while loading the remote executor ("
+ REMOTE_EXECUTOR_CLASS + ").", t);
}
}

@@ -109,7 +137,8 @@ private static Class<? extends PlanExecutor> loadExecutorClass(String className)
return leClass.asSubclass(PlanExecutor.class);
}
catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not load the executor class (" + className + "). Do you have the 'flink-clients' project in your dependencies?");
throw new RuntimeException("Could not load the executor class (" + className
+ "). Do you have the 'flink-clients' project in your dependencies?");
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
}

Original file line number Diff line number Diff line change
@@ -66,6 +66,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
}

Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ public static void main(String[] args) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();

DataSet<Integer> data = env.createInput(new CustomInputFormat());

Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ public static void main(String[] args) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();

// get input data
DataSet<Point> points = env.fromElements(pointsData.split("\n"))
Original file line number Diff line number Diff line change
@@ -37,7 +37,8 @@ public static void main(String[] args) throws Exception {
final int port = Integer.parseInt(args[2]);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);

env.getConfig().disableSysoutLogging();

DataStream<String> text = env.fromElements(WordCountData.TEXT);

DataStream<Word> counts =
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@ public void testProgram(int jobManagerPort, final File coordinateDir) throws Exc
env.setParallelism(PARALLELISM);
env.setNumberOfExecutionRetries(1);
env.getConfig().setExecutionMode(executionMode);
env.getConfig().disableSysoutLogging();

final long NUM_ELEMENTS = 100000L;
final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ public void testProgram(int jobManagerPort, final File coordinateDir) throws Exc
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", jobManagerPort);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
env.setNumberOfExecutionRetries(1);
env.enableCheckpointing(200);

0 comments on commit e79813b

Please sign in to comment.