Skip to content

Commit

Permalink
Prevent dup consumers on same client cnx with shared subscription (ap…
Browse files Browse the repository at this point in the history
…ache#3312)

* Prevent dup consumers on same client cnx with shared subscription

Providing help trying to fix issue apache#3226. Bug description:

When a client attempts to setup more than one consumer subscription on shared
mode with the same subscription name, due to the validation at broker level of
`consumerList.size() == 1` on canUnsubscribe() method, broker will throw an
exception at the moment the client tries to unsubscribe the consumer.

In order to prevent this, the proposed solution (probably not the best one) is
to detect when the user is trying to setup an already subscribed consumer and
return this exact same consumer instance.

I believe that is quite strange to have two or more consumers with shared mode
on the same connection for the same subscription, this might be due to the user
confusing about consumer shared mode or behaviour, another good solution to
prevent this from happening might be just throwing an invalid configuration
exception.

* Adapt tests for bugfix introduced in commit 44e1a23

* Add test case exploiting issue 3326 described in commit 44e1a23
  • Loading branch information
lovelle authored and jiazhai committed Jan 27, 2019
1 parent 762e0ab commit 231db03
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
Expand Down Expand Up @@ -111,7 +112,11 @@ protected final void internalSetup() throws Exception {
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
}
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}

protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
}

protected final void internalSetupForStatsTest() throws Exception {
Expand All @@ -120,7 +125,7 @@ protected final void internalSetupForStatsTest() throws Exception {
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
pulsarClient = newPulsarClient(lookupUrl, 1);
}

protected final void init() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
Expand Down Expand Up @@ -91,12 +86,13 @@ public void testSimpleConsumerEvents() throws Exception {
final String subName = "sub1";
final int numMsgs = 100;

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared);

// 1. two consumers on the same subscription
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
Expand Down Expand Up @@ -180,6 +176,7 @@ public void testSimpleConsumerEvents() throws Exception {

producer.close();
consumer2.close();
newPulsarClient.close();

deleteTopic(topicName);
}
Expand All @@ -204,7 +201,8 @@ public void testReplayOnConsumerDisconnect() throws Exception {
}).subscribe();

// consumer2 does not ack messages
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).messageListener((consumer, msg) -> {
// do notthing
}).subscribe();
Expand All @@ -229,6 +227,7 @@ public void testReplayOnConsumerDisconnect() throws Exception {
assertTrue(CollectionUtils.subtract(messagesProduced, messagesConsumed).isEmpty());

consumer1.close();
newPulsarClient.close();

deleteTopic(topicName);
}
Expand Down Expand Up @@ -322,10 +321,14 @@ public void testSharedSingleAckedNormalTopic() throws Exception {
assertEquals(topicRef.getProducers().size(), 1);

// 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
ConsumerBuilder<byte[]> consumerBuilder1 = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer1 = consumerBuilder1.subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
ConsumerBuilder<byte[]> consumerBuilder2 = newPulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer2 = consumerBuilder2.subscribe();

// 3. Producer publishes messages
for (int i = 0; i < totalMessages; i++) {
Expand Down Expand Up @@ -369,6 +372,7 @@ public void testSharedSingleAckedNormalTopic() throws Exception {
receivedConsumer2 += 1;
}

newPulsarClient.close();
log.info("Total receives by Consumer 2 = " + receivedConsumer2);
assertEquals(receivedConsumer2, totalMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ public void testSubscriptionTypeTransitions() throws Exception {

// 1. shared consumer on an exclusive sub fails
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
fail("should have failed");
Expand All @@ -806,6 +807,7 @@ public void testSubscriptionTypeTransitions() throws Exception {

// 2. failover consumer on an exclusive sub fails
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover).subscribe();
fail("should have failed");
Expand All @@ -816,6 +818,7 @@ public void testSubscriptionTypeTransitions() throws Exception {
// 3. disconnected sub can be converted in shared
consumer1.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Shared);
Expand All @@ -825,6 +828,7 @@ public void testSubscriptionTypeTransitions() throws Exception {

// 4. exclusive fails on shared sub
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
fail("should have failed");
Expand All @@ -835,6 +839,7 @@ public void testSubscriptionTypeTransitions() throws Exception {
// 5. disconnected sub can be converted in failover
consumer2.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Failover);
Expand All @@ -845,6 +850,7 @@ public void testSubscriptionTypeTransitions() throws Exception {
// 5. exclusive consumer can connect after failover disconnects
consumer3.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Exclusive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,11 +167,14 @@ public void testSharedSingleAckedNormalTopic() throws Exception {
assertEquals(topicRef.getProducers().size(), 1);

// 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
.subscriptionType(SubscriptionType.Shared).subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
.subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
.subscriptionType(SubscriptionType.Shared).subscribe();

// 3. Producer publishes messages
for (int i = 0; i < totalMessages; i++) {
Expand Down Expand Up @@ -232,6 +231,7 @@ public void testSharedSingleAckedNormalTopic() throws Exception {
message2 = consumer2.receive(200, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);
log.info("Additional received = " + receivedMessagesAfterRedelivery);
newPulsarClient.close();
assertTrue(receivedMessagesAfterRedelivery > 0);

assertEquals(receivedConsumer1 + receivedConsumer2, totalMessages);
Expand Down Expand Up @@ -486,10 +486,12 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception {
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();

// 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();

// 3. producer publish messages
for (int i = 0; i < totalMessages; i++) {
Expand Down Expand Up @@ -561,6 +563,7 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception {
log.info(key + " messageCount2 = " + messageCount2);
log.info(key + " ackCount1 = " + ackCount1);
log.info(key + " ackCount2 = " + ackCount2);
newPulsarClient.close();
assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void testDeadLetterTopic() throws Exception {
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testDeadLetterTopic() throws Exception {
deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -112,6 +113,7 @@ public void testDeadLetterTopic() throws Exception {
assertNull(checkMessage);

checkConsumer.close();
newPulsarClient.close();
}

/**
Expand Down Expand Up @@ -216,7 +218,8 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
.subscriptionName("my-subscription")
.subscribe();
Expand Down Expand Up @@ -244,7 +247,8 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
} while (totalInDeadLetter < sendMessages);
deadLetterConsumer.close();
consumer.close();
Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -255,6 +259,8 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);
newPulsarClient.close();
newPulsarClient1.close();
checkConsumer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,9 @@ public void testBlockBrokerDispatching() throws Exception {
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
assertNotEquals(messages1.size(), totalProducedMsgs);
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) newPulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumer2Msgs = 0;
Expand Down Expand Up @@ -847,6 +848,7 @@ public void testBlockBrokerDispatching() throws Exception {
consumer1Sub1.close();
consumerSub2.close();
consumer1Sub3.close();
newPulsarClient.close();

log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
Expand Down Expand Up @@ -947,7 +949,8 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
assertNotEquals(messages1.size(), totalProducedMsgs);
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) newPulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).subscribe();
int consumer2Msgs = 0;
Expand Down Expand Up @@ -1012,6 +1015,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {

consumer1Sub1.close();
consumer1Sub2.close();
newPulsarClient.close();

log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 231db03

Please sign in to comment.