Skip to content

Commit

Permalink
[FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector.
Browse files Browse the repository at this point in the history
  • Loading branch information
syhily authored and dawidwys committed Sep 16, 2021
1 parent d18911d commit b608620
Show file tree
Hide file tree
Showing 29 changed files with 1,143 additions and 165 deletions.
47 changes: 47 additions & 0 deletions flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ under the License.
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-package-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Expand Down Expand Up @@ -258,6 +264,47 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<includes>
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-test-sources</id>
<goals>
<goal>test-jar-no-fork</goal>
</goals>
<configuration>
<archive>
<!-- Globally exclude maven metadata, because it may accidentally bundle files we don't intend to -->
<addMavenDescriptor>false</addMavenDescriptor>
</archive>
<includes>
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class TopicRange implements Serializable {
public TopicRange(int start, int end) {
checkArgument(start >= MIN_RANGE, "Start range %s shouldn't below zero.", start);
checkArgument(end <= MAX_RANGE, "End range %s shouldn't exceed 65535.", end);
checkArgument(start <= end, "Range end must >= range start.");

this.start = start;
this.end = end;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.testutils.runtime;
package org.apache.flink.connector.pulsar.source.enumerator.topic.range;

import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;

/**
* A abstraction for different pulsar runtimes. Providing the common methods for {@link
* PulsarTestEnvironment}.
*/
public interface PulsarRuntimeProvider {
import java.util.List;

/** Always return the same range set for all topics. */
public class FixedRangeGenerator implements RangeGenerator {
private static final long serialVersionUID = -3895203007855538734L;

/** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */
void startUp();
private final List<TopicRange> ranges;

/** Shutdown this pulsar runtime. */
void tearDown();
public FixedRangeGenerator(List<TopicRange> ranges) {
this.ranges = ranges;
}

/** Return a operator for operating this pulsar runtime. */
PulsarRuntimeOperator operator();
@Override
public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
return ranges;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
@TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

// Defines pulsar running environment
@ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK);
@ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());

// Defines a external context Factories,
// so test cases will be invoked using this external contexts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,31 @@

package org.apache.flink.connector.pulsar.testutils;

import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import java.util.Collection;

import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;

/** Source split data writer for writing test data into a Pulsar topic partition. */
public class PulsarPartitionDataWriter implements SourceSplitDataWriter<String> {

private final Producer<String> producer;
private final PulsarRuntimeOperator operator;
private final String fullTopicName;

public PulsarPartitionDataWriter(PulsarClient client, TopicPartition partition) {
try {
this.producer =
client.newProducer(Schema.STRING).topic(partition.getFullTopicName()).create();
} catch (PulsarClientException e) {
throw new IllegalStateException(e);
}
public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String fullTopicName) {
this.operator = operator;
this.fullTopicName = fullTopicName;
}

@Override
public void writeRecords(Collection<String> records) {
for (String record : records) {
sneakyClient(() -> producer.newMessage().value(record).send());
}
operator.sendMessages(fullTopicName, Schema.STRING, records);
}

@Override
public void close() throws Exception {
producer.close();
public void close() {
// Nothing to do.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@

/** Common test context for pulsar based test. */
public abstract class PulsarTestContext<T> implements ExternalContext<T> {
private static final long serialVersionUID = 4717940854368532130L;

private static final int NUM_RECORDS_UPPER_BOUND = 500;
private static final int NUM_RECORDS_LOWER_BOUND = 100;

private final String displayName;
protected final PulsarRuntimeOperator operator;

protected PulsarTestContext(String displayName, PulsarTestEnvironment environment) {
this.displayName = displayName;
protected PulsarTestContext(PulsarTestEnvironment environment) {
this.operator = environment.operator();
}

Expand All @@ -58,8 +57,10 @@ protected List<String> generateStringTestData(int splitIndex, long seed) {
return records;
}

protected abstract String displayName();

@Override
public String toString() {
return displayName;
return displayName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
import org.apache.flink.connectors.test.common.TestResource;
import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;

Expand Down Expand Up @@ -54,10 +53,10 @@
public class PulsarTestEnvironment
implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule {

private final PulsarRuntimeProvider provider;
private final PulsarRuntime runtime;

public PulsarTestEnvironment(PulsarRuntime runtime) {
this.provider = runtime.provider();
this.runtime = runtime;
}

/** JUnit 4 Rule based test logic. */
Expand All @@ -66,7 +65,7 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
provider.startUp();
runtime.startUp();

List<Throwable> errors = new ArrayList<>();
try {
Expand All @@ -75,7 +74,7 @@ public void evaluate() throws Throwable {
errors.add(t);
} finally {
try {
provider.tearDown();
runtime.tearDown();
} catch (Throwable t) {
errors.add(t);
}
Expand All @@ -88,29 +87,29 @@ public void evaluate() throws Throwable {
/** JUnit 5 Extension setup method. */
@Override
public void beforeAll(ExtensionContext context) {
provider.startUp();
runtime.startUp();
}

/** flink-connector-testing setup method. */
@Override
public void startUp() {
provider.startUp();
runtime.startUp();
}

/** JUnit 5 Extension shutdown method. */
@Override
public void afterAll(ExtensionContext context) {
provider.tearDown();
runtime.tearDown();
}

/** flink-connector-testing shutdown method. */
@Override
public void tearDown() {
provider.tearDown();
runtime.tearDown();
}

/** Get a common supported set of method for operating pulsar which is in container. */
public PulsarRuntimeOperator operator() {
return provider.operator();
return runtime.operator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
* pulsar broker. Override this method when needs.
*/
protected PulsarRuntime runtime() {
return PulsarRuntime.MOCK;
return PulsarRuntime.mock();
}

/** Operate pulsar by acquiring a runtime operator. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,96 +18,33 @@

package org.apache.flink.connector.pulsar.testutils.cases;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import org.apache.pulsar.client.api.RegexSubscriptionMode;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
import static org.apache.pulsar.client.api.Schema.STRING;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
import org.apache.pulsar.client.api.SubscriptionType;

/**
* Pulsar external context that will create multiple topics with only one partitions as source
* splits.
*/
public class MultipleTopicConsumingContext extends PulsarTestContext<String> {

private int numTopics = 0;

private final String topicPattern;

private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
new HashMap<>();
public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
private static final long serialVersionUID = -3855336888090886528L;

public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
super("consuming message on multiple topic", environment);
this.topicPattern =
"pulsar-multiple-topic-[0-9]+-"
+ ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
super(environment);
}

@Override
public Source<String, ?, ?> createSource(Boundedness boundedness) {
PulsarSourceBuilder<String> builder =
PulsarSource.builder()
.setDeserializationSchema(pulsarSchema(STRING))
.setServiceUrl(operator.serviceUrl())
.setAdminUrl(operator.adminUrl())
.setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
.setSubscriptionType(Exclusive)
.setSubscriptionName("flink-pulsar-multiple-topic-test");
if (boundedness == Boundedness.BOUNDED) {
// Using latest stop cursor for making sure the source could be stopped.
// This is required for SourceTestSuiteBase.
builder.setBoundedStopCursor(StopCursor.latest());
}

return builder.build();
protected String displayName() {
return "consuming message on multiple topic";
}

@Override
public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
operator.createTopic(topicName, 1);

String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange());
PulsarPartitionDataWriter writer =
new PulsarPartitionDataWriter(operator.client(), partition);

topicNameToSplitWriters.put(partitionName, writer);
numTopics++;

return writer;
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
protected String subscriptionName() {
return "flink-pulsar-multiple-topic-test";
}

@Override
public void close() throws Exception {
for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
writer.close();
}

topicNameToSplitWriters.clear();
protected SubscriptionType subscriptionType() {
return SubscriptionType.Exclusive;
}
}
Loading

0 comments on commit b608620

Please sign in to comment.