Skip to content

Commit

Permalink
[FLINK-29830][Connector/Pulsar] Create the topic with schema before c…
Browse files Browse the repository at this point in the history
…onsuming messages in PulsarSinkITCase. (apache#21252)
  • Loading branch information
syhily authored Nov 11, 2022
1 parent b19a45d commit f8b3b33
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
import static org.apache.pulsar.client.api.Schema.STRING;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for using PulsarSink writing to a Pulsar cluster. */
Expand Down Expand Up @@ -104,6 +105,7 @@ void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
// A random topic with partition 4.
String topic = randomAlphabetic(8);
operator().createTopic(topic, 4);
operator().createSchema(topic, STRING);
int counts = ThreadLocalRandom.current().nextInt(100, 200);

ControlSource source =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +49,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;

/**
* This source is used for testing in Pulsar sink. We would generate a fix number of records by the
Expand Down Expand Up @@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
private static class StopSignal implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);

private final String topic;
private final int desiredCounts;
// This is a thread-safe list.
private final List<String> consumedRecords;
private final AtomicLong deadline;
private final ExecutorService executor;
private final Consumer<String> consumer;
private final AtomicReference<PulsarClientException> throwableException;

public StopSignal(
PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
this.topic = topic;
this.desiredCounts = messageCounts;
this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
this.executor = Executors.newSingleThreadExecutor();
ConsumerBuilder<String> consumerBuilder =
operator.client()
.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(randomAlphanumeric(10))
.subscriptionMode(Durable)
.subscriptionType(Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
this.consumer = sneakyClient(consumerBuilder::subscribe);
this.throwableException = new AtomicReference<>();

// Start consuming.
executor.execute(
() -> {
while (consumedRecords.size() < desiredCounts) {
// This method would block until we consumed a message.
int counts = desiredCounts - consumedRecords.size();
List<Message<String>> messages =
operator.receiveMessages(this.topic, Schema.STRING, counts);
for (Message<String> message : messages) {
consumedRecords.add(message.getValue());
for (int i = 0; i < counts; i++) {
try {
Message<String> message = consumer.receive();
consumedRecords.add(message.getValue());
} catch (PulsarClientException e) {
throwableException.set(e);
break;
}
}
}
});
}

public boolean canStop() {
PulsarClientException exception = throwableException.get();
if (exception != null) {
LOG.error("Error in consuming messages from Pulsar.");
LOG.error("", exception);
return true;
}

if (deadline.get() < System.currentTimeMillis()) {
String errorMsg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
}
}

public void createSchema(String topic, Schema<?> schema) {
sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));
}

/**
* Increase the partition number of the topic.
*
Expand Down

0 comments on commit f8b3b33

Please sign in to comment.