Skip to content

Commit

Permalink
[FLINK-5457] [docs] Add documentation for asynchronous I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 16, 2017
1 parent e2ba042 commit fb3761b
Show file tree
Hide file tree
Showing 3 changed files with 562 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ out/
/docs/.rubydeps
/docs/ruby2/.bundle
/docs/ruby2/.rubydeps
/docs/.jekyll-metadata
*.ipr
*.iws
226 changes: 224 additions & 2 deletions docs/dev/stream/asyncio.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
title: "Async I/O for External Data Access"
title: "Asynchronous I/O for External Data Access"
nav-title: "Async I/O"
nav-parent_id: streaming
nav-pos: 60
---
Expand All @@ -25,4 +26,225 @@ under the License.
* ToC
{:toc}

**TDB**
This page explains the use of Flink's API for asynchronous I/O with external data stores.
For users not familiar with asynchronous or event-driven programming, an article about Futures and
event-driven programming may be useful preparation.

Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document
[FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).


## The need for Asynchronous I/O Operations

When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care
that communication delay with the external system does not dominate the streaming application's total work.

Naively accessing data in the external database, for example in a `MapFunction`, typically means **synchronous** interaction:
A request is sent to the database and the `MapFunction` waits until the response has been received. In many cases, this waiting
makes up the vast majority of the function's time.

Asynchronous interaction with the database means that a single parallel function instance can handle many requests concurrently and
receive the responses concurrently. That way, the waiting time can be overlayed with sending other requests and
receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cased to much higher
streaming throughput.

<img src="../../fig/async_io.svg" class="center" width="50%" />

*Note:* Improving throughput by just scaling the `MapFunction` to a very high parallelism is in some cases possible as well, but usually
comes at a very high resource cost: Having many more parallel MapFunction instances means more tasks, threads, Flink-internal network
connections, network connections to the database, buffers, and general internal bookkeeping overhead.


## Prerequisites

As illustrated in the section above, implementing proper asynchronous I/O to a database (or key/value store) requires a client
to that database that supports asynchronous requests. Many popular databases offer such a client.

In the absence of such a client, one can try and turn a synchronous client into a limited concurrent client by creating
multiple clients and handling the synchronous calls with a thread pool. However, this approach is usually less
efficient than a proper asynchronous client.


## Async I/O API

Flink's Async I/O API allows users to use asynchronous request clients with data streams. The API handles the integration with
data streams, well as handling order, event time, fault tolerance, etc.

Assuming one has an asynchronous client for the target database, three parts are needed to implement a stream transformation
with asynchronous I/O against the database:

- An implementation of `AsyncFunction` that dispatches the requests
- A *callback* that takes the result of the operation and hands it to the `AsyncCollector`
- Applying the async I/O operation on a DataStream as a transformation

The following code example illustrates the basic pattern:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;

@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@Override
public void close() throws Exception {
client.close();
}

@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {

// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.thenAccept( (String result) -> {

asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
});
}
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

/** The database specific client that can issue concurrent requests with callbacks */
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()))


override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {

// issue the asynchronous request, receive a future for the result
val resultFuture: Future[String] = client.query(str)

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.onSuccess {
case result: String => asyncCollector.collect(Collections.singleton((str, result)));
})
}
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

{% endhighlight %}
</div>
</div>

The following two parameters control the asynchronous operations:

- **Timeout**: The timeout defines how long an asynchronous request may take before it is considered failed. This parameter
guards against dead/failed requests.

- **Capacity**: This parameter defines how many asynchronous requests may be in progress at the same time.
Even though the async I/O approach leads typically to much better throughput, the operator can still be the bottleneck in
the streaming application. Limiting the number of concurrent requests ensures that the operator will not
accumulate an ever-growing backlog of pending requests, but that it will trigger backpressure once the capacity
is exhausted.


### Order of Results

The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first.
To control in which order the resulting records are emitted, Flink offers two modes:

- **Unordered**: Result records are emitted as soon as the asynchronous request finishes.
The order of the records in the stream is different after the async I/O operator than before.
This mode has the lowest latency and lowest overhead, when used with *processing time* as the basic time characteristic.
Use `AsyncDataStream.unorderedWait(...)` for this mode.

- **Ordered**: In that case, the stream order is preserved. Result records are emitted in the same order as the asynchronous
requests are triggered (the order of the operators input records). To achieve that, the operator buffers a result record
until all its preceeding records are emitted (or timed out).
This usually introduces some amount of extra latency and some overhead in checkpointing, because records or results are maintained
in the checkpointed state for a longer time, compared to the unordered mode.
Use `AsyncDataStream.orderedWait(...)` for this mode.


### Event Time

When the streaming application works with [event time](../event_time.html), watermarks will be handled correctly by the
asynchronous I/O operator. That means concretely the following for the two order modes:

- **Unordered**: Watermarks do not overtake records and vice versa, meaning watermarks establish an *order boundary*.
Records are emitted unordered only between watermarks.
A record occurring after a certain watermark will be emitted only after that watermark was emitted.
The watermark in turn will be emitted only after all result records from inputs before that watermark were emitted.

That means that in the presence of watermarks, the *unordered* mode introduces some of the same latency and management
overhead as the *ordered* mode does. The amount of that overhead depends on the watermark frequency.

- **Ordered**: Order of watermarks an records is preserved, just like order between records is preserved. There is no
significant change in overhead, compared to working with *processing time*.

Please recall that *Ingestion Time* is a special case of *event time* with automatically generated watermarks that
are based on the sources processing time.


### Fault Tolerance Guarantees

The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight
asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure.


### Implementation Tips

For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggets to use a `DirectExecutor`, because the
callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
with the checkpoint bookkeepting happens in a dedicated thread-pool anyways.

A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
`com.google.common.util.concurrent.MoreExecutors.directExecutor()`.


### Caveat

**The AsyncFunction is not called Multi-Threaded**

A common confusion that we want to explicitly point out here is that the `AsyncFunction` is not called in a multi-threaded fashion.
There exists only one instance of the `AsyncFunction` and it is called sequentially for each record in the respective partition
of the stream. Unless the `asyncInvoke(...)` method returns fast and relies on a callback (by the client), it will not result in
proper asynchronous I/O.

For example, the following patterns result in a blocking `asyncInvoke(...)` functions and thus void the asynchronous behavior:

- Using a database client whose lookup/query method call blocks until the result has been received back

- Blocking/waiting on the future-type objects returned by an aynchronous client inside the `asyncInvoke(...)` method

Loading

0 comments on commit fb3761b

Please sign in to comment.