Skip to content

Commit

Permalink
[C++] Support key based batching (apache#7996)
Browse files Browse the repository at this point in the history
### Motivation

Support key-based batching for the C++ client. This is catch-up work for apache#4435.

In addition, the current implementation of `BatchMessageContainer` is tightly coupled to `ProducerImpl`. The batch message container registers a timer on the producer's executor, and the timeout callback is also a producer method. Even its `add` method can call `sendMessage` to push a batch to the producer's pending queue. These should be the producer's responsibility.

### Modifications

- Add a `MessageAndCallbackBatch` to store a `MessageImpl` of serialized single messages and a callback list.
- Add a `BatchMessageContainerBase` to provide interface methods and methods like update/clear message number/bytes, create `OpSendMsg`.
- Let `ProducerImpl` manage the batch timer and determine whether to create `OpSendMsg` from `BatchMessageContainerBase` and send.
- Make `BatchMessageContainer` inherit from `BatchMessageContainerBase`; it only manages a `MessageAndCallbackBatch`.
- Add a `BatchMessageKeyBasedContainer` that inherits from `BatchMessageContainerBase`; it manages a map from message key to `MessageAndCallbackBatch`.
- Add a producer config to change batching type.
- Add some unit tests for key-based batching and a unit test for key-shared subscription.

### Verifying this change
This change added tests and can be verified as follows: 

  - Current tests affected by default batching type, like `BatchMessageTest.*` and `BasicEndToEndTest.*`
  - Newly added tests: `KeyBasedBatchingTest.*` and `KeySharedConsumerTest.testKeyBasedBatching`



* Change ClientConnection::getMaxMessageSize() to static method

* Refactor the BatchMessageContainer

* Add batching type to producer config

* Fix testBigMessageSizeBatching error

* Add key based batching container

* Make batching type config work

* Add unit tests for key based batching and key shared subscription

* Fix flush test error possibly caused by timeout overflow

* Make BatchMessageKeyBasedContainer work for a single batch
  • Loading branch information
BewareMyPower authored Sep 9, 2020
1 parent 7154185 commit e7c4074
Show file tree
Hide file tree
Showing 26 changed files with 1,182 additions and 381 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class PULSAR_PUBLIC Message {
friend class ConsumerImpl;
friend class ProducerImpl;
friend class Commands;
friend class BatchMessageContainer;
friend class BatchMessageContainerBase;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
friend class MessageBatch;
Expand Down
32 changes: 31 additions & 1 deletion pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ class PULSAR_PUBLIC ProducerConfiguration {
BoostHash,
JavaStringHash
};
/**
* Strategy used to group incoming single messages into batch messages.
*/
enum BatchingType
{
/**
* Default batching.
*
* <p>incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* <p>batched into a single batch message, in arrival order and regardless of key:
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
DefaultBatching,

/**
* Key based batching.
*
* <p>incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* <p>batched into multiple batch messages, one per key:
* [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
*/
KeyBasedBatching
};

ProducerConfiguration();
~ProducerConfiguration();
Expand Down Expand Up @@ -152,13 +176,19 @@ class PULSAR_PUBLIC ProducerConfiguration {
ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs);
const unsigned long& getBatchingMaxPublishDelayMs() const;

/**
* Set the batching type of the producer.
*
* @param batchingType one of the BatchingType enumerators
* @return a reference to a ProducerConfiguration (presumably this object, to allow call
* chaining like the other setters - confirm against the implementation)
* @see BatchingType
*/
ProducerConfiguration& setBatchingType(BatchingType batchingType);
/**
* @return the configured batching type
* @see BatchingType
*/
BatchingType getBatchingType() const;

const CryptoKeyReaderPtr getCryptoKeyReader() const;
ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);

ProducerCryptoFailureAction getCryptoFailureAction() const;
ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action);

std::set<std::string>& getEncryptionKeys();
const std::set<std::string>& getEncryptionKeys() const;
bool isEncryptionEnabled() const;
ProducerConfiguration& addEncryptionKey(std::string key);

Expand Down
201 changes: 42 additions & 159 deletions pulsar-client-cpp/lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,180 +17,63 @@
* under the License.
*/
#include "BatchMessageContainer.h"
#include <memory>
#include <functional>
#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "ProducerImpl.h"
#include "TimeUtils.h"
#include <stdexcept>

