Skip to content

Commit

Permalink
[cpp client] implement reference count for close() (apache#3863)
Browse files Browse the repository at this point in the history
Add reference count feature to keep track of reused instances of a consumer
instance, for more details please see commit ff4db8d.

*Modifications*

  - Add refCount instance variable on ConsumerImpl.
  - Use new safeDecrRefCount() on consumer close() in order to know whether
    effective close call should occur or not.
  - Increment reference count when a previous built consumer instance is being
    used by caller.

*Future considerations*

Thereafter when feature preventing duplicated consumer is made for
PartitionedConsumer, MultiTopicsConsumer and PatternMultiTopicsConsumer,
incrRefCount() member could be turned into a pure virtual method.
  • Loading branch information
lovelle authored and sijie committed Mar 27, 2019
1 parent fa80feb commit ee98e8b
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 2 deletions.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
ConsumerImplBasePtr consumer = weakPtr.lock();
if (consumer && consumer->getSubscriptionName() == consumerName &&
consumer->getTopic() == topic && !consumer->isClosed()) {
consumer->incrRefCount();
lock.unlock();
LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
callback(ResultOk, Consumer(consumer));
Expand Down
16 changes: 14 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
return consumerCreatedPromise_.getFuture();
}

void ConsumerImpl::incrRefCount() { ++refCount_; }

unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? refCount_-- : refCount_; }

const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }

const std::string& ConsumerImpl::getTopic() const { return topic_; }
Expand Down Expand Up @@ -726,10 +730,10 @@ inline proto::CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition(
InitialPosition initialPosition = config_.getSubscriptionInitialPosition();
switch (initialPosition) {
case InitialPositionLatest:
return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Latest;
return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Latest;

case InitialPositionEarliest:
return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Earliest;
return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Earliest;
}
}

Expand Down Expand Up @@ -817,6 +821,14 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}

if (safeDecrRefCount() != 0) {
lock.unlock();
if (callback) {
callback(ResultOk);
}
return;
}

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
lock.unlock();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
virtual void incrRefCount();

protected:
void connectionOpened(const ClientConnectionPtr& cnx);
Expand Down Expand Up @@ -146,6 +147,7 @@ class ConsumerImpl : public ConsumerImplBase,
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
unsigned int safeDecrRefCount();

Optional<MessageId> clearReceiveQueue();

Expand Down Expand Up @@ -176,6 +178,7 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
unsigned int refCount_ = 0;

MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ConsumerImplBase {
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
virtual void incrRefCount(){};
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2818,6 +2818,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
// Since this is a shared consumer over same client cnx
// closing consumerA should result in consumerB also being closed.
ASSERT_EQ(ResultOk, consumerA.close());
ASSERT_EQ(ResultOk, consumerB.close());
ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
}

Expand Down Expand Up @@ -2936,6 +2938,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
ASSERT_EQ(ResultOk, resultB);
ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
ASSERT_EQ(ResultOk, consumerA.close());
ASSERT_EQ(ResultOk, consumerB.close());
ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
ASSERT_EQ(ResultAlreadyClosed, consumerB.close());

// consumer C should be a different instance from A and B and should be with open state.
Expand Down

0 comments on commit ee98e8b

Please sign in to comment.