diff --git a/docs/dev/user_defined_functions.zh.md b/docs/dev/user_defined_functions.zh.md index a1b52a087dc21..e2473afee55a2 100644 --- a/docs/dev/user_defined_functions.zh.md +++ b/docs/dev/user_defined_functions.zh.md @@ -1,5 +1,5 @@ --- -title: '用户自定义函数' +title: '用户自定义 Functions' nav-id: user_defined_function nav-parent_id: streaming nav-pos: 4 @@ -23,16 +23,16 @@ specific language governing permissions and limitations under the License. --> -Most operations require a user-defined function. This section lists different -ways of how they can be specified. We also cover `Accumulators`, which can be -used to gain insights into your Flink application. +大多数操作都需要用户自定义 function。本节列出了实现用户自定义 function 的不同方式。还会介绍 `Accumulators`(累加器),可用于深入了解你的 Flink 应用程序。
-## Implementing an interface + -The most basic way is to implement one of the provided interfaces: +## 实现接口 + +最基本的方法是实现提供的接口: {% highlight java %} class MyMapFunction implements MapFunction { @@ -41,18 +41,22 @@ class MyMapFunction implements MapFunction { data.map(new MyMapFunction()); {% endhighlight %} -## Anonymous classes + + +## 匿名类 -You can pass a function as an anonymous class: +你可以将 function 当做匿名类传递: {% highlight java %} data.map(new MapFunction () { public Integer map(String value) { return Integer.parseInt(value); } }); {% endhighlight %} + + ## Java 8 Lambdas -Flink also supports Java 8 Lambdas in the Java API. +Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。 {% highlight java %} data.filter(s -> s.startsWith("http://")); @@ -62,10 +66,11 @@ data.filter(s -> s.startsWith("http://")); data.reduce((i1,i2) -> i1 + i2); {% endhighlight %} + + ## Rich functions -All transformations that require a user-defined function can -instead take as argument a *rich* function. For example, instead of +所有需要用户自定义 function 的转化操作都可以将 *rich* function 作为参数。例如,你可以将下面代码 {% highlight java %} class MyMapFunction implements MapFunction { @@ -73,7 +78,7 @@ class MyMapFunction implements MapFunction { }; {% endhighlight %} -you can write +替换成 {% highlight java %} class MyMapFunction extends RichMapFunction { @@ -81,13 +86,13 @@ class MyMapFunction extends RichMapFunction { }; {% endhighlight %} -and pass the function as usual to a `map` transformation: +并将 function 照常传递给 `map` transformation: {% highlight java %} data.map(new MyMapFunction()); {% endhighlight %} -Rich functions can also be defined as an anonymous class: +Rich functions 也可以定义成匿名类: {% highlight java %} data.map (new RichMapFunction() { public Integer map(String value) { return Integer.parseInt(value); } @@ -97,11 +102,11 @@ data.map (new RichMapFunction() {
+ ## Lambda Functions -As already seen in previous examples all operations accept lambda functions for describing -the operation: +正如你在上面的例子中看到的,所有的操作都可以通过 lambda 表达式来描述: {% highlight scala %} val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") } @@ -114,16 +119,17 @@ data.reduce { (i1,i2) => i1 + i2 } data.reduce { _ + _ } {% endhighlight %} + + ## Rich functions -All transformations that take as argument a lambda function can -instead take as argument a *rich* function. For example, instead of +所有将 lambda 表达式作为参数的转化操作都可以用 *rich* function 来代替。例如,你可以将下面代码 {% highlight scala %} data.map { x => x.toInt } {% endhighlight %} -you can write +替换成 {% highlight scala %} class MyMapFunction extends RichMapFunction[String, Int] { @@ -131,13 +137,13 @@ class MyMapFunction extends RichMapFunction[String, Int] { }; {% endhighlight %} -and pass the function to a `map` transformation: +并将 function 传递给 `map` transformation: {% highlight scala %} data.map(new MyMapFunction()) {% endhighlight %} -Rich functions can also be defined as an anonymous class: +Rich functions 也可以定义成匿名类: {% highlight scala %} data.map (new RichMapFunction[String, Int] { def map(in: String):Int = { in.toInt } @@ -147,95 +153,78 @@ data.map (new RichMapFunction[String, Int] {
-Rich functions provide, in addition to the user-defined function (map, -reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and -`setRuntimeContext`. These are useful for parameterizing the function -(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)), -creating and finalizing local state, accessing broadcast variables (see -[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime -information such as accumulators and counters (see -[Accumulators and Counters](#accumulators--counters)), and information -on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)). +除了用户自定义的 function(map,reduce 等),Rich functions 还提供了四个方法:`open`、`close`、`getRuntimeContext` 和 +`setRuntimeContext`。这些方法对于参数化 function +(参阅 [给 function 传递参数]({% link dev/batch/index.zh.md %}#passing-parameters-to-functions)), +创建和清理本地状态,访问广播变量(参阅 +[广播变量]({% link dev/batch/index.zh.md %}#broadcast-variables)),以及访问运行时信息,例如累加器和计数器(参阅 +[累加器和计数器](#accumulators--counters)),以及迭代的相关信息(参阅 [迭代]({% link dev/batch/iterations.zh.md %})) +有很大作用。 {% top %} -## Accumulators & Counters + -Accumulators are simple constructs with an **add operation** and a **final accumulated result**, -which is available after the job ended. +## 累加器和计数器 -The most straightforward accumulator is a **counter**: You can increment it using the -```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial -results and send the result to the client. Accumulators are useful during debugging or if you -quickly want to find out more about your data. +累加器是一种简单的结构,具有**加法运算**和**最终累加结果**,最终结果可在作业结束后获取。 -Flink currently has the following **built-in accumulators**. Each of them implements the -{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %} -interface.
+最简单的累加器就是**计数器**: 你可以使用 +```Accumulator.add(V value)``` 方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分结果并将最终结果发送给客户端。 +在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。 + +Flink 目前有如下**内置累加器**。每个都实现了 +{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %} +接口。 - {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %}, {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %} - and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}: - See below for an example using a counter. -- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}: - A histogram implementation for a discrete number of bins. Internally it is just a map from Integer - to Integer. You can use this to compute distributions of values, e.g. the distribution of - words-per-line for a word count program. + 和 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}: + 有关使用计数器的示例,请参见下文。 +- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}: + 具有离散数量的桶(bin)的直方图实现。在内部,它只是 Integer 到 Integer 的映射。你可以使用它来计算值的分布,例如,单词计数程序的每行单词数的分布情况。 -__How to use accumulators:__ +__如何使用累加器:__ -First you have to create an accumulator object (here a counter) in the user-defined transformation -function where you want to use it. +首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。 {% highlight java %} private IntCounter numLines = new IntCounter(); {% endhighlight %} -Second you have to register the accumulator object, typically in the ```open()``` method of the -*rich* function. Here you also define the name.
+其次,你必须注册累加器对象,通常在 *rich* function 的 ```open()``` 方法中进行。同时在此处定义名称。 {% highlight java %} getRuntimeContext().addAccumulator("num-lines", this.numLines); {% endhighlight %} -You can now use the accumulator anywhere in the operator function, including in the ```open()``` and -```close()``` methods. +现在你可以在操作 function 中的任何位置(包括 ```open()``` 和 ```close()``` 方法中)使用累加器。 {% highlight java %} this.numLines.add(1); {% endhighlight %} -The overall result will be stored in the ```JobExecutionResult``` object which is -returned from the `execute()` method of the execution environment -(currently this only works if the execution waits for the -completion of the job). +最终整体结果会存储在由执行环境的 `execute()` 方法返回的 ```JobExecutionResult``` 对象中(目前仅当执行过程等待作业完成时才有效)。 {% highlight java %} myJobExecutionResult.getAccumulatorResult("num-lines") {% endhighlight %} -All accumulators share a single namespace per job. Thus you can use the same accumulator in -different operator functions of your job. Flink will internally merge all accumulators with the same -name. +单个作业的所有累加器共享一个命名空间。因此你可以在不同的操作 function 里面使用同一个累加器。Flink 会在内部将所有具有相同名称的累加器合并起来。 -A note on accumulators and iterations: Currently the result of accumulators is only available after -the overall job has ended. We plan to also make the result of the previous iteration available in the -next iteration. You can use -{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %} -to compute per-iteration statistics and base the termination of iterations on such statistics. +关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用。我们还计划在下一次迭代中提供上一次迭代的结果。你可以使用 +{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "聚合器" %} +来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。 -__Custom accumulators:__ +__自定义累加器:__ -To implement your own accumulator you simply have to write your implementation of the Accumulator -interface.
Feel free to create a pull request if you think your custom accumulator should be shipped -with Flink. +要实现自己的累加器,你只需要实现 Accumulator 接口即可。如果你认为自定义累加器应随 Flink 一起提供,欢迎创建 pull request。 -You have the choice to implement either +你可以选择实现 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %} -or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}. +或 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}。 -```Accumulator``` is most flexible: It defines a type ```V``` for the value to add, and a -result type ```R``` for the final result. E.g. for a histogram, ```V``` is a number and ```R``` is - a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters. +```Accumulator``` 最为灵活: 它定义了将要添加的值类型 ```V```,并定义了最终的结果类型 ```R```。例如,对于直方图,```V``` 是一个数字且 ```R``` 是一个直方图。 + ```SimpleAccumulator``` 适用于两种类型都相同的情况,例如计数器。 {% top %}
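
Reviewer note: the relationship between the value type `V`, the result type `R`, and the merging of partial results may be easier to follow with a small model. The following is a minimal plain-Java sketch, not Flink code. `AccumulatorSketch`, `SketchAccumulator`, `CounterSketch`, and `HistogramSketch` are hypothetical names invented for illustration, and the three-method interface is an assumed simplification that only loosely mirrors the real `Accumulator` interface.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the accumulator contract; nothing here is Flink's real API.
class AccumulatorSketch {

    // Hypothetical stand-in: V is the type of added values, R the type of the result.
    interface SketchAccumulator<V, R> {
        void add(V value);
        R getLocalValue();
        void merge(SketchAccumulator<V, R> other);
    }

    // Counter: V and R are the same type (the SimpleAccumulator case).
    static class CounterSketch implements SketchAccumulator<Integer, Integer> {
        private int count = 0;

        @Override public void add(Integer value) { count += value; }
        @Override public Integer getLocalValue() { return count; }
        @Override public void merge(SketchAccumulator<Integer, Integer> other) {
            count += other.getLocalValue();
        }
    }

    // Histogram: V is a number, R is a map from bin to frequency.
    static class HistogramSketch implements SketchAccumulator<Integer, Map<Integer, Integer>> {
        private final TreeMap<Integer, Integer> bins = new TreeMap<>();

        @Override public void add(Integer value) { bins.merge(value, 1, Integer::sum); }
        @Override public Map<Integer, Integer> getLocalValue() { return bins; }
        @Override public void merge(SketchAccumulator<Integer, Map<Integer, Integer>> other) {
            // Add the other instance's frequencies bin by bin.
            other.getLocalValue().forEach((bin, n) -> bins.merge(bin, n, Integer::sum));
        }
    }

    public static void main(String[] args) {
        // Two parallel instances each keep a partial line count.
        CounterSketch partialA = new CounterSketch();
        CounterSketch partialB = new CounterSketch();
        partialA.add(1);
        partialA.add(1);
        partialB.add(1);

        // Merging the partial results models what happens when the job ends.
        partialA.merge(partialB);
        System.out.println("num-lines = " + partialA.getLocalValue());

        // Words-per-line distribution: each added value increments one bin.
        HistogramSketch wordsPerLine = new HistogramSketch();
        wordsPerLine.add(3);
        wordsPerLine.add(3);
        wordsPerLine.add(5);
        System.out.println("words-per-line = " + wordsPerLine.getLocalValue());
    }
}
```

In this model, merging is what lets several function instances registered under the same name contribute to one result. The counter needs a single type parameter in practice, while the histogram shows why the general interface keeps `V` and `R` separate.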