Skip to content

Commit

Permalink
[C++] Fix paused zero queue consumer still pre-fetches messages (apac…
Browse files Browse the repository at this point in the history
…he#10036)

* Fix zero queue consumer pre-fetches messages after paused

* Remove unused logs and checks

* Fix pulsar image build failure

* Fix AtomicHelper functions

* Use fetch_add as a substitute of addAndGet
  • Loading branch information
BewareMyPower authored Mar 26, 2021
1 parent 3181689 commit fb6b4ea
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 44 deletions.
57 changes: 21 additions & 36 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
pendingReceives_(),
availablePermits_(conf.getReceiverQueueSize()),
availablePermits_(0),
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
partitionIndex_(-1),
Expand Down Expand Up @@ -202,9 +203,12 @@ void ConsumerImpl::connectionFailed(Result result) {
}
}

void ConsumerImpl::receiveMessages(const ClientConnectionPtr& cnx, unsigned int count) {
SharedBuffer cmd = Commands::newFlow(consumerId_, count);
cnx->sendCommand(cmd);
void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
if (cnx) {
LOG_DEBUG(getName() << "Send more permits: " << numMessages);
SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
cnx->sendCommand(cmd);
}
}

void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Expand All @@ -223,17 +227,17 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for mutex
if (waitingForZeroQueueSizeMessage) {
receiveMessages(cnx, 1);
sendFlowPermitsToBroker(cnx, 1);
}
availablePermits_ = 0;
}

LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
if (consumerTopicType_ == NonPartitioned || !firstTime) {
if (config_.getReceiverQueueSize() != 0) {
receiveMessages(cnx, config_.getReceiverQueueSize());
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
} else if (messageListener_) {
receiveMessages(cnx, 1);
sendFlowPermitsToBroker(cnx, 1);
}
}
consumerCreatedPromise_.setValue(shared_from_this());
Expand Down Expand Up @@ -380,11 +384,9 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}

if (messageListener_) {
Lock lock(messageListenerMutex_);
if (!messageListenerRunning_) {
return;
}
lock.unlock();
// Trigger message listener callback in a separate thread
while (numOfMessageReceived--) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
Expand Down Expand Up @@ -556,11 +558,9 @@ void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx,
}

void ConsumerImpl::internalListener() {
Lock lock(messageListenerMutex_);
if (!messageListenerRunning_) {
return;
}
lock.unlock();
Message msg;
if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
// This will only happen when the connection got reset and we cleared the queue
Expand Down Expand Up @@ -596,10 +596,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
waitingForZeroQueueSizeMessage = true;
localLock.unlock();

if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << 1);
receiveMessages(currentCnx, 1);
}
sendFlowPermitsToBroker(currentCnx, 1);

while (true) {
incomingMessages_.pop(msg);
Expand Down Expand Up @@ -646,11 +643,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
lock.unlock();

if (config_.getReceiverQueueSize() == 0) {
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << 1);
receiveMessages(currentCnx, 1);
}
sendFlowPermitsToBroker(getCnx().lock(), 1);
}
}
}
Expand Down Expand Up @@ -752,20 +745,13 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
}
}

void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits) {
int additionalPermits = 0;
void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta) {
int newAvailablePermits = availablePermits_.fetch_add(delta) + delta;

availablePermits_ += numberOfPermits;
if (availablePermits_ >= config_.getReceiverQueueSize() / 2) {
additionalPermits = availablePermits_;
availablePermits_ = 0;
}
if (additionalPermits > 0) {
if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << additionalPermits);
receiveMessages(currentCnx, additionalPermits);
} else {
LOG_DEBUG(getName() << "Connection is not ready, Unable to send flow Command");
while (newAvailablePermits >= receiverQueueRefillThreshold_ && messageListenerRunning_) {
if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) {
sendFlowPermitsToBroker(currentCnx, newAvailablePermits);
break;
}
}
}
Expand Down Expand Up @@ -972,7 +958,6 @@ Result ConsumerImpl::pauseMessageListener() {
if (!messageListener_) {
return ResultInvalidConfiguration;
}
Lock lock(messageListenerMutex_);
messageListenerRunning_ = false;
return ResultOk;
}
Expand All @@ -982,19 +967,19 @@ Result ConsumerImpl::resumeMessageListener() {
return ResultInvalidConfiguration;
}

Lock lock(messageListenerMutex_);
if (messageListenerRunning_) {
// Not paused
return ResultOk;
}
messageListenerRunning_ = true;
const size_t count = incomingMessages_.size();
lock.unlock();

for (size_t i = 0; i < count; i++) {
// Trigger message listener callback in a separate thread
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
}
// Check current permits and determine whether to send FLOW command
this->increaseAvailablePermits(getCnx().lock(), 0);
return ResultOk;
}

Expand Down
12 changes: 6 additions & 6 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <lib/stats/ConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
#include <atomic>

using namespace pulsar;

