Skip to content

Commit

Permalink
Cpp producer sequence id changes (apache#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 18, 2017
1 parent 1f7712c commit 824922a
Show file tree
Hide file tree
Showing 23 changed files with 463 additions and 18 deletions.
18 changes: 18 additions & 0 deletions pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ class MessageBuilder {
*/
MessageBuilder& setEventTimestamp(uint64_t eventTimestamp);

/**
* Specify a custom sequence id for the message being published.
* <p>
* The sequence id can be used for deduplication purposes and it needs to follow these rules:
* <ol>
* <li><code>sequenceId >= 0</code>
* <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
* <code>sequenceId(N+1) > sequenceId(N)</code>
* <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
* <code>sequenceId</code> could represent an offset or a cumulative size.
* </ol>
*
* @param sequenceId
* the sequence id to assign to the current message
* @since 1.20.0
*/
MessageBuilder& setSequenceId(int64_t sequenceId);

/**
* override namespace replication clusters. note that it is the
* caller's responsibility to provide valid cluster names, and that
Expand Down
18 changes: 18 additions & 0 deletions pulsar-client-cpp/include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class Producer {
*/
const std::string& getTopic() const;

/**
* @return the producer name which could have been assigned by the system or specified by the client
*/
const std::string& getProducerName() const;

/**
* Publish a message on the topic associated with this Producer.
*
Expand Down Expand Up @@ -75,6 +80,19 @@ class Producer {
*/
void sendAsync(const Message& msg, SendCallback callback);

/**
* Get the last sequence id that was published by this producer.
*
* This represent either the automatically assigned or custom sequence id (set on the MessageBuilder) that
* was published and acknowledged by the broker.
*
* After recreating a producer with the same producer name, this will return the last message that was published in
* the previous producer session, or -1 if there no message was ever published.
*
* @return the last sequence id published by this producer
*/
int64_t getLastSequenceId() const;

/**
* Close the producer and release resources allocated.
*
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ class ProducerConfiguration {
ProducerConfiguration(const ProducerConfiguration&);
ProducerConfiguration& operator=(const ProducerConfiguration&);

ProducerConfiguration& setProducerName(const std::string& producerName);
const std::string& getProducerName() const;

ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
int getSendTimeout() const;

ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId);
int64_t getInitialSequenceId() const;

ProducerConfiguration& setCompressionType(CompressionType compressionType);
CompressionType getCompressionType() const;

Expand Down
9 changes: 5 additions & 4 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();

requestData.promise.setValue("");
requestData.promise.setValue({"", -1});
requestData.timer->cancel();
}
break;
Expand Down Expand Up @@ -825,7 +825,8 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();

requestData.promise.setValue(producerSuccess.producer_name());
requestData.promise.setValue(
{ producerSuccess.producer_name(), producerSuccess.last_sequence_id() });
requestData.timer->cancel();
}
break;
Expand Down Expand Up @@ -1061,12 +1062,12 @@ void ClientConnection::sendPendingCommands() {
}
}

Future<Result, std::string> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
Lock lock(mutex_);

if (isClosed()) {
lock.unlock();
Promise<Result, std::string> promise;
Promise<Result, ResponseData> promise;
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class LookupDataResult;

struct OpSendMsg;

typedef std::pair<std::string, int64_t> ResponseData;

class ClientConnection : public boost::enable_shared_from_this<ClientConnection> {
enum State {
Pending,
Expand Down Expand Up @@ -125,7 +127,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
*/
Future<Result, std::string> sendRequestWithId(SharedBuffer cmd, int requestId);
Future<Result, ResponseData> sendRequestWithId(SharedBuffer cmd, int requestId);

const std::string& brokerAddress() const;

Expand All @@ -138,7 +140,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId) ;
private:
struct PendingRequestData {
Promise<Result, std::string> promise;
Promise<Result, ResponseData> promise;
DeadlineTimerPtr timer;
};

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
// Lock is no longer required
lock.unlock();
int requestId = client->newRequestId();
Future<Result, std::string> future = cnx->sendRequestWithId(
Future<Result, ResponseData> future = cnx->sendRequestWithId(
Commands::newCloseConsumer(consumerId_, requestId), requestId);
if (!callback.empty()) {
future.addListener(
Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
return *this;
}

MessageBuilder& MessageBuilder::setSequenceId(int64_t sequenceId) {
if (sequenceId < 0) {
throw "sequenceId needs to be >= 0";
}
checkMetadata();
impl_->metadata.set_sequence_id(sequenceId);
return *this;
}

MessageBuilder& MessageBuilder::setReplicationClusters(const std::vector<std::string>& clusters) {
checkMetadata();
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ namespace pulsar {
lock.unlock();
}

const std::string& PartitionedProducerImpl::getProducerName() const {
return producers_[0]->getProducerName();
}

int64_t PartitionedProducerImpl::getLastSequenceId() const {
int64_t currentMax = -1L;
for (int i = 0; i < producers_.size(); i++) {
currentMax = std::max(currentMax, producers_[i]->getLastSequenceId());
}

return currentMax;
}

/*
* if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to create
* one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ namespace pulsar {
*/
virtual void closeAsync(CloseCallback closeCallback);

virtual const std::string& getProducerName() const;

virtual int64_t getLastSequenceId() const;

virtual void start();

virtual void shutdown();
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void Producer::sendAsync(const Message& msg, SendCallback callback) {
impl_->sendAsync(msg, callback);
}

const std::string& Producer::getProducerName() const {
return impl_->getProducerName();
}

int64_t Producer::getLastSequenceId() const {
return impl_->getLastSequenceId();
}

Result Producer::close() {
Promise<bool, Result> promise;
closeAsync(WaitForCallback(promise));
Expand Down
19 changes: 19 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
return *this;
}

ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
impl_->producerName = Optional<std::string>::of(producerName);
return *this;
}

const std::string& ProducerConfiguration::getProducerName() const {
static const std::string emptyString;
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
}

ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
return *this;
}

int64_t ProducerConfiguration::getInitialSequenceId() const {
return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
}

ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
impl_->sendTimeoutMs = sendTimeoutMs;
return *this;
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
#include <pulsar/ProducerConfiguration.h>
#include <boost/make_shared.hpp>

#include "Utils.h"

namespace pulsar {

struct ProducerConfigurationImpl {
Optional<std::string> producerName;
Optional<int64_t> initialSequenceId;
int sendTimeoutMs;
CompressionType compressionType;
int maxPendingMessages;
Expand Down
36 changes: 33 additions & 3 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
producerName_(conf_.getProducerName()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
sendTimer_() {
LOG_DEBUG(
"ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_);

int64_t initialSequenceId = conf.getInitialSequenceId();
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;

// boost::ref is used to drop the constantness constraint of make_shared
if (conf_.getBatchingEnabled()) {
batchMessageContainer = boost::make_shared<BatchMessageContainer>(boost::ref(*this));
Expand All @@ -86,6 +92,14 @@ const std::string& ProducerImpl::getTopic() const {
return topic_;
}

const std::string& ProducerImpl::getProducerName() const {
return producerName_;
}

int64_t ProducerImpl::getLastSequenceId() const {
return lastSequenceIdPublished_;
}

void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
if (state_ == Closed) {
Expand Down Expand Up @@ -114,12 +128,14 @@ void ProducerImpl::connectionFailed(Result result) {
}

void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName) {
const ResponseData& responseData) {
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));

if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
const std::string& producerName = responseData.first;
int64_t lastSequenceId = responseData.second;
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());

Lock lock(mutex_);
Expand All @@ -129,6 +145,11 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
if (batchMessageContainer) {
batchMessageContainer->producerName_ = producerName_;
}

if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
lastSequenceIdPublished_ = lastSequenceId;
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
connection_ = cnx;
state_ = Ready;
Expand Down Expand Up @@ -297,7 +318,13 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
return;
}

setMessageMetadata(msg, msgSequenceGenerator_++, uncompressedSize);
int64_t sequenceId;
if (!msg.impl_->metadata.has_sequence_id()) {
sequenceId = msgSequenceGenerator_++;
} else {
sequenceId = msg.impl_->metadata.sequence_id();
}
setMessageMetadata(msg, sequenceId, uncompressedSize);

// reserving a spot and going forward - not blocking
if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) {
Expand Down Expand Up @@ -402,7 +429,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
return;
}
int requestId = client->newRequestId();
Future<Result, std::string> future = cnx->sendRequestWithId(
Future<Result, ResponseData> future = cnx->sendRequestWithId(
Commands::newCloseProducer(producerId_, requestId), requestId);
if (!callback.empty()) {
future.addListener(
Expand Down Expand Up @@ -539,6 +566,9 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
// -1 since the pushing batch message into the queue already released a spot
pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
}

lastSequenceIdPublished_ = sequenceId + op.msg_.impl_->metadata.num_messages_in_batch() - 1;

lock.unlock();
if (op.sendCallback_) {
try {
Expand Down
10 changes: 8 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P

virtual void disconnectProducer();

const std::string& getProducerName() const;

int64_t getLastSequenceId() const;

uint64_t getProducerId() const;

virtual void start();
Expand Down Expand Up @@ -111,7 +115,7 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
void printStats();

void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName);
const ResponseData& responseData);

void statsCallBackHandler(Result , const Message& , SendCallback , boost::posix_time::ptime );

Expand All @@ -130,10 +134,12 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
uint64_t msgSequenceGenerator_;
int64_t msgSequenceGenerator_;
proto::BaseCommand cmd_;
BatchMessageContainerPtr batchMessageContainer;

volatile int64_t lastSequenceIdPublished_;

typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class ProducerImplBase {
virtual ~ProducerImplBase(){
}

virtual const std::string& getProducerName() const = 0;

virtual int64_t getLastSequenceId() const = 0;

virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
virtual void closeAsync(CloseCallback callback) = 0;
virtual void start() = 0;
Expand Down
Loading

0 comments on commit 824922a

Please sign in to comment.