Skip to content

Commit 8fc4f3c

Browse files
jiazhaisijie
authored andcommittedMar 19, 2019
hide kafka-connecter details for easy use debezium connector (apache#3825)
currently we explored too much internal config for debezium connector, this PR is to hide some details and make the config easier. expected pass integration tests.
1 parent 7b2ccd1 commit 8fc4f3c

File tree

15 files changed

+343
-112
lines changed

15 files changed

+343
-112
lines changed
 

‎distribution/io/src/assemble/io.xml

+1
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,6 @@
6464
<file><source>${basedir}/../../pulsar-io/canal/target/pulsar-io-canal-${project.version}.nar</source></file>
6565
<file><source>${basedir}/../../pulsar-io/netty/target/pulsar-io-netty-${project.version}.nar</source></file>
6666
<file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
67+
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
6768
</files>
6869
</assembly>

‎pulsar-io/debezium/core/pom.xml

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
<parent>
25+
<groupId>org.apache.pulsar</groupId>
26+
<artifactId>pulsar-io-debezium</artifactId>
27+
<version>2.4.0-SNAPSHOT</version>
28+
</parent>
29+
30+
<artifactId>pulsar-io-debezium-core</artifactId>
31+
<name>Pulsar IO :: Debezium :: Core</name>
32+
33+
<dependencies>
34+
35+
<dependency>
36+
<groupId>${project.groupId}</groupId>
37+
<artifactId>pulsar-io-core</artifactId>
38+
<version>${project.version}</version>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>io.debezium</groupId>
43+
<artifactId>debezium-core</artifactId>
44+
<version>${debezium.version}</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>${project.groupId}</groupId>
49+
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
50+
<version>${project.version}</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.kafka</groupId>
55+
<artifactId>kafka_${scala.binary.version}</artifactId>
56+
<version>${kafka-client.version}</version>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.apache.kafka</groupId>
61+
<artifactId>connect-runtime</artifactId>
62+
<version>${kafka-client.version}</version>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>${project.groupId}</groupId>
67+
<artifactId>pulsar-client-original</artifactId>
68+
<version>${project.version}</version>
69+
</dependency>
70+
71+
<dependency>
72+
<groupId>${project.groupId}</groupId>
73+
<artifactId>pulsar-broker</artifactId>
74+
<version>${project.version}</version>
75+
<scope>test</scope>
76+
</dependency>
77+
78+
<dependency>
79+
<groupId>${project.groupId}</groupId>
80+
<artifactId>managed-ledger-original</artifactId>
81+
<version>${project.version}</version>
82+
<type>test-jar</type>
83+
<scope>test</scope>
84+
</dependency>
85+
86+
<dependency>
87+
<groupId>${project.groupId}</groupId>
88+
<artifactId>pulsar-zookeeper-utils</artifactId>
89+
<version>${project.version}</version>
90+
<type>test-jar</type>
91+
<scope>test</scope>
92+
</dependency>
93+
94+
<dependency>
95+
<groupId>${project.groupId}</groupId>
96+
<artifactId>pulsar-broker</artifactId>
97+
<version>${project.version}</version>
98+
<scope>test</scope>
99+
<type>test-jar</type>
100+
</dependency>
101+
102+
</dependencies>
103+
104+
</project>

‎pulsar-io/debezium/mysql/pom.xml

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
<parent>
25+
<groupId>org.apache.pulsar</groupId>
26+
<artifactId>pulsar-io-debezium</artifactId>
27+
<version>2.4.0-SNAPSHOT</version>
28+
</parent>
29+
30+
<artifactId>pulsar-io-debezium-mysql</artifactId>
31+
<name>Pulsar IO :: Debezium :: mysql</name>
32+
33+
<dependencies>
34+
35+
<dependency>
36+
<groupId>${project.groupId}</groupId>
37+
<artifactId>pulsar-io-debezium-core</artifactId>
38+
<version>${project.version}</version>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>io.debezium</groupId>
43+
<artifactId>debezium-connector-mysql</artifactId>
44+
<version>${debezium.version}</version>
45+
</dependency>
46+
47+
</dependencies>
48+
49+
50+
<build>
51+
<plugins>
52+
<plugin>
53+
<groupId>org.apache.nifi</groupId>
54+
<artifactId>nifi-nar-maven-plugin</artifactId>
55+
</plugin>
56+
</plugins>
57+
</build>
58+
59+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.io.debezium.mysql;
20+
21+
import java.util.Map;
22+
23+
import io.debezium.connector.mysql.MySqlConnectorConfig;
24+
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.kafka.connect.runtime.TaskConfig;
27+
import org.apache.pulsar.common.naming.TopicName;
28+
import org.apache.pulsar.io.core.SourceContext;
29+
import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
30+
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
31+
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
32+
33+
/**
34+
* A pulsar source that runs
35+
*/
36+
@Slf4j
37+
public class DebeziumMysqlSource extends KafkaConnectSource {
38+
static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
39+
static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
40+
static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
41+
static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic";
42+
static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic";
43+
44+
private static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
45+
String key,
46+
String value) throws IllegalArgumentException {
47+
Object orig = config.get(key);
48+
if (orig == null) {
49+
config.put(key, value);
50+
return;
51+
}
52+
53+
// throw exception if value not match
54+
if (!orig.equals(value)) {
55+
throw new IllegalArgumentException("Expected " + value + " but has " + orig);
56+
}
57+
}
58+
59+
private static void setConfigIfNull(Map<String, Object> config, String key, String value) {
60+
Object orig = config.get(key);
61+
if (orig == null) {
62+
config.put(key, value);
63+
}
64+
}
65+
66+
// namespace: tenant/namespace
67+
private static String topicNamespace(SourceContext sourceContext) {
68+
String tenant = sourceContext.getTenant();
69+
String namespace = sourceContext.getNamespace();
70+
71+
return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" +
72+
(StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
73+
}
74+
75+
@Override
76+
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
77+
// connector task
78+
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
79+
80+
// key.converter
81+
setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
82+
// value.converter
83+
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
84+
85+
// database.history implementation class
86+
setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
87+
88+
// database.history.pulsar.service.url, this is set as the value of pulsar.service.url if null.
89+
String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
90+
if (serviceUrl == null) {
91+
throw new IllegalArgumentException("Pulsar service URL not provided.");
92+
}
93+
setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl);
94+
95+
String topicNamespace = topicNamespace(sourceContext);
96+
// topic.namespace
97+
setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
98+
99+
String sourceName = sourceContext.getSourceName();
100+
// database.history.pulsar.topic: history topic name
101+
setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
102+
topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
103+
// offset.storage.topic: offset topic name
104+
setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
105+
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
106+
107+
super.open(config, sourceContext);
108+
}
109+
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
name: debezium-mysql
21+
description: Debezium MySql Source
22+
sourceClass: org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource

