Skip to content

Commit

Permalink
[cpp client] Bugfix prevent dup consumer for same topic subscription (a…
Browse files Browse the repository at this point in the history
…pache#3748)

Same fix as apache#3746 but for cpp client.

  - Filter consumers for the same topic name.
  - Add test to verify that same subscription names with different topics are
    allowed to be different consumers subscription instead of reused.
  • Loading branch information
lovelle authored and sijie committed Mar 15, 2019
1 parent 03830f9 commit fff02e2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
ConsumersList consumers(consumers_);
for (auto& weakPtr : consumers) {
ConsumerImplBasePtr consumer = weakPtr.lock();
if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) {
if (consumer && consumer->getSubscriptionName() == consumerName &&
consumer->getTopic() == topic && !consumer->isClosed()) {
lock.unlock();
LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
callback(ResultOk, Consumer(consumer));
Expand Down
32 changes: 31 additions & 1 deletion pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2909,4 +2909,34 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
ASSERT_FALSE(res != 204 && res != 409);

testNegativeAcks(topicName, true);
}
}

TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
ClientConfiguration config;
Client client(lookupUrl);
std::string subsName = "my-only-sub";
std::string topicName =
"persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerShared);

Consumer consumerA;
Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
ASSERT_EQ(ResultOk, resultA);
ASSERT_EQ(consumerA.getSubscriptionName(), subsName);

Consumer consumerB;
Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
ASSERT_EQ(ResultOk, resultB);
ASSERT_EQ(consumerB.getSubscriptionName(), subsName);

Consumer consumerC;
Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC);
ASSERT_EQ(ResultOk, resultB);
ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
ASSERT_EQ(ResultOk, consumerA.close());
ASSERT_EQ(ResultAlreadyClosed, consumerB.close());

// consumer C should be a different instance from A and B and should be with open state.
ASSERT_EQ(ResultOk, consumerC.close());
}

0 comments on commit fff02e2

Please sign in to comment.