Skip to content

Commit

Permalink
Negative acks in C++ client (apache#3750)
Browse files Browse the repository at this point in the history
* Negative acks in C++ client

* Fixed formatting

* Fixed api docs
  • Loading branch information
merlimat authored Mar 13, 2019
1 parent 0158439 commit 2d44850
Show file tree
Hide file tree
Showing 22 changed files with 475 additions and 26 deletions.
64 changes: 64 additions & 0 deletions pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,70 @@ class Consumer {
void acknowledgeCumulativeAsync(const Message& message, ResultCallback callback);
void acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback);

/**
* Acknowledge the failure to process a single message.
* <p>
* When a message is "negatively acked" it will be marked for redelivery after
* some fixed delay. The delay is configurable when constructing the consumer
* with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
* <p>
* This call is not blocking.
*
* <p>
* Example of usage:
* <pre><code>
* while (true) {
* Message msg;
* consumer.receive(msg);
*
* try {
* // Process message...
*
* consumer.acknowledge(msg);
* } catch (Throwable t) {
* log.warn("Failed to process message");
* consumer.negativeAcknowledge(msg);
* }
* }
* </code></pre>
*
* @param message
* The {@code Message} to be acknowledged
*/
void negativeAcknowledge(const Message& message);

/**
* Acknowledge the failure to process a single message.
* <p>
* When a message is "negatively acked" it will be marked for redelivery after
* some fixed delay. The delay is configurable when constructing the consumer
* with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
* <p>
* This call is not blocking.
*
* <p>
* Example of usage:
* <pre><code>
* while (true) {
* Message msg;
* consumer.receive(msg);
*
* try {
* // Process message...
*
* consumer.acknowledge(msg);
* } catch (Throwable t) {
* log.warn("Failed to process message");
* consumer.negativeAcknowledge(msg);
* }
* }
* </code></pre>
*
* @param messageId
* The {@code MessageId} to be acknowledged
*/
void negativeAcknowledge(const MessageId& messageId);

Result close();

void closeAsync(ResultCallback callback);
Expand Down
21 changes: 21 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,27 @@ class ConsumerConfiguration {
*/
long getUnAckedMessagesTimeoutMs() const;

/**
* Set the delay to wait before re-delivering messages that have failed to be process.
* <p>
* When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
* will be redelivered after a fixed timeout. The default is 1 min.
*
* @param redeliveryDelay
* redelivery delay for failed messages
* @param timeUnit
* unit in which the timeout is provided.
* @return the consumer builder instance
*/
void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis);

/**
* Get the configured delay to wait before re-delivering messages that have failed to be process.
*
* @return redelivery delay for failed messages
*/
long getNegativeAckRedeliveryDelayMs() const;

/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <iosfwd>
#include <stdint.h>
#include <memory>
//#include <lib/MessageIdImpl.h>

#pragma GCC visibility push(default)

Expand Down Expand Up @@ -87,6 +86,7 @@ class MessageId {
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
friend class PulsarFriend;
friend class NegativeAcksTracker;

friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);

Expand Down
28 changes: 28 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,34 @@ void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer
pulsar_message_id_t *messageId,
pulsar_result_callback callback, void *ctx);

/**
* Acknowledge the failure to process a single message.
* <p>
* When a message is "negatively acked" it will be marked for redelivery after
* some fixed delay. The delay is configurable when constructing the consumer
* with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
* <p>
* This call is not blocking.
*
* @param message
* The {@code Message} to be acknowledged
*/
void pulsar_consumer_negative_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message);

/**
* Acknowledge the failure to process a single message through its message id
* <p>
* When a message is "negatively acked" it will be marked for redelivery after
* some fixed delay. The delay is configurable when constructing the consumer
* with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
* <p>
* This call is not blocking.
*
* @param message
* The message id to be acknowledged
*/
void pulsar_consumer_negative_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId);

pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);

void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx);
Expand Down
24 changes: 24 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,30 @@ void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configurati
*/
long pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration);

/**
* Set the delay to wait before re-delivering messages that have failed to be process.
* <p>
* When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
* will be redelivered after a fixed timeout. The default is 1 min.
*
* @param redeliveryDelay
* redelivery delay for failed messages
* @param timeUnit
* unit in which the timeout is provided.
* @return the consumer builder instance
*/
void pulsar_configure_set_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis);

/**
* Get the configured delay to wait before re-delivering messages that have failed to be process.
*
* @param consumer_configuration the consumer conf object
* @return redelivery delay for failed messages
*/
long pulsar_configure_get_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration);

int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration);

int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration);
Expand Down
8 changes: 7 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,17 @@ SharedBuffer Commands::newPong() {
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId) {
SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId,
const std::set<MessageId>& messageIds) {
BaseCommand cmd;
cmd.set_type(BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES);
CommandRedeliverUnacknowledgedMessages* command = cmd.mutable_redeliverunacknowledgedmessages();
command->set_consumer_id(consumerId);
for (const auto& msgId : messageIds) {
MessageIdData* msgIdData = command->add_message_ids();
msgIdData->set_ledgerid(msgId.ledgerId());
msgIdData->set_entryid(msgId.entryId());
}
return writeMessageWithSize(cmd);
}

Expand Down
5 changes: 4 additions & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "SharedBuffer.h"
#include "Utils.h"

#include <set>

using namespace pulsar;

namespace pulsar {
Expand Down Expand Up @@ -102,7 +104,8 @@ class Commands {
static SharedBuffer newPing();
static SharedBuffer newPong();

static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId);
static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId,
const std::set<MessageId>& messageIds);

static std::string messageType(proto::BaseCommand::Type type);

Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ void Consumer::acknowledgeCumulativeAsync(const MessageId& messageId, ResultCall
impl_->acknowledgeCumulativeAsync(messageId, callback);
}

void Consumer::negativeAcknowledge(const Message& message) { negativeAcknowledge(message.getMessageId()); }

void Consumer::negativeAcknowledge(const MessageId& messageId) {
if (impl_) {
impl_->negativeAcknowledge(messageId);
;
}
}

Result Consumer::close() {
Promise<bool, Result> promise;
closeAsync(WaitForCallback(promise));
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}

void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelay = std::chrono::milliseconds(redeliveryDelayMillis);
}

long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const {
return impl_->negativeAckRedeliveryDelay.count();
}

bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); }

const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@

#include <pulsar/ConsumerConfiguration.h>

#include <chrono>

namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;

std::chrono::milliseconds negativeAckRedeliveryDelay;
ConsumerType consumerType;
MessageListener messageListener;
bool hasMessageListener;
Expand Down
16 changes: 12 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
brokerConsumerStats_(),
consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
Expand Down Expand Up @@ -793,6 +794,11 @@ void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_A
}
}

void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
unAckedMessageTrackerPtr_->remove(messageId);
negativeAcksTracker_.add(messageId);
}

void ConsumerImpl::disconnectConsumer() {
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
Lock lock(mutex_);
Expand Down Expand Up @@ -918,14 +924,16 @@ Result ConsumerImpl::resumeMessageListener() {
}

void ConsumerImpl::redeliverUnacknowledgedMessages() {
static std::set<MessageId> emptySet;
redeliverMessages(emptySet);
}

void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
if (cnx->getServerProtocolVersion() >= proto::v2) {
cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_));
cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_, messageIds));
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId());
} else {
LOG_DEBUG("Reconnecting the client to redeliver the messages for Consumer - " << getName());
cnx->close();
}
} else {
LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId());
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "HandlerBase.h"
#include "ClientConnection.h"
#include "lib/UnAckedMessageTrackerEnabled.h"
#include "NegativeAcksTracker.h"
#include "Commands.h"
#include "ExecutorService.h"
#include "ConsumerImplBase.h"
Expand Down Expand Up @@ -92,7 +93,12 @@ class ConsumerImpl : public ConsumerImplBase,
virtual void receiveAsync(ReceiveCallback& callback);
Result fetchSingleMessageFromBroker(Message& msg);
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);

virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);

virtual void redeliverMessages(const std::set<MessageId>& messageIds);
virtual void negativeAcknowledge(const MessageId& msgId);

virtual void closeAsync(ResultCallback callback);
virtual void start();
virtual void shutdown();
Expand Down Expand Up @@ -169,6 +175,7 @@ class ConsumerImpl : public ConsumerImplBase,
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;

MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <pulsar/Message.h>
#include <pulsar/Consumer.h>

#include <set>

namespace pulsar {
class ConsumerImplBase;

Expand Down Expand Up @@ -50,6 +52,7 @@ class ConsumerImplBase {
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,15 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
callback(ResultOperationNotSupported);
}

void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
auto iterator = consumers_.find(msgId.getTopicName());

if (consumers_.end() != iterator) {
unAckedMessageTrackerPtr_->remove(msgId);
iterator->second->negativeAcknowledge(msgId);
}
}

MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}

Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
// not supported
virtual void seekAsync(const MessageId& msgId, ResultCallback callback);

virtual void negativeAcknowledge(const MessageId& msgId);

protected:
const ClientImplPtr client_;
const std::string subscriptionName_;
Expand Down
Loading

0 comments on commit 2d44850

Please sign in to comment.