
[FLINK-1230] Add documentation and an example for collection-based execution

This closes apache#195
rmetzger committed Nov 13, 2014
1 parent b3c290f commit b253cb2
Showing 2 changed files with 138 additions and 44 deletions.
79 changes: 35 additions & 44 deletions docs/local_execution.md
@@ -2,20 +2,22 @@
title: "Local Execution"
---

## Local Execution

Flink can run on a single machine, even in a single Java Virtual Machine. This allows users to test and debug Flink programs locally. This section gives an overview of the local execution mechanisms.

The local environments and executors allow you to run Flink programs in a local Java Virtual Machine, or within any JVM as part of existing programs. Most examples can be launched locally by simply hitting the "Run" button of your IDE.

There are two different kinds of local execution supported in Flink. The `LocalExecutionEnvironment` starts the full Flink runtime, including a JobManager and a TaskManager. These include memory management and all the internal algorithms that are executed in cluster mode.

The `CollectionEnvironment` executes the Flink program on Java collections. This mode does not start the full Flink runtime, so the execution has very low overhead and is lightweight. For example, a `DataSet.map()` transformation is executed by applying the `map()` function to all elements in a Java list.
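
For illustration, here is a minimal sketch of obtaining the two kinds of environments (assuming the `ExecutionEnvironment.createLocalEnvironment()` factory method of the Java API; the variable names are illustrative):

~~~java
// starts the full local runtime, including a JobManager and a TaskManager
ExecutionEnvironment localRuntimeEnv = ExecutionEnvironment.createLocalEnvironment();

// runs the program on Java collections, without starting the Flink runtime
ExecutionEnvironment collectionEnv = new CollectionEnvironment();
~~~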


## Debugging

If you are running Flink programs locally, you can also debug your program like any other Java program. You can either use `System.out.println()` to write out some internal variables or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()` and all the other methods.
Please also refer to the [debugging section](programming_guide.html#debugging) in the Java API documentation for a guide to testing and local debugging utilities.
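
As a minimal sketch (the `env` variable and the data values are illustrative), a print statement inside a user function behaves like in any other Java program when executing locally:

~~~java
DataSet<String> lines = env.fromElements("error: disk full", "info: job started");

DataSet<String> upperCase = lines
    .map(new MapFunction<String, String>() {
        public String map(String value) {
            // written to the local console; a breakpoint can be set here as well
            System.out.println("mapping: " + value);
            return value.toUpperCase();
        }
    });
~~~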

## Maven Dependency

@@ -51,56 +53,45 @@ public static void main(String[] args) throws Exception {
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}
~~~

The `JobExecutionResult` object, which is returned after the execution has finished, contains the program runtime and the accumulator results.
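
A short sketch of inspecting these values (the accumulator name `"num-lines"` is a hypothetical example and would have to be registered in one of the user functions):

~~~java
JobExecutionResult res = env.execute();

long netRuntime = res.getNetRuntime(); // runtime in milliseconds
Object numLines = res.getAccumulatorResult("num-lines"); // hypothetical accumulator

System.out.println("Runtime: " + netRuntime + " ms, lines: " + numLines);
~~~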

*Note:* The local execution environments do not start any web frontend to monitor the execution.

## Collection Environment

The execution on Java collections using the `CollectionEnvironment` is a low-overhead approach for executing Flink programs. Typical use cases for this mode are automated tests, debugging and code re-use.
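
Because no runtime has to be started, a collection-based program can, for example, run as a plain unit test. A minimal sketch, assuming JUnit is on the classpath (the test contents are illustrative and mirror the skeleton below):

~~~java
@Test
public void testUpperCase() throws Exception {
    ExecutionEnvironment env = new CollectionEnvironment();

    List<String> result = new ArrayList<String>();

    env.fromElements("a", "b")
        .map(new MapFunction<String, String>() {
            public String map(String value) {
                return value.toUpperCase();
            }
        })
        .output(new LocalCollectionOutputFormat<String>(result));

    env.execute();

    Assert.assertEquals(Arrays.asList("A", "B"), result);
}
~~~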

Users can also use algorithms implemented for batch processing in more interactive use cases. A slightly changed variant of a Flink program could be used in a Java Application Server for processing incoming requests.


**Skeleton for Collection-based execution**

~~~java
public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */ );

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into an ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for (... t : result) {
        System.err.println("Result = " + t);
    }
}
~~~

The `flink-java-examples` module contains a full example, called `CollectionExecutionExample`.

Please note that the execution of collection-based Flink programs is only possible on small data that fits into the JVM heap. The execution on collections is not multi-threaded; only one thread is used.
103 changes: 103 additions & 0 deletions flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.examples.java.misc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * This example shows how to use the collection-based execution of Flink.
 *
 * The collection-based execution is a local mode that does not use the full Flink runtime.
 * DataSet transformations are executed on Java collections.
 *
 * See the "Local Execution" section in the documentation for more details:
 * http://flink.incubator.apache.org/docs/0.7-incubating/local_execution.html
 */
public class CollectionExecutionExample {

	/**
	 * POJO class representing a user
	 */
	public static class User {
		public int userIdentifier;
		public String name;

		public User() {}

		public User(int userIdentifier, String name) {
			this.userIdentifier = userIdentifier;
			this.name = name;
		}

		public String toString() {
			return "User{userIdentifier=" + userIdentifier + " name=" + name + "}";
		}
	}

	/**
	 * POJO for an EMail.
	 */
	public static class EMail {
		public int userId;
		public String subject;
		public String body;

		public EMail() {}

		public EMail(int userId, String subject, String body) {
			this.userId = userId;
			this.subject = subject;
			this.body = body;
		}

		public String toString() {
			return "eMail{userId=" + userId + " subject=" + subject + " body=" + body + "}";
		}
	}

	public static void main(String[] args) throws Exception {
		// initialize a new Collection-based execution environment
		final ExecutionEnvironment env = new CollectionEnvironment();

		// create objects for users and emails
		User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };

		EMail[] emailsArray = { new EMail(1, "Re: Meeting", "How about 1pm?"),
				new EMail(1, "Re: Meeting", "Sorry, I'm not available"),
				new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.") };

		// convert objects into a DataSet
		DataSet<User> users = env.fromCollection(Arrays.asList(usersArray));
		DataSet<EMail> emails = env.fromCollection(Arrays.asList(emailsArray));

		// join the two DataSets on the user id
		DataSet<Tuple2<User, EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");

		// retrieve the resulting Tuple2 elements into an ArrayList.
		Collection<Tuple2<User, EMail>> result = new ArrayList<Tuple2<User, EMail>>(3);
		joined.output(new LocalCollectionOutputFormat<Tuple2<User, EMail>>(result));

		// kick off execution.
		env.execute();

		// Do some work with the resulting ArrayList (=Collection).
		for (Tuple2<User, EMail> t : result) {
			System.err.println("Result = " + t);
		}
	}
}
