Skip to content

Commit

Permalink
a simple debezium integration test (apache#3154)
Browse files Browse the repository at this point in the history
### Motivation

create a simple debezium integration test. 
In this test, it start a Debezium MySQL Container based on "debezium/example-mysql:0.8", then use debezium source connector to binlog from MySQL, and store the debezium output into Pulsar.
It verify the consumer readout message number is as expected.

### Modifications

 create a simple debezium integration test.

### Result

this test passed.
  • Loading branch information
jiazhai authored and sijie committed Dec 11, 2018
1 parent 8c7ff2b commit 1260bf3
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;


import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContainer> {

public static final String NAME = "mysql";
static final Integer[] PORTS = { 3306 };

private static final String IMAGE_NAME = "debezium/example-mysql:0.8";

public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
this.withEnv("MYSQL_USER", "mysqluser");
this.withEnv("MYSQL_PASSWORD", "mysqlpw");
this.withEnv("MYSQL_ROOT_PASSWORD", "debezium");

}

@Override
public String getContainerName() {
return clusterName;
}

@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
})
.waitingFor(new HostPortWaitStrategy());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
Expand Down Expand Up @@ -100,6 +101,11 @@ public void testElasticSearchSink() throws Exception {
testSink(new ElasticSearchSinkTester(), true);
}

@Test
public void testDebeziumMySqlSource() throws Exception {
testDebeziumMySqlConnect();
}

private void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.startServiceContainer(pulsarCluster);
try {
Expand Down Expand Up @@ -1130,4 +1136,63 @@ private static void publishAndConsumeAvroMessages(String inputTopic,
assertEquals("value-" + i, msg.getValue());
}
}
}

private void testDebeziumMySqlConnect()
throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name";
final String consumeTopicName = "dbserver1.inventory.products";
final String sourceName = "test-source-connector-"
+ functionRuntimeType + "-name-" + randomName(8);

// This is the binlog count that contained in mysql container.
final int numMessages = 47;

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(consumeTopicName)
.subscriptionName("debezium-source-tester")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();

DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);

// setup debezium mysql server
DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
sourceTester.setServiceContainer(mySQLContainer);

// prepare the testing environment for source
prepareSource(sourceTester);

// submit the source connector
submitSourceConnector(sourceTester, tenant, namespace, sourceName, outputTopicName);

// get source info
getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);

// get source status
getSourceStatus(tenant, namespace, sourceName);

// wait for source to process messages
waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);

// validate the source result
sourceTester.validateSourceResult(consumer, null);

// delete the source
deleteSource(tenant, namespace, sourceName);

// get source info (source should be deleted)
getSourceInfoNotFound(tenant, namespace, sourceName);

pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;

/**
* A tester for testing Debezium MySQL source.
*
* It reads binlog from MySQL, and store the debezium output into Pulsar.
* This test verify that the target topic contains wanted number messages.
*
* Debezium MySQL Container is "debezium/example-mysql:0.8",
* which is a MySQL database server preconfigured with an inventory database.
*/
@Slf4j
public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> {

private static final String NAME = "kafka-connect-adaptor";

private final String pulsarServiceUrl;

@Getter
private DebeziumMySQLContainer debeziumMySqlContainer;

private final PulsarCluster pulsarCluster;

public DebeziumMySqlSourceTester(PulsarCluster cluster) {
super(NAME);
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask");
sourceConfig.put("database.hostname", "mysql");
sourceConfig.put("database.port", "3306");
sourceConfig.put("database.user", "debezium");
sourceConfig.put("database.password", "dbz");
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
sourceConfig.put("database.history.pulsar.topic", "history-topic");
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("offset.storage.topic", "offset-topic");
}

@Override
public void setServiceContainer(DebeziumMySQLContainer container) {
log.info("start debezium mysql server container.");
debeziumMySqlContainer = container;
pulsarCluster.startService("mysql", debeziumMySqlContainer);
}

@Override
public void prepareSource() throws Exception {
log.info("debezium mysql server already contains preconfigured data.");
}

@Override
public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
log.info("debezium mysql server already contains preconfigured data.");
return null;
}

public void validateSourceResult(Consumer<String> consumer, Map<String, String> kvs) throws Exception {
int recordsNumber = 0;
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
log.info("Received message: {}.", msg.getValue());
Assert.assertTrue(msg.getValue().contains("dbserver1.inventory.products"));
consumer.acknowledge(msg);
msg = consumer.receive(1, TimeUnit.SECONDS);
}

Assert.assertEquals(recordsNumber, 9);
log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
}
}

0 comments on commit 1260bf3

Please sign in to comment.