‎pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml ‎pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml

+5-15
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@
1919

2020
tenant: "test"
2121
namespace: "test-namespace"
22-
name: "debezium-kafka-source"
23-
topicName: "kafka-connect-topic"
24-
archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar"
22+
name: "debezium-mysql-source"
23+
topicName: "debezium-mysql-topic"
24+
archive: "connectors/pulsar-io-debezium-mysql-2.4.0-SNAPSHOT.nar"
2525

26-
##autoAck: true
2726
parallelism: 1
2827

2928
configs:
30-
## sourceTask
31-
task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
32-
3329
## config for mysql, docker image: debezium/example-mysql:0.8
3430
database.hostname: "localhost"
3531
database.port: "3306"
@@ -39,15 +35,9 @@ configs:
3935
database.server.name: "dbserver1"
4036
database.whitelist: "inventory"
4137

42-
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
43-
database.history.pulsar.topic: "history-topic"
44-
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
45-
## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
46-
key.converter: "org.apache.kafka.connect.json.JsonConverter"
47-
value.converter: "org.apache.kafka.connect.json.JsonConverter"
4838
## PULSAR_SERVICE_URL_CONFIG
4939
pulsar.service.url: "pulsar://127.0.0.1:6650"
50-
## OFFSET_STORAGE_TOPIC_CONFIG
51-
offset.storage.topic: "offset-topic"
40+
database.history.pulsar.topic: "mysql-history-topic"
41+
offset.storage.topic: "mysql-offset-topic"
5242

5343

‎pulsar-io/debezium/pom.xml

+5-70
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2222
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2323
<modelVersion>4.0.0</modelVersion>
24+
<packaging>pom</packaging>
2425
<parent>
2526
<groupId>org.apache.pulsar</groupId>
2627
<artifactId>pulsar-io</artifactId>
@@ -30,75 +31,9 @@
3031
<artifactId>pulsar-io-debezium</artifactId>
3132
<name>Pulsar IO :: Debezium</name>
3233

33-
<dependencies>
34-
35-
<dependency>
36-
<groupId>${project.groupId}</groupId>
37-
<artifactId>pulsar-io-core</artifactId>
38-
<version>${project.version}</version>
39-
</dependency>
40-
41-
<dependency>
42-
<groupId>io.debezium</groupId>
43-
<artifactId>debezium-core</artifactId>
44-
<version>${debezium.version}</version>
45-
</dependency>
46-
47-
<dependency>
48-
<groupId>io.debezium</groupId>
49-
<artifactId>debezium-connector-mysql</artifactId>
50-
<version>${debezium.version}</version>
51-
</dependency>
52-
53-
<dependency>
54-
<groupId>org.apache.kafka</groupId>
55-
<artifactId>kafka_${scala.binary.version}</artifactId>
56-
<version>${kafka-client.version}</version>
57-
</dependency>
58-
59-
<dependency>
60-
<groupId>org.apache.kafka</groupId>
61-
<artifactId>connect-runtime</artifactId>
62-
<version>${kafka-client.version}</version>
63-
</dependency>
64-
65-
<dependency>
66-
<groupId>${project.groupId}</groupId>
67-
<artifactId>pulsar-client-original</artifactId>
68-
<version>${project.version}</version>
69-
</dependency>
70-
71-
<dependency>
72-
<groupId>${project.groupId}</groupId>
73-
<artifactId>pulsar-broker</artifactId>
74-
<version>${project.version}</version>
75-
<scope>test</scope>
76-
</dependency>
77-
78-
<dependency>
79-
<groupId>${project.groupId}</groupId>
80-
<artifactId>managed-ledger-original</artifactId>
81-
<version>${project.version}</version>
82-
<type>test-jar</type>
83-
<scope>test</scope>
84-
</dependency>
85-
86-
<dependency>
87-
<groupId>${project.groupId}</groupId>
88-
<artifactId>pulsar-zookeeper-utils</artifactId>
89-
<version>${project.version}</version>
90-
<type>test-jar</type>
91-
<scope>test</scope>
92-
</dependency>
93-
94-
<dependency>
95-
<groupId>${project.groupId}</groupId>
96-
<artifactId>pulsar-broker</artifactId>
97-
<version>${project.version}</version>
98-
<scope>test</scope>
99-
<type>test-jar</type>
100-
</dependency>
101-
102-
</dependencies>
34+
<modules>
35+
<module>core</module>
36+
<module>mysql</module>
37+
</modules>
10338

10439
</project>

‎pulsar-io/kafka-connect-adaptor/pom.xml

-6
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,6 @@
3838
<version>${project.version}</version>
3939
</dependency>
4040

41-
<dependency>
42-
<groupId>${project.groupId}</groupId>
43-
<artifactId>pulsar-io-debezium</artifactId>
44-
<version>${project.version}</version>
45-
</dependency>
46-
4741
<dependency>
4842
<groupId>org.apache.kafka</groupId>
4943
<artifactId>kafka_${scala.binary.version}</artifactId>

‎pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.io.kafka.connect;
2020

21+
import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
22+
2123
import java.util.Base64;
2224
import java.util.Collections;
2325
import java.util.HashMap;
@@ -66,6 +68,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
6668
private CompletableFuture<Void> flushFuture;
6769
private OffsetBackingStore offsetStore;
6870
private OffsetStorageReader offsetReader;
71+
private String topicNamespace;
6972
@Getter
7073
private OffsetStorageWriter offsetWriter;
7174
// number of outstandingRecords that have been polled but not been acked
@@ -86,6 +89,8 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
8689
.getDeclaredConstructor()
8790
.newInstance();
8891

92+
topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);
93+
8994
// initialize the key and value converter
9095
keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
9196
.asSubclass(Converter.class)
@@ -193,7 +198,7 @@ private class KafkaSourceRecord implements Record<KeyValue<byte[], byte[]>> {
193198
.stream()
194199
.map(e -> e.getKey() + "=" + e.getValue())
195200
.collect(Collectors.joining(",")));
196-
this.destinationTopic = Optional.of(srcRecord.topic());
201+
this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
197202
}
198203

199204
@Override

‎pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
4444
public static final String PULSAR_SERVICE_URL_CONFIG = "pulsar.service.url";
4545
private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url";
4646

47+
/**
48+
* <code>topic.namespace</code>
49+
*/
50+
public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
51+
private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics";
52+
4753
static {
4854
CONFIG = new ConfigDef()
4955
.define(OFFSET_STORAGE_TOPIC_CONFIG,
@@ -53,10 +59,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
5359
.define(PULSAR_SERVICE_URL_CONFIG,
5460
Type.STRING,
5561
Importance.HIGH,
56-
PULSAR_SERVICE_URL_CONFIG_DOC);
62+
PULSAR_SERVICE_URL_CONFIG_DOC)
63+
.define(TOPIC_NAMESPACE_CONFIG,
64+
Type.STRING,
65+
"public/default",
66+
Importance.HIGH,
67+
TOPIC_NAMESPACE_CONFIG_DOC);
5768
}
5869

59-
6070
public PulsarKafkaWorkerConfig(Map<String, String> props) {
6171
super(CONFIG, props);
6272
}

‎tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContainer> {
2525

26-
public static final String NAME = "mysql";
26+
public static final String NAME = "debezium-mysql-example";
2727
static final Integer[] PORTS = { 3306 };
2828

2929
private static final String IMAGE_NAME = "debezium/example-mysql:0.8";

‎tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -1163,7 +1163,7 @@ private void testDebeziumMySqlConnect()
11631163
final String tenant = TopicName.PUBLIC_TENANT;
11641164
final String namespace = TopicName.DEFAULT_NAMESPACE;
11651165
final String outputTopicName = "debe-output-topic-name";
1166-
final String consumeTopicName = "dbserver1.inventory.products";
1166+
final String consumeTopicName = "public/default/dbserver1.inventory.products";
11671167
final String sourceName = "test-source-connector-"
11681168
+ functionRuntimeType + "-name-" + randomName(8);
11691169

@@ -1182,6 +1182,7 @@ private void testDebeziumMySqlConnect()
11821182
.subscriptionType(SubscriptionType.Exclusive)
11831183
.subscribe();
11841184

1185+
@Cleanup
11851186
DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
11861187

11871188
// setup debezium mysql server
@@ -1204,15 +1205,13 @@ private void testDebeziumMySqlConnect()
12041205
waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
12051206

12061207
// validate the source result
1207-
sourceTester.validateSourceResult(consumer, null);
1208+
sourceTester.validateSourceResult(consumer, 9);
12081209

12091210
// delete the source
12101211
deleteSource(tenant, namespace, sourceName);
12111212

12121213
// get source info (source should be deleted)
12131214
getSourceInfoNotFound(tenant, namespace, sourceName);
1214-
1215-
pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer());
12161215
}
12171216

12181217
}

‎tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java

+15-13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.tests.integration.io;
2020

21+
import java.io.Closeable;
2122
import java.util.Map;
2223
import java.util.concurrent.TimeUnit;
2324
import lombok.Getter;
@@ -39,9 +40,9 @@
3940
* which is a MySQL database server preconfigured with an inventory database.
4041
*/
4142
@Slf4j
42-
public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> {
43+
public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> implements Closeable {
4344

44-
private static final String NAME = "kafka-connect-adaptor";
45+
private static final String NAME = "debezium-mysql";
4546

4647
private final String pulsarServiceUrl;
4748

@@ -55,28 +56,21 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster) {
5556
this.pulsarCluster = cluster;
5657
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
5758

58-
sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask");
59-
sourceConfig.put("database.hostname", "mysql");
59+
sourceConfig.put("database.hostname", DebeziumMySQLContainer.NAME);
6060
sourceConfig.put("database.port", "3306");
6161
sourceConfig.put("database.user", "debezium");
6262
sourceConfig.put("database.password", "dbz");
6363
sourceConfig.put("database.server.id", "184054");
6464
sourceConfig.put("database.server.name", "dbserver1");
6565
sourceConfig.put("database.whitelist", "inventory");
66-
sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
67-
sourceConfig.put("database.history.pulsar.topic", "history-topic");
68-
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
69-
sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
70-
sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
7166
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
72-
sourceConfig.put("offset.storage.topic", "offset-topic");
7367
}
7468

7569
@Override
7670
public void setServiceContainer(DebeziumMySQLContainer container) {
7771
log.info("start debezium mysql server container.");
7872
debeziumMySqlContainer = container;
79-
pulsarCluster.startService("mysql", debeziumMySqlContainer);
73+
pulsarCluster.startService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
8074
}
8175

8276
@Override
@@ -90,7 +84,7 @@ public Map<String, String> produceSourceMessages(int numMessages) throws Excepti
9084
return null;
9185
}
9286

93-
public void validateSourceResult(Consumer<String> consumer, Map<String, String> kvs) throws Exception {
87+
public void validateSourceResult(Consumer<String> consumer, int number) throws Exception {
9488
int recordsNumber = 0;
9589
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
9690
while(msg != null) {
@@ -101,7 +95,15 @@ public void validateSourceResult(Consumer<String> consumer, Map<String, String>
10195
msg = consumer.receive(1, TimeUnit.SECONDS);
10296
}
10397

104-
Assert.assertEquals(recordsNumber, 9);
98+
Assert.assertEquals(recordsNumber, number);
10599
log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
106100
}
101+
102+
@Override
103+
public void close() {
104+
if (pulsarCluster != null) {
105+
pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
106+
}
107+
}
108+
107109
}

0 commit comments

Comments
 (0)
Please sign in to comment.