Skip to content

Commit

Permalink
GitBook: [master] 4 pages modified
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan-Wei authored and gitbook-bot committed Jan 3, 2021
1 parent 9e6e37b commit c11ab7d
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,146 @@
# 用户自定义函数(UDF)

大多数操作都需要用户定义的功能。本节列出了如何指定它们的不同方法。我们还将介绍`Accumulators`,可用于深入了解您的Flink应用程序。

{% tabs %}
{% tab title="Java" %}
## 实现借口

最基本的方法是实现提供的接口之一:

```java
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
```

## 匿名类

可以将函数作为匿名类传递:

```java
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
```

## Java 8 Lambdas

Flink还支持Java API中的Java 8 Lambda。

```java
data.filter(s -> s.startsWith("http://"));
```

```java
data.reduce((i1,i2) -> i1 + i2);
```

## Rich functions

所有需要用户定义函数的转换都可以将_Rich_函数作为参数。例如

```java
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
```

你可以写

```java
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
```

并将函数照常传递给`map`转换:

```java
data.map(new MyMapFunction());
```

Rich函数也可以定义为匿名类:

```java
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
```
{% endtab %}

{% tab title="Scala" %}
## Lambda函数

如前面的示例中已经看到的,所有操作都接受lambda函数来描述该操作:

```scala
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
```

```scala
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
```

## Rich Function

所有将lambda函数作为参数的转换都可以将_富_函数作为参数。例如

```scala
data.map { x => x.toInt }
```

你可以写

```scala
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
};
```

并将函数传递给`map`转换:

```scala
data.map(new MyMapFunction())
```

Rich Function也可以定义为匿名类:

```scala
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
```
{% endtab %}
{% endtabs %}

除了用户定义的函数\(`map``reduce`\)之外,Rich Function还提供了四种方法:`open``close``getRuntimeContext``setRuntimeContext`。对于参数化函数\(请参阅将[参数传递给函数](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/#passing-parameters-to-functions)\)、创建和结束本地状态、访问广播变量\(请参阅 [广播变量](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/#broadcast-variables)\)、访问运行时信息\(如累加器和计数器\(请参阅[累加器和计数器](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/user_defined_functions.html#accumulators--counters)\)以及关于Iterations的信息\(请参阅 [Iterations](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/iterations.html)\),这些都很有用。

## 累加器和计数器

累加器是具有**加法运算****最终累加结果的**简单结构,可在作业结束后使用。

最简单的累加器是一个**计数器**:你可以使用`Accumulator.add(V value)`方法将其递增 。在任务结束时,Flink将汇总(合并)所有部分结果,并将结果发送给客户端。累加器在调试过程中或如果您想快速查找有关数据的更多信息时非常有用。

Flink当前具有以下**内置累加器**。它们每个都实现 [累加器](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java) 接口。

* [**IntCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java)[**LongCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java)[**DoubleCounter**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java):有关使用计数器的示例,请参见下文。
* [**直方图**](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java):离散数量的bin的直方图实现。在内部,它只是从Integer到Integer的映射。您可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布。

**如何使用累加器:**

首先,您必须在要使用它的用户定义的转换函数中创建一个累加器对象(此处是一个计数器)。

```text
private IntCounter numLines = new IntCounter();
```

其次,您必须通常在**RichFunc**法中 注册累加器对象。您还可以在此处定义名称。



Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
# 概述

有状态函数和运算符在各个元素/事件的处理中存储数据,使状态成为任何类型的更复杂操作的关键构建块
在本节中,您将了解Flink提供的用于编写有状态程序的API。请查看[状态流处理,](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html) 以了解状态流处理背后的概念

例如:

* 当应用程序搜索特定的事件模式时,状态将存储到目前为止遇到的事件序列。
* 当按每分钟/小时/每天聚合事件时,状态保留为处理的聚合。
* 当在数据点流上训练机器学习模型时,状态保存模型参数的当前版本。
* 当需要管理历史数据时,状态允许有效地访问过去发生的事件。

Flink需要了解状态,以便使用[检查点](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html)使状态容错,并允许流应用程序的[保存点](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html)

有关状态的知识还允许重新调整Flink应用程序,这意味着Flink负责跨并行实例重新分配状态。

Flink 的[可查询状态](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/queryable_state.html)功能允许您在运行时从Flink外部访问状态。

在使用state时,阅读[Flink的状态后端](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html)可能也很有用。Flink提供了不同的状态后端,用于指定状态的存储方式和位置。State可以位于Java的堆上或堆外。根据您的状态后端,Flink还可以_管理_应用程序的状态,这意味着Flink处理内存管理(如果需要可能会溢出到磁盘)以允许应用程序保持非常大的状态。可以在不更改应用程序逻辑的情况下配置状态后端。

Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# 状态后端
# State Backends

Flink提供了不同的状态后端,用于指定状态的存储方式和位置
Flink提供了不同的State Backends,用于指定状态的存储方式和位置

状态可以位于Java的堆上或堆外。根据您的状态后端,Flink还可以管理应用程序的状态,这意味着Flink处理内存管理\(如果需要,可能会溢出到磁盘\),以允许应用程序保存非常大的状态。默认情况下,配置文件为`flink-conf.yaml`确定所有Flink作业的状态后端
状态可以位于Java的堆上或堆外。根据您的`State Backends`,Flink还可以管理应用程序的状态,这意味着Flink处理内存管理\(如果需要,可能会溢出到磁盘\),以允许应用程序保存非常大的状态。默认情况下,配置文件为`flink-conf.yaml`确定所有Flink作业的State Backends

但是,可以在每个作业的基础上覆盖默认状态后端,如下所示。
但是,可以在每个作业的基础上覆盖默认State Backends,如下所示。

有关可用状态后端,其优点,限制和配置参数的详细信息,请参阅[部署和操作中](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html)的相应部分。
有关可用State Backends,其优点,限制和配置参数的详细信息,请参阅[部署和操作中](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html)的相应部分。

{% tabs %}
{% tab title="Java" %}
Expand Down
2 changes: 1 addition & 1 deletion SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* [广播状态模式](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/guang-bo-zhuang-tai-mo-shi.md)
* [检查点\(Checkpointing\)](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/checkpointing.md)
* [可查询状态\(Beta\)](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/ke-cha-xun-zhuang-tai.md)
* [状态后端](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/zhuang-tai-hou-duan.md)
* [State Backends](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/zhuang-tai-hou-duan.md)
* [状态模式演化](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/zhuang-tai-mo-shi-yan-hua.md)
* [自定义状态序列化](05-ying-yong-kai-fa/streaming-datastream-api/zhuang-tai-rong-cuo/zi-ding-yi-zhuang-tai-xu-lie-hua.md)
* [用户自定义函数(UDF)](05-ying-yong-kai-fa/streaming-datastream-api/yong-hu-zi-ding-yi-han-shu-udf.md)
Expand Down

0 comments on commit c11ab7d

Please sign in to comment.