[tests] improve connector related integration tests (apache#2587)
*Motivation*

As more and more connectors are added, it becomes expensive to start all external services at the beginning of the test run.

*Changes*

- refactor the connector testing framework to start each external service before the test methods that need it (see the lifecycle sketch below)
- fix the kafka, cassandra and mysql connectors
sijie authored Sep 18, 2018
1 parent fb396bf commit 7530d64
Showing 26 changed files with 212 additions and 386 deletions.
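As a rough sketch of the lifecycle this refactor moves to (hedged: it uses testcontainers' `GenericContainer` directly, and the image name and helper method are illustrative stand-ins for the tester API changed below):

```java
import org.testcontainers.containers.GenericContainer;

// Hedged sketch: each test now starts only the external service it needs,
// runs against it, and always tears it down, instead of the whole suite
// booting every service up front.
public final class PerTestServiceLifecycleSketch {

    public static void main(String[] args) {
        GenericContainer<?> service =
                new GenericContainer<>("confluentinc/cp-kafka:4.0.0"); // illustrative image
        service.start();               // started just before the test body
        try {
            runConnectorTest(service); // the one test that needs this service
        } finally {
            service.stop();            // torn down even if the test fails
        }
    }

    private static void runConnectorTest(GenericContainer<?> service) {
        // stand-in for runSinkTester(...) in the PulsarFunctionsTest diff below
    }
}
```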
6 changes: 6 additions & 0 deletions distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>

<file>
<source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
</files>
</assembly>
@@ -233,66 +233,6 @@ public void testReplayOnConsumerDisconnect() throws Exception {
deleteTopic(topicName);
}

@Test
public void testConsumersWithDifferentPermits() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
final String subName = "sub4";
final int numMsgs = 10000;

final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(numMsgs);

int recvQ1 = 10;
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
.messageListener((consumer, msg) -> {
msgCountConsumer1.incrementAndGet();
try {
consumer.acknowledge(msg);
latch.countDown();
} catch (PulsarClientException e) {
fail("Should not fail");
}
}).subscribe();

int recvQ2 = 1;
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
.messageListener((consumer, msg) -> {
msgCountConsumer2.incrementAndGet();
try {
consumer.acknowledge(msg);
latch.countDown();
} catch (PulsarClientException e) {
fail("Should not fail");
}
}).subscribe();

List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.maxPendingMessages(numMsgs + 1)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs; i++) {
String message = "msg-" + i;
futures.add(producer.sendAsync(message.getBytes()));
}
FutureUtil.waitForAll(futures).get();
producer.close();

latch.await(5, TimeUnit.SECONDS);

assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1);
assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1);

consumer1.close();
consumer2.close();

deleteTopic(topicName);
}

// this test is good to have to see the distribution, but every now and then it gets slightly different results
// from the expected numbers. keeping it disabled to not break the build, but it nevertheless gives good insight
// into how the round robin distribution algorithm is behaving
@@ -146,7 +146,8 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
ThreadContext.put("instance", instanceConfig.getInstanceId());

log.info("Starting Java Instance {}", instanceConfig.getFunctionDetails().getName());
log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());

// start the function thread
loadJars();
@@ -78,6 +78,9 @@ public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType schemaType)
private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
if (GenericRecord.class.isAssignableFrom(clazz)) {
return SchemaType.AUTO;
} else if (byte[].class.equals(clazz)) {
// if the function uses raw bytes, ignore schema resolution
return SchemaType.NONE;
} else {
Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join();
if (schema.isPresent()) {
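A hedged, self-contained sketch of the resolution order this hunk implements (the names below are illustrative stand-ins, not the Pulsar API):

```java
import java.util.Optional;

// Hedged sketch of the schema-type resolution order after this change:
// generic records use AUTO, raw byte[] functions skip schema resolution
// entirely with NONE, and any other class falls back to whatever schema
// is already registered for the topic.
final class SchemaTypeResolutionSketch {

    enum Kind { AUTO, NONE, REGISTERED, FALLBACK }

    static Kind resolve(Class<?> clazz, boolean isGenericRecord,
                        Optional<String> registeredSchemaType) {
        if (isGenericRecord) {
            return Kind.AUTO;       // decode with the topic's own schema
        } else if (byte[].class.equals(clazz)) {
            return Kind.NONE;       // pure bytes: no schema lookup at all
        } else if (registeredSchemaType.isPresent()) {
            return Kind.REGISTERED; // honor the registry entry
        } else {
            return Kind.FALLBACK;
        }
    }
}
```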
@@ -51,7 +51,7 @@
* Users need to implement extractKeyValue function to use this sink.
* This class assumes that the input will be JSON documents
*/
public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
public class ElasticSearchSink implements Sink<byte[]> {

protected static final String DOCUMENT = "doc";

@@ -74,7 +74,7 @@ public void close() throws Exception {

@Override
public void write(Record<byte[]> record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
KeyValue<String, byte[]> keyValue = extractKeyValue(record);
IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
indexRequest.type(DOCUMENT);
indexRequest.source(keyValue.getValue(), XContentType.JSON);
@@ -91,7 +91,10 @@ public void write(Record<byte[]> record) {
}
}

public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
String key = record.getKey().orElse(null);
return new KeyValue<>(key, record.getValue());
}

private void createIndexIfNeeded() throws IOException {
GetIndexRequest request = new GetIndexRequest();
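For context, a hedged sketch of the request that `write()` above assembles for a single record (the index name and JSON payload are illustrative; the calls mirror the ones in the diff):

```java
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Hedged sketch: the record value must already be a JSON document, and it
// is indexed as-is under the configured index name.
public final class EsIndexRequestSketch {
    public static void main(String[] args) {
        byte[] doc = "{\"user\":\"alice\",\"action\":\"login\"}".getBytes();
        IndexRequest request = Requests.indexRequest("my-index"); // illustrative index name
        request.type("doc");                    // matches the DOCUMENT constant above
        request.source(doc, XContentType.JSON); // no transformation is applied
        System.out.println(request);
    }
}
```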

This file was deleted.

@@ -17,6 +17,6 @@
# under the License.
#

name: Elastic Search
name: elastic_search
description: Writes data into Elastic Search
sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink
sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
@@ -56,7 +56,7 @@ public class ElasticSearchSinkTests {
@Mock
protected SinkContext mockSinkContext;
protected Map<String, Object> map;
protected ElasticSearchStringSink sink;
protected ElasticSearchSink sink;

@BeforeClass
public static final void init() {
@@ -71,7 +71,7 @@ public static final void init() {
public final void setUp() throws Exception {
map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:9200");
sink = new ElasticSearchStringSink();
sink = new ElasticSearchSink();

mockRecord = mock(Record.class);
mockSinkContext = mock(SinkContext.class);
@@ -70,6 +70,10 @@ public void close() throws IOException {
}
}

protected Properties beforeCreateProducer(Properties props) {
return props;
}
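// (Note: beforeCreateProducer is a template-method hook; subclasses receive
// the fully-populated Properties just before the KafkaProducer is constructed
// and may adjust them, as KafkaBytesSink does in the next file.)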

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
kafkaSinkConfig = KafkaSinkConfig.load(config);
@@ -89,7 +93,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());

producer = new KafkaProducer<>(props);
producer = new KafkaProducer<>(beforeCreateProducer(props));

log.info("Kafka sink started.");
}
@@ -19,16 +19,31 @@

package org.apache.pulsar.io.kafka;

import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;

/**
* Kafka sink that treats incoming messages on the input topic as Strings
* and write identical key/value pairs.
* Kafka sink that treats incoming messages as pure bytes, so no schema
* is applied to them.
*/
public class KafkaStringSink extends KafkaAbstractSink<String, String> {
@Slf4j
public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {

@Override
protected Properties beforeCreateProducer(Properties props) {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
log.info("Created kafka producer config : {}", props);
return props;
}

@Override
public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue()));
public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
return new KeyValue<>(record.getKey().orElse(null), record.getValue());
}
}
@@ -20,4 +20,4 @@
name: kafka
description: Kafka source and sink connector
sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink
sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
2 changes: 1 addition & 1 deletion site2/docs/io-quickstart.md
@@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors

Example output:
```json
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
```

If an error occurred while starting the Pulsar service, you should be able to see the exception in the terminal where you are running `pulsar/standalone`,
@@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors

Example output:
```json
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
```

If an error occurred while starting the Pulsar service, you should be able to see the exception in the terminal where you are running `pulsar/standalone`,
@@ -48,6 +48,7 @@
import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;

/**
@@ -62,17 +63,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {

@Test
public void testKafkaSink() throws Exception {
testSink(new KafkaSinkTester(), true);
testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
}

@Test
public void testCassandraSink() throws Exception {
testSink(new CassandraSinkTester(), true);
testSink(CassandraSinkTester.createTester(true), true);
}

@Test
public void testCassandraArchiveSink() throws Exception {
testSink(new CassandraSinkArchiveTester(), false);
testSink(CassandraSinkTester.createTester(false), false);
}

@Test(enabled = false)
@@ -91,8 +92,31 @@ public void testElasticSearchSink() throws Exception {
}

private void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
tester.startServiceContainer(pulsarCluster);
try {
runSinkTester(tester, builtin);
} finally {
tester.stopServiceContainer(pulsarCluster);
}
}


private <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
boolean builtinSink,
SourceTester<ServiceContainerT> sourceTester)
throws Exception {
ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
try {
runSinkTester(sinkTester, builtinSink);
if (null != sourceTester) {
sourceTester.setServiceContainer(serviceContainer);
testSource(sourceTester);
}
} finally {
sinkTester.stopServiceContainer(pulsarCluster);
}
}
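// Note: with this overload, testKafkaSink starts a single Kafka container,
// runs the sink test against it, and then hands the same container to the
// KafkaSourceTester, which is why the standalone testKafkaSource method is
// removed further below.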
private void runSinkTester(SinkTester tester, boolean builtin) throws Exception {
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String inputTopicName = "test-sink-connector-"
@@ -357,14 +381,7 @@ protected void getSinkInfoNotFound(String tenant, String namespace, String sinkN
// Source Test
//

@Test
public void testKafkaSource() throws Exception {
testSource(new KafkaSourceTester());
}

private void testSource(SourceTester tester) throws Exception {
tester.findSourceServiceContainer(pulsarCluster.getExternalServices());

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "test-source-connector-"
@@ -161,7 +161,7 @@ public String generateUpdateFunctionCommand() {
}

public String generateUpdateFunctionCommand(String codeFile) {
StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m ");
StringBuilder commandBuilder = new StringBuilder();
if (adminUrl == null) {
commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
} else {