Skip to content

Commit

Permalink
[Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C…
Browse files Browse the repository at this point in the history
…++ (apache#6391)

### Motivation
Fix apache#6168 .
>On C++ lib, like the following log, unacked messages are redelivered after about 2 * unAckedMessagesTimeout.

### Modifications
As same apache#3118, by using TimePartition, fixed ` UnackedMessageTracker` .
- Add `TickDurationInMs`
- Add `redeliverUnacknowledgedMessages` which require `MessageIds` to `ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`.
  • Loading branch information
k2la authored Mar 2, 2020
1 parent 330e782 commit 333888a
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 50 deletions.
4 changes: 4 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
long getUnAckedMessagesTimeoutMs() const;

void setTickDurationInMs(const uint64_t milliSeconds);

long getTickDurationInMs() const;

/**
* Set the delay to wait before re-delivering messages that have failed to be process.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co

#ifdef __cplusplus
}
#endif
#endif
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}

long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }

void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
impl_->tickDurationInMs = milliSeconds;
}

void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
long tickDurationInMs;

long negativeAckRedeliveryDelayMs;
ConsumerType consumerType;
Expand All @@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
tickDurationInMs(1000),
negativeAckRedeliveryDelayMs(60000),
consumerType(ConsumerExclusive),
messageListener(),
Expand Down
21 changes: 19 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -953,6 +958,18 @@ Result ConsumerImpl::resumeMessageListener() {
void ConsumerImpl::redeliverUnacknowledgedMessages() {
static std::set<MessageId> emptySet;
redeliverMessages(emptySet);
unAckedMessageTrackerPtr_->clear();
}

void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
redeliverMessages(messageIds);
}

void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);

virtual void redeliverMessages(const std::set<MessageId>& messageIds);
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual void negativeAcknowledge(const MessageId& msgId);

virtual void closeAsync(ResultCallback callback);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ConsumerImplBase {
virtual Result pauseMessageListener() = 0;
virtual Result resumeMessageListener() = 0;
virtual void redeliverUnacknowledgedMessages() = 0;
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
virtual const std::string& getName() const = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/LogUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
return path.substr(startIdx + 1, endIdx - startIdx - 1);
}

} // namespace pulsar
} // namespace pulsar
25 changes: 23 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
consumerStr_ = consumerStrStream.str();

if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -653,6 +658,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
consumer++) {
(consumer->second)->redeliverUnacknowledgedMessages();
}
unAckedMessageTrackerPtr_->clear();
}

void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
consumer++) {
(consumer->second)->redeliverUnacknowledgedMessages(messageIds);
}
}

int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
Expand Down
24 changes: 22 additions & 2 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
<< numPartitions << "]";
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -426,6 +431,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->redeliverUnacknowledgedMessages();
}
unAckedMessageTrackerPtr_->clear();
}

void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->redeliverUnacknowledgedMessages(messageIds);
}
}

const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; }
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
Expand Down
106 changes: 66 additions & 40 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
timer_->async_wait([&](const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
Expand All @@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::lock_guard<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
if (!oldSet_.empty()) {

std::set<MessageId> headPartition = timePartitions.front();
timePartitions.pop_front();

std::set<MessageId> msgIdsToRedeliver;
if (!headPartition.empty()) {
LOG_INFO(consumerReference_.getName().c_str()
<< ": " << oldSet_.size() << " Messages were not acked within " << timeoutMs_ << " time");
oldSet_.clear();
currentSet_.clear();
consumerReference_.redeliverUnacknowledgedMessages();
<< ": " << headPartition.size() << " Messages were not acked within "
<< timePartitions.size() * tickDurationInMs_ << " time");
for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
msgIdsToRedeliver.insert(*it);
messageIdPartitionMap.erase(*it);
}
}
headPartition.clear();
timePartitions.push_back(headPartition);

if (msgIdsToRedeliver.size() > 0) {
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
oldSet_.swap(currentSet_);
}

UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client,
ConsumerImplBase& consumer)
: consumerReference_(consumer) {
UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer);
}

UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs,
const ClientImplPtr client,
ConsumerImplBase& consumer)
: consumerReference_(consumer) {
timeoutMs_ = timeoutMs;
tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : timeoutMs;
client_ = client;

int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_);
for (int i = 0; i < blankPartitions + 1; i++) {
std::set<MessageId> msgIds;
timePartitions.push_back(msgIds);
}

timeoutHandler();
}

bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
oldSet_.erase(m);
return currentSet_.insert(m).second;
if (messageIdPartitionMap.count(m) == 0) {
bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
return insert && timePartitions.back().insert(m).second;
}
return false;
}

bool UnAckedMessageTrackerEnabled::isEmpty() {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.empty() && currentSet_.empty();
return messageIdPartitionMap.empty();
}

bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.erase(m) || currentSet_.erase(m);
bool removed = false;
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
}
return removed;
}

long UnAckedMessageTrackerEnabled::size() {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.size() + currentSet_.size();
return messageIdPartitionMap.size();
}

void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
if (*it < msgId && it->partition() == msgId.partition()) {
oldSet_.erase(it++);
} else {
it++;
}
}
for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
if (*it < msgId && it->partition() == msgId.partition()) {
currentSet_.erase(it++);
} else {
it++;
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
}
}
}

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
const std::string& topicPartitionName = it->getTopicName();
if (topicPartitionName.find(topic) != std::string::npos) {
oldSet_.erase(it++);
} else {
it++;
}
}
for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
const std::string& topicPartitionName = it->getTopicName();
if (topicPartitionName.find(topic) != std::string::npos) {
currentSet_.erase(it++);
} else {
it++;
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
}
}
}

void UnAckedMessageTrackerEnabled::clear() {
currentSet_.clear();
oldSet_.clear();
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
}
}

UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
bool add(const MessageId& m);
bool remove(const MessageId& m);
void removeMessagesTill(const MessageId& msgId);
Expand All @@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
void timeoutHandlerHelper();
bool isEmpty();
long size();
std::set<MessageId> currentSet_;
std::set<MessageId> oldSet_;
std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
long timeoutMs_;
long tickDurationInMs_;
};
} // namespace pulsar

Expand Down

0 comments on commit 333888a

Please sign in to comment.