Skip to content

Commit

Permalink
Use private impl for MessageId in c++ client (apache#1322)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 25, 2018
1 parent 489099b commit 5efafc5
Show file tree
Hide file tree
Showing 27 changed files with 235 additions and 312 deletions.
67 changes: 0 additions & 67 deletions pulsar-client-cpp/include/pulsar/BatchMessageId.h

This file was deleted.

10 changes: 5 additions & 5 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
#include <string>

#include <boost/shared_ptr.hpp>
#include "BatchMessageId.h"

#include "MessageId.h"

#pragma GCC visibility push(default)

Expand All @@ -39,8 +40,6 @@ class MessageBuilder;
class MessageImpl;
class PulsarWrapper;

// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock
// tests
class Message {
public:
typedef std::map<std::string, std::string> StringMap;
Expand Down Expand Up @@ -128,9 +127,10 @@ class Message {
MessageImplPtr impl_;

Message(MessageImplPtr& impl);
Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload);
Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload,
int32_t partition);
/// Used for Batch Messages
Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata);
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
Expand Down
33 changes: 20 additions & 13 deletions pulsar-client-cpp/include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@

namespace pulsar {

class ConsumerImpl;
class UnAckedMessageTrackerEnabled;
class PulsarWrapper;
class MessageIdImpl;

class MessageId {
public:
MessageId& operator=(const MessageId&);
MessageId();
virtual ~MessageId() {}

/**
* MessageId representing the "earliest" or "oldest available" message stored in the topic
Expand All @@ -50,34 +47,44 @@ class MessageId {
/**
* Serialize the message id into a binary string for storing
*/
virtual void serialize(std::string& result) const;
void serialize(std::string& result) const;

/**
* Deserialize a message id from a binary string
*/
static boost::shared_ptr<MessageId> deserialize(const std::string& serializedMessageId);
static MessageId deserialize(const std::string& serializedMessageId);

// These functions compare the message order as stored in bookkeeper
bool operator<(const MessageId& other) const;
bool operator<=(const MessageId& other) const;
bool operator>(const MessageId& other) const;
bool operator>=(const MessageId& other) const;
bool operator==(const MessageId& other) const;
bool operator!=(const MessageId& other) const;

protected:
virtual int64_t getBatchIndex() const;
private:
friend class ConsumerImpl;
friend class ReaderImpl;
friend class Message;
friend class MessageImpl;
friend class Commands;
friend class BatchMessageId;
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
MessageId(int64_t, int64_t);
friend class PulsarFriend;

explicit MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex);
friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
int64_t ledgerId_;
int64_t entryId_ : 48;
short partition_ : 16;

int64_t ledgerId() const;
int64_t entryId() const;
int32_t batchIndex() const;
int32_t partition() const;

typedef boost::shared_ptr<MessageIdImpl> MessageIdImplPtr;
MessageIdImplPtr impl_;
};
} // namespace pulsar

Expand Down
40 changes: 23 additions & 17 deletions pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ DECLARE_LOG_OBJECT()
BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic,
const std::string subscription,
const long consumerId)
: greatestCumulativeAckSent_(BatchMessageId()) {
: greatestCumulativeAckSent_() {
std::stringstream consumerStrStream;
consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", "
<< consumerId << "] ";
Expand All @@ -43,7 +43,7 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
return;
}
Lock lock(mutex_);
BatchMessageId msgID = message.impl_->messageId;
MessageId msgID = message.impl_->messageId;

// ignore message if it is less than the last cumulative ack sent or messageID is already being tracked
TrackerMap::iterator pos = trackerMap_.find(msgID);
Expand All @@ -60,10 +60,10 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
}

void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messageId,
void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
proto::CommandAck_AckType ackType) {
// Not a batch message and a individual ack
if (messageId.batchIndex_ == -1 && ackType == proto::CommandAck_AckType_Individual) {
if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
return;
}

Expand Down Expand Up @@ -104,18 +104,22 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa
}
}

bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
const proto::CommandAck_AckType ackType) {
Lock lock(mutex_);
TrackerMap::iterator pos = trackerMap_.find(msgID);
if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
// Remove batch index
MessageId batchMessageId = MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(),
-1 /* Batch index */);

TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) {
LOG_DEBUG(
"Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = "
<< msgID << "]");
<< batchMessageId << "]");
return true;
}

int batchIndex = msgID.batchIndex_;
int batchIndex = msgID.batchIndex();
assert(batchIndex < pos->second.size());
pos->second.set(batchIndex, false);

Expand All @@ -128,7 +132,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
if (pos->second.any()) {
return false;
}
sendList_.push_back(msgID);
sendList_.push_back(batchMessageId);
trackerMap_.erase(pos);
LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID
<< "]");
Expand All @@ -138,22 +142,24 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
// returns
// - a batch message id < messageId
// - same messageId if it is the last message in the batch
const BatchMessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(
const BatchMessageId& messageId) {
const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) {
Lock lock(mutex_);
BatchMessageId messageReadyForCumulativeAck = BatchMessageId();
TrackerMap::iterator pos = trackerMap_.find(messageId);

// Remove batch index
MessageId batchMessageId = MessageId(messageId.partition(), messageId.ledgerId(),
messageId.entryId(), -1 /* Batch index */);
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);

// element not found
if (pos == trackerMap_.end()) {
return BatchMessageId();
return MessageId();
}

if (pos->second.size() - 1 != messageId.batchIndex_) {
if (pos->second.size() - 1 != messageId.batchIndex()) {
// Can't cumulatively ack this batch message
if (pos == trackerMap_.begin()) {
// This was the first message hence we can't decrement the iterator
return BatchMessageId();
return MessageId();
}
pos--;
}
Expand Down
27 changes: 13 additions & 14 deletions pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
#define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_

#include "pulsar/BatchMessageId.h"
#include "MessageImpl.h"
#include <map>
#include <boost/thread/mutex.hpp>
Expand All @@ -36,8 +35,8 @@ class ConsumerImpl;
class BatchAcknowledgementTracker {
private:
typedef boost::unique_lock<boost::mutex> Lock;
typedef std::pair<BatchMessageId, boost::dynamic_bitset<> > TrackerPair;
typedef std::map<BatchMessageId, boost::dynamic_bitset<> > TrackerMap;
typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
boost::mutex mutex_;

TrackerMap trackerMap_;
Expand All @@ -48,20 +47,20 @@ class BatchAcknowledgementTracker {
// batch index
// is acked again, we just check the sendList to verify that the batch is acked w/o iterating over the
// dynamic_bitset.
std::vector<BatchMessageId> sendList_;
std::vector<MessageId> sendList_;

// we don't need to track MessageId < greatestCumulativeAckReceived
BatchMessageId greatestCumulativeAckSent_;
MessageId greatestCumulativeAckSent_;
std::string name_;

public:
BatchAcknowledgementTracker(const std::string topic, const std::string subscription,
const long consumerId);

bool isBatchReady(const BatchMessageId& msgID, const proto::CommandAck_AckType ackType);
const BatchMessageId getGreatestCumulativeAckReady(const BatchMessageId& messageId);
bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType);
const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);

void deleteAckedMessage(const BatchMessageId& messageId, proto::CommandAck_AckType ackType);
void deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType);
void receivedMessage(const Message& message);

void clear();
Expand All @@ -72,23 +71,23 @@ class BatchAcknowledgementTracker {
// Used for Cumulative acks only
struct SendRemoveCriteria {
private:
const BatchMessageId& messageId_;
const MessageId& messageId_;

public:
SendRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}

bool operator()(const BatchMessageId& element) const { return (element <= messageId_); }
bool operator()(const MessageId& element) const { return (element <= messageId_); }
};

// Used for Cumulative acks only
struct TrackerMapRemoveCriteria {
private:
const BatchMessageId& messageId_;
const MessageId& messageId_;

public:
TrackerMapRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
TrackerMapRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}

bool operator()(std::pair<const pulsar::BatchMessageId, boost::dynamic_bitset<> >& element) const {
bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >& element) const {
return (element.first <= messageId_);
}
};
Expand Down
Loading

0 comments on commit 5efafc5

Please sign in to comment.