[FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese (apache#13225)


Co-authored-by: zhushang <[email protected]>
zhuxiaoshang and zhushang authored Sep 17, 2020
1 parent 7925647 commit 8f2385b
Showing 1 changed file with 63 additions and 74 deletions.
docs/dev/user_defined_functions.zh.md (63 additions, 74 deletions)
---
title: 'User-Defined Functions'
nav-id: user_defined_function
nav-parent_id: streaming
nav-pos: 4
specific language governing permissions and limitations
under the License.
-->

Most operations require a user-defined function. This section lists the different
ways of implementing them. We also cover `Accumulators`, which can be
used to gain insights into your Flink application.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

<a name="implementing-an-interface"></a>

## Implementing an interface

The most basic way is to implement one of the provided interfaces:

{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
{% endhighlight %}

<a name="anonymous-classes"></a>

## Anonymous classes

You can pass a function as an anonymous class:
{% highlight java %}
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}

<a name="java-8-lambdas"></a>

## Java 8 Lambdas

Flink also supports Java 8 Lambdas in the Java API.

{% highlight java %}
data.filter(s -> s.startsWith("http://"));
{% endhighlight %}

{% highlight java %}
data.reduce((i1,i2) -> i1 + i2);
{% endhighlight %}

<a name="rich-functions"></a>

## Rich functions

All transformations that require a user-defined function can
instead take a *rich* function as an argument. For example, instead of

{% highlight java %}
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
{% endhighlight %}

you can write

{% highlight java %}
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
{% endhighlight %}

and pass the function as usual to a `map` transformation:

{% highlight java %}
data.map(new MyMapFunction());
{% endhighlight %}

Rich functions can also be defined as an anonymous class:
{% highlight java %}
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">

<a name="lambda-functions"></a>

## Lambda Functions

As already seen in the previous examples, all operations accept lambda functions for describing
the operation:
{% highlight scala %}
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
{% endhighlight %}

{% highlight scala %}
data.reduce { (i1,i2) => i1 + i2 }
data.reduce { _ + _ }
{% endhighlight %}

<a name="rich-functions"></a>

## Rich functions

All transformations that take a lambda function as an argument can
instead take a *rich* function. For example, instead of

{% highlight scala %}
data.map { x => x.toInt }
{% endhighlight %}

you can write

{% highlight scala %}
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
};
{% endhighlight %}

and pass the function to a `map` transformation:

{% highlight scala %}
data.map(new MyMapFunction())
{% endhighlight %}

Rich functions can also be defined as an anonymous class:
{% highlight scala %}
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
{% endhighlight %}

</div>

</div>

In addition to the user-defined function (map, reduce, etc.), rich functions provide
four methods: `open`, `close`, `getRuntimeContext`, and `setRuntimeContext`.
These are useful for parameterizing the function
(see [Passing Parameters to Functions]({% link dev/batch/index.zh.md %}#passing-parameters-to-functions)),
creating and finalizing local state, accessing broadcast variables (see
[Broadcast Variables]({% link dev/batch/index.zh.md %}#broadcast-variables)), and for accessing runtime
information such as accumulators and counters (see
[Accumulators and Counters](#accumulators--counters)), and information
on iterations (see [Iterations]({% link dev/batch/iterations.zh.md %})).
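
As a minimal sketch of how these lifecycle methods fit together (the class name `LineParser` and the accumulator name `parsed-lines` are illustrative and not taken from this page), a rich function might look like this:

{% highlight java %}
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Illustrative example: a rich map function that registers an accumulator in open().
public class LineParser extends RichMapFunction<String, Integer> {

    // Local state that lives for the whole lifetime of the task.
    private final IntCounter parsedLines = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once before any record is processed: read parameters,
        // initialize local state, and register accumulators here.
        getRuntimeContext().addAccumulator("parsed-lines", this.parsedLines);
    }

    @Override
    public Integer map(String value) {
        this.parsedLines.add(1);
        return Integer.parseInt(value);
    }

    @Override
    public void close() throws Exception {
        // Called once after the last record: release any local resources here.
    }
}
{% endhighlight %}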

{% top %}

<a name="accumulators--counters"></a>

## Accumulators & Counters

Accumulators are simple constructs with an **add operation** and a **final accumulated result**,
which is available after the job has ended.

The most straightforward accumulator is a **counter**: you can increment it using the
```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial
results and send the result to the client. Accumulators are useful during debugging or if you
quickly want to find out more about your data.

Flink currently has the following **built-in accumulators**. Each of them implements the
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
interface.

- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %},
  {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %}
  and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}:
  See below for an example using a counter.
- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}:
  A histogram implementation for a discrete number of bins. Internally it is just a map from Integer
  to Integer. You can use it to compute distributions of values, e.g. the distribution of
  words-per-line for a word count program (see the sketch after this list).
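
To make the histogram bullet above concrete, a hypothetical word-count style function (the class and accumulator names are made up for this sketch) could record the distribution of words per line like this:

{% highlight java %}
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Illustrative sketch: passes lines through unchanged while recording
// how many words each line contains.
public class WordsPerLine extends RichMapFunction<String, String> {

    private final Histogram wordsPerLine = new Histogram();

    @Override
    public void open(Configuration parameters) throws Exception {
        getRuntimeContext().addAccumulator("words-per-line", this.wordsPerLine);
    }

    @Override
    public String map(String line) {
        // Each added value becomes one entry in the corresponding histogram bin.
        this.wordsPerLine.add(line.split("\\s+").length);
        return line;
    }
}
{% endhighlight %}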

__How to use accumulators:__

First you have to create an accumulator object (here a counter) in the user-defined transformation
function where you want to use it.

{% highlight java %}
private IntCounter numLines = new IntCounter();
{% endhighlight %}

Second you have to register the accumulator object, typically in the ```open()``` method of the
*rich* function. Here you also define the name.

{% highlight java %}
getRuntimeContext().addAccumulator("num-lines", this.numLines);
{% endhighlight %}

You can now use the accumulator anywhere in the operator function, including in the ```open()``` and
```close()``` methods.

{% highlight java %}
this.numLines.add(1);
{% endhighlight %}

The overall result will be stored in the ```JobExecutionResult``` object which is
returned from the `execute()` method of the execution environment
(currently this only works if the execution waits for the completion of the job).

{% highlight java %}
myJobExecutionResult.getAccumulatorResult("num-lines")
{% endhighlight %}
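
For context, assuming an execution environment named `env` is in scope and with a job name made up for this sketch, the result object is simply the return value of `execute()`:

{% highlight java %}
// execute() blocks until the job has finished and returns the JobExecutionResult,
// which carries the accumulator results of the job.
JobExecutionResult myJobExecutionResult = env.execute("accumulator example");
Integer numLines = myJobExecutionResult.getAccumulatorResult("num-lines");
{% endhighlight %}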

All accumulators share a single namespace per job. Thus you can use the same accumulator in
different operator functions of your job. Flink will internally merge all accumulators with the same
name.

A note on accumulators and iterations: currently the result of accumulators is only available after
the overall job has ended. We also plan to make the result of the previous iteration available in the
next iteration. You can use
{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %}
to compute per-iteration statistics and base the termination of iterations on such statistics.

__Custom accumulators:__

To implement your own accumulator you simply have to write your implementation of the Accumulator
interface. Feel free to create a pull request if you think your custom accumulator should be shipped
with Flink.

You have the choice to implement either
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}.

```Accumulator<V,R>``` is the most flexible: it defines a type ```V``` for the value to add, and a
result type ```R``` for the final result. E.g. for a histogram, ```V``` is a number and ```R``` is
a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters.
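
For example, a custom accumulator that tracks the maximum of the values it sees could be sketched as follows; the class name `MaxDouble` is illustrative and not part of Flink, and since the added value and the result share the type ```Double```, implementing ```SimpleAccumulator``` is sufficient:

{% highlight java %}
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;

// Illustrative custom accumulator: keeps the maximum Double added so far.
public class MaxDouble implements SimpleAccumulator<Double> {

    private double max = Double.NEGATIVE_INFINITY;

    @Override
    public void add(Double value) {
        this.max = Math.max(this.max, value);
    }

    @Override
    public Double getLocalValue() {
        return this.max;
    }

    @Override
    public void resetLocal() {
        this.max = Double.NEGATIVE_INFINITY;
    }

    @Override
    public void merge(Accumulator<Double, Double> other) {
        // Combine the partial results produced by parallel instances.
        this.max = Math.max(this.max, other.getLocalValue());
    }

    @Override
    public Accumulator<Double, Double> clone() {
        MaxDouble copy = new MaxDouble();
        copy.max = this.max;
        return copy;
    }
}
{% endhighlight %}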

{% top %}