namespace pulsar {

static ObjectPool<MessageImpl, 1000> messagePool;
static ObjectPool<BatchMessageContainer::MessageContainerList, 1000> messageContainerListPool;
DECLARE_LOG_OBJECT()

// Builds a container bound to `producer`.
//
// The batching limits (max messages, max bytes) and the compression type are read from
// producer.conf_; the topic and producer name are taken from the producer itself. The initial
// message object and the callback list are obtained from the file-level object pools, and a
// deadline timer is created on the producer's executor.
BatchMessageContainer::BatchMessageContainer(ProducerImpl& producer)
: maxAllowedNumMessagesInBatch_(producer.conf_.getBatchingMaxMessages()),
maxAllowedMessageBatchSizeInBytes_(producer.conf_.getBatchingMaxAllowedSizeInBytes()),
topicName_(producer.topic_),
producerName_(producer.producerName_),
compressionType_(producer.conf_.getCompressionType()),
producer_(producer),
impl_(messagePool.create()),
timer_(producer.executor_->createDeadlineTimer()),
batchSizeInBytes_(0),
messagesContainerListPtr_(messageContainerListPool.create()),
averageBatchSize_(0),
numberOfBatchesSent_(0) {
// Reserve room for 1000 entries in the callback list up front.
messagesContainerListPtr_->reserve(1000);
LOG_INFO(*this << " BatchMessageContainer constructed");
}

// Adds `msg` (with its send callback) to the current batch.
//
// When the capacity check is enabled and hasSpaceInBatch() reports no room, the current batch is
// sent first and the message is then added through a recursive call with the check disabled.
// The boolean result tells the caller whether the queue spot it reserved for this message is
// still needed (see the comments at each return).
bool BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, bool disableCheck) {
// disableCheck skips the capacity check so that the recursive call below cannot recurse again
// when a single message is larger than the batch size limit.
LOG_DEBUG(*this << " Called add function for [message = " << msg << "] [disableCheck = " << disableCheck
<< "]");
if (!(disableCheck || hasSpaceInBatch(msg))) {
LOG_DEBUG(*this << " Batch is full");
bool hasMessages = !messagesContainerListPtr_->empty();
bool pushedToPendingQueue = sendMessage(NULL);
bool result = add(msg, sendCallback, true);
if (hasMessages && !pushedToPendingQueue) {
// The batch could not be pushed to the producer's queue, so the spot reserved earlier will
// not be released; return false to tell the producer to release it.
// Exception: hasSpaceInBatch() may have returned false only because `msg` is too big before
// compression while the batch held no earlier messages. In that case the spots have already
// been released, so we cannot simply return false (hence the hasMessages check).
return false;
}
return result;
}
if (messagesContainerListPtr_->empty()) {
// First message of a new batch: start the publish-delay timer and initialize the batch metadata.
startTimer();
Commands::initBatchMessageMetadata(msg, impl_->metadata);
// TODO - add this to Commands.cc
impl_->metadata.set_producer_name(producerName_);
}
batchSizeInBytes_ += msg.impl_->payload.readableBytes();

LOG_DEBUG(*this << " Before serialization payload size in bytes = " << impl_->payload.readableBytes());
Commands::serializeSingleMessageInBatchWithPayload(msg, impl_->payload,
maxAllowedMessageBatchSizeInBytes_);
LOG_DEBUG(*this << " After serialization payload size in bytes = " << impl_->payload.readableBytes());
namespace pulsar {

messagesContainerListPtr_->emplace_back(msg, sendCallback);
// Forwards `producer` to BatchMessageContainerBase; this class performs no additional
// construction work of its own.
BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer)
: BatchMessageContainerBase(producer) {}

LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
bool hasOnlyOneMessage = (messagesContainerListPtr_->size() == 1);
if (isFull()) {
LOG_DEBUG(*this << " Batch is full.");
// If there is more than one message in the batch, return false even when the batch was pushed
// to the queue successfully, because two spots were reserved and one of them must be released:
// one was reserved when the first message arrived, the other when the current message arrived.
bool pushedToPendingQueue = sendMessage(NULL);
return hasOnlyOneMessage && pushedToPendingQueue;
}
// A batch of messages needs only one spot, so return false once further messages have been
// added; the outer ProducerImpl::sendAsync() will then release the unnecessary reserved spots.
return hasOnlyOneMessage;
// Logs the container state at debug level and the accumulated statistics (number of batches
// sent and average batch size) at info level. No other cleanup is performed here.
BatchMessageContainer::~BatchMessageContainer() {
LOG_DEBUG(*this << " destructed");
LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_
<< "] [averageBatchSize_ = " << averageBatchSize_ << "]");
}

