Skip to content

Commit

Permalink
[FLINK-34509] docs: add missing "url" option for Debezium Avro
Browse files Browse the repository at this point in the history
  • Loading branch information
affo authored and JingGe committed Feb 28, 2024
1 parent bfaa75a commit 641f4f4
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions docs/content/docs/connectors/table/formats/debezium.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ However, currently Flink can't combine UPDATE_BEFORE and UPDATE_AFTER into a sin
Dependencies
------------

#### Debezium Avro
#### Debezium Confluent Avro

{{< sql_download_table "debezium-avro-confluent" >}}

Expand Down Expand Up @@ -85,7 +85,9 @@ Debezium provides a unified format for changelog, here is a simple example for a
*Note: please refer to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) about the meaning of each fields.*

The MySQL `products` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON message is an update change event on the `products` table where the `weight` value of the row with `id = 111` is changed from `5.18` to `5.15`.
Assuming this messages is synchronized to Kafka topic `products_binlog`, then we can use the following DDL to consume this topic and interpret the change events.
Assuming this messages is synchronized to Kafka topic `products_binlog`, then we can use the following DDLs (for Debezium JSON and Debezium Confluent Avro) to consume this topic and interpret the change events.

#### Debezium JSON DDL

```sql
CREATE TABLE topic_products (
Expand All @@ -100,7 +102,6 @@ CREATE TABLE topic_products (
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
-- using 'debezium-json' as the format to interpret Debezium JSON messages
-- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
'format' = 'debezium-json'
)
```
Expand Down Expand Up @@ -133,7 +134,30 @@ In some cases, users may setup the Debezium Kafka Connect with the Kafka configu

In order to interpret such messages, you need to add the option `'debezium-json.schema-include' = 'true'` into above DDL WITH clause (`false` by default). Usually, this is not recommended to include schema because this makes the messages very verbose and reduces parsing performance.

After registering the topic as a Flink table, then you can consume the Debezium messages as a changelog source.
#### Debezium Confluent Avro DDL

```sql
CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
-- using 'debezium-avro-confluent' as the format to interpret Debezium Avro messages
'format' = 'debezium-avro-confluent',
-- the URL to the schema registry for Kafka
'debezium-avro-confluent.url' = 'http://localhost:8081'
)
```

#### Producing Results

For every data format, after registering the topic as a Flink table, you can consume the Debezium messages as a changelog source.

```sql
-- a real-time materialized view on the MySQL "products"
Expand Down Expand Up @@ -234,8 +258,8 @@ CREATE TABLE KafkaTable (
Format Options
----------------

Flink provides `debezium-avro-confluent` and `debezium-json` formats to interpret Avro or Json messages produced by Debezium.
Use format `debezium-avro-confluent` to interpret Debezium Avro messages and format `debezium-json` to interpret Debezium Json messages.
Flink provides `debezium-avro-confluent` and `debezium-json` formats to interpret Avro or JSON messages produced by Debezium.
Use format `debezium-avro-confluent` to interpret Debezium Avro messages and format `debezium-json` to interpret Debezium JSON messages.

{{< tabs "a8edce02-58d5-4e0b-bc4b-75d05a98a0f9" >}}
{{< tab "Debezium Avro" >}}
Expand Down

0 comments on commit 641f4f4

Please sign in to comment.