diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 47250c8f944e5..4d64d2413c482 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, // This is the initial capacity of the queue incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)), pendingReceives_(), - availablePermits_(conf.getReceiverQueueSize()), + availablePermits_(0), + receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2), consumerId_(client->newConsumerId()), consumerName_(config_.getConsumerName()), partitionIndex_(-1), @@ -202,9 +203,12 @@ void ConsumerImpl::connectionFailed(Result result) { } } -void ConsumerImpl::receiveMessages(const ClientConnectionPtr& cnx, unsigned int count) { - SharedBuffer cmd = Commands::newFlow(consumerId_, count); - cnx->sendCommand(cmd); +void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) { + if (cnx) { + LOG_DEBUG(getName() << "Send more permits: " << numMessages); + SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast(numMessages)); + cnx->sendCommand(cmd); + } } void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { @@ -223,7 +227,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r backoff_.reset(); // Complicated logic since we don't have a isLocked() function for mutex if (waitingForZeroQueueSizeMessage) { - receiveMessages(cnx, 1); + sendFlowPermitsToBroker(cnx, 1); } availablePermits_ = 0; } @@ -231,9 +235,9 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize()); if (consumerTopicType_ == NonPartitioned || !firstTime) { if (config_.getReceiverQueueSize() != 0) { - receiveMessages(cnx, config_.getReceiverQueueSize()); + sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize()); } else if (messageListener_) { - receiveMessages(cnx, 1); + sendFlowPermitsToBroker(cnx, 1); } } consumerCreatedPromise_.setValue(shared_from_this()); @@ -380,11 +384,9 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } if (messageListener_) { - Lock lock(messageListenerMutex_); if (!messageListenerRunning_) { return; } - lock.unlock(); // Trigger message listener callback in a separate thread while (numOfMessageReceived--) { listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this())); @@ -556,11 +558,9 @@ void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx, } void ConsumerImpl::internalListener() { - Lock lock(messageListenerMutex_); if (!messageListenerRunning_) { return; } - lock.unlock(); Message msg; if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { // This will only happen when the connection got reset and we cleared the queue @@ -596,10 +596,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) { waitingForZeroQueueSizeMessage = true; localLock.unlock(); - if (currentCnx) { - LOG_DEBUG(getName() << "Send more permits: " << 1); - receiveMessages(currentCnx, 1); - } + sendFlowPermitsToBroker(currentCnx, 1); while (true) { incomingMessages_.pop(msg); @@ -646,11 +643,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { lock.unlock(); if (config_.getReceiverQueueSize() == 0) { - ClientConnectionPtr currentCnx = getCnx().lock(); - if (currentCnx) { - LOG_DEBUG(getName() << "Send more permits: " << 1); - receiveMessages(currentCnx, 1); - } + sendFlowPermitsToBroker(getCnx().lock(), 1); } } } @@ -752,20 +745,13 @@ Optional ConsumerImpl::clearReceiveQueue() { } } -void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits) { - int additionalPermits = 0; +void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta) { + int newAvailablePermits = availablePermits_.fetch_add(delta) + delta; - availablePermits_ += numberOfPermits; - if (availablePermits_ >= config_.getReceiverQueueSize() / 2) { - additionalPermits = availablePermits_; - availablePermits_ = 0; - } - if (additionalPermits > 0) { - if (currentCnx) { - LOG_DEBUG(getName() << "Send more permits: " << additionalPermits); - receiveMessages(currentCnx, additionalPermits); - } else { - LOG_DEBUG(getName() << "Connection is not ready, Unable to send flow Command"); + while (newAvailablePermits >= receiverQueueRefillThreshold_ && messageListenerRunning_) { + if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) { + sendFlowPermitsToBroker(currentCnx, newAvailablePermits); + break; } } } @@ -972,7 +958,6 @@ Result ConsumerImpl::pauseMessageListener() { if (!messageListener_) { return ResultInvalidConfiguration; } - Lock lock(messageListenerMutex_); messageListenerRunning_ = false; return ResultOk; } @@ -982,19 +967,19 @@ Result ConsumerImpl::resumeMessageListener() { return ResultInvalidConfiguration; } - Lock lock(messageListenerMutex_); if (messageListenerRunning_) { // Not paused return ResultOk; } messageListenerRunning_ = true; const size_t count = incomingMessages_.size(); - lock.unlock(); for (size_t i = 0; i < count; i++) { // Trigger message listener callback in a separate thread listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this())); } + // Check current permits and determine whether to send FLOW command + this->increaseAvailablePermits(getCnx().lock(), 0); return ResultOk; } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 313f1d9eabed0..ea67893404bab 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -43,6 +43,7 @@ #include #include #include +#include using namespace pulsar; @@ -73,11 +74,10 @@ class ConsumerImpl : public ConsumerImplBase, ~ConsumerImpl(); void setPartitionIndex(int partitionIndex); int getPartitionIndex(); - void receiveMessages(const ClientConnectionPtr& cnx, unsigned int count); + void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages); uint64_t getConsumerId(); void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload); - int incrementAndGetPermits(uint64_t cnxSequenceId); void messageProcessed(Message& msg); inline proto::CommandSubscribe_SubType getSubType(); inline proto::CommandSubscribe_InitialPosition getInitialPosition(); @@ -150,7 +150,7 @@ class ConsumerImpl : public ConsumerImplBase, const proto::MessageMetadata& metadata, SharedBuffer& payload); void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId, proto::CommandAck::ValidationError validationError); - void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1); + void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1); void drainIncomingMessageQueue(size_t count); uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage, int redeliveryCount); @@ -185,14 +185,14 @@ class ConsumerImpl : public ConsumerImplBase, Optional lastDequedMessage_; UnboundedBlockingQueue incomingMessages_; std::queue pendingReceives_; - int availablePermits_; + std::atomic_int availablePermits_; + const int receiverQueueRefillThreshold_; uint64_t consumerId_; std::string consumerName_; std::string consumerStr_; int32_t partitionIndex_; Promise consumerCreatedPromise_; - bool messageListenerRunning_; - std::mutex messageListenerMutex_; + std::atomic_bool messageListenerRunning_; CompressionCodecProvider compressionCodecProvider_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; BatchAcknowledgementTracker batchAcknowledgementTracker_; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index b62083aa12a01..46660e7b62586 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -613,7 +613,7 @@ void MultiTopicsConsumerImpl::receiveMessages() { for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); consumer++) { ConsumerImplPtr consumerPtr = consumer->second; - consumerPtr->receiveMessages(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize()); + consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize()); LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId()); } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 8881a0db9531c..5571a6c34d554 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -454,7 +454,7 @@ void PartitionedConsumerImpl::internalListener(Consumer consumer) { void PartitionedConsumerImpl::receiveMessages() { for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { ConsumerImplPtr consumer = *i; - consumer->receiveMessages(consumer->getCnx().lock(), conf_.getReceiverQueueSize()); + consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), conf_.getReceiverQueueSize()); LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId()); } } diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc index 53b6c9d7df87e..6d6c9332fa572 100644 --- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc +++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc @@ -20,7 +20,12 @@ #include #include #include "ConsumerTest.h" +#include +#include +#include #include +#include +#include DECLARE_LOG_OBJECT() @@ -116,3 +121,105 @@ TEST(ZeroQueueSizeTest, testMessageListener) { producer.close(); client.close(); } + +static ConsumerConfiguration zeroQueueSharedConsumerConf( + const std::string& name, std::function callback) { + ConsumerConfiguration conf; + conf.setConsumerType(ConsumerShared); + conf.setReceiverQueueSize(0); + conf.setSubscriptionInitialPosition(InitialPositionEarliest); + conf.setMessageListener([name, callback](Consumer consumer, const Message& msg) { + LOG_INFO(name << " received " << msg.getDataAsString() << " from " << msg.getMessageId()); + callback(consumer, msg); + }); + return conf; +} + +class IntVector { + public: + size_t add(int i) { + std::lock_guard lock(mutex_); + data_.emplace_back(i); + return data_.size(); + } + + std::vector data() const { + std::lock_guard lock(mutex_); + return data_; + } + + private: + std::vector data_; + mutable std::mutex mutex_; +}; + +TEST(ZeroQueueSizeTest, testPauseResume) { + Client client(lookupUrl); + const auto topic = "ZeroQueueSizeTestPauseListener-" + std::to_string(time(nullptr)); + const auto subscription = "my-sub"; + + auto intToMessage = [](int i) { return MessageBuilder().setContent(std::to_string(i)).build(); }; + auto messageToInt = [](const Message& msg) { return std::stoi(msg.getDataAsString()); }; + + // 1. Produce 10 messages + Producer producer; + const auto producerConf = ProducerConfiguration().setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + for (int i = 0; i < 10; i++) { + MessageId id; + ASSERT_EQ(ResultOk, producer.send(intToMessage(i), id)); + LOG_INFO("Send " << i << " to " << id); + } + + // 2. consumer-1 receives 1 message and pause + std::mutex mtx; + std::condition_variable condConsumer1FirstMessage; + std::condition_variable condConsumer1Completed; + IntVector messages1; + const auto conf1 = zeroQueueSharedConsumerConf("consumer-1", [&](Consumer consumer, const Message& msg) { + const auto numReceived = messages1.add(messageToInt(msg)); + if (numReceived == 1) { + ASSERT_EQ(ResultOk, consumer.pauseMessageListener()); + condConsumer1FirstMessage.notify_all(); + } else if (numReceived == 5) { + ASSERT_EQ(ResultOk, consumer.pauseMessageListener()); + condConsumer1Completed.notify_all(); + } + }); + Consumer consumer1; + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf1, consumer1)); + { + std::unique_lock lock(mtx); + ASSERT_EQ(condConsumer1FirstMessage.wait_for(lock, std::chrono::seconds(3)), + std::cv_status::no_timeout); + ASSERT_EQ(messages1.data(), (std::vector{0})); + } + + // 3. consumer-2 receives 5 messages and pause + std::condition_variable condConsumer2Completed; + IntVector messages2; + const auto conf2 = zeroQueueSharedConsumerConf("consumer-2", [&](Consumer consumer, const Message& msg) { + const int numReceived = messages2.add(messageToInt(msg)); + if (numReceived == 5) { + ASSERT_EQ(ResultOk, consumer.pauseMessageListener()); + condConsumer2Completed.notify_all(); + } + }); + Consumer consumer2; + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf2, consumer2)); + { + std::unique_lock lock(mtx); + ASSERT_EQ(condConsumer2Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout); + ASSERT_EQ(messages2.data(), (std::vector{1, 2, 3, 4, 5})); + } + + // 4. consumer-1 resumes listening, and receives last 4 messages + ASSERT_EQ(ResultOk, consumer1.resumeMessageListener()); + { + std::unique_lock lock(mtx); + ASSERT_EQ(condConsumer1Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout); + ASSERT_EQ(messages1.data(), (std::vector{0, 6, 7, 8, 9})); + } + + client.close(); +}