Skip to content

Commit

Permalink
[FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
syhily authored and tisonkun committed Sep 9, 2022
1 parent b0a1241 commit d92ef3b
Showing 1 changed file with 58 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -53,7 +54,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand All @@ -76,7 +76,8 @@
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
import static org.apache.pulsar.client.api.MessageId.earliest;
import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
Expand All @@ -95,8 +96,6 @@ public class PulsarRuntimeOperator implements Closeable {
private final String adminUrl;
private final PulsarClient client;
private final PulsarAdmin admin;
private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Producer<?>>> producers;
private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Consumer<?>>> consumers;

public PulsarRuntimeOperator(String serviceUrl, String adminUrl) {
this(serviceUrl, serviceUrl, adminUrl, adminUrl);
Expand All @@ -117,8 +116,6 @@ public PulsarRuntimeOperator(
.enableTransaction(true)
.build());
this.admin = sneakyClient(() -> PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -169,7 +166,8 @@ public <T> void setupTopic(
}

/**
* Create a pulsar topic with given partition number.
* Create a Pulsar topic with the given number of partitions if the topic doesn't exist. An
* existing topic is left untouched, so make sure the testing code accounts for that.
*
* @param topic The name of the topic.
* @param numberOfPartitions The number of partitions. We would create a non-partitioned topic
Expand Down Expand Up @@ -219,10 +217,6 @@ public void deleteTopic(String topic) {
return;
}

// Close all the available consumers and producers.
removeConsumers(topic);
removeProducers(topic);

if (metadata.partitions == NON_PARTITIONED) {
sneakyAdmin(() -> admin().topics().delete(topicName));
} else {
Expand Down Expand Up @@ -305,8 +299,8 @@ public <T> List<MessageId> sendMessages(
*/
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, String key, Collection<T> messages) {
Producer<T> producer = createProducer(topic, schema);
try {
Producer<T> producer = createProducer(topic, schema);
List<MessageId> messageIds = new ArrayList<>(messages.size());

for (T message : messages) {
Expand All @@ -322,6 +316,15 @@ public <T> List<MessageId> sendMessages(
} catch (PulsarClientException e) {
sneakyThrow(e);
return emptyList();
} finally {
try {
// Wait for all the pending messages to be sent to Pulsar.
producer.flush();
// Closing directly without flushing would drop all the pending messages.
producer.close();
} catch (PulsarClientException e) {
// Just ignore the exception here.
}
}
}

Expand All @@ -330,9 +333,10 @@ public <T> List<MessageId> sendMessages(
* message from this topic.
*/
public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
try {
Consumer<T> consumer = createConsumer(topic, schema);
return drainOneMessage(consumer);
try (Consumer<T> consumer = createConsumer(topic, schema)) {
Message<T> message = consumer.receive();
consumer.acknowledge(message.getMessageId());
return message;
} catch (PulsarClientException e) {
sneakyThrow(e);
return null;
Expand All @@ -344,10 +348,10 @@ public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
* timeout. A null message would be returned if no message has been consumed from Pulsar.
*/
public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
try {
Consumer<T> consumer = createConsumer(topic, schema);
Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
consumer.acknowledgeCumulative(message.getMessageId());
try (Consumer<T> consumer = createConsumer(topic, schema)) {
Message<T> message =
consumer.receive(Math.toIntExact(timeout.toMillis()), MILLISECONDS);
consumer.acknowledge(message.getMessageId());

return message;
} catch (Exception e) {
Expand All @@ -371,12 +375,12 @@ public <T> List<Message<T>> receiveMessages(String topic, Schema<T> schema, int
return singletonList(message);
} else {
// Drain a fixed number of messages.
try {
Consumer<T> consumer = createConsumer(topic, schema);
try (Consumer<T> consumer = createConsumer(topic, schema)) {
List<Message<T>> messages = new ArrayList<>(counts);
for (int i = 0; i < counts; i++) {
Message<T> message = drainOneMessage(consumer);
Message<T> message = consumer.receive();
messages.add(message);
consumer.acknowledge(message.getMessageId());
}
return messages;
} catch (PulsarClientException e) {
Expand Down Expand Up @@ -459,9 +463,6 @@ public Configuration sinkConfig(DeliveryGuarantee deliveryGuarantee) {
/** This method is used for test framework. You can't close this operator manually. */
@Override
public void close() throws IOException {
producers.clear();
consumers.clear();

if (admin != null) {
admin.close();
}
Expand All @@ -474,93 +475,53 @@ public void close() throws IOException {

/**
 * Create a non-partitioned topic through the admin API, leaving an existing topic untouched.
 *
 * <p>The creation is attempted directly. A {@link ConflictException} carrying the message
 * {@code "This topic already exists"} is treated as success; every other admin failure is
 * rethrown through {@code sneakyThrow}.
 *
 * @param topic The name of the topic to create.
 */
private void createNonPartitionedTopic(String topic) {
    try {
        admin().topics().createNonPartitionedTopic(topic);
    } catch (PulsarAdminException e) {
        // Compare from the literal side so an exception without a message can't cause an NPE.
        boolean alreadyExists =
                e instanceof ConflictException
                        && "This topic already exists".equals(e.getMessage());
        if (!alreadyExists) {
            sneakyThrow(e);
        }
    }
}

/**
 * Create a partitioned topic through the admin API, leaving an existing topic untouched.
 *
 * <p>The creation is attempted directly. A {@link ConflictException} carrying the message
 * {@code "This topic already exists"} is treated as success; every other admin failure is
 * rethrown through {@code sneakyThrow}.
 *
 * @param topic The name of the topic to create.
 * @param numberOfPartitions The number of partitions passed to the admin API.
 */
private void createPartitionedTopic(String topic, int numberOfPartitions) {
    try {
        admin().topics().createPartitionedTopic(topic, numberOfPartitions);
    } catch (PulsarAdminException e) {
        // Compare from the literal side so an exception without a message can't cause an NPE.
        boolean alreadyExists =
                e instanceof ConflictException
                        && "This topic already exists".equals(e.getMessage());
        if (!alreadyExists) {
            sneakyThrow(e);
        }
    }
}

@SuppressWarnings("unchecked")
private <T> Producer<T> createProducer(String topic, Schema<T> schema)
throws PulsarClientException {
TopicName topicName = TopicName.get(topic);
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();
ConcurrentHashMap<Integer, Producer<?>> topicProducers =
producers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());

return (Producer<T>)
topicProducers.computeIfAbsent(
index,
i -> {
ProducerBuilder<T> builder =
client().newProducer(schema)
.topic(topic)
.enableBatching(false)
.enableMultiSchema(true);

return sneakyClient(builder::create);
});
}
private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
ProducerBuilder<T> builder =
client().newProducer(schema)
.topic(topic)
.enableBatching(false)
.enableMultiSchema(true)
.accessMode(Shared);

@SuppressWarnings("unchecked")
private <T> Consumer<T> createConsumer(String topic, Schema<T> schema)
throws PulsarClientException {
TopicName topicName = TopicName.get(topic);
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();
ConcurrentHashMap<Integer, Consumer<?>> topicConsumers =
consumers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());

return (Consumer<T>)
topicConsumers.computeIfAbsent(
index,
i -> {
ConsumerBuilder<T> builder =
client().newConsumer(schema)
.topic(topic)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionMode(Durable)
.subscriptionType(Exclusive)
.subscriptionInitialPosition(Earliest);

return sneakyClient(builder::subscribe);
});
return sneakyClient(builder::create);
}

private void removeProducers(String topic) {
String topicName = topicName(topic);
ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
if (integerProducers != null) {
for (Producer<?> producer : integerProducers.values()) {
sneakyClient(producer::close);
}
private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
// Create the earliest subscription if it's not existed.
List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
sneakyAdmin(
() -> admin().topics().createSubscription(topic, SUBSCRIPTION_NAME, earliest));
}
}

private void removeConsumers(String topic) {
String topicName = topicName(topic);
ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
if (integerConsumers != null) {
for (Consumer<?> consumer : integerConsumers.values()) {
sneakyClient(consumer::close);
}
}
}
// Create the consumer without the initial position.
ConsumerBuilder<T> builder =
client().newConsumer(schema)
.topic(topic)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionMode(Durable)
.subscriptionType(Exclusive);

private <T> Message<T> drainOneMessage(Consumer<T> consumer) throws PulsarClientException {
Message<T> message = consumer.receive();
consumer.acknowledgeCumulative(message.getMessageId());
return message;
return sneakyClient(builder::subscribe);
}
}

0 comments on commit d92ef3b

Please sign in to comment.