Skip to content

Commit

Permalink
Feature - implement reference count for ConsumerImpl (apache#3795)
Browse files Browse the repository at this point in the history
* Feature - implement reference count for ConsumerImpl

Add reference count for ConsumerImpl in order to track reused instances of a
consumer instance returned by `subscribe()` method call.
Having the reference of subscribed consumer instances offers the ability to not
close a consumer until the last corresponding `close()` is being called.

Modifications:

  - Add field on ConsumerBase to track references of consumer instances
    subscribed by the user.
  - Add checks on ConsumerImpl to know whether close() action should be
    performed regarding of reference count being zero value.
  - Increment reference count when a previous built consumer instance is being
    used by caller.

Future considerations:

When optimization apache#3312 is going to be made for other consumers implementation
aside from ConsumerImpl it should add refCount checks on close() method call.

* Add tests for reference count on ConsumerImpl

  - Add test to verify ConsumerImpl reference count on close() method.
  - Fix test from dup consumers feature with refcount.
  • Loading branch information
lovelle authored and sijie committed Mar 12, 2019
1 parent 117321d commit ff4db8d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.api;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -65,7 +64,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -3076,4 +3074,29 @@ public void testPreventDupConsumersOnClientCnxForSingleSub_AllowDifferentTopics(
Assert.assertTrue(consumerC.isConnected());
consumerC.close();
}

@Test
public void testRefCount_OnCloseConsumer() throws Exception {
final String topic = "persistent://my-property/my-ns/my-topic";
final String subName = "my-subscription";

Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

Assert.assertEquals(consumerA, consumerB);

consumerA.close();
Assert.assertTrue(consumerA.isConnected());
Assert.assertTrue(consumerB.isConnected());

consumerB.close();
Assert.assertFalse(consumerA.isConnected());
Assert.assertFalse(consumerB.isConnected());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ enum ConsumerType {
final BlockingQueue<Message<T>> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected Schema<T> schema;
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
private int refCount = 0;

protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
Expand Down Expand Up @@ -385,4 +386,11 @@ protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception)
}
}

protected synchronized void incrRefCount() {
++refCount;
}

protected synchronized boolean shouldTearDown() {
return refCount > 0 ? refCount-- == 0 : refCount == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ public void connectionFailed(PulsarClientException exception) {

@Override
public CompletableFuture<Void> closeAsync() {
if (!shouldTearDown()) {
return CompletableFuture.completedFuture(null);
}

if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationDat
.filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
.filter(Consumer::isConnected)
.findFirst();
subscriber.ifPresent(ConsumerBase::incrRefCount);
return subscriber.map(ConsumerBase.class::cast);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ public void testNoSharedConsumer() throws Exception {

otherSpout.close();

topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);

otherSpout.close();

topicStats = admin.topics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 0);
}
Expand Down

0 comments on commit ff4db8d

Please sign in to comment.