Skip to content

Commit

Permalink
add producer flush method in cpp client (apache#3020)
Browse files Browse the repository at this point in the history
### Motivation

We already have flush() method in java api: http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Producer.html#flush--
It is better to provide a same method for cpp client.

### Modifications

Add related methods in cpp client.
Add related tests.

### Result

Cpp unit tests pass.
  • Loading branch information
jiazhai authored and sijie committed Nov 20, 2018
1 parent 5fde6f3 commit b982b5d
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 12 deletions.
15 changes: 15 additions & 0 deletions pulsar-client-cpp/include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ namespace pulsar {
class ProducerImplBase;
class PulsarWrapper;
class PulsarFriend;

typedef boost::function<void(Result)> FlushCallback;

class Producer {
public:
/**
Expand Down Expand Up @@ -80,6 +83,18 @@ class Producer {
*/
void sendAsync(const Message& msg, SendCallback callback);

/**
* Flush all the messages buffered in the client and wait until all messages have been successfully
* persisted.
*/
Result flush();

/**
* Flush all the messages buffered in the client and wait until all messages have been successfully
* persisted.
*/
void flushAsync(FlushCallback callback);

/**
* Get the last sequence id that was published by this producer.
*
Expand Down
22 changes: 16 additions & 6 deletions pulsar-client-cpp/lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
<< "]");
if (!(disableCheck || hasSpaceInBatch(msg))) {
LOG_DEBUG(*this << " Batch is full");
sendMessage();
sendMessage(NULL);
add(msg, sendCallback, true);
return;
}
Expand All @@ -71,7 +71,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
if (isFull()) {
LOG_DEBUG(*this << " Batch is full.");
sendMessage();
sendMessage(NULL);
}
}

Expand All @@ -83,11 +83,14 @@ void BatchMessageContainer::startTimer() {
boost::asio::placeholders::error));
}

void BatchMessageContainer::sendMessage() {
void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
// Call this function after acquiring the ProducerImpl lock
LOG_DEBUG(*this << "Sending the batch message container");
if (isEmpty()) {
LOG_DEBUG(*this << " Batch is empty - returning.");
if (flushCallback) {
flushCallback(ResultOk);
}
return;
}
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
Expand All @@ -101,8 +104,8 @@ void BatchMessageContainer::sendMessage() {
msg.impl_ = impl_;

// bind keeps a copy of the parameters
SendCallback callback =
boost::bind(&BatchMessageContainer::batchMessageCallBack, _1, messagesContainerListPtr_);
SendCallback callback = boost::bind(&BatchMessageContainer::batchMessageCallBack, _1,
messagesContainerListPtr_, flushCallback);

producer_.sendMessage(msg, callback);
clear();
Expand Down Expand Up @@ -131,8 +134,12 @@ void BatchMessageContainer::clear() {
batchSizeInBytes_ = 0;
}

void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr) {
void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr,
FlushCallback flushCallback) {
if (!messagesContainerListPtr) {
if (flushCallback) {
flushCallback(ResultOk);
}
return;
}
LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result = "
Expand All @@ -142,6 +149,9 @@ void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListP
// callback(result, message)
iter->sendCallback_(r, iter->message_);
}
if (flushCallback) {
flushCallback(ResultOk);
}
}

BatchMessageContainer::~BatchMessageContainer() {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/BatchMessageContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BatchMessageContainer {

void clear();

static void batchMessageCallBack(Result r, MessageContainerListPtr messages);
static void batchMessageCallBack(Result r, MessageContainerListPtr messages, FlushCallback callback);

friend inline std::ostream& operator<<(std::ostream& os,
const BatchMessageContainer& batchMessageContainer);
Expand Down Expand Up @@ -108,7 +108,7 @@ class BatchMessageContainer {

void startTimer();

void sendMessage();
void sendMessage(FlushCallback callback);
};

bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const {
Expand Down
33 changes: 33 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,37 @@ void PartitionedProducerImpl::triggerFlush() {
}
}

void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
if (!flushPromise_ || flushPromise_->isComplete()) {
flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
} else {
// already in flushing, register a listener callback
boost::function<void(Result, bool)> listenerCallback = [this, callback](Result result, bool_type v) {
if (v) {
callback(ResultOk);
} else {
callback(ResultUnknownError);
}
return;
};

flushPromise_->getFuture().addListener(listenerCallback);
return;
}

FlushCallback subFlushCallback = [this, callback](Result result) {
int previous = flushedPartitions_.fetch_add(1);
if (previous == producers_.size() - 1) {
flushedPartitions_.store(0);
flushPromise_->setValue(true);
callback(result);
}
return;
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
}
}

} // namespace pulsar
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class PartitionedProducerImpl : public ProducerImplBase,

virtual void triggerFlush();

virtual void flushAsync(FlushCallback callback);

void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

Expand Down Expand Up @@ -115,6 +117,9 @@ class PartitionedProducerImpl : public ProducerImplBase,
Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;

MessageRoutingPolicyPtr getMessageRouter();

std::atomic<int> flushedPartitions_;
boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
};

} // namespace pulsar
18 changes: 18 additions & 0 deletions pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,22 @@ void Producer::closeAsync(CloseCallback callback) {

impl_->closeAsync(callback);
}

Result Producer::flush() {
Promise<bool, Result> promise;
flushAsync(WaitForCallback(promise));

Result result;
promise.getFuture().get(result);
return result;
}

void Producer::flushAsync(FlushCallback callback) {
if (!impl_) {
callback(ResultProducerNotInitialized);
return;
}

impl_->flushAsync(callback);
}
} // namespace pulsar
41 changes: 37 additions & 4 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void ProducerImpl::failPendingMessages(Result result) {
}

// this function can handle null pointer
BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr);
BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL);
}

void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
Expand Down Expand Up @@ -285,10 +285,43 @@ void ProducerImpl::statsCallBackHandler(Result res, const Message& msg, SendCall
}
}

void ProducerImpl::flushAsync(FlushCallback callback) {
if (batchMessageContainer) {
if (!flushPromise_ || flushPromise_->isComplete()) {
flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
} else {
// already in flushing, register a listener callback
boost::function<void(Result, bool)> listenerCallback = [this, callback](Result result,
bool_type v) {
if (v) {
callback(ResultOk);
} else {
callback(ResultUnknownError);
}
return;
};

flushPromise_->getFuture().addListener(listenerCallback);
return;
}

FlushCallback innerCallback = [this, callback](Result result) {
flushPromise_->setValue(true);
callback(result);
return;
};

Lock lock(mutex_);
batchMessageContainer->sendMessage(innerCallback);
} else {
callback(ResultOk);
}
}

void ProducerImpl::triggerFlush() {
if (batchMessageContainer) {
Lock lock(mutex_);
batchMessageContainer->sendMessage();
batchMessageContainer->sendMessage(NULL);
}
}

Expand Down Expand Up @@ -364,7 +397,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
if (batchMessageContainer) {
LOG_DEBUG(getName() << " - sending batch message immediately");
batchMessageContainer->sendMessage();
batchMessageContainer->sendMessage(NULL);
}
lock.unlock();
cb(ResultProducerQueueIsFull, msg);
Expand Down Expand Up @@ -412,7 +445,7 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
}
LOG_DEBUG(getName() << " - Batch Message Timer expired");
Lock lock(mutex_);
batchMessageContainer->sendMessage();
batchMessageContainer->sendMessage(NULL);
}

void ProducerImpl::printStats() {
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
using namespace pulsar;

namespace pulsar {
typedef bool bool_type;

class BatchMessageContainer;

Expand Down Expand Up @@ -90,6 +91,8 @@ class ProducerImpl : public HandlerBase,

virtual void triggerFlush();

virtual void flushAsync(FlushCallback callback);

protected:
ProducerStatsBasePtr producerStatsBasePtr_;

Expand Down Expand Up @@ -156,6 +159,7 @@ class ProducerImpl : public HandlerBase,
MessageCryptoPtr msgCrypto_;
DeadlineTimerPtr dataKeyGenTImer_;
uint32_t dataKeyGenIntervalSec_;
boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
};

struct ProducerImplCmp {
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ProducerImplBase {
virtual const std::string& getTopic() const = 0;
virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() = 0;
virtual void triggerFlush() = 0;
virtual void flushAsync(FlushCallback callback) = 0;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
Loading

0 comments on commit b982b5d

Please sign in to comment.