Skip to content

Commit 3e6a20a

Browse files
authored
[Improve][CDC] Use Source to output the CatalogTable (apache#5626)
1 parent 77fbbbd commit 3e6a20a

File tree

3 files changed

+12
-14
lines changed

3 files changed

+12
-14
lines changed

docs/en/connector-v2/formats/cdc-compatible-debezium-json.md

-11
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ source {
2121

2222
base-url="jdbc:mysql://localhost:3306/test"
2323
"startup.mode"=INITIAL
24-
catalog {
25-
factory=MySQL
26-
}
2724
table-names=[
2825
"database1.t1",
2926
"database1.t2",
@@ -41,14 +38,6 @@ source {
4138
# topic prefix
4239
database.server.name = "mysql_cdc_1"
4340
}
44-
# compatible_debezium_json fixed schema
45-
schema = {
46-
fields = {
47-
topic = string
48-
key = string
49-
value = string
50-
}
51-
}
5241
}
5342
}
5443

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java

+12
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
3030
import org.apache.seatunnel.api.source.SupportCoordinate;
3131
import org.apache.seatunnel.api.table.catalog.CatalogTable;
32+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
3233
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3334
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3435
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
3536
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
3637
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
3738
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
39+
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
3840
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
3941
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
4042
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -56,14 +58,17 @@
5658
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
5759
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
5860
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
61+
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
5962
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
6063
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
6164
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
65+
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
6266

6367
import com.google.common.collect.Sets;
6468
import io.debezium.relational.TableId;
6569
import lombok.NoArgsConstructor;
6670

71+
import java.util.Collections;
6772
import java.util.HashMap;
6873
import java.util.HashSet;
6974
import java.util.Iterator;
@@ -145,6 +150,13 @@ private StopConfig getStopConfig(ReadonlyConfig config) {
145150

146151
@Override
147152
public List<CatalogTable> getProducedCatalogTables() {
153+
if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
154+
readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
155+
return Collections.singletonList(
156+
CatalogTableUtil.getCatalogTable(
157+
"default.default",
158+
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
159+
}
148160
return catalogTables;
149161
}
150162

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf

-3
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ source {
3535
password = "seatunnel"
3636
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
3737
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
38-
catalog {
39-
factory = MySQL
40-
}
4138
}
4239
}
4340

0 commit comments

Comments
 (0)