Cluster Execution
Flink programs can run distributed on clusters of many machines. There are two ways to send a program to a cluster for execution:
The command line interface lets you submit packaged programs (JARs) to a cluster (or single machine setup).
Please refer to the Command Line Interface documentation for details.
The remote environment lets you execute Flink Java programs on a cluster directly. The remote environment points to the cluster on which you want to execute the program.
If you are developing your program as a Maven project, you have to add the flink-clients module using this dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>{{ site.FLINK_VERSION_STABLE }}</version>
</dependency>
The following illustrates the use of the RemoteEnvironment:
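// Note: the package of FilterFunction may differ between Flink versions.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public static void main(String[] args) throws Exception {
    // Host name and port of the cluster's master, followed by the JAR file(s)
    // that contain the user code. The port is an int, not a String.
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("strato-master", 7661, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        // Keep only the lines that start with "http://".
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    // Ship the program to the cluster and run it there.
    env.execute();
}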
Note that the program contains custom UDFs and hence requires a JAR file with the classes of the code attached. The createRemoteEnvironment method takes the path(s) to the JAR file(s) as its last argument(s).
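The host, port, and JAR path(s) in the example above are hard-coded. A minimal sketch of how they might instead be read from the program arguments is shown below. RemoteTarget and its methods are hypothetical helpers written for this illustration and are not part of Flink; the sketch only assumes that the port is passed on as an int and the JAR paths as a String array.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Flink): the arguments of a remote submission. */
final class RemoteTarget {
    public final String host;
    public final int port;
    public final String[] jarFiles;

    private RemoteTarget(String host, int port, String[] jarFiles) {
        this.host = host;
        this.port = port;
        this.jarFiles = jarFiles;
    }

    /** Parses arguments of the form: host port jar [jar ...]. */
    public static RemoteTarget parse(String... args) {
        if (args.length < 3) {
            throw new IllegalArgumentException("usage: <host> <port> <jar> [<jar> ...]");
        }
        int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port must be a number: " + args[1], e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // Everything after host and port is treated as a JAR path.
        String[] jars = Arrays.copyOfRange(args, 2, args.length);
        return new RemoteTarget(args[0], port, jars);
    }

    /** Returns the JAR paths that are not regular files on the local file system. */
    public List<String> missingJars() {
        List<String> missing = new ArrayList<>();
        for (String jar : jarFiles) {
            if (!Files.isRegularFile(Paths.get(jar))) {
                missing.add(jar);
            }
        }
        return missing;
    }
}
```

With such a helper, the call in main could read ExecutionEnvironment.createRemoteEnvironment(t.host, t.port, t.jarFiles) for a RemoteTarget t = RemoteTarget.parse(args). Checking missingJars() first reports a wrong JAR path before anything is sent to the cluster.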
Similar to the RemoteEnvironment, the RemoteExecutor lets you execute Flink programs on a cluster directly. The remote executor accepts a Plan object, which describes the program as a single executable unit.
If you are developing your program in a Maven project, you have to add the flink-clients module using this dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>{{ site.FLINK_VERSION_STABLE }}</version>
</dependency>
The following illustrates the use of the RemoteExecutor with the Scala API:
def main(args: Array[String]): Unit = {
  val input = TextFile("hdfs://path/to/file")

  // Split each line into lower-case words and drop empty strings.
  val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
  val counts = words groupBy { x => x } count()

  // wordsOutput is the output path; it is assumed to be defined elsewhere.
  val output = counts.write(wordsOutput, CsvOutputFormat())

  val plan = new ScalaPlan(Seq(output), "Word Count")
  val executor = new RemoteExecutor("strato-master", 7881, "/path/to/jarfile.jar")
  executor.executePlan(plan)
}
The following illustrates the use of the RemoteExecutor with the Java API (as an alternative to the RemoteEnvironment):
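// Note: the package of FilterFunction may differ between Flink versions.
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.RemoteExecutor;

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        // Keep only the lines that start with "http://".
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    // Turn the program into a Plan instead of executing it directly.
    Plan p = env.createProgramPlan();

    // Master host, master port, and the JAR file with the user code.
    RemoteExecutor e = new RemoteExecutor("strato-master", 7881, "/path/to/jarfile.jar");
    e.executePlan(p);
}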
Note that the program contains custom UDFs and hence requires a JAR file with the classes of the code attached. The constructor of the remote executor takes the path(s) to the JAR file(s).
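When the program is itself started from a packaged JAR, the path to that JAR does not have to be hard-coded. The sketch below looks up the JAR a class was loaded from by using the standard Java CodeSource mechanism. This is a general JVM technique rather than a Flink feature, and JarLocator is a hypothetical helper name chosen for this illustration.

```java
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.util.Optional;

/** Hypothetical helper (not part of Flink): finds the JAR a class was loaded from. */
final class JarLocator {

    private JarLocator() {}

    /**
     * Returns the path of the JAR file that contains the given class, or an
     * empty Optional if the class was not loaded from a JAR (for example when
     * it was loaded from a classes directory or belongs to the JDK itself).
     */
    public static Optional<Path> locate(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return Optional.empty();
        }
        try {
            Path path = Paths.get(source.getLocation().toURI());
            return path.toString().endsWith(".jar") ? Optional.of(path) : Optional.empty();
        } catch (URISyntaxException | RuntimeException e) {
            // The location is not a usable local file path.
            return Optional.empty();
        }
    }
}
```

If the lookup succeeds for a class that holds the UDFs, the resulting path (converted with toString()) could be passed to the RemoteExecutor constructor in place of "/path/to/jarfile.jar". An empty result typically means the program is running from unpackaged class files, in which case the JAR has to be built first.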