diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index d9828330b14af..7dee15f7895b5 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -171,6 +171,70 @@ class Consumer {
void acknowledgeCumulativeAsync(const Message& message, ResultCallback callback);
void acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback);
+ /**
+ * Acknowledge the failure to process a single message.
+ *
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ *
+ * This call is not blocking.
+ *
+ *
+ * Example of usage:
+ *
+ * while (true) {
+ * Message msg;
+ * consumer.receive(msg);
+ *
+ * try {
+ * // Process message...
+ *
+ * consumer.acknowledge(msg);
+ * } catch (Throwable t) {
+ * log.warn("Failed to process message");
+ * consumer.negativeAcknowledge(msg);
+ * }
+ * }
+ *
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ */
+ void negativeAcknowledge(const Message& message);
+
+ /**
+ * Acknowledge the failure to process a single message.
+ *
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ *
+ * This call is not blocking.
+ *
+ *
+ * Example of usage:
+ *
+ * while (true) {
+ * Message msg;
+ * consumer.receive(msg);
+ *
+ * try {
+ * // Process message...
+ *
+ * consumer.acknowledge(msg);
+ * } catch (Throwable t) {
+ * log.warn("Failed to process message");
+ * consumer.negativeAcknowledge(msg);
+ * }
+ * }
+ *
+ *
+ * @param messageId
+ * The {@code MessageId} to be acknowledged
+ */
+ void negativeAcknowledge(const MessageId& messageId);
+
Result close();
void closeAsync(ResultCallback callback);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index e30ab80a038a6..362e1781b5a13 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -149,6 +149,27 @@ class ConsumerConfiguration {
*/
long getUnAckedMessagesTimeoutMs() const;
+ /**
+ * Set the delay to wait before re-delivering messages that have failed to be process.
+ *
+ * When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
+ * will be redelivered after a fixed timeout. The default is 1 min.
+ *
+ * @param redeliveryDelay
+ * redelivery delay for failed messages
+ * @param timeUnit
+ * unit in which the timeout is provided.
+ * @return the consumer builder instance
+ */
+ void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis);
+
+ /**
+ * Get the configured delay to wait before re-delivering messages that have failed to be process.
+ *
+ * @return redelivery delay for failed messages
+ */
+ long getNegativeAckRedeliveryDelayMs() const;
+
/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index e25444b94cc4a..a8a9276f1a560 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -22,7 +22,6 @@
#include
#include
#include
-//#include
#pragma GCC visibility push(default)
@@ -87,6 +86,7 @@ class MessageId {
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
friend class PulsarFriend;
+ friend class NegativeAcksTracker;
friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index f350ee00a2b19..f84a822f4e421 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -164,6 +164,34 @@ void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer
pulsar_message_id_t *messageId,
pulsar_result_callback callback, void *ctx);
+/**
+ * Acknowledge the failure to process a single message.
+ *
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ *
+ * This call is not blocking.
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ */
+void pulsar_consumer_negative_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message);
+
+/**
+ * Acknowledge the failure to process a single message through its message id
+ *
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ *
+ * This call is not blocking.
+ *
+ * @param message
+ * The message id to be acknowledged
+ */
+void pulsar_consumer_negative_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId);
+
pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx);
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index b90963ae82ee0..810d062658fcd 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -158,6 +158,30 @@ void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configurati
*/
long pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration);
+/**
+ * Set the delay to wait before re-delivering messages that have failed to be process.
+ *
+ * When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
+ * will be redelivered after a fixed timeout. The default is 1 min.
+ *
+ * @param redeliveryDelay
+ * redelivery delay for failed messages
+ * @param timeUnit
+ * unit in which the timeout is provided.
+ * @return the consumer builder instance
+ */
+void pulsar_configure_set_negative_ack_redelivery_delay_ms(
+ pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis);
+
+/**
+ * Get the configured delay to wait before re-delivering messages that have failed to be process.
+ *
+ * @param consumer_configuration the consumer conf object
+ * @return redelivery delay for failed messages
+ */
+long pulsar_configure_get_negative_ack_redelivery_delay_ms(
+ pulsar_consumer_configuration_t *consumer_configuration);
+
int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration);
int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration);
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 06b96f9032b7d..6c5a63bd0bb31 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -366,11 +366,17 @@ SharedBuffer Commands::newPong() {
return writeMessageWithSize(cmd);
}
-SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId) {
+SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId,
+ const std::set& messageIds) {
BaseCommand cmd;
cmd.set_type(BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES);
CommandRedeliverUnacknowledgedMessages* command = cmd.mutable_redeliverunacknowledgedmessages();
command->set_consumer_id(consumerId);
+ for (const auto& msgId : messageIds) {
+ MessageIdData* msgIdData = command->add_message_ids();
+ msgIdData->set_ledgerid(msgId.ledgerId());
+ msgIdData->set_entryid(msgId.entryId());
+ }
return writeMessageWithSize(cmd);
}
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 967270cbd9d20..4952089ce6db1 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -27,6 +27,8 @@
#include "SharedBuffer.h"
#include "Utils.h"
+#include
+
using namespace pulsar;
namespace pulsar {
@@ -102,7 +104,8 @@ class Commands {
static SharedBuffer newPing();
static SharedBuffer newPong();
- static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId);
+ static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId,
+ const std::set& messageIds);
static std::string messageType(proto::BaseCommand::Type type);
diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc
index cbe44fef855bd..a968a91a3f566 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -142,6 +142,15 @@ void Consumer::acknowledgeCumulativeAsync(const MessageId& messageId, ResultCall
impl_->acknowledgeCumulativeAsync(messageId, callback);
}
+void Consumer::negativeAcknowledge(const Message& message) { negativeAcknowledge(message.getMessageId()); }
+
+void Consumer::negativeAcknowledge(const MessageId& messageId) {
+ if (impl_) {
+ impl_->negativeAcknowledge(messageId);
+ ;
+ }
+}
+
Result Consumer::close() {
Promise promise;
closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 4530ebd7773df..1d004207709b5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -92,6 +92,14 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}
+void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
+ impl_->negativeAckRedeliveryDelay = std::chrono::milliseconds(redeliveryDelayMillis);
+}
+
+long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const {
+ return impl_->negativeAckRedeliveryDelay.count();
+}
+
bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); }
const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 70f725bc65481..30285ff35b5d5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -21,10 +21,14 @@
#include
+#include
+
namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
+
+ std::chrono::milliseconds negativeAckRedeliveryDelay;
ConsumerType consumerType;
MessageListener messageListener;
bool hasMessageListener;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 7571d6679ba94..980d9f31bed9a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -57,6 +57,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
brokerConsumerStats_(),
consumerStatsBasePtr_(),
+ negativeAcksTracker_(client, *this, conf),
msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional::of(MessageId())) {
@@ -793,6 +794,11 @@ void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_A
}
}
+void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
+ unAckedMessageTrackerPtr_->remove(messageId);
+ negativeAcksTracker_.add(messageId);
+}
+
void ConsumerImpl::disconnectConsumer() {
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
Lock lock(mutex_);
@@ -918,14 +924,16 @@ Result ConsumerImpl::resumeMessageListener() {
}
void ConsumerImpl::redeliverUnacknowledgedMessages() {
+ static std::set emptySet;
+ redeliverMessages(emptySet);
+}
+
+void ConsumerImpl::redeliverMessages(const std::set& messageIds) {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
if (cnx->getServerProtocolVersion() >= proto::v2) {
- cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_));
+ cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_, messageIds));
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId());
- } else {
- LOG_DEBUG("Reconnecting the client to redeliver the messages for Consumer - " << getName());
- cnx->close();
}
} else {
LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId());
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b5fe761876040..b917f79099dcc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -26,6 +26,7 @@
#include "HandlerBase.h"
#include "ClientConnection.h"
#include "lib/UnAckedMessageTrackerEnabled.h"
+#include "NegativeAcksTracker.h"
#include "Commands.h"
#include "ExecutorService.h"
#include "ConsumerImplBase.h"
@@ -92,7 +93,12 @@ class ConsumerImpl : public ConsumerImplBase,
virtual void receiveAsync(ReceiveCallback& callback);
Result fetchSingleMessageFromBroker(Message& msg);
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
+
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
+
+ virtual void redeliverMessages(const std::set& messageIds);
+ virtual void negativeAcknowledge(const MessageId& msgId);
+
virtual void closeAsync(ResultCallback callback);
virtual void start();
virtual void shutdown();
@@ -169,6 +175,7 @@ class ConsumerImpl : public ConsumerImplBase,
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
+ NegativeAcksTracker negativeAcksTracker_;
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 766b0a9c79aca..c7162707f9c2e 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -21,6 +21,8 @@
#include
#include
+#include
+
namespace pulsar {
class ConsumerImplBase;
@@ -50,6 +52,7 @@ class ConsumerImplBase {
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
+ virtual void negativeAcknowledge(const MessageId& msgId) = 0;
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 963c20f835b0a..2c01da5668cba 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -563,6 +563,15 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
callback(ResultOperationNotSupported);
}
+void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
+ auto iterator = consumers_.find(msgId.getTopicName());
+
+ if (consumers_.end() != iterator) {
+ unAckedMessageTrackerPtr_->remove(msgId);
+ iterator->second->negativeAcknowledge(msgId);
+ }
+}
+
MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
Future MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 74350b9eede0d..757c1507d4dbc 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -80,6 +80,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
// not supported
virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
+ virtual void negativeAcknowledge(const MessageId& msgId);
+
protected:
const ClientImplPtr client_;
const std::string subscriptionName_;
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.cc b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
new file mode 100644
index 0000000000000..66afd1bc94765
--- /dev/null
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "NegativeAcksTracker.h"
+
+#include "ConsumerImpl.h"
+
+#include
+#include
+
+namespace pulsar {
+const std::chrono::milliseconds NegativeAcksTracker::MIN_NACK_DELAY_NANOS = std::chrono::milliseconds(100);
+
+NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
+ const ConsumerConfiguration &conf)
+ : consumer_(consumer),
+ nackDelay_(
+ std::max(std::chrono::milliseconds(conf.getNegativeAckRedeliveryDelayMs()), MIN_NACK_DELAY_NANOS)),
+ timerInterval_((long)(nackDelay_.count() / 3)),
+ executor_(client->getIOExecutorProvider()->get()) {}
+
+void NegativeAcksTracker::scheduleTimer() {
+ timer_ = executor_->createDeadlineTimer();
+ timer_->expires_from_now(timerInterval_);
+ timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
+}
+
+void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
+ std::lock_guard lock(mutex_);
+ timer_ = nullptr;
+
+ if (ec) {
+ // Ignore cancelled events
+ return;
+ }
+
+ if (nackedMessages_.empty()) {
+ return;
+ }
+
+ // Group all the nacked messages into one single re-delivery request
+ std::set messagesToRedeliver;
+
+ auto now = Clock::now();
+
+ for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
+ if (it->second < now) {
+ messagesToRedeliver.insert(it->first);
+ it = nackedMessages_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ consumer_.redeliverMessages(messagesToRedeliver);
+ scheduleTimer();
+}
+
+void NegativeAcksTracker::add(const MessageId &m) {
+ std::lock_guard lock(mutex_);
+
+ auto now = Clock::now();
+
+ // Erase batch id to group all nacks from same batch
+ MessageId batchMessageId = MessageId(m.partition(), m.ledgerId(), m.entryId(), -1);
+ nackedMessages_[batchMessageId] = now + nackDelay_;
+
+ if (!timer_) {
+ scheduleTimer();
+ }
+}
+
+void NegativeAcksTracker::close() {
+ std::lock_guard lock(mutex_);
+
+ if (timer_) {
+ boost::system::error_code ec;
+ timer_->cancel(ec);
+ }
+}
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.h b/pulsar-client-cpp/lib/NegativeAcksTracker.h
new file mode 100644
index 0000000000000..4a1bbcdb0eb75
--- /dev/null
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+
+#include "ExecutorService.h"
+#include "ClientImpl.h"
+
+#include
+#include