Skip to content

Commit

Permalink
CPP Client Test for Lookup Issue (apache#551) (apache#595)
Browse files Browse the repository at this point in the history
* CPP Client Test for Lookup Issue (apache#551)

* Fixed a typo
  • Loading branch information
jai1 authored Jul 25, 2017
1 parent aaae221 commit 87b8944
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 23 deletions.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class HandlerBase {
private:
DeadlineTimerPtr timer_;
friend class ClientConnection;
friend class PulsarFriend;
};
}
#endif //_PULSAR_HANDLER_BASE_HEADER_
115 changes: 92 additions & 23 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
#include <boost/thread/mutex.hpp>
#include "PulsarFriend.h"
#include "HttpHelper.h"

#include <set>
#include <vector>
#include "lib/Future.h"
#include "lib/Utils.h"
DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -184,17 +185,19 @@ void resendMessage(Result r, const Message msg, Producer producer) {
ASSERT_EQ(ResultOk, client.close());
}

TEST(BasicEndToEndTest, testLookupThrottling) {
TEST(BasicEndToEndTest, testLookupThrottling)
{
std::string topicName = "persistent://prop/unit/ns1/testLookupThrottling";
ClientConfiguration config;
config.setConcurrentLookupRequest(0);
Client client(lookupUrl, config);

Producer producer;
Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer);
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultTooManyLookupRequestException, result);

Consumer consumer1;
result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1);
result = client.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultTooManyLookupRequestException, result);

}
Expand All @@ -213,50 +216,53 @@ TEST(BasicEndToEndTest, testLookupThrottling) {

TEST(BasicEndToEndTest, testNonPersistentTopic)
{
std::string topicName = "non-persistent://prop/unit/ns1/testNonPersistentTopic";
Client client(lookupUrl);
Producer producer;
Result result = client.createProducer("non-persistent://prop/unit/ns1/destination", producer);
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultInvalidTopicName, result);

Consumer consumer;
result = client.subscribe("non-persistent://prop/unit/ns1/destination", "my-sub-name",
consumer);
result = client.subscribe(topicName, "my-sub-name", consumer);
ASSERT_EQ(ResultInvalidTopicName, result);
}

TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions)
{
std::string topicName = "persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions";

Client client(lookupUrl);

Producer producer;
Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer);
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer1;
result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1);
result = client.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer2;
result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer2);
result = client.subscribe(topicName, "my-sub-name", consumer2);
ASSERT_EQ(ResultConsumerBusy, result);
//at this point connection gets destroyed because this consumer creation fails
}

TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions)
{
std::string topicName = "persistent://prop/unit/ns1/testMultipleClientsMultipleSubscriptions";
Client client1(lookupUrl);
Client client2(lookupUrl);

Producer producer1;
Result result = client1.createProducer("persistent://prop/unit/ns1/my-topic-2", producer1);
Result result = client1.createProducer(topicName, producer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer1;
result = client1.subscribe("persistent://prop/unit/ns1/my-topic-2", "my-sub-name", consumer1);
result = client1.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer2;
result = client2.subscribe("persistent://prop/unit/ns1/my-topic-2", "my-sub-name", consumer2);
result = client2.subscribe(topicName, "my-sub-name", consumer2);
ASSERT_EQ(ResultConsumerBusy, result);

ASSERT_EQ(ResultOk, producer1.close());
Expand All @@ -273,18 +279,19 @@ TEST(BasicEndToEndTest, testLookupThrottling) {

TEST(BasicEndToEndTest, testProduceAndConsumeAfterClientClose)
{
std::string topicName = "persistent://prop/unit/ns1/testProduceAndConsumeAfterClientClose";
Client client(lookupUrl);

Producer producer;
Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-3", producer);
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
result = client.subscribe("persistent://prop/unit/ns1/my-topic-3", "my-sub-name", consumer);
result = client.subscribe(topicName, "my-sub-name", consumer);

// Clean dangling subscription
consumer.unsubscribe();
result = client.subscribe("persistent://prop/unit/ns1/my-topic-3", "my-sub-name", consumer);
result = client.subscribe(topicName, "my-sub-name", consumer);

ASSERT_EQ(ResultOk, result);

Expand Down Expand Up @@ -359,7 +366,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
TEST(BasicEndToEndTest, testInvalidUrlPassed)
{
Client client("localhost:4080");
std::string topicName = "persistent://prop/unit/ns1/test";
std::string topicName = "persistent://prop/unit/ns1/testInvalidUrlPassed";
std::string subName = "test-sub";
Producer producer;
Result result = client.createProducer(topicName, producer);
Expand All @@ -385,7 +392,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
TEST(BasicEndToEndTest, testPartitionedProducerConsumer)
{
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns/partition-test";
std::string topicName = "persistent://prop/unit/ns/testPartitionedProducerConsumer";

// call admin api to make it partitioned
std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-test/partitions";
Expand Down Expand Up @@ -426,7 +433,7 @@ TEST(BasicEndToEndTest, testMessageTooBig)
{
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns1/my-topic";
std::string topicName = "persistent://prop/unit/ns1/testMessageTooBig";
Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);
Expand All @@ -450,7 +457,7 @@ TEST(BasicEndToEndTest, testMessageTooBig)
{
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/namespace1/my-topic-lz4";
std::string topicName = "persistent://prop/unit/namespace1/testCompressionLZ4";
std::string subName = "my-sub-name";
Producer producer;
ProducerConfiguration conf;
Expand Down Expand Up @@ -489,7 +496,7 @@ TEST(BasicEndToEndTest, testMessageTooBig)
{
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns1/my-topic-zlib";
std::string topicName = "persistent://prop/unit/ns1/testCompressionZLib";
std::string subName = "my-sub-name";
Producer producer;
ProducerConfiguration conf;
Expand Down Expand Up @@ -766,7 +773,7 @@ TEST(BasicEndToEndTest, testMessageListener)
TEST(BasicEndToEndTest, testMessageListenerPause)
{
Client client(lookupUrl);
std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListener-pauses";
std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListenerPause";

// call admin api to make it partitioned
std::string url = adminUrl + "admin/persistent/property/cluster/namespace/partition-testMessageListener-pauses/partitions";
Expand Down Expand Up @@ -976,7 +983,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause)
TEST(BasicEndToEndTest, testProduceMessageSize) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns1/maxMsgSize";
std::string topicName = "persistent://prop/unit/ns1/testProduceMessageSize";
std::string subName = "my-sub-name";
Producer producer1;
Producer producer2;
Expand Down Expand Up @@ -1023,3 +1030,65 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {

delete[] content;
}

TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
Client client(adminUrl);
std::string topicName = "persistent://prop/unit/ns1/testHandlerReconnectionLogic";

Producer producer;
Consumer consumer;

ASSERT_EQ(client.subscribe(topicName, "my-sub", consumer), ResultOk);
ASSERT_EQ(client.createProducer(topicName, producer), ResultOk);

std::vector<ClientConnectionPtr> oldConnections;

int numOfMessages = 10;
std::string propertyName = "msgIndex";
for (int i = 0; i<numOfMessages; i++) {
std::string messageContent = "msg-" + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder().setContent(messageContent).setProperty(
propertyName, boost::lexical_cast<std::string>(i)).build();
if (i % 3 == 1) {
ProducerImpl& pImpl = PulsarFriend::getProducerImpl(producer);
ClientConnectionPtr clientConnectionPtr;
do {
ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl);
clientConnectionPtr = clientConnectionWeakPtr.lock();
usleep(1 * 1e6);
} while(!clientConnectionPtr);
oldConnections.push_back(clientConnectionPtr);
clientConnectionPtr->close();
}
ASSERT_EQ(producer.send(msg), ResultOk);
}

std::set<std::string> receivedMsgContent;
std::set<std::string> receivedMsgIndex;

Message msg;
while(consumer.receive(msg, 30000) == ResultOk) {
receivedMsgContent.insert(msg.getDataAsString());
receivedMsgIndex.insert(msg.getProperty(propertyName));
}

ConsumerImpl& cImpl = PulsarFriend::getConsumerImpl(consumer);
ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl);
ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock();
oldConnections.push_back(clientConnectionPtr);
clientConnectionPtr->close();

while(consumer.receive(msg, 30000) == ResultOk) {
consumer.acknowledge(msg);
receivedMsgContent.insert(msg.getDataAsString());
receivedMsgIndex.insert(msg.getProperty(propertyName));
}

ASSERT_EQ(receivedMsgContent.size(), 10);
ASSERT_EQ(receivedMsgIndex.size(), 10);

for (int i = 0; i<numOfMessages; i++) {
ASSERT_NE(receivedMsgContent.find("msg-" + boost::lexical_cast<std::string>(i)), receivedMsgContent.end());
ASSERT_NE(receivedMsgIndex.find(boost::lexical_cast<std::string>(i)), receivedMsgIndex.end());
}
}
14 changes: 14 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,19 @@ class PulsarFriend {
ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
return boost::static_pointer_cast<ConsumerStatsImpl>(consumerImpl->consumerStatsBasePtr_);
}

static ProducerImpl& getProducerImpl(Producer producer) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(producer.impl_.get());
return *producerImpl;
}

static ConsumerImpl& getConsumerImpl(Consumer consumer) {
ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
return *consumerImpl;
}

static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) {
return handler.connection_;
}
};
}

0 comments on commit 87b8944

Please sign in to comment.