// Arms the batch timer to expire after the producer's configured max publish delay
// (getBatchingMaxPublishDelayMs(), in milliseconds). The asynchronous wait is bound to
// ProducerImpl::batchMessageTimeoutHandler on producer_.
void BatchMessageContainer::startTimer() {
const unsigned long& publishDelayInMs = producer_.conf_.getBatchingMaxPublishDelayMs();
LOG_DEBUG(*this << " Timer started with expiry after " << publishDelayInMs);
timer_->expires_from_now(boost::posix_time::milliseconds(publishDelayInMs));
timer_->async_wait(
std::bind(&pulsar::ProducerImpl::batchMessageTimeoutHandler, &producer_, std::placeholders::_1));
// Appends `msg` and its send callback to the underlying batch and updates the container
// statistics for the added message.
//
// @param msg the single message to add to the batch
// @param callback the send callback stored alongside the message
// @return the value of isFull() evaluated after the message has been added
bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) {
    LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]");

    batch_.add(msg, callback);
    updateStats(msg);

    LOG_DEBUG("After add: " << *this);

    const bool fullAfterAdd = isFull();
    return fullAfterAdd;
}

// Finalizes the current batch (message count, compression, encryption) and hands it to the
// producer via producer_.sendMessage().
//
// @param flushCallback optional callback; invoked with ResultOk immediately when the batch is
// empty, otherwise bound into the batch's send callback
// @return true if the batch was handed to the producer; false if the batch was empty, could not
// be encrypted, or exceeds the producer's max message size
//
// NOTE(review): on the encryption-failure and too-big paths the per-message callbacks are
// completed with nullptr passed as the flush callback, so `flushCallback` does not appear to be
// invoked on those paths - confirm whether callers rely on it.
bool BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
// Call this function only after acquiring the ProducerImpl lock
LOG_DEBUG(*this << "Sending the batch message container");
if (isEmpty()) {
LOG_DEBUG(*this << " Batch is empty - returning.");
if (flushCallback) {
flushCallback(ResultOk);
}
return false;
}
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
compressPayLoad();

SharedBuffer encryptedPayload;
if (!producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload)) {
batchMessageCallBack(ResultCryptoError, MessageId{}, messagesContainerListPtr_, nullptr);
clear();
return false;
}
impl_->payload = encryptedPayload;

if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
// At this point the compressed batch is above the overall MaxMessageSize. There
// can be only one single message in the batch at this point.
batchMessageCallBack(ResultMessageTooBig, MessageId{}, messagesContainerListPtr_, nullptr);
clear();
return false;
}

Message msg;
msg.impl_ = impl_;

// std::bind keeps a copy of the bound arguments, so the callback list pointer and the flush
// callback remain held by the callback after clear() replaces the member below
SendCallback callback = std::bind(&BatchMessageContainer::batchMessageCallBack, std::placeholders::_1,
std::placeholders::_2, messagesContainerListPtr_, flushCallback);

producer_.sendMessage(msg, callback);
clear();
return true;
// Folds the size of the current batch into the running average batch size, increments the
// sent-batch counter, then empties the batch and resets the container statistics.
void BatchMessageContainer::clear() {
    // Running mean: previous average weighted by the batches counted so far, plus this batch.
    const auto accumulatedMessages = batch_.size() + averageBatchSize_ * numberOfBatchesSent_;
    averageBatchSize_ = accumulatedMessages / (numberOfBatchesSent_ + 1);
    ++numberOfBatchesSent_;

    batch_.clear();
    resetStats();

    LOG_DEBUG(*this << " clear() called");
}

