[FLINK-21996][python] Support Kinesis connector in Python DataStream API
This closes apache#19869.
pengmingde authored and dianfu committed Jun 8, 2022
1 parent 0a5e614 commit 9e63f21
Showing 9 changed files with 1,010 additions and 6 deletions.
34 changes: 34 additions & 0 deletions docs/content.zh/docs/connectors/datastream/firehose.md
@@ -88,6 +88,30 @@ val kdfSink =
flinkStream.sinkTo(kdfSink)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
sink_properties = {
# Required
'aws.region': 'eu-west-1',
# Optional, provide via alternative routes e.g. environment variables
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
}

kdf_sink = (KinesisFirehoseSink.builder()
    .set_firehose_client_properties(sink_properties)   # Required
    .set_serialization_schema(SimpleStringSchema())    # Required
    .set_delivery_stream_name('your-stream-name')      # Required
    .set_fail_on_error(False)                          # Optional
    .set_max_batch_size(500)                           # Optional
    .set_max_in_flight_requests(50)                    # Optional
    .set_max_buffered_requests(10000)                  # Optional
    .set_max_batch_size_in_bytes(5 * 1024 * 1024)      # Optional
    .set_max_time_in_buffer_ms(5000)                   # Optional
    .set_max_record_size_in_bytes(1 * 1024 * 1024)     # Optional
    .build())
```
{{< /tab >}}
{{< /tabs >}}
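
The built sink is attached to a stream with `sink_to`. Below is a minimal end-to-end sketch; the jar path, source elements and job name are illustrative placeholders, not part of this change.

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import KinesisFirehoseSink

env = StreamExecutionEnvironment.get_execution_environment()
# PyFlink does not bundle the connector; the jar path is a placeholder.
env.add_jars("file:///path/to/flink-sql-connector-aws-kinesis-firehose.jar")

stream = env.from_collection(['msg-1', 'msg-2'])  # placeholder source
stream.sink_to(kdf_sink)  # kdf_sink as built above
env.execute('firehose-sink-example')
```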

## Configurations
@@ -156,6 +180,16 @@ producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
producer_config = {
'aws.region': 'us-east-1',
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
'aws.endpoint': 'http://localhost:4566'
}
```
{{< /tab >}}
{{< /tabs >}}
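
This dictionary can then be handed to the sink builder in place of the production properties; a short sketch (the delivery stream name is a placeholder):

```python
# Sketch: route the Firehose sink to the local endpoint configured above.
kdf_sink = (KinesisFirehoseSink.builder()
    .set_firehose_client_properties(producer_config)  # picks up 'aws.endpoint'
    .set_serialization_schema(SimpleStringSchema())
    .set_delivery_stream_name('your-local-stream')
    .build())
```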

{{< top >}}
145 changes: 144 additions & 1 deletion docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -100,6 +100,20 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, consumerConfig))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
consumer_config = {
'aws.region': 'us-east-1',
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
'flink.stream.initpos': 'LATEST'
}

env = StreamExecutionEnvironment.get_execution_environment()

kinesis = env.add_source(FlinkKinesisConsumer("kinesis_stream_name", SimpleStringSchema(), consumer_config))
```
{{< /tab >}}
{{< /tabs >}}
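
Note that a PyFlink job additionally needs the Kinesis connector jar at runtime; a minimal sketch of shipping it with the job (the jar name is a placeholder for the artifact matching your Flink version):

```python
env = StreamExecutionEnvironment.get_execution_environment()
# The connector is not bundled with PyFlink; ship the jar with the job.
env.add_jars("file:///path/to/flink-sql-connector-kinesis.jar")
```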

The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
@@ -214,6 +228,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```
{{< /tab >}}
{{< tab "Python" >}}
```python
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(5000) # checkpoint every 5000 msecs
```
{{< /tab >}}
{{< /tabs >}}
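
The checkpointing mode can be set in the same call when exactly-once guarantees are required; a short sketch reusing the interval from the example above:

```python
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Checkpoint every 5 seconds with exactly-once semantics.
env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)
```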

Also note that Flink can only restart the topology if enough processing slots are available.
@@ -271,6 +291,21 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, consumerConfig))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
consumer_config = {
'aws.region': 'us-east-1',
'flink.stream.initpos': 'LATEST',
'flink.stream.recordpublisher': 'EFO',
'flink.stream.efo.consumername': 'my-flink-efo-consumer'
}

env = StreamExecutionEnvironment.get_execution_environment()

kinesis = env.add_source(FlinkKinesisConsumer(
"kinesis_stream_name", SimpleStringSchema(), consumer_config))
```
{{< /tab >}}
{{< /tabs >}}

#### EFO Stream Consumer Registration/Deregistration
@@ -346,6 +381,24 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, consumerConfig))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
consumer_config = {
'aws.region': 'us-east-1',
'flink.stream.initpos': 'LATEST',

'flink.stream.recordpublisher': 'EFO',
'flink.stream.efo.consumername': 'my-flink-efo-consumer',

'flink.stream.efo.registration': 'EAGER'
}

env = StreamExecutionEnvironment.get_execution_environment()

kinesis = env.add_source(FlinkKinesisConsumer(
"kinesis_stream_name", SimpleStringSchema(), consumer_config))
```
{{< /tab >}}
{{< /tabs >}}

Below is an example configuration to use the `NONE` registration strategy:
@@ -393,6 +446,24 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, consumerConfig))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
consumer_config = {
'aws.region': 'us-east-1',
'flink.stream.initpos': 'LATEST',
'flink.stream.recordpublisher': 'EFO',
'flink.stream.efo.consumername': 'my-flink-efo-consumer',
'flink.stream.efo.registration': 'NONE',
'flink.stream.efo.consumerarn.stream-name':
    'arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>'
}

env = StreamExecutionEnvironment.get_execution_environment()

kinesis = env.add_source(FlinkKinesisConsumer(
"kinesis_stream_name", SimpleStringSchema(), consumer_config))
```
{{< /tab >}}
{{< /tabs >}}
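
With `NONE`, the stream consumer has to be registered outside of Flink before the job starts. One way is the AWS SDK; the boto3 sketch below is an illustration, not part of the connector (stream and consumer names are placeholders):

```python
import boto3

# Register an EFO consumer ahead of time, e.g. from a deployment script.
kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_arn = kinesis.describe_stream(
    StreamName='kinesis_stream_name')['StreamDescription']['StreamARN']
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn, ConsumerName='my-flink-efo-consumer')
# Pass this ARN via the 'flink.stream.efo.consumerarn.stream-name' property.
print(consumer['Consumer']['ConsumerARN'])
```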

### Event Time for Consumed Records
@@ -432,6 +503,15 @@ val stream = env
.print();
```
{{< /tab >}}
{{< tab "Python" >}}
```python
consumer = FlinkKinesisConsumer(
    "kinesis_stream_name",
    SimpleStringSchema(),
    consumer_config)
stream = env.add_source(consumer).print()
```
{{< /tab >}}
{{< /tabs >}}
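
To run on event time, timestamps and watermarks still have to be assigned on the resulting stream. The sketch below shows one way to do it, assuming each record encodes its event time in epoch milliseconds as the first comma-separated field (that extraction logic is a placeholder):

```python
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner

class RecordTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        # Placeholder: pull the event time (epoch millis) out of the record.
        return int(value.split(',')[0])

watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(5)) \
    .with_timestamp_assigner(RecordTimestampAssigner())

stream = env.add_source(consumer) \
    .assign_timestamps_and_watermarks(watermark_strategy)
```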

Internally, an instance of the assigner is executed per shard / consumer thread (see threading model below).
@@ -452,11 +532,21 @@ to avoid the event time skew related problems described in [Event time synchroni

To enable synchronization, set the watermark tracker on the consumer:

{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6601" >}}
{{< tab "Java" >}}
```java
JobManagerWatermarkTracker watermarkTracker =
new JobManagerWatermarkTracker("myKinesisSource");
consumer.setWatermarkTracker(watermarkTracker);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
watermark_tracker = WatermarkTracker.job_manager_watermark_tracker("myKinesisSource")
consumer.set_watermark_tracker(watermark_tracker)
```
{{< /tab >}}
{{< /tabs >}}

The `JobManagerWatermarkTracker` will use a global aggregate to synchronize the per subtask watermarks. Each subtask
uses a per shard queue to control the rate at which records are emitted downstream based on how far ahead of the global
@@ -632,6 +722,37 @@ val simpleStringStream = ...
simpleStringStream.sinkTo(kdsSink)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
sink_properties = {
# Required
'aws.region': 'us-east-1',

# Optional, provide via alternative routes e.g. environment variables
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
'aws.endpoint': 'http://localhost:4567'
}

kds_sink = (KinesisStreamsSink.builder()
    .set_kinesis_client_properties(sink_properties)              # Required
    .set_serialization_schema(SimpleStringSchema())              # Required
    .set_partition_key_generator(PartitionKeyGenerator.fixed())  # Required
    .set_stream_name("your-stream-name")                         # Required
    .set_fail_on_error(False)                                    # Optional
    .set_max_batch_size(500)                                     # Optional
    .set_max_in_flight_requests(50)                              # Optional
    .set_max_buffered_requests(10000)                            # Optional
    .set_max_batch_size_in_bytes(5 * 1024 * 1024)                # Optional
    .set_max_time_in_buffer_ms(5000)                             # Optional
    .set_max_record_size_in_bytes(1 * 1024 * 1024)               # Optional
    .build())

simple_string_stream = ...
simple_string_stream.sink_to(kds_sink)
```
{{< /tab >}}
{{< /tabs >}}

The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
@@ -657,13 +778,25 @@ begins to exhibit blocking behaviour. More information on the rate restrictions
found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

You generally reduce backpressure by increasing the size of the internal queue:

{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61d" >}}
{{< tab "Java" >}}
```java
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
...
.setMaxBufferedRequests(10_000)
...
```
{{< /tab >}}
{{< tab "Python" >}}
```python
kds_sink = KinesisStreamsSink.builder() \
.set_max_buffered_requests(10000) \
.build()
```
{{< /tab >}}
{{< /tabs >}}

## Kinesis Producer

@@ -701,6 +834,16 @@ config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
config = {
'aws.region': 'us-east-1',
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
'aws.endpoint': 'http://localhost:4567'
}
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}
34 changes: 34 additions & 0 deletions docs/content/docs/connectors/datastream/firehose.md
@@ -88,6 +88,30 @@ val kdfSink =
flinkStream.sinkTo(kdfSink)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
sink_properties = {
# Required
'aws.region': 'eu-west-1',
# Optional, provide via alternative routes e.g. environment variables
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
}

kdf_sink = (KinesisFirehoseSink.builder()
    .set_firehose_client_properties(sink_properties)   # Required
    .set_serialization_schema(SimpleStringSchema())    # Required
    .set_delivery_stream_name('your-stream-name')      # Required
    .set_fail_on_error(False)                          # Optional
    .set_max_batch_size(500)                           # Optional
    .set_max_in_flight_requests(50)                    # Optional
    .set_max_buffered_requests(10000)                  # Optional
    .set_max_batch_size_in_bytes(5 * 1024 * 1024)      # Optional
    .set_max_time_in_buffer_ms(5000)                   # Optional
    .set_max_record_size_in_bytes(1 * 1024 * 1024)     # Optional
    .build())
```
{{< /tab >}}
{{< /tabs >}}

## Configurations
@@ -156,6 +180,16 @@ producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
producer_config = {
'aws.region': 'us-east-1',
'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
'aws.endpoint': 'http://localhost:4566'
}
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}