Skip to content

Commit

Permalink
[C++] Fix message id error when subscribing a single partition (apach…
Browse files Browse the repository at this point in the history
…e#8341)

### Motivation

A consumer/reader can subscribe a single partition of a partitioned topic. However, the partition index was not set correctly. So the partition field of received messages' id is always -1.

In addition, current `TopicName#getPartitionIndex` Java implementation doesn't handle some corner cases, like "xxx-partition-00" or "xxx-partition--1". For the backward compatibility, the C++ implementation will use the same implementation and note it in Java `TopicName`'s unit test.

### Modifications

- Add `getPartitionIndex` method to `TopicName`;
- Set the partition index when a `Consumer` was created from a `Client`;
- Set the partition index when a `Reader` was created;
- Add relative unit tests for above changes;
- Note the "incorrect" behavior in Java `TopicName`'s unit test.

### Verifying this change

This change added tests and can be verified as follows:

- `TopicNameTest#testPartitionIndex`
- `ConsumerTest#testPartitionIndex`
- `ReaderTest#testPartitionIndex`

* Add getPartitionIndex() method to TopicName

* Set partition index when creating a ConsumerImpl from Client

* Add test cases to Java TopicName to note the corner cases

* Set partition index for Reader

* Fix compile error
  • Loading branch information
BewareMyPower authored Oct 23, 2020
1 parent 94d791b commit dd8be83
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,10 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
consumer = std::make_shared<PartitionedConsumerImpl>(shared_from_this(), consumerName, topicName,
partitionMetadata->getPartitions(), conf);
} else {
consumer =
auto consumerImpl =
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(), consumerName, conf);
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
consumer = consumerImpl;
}
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "ClientImpl.h"
#include "ReaderImpl.h"
#include "TopicName.h"

namespace pulsar {

Expand Down Expand Up @@ -54,6 +55,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2));
Expand Down
18 changes: 18 additions & 0 deletions pulsar-client-cpp/lib/TopicName.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ bool TopicName::init(const std::string& topicName) {
} else {
namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_);
}
partition_ = TopicName::getPartitionIndex(localName_);
return true;
}
bool TopicName::parse(const std::string& topicName, std::string& domain, std::string& property,
Expand Down Expand Up @@ -237,6 +238,23 @@ const std::string TopicName::getTopicPartitionName(unsigned int partition) {
return topicPartitionName.str();
}

int TopicName::getPartitionIndex(const std::string& topic) {
const auto& suffix = PartitionedProducerImpl::PARTITION_NAME_SUFFIX;
const size_t pos = topic.rfind(suffix);
if (pos == std::string::npos) {
return -1;
}

try {
// TODO: When handling topic name like "xxx-partition-00", it should return -1.
// But here it will returns, which is consistent with Java client's behavior
// Another corner case: "xxx-partition--2" => 2 (not -1)
return std::stoi(topic.substr(topic.rfind('-') + 1));
} catch (const std::exception&) {
return -1;
}
}

NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; }

} // namespace pulsar
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/TopicName.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId {
std::string localName_;
bool isV2Topic_;
std::shared_ptr<NamespaceName> namespaceName_;
int partition_ = -1;

public:
bool isV2Topic();
Expand All @@ -57,10 +58,12 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId {
std::string toString();
bool isPersistent() const;
NamespaceNamePtr getNamespaceName();
int getPartitionIndex() const noexcept { return partition_; }
static std::shared_ptr<TopicName> get(const std::string& topicName);
bool operator==(const TopicName& other);
static std::string getEncodedName(const std::string& nameBeforeEncoding);
const std::string getTopicPartitionName(unsigned int partition);
static int getPartitionIndex(const std::string& topic);

private:
static CURL* getCurlHandle();
Expand Down
65 changes: 65 additions & 0 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
*/
#include <pulsar/Client.h>
#include <gtest/gtest.h>
#include <time.h>
#include <set>

#include "../lib/Future.h"
#include "../lib/Utils.h"

#include "HttpHelper.h"

using namespace pulsar;

static const std::string lookupUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;

Expand Down Expand Up @@ -95,3 +102,61 @@ TEST(ConsumerTest, consumerNotInitialized) {
ASSERT_EQ(ResultConsumerNotInitialized, result);
}
}

TEST(ConsumerTest, testPartitionIndex) {
Client client(lookupUrl);

const std::string nonPartitionedTopic =
"ConsumerTestPartitionIndex-topic-" + std::to_string(time(nullptr));
const std::string partitionedTopic1 =
"ConsumerTestPartitionIndex-par-topic1-" + std::to_string(time(nullptr));
const std::string partitionedTopic2 =
"ConsumerTestPartitionIndex-par-topic2-" + std::to_string(time(nullptr));
constexpr int numPartitions = 3;

int res = makePutRequest(
adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic1 + "/partitions", "1");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic2 + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

auto sendMessageToTopic = [&client](const std::string& topic) {
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

Message msg = MessageBuilder().setContent("hello").build();
ASSERT_EQ(ResultOk, producer.send(msg));
};

// consumers
// [0] subscribes a non-partitioned topic
// [1] subscribes a partition of a partitioned topic
// [2] subscribes a partitioned topic
Consumer consumers[3];
ASSERT_EQ(ResultOk, client.subscribe(nonPartitionedTopic, "sub", consumers[0]));
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic1 + "-partition-0", "sub", consumers[1]));
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic2, "sub", consumers[2]));