// Encodes the batch payload with the codec for compressionType_ and replaces impl_->payload
// with the encoded buffer. For any type other than CompressionNone, the compression type and
// the uncompressed size are recorded in the batch metadata first.
void BatchMessageContainer::compressPayLoad() {
if (compressionType_ != CompressionNone) {
impl_->metadata.set_compression(CompressionCodecProvider::convertType(compressionType_));
// Must be read before the payload is replaced by its encoded form below.
impl_->metadata.set_uncompressed_size(impl_->payload.readableBytes());
}
impl_->payload = CompressionCodecProvider::getCodec(compressionType_).encode(impl_->payload);
// Fills `opSendMsg` from this container's batch by delegating to createOpSendMsgHelper().
//
// @param opSendMsg output object populated by the helper
// @param flushCallback flush callback forwarded to the helper
// @return the Result reported by createOpSendMsgHelper()
Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
                                              const FlushCallback& flushCallback) const {
    const Result helperResult = createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
    return helperResult;
}

// Returns impl_->payload, the buffer of the batch message held by this container.
SharedBuffer BatchMessageContainer::getBatchedPayload() { return impl_->payload; }

// Resets the container for the next batch: cancels the timer, folds the size of the current
// batch into the running average, increments the sent-batch counter, and replaces the callback
// list and message object with fresh ones from the object pools.
void BatchMessageContainer::clear() {
LOG_DEBUG(*this << " BatchMessageContainer::clear() called");
timer_->cancel();
// Running mean: previous average weighted by the batches counted so far, plus this batch.
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_)) /
(numberOfBatchesSent_ + 1);
numberOfBatchesSent_++;
messagesContainerListPtr_ = messageContainerListPool.create();
// Try to optimize this
// NOTE(review): reserves 10000 entries here, whereas the constructor reserves 1000 - confirm
// which capacity is intended.
messagesContainerListPtr_->reserve(10000);
impl_ = messagePool.create();
batchSizeInBytes_ = 0;
// Not supported by this container: always throws and never returns a value. Neither parameter
// is used.
//
// @throws std::runtime_error unconditionally
std::vector<Result> BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback) const {
throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer");
}

// Completion handler for a whole batch: invokes the callback of every message stored in
// `messagesContainerListPtr` with result `r`, then invokes `flushCallback` if one was given.
//
// @param r result passed to each per-message callback
// @param messageId id of the batch; its partition, ledger id and entry id are combined with the
// message's position in the list to build each per-message id
// @param messagesContainerListPtr messages and callbacks of the batch; when null, only the
// flush callback is invoked
// @param flushCallback optional callback invoked once at the end
//
// NOTE(review): flushCallback is always called with ResultOk, even when `r` is an error -
// confirm that this is intended.
void BatchMessageContainer::batchMessageCallBack(Result r, const MessageId& messageId,
MessageContainerListPtr messagesContainerListPtr,
FlushCallback flushCallback) {
if (!messagesContainerListPtr) {
if (flushCallback) {
flushCallback(ResultOk);
}
return;
}
LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result = "
<< r << "] [numOfMessages = " << messagesContainerListPtr->size() << "]");
size_t batch_size = messagesContainerListPtr->size();
for (size_t i = 0; i < batch_size; i++) {
// The list index `i` is used as the last component of the per-message id.
MessageId messageIdInBatch(messageId.partition(), messageId.ledgerId(), messageId.entryId(), i);
messagesContainerListPtr->operator[](i).callBack(r, messageIdInBatch);
}
if (flushCallback) {
flushCallback(ResultOk);
}
// Writes a one-line, human-readable summary of the container to `os`: current message count and
// byte size, the configured maxima, the topic name, and the batch statistics.
//
// @param os the stream to write to
void BatchMessageContainer::serialize(std::ostream& os) const {
    os << "{ BatchMessageContainer [size = " << numMessages_;
    os << "] [bytes = " << sizeInBytes_;
    os << "] [maxSize = " << getMaxNumMessages();
    os << "] [maxBytes = " << getMaxSizeInBytes();
    os << "] [topicName = " << topicName_;
    os << "] [numberOfBatchesSent_ = " << numberOfBatchesSent_;
    os << "] [averageBatchSize_ = " << averageBatchSize_;
    os << "] }";
}

// Cancels the batch timer, then logs the container state at debug level and the accumulated
// statistics (number of batches sent and average batch size) at info level.
BatchMessageContainer::~BatchMessageContainer() {
timer_->cancel();
LOG_DEBUG(*this << " BatchMessageContainer Object destructed");
LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_
<< "] [averageBatchSize = " << averageBatchSize_ << "]");
}
} // namespace pulsar
Loading

0 comments on commit e7c4074

Please sign in to comment.