[FLINK-28510][python][connector] Support using new KafkaSink
This closes apache#20263.
vancior98 authored and HuangXingBo committed Jul 27, 2022
1 parent 94a3e2f commit c796a78
Showing 15 changed files with 1,185 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/datastream/formats/avro.md
@@ -40,7 +40,7 @@ The serialization framework of Flink is able to handle classes generated from Avro schemas. In order to
</dependency>
```

{{< py_download_link "avro" "flink-sql-avro.jar" >}}
{{< py_download_link "avro" >}}

In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
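A minimal sketch of reading Avro records, assuming an Avro-generated `User` class and an existing `Path in` pointing at the file (both names are illustrative):

```java
// Read Avro records of the generated "User" type from the given path
// and expose them as a DataStream.
AvroInputFormat<User> usersInputFormat = new AvroInputFormat<>(in, User.class);
DataStream<User> usersDS = env.createInput(usersInputFormat);
```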

2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -62,7 +62,7 @@ Flink supports reading [Parquet](https://parquet.apache.org/) files and producing {{< ja
</dependency>
```

{{< py_download_link "parquet" "flink-sql-parquet.jar" >}}
{{< py_download_link "parquet" >}}

This format is compatible with the new Source that can be used in both batch and streaming execution modes.
Thus, you can use this format for the following two kinds of data:
165 changes: 161 additions & 4 deletions docs/content.zh/docs/connectors/datastream/kafka.md
@@ -45,6 +45,8 @@ Apache Flink ships with a universal Kafka connector, which tries its best to stay in sync with the Kafka client
Flink's streaming connectors are not currently part of the binary distribution.
See [here]({{< ref "docs/dev/configuration/overview" >}}) for how to link with them so that they can run in a cluster.

{{< py_download_link "kafka" >}}

## Kafka Source
{{< hint info >}}
This part describes the Kafka Source based on the [new data source API]({{< ref "docs/dev/datastream/sources.md" >}}).
@@ -53,6 +55,9 @@ Flink's streaming connectors are not currently part of the binary distribution.
### Usage
Kafka Source provides a builder class to construct an instance of ```KafkaSource```. The code snippet below shows how to build a ```KafkaSource```
to consume messages from the earliest offset of topic "input-topic", with consumer group "my-group", and deserialize only the value of the messages as strings:

{{< tabs "KafkaSource" >}}
{{< tab "Java" >}}
```java
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
@@ -64,6 +69,22 @@

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
source = KafkaSource.builder() \
.set_bootstrap_servers(brokers) \
.set_topics("input-topic") \
.set_group_id("my-group") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()

env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
```
{{< /tab >}}
{{< /tabs >}}

The following properties are required for building a KafkaSource:
- Bootstrap servers, configured by the ```setBootstrapServers(String)``` method
- Consumer group ID, configured by ```setGroupId(String)```
@@ -73,20 +94,52 @@
### Topic / Partition Subscription
Kafka Source provides 3 ways of subscribing to topics / partitions:
- A list of topics, subscribing to all partitions of the topics in the list:
{{< tabs "KafkaSource#setTopics" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder().setTopics("topic-a", "topic-b");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder().set_topics("topic-a", "topic-b")
```
{{< /tab >}}
{{< /tabs >}}
- Topic pattern, subscribing to all partitions of the topics whose names match the provided regular expression:
{{< tabs "KafkaSource#setTopicPattern" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder().setTopicPattern("topic.*");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder().set_topic_pattern("topic.*")
```
{{< /tab >}}
{{< /tabs >}}
- A set of partitions, subscribing to the specified partitions:
{{< tabs "KafkaSource#setPartitions" >}}
{{< tab "Java" >}}
```java
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a"
new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
partition_set = {
KafkaTopicPartition("topic-a", 0),
KafkaTopicPartition("topic-b", 5)
}
KafkaSource.builder().set_partitions(partition_set)
```
{{< /tab >}}
{{< /tabs >}}

### Deserializer
A deserializer is required for parsing Kafka messages.
The deserializer is specified by ```setDeserializer(KafkaRecordDeserializationSchema)```, where ```KafkaRecordDeserializationSchema```
@@ -105,9 +158,16 @@ KafkaSource.<String>builder()
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
```

Currently, PyFlink only supports ```set_value_only_deserializer``` to customize the deserialization of the value of a Kafka message.
```python
KafkaSource.builder().set_value_only_deserializer(SimpleStringSchema())
```

### Starting Offset
Kafka Source can start consuming from different offsets by specifying an offsets initializer (```OffsetsInitializer```). The built-in initializers include:

{{< tabs "KafkaSource#setStartingOffsets" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder()
// Start from the committed offsets of the consuming group, without a reset strategy
@@ -121,7 +181,25 @@ KafkaSource.builder()
// Start from the latest offset
.setStartingOffsets(OffsetsInitializer.latest());
```
You can also implement a custom offsets initializer (```OffsetsInitializer```) if the built-in initializers do not fulfill your requirement.
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder()
# Start from the committed offsets of the consuming group, without a reset strategy
.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
# Start from the committed offsets of the consuming group; fall back to the earliest offset if the committed offsets do not exist
.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
# Start from the first record whose timestamp is greater than or equal to the given timestamp (in milliseconds)
.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
# Start from the earliest offset
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
# Start from the latest offset
.set_starting_offsets(KafkaOffsetsInitializer.latest())
```
{{< /tab >}}
{{< /tabs >}}

You can also implement a custom offsets initializer (```OffsetsInitializer```) if the built-in initializers do not fulfill your requirement. (Not supported in PyFlink)

```OffsetsInitializer.earliest()``` will be used by default if no offsets initializer is specified.

@@ -153,10 +231,22 @@ For Kafka consumer configuration, please refer to the [Apache Kafka documentation](http://kafka.apache.o
In order to handle scenarios like topic scaling-out or topic creation without restarting the Flink job, the Kafka Source can be configured to periodically discover new
partitions under the provided topic / partition subscription pattern. To enable partition discovery, set a non-negative value for ```partition.discovery.interval.ms```:


{{< tabs "KafkaSource#PartitionDiscovery" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder() \
.set_property("partition.discovery.interval.ms", "10000") # 每 10 秒检查一次新分区
```
{{< /tab >}}
{{< /tabs >}}

{{< hint warning >}}
Partition discovery is **disabled** by default. You need to explicitly set the partition discovery interval to enable this feature.
{{< /hint >}}
@@ -167,7 +257,7 @@
```java
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
```
[This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes details about how to define a custom watermark strategy (```WatermarkStrategy```).
[This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes details about how to define a custom watermark strategy (```WatermarkStrategy```). (Not supported in PyFlink)

### Idleness
The Kafka Source does not go automatically into an idle state if the parallelism is higher than the number of partitions. You will either need to lower the parallelism or add an idle timeout to the watermark strategy. If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.
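For example, an idle timeout can be attached to the watermark strategy passed to ```fromSource```. A minimal sketch, assuming a ```KafkaSource<String>``` named ```kafkaSource``` and a one-minute timeout chosen purely for illustration (```Duration``` is ```java.time.Duration```):

```java
// Partitions that produce no records for one minute are marked as idle,
// so they no longer hold back watermarks in downstream operators.
env.fromSource(
        kafkaSource,
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withIdleness(Duration.ofMinutes(1)),
        "Kafka Source With Idleness");
```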
@@ -266,24 +356,37 @@ All metrics of the Kafka consumer are registered under the metric group ```KafkaSourceReader.KafkaCo
In order to enable security configurations such as encryption and authentication, you just need to set up security configurations as additional properties on the Kafka source. The code snippet below shows how to configure the Kafka source to use
PLAIN as the SASL mechanism and provide a JAAS configuration:

{{< tabs "KafkaSource#SecurityPlain" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder()
.setProperty("security.protocol", "SASL_PLAINTEXT")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder() \
.set_property("security.protocol", "SASL_PLAINTEXT") \
.set_property("sasl.mechanism", "PLAIN") \
.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
```
{{< /tab >}}
{{< /tabs >}}

Another more complicated example, using SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:

{{< tabs "KafkaSource#SecuritySASL" >}}
{{< tab "Java" >}}
```java
KafkaSource.builder()
.setProperty("security.protocol", "SASL_SSL")
// SSL configurations
// Configure the path of the truststore (CA certificate) provided by the server
// Configure the path of truststore (CA) provided by the server
.setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
.setProperty("ssl.truststore.password", "test1234")
// If client authentication is required, configure the path of the keystore (private key)
// Configure the path of keystore (private key) if client authentication is required
.setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
.setProperty("ssl.keystore.password", "test1234")
// SASL configurations
@@ -292,6 +395,26 @@
// Set JAAS configurations
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaSource.builder() \
.set_property("security.protocol", "SASL_SSL") \
# SSL configurations
# Configure the path of the truststore (CA certificate) provided by the server
.set_property("ssl.truststore.location", "/path/to/kafka.client.truststore.jks") \
.set_property("ssl.truststore.password", "test1234") \
# If client authentication is required, configure the path of the keystore (private key)
.set_property("ssl.keystore.location", "/path/to/kafka.client.keystore.jks") \
.set_property("ssl.keystore.password", "test1234") \
# SASL configurations
# Set SASL mechanism as SCRAM-SHA-256
.set_property("sasl.mechanism", "SCRAM-SHA-256") \
# Set JAAS configurations
.set_property("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
```
{{< /tab >}}
{{< /tabs >}}
Please note that the class path of the login module may be different if the Kafka client dependency is relocated (class relocation) in the job JAR, so you may need to rewrite the configuration above
with the login module's actual class path in the JAR.
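As an illustration, the JAAS configuration would then reference the relocated login module class. The relocated package prefix below is an assumption made for illustration only — verify it against the actual contents of your job JAR:

```java
// Hypothetical relocated class path of the login module; check the job JAR for the real prefix.
KafkaSource.builder()
    .setProperty("sasl.jaas.config",
        "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required"
            + " username=\"username\" password=\"password\";");
```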

@@ -341,6 +464,8 @@ The source reader of Kafka Source extends ```SourceReaderBase``` and uses a single-thread
Kafka sink provides a builder class to construct an instance of ```KafkaSink```. The code snippet below shows how to write String records to a Kafka
topic with at-least-once delivery guarantees:

{{< tabs "KafkaSink" >}}
{{< tab "Java" >}}
```java
DataStream<String> stream = ...;

@@ -356,6 +481,24 @@ KafkaSink<String> sink = KafkaSink.<String>builder()

stream.sinkTo(sink);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
sink = KafkaSink.builder() \
.set_bootstrap_servers(brokers) \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("topic-name")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.build()

stream.sink_to(sink)
```
{{< /tab >}}
{{< /tabs >}}

The following properties are required to build a KafkaSink:

@@ -368,6 +511,8 @@
You will need to provide a ```KafkaRecordSerializationSchema``` to transform incoming elements into Kafka ```ProducerRecord```s. Flink offers a schema builder
that provides some common building blocks, i.e. key/value serialization, topic selection and partitioning; you can also implement the corresponding interfaces for richer control.

{{< tabs "KafkaSink#Serializer" >}}
{{< tab "Java" >}}
```java
KafkaRecordSerializationSchema.builder()
.setTopicSelector((element) -> {<your-topic-selection-logic>})
@@ -376,6 +521,18 @@
.setPartitioner(new FlinkFixedPartitioner())
.build();
```
{{< /tab >}}
{{< tab "Python" >}}
```python
KafkaRecordSerializationSchema.builder() \
.set_topic_selector(lambda element: <your-topic-selection-logic>) \
.set_value_serialization_schema(SimpleStringSchema()) \
.set_key_serialization_schema(SimpleStringSchema()) \
# setting a partitioner is not supported in PyFlink
.build()
```
{{< /tab >}}
{{< /tabs >}}

The value serialization method and the topic selection method are required, and it is also possible to use serializers provided by Kafka instead of Flink via ```setKafkaKeySerializer(Serializer)```
or ```setKafkaValueSerializer(Serializer)```.
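For example, a minimal sketch that uses the Kafka-provided ```org.apache.kafka.common.serialization.StringSerializer``` for the record value (the topic name is illustrative):

```java
// Use Kafka's own StringSerializer instead of a Flink SerializationSchema for the value.
KafkaRecordSerializationSchema.builder()
    .setTopic("topic-name")
    .setKafkaValueSerializer(StringSerializer.class)
    .build();
```
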
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/datastream/formats/avro.md
@@ -39,7 +39,7 @@ The serialization framework of Flink is able to handle classes generated from Av
</dependency>
```

{{< py_download_link "avro" "flink-sql-avro.jar" >}}
{{< py_download_link "avro" >}}

In order to read data from an Avro file, you have to specify an `AvroInputFormat`.

2 changes: 1 addition & 1 deletion docs/content/docs/connectors/datastream/formats/parquet.md
@@ -61,7 +61,7 @@ To read Avro records, you will need to add the `parquet-avro` dependency:
</dependency>
```

{{< py_download_link "parquet" "flink-sql-parquet.jar" >}}
{{< py_download_link "parquet" >}}

This format is compatible with the new Source that can be used in both batch and streaming execution modes.
Thus, you can use this format for two kinds of data: