Skip to content

Commit

Permalink
[FLINK-17291][docs-zh] Translate training lesson on event-driven appl…
Browse files Browse the repository at this point in the history
…ications to chinese

This closes apache#11979
  • Loading branch information
RocMarshal authored May 9, 2020
1 parent fb7ae52 commit 890dd8d
Showing 1 changed file with 99 additions and 110 deletions.
209 changes: 99 additions & 110 deletions docs/training/event_driven.zh.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
title: Event-driven Applications
nav-id: event-driven
title: 事件驱动应用
nav-id: 事件驱动
nav-pos: 5
nav-title: Event-driven Applications
nav-title: 事件驱动应用
nav-parent_id: training
---
<!--
Expand All @@ -27,49 +27,47 @@ under the License.
* This will be replaced by the TOC
{:toc}

## Process Functions
## 处理函数(Process Functions

### Introduction
### 简介

A `ProcessFunction` combines event processing with timers and state, making it a powerful building
block for stream processing applications. This is the basis for creating event-driven applications
with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
`ProcessFunction` 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。
这是使用 Flink 创建事件驱动应用程序的基础。它和 `RichFlatMapFunction` 十分相似, 但是增加了 Timer。

### Example
### 示例

If you've done the
[hands-on exercise]({% link training/streaming_analytics.zh.md %}#hands-on)
in the [Streaming Analytics training]({% link training/streaming_analytics.zh.md %}),
you will recall that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for
each driver during each hour, like this:
如果你已经体验了
[流式分析训练]({% link training/streaming_analytics.zh.md %})
[动手实践]({% link training/streaming_analytics.zh.md %}#hands-on),
你应该记得,它是采用 `TumblingEventTimeWindow` 来计算每个小时内每个司机的小费总和,
像下面的示例这样:

{% highlight java %}
// compute the sum of the tips per hour for each driver
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());
{% endhighlight %}

It is reasonably straightforward, and educational, to do the same thing with a
`KeyedProcessFunction`. Let us begin by replacing the code above with this:
使用 `KeyedProcessFunction` 去实现相同的操作更加直接且更有学习意义。
让我们开始用以下代码替换上面的代码:

{% highlight java %}
// compute the sum of the tips per hour for each driver
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));
{% endhighlight %}

In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
produced by the implementation that uses Flink's built-in time windows).
在这个代码片段中,一个名为 `PseudoWindow``KeyedProcessFunction` 被应用于 KeyedStream,
其结果是一个 `DataStream<Tuple3<Long, Long, Float>>` (与使用 Flink 内置时间窗口的实现生成的流相同)。

The overall outline of `PseudoWindow` has this shape:
`PseudoWindow` 的总体轮廓示意如下:

{% highlight java %}
// Compute the sum of the tips for each driver in hour-long windows.
// The keys are driverIds.
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

Expand All @@ -80,13 +78,13 @@ public static class PseudoWindow extends
}

@Override
// Called once during initialization.
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}

@Override
// Called as each fare arrives to be processed.
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Expand All @@ -96,7 +94,7 @@ public static class PseudoWindow extends
}

@Override
// Called when the current watermark indicates that a window is now complete.
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
Expand All @@ -106,25 +104,24 @@ public static class PseudoWindow extends
}
{% endhighlight %}

Things to be aware of:
注意事项:

* There are several types of ProcessFunctions -- this is a `KeyedProcessFunction`, but there are also
`CoProcessFunctions`, `BroadcastProcessFunctions`, etc.
* 有几种类型的 ProcessFunctions -- 不仅包括 `KeyedProcessFunction`,还包括
`CoProcessFunctions``BroadcastProcessFunctions`.

* A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
and `getRuntimeContext` methods needed for working with managed keyed state.
* `KeyedProcessFunction` 是一种 `RichFunction`。作为 `RichFunction`,它可以访问使用 Managed Keyed State 所需的 `open`
`getRuntimeContext` 方法。

* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
with each incoming event; `onTimer` is called when timers fire. These can be either event time or
processing time timers. Both `processElement` and `onTimer` are provided with a context object
that can be used to interact with a `TimerService` (among other things). Both callbacks are also
passed a `Collector` that can be used to emit results.
* 有两个回调方法须要实现: `processElement``onTimer`。每个输入事件都会调用 `processElement` 方法;
当计时器触发时调用 `onTimer`。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。
除此之外,`processElement``onTimer` 都提供了一个上下文对象,该对象可用于与 `TimerService` 交互。
这两个回调还传递了一个可用于发出结果的 `Collector`

#### The `open()` method
#### `open()` 方法

{% highlight java %}
// Keyed, managed state, with an entry for each window, keyed by the window's end time.
// There is a separate MapState object for each driver.
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState<Long, Float> sumOfTips;

@Override
Expand All @@ -136,13 +133,12 @@ public void open(Configuration conf) {
}
{% endhighlight %}

Because the fare events can arrive out of order, it will sometimes be necessary to process events
for one hour before having finished computing the results for the previous hour. In fact, if the
watermarking delay is much longer than the window length, then there may be many windows open
simultaneously, rather than just two. This implementation supports this by using a `MapState` that
maps the timestamp for the end of each window to the sum of the tips for that window.
由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。
这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。
实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。
此实现通过使用 `MapState` 来支持处理这一点,该 `MapState` 将每个窗口的结束时间戳映射到该窗口的小费总和。

#### The `processElement()` method
#### `processElement()` 方法

