diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 40cda0bc4a18b..4e2846571957d 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -115,6 +115,7 @@ class HandlerBase { private: DeadlineTimerPtr timer_; friend class ClientConnection; + friend class PulsarFriend; }; } #endif //_PULSAR_HANDLER_BASE_HEADER_ diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 6d77df4c0b66a..7b2898cb4a4e5 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -30,7 +30,8 @@ #include #include "PulsarFriend.h" #include "HttpHelper.h" - +#include +#include #include "lib/Future.h" #include "lib/Utils.h" DECLARE_LOG_OBJECT() @@ -184,17 +185,19 @@ void resendMessage(Result r, const Message msg, Producer producer) { ASSERT_EQ(ResultOk, client.close()); } -TEST(BasicEndToEndTest, testLookupThrottling) { +TEST(BasicEndToEndTest, testLookupThrottling) +{ + std::string topicName = "persistent://prop/unit/ns1/testLookupThrottling"; ClientConfiguration config; config.setConcurrentLookupRequest(0); Client client(lookupUrl, config); Producer producer; - Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer); + Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultTooManyLookupRequestException, result); Consumer consumer1; - result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1); + result = client.subscribe(topicName, "my-sub-name", consumer1); ASSERT_EQ(ResultTooManyLookupRequestException, result); } @@ -213,50 +216,53 @@ TEST(BasicEndToEndTest, testLookupThrottling) { TEST(BasicEndToEndTest, testNonPersistentTopic) { + std::string topicName = "non-persistent://prop/unit/ns1/testNonPersistentTopic"; Client client(lookupUrl); Producer producer; - Result result = client.createProducer("non-persistent://prop/unit/ns1/destination", producer); + Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultInvalidTopicName, result); Consumer consumer; - result = client.subscribe("non-persistent://prop/unit/ns1/destination", "my-sub-name", - consumer); + result = client.subscribe(topicName, "my-sub-name", consumer); ASSERT_EQ(ResultInvalidTopicName, result); } TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) { + std::string topicName = "persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions"; + Client client(lookupUrl); Producer producer; - Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer); + Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); Consumer consumer1; - result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1); + result = client.subscribe(topicName, "my-sub-name", consumer1); ASSERT_EQ(ResultOk, result); Consumer consumer2; - result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer2); + result = client.subscribe(topicName, "my-sub-name", consumer2); ASSERT_EQ(ResultConsumerBusy, result); //at this point connection gets destroyed because this consumer creation fails } TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions) { + std::string topicName = "persistent://prop/unit/ns1/testMultipleClientsMultipleSubscriptions"; Client client1(lookupUrl); Client client2(lookupUrl); Producer producer1; - Result result = client1.createProducer("persistent://prop/unit/ns1/my-topic-2", producer1); + Result result = client1.createProducer(topicName, producer1); ASSERT_EQ(ResultOk, result); Consumer consumer1; - result = client1.subscribe("persistent://prop/unit/ns1/my-topic-2", "my-sub-name", consumer1); + result = client1.subscribe(topicName, "my-sub-name", consumer1); ASSERT_EQ(ResultOk, result); Consumer consumer2; - result = client2.subscribe("persistent://prop/unit/ns1/my-topic-2", "my-sub-name", consumer2); + result = client2.subscribe(topicName, "my-sub-name", consumer2); ASSERT_EQ(ResultConsumerBusy, result); ASSERT_EQ(ResultOk, producer1.close()); @@ -273,18 +279,19 @@ TEST(BasicEndToEndTest, testLookupThrottling) { TEST(BasicEndToEndTest, testProduceAndConsumeAfterClientClose) { + std::string topicName = "persistent://prop/unit/ns1/testProduceAndConsumeAfterClientClose"; Client client(lookupUrl); Producer producer; - Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-3", producer); + Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); Consumer consumer; - result = client.subscribe("persistent://prop/unit/ns1/my-topic-3", "my-sub-name", consumer); + result = client.subscribe(topicName, "my-sub-name", consumer); // Clean dangling subscription consumer.unsubscribe(); - result = client.subscribe("persistent://prop/unit/ns1/my-topic-3", "my-sub-name", consumer); + result = client.subscribe(topicName, "my-sub-name", consumer); ASSERT_EQ(ResultOk, result); @@ -359,7 +366,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) { TEST(BasicEndToEndTest, testInvalidUrlPassed) { Client client("localhost:4080"); - std::string topicName = "persistent://prop/unit/ns1/test"; + std::string topicName = "persistent://prop/unit/ns1/testInvalidUrlPassed"; std::string subName = "test-sub"; Producer producer; Result result = client.createProducer(topicName, producer); @@ -385,7 +392,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) { TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/partition-test"; + std::string topicName = "persistent://prop/unit/ns/testPartitionedProducerConsumer"; // call admin api to make it partitioned std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-test/partitions"; @@ -426,7 +433,7 @@ TEST(BasicEndToEndTest, testMessageTooBig) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/my-topic"; + std::string topicName = "persistent://prop/unit/ns1/testMessageTooBig"; Producer producer; Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); @@ -450,7 +457,7 @@ TEST(BasicEndToEndTest, testMessageTooBig) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/namespace1/my-topic-lz4"; + std::string topicName = "persistent://prop/unit/namespace1/testCompressionLZ4"; std::string subName = "my-sub-name"; Producer producer; ProducerConfiguration conf; @@ -489,7 +496,7 @@ TEST(BasicEndToEndTest, testMessageTooBig) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/my-topic-zlib"; + std::string topicName = "persistent://prop/unit/ns1/testCompressionZLib"; std::string subName = "my-sub-name"; Producer producer; ProducerConfiguration conf; @@ -766,7 +773,7 @@ TEST(BasicEndToEndTest, testMessageListener) TEST(BasicEndToEndTest, testMessageListenerPause) { Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListener-pauses"; + std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListenerPause"; // call admin api to make it partitioned std::string url = adminUrl + "admin/persistent/property/cluster/namespace/partition-testMessageListener-pauses/partitions"; @@ -976,7 +983,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) TEST(BasicEndToEndTest, testProduceMessageSize) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/maxMsgSize"; + std::string topicName = "persistent://prop/unit/ns1/testProduceMessageSize"; std::string subName = "my-sub-name"; Producer producer1; Producer producer2; @@ -1023,3 +1030,65 @@ TEST(BasicEndToEndTest, testProduceMessageSize) { delete[] content; } + +TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { + Client client(adminUrl); + std::string topicName = "persistent://prop/unit/ns1/testHandlerReconnectionLogic"; + + Producer producer; + Consumer consumer; + + ASSERT_EQ(client.subscribe(topicName, "my-sub", consumer), ResultOk); + ASSERT_EQ(client.createProducer(topicName, producer), ResultOk); + + std::vector oldConnections; + + int numOfMessages = 10; + std::string propertyName = "msgIndex"; + for (int i = 0; i(i); + Message msg = MessageBuilder().setContent(messageContent).setProperty( + propertyName, boost::lexical_cast(i)).build(); + if (i % 3 == 1) { + ProducerImpl& pImpl = PulsarFriend::getProducerImpl(producer); + ClientConnectionPtr clientConnectionPtr; + do { + ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl); + clientConnectionPtr = clientConnectionWeakPtr.lock(); + usleep(1 * 1e6); + } while(!clientConnectionPtr); + oldConnections.push_back(clientConnectionPtr); + clientConnectionPtr->close(); + } + ASSERT_EQ(producer.send(msg), ResultOk); + } + + std::set receivedMsgContent; + std::set receivedMsgIndex; + + Message msg; + while(consumer.receive(msg, 30000) == ResultOk) { + receivedMsgContent.insert(msg.getDataAsString()); + receivedMsgIndex.insert(msg.getProperty(propertyName)); + } + + ConsumerImpl& cImpl = PulsarFriend::getConsumerImpl(consumer); + ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl); + ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock(); + oldConnections.push_back(clientConnectionPtr); + clientConnectionPtr->close(); + + while(consumer.receive(msg, 30000) == ResultOk) { + consumer.acknowledge(msg); + receivedMsgContent.insert(msg.getDataAsString()); + receivedMsgIndex.insert(msg.getProperty(propertyName)); + } + + ASSERT_EQ(receivedMsgContent.size(), 10); + ASSERT_EQ(receivedMsgIndex.size(), 10); + + for (int i = 0; i(i)), receivedMsgContent.end()); + ASSERT_NE(receivedMsgIndex.find(boost::lexical_cast(i)), receivedMsgIndex.end()); + } +} diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index fbfd9c1c4bdd7..749aeae497b93 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -48,5 +48,19 @@ class PulsarFriend { ConsumerImpl* consumerImpl = static_cast(consumer.impl_.get()); return boost::static_pointer_cast(consumerImpl->consumerStatsBasePtr_); } + + static ProducerImpl& getProducerImpl(Producer producer) { + ProducerImpl* producerImpl = static_cast(producer.impl_.get()); + return *producerImpl; + } + + static ConsumerImpl& getConsumerImpl(Consumer consumer) { + ConsumerImpl* consumerImpl = static_cast(consumer.impl_.get()); + return *consumerImpl; + } + + static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { + return handler.connection_; + } }; }