Skip to content

Commit

Permalink
[java client] Bugfix prevent dup consumers for same topic subscribe (a…
Browse files Browse the repository at this point in the history
…pache#3746)

Fixes apache#3743 issue.

Return previous instance of a consumer in the subscription processed should only
be considered with the scope of the same topic.

Modifications:

  - Remove optimization of duplicated consumers for multi topics subscribe and
    pattern topics subscribe, this should be handled with a different approach.
  - Filter consumers for the same topic name.
  - Filter consumers which are connected to broker, this is not necessary to fix
    this issue but is a good thing to do.
  - Add test that verifies that same subscription will allow different consumers
    instance for different topics.
  • Loading branch information
lovelle authored and merlimat committed Mar 7, 2019
1 parent 3d9b47e commit fb5dcd9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2994,7 +2994,7 @@ public void testFailOverConsumerPriority() throws Exception {
// Pull 3312: https://github.com/apache/pulsar/pull/3312
// Bugfix preventing duplicated consumers on same client cnx with shared subscription mode
@Test()
public void testPreventDupConsumersOnClientCnx() throws Exception {
public void testPreventDupConsumersOnClientCnxForSingleSub() throws Exception {
final CompletableFuture<Void> future = new CompletableFuture<>();
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";
Expand Down Expand Up @@ -3024,7 +3024,56 @@ public void testPreventDupConsumersOnClientCnx() throws Exception {
});

future.get(5, TimeUnit.SECONDS);
Assert.assertEquals(consumer, consumerB);
Assert.assertTrue(future.isDone());
Assert.assertFalse(future.isCompletedExceptionally());
}

@Test()
public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics() throws Exception {
final CompletableFuture<Void> future = new CompletableFuture<>();
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

// This consumer should be a newly subscription since is it from a different topic
// even though has the same subscription name.
Consumer<byte[]> consumerC = pulsarClient.newConsumer().topic(topic + "-different-topic")
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

consumer.unsubscribeAsync().whenComplete((aVoid1, t1) -> {
if (t1 != null) {
future.completeExceptionally(t1);
return;
}

consumer.closeAsync().whenComplete((aVoid2, t2) -> {
if (t2 != null) {
future.completeExceptionally(t2);
return;
}
future.complete(null);
});
});

future.get(5, TimeUnit.SECONDS);
Assert.assertEquals(consumer, consumerB);
Assert.assertTrue(future.isDone());
Assert.assertFalse(future.isCompletedExceptionally());

// consumerC is a newly created subscription.
Assert.assertNotEquals(consumer, consumerC);
Assert.assertTrue(consumerC.isConnected());
consumerC.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,6 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
}

private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
if (subscriber.isPresent()) {
return CompletableFuture.completedFuture(subscriber.get());
}

CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();

ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
Expand All @@ -377,10 +372,6 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo
Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
Optional<ConsumerBase<T>> subscriber = subscriptionExist(conf);
if (subscriber.isPresent()) {
return CompletableFuture.completedFuture(subscriber.get());
}

CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode)
Expand Down Expand Up @@ -688,8 +679,10 @@ public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
synchronized (consumers) {
Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
.filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
.filter(c -> c.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
.filter(c -> conf.getTopicNames().contains(c.getTopic()))
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
.filter(Consumer::isConnected)
.findFirst();
return subscriber.map(ConsumerBase.class::cast);
}
Expand Down

0 comments on commit fb5dcd9

Please sign in to comment.