sendMessageToTopic(nonPartitionedTopic);
sendMessageToTopic(partitionedTopic1);
for (int i = 0; i < numPartitions; i++) {
sendMessageToTopic(partitionedTopic2 + "-partition-" + std::to_string(i));
}

Message msg;
ASSERT_EQ(ResultOk, consumers[0].receive(msg, 5000));
ASSERT_EQ(msg.getMessageId().partition(), -1);

ASSERT_EQ(ResultOk, consumers[1].receive(msg, 5000));
ASSERT_EQ(msg.getMessageId().partition(), 0);

std::set<int> partitionIndexes;
for (int i = 0; i < 3; i++) {
ASSERT_EQ(ResultOk, consumers[2].receive(msg, 5000));
partitionIndexes.emplace(msg.getMessageId().partition());
}
ASSERT_EQ(partitionIndexes, (std::set<int>{0, 1, 2}));

client.close();
}
44 changes: 44 additions & 0 deletions pulsar-client-cpp/tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include <pulsar/Client.h>
#include <pulsar/Reader.h>
#include "ReaderTest.h"
#include "HttpHelper.h"

#include <gtest/gtest.h>

#include <time.h>
#include <string>

#include <lib/LogUtils.h>
Expand All @@ -30,6 +32,7 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;

static std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

TEST(ReaderTest, testSimpleReader) {
Client client(serviceUrl);
Expand Down Expand Up @@ -462,3 +465,44 @@ TEST(ReaderTest, testReferenceLeak) {
ASSERT_EQ(1, consumerPtr.use_count());
ASSERT_EQ(1, readerPtr.use_count());
}

TEST(ReaderTest, testPartitionIndex) {
Client client(serviceUrl);

const std::string nonPartitionedTopic = "ReaderTestPartitionIndex-topic-" + std::to_string(time(nullptr));
const std::string partitionedTopic =
"ReaderTestPartitionIndex-par-topic-" + std::to_string(time(nullptr));

int res = makePutRequest(
adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

const std::string partition0 = partitionedTopic + "-partition-0";
const std::string partition1 = partitionedTopic + "-partition-1";

ReaderConfiguration readerConf;
Reader readers[3];
ASSERT_EQ(ResultOk,
client.createReader(nonPartitionedTopic, MessageId::earliest(), readerConf, readers[0]));
ASSERT_EQ(ResultOk, client.createReader(partition0, MessageId::earliest(), readerConf, readers[1]));
ASSERT_EQ(ResultOk, client.createReader(partition1, MessageId::earliest(), readerConf, readers[2]));

Producer producers[3];
ASSERT_EQ(ResultOk, client.createProducer(nonPartitionedTopic, producers[0]));
ASSERT_EQ(ResultOk, client.createProducer(partition0, producers[1]));
ASSERT_EQ(ResultOk, client.createProducer(partition1, producers[2]));

for (auto& producer : producers) {
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
}

Message msg;
readers[0].readNext(msg);
ASSERT_EQ(msg.getMessageId().partition(), -1);
readers[1].readNext(msg);
ASSERT_EQ(msg.getMessageId().partition(), 0);
readers[2].readNext(msg);
ASSERT_EQ(msg.getMessageId().partition(), 1);

client.close();
}
25 changes: 25 additions & 0 deletions pulsar-client-cpp/tests/TopicNameTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <lib/TopicName.h>
#include <map>

using namespace pulsar;

Expand Down Expand Up @@ -154,3 +155,27 @@ TEST(TopicNameTest, testExtraArguments) {
TopicName::get("persistent:::/property/cluster/namespace/topic/some/extra/args");
ASSERT_FALSE(topicName);
}

TEST(TopicNameTest, testPartitionIndex) {
// key: topic name, value: partition index
const std::map<std::string, int> nameToPartition = {
{"persistent://public/default/xxx-partition-0", 0},
{"xxx-partition-0", 0},
{"xxx-partition-4", 4},
{"xxx-partition-13", 13},
{"xxx-partition-x", -1},
// Following cases are not the right behavior, but it's Java client's behavior
{"xxx-partition--1", 1},
{"xxx-partition-00", 0},
{"xxx-partition-012", 12},
};

for (const auto& kv : nameToPartition) {
const auto& name = kv.first;
const auto& partition = kv.second;

auto topicName = TopicName::get(name);
ASSERT_EQ(topicName->getPartitionIndex(), TopicName::getPartitionIndex(name));
ASSERT_EQ(topicName->getPartitionIndex(), partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ public void topic() {
assertEquals(topicName.getPartitionIndex(), -1);

assertEquals(TopicName.getPartitionIndex("persistent://myprop/mycolo/myns/mytopic-partition-4"), 4);

// NOTE: Following behavior is not right actually, but for the backward compatibility, it shouldn't be changed
assertEquals(TopicName.getPartitionIndex("mytopic-partition--1"), 1);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-00"), 0);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-012"), 12);
}

@Test
Expand Down

0 comments on commit dd8be83

Please sign in to comment.