[FLINK-3641] Add documentation for DataSet distributed cache.
This closes apache#2122
fhueske committed Jun 18, 2016
1 parent 5a0c268 commit ba62df1
Showing 1 changed file with 105 additions and 1 deletion.
106 changes: 105 additions & 1 deletion docs/apis/batch/index.md
@@ -2019,6 +2019,110 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio

{% top %}

Distributed Cache
-------------------

Flink offers a distributed cache, similar to Apache Hadoop's, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.

The cache works as follows. A program registers a file or directory from a [local or remote filesystem such as HDFS or S3]({{ site.baseurl }}/apis/batch/connectors.html#reading-from-file-systems) as a cached file under a specific name in its `ExecutionEnvironment`. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can then look up the file or directory under the registered name and access it from the worker's local filesystem.

The distributed cache is used as follows:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

Register the file or directory in the `ExecutionEnvironment`.

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
{% endhighlight %}

Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.

{% highlight java %}

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}
{% endhighlight %}

</div>
<div data-lang="scala" markdown="1">

Register the file or directory in the `ExecutionEnvironment`.

{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Int] = input.map(new MyMapper())
...
env.execute()
{% endhighlight %}

Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.

{% highlight scala %}

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}
{% endhighlight %}

</div>
</div>
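
The "read the file (or navigate the directory)" step in the examples above is left open. As one possible illustration, the following sketch loads a dictionary with one entry per line from the `File` handle returned by the distributed cache. The class `CachedDictionary` and its `load` method are hypothetical helpers, not part of Flink, and the one-word-per-line UTF-8 format is an assumption made only for this example.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not a Flink class): collects the non-empty,
// trimmed lines of a cached file, or of every regular file directly
// inside a cached directory, into a set.
final class CachedDictionary {

    private CachedDictionary() {}

    static Set<String> load(File cached) throws IOException {
        Set<String> words = new HashSet<>();
        if (cached.isDirectory()) {
            // a cached directory: read each regular file it contains
            File[] children = cached.listFiles();
            if (children != null) {
                for (File child : children) {
                    if (child.isFile()) {
                        addLines(child, words);
                    }
                }
            }
        } else {
            // a single cached file
            addLines(cached, words);
        }
        return words;
    }

    private static void addLines(File file, Set<String> words) throws IOException {
        // assumption: UTF-8 text with one dictionary entry per line
        for (String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
            String word = line.trim();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
    }
}
```

Inside `open()`, such a helper could be called as `CachedDictionary.load(getRuntimeContext().getDistributedCache().getFile("hdfsFile"))`, with the result stored in a field for use in `map()`. Because `load` throws `IOException`, the `open()` override would have to declare `throws Exception` or handle the exception itself.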

{% top %}

Passing Parameters to Functions
-------------------

@@ -2067,7 +2171,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {

#### Via `withParameters(Configuration)`

-This method takes a Configuration object as an argument, which will be passed to the [rich function](#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich function]({{ site.baseurl }}/apis/common/index.html#rich-functions)'s `open()`
method. The Configuration object is a Map from String keys to different value types.

<div class="codetabs" markdown="1">