Skip to content

Commit

Permalink
Issue apache#3332: fix httplookup issue for get ns/topics in cpp (apa…
Browse files Browse the repository at this point in the history
…che#3407)

* fix httplookup issue for get ns/topics

* change following comments
  • Loading branch information
jiazhai authored and merlimat committed Jan 24, 2019
1 parent 358d4e7 commit 0ec897f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
9 changes: 6 additions & 3 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,21 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
Json::Value root;
Json::Reader reader;
LOG_DEBUG("GetNamespaceTopics json = " << json);

// passed in json is like: ["topic1", "topic2"...]
// root will be an array of topics
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return NamespaceTopicsPtr();
}

Json::Value topicsArray = root["topics"];
std::set<std::string> topicSet;
// get all topics
for (int i = 0; i < topicsArray.size(); i++) {
for (int i = 0; i < root.size(); i++) {
// remove partition part
const std::string &topicName = topicsArray[i].asString();
const std::string &topicName = root[i].asString();
int pos = topicName.find("-partition-");
std::string filteredName = topicName.substr(0, pos);

Expand Down
95 changes: 95 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,101 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
client.shutdown();
}

// User adminUrl to create client, to protect http related services
TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) {
Client client(adminUrl);
std::string pattern = "persistent://public/default/patternMultiTopicsHttpConsumer.*";

std::string subName = "testpatternMultiTopicsHttpConsumer";
std::string topicName1 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub1";
std::string topicName2 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub2";
std::string topicName3 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub3";

// call admin api to make topics partitioned
std::string url1 =
adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub1/partitions";
std::string url2 =
adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub2/partitions";
std::string url3 =
adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions";

int res = makePutRequest(url1, "2");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url2, "3");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url3, "4");
ASSERT_FALSE(res != 204 && res != 409);

Producer producer1;
Result result = client.createProducer(topicName1, producer1);
ASSERT_EQ(ResultOk, result);
Producer producer2;
result = client.createProducer(topicName2, producer2);
ASSERT_EQ(ResultOk, result);
Producer producer3;
result = client.createProducer(topicName3, producer3);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4");

int messageNumber = 100;
ConsumerConfiguration consConfig;
consConfig.setConsumerType(ConsumerShared);
consConfig.setReceiverQueueSize(10); // size for each sub-consumer
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
client.subscribeWithRegexAsync(pattern, subName, consConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
LOG_INFO("created topics consumer on a pattern that match 3 topics");

std::string msgContent = "msg-content";
LOG_INFO("Publishing 100 messages by producer 1 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer1.send(msg));
}

msgContent = "msg-content2";
LOG_INFO("Publishing 100 messages by producer 2 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer2.send(msg));
}

msgContent = "msg-content3";
LOG_INFO("Publishing 100 messages by producer 3 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer3.send(msg));
}

LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
for (int i = 0; i < 3 * messageNumber; i++) {
Message m;
ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
ASSERT_EQ(ResultOk, consumer.acknowledge(m));
}
LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");

// verify no more to receive
Message m;
ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));

ASSERT_EQ(ResultOk, consumer.unsubscribe());

client.shutdown();
}

// create a pattern consumer, which contains no match topics at beginning.
// create 4 topics, in which 3 topics match the pattern.
// verify PatternMultiTopicsConsumer subscribed matched topics, after a while,
Expand Down

0 comments on commit 0ec897f

Please sign in to comment.