Skip to content

Commit

Permalink
Add support in cpp client for 1 partitioned topic (apache#5016)
Browse files Browse the repository at this point in the history
### Motivation

IN PR apache#4883, we support 1 partitioned topic producer/consumer in java client. this is for Cpp client support.

### Modifications
- change cpp client 
- add unit test

### Verifying this change
New added Ut Passed
  • Loading branch information
jiazhai authored Aug 26, 2019
1 parent 4fa87a4 commit bf94890
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 11 deletions.
8 changes: 4 additions & 4 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
CreateProducerCallback callback) {
if (!result) {
ProducerImplBasePtr producer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(), conf);
} else {
Expand Down Expand Up @@ -221,7 +221,7 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
callback(ResultOperationNotSupported, Reader());
return;
Expand Down Expand Up @@ -360,7 +360,7 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
conf.setConsumerName(generateRandomName());
}
ConsumerImplBasePtr consumer;
if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
if (conf.getReceiverQueueSize() == 0) {
LOG_ERROR("Can't use partitioned topic if the queue size is 0.");
callback(ResultInvalidConfiguration, Consumer());
Expand Down Expand Up @@ -435,7 +435,7 @@ void ClientImpl::handleGetPartitions(const Result result, const LookupDataResult

StringList partitions;

if (partitionMetadata->getPartitions() > 1) {
if (partitionMetadata->getPartitions() > 0) {
for (unsigned int i = 0; i < partitionMetadata->getPartitions(); i++) {
partitions.push_back(topicName->getTopicPartitionName(i));
}
Expand Down
16 changes: 9 additions & 7 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,23 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));

int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1;
int numPartitions = partitionMetadata->getPartitions();
int partitions = numPartitions == 0 ? 1 : numPartitions;

// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));

Lock lock(mutex_);
topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions));
topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions));
lock.unlock();
numberTopicPartitions_->fetch_add(numPartitions);
numberTopicPartitions_->fetch_add(partitions);

std::shared_ptr<std::atomic<int>> partitionsNeedCreate =
std::make_shared<std::atomic<int>>(numPartitions);
std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions);

if (numPartitions == 1) {
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
internalListenerExecutor, NonPartitioned);
Expand Down
69 changes: 69 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2978,3 +2978,72 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
timeWaited += 500;
}
}

TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "testPartitionedTopicWithOnePartition";
std::string subsName = topicName + "-sub-";

// call admin api to make 1 partition
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int putRes = makePutRequest(url, "1");
LOG_INFO("res = " << putRes);
ASSERT_FALSE(putRes != 204 && putRes != 409);

Consumer consumer1;
ConsumerConfiguration conf;
Result result = client.subscribe(topicName, subsName + "1", consumer1);
ASSERT_EQ(ResultOk, result);

Consumer consumer2;
result = client.subscribe(topicName + "-partition-0", subsName + "2", consumer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 consumer");

Producer producer1;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
result = client.createProducer(topicName, producerConf, producer1);
ASSERT_EQ(ResultOk, result);

Producer producer2;
result = client.createProducer(topicName + "-partition-0", producerConf, producer2);
ASSERT_EQ(ResultOk, result);

LOG_INFO("created 2 producer");

// create messages
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
Message msg = MessageBuilder().setContent("test-producer1-" + topicName + std::to_string(i)).build();
producer1.send(msg);
msg = MessageBuilder().setContent("test-producer2-" + topicName + std::to_string(i)).build();
producer2.send(msg);
}

// produced 10 messages by each producer.
// expected receive 20 messages by each consumer.
for (int i = 0; i < numMessages * 2; i++) {
LOG_INFO("begin to receive message " << i);

Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer1.acknowledge(msg);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultOk, res);
consumer2.acknowledge(msg);
}

// No more messages expected
Message msg;
Result res = consumer1.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);

res = consumer2.receive(msg, 100);
ASSERT_EQ(ResultTimeout, res);
client.shutdown();
}

0 comments on commit bf94890

Please sign in to comment.