{% highlight java %}
public void processElement(
Expand All @@ -154,15 +150,15 @@ public void processElement(
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// This event is late; its window has already been triggered.
// 事件延迟;其对应的窗口已经触发。
} else {
// Round up eventTime to the end of the window containing this event.
// eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// Schedule a callback for when the window has been completed.
// 在窗口完成时将启用回调
timerService.registerEventTimeTimer(endOfWindow);

// Add this fare's tip to the running total for that window.
// 将此票价的小费添加到该窗口的总计中。
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
Expand All @@ -173,18 +169,16 @@ public void processElement(
}
{% endhighlight %}

Things to consider:
需要考虑的事项:

* What happens with late events? Events that are behind the watermark (i.e., late) are being
dropped. If you want to do something better than this, consider using a side output, which is
explained in the [next section]({% link training/event_driven.zh.md
%}#side-outputs).
* 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。
如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在[下一节]({% link
training/event_driven.zh.md%}#side-outputs)中解释。

* This example uses a `MapState` where the keys are timestamps, and sets a `Timer` for that same
timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
when the timer fires.
* 本例使用一个 `MapState`,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。
这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。

#### The `onTimer()` method
#### `onTimer()` 方法

{% highlight java %}
public void onTimer(
Expand All @@ -193,7 +187,7 @@ public void onTimer(
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long driverId = context.getCurrentKey();
// Look up the result for the hour that just ended.
// 查找刚结束的一小时结果。
Float sumOfTips = this.sumOfTips.get(timestamp);

Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
Expand All @@ -202,107 +196,102 @@ public void onTimer(
}
{% endhighlight %}

Observations:
注意:

* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
* 传递给 `onTimer``OnTimerContext context` 可用于确定当前 key

* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
which has the effect of making it impossible to accommodate late events. This is the equivalent of
setting the allowedLateness to zero when working with Flink's time windows.
* 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用 `onTimer`
这个 `onTimer` 方法从 `sumOfTips` 中删除相关的条目,这样做的效果是不可能容纳延迟的事件。
这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。

### Performance Considerations
### 性能考虑

Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
state backend can append to `ListState` without going through (de)serialization, and for `MapState`, each
key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
Flink 提供了为 RocksDB 优化的 `MapState``ListState` 类型。
相对于 `ValueState`,更建议使用 `MapState``ListState`,因为使用 RocksDBStateBackend 的情况下,
`MapState``ListState``ValueState` 性能更好。
RocksDBStateBackend 可以附加到 `ListState`,而无需进行(反)序列化,
对于 `MapState`,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 `MapState`

{% top %}

## Side Outputs
## 旁路输出(Side Outputs

### Introduction
### 简介

There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:

* exceptions
* malformed events
* late events
* operational alerts, such as timed-out connections to external services
* 异常情况(exceptions
* 格式错误的事件(malformed events
* 延迟的事件(late events
* operator 告警(operational alerts),如与外部服务的连接超时

Side outputs are a convenient way to do this. Beyond error reporting, side outputs are also
a good way to implement an n-way split of a stream.
旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。

### Example
### 示例

You are now in a position to do something with the late events that were ignored in the previous
section.
现在你可以对上一节中忽略的延迟事件执行某些操作。

A side output channel is associated with an `OutputTag<T>`. These tags have generic types that
correspond to the type of the side output's `DataStream`, and they have names.
Side output channel 与 `OutputTag<T>` 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。

{% highlight java %}
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
{% endhighlight %}

Shown above is a static `OutputTag<TaxiFare>` that can be referenced both when emitting
late events in the `processElement` method of the `PseudoWindow`:
上面显示的是一个静态 `OutputTag<TaxiFare>` ,当在 `PseudoWindow``processElement` 方法中发出延迟事件时,可以引用它:

{% highlight java %}
if (eventTime <= timerService.currentWatermark()) {
// This event is late; its window has already been triggered.
// 事件延迟,其对应的窗口已经触发。
ctx.output(lateFares, fare);
} else {
. . .
}
{% endhighlight %}

and when accessing the stream from this side output in the `main` method of the job:
以及当在作业的 `main` 中从该旁路输出访问流时:

{% highlight java %}
// compute the sum of the tips per hour for each driver
// 计算每个司机每小时的小费总和
SingleOutputStreamOperator hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();
{% endhighlight %}

Alternatively, you can use two OutputTags with the
same name to refer to the same side output, but if you do, they must have the same type.
或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。

{% top %}

## Closing Remarks
## 结语

In this example you have seen how a `ProcessFunction` can be used to reimplement a straightforward time
window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and
use it. But if you find yourself considering doing something contorted with Flink's windows, don't
be afraid to roll your own.
在本例中,你已经了解了如何使用 `ProcessFunction` 重新实现一个简单的时间窗口。
当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。
但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。

Also, `ProcessFunctions` are useful for many other use cases beyond computing analytics. The hands-on
exercise below provides an example of something completely different.
此外,`ProcessFunctions` 对于计算分析之外的许多其他用例也很有用。
下面的实践练习提供了一个完全不同的例子。

Another common use case for ProcessFunctions is for expiring stale state. If you think back to the
[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/rides-and-fares),
where a `RichCoFlatMapFunction` is used to compute a simple join, the sample solution assumes that
the TaxiRides and TaxiFares are perfectly matched, one-to-one for each `rideId`. If an event is lost,
the other event for the same `rideId` will be held in state forever. This could instead be implemented
as a `KeyedCoProcessFunction`, and a timer could be used to detect and clear any stale state.
`ProcessFunctions` 的另一个常见用例是清理过时 State。如果你回想一下
[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/rides-and-fares),
其中使用 `RichCoFlatMapFunction` 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效
数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 `rideId` 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件)
丢失,则同一 `rideId` 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。
所以这里可以使用 `KeyedCoProcessFunction` 的实现代替它(`RichCoFlatMapFunction`),并且可以使用计时器来检测和清除任何过时
的 State。

{% top %}

## Hands-on
## 实践练习

The hands-on exercise that goes with this section is the [Long Ride Alerts
本节的实践练习是 [Long Ride Alerts
Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/long-ride-alerts).

{% top %}

## Further Reading
## 延伸阅读

- [ProcessFunction]({% link dev/stream/operators/process_function.zh.md %})
- [Side Outputs]({% link dev/stream/side_output.zh.md %})
- [处理函数(ProcessFunction]({% link dev/stream/operators/process_function.zh.md %})
- [旁路输出(Side Outputs]({% link dev/stream/side_output.zh.md %})

{% top %}

0 comments on commit 890dd8d

Please sign in to comment.