diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 7cc818b40f07f..0a8aaa61f14fa 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -344,7 +344,8 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con ConsumersList consumers(consumers_); for (auto& weakPtr : consumers) { ConsumerImplBasePtr consumer = weakPtr.lock(); - if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) { + if (consumer && consumer->getSubscriptionName() == consumerName && + consumer->getTopic() == topic && !consumer->isClosed()) { lock.unlock(); LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName); callback(ResultOk, Consumer(consumer)); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 52c2800107f12..9ce89f7baed6a 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -2909,4 +2909,34 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) { ASSERT_FALSE(res != 204 && res != 409); testNegativeAcks(topicName, true); -} \ No newline at end of file +} + +TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) { + ClientConfiguration config; + Client client(lookupUrl); + std::string subsName = "my-only-sub"; + std::string topicName = + "persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics"; + ConsumerConfiguration consumerConf; + consumerConf.setConsumerType(ConsumerShared); + + Consumer consumerA; + Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA); + ASSERT_EQ(ResultOk, resultA); + ASSERT_EQ(consumerA.getSubscriptionName(), subsName); + + Consumer consumerB; + Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB); + ASSERT_EQ(ResultOk, resultB); + ASSERT_EQ(consumerB.getSubscriptionName(), subsName); + + Consumer consumerC; + Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC); + ASSERT_EQ(ResultOk, resultB); + ASSERT_EQ(consumerB.getSubscriptionName(), subsName); + ASSERT_EQ(ResultOk, consumerA.close()); + ASSERT_EQ(ResultAlreadyClosed, consumerB.close()); + + // consumer C should be a different instance from A and B and should be with open state. + ASSERT_EQ(ResultOk, consumerC.close()); +}