Expand Down Expand Up @@ -73,11 +74,10 @@ class ConsumerImpl : public ConsumerImplBase,
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
void receiveMessages(const ClientConnectionPtr& cnx, unsigned int count);
void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages);
uint64_t getConsumerId();
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
int incrementAndGetPermits(uint64_t cnxSequenceId);
void messageProcessed(Message& msg);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
Expand Down Expand Up @@ -150,7 +150,7 @@ class ConsumerImpl : public ConsumerImplBase,
const proto::MessageMetadata& metadata, SharedBuffer& payload);
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
proto::CommandAck::ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
int redeliveryCount);
Expand Down Expand Up @@ -185,14 +185,14 @@ class ConsumerImpl : public ConsumerImplBase,
Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::queue<ReceiveCallback> pendingReceives_;
int availablePermits_;
std::atomic_int availablePermits_;
const int receiverQueueRefillThreshold_;
uint64_t consumerId_;
std::string consumerName_;
std::string consumerStr_;
int32_t partitionIndex_;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
std::atomic_bool messageListenerRunning_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ void MultiTopicsConsumerImpl::receiveMessages() {
for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
consumer++) {
ConsumerImplPtr consumerPtr = consumer->second;
consumerPtr->receiveMessages(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId());
}
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ void PartitionedConsumerImpl::internalListener(Consumer consumer) {
void PartitionedConsumerImpl::receiveMessages() {
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
ConsumerImplPtr consumer = *i;
consumer->receiveMessages(consumer->getCnx().lock(), conf_.getReceiverQueueSize());
consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), conf_.getReceiverQueueSize());
LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId());
}
}
Expand Down
107 changes: 107 additions & 0 deletions pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
#include <pulsar/Client.h>
#include <lib/Latch.h>
#include "ConsumerTest.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>

DECLARE_LOG_OBJECT()

Expand Down Expand Up @@ -116,3 +121,105 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
producer.close();
client.close();
}

static ConsumerConfiguration zeroQueueSharedConsumerConf(
const std::string& name, std::function<void(Consumer, const Message&)> callback) {
ConsumerConfiguration conf;
conf.setConsumerType(ConsumerShared);
conf.setReceiverQueueSize(0);
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
conf.setMessageListener([name, callback](Consumer consumer, const Message& msg) {
LOG_INFO(name << " received " << msg.getDataAsString() << " from " << msg.getMessageId());
callback(consumer, msg);
});
return conf;
}

class IntVector {
public:
size_t add(int i) {
std::lock_guard<std::mutex> lock(mutex_);
data_.emplace_back(i);
return data_.size();
}

std::vector<int> data() const {
std::lock_guard<std::mutex> lock(mutex_);
return data_;
}

private:
std::vector<int> data_;
mutable std::mutex mutex_;
};

TEST(ZeroQueueSizeTest, testPauseResume) {
Client client(lookupUrl);
const auto topic = "ZeroQueueSizeTestPauseListener-" + std::to_string(time(nullptr));
const auto subscription = "my-sub";

auto intToMessage = [](int i) { return MessageBuilder().setContent(std::to_string(i)).build(); };
auto messageToInt = [](const Message& msg) { return std::stoi(msg.getDataAsString()); };

// 1. Produce 10 messages
Producer producer;
const auto producerConf = ProducerConfiguration().setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
for (int i = 0; i < 10; i++) {
MessageId id;
ASSERT_EQ(ResultOk, producer.send(intToMessage(i), id));
LOG_INFO("Send " << i << " to " << id);
}

// 2. consumer-1 receives 1 message and pause
std::mutex mtx;
std::condition_variable condConsumer1FirstMessage;
std::condition_variable condConsumer1Completed;
IntVector messages1;
const auto conf1 = zeroQueueSharedConsumerConf("consumer-1", [&](Consumer consumer, const Message& msg) {
const auto numReceived = messages1.add(messageToInt(msg));
if (numReceived == 1) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer1FirstMessage.notify_all();
} else if (numReceived == 5) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer1Completed.notify_all();
}
});
Consumer consumer1;
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf1, consumer1));
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer1FirstMessage.wait_for(lock, std::chrono::seconds(3)),
std::cv_status::no_timeout);
ASSERT_EQ(messages1.data(), (std::vector<int>{0}));
}

// 3. consumer-2 receives 5 messages and pause
std::condition_variable condConsumer2Completed;
IntVector messages2;
const auto conf2 = zeroQueueSharedConsumerConf("consumer-2", [&](Consumer consumer, const Message& msg) {
const int numReceived = messages2.add(messageToInt(msg));
if (numReceived == 5) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer2Completed.notify_all();
}
});
Consumer consumer2;
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf2, consumer2));
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer2Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout);
ASSERT_EQ(messages2.data(), (std::vector<int>{1, 2, 3, 4, 5}));
}

// 4. consumer-1 resumes listening, and receives last 4 messages
ASSERT_EQ(ResultOk, consumer1.resumeMessageListener());
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer1Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout);
ASSERT_EQ(messages1.data(), (std::vector<int>{0, 6, 7, 8, 9}));
}

client.close();
}

0 comments on commit fb6b4ea

Please sign in to comment.