diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index b3cb02349a404..e96c35de2855e 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -310,18 +310,21 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) { Json::Value root; Json::Reader reader; + LOG_DEBUG("GetNamespaceTopics json = " << json); + + // passed in json is like: ["topic1", "topic2"...] + // root will be an array of topics if (!reader.parse(json, root, false)) { LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages() << "\nInput Json = " << json); return NamespaceTopicsPtr(); } - Json::Value topicsArray = root["topics"]; std::set topicSet; // get all topics - for (int i = 0; i < topicsArray.size(); i++) { + for (int i = 0; i < root.size(); i++) { // remove partition part - const std::string &topicName = topicsArray[i].asString(); + const std::string &topicName = root[i].asString(); int pos = topicName.find("-partition-"); std::string filteredName = topicName.substr(0, pos); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 9a34fbe7a9f4c..4871b89c4032f 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1867,6 +1867,101 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { client.shutdown(); } +// User adminUrl to create client, to protect http related services +TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) { + Client client(adminUrl); + std::string pattern = "persistent://public/default/patternMultiTopicsHttpConsumer.*"; + + std::string subName = "testpatternMultiTopicsHttpConsumer"; + std::string topicName1 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub1"; + std::string topicName2 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub2"; + std::string topicName3 = "persistent://public/default/patternMultiTopicsHttpConsumerPubSub3"; + + // call admin api to make topics partitioned + std::string url1 = + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub1/partitions"; + std::string url2 = + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub2/partitions"; + std::string url3 = + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + Result result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + + LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4"); + + int messageNumber = 100; + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setReceiverQueueSize(10); // size for each sub-consumer + Consumer consumer; + Promise consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, consConfig, + WaitForCallbackValue(consumerPromise)); + Future consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + LOG_INFO("created topics consumer on a pattern that match 3 topics"); + + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 100 messages by producer 1 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer1.send(msg)); + } + + msgContent = "msg-content2"; + LOG_INFO("Publishing 100 messages by producer 2 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer2.send(msg)); + } + + msgContent = "msg-content3"; + LOG_INFO("Publishing 100 messages by producer 3 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer3.send(msg)); + } + + LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer"); + for (int i = 0; i < 3 * messageNumber; i++) { + Message m; + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer"); + + // verify no more to receive + Message m; + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + + client.shutdown(); +} + // create a pattern consumer, which contains no match topics at beginning. // create 4 topics, in which 3 topics match the pattern. // verify PatternMultiTopicsConsumer subscribed matched topics, after a while,