Skip to content

Commit

Permalink
[Issue 5676][C++ client] Expose redelivery count (apache#5677)
Browse files Browse the repository at this point in the history
Fixes apache#5676

### Motivation


Expose the redelivery count in the C++ client.

### Modifications

Exposed the redelivery count from the broker in `Message` and `MessageImpl`. Set the counter when receiving messages. Added test.
  • Loading branch information
frejonb authored and sijie committed Nov 22, 2019
1 parent 413aa74 commit a00791d
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 4 deletions.
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getTopicName() const;

/**
* Get the redelivery count for this message
*/
const int getRedeliveryCount() const;

bool operator==(const Message& msg) const;

private:
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ PULSAR_PUBLIC uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *mess

PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *message);

PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());

LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
Expand All @@ -291,7 +292,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
unsigned int numOfMessageReceived = 1;
if (metadata.has_num_messages_in_batch()) {
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, msg.redelivery_count());
} else {
Lock lock(pendingReceiveMutex_);
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
Expand Down Expand Up @@ -358,7 +359,7 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,

// Zero Queue size is not supported with Batch Messages
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
Message& batchedMessage) {
Message& batchedMessage, int redeliveryCount) {
unsigned int batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
batchAcknowledgementTracker_.receivedMessage(batchedMessage);
LOG_DEBUG("Received Batch messages of size - " << batchSize
Expand All @@ -369,6 +370,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
msg.impl_->setRedeliveryCount(redeliveryCount);

if (startMessageId_.is_present()) {
const MessageId& msgId = msg.getMessageId();
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class ConsumerImpl : public ConsumerImplBase,
proto::CommandAck::ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
int redeliveryCount);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);

bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ const std::string& Message::getTopicName() const {
return impl_->getTopicName();
}

const int Message::getRedeliveryCount() const {
if (!impl_) {
return 0;
}
return impl_->getRedeliveryCount();
}

uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }

uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/MessageImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

namespace pulsar {

MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_() {}
MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {}

const Message::StringMap& MessageImpl::properties() {
if (properties_.size() == 0) {
Expand Down Expand Up @@ -92,4 +92,8 @@ void MessageImpl::setTopicName(const std::string& topicName) {

const std::string& MessageImpl::getTopicName() { return *topicName_; }

int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }

void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }

} // namespace pulsar
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class MessageImpl {
MessageId messageId;
ClientConnection* cnx_;
const std::string* topicName_;
int redeliveryCount_;

const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
Expand All @@ -62,6 +63,9 @@ class MessageImpl {
*/
void setTopicName(const std::string& topicName);

int getRedeliveryCount();
void setRedeliveryCount(int count);

friend class PulsarWrapper;
friend class MessageBuilder;

Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message) {
const char *pulsar_message_get_topic_name(pulsar_message_t *message) {
return message->message.getTopicName().c_str();
}

int pulsar_message_get_redelivery_count(pulsar_message_t *message) {
return message->message.getRedeliveryCount();
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/test-conf/standalone-ssl.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

subscriptionRedeliveryTrackerEnabled=true

### --- Authentication --- ###

# Enable TLS
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/test-conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

subscriptionRedeliveryTrackerEnabled=true

### --- Authentication --- ###

# Enable authentication
Expand Down
47 changes: 47 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,53 @@ TEST(BasicEndToEndTest, testProduceConsume) {
ASSERT_EQ(ResultOk, client.close());
}

TEST(BasicEndToEndTest, testRedeliveryCount) {
ClientConfiguration config;
Client client(lookupUrl, config);
std::string topicName = "persistent://public/default/test-redelivery-count";
std::string subName = "my-sub-name";

Producer producer;
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
Promise<Result, Consumer> consumerPromise;
ConsumerConfiguration consumerConf;
consumerConf.setNegativeAckRedeliveryDelayMs(500);
consumerConf.setConsumerType(ConsumerShared);
client.subscribeAsync(topicName, subName, consumerConf, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);

std::string content = "msg-content";
Message msg = MessageBuilder().setContent(content).build();
producer.send(msg);

int redeliveryCount = 0;
Message msgReceived;
for (int i = 0; i < 4; i++) {
consumer.receive(msgReceived);
LOG_INFO("Received message " << msgReceived.getDataAsString());
consumer.negativeAcknowledge(msgReceived);
redeliveryCount = msgReceived.getRedeliveryCount();
}

ASSERT_EQ(3, redeliveryCount);
consumer.acknowledge(msgReceived);
consumer.close();
producer.close();
}

TEST(BasicEndToEndTest, testLookupThrottling) {
std::string topicName = "testLookupThrottling";
ClientConfiguration config;
Expand Down

0 comments on commit a00791d

Please sign in to comment.