Skip to content

Commit

Permalink
[pulsar-client] Avoid subscribing the same topic again (apache#7823)
Browse files Browse the repository at this point in the history
### Motivation

The current key of `MultiTopicsConsumerImpl.topics` is the topic name passed by user. The `topicNameValid` method checks if the name is valid and `topics` doesn't contain the key.

However, if a multi topics consumer subscribed a partition of a subscribed partitioned topic,  `subscribeAsync` succeed and a new `ConsumerImpl` of the same partition was created, which is redundant.

Also, if a multi topics consumer subscribed `public/default/topic` or `persistent://public/default/topic`, while the initial subscribed topic is `topic`, the redundant consumers would be created.

### Modifications

- Use full topic name as key of `MultiTopicsConsumerImpl.topics`
- Check both full topic name and full partitioned topic name not exist in `MultiTopicsConsumerImpl.topics` when `subscribeAsync` is called
- Throw a different exception to differ topic is invalid and topic is already subscribed
- Add a unit test for subscribing a partition of a subscribed partitioned topic or the same topic with prefix
  • Loading branch information
BewareMyPower authored Aug 17, 2020
1 parent bc5be21 commit e1b76a3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public void testTopicNameValid() throws Exception{
}).get();
((MultiTopicsConsumerImpl) consumer).subscribeAsync(topicName, 3).handle((res, exception) -> {
assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
assertEquals(((PulsarClientException.AlreadyClosedException) exception).getMessage(), "Topic name not valid");
assertEquals(((PulsarClientException.AlreadyClosedException) exception).getMessage(), "Already subscribed to " + topicName);
return null;
}).get();
}
Expand Down Expand Up @@ -606,6 +606,44 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
}


@Test
public void testResubscribeSameTopic() throws Exception {
final String localTopicName = "TopicsConsumerResubscribeSameTopicTest";
final String localPartitionName = localTopicName + "-partition-0";
final String topicNameWithNamespace = "public/default/" + localTopicName;
final String topicNameWithDomain = "persistent://" + topicNameWithNamespace;

admin.topics().createPartitionedTopic(localTopicName, 2);

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(localTopicName)
.subscriptionName("SubscriptionName")
.subscribe();

assertTrue(consumer instanceof MultiTopicsConsumerImpl);
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

multiTopicsConsumer.subscribeAsync(topicNameWithNamespace, false).handle((res, exception) -> {
assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
assertEquals(exception.getMessage(), "Already subscribed to " + topicNameWithNamespace);
return null;
}).get();
multiTopicsConsumer.subscribeAsync(topicNameWithDomain, false).handle((res, exception) -> {
assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
assertEquals(exception.getMessage(), "Already subscribed to " + topicNameWithDomain);
return null;
}).get();
multiTopicsConsumer.subscribeAsync(localPartitionName, false).handle((res, exception) -> {
assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
assertEquals(exception.getMessage(), "Already subscribed to " + localPartitionName);
return null;
}).get();

consumer.unsubscribe();
consumer.close();
}


@Test(timeOut = testTimeout)
public void testTopicsNameSubscribeWithBuilderFail() throws Exception {
String key = "TopicsNameSubscribeWithBuilder";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,15 +720,37 @@ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
}
}

private boolean topicNameValid(String topicName) {
return TopicName.isValid(topicName) && !topics.containsKey(topicName);
private TopicName getTopicName(String topic) {
try {
return TopicName.get(topic);
} catch (Exception ignored) {
return null;
}
}

private String getFullTopicName(String topic) {
TopicName topicName = getTopicName(topic);
return (topicName != null) ? topicName.toString() : null;
}

private void removeTopic(String topic) {
String fullTopicName = getFullTopicName(topic);
if (fullTopicName != null) {
topics.remove(topic);
}
}

// subscribe one more given topic
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
if (!topicNameValid(topicName)) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
}
String fullTopicName = topicNameInstance.toString();
if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
}

if (getState() == State.Closing || getState() == State.Closed) {
Expand All @@ -739,10 +761,10 @@ public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTo
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();

client.getPartitionedTopicMetadata(topicName)
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions,
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage());
subscribeResult.completeExceptionally(ex1);
return null;
});
Expand Down Expand Up @@ -784,9 +806,15 @@ public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarCli
// subscribe one more given topic, but already know the numberPartitions
@VisibleForTesting
CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
if (!topicNameValid(topicName)) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
}
String fullTopicName = topicNameInstance.toString();
if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
}

if (getState() == State.Closing || getState() == State.Closed) {
Expand All @@ -795,7 +823,7 @@ CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
}

CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
subscribeTopicPartitions(subscribeResult, topicName, numberPartitions, true /* createTopicIfDoesNotExist */);
subscribeTopicPartitions(subscribeResult, fullTopicName, numberPartitions, true /* createTopicIfDoesNotExist */);

return subscribeResult;
}
Expand Down Expand Up @@ -926,7 +954,7 @@ private void handleSubscribeOneTopicError(String topicName, Throwable error, Com
if (toCloseNum.decrementAndGet() == 0) {
log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
topic, topicName, error.getMessage());
topics.remove(topicName);
removeTopic(topicName);
subscribeFuture.completeExceptionally(error);
}
return;
Expand Down Expand Up @@ -970,7 +998,7 @@ public CompletableFuture<Void> unsubscribeAsync(String topicName) {
allTopicPartitionsNumber.decrementAndGet();
});

topics.remove(topicName);
removeTopic(topicName);
((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);

unsubscribeFuture.complete(null);
Expand Down Expand Up @@ -1018,7 +1046,7 @@ public CompletableFuture<Void> removeConsumerAsync(String topicName) {
allTopicPartitionsNumber.decrementAndGet();
});

topics.remove(topicName);
removeTopic(topicName);
((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);

unsubscribeFuture.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ public void testParallelSubscribeAsync() throws Exception {
Throwable t = expectThrows(ExecutionException.class, secondInvocation::get);
Throwable cause = t.getCause();
assertEquals(cause.getClass(), PulsarClientException.class);
assertTrue(cause.getMessage().endsWith(
"Failed to subscribe for topic [parallel-subscribe-async-topic] in topics consumer. "
+ "Topic is already being subscribed for in other thread."));
assertTrue(cause.getMessage().endsWith("Topic is already being subscribed for in other thread."));
}

private <T> PulsarClientImpl setUpPulsarClientMock(Schema<T> schema, int completionDelayMillis) {
Expand Down

0 comments on commit e1b76a3

Please sign in to comment.