Skip to content

Commit

Permalink
[C++] Avoid sending flow requests with zero permits (apache#10506)
Browse files Browse the repository at this point in the history
* Avoid sending zero permits

* Avoid the same topic name
  • Loading branch information
BewareMyPower authored May 7, 2021
1 parent 09ef682 commit 37583f6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ void ConsumerImpl::connectionFailed(Result result) {
}

void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
if (cnx) {
if (cnx && numMessages > 0) {
LOG_DEBUG(getName() << "Send more permits: " << numMessages);
SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
cnx->sendCommand(cmd);
Expand Down
61 changes: 61 additions & 0 deletions pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,64 @@ TEST(ZeroQueueSizeTest, testPauseResume) {

client.close();
}

TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
Client client(lookupUrl);
const auto topic = "ZeroQueueSizeTestPauseResumeNoReconnection-" + std::to_string(time(nullptr));

std::mutex mtx;
std::condition_variable cond;
bool running = true;

auto notify = [&mtx, &cond, &running] {
std::unique_lock<std::mutex> lock(mtx);
running = false;
cond.notify_all();
};
auto wait = [&mtx, &cond, &running] {
std::unique_lock<std::mutex> lock(mtx);
running = true;
while (running) {
cond.wait(lock);
}
};

std::mutex mtxForMessages;
std::vector<std::string> receivedMessages;

ConsumerConfiguration consumerConf;
consumerConf.setReceiverQueueSize(0);
consumerConf.setMessageListener(
[&mtxForMessages, &receivedMessages, &notify](Consumer consumer, const Message& msg) {
std::unique_lock<std::mutex> lock(mtxForMessages);
receivedMessages.emplace_back(msg.getDataAsString());
lock.unlock();
consumer.acknowledge(msg);
notify(); // notify the consumer that a new message arrived
});

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumerConf, consumer));

Producer producer;
ASSERT_EQ(ResultOk,
client.createProducer(topic, ProducerConfiguration().setBatchingEnabled(false), producer));

constexpr int numMessages = 300;
for (int i = 0; i < numMessages; i++) {
const auto message = MessageBuilder().setContent(std::to_string(i)).build();
consumer.resumeMessageListener();
producer.sendAsync(message, {});
wait(); // wait until a new message is received
consumer.pauseMessageListener();
}

std::unique_lock<std::mutex> lock(mtxForMessages);
ASSERT_EQ(receivedMessages.size(), numMessages);
for (int i = 0; i < numMessages; i++) {
ASSERT_EQ(i, std::stoi(receivedMessages[i]));
}
lock.unlock();

client.close();
}

0 comments on commit 37583f6

Please sign in to comment.