Skip to content

Commit

Permalink
[pulsar-io] pass client builder if no service url provided to debeziu…
Browse files Browse the repository at this point in the history
…m connector (apache#12145)
  • Loading branch information
freeznet authored Jan 18, 2022
1 parent fbec699 commit aaa22f9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.io.debezium;

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.DatabaseHistory;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -49,10 +48,7 @@ public static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
}

public static void setConfigIfNull(Map<String, Object> config, String key, String value) {
Object orig = config.get(key);
if (orig == null) {
config.put(key, value);
}
config.putIfAbsent(key, value);
}

// namespace for output topics, default value is "tenant/namespace"
Expand Down Expand Up @@ -80,9 +76,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
if (StringUtils.isEmpty(pulsarUrl)) {
throw new IllegalArgumentException("Pulsar service URL for History Database not provided.");
}

String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
Expand All @@ -96,8 +89,11 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);

config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder",
SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
// pass pulsar.client.builder if database.history.pulsar.service.url is not provided
if (StringUtils.isEmpty(pulsarUrl)) {
String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
}

super.open(config, sourceContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public void configure(
}
this.topicName = config.getString(TOPIC);

if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) {
String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
}
String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
this.clientBuilder = PulsarClient.builder();
if (null != clientBuilderBase64Encoded) {
if (!isBlank(clientBuilderBase64Encoded)) {
// deserialize the client builder to the same classloader
this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded,
this.clientBuilder.getClass().getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain

private final PulsarCluster pulsarCluster;

public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName) {
public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName,
boolean testWithClientBuilder) {
super(NAME);
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
Expand All @@ -61,7 +62,9 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
if (!testWithClientBuilder) {
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
}
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,26 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, false);
}

@Test(groups = "source")
public void testDebeziumMySqlSourceJsonWithClientBuilder() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, true);
}

@Test(groups = "source")
public void testDebeziumMySqlSourceAvro() throws Exception {
testDebeziumMySqlConnect(
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false, false);
}

@Test(groups = "source")
public void testDebeziumPostgreSqlSource() throws Exception {
testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
}


@Test(groups = "source")
public void testDebeziumMongoDbSource() throws Exception{
testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
Expand All @@ -69,7 +75,8 @@ public void testDebeziumMsSqlSource() throws Exception{
testDebeziumMsSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
}

private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope,
boolean testWithClientBuilder) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
Expand Down Expand Up @@ -104,7 +111,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
admin.topics().createNonPartitionedTopic(outputTopicName);

@Cleanup
DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName, testWithClientBuilder);
sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);

// setup debezium mysql server
Expand Down

0 comments on commit aaa22f9

Please sign in to comment.