Commit 53a1f0c

[Feature][connector][kafka] Support read debezium format message from kafka (apache#5066)

1 parent e964c03 commit 53a1f0c
File tree

22 files changed: +1270 −24 lines changed

.github/workflows/backend.yml (+1 −1)

```diff
@@ -564,7 +564,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 90
+    timeout-minutes: 150
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
```
docs/en/connector-v2/formats/debezium-json.md (new file, +107)

# Debezium Format

Changelog-Data-Capture Format: Serialization Schema Format: Deserialization Schema

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred.

SeaTunnel supports interpreting Debezium JSON messages as INSERT/UPDATE/DELETE messages into the SeaTunnel system. This is useful in many cases to leverage this feature, such as:

- synchronizing incremental data from databases to other systems
- auditing logs
- real-time materialized views on databases
- temporal join changing history of a database table, and so on

SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel as Debezium JSON messages, and emitting them to storage like Kafka.

# Format Options

| option                            | default | required | Description                                                                                          |
|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
| format                            | (none)  | yes      | Specify what format to use; here it should be 'debezium_json'.                                       |
| debezium-json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |

# How to use Debezium format

## Kafka usage example

Debezium provides a unified format for changelogs. Here is a simple example of an update operation captured from a MySQL products table:

```json
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter ",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter ",
    "weight": 5.17
  },
  "source": {
    "version": "1.1.1.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 1589362330000,
    "snapshot": "false",
    "db": "inventory",
    "table": "products",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 2090,
    "row": 0,
    "thread": 2,
    "query": null
  },
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
```

Note: please refer to the Debezium documentation for the meaning of each field.

The MySQL products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.17.
Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel conf to consume this topic and interpret the change events in Debezium format.

```bash
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
    format = debezium_json
  }
}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "consume-binlog"
    format = debezium_json
  }
}
```
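To make the interpretation step concrete, here is a minimal, language-neutral sketch (not the connector's actual Java implementation; `interpret_debezium` and the kind labels are illustrative) of how a Debezium `op` code maps to the INSERT/UPDATE/DELETE change rows a consumer emits:

```python
import json

def interpret_debezium(message):
    """Yield (kind, row) pairs for one Debezium JSON change event.

    Debezium op codes: "c" = create, "r" = snapshot read,
    "u" = update (carries before and after images), "d" = delete.
    """
    event = json.loads(message)
    op = event["op"]
    if op in ("c", "r"):
        # Inserts and snapshot reads only have an "after" image.
        yield ("INSERT", event["after"])
    elif op == "u":
        # An update is emitted as a retraction of the old row
        # followed by the new row.
        yield ("UPDATE_BEFORE", event["before"])
        yield ("UPDATE_AFTER", event["after"])
    elif op == "d":
        # Deletes only have a "before" image.
        yield ("DELETE", event["before"])
    else:
        raise ValueError("unknown op: " + op)

# The update event from the products example, reduced to two fields:
msg = '{"before": {"id": 111, "weight": 5.18}, "after": {"id": 111, "weight": 5.17}, "op": "u"}'
changes = list(interpret_debezium(msg))
# changes == [("UPDATE_BEFORE", {"id": 111, "weight": 5.18}),
#             ("UPDATE_AFTER",  {"id": 111, "weight": 5.17})]
```

This is why a single Kafka record can fan out into two rows downstream: the update envelope carries both the old and the new image of the row.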

docs/en/connector-v2/sink/Kafka.md (+10 −6)

```diff
@@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different transactionId. This para
 
 ### format
 
-Data format. The default format is json. Optional text format. The default field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json. Optional text format, canal-json and debezium-json.
+If you use json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
+If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
+If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
 
 ### field_delimiter
 
@@ -209,8 +211,10 @@ sink {
 
 ### next version
 
-- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/seatunnel/pull/3711)
-- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/seatunnel/pull/3742)
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
+- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
+- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
+- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)
```

docs/en/connector-v2/source/kafka.md (+10 −7)

```diff
@@ -73,8 +73,10 @@ The structure of the data, including field names and field types.
 
 ## format
 
-Data format. The default format is json. Optional text format. The default field separator is ", ".
-If you customize the delimiter, add the "field_delimiter" option.
+Data format. The default format is json. Optional text format, canal-json and debezium-json.
+If you use json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
+If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
+If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
 
 ## format_error_handle_way
 
@@ -221,9 +223,10 @@ source {
 
 ### Next Version
 
-- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157))
-- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125))
-- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
-- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810))
-- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364))
+- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
+- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
+- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset ([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
+- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
+- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)
```

release-note.md (+11 −1)

```diff
@@ -3,9 +3,19 @@
 ## Bug fix
 
 ### Core
-
 - [Core] [API] Fixed generic class loss for lists (#4421)
 - [Core] [API] Fix parse nested row data type key changed upper (#4459)
+- [Starter][Flink] Support transform-v2 for flink #3396
+- [Flink] Support flink 1.14.x #3963
+### Transformer
+- [Spark] Support transform-v2 for spark (#3409)
+- [ALL] Add FieldMapper Transform #3781
+### Connectors
+- [Elasticsearch] Support https protocol & compatible with opensearch
+- [Hbase] Add hbase sink connector #4049
+### Formats
+- [Canal] Support read canal format message #3950
+- [Debezium] Support read debezium format message #3981
 
 ### Connector-V2
```

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java (+6 −2)

```diff
@@ -26,8 +26,6 @@
 public class Config {
 
     public static final String CONNECTOR_IDENTITY = "Kafka";
-    public static final String REPLICATION_FACTOR = "replication.factor";
-
     /** The default field delimiter is “,” */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
@@ -99,6 +97,12 @@ public class Config {
                             "Data format. The default format is json. Optional text format. The default field separator is \", \". "
                                     + "If you customize the delimiter, add the \"field_delimiter\" option.");
 
+    public static final Option<Boolean> DEBEZIUM_RECORD_INCLUDE_SCHEMA =
+            Options.key("debezium_record_include_schema")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Does the debezium record carry a schema.");
+
     public static final Option<String> FIELD_DELIMITER =
             Options.key("field_delimiter")
                     .stringType()
```

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java (+1)

```diff
@@ -21,5 +21,6 @@ public enum MessageFormat {
     JSON,
     TEXT,
     CANAL_JSON,
+    DEBEZIUM_JSON,
     COMPATIBLE_DEBEZIUM_JSON
 }
```

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java (+3)

```diff
@@ -28,6 +28,7 @@
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
+import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
@@ -219,6 +220,8 @@ private static SerializationSchema createSerializationSchema(
                             .build();
             case CANAL_JSON:
                 return new CanalJsonSerializationSchema(rowType);
+            case DEBEZIUM_JSON:
+                return new DebeziumJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:
                 return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
             default:
```
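The serializer above covers the reverse direction: turning a SeaTunnel row change back into a Debezium-style envelope before writing it to Kafka. A minimal sketch of that encoding (illustrative only; `to_debezium` and the kind names are not the connector's API, and the real schema carries more fields such as `source` and `ts_ms`):

```python
import json

def to_debezium(kind, before, after):
    """Encode one row-level change as a Debezium-style JSON envelope.

    INSERT has only an "after" image, DELETE only a "before" image,
    UPDATE carries both.
    """
    op = {"INSERT": "c", "UPDATE": "u", "DELETE": "d"}[kind]
    return json.dumps({"before": before, "after": after, "op": op})

# Deleting row id=111 produces an envelope with a null "after" image:
record = to_debezium("DELETE", {"id": 111, "name": "scooter"}, None)
```

Round-tripping a change through this encoding and the matching decoder preserves the op code and both row images, which is what lets a Kafka topic act as a faithful changelog.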

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java (+10)

```diff
@@ -47,6 +47,7 @@
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
@@ -62,6 +63,7 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
@@ -266,6 +268,14 @@ private void setDeserialization(Config config) {
                                 .setIgnoreParseErrors(true)
                                 .build();
                 break;
+            case DEBEZIUM_JSON:
+                boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
+                if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
+                    includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
+                }
+                deserializationSchema =
+                        new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
+                break;
             default:
                 throw new SeaTunnelJsonFormatException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
```
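The `debezium_record_include_schema` option exists because Debezium's Kafka Connect JSON converter can wrap every event in a `{"schema": ..., "payload": ...}` envelope; when the option is true (its default), the deserializer must unwrap `payload` before reading `op`/`before`/`after`. A minimal sketch of that branch (illustrative, not the connector's Java code):

```python
import json

def extract_envelope(message, include_schema=True):
    """Return the Debezium change event from one Kafka record.

    With include_schema=True the record is assumed to be
    {"schema": ..., "payload": <event>}; otherwise the record
    is the event itself.
    """
    node = json.loads(message)
    return node["payload"] if include_schema else node

wrapped = '{"schema": {"type": "struct"}, "payload": {"op": "d", "before": {"id": 1}, "after": null}}'
envelope = extract_envelope(wrapped, include_schema=True)
```

Setting the option to false when records are actually wrapped (or vice versa) is a common source of "missing field op" style parse errors, which is why the source exposes it explicitly.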

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java (+1)

```diff
@@ -46,6 +46,7 @@ public OptionRule optionRule() {
                         Config.KAFKA_CONFIG,
                         Config.SCHEMA,
                         Config.FORMAT,
+                        Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
                         Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
                 .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP)
                 .conditional(
```
