Skip to content

Commit

Permalink
[Issue 11496][C++] Allow partitioned producers to start lazily (apach…
Browse files Browse the repository at this point in the history
…e#11570)

Fixes apache#11496 also matches part of PIP 79.

C++ implementation that closely matches the proposed Java client changes from reducing partitioned producer connections and lookups: PR 10279

### Motivation

Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that have the combination of a large number of producers and a large number of partitions, this can put strain on the brokers. With say 1000 partitions and single partition routing with non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided.

PIP 79 also describes this. I wrote this before realising that PIP 79 also covers this. This implementation can be reviewed and contrasted to the Java client implementation in apache#10279.

### Modifications

Allows partitioned producers to start producers for individual partitions lazily. Starting a producer involves a topic owner
lookup to find out which broker is the owner of the partition, then registering the producer for that partition with the owner
broker. For topics with many partitions and when using SinglePartition routing without keyed messages, all of these
lookups and producer registrations are a waste except for the single chosen partition.

This change allows the user to control whether a producer on a partitioned topic uses this lazy start or not, via a new config
in ProducerConfiguration. When ProducerConfiguration.setLazyStartPartitionedProducers(true) is set, the PartitionedProducerImpl.start() becomes a synchronous operation that only does housekeeping (no network operations).
The producer of any given partition is started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While the producer starts, messages are buffered.

The sendTimeout timer is only activated once a producer has been fully started, which should give enough time for any buffered messages to be sent. For very short send timeouts, this setting could cause send timeouts during the start phase. The default of 30s should however not cause this issue.
  • Loading branch information
Vanlightly authored Aug 16, 2021
1 parent 4793ff7 commit 9577b84
Show file tree
Hide file tree
Showing 19 changed files with 699 additions and 103 deletions.
21 changes: 21 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
HashingScheme getHashingScheme() const;

/**
* This config affects producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. The internal producer of one partition is always
* started eagerly, chosen by the routing policy, but the internal producers of
* any additional partitions are started on demand, upon receiving their first
* message.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition routing policy is used without keyed messages.
* Because producer connection can be on demand, this can produce extra send latency
* for the first messages of a given partition.
* @param true/false as to whether to start partition producers lazily
* @return
*/
ProducerConfiguration& setLazyStartPartitionedProducers(bool);

/**
* The getter associated with setLazyStartPartitionedProducers()
*/
bool getLazyStartPartitionedProducers() const;

/**
* The setter associated with getBlockIfQueueFull()
*/
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
PULSAR_PUBLIC pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);

PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
pulsar_producer_configuration_t *conf, int blockIfQueueFull);

Expand Down
14 changes: 12 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
mutex_(),
creationTimestamp_(TimeUtils::now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }

void HandlerBase::start() { grabCnx(); }
void HandlerBase::start() {
Lock lock(mutex_);
// guard against concurrent state changes such as closing
if (state_ == NotStarted) {
state_ = Pending;
lock.unlock();

grabCnx();
}
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
Expand Down Expand Up @@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
scheduleReconnection(handler);
break;

case NotStarted:
case Closing:
case Closed:
case Failed:
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HandlerBase {

enum State
{
NotStarted,
Pending,
Ready,
Closing,
Expand Down
91 changes: 76 additions & 15 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}

ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));

if (lazy) {
createLazyPartitionProducer(partition);
} else {
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}

LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
return producer;
Expand All @@ -104,12 +109,28 @@ void PartitionedProducerImpl::start() {
// create producer per partition
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
for (unsigned int i = 0; i < getNumPartitions(); i++) {
producers_.push_back(newInternalProducer(i));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
if (conf_.getLazyStartPartitionedProducers()) {
// start one producer now, to ensure authz errors occur now
// if the SinglePartition router is used, then this producer will serve
// all non-keyed messages in the future
Message msg = MessageBuilder().setContent("x").build();
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));

for (unsigned int i = 0; i < getNumPartitions(); i++) {
bool lazy = (short)i != partition;
producers_.push_back(newInternalProducer(i, lazy));
}

producers_[partition]->start();
} else {
for (unsigned int i = 0; i < getNumPartitions(); i++) {
producers_.push_back(newInternalProducer(i, false));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
}

Expand Down Expand Up @@ -147,8 +168,27 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
}
}

void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
const auto numPartitions = getNumPartitions();
assert(numProducersCreated_ <= numPartitions);
assert(partitionIndex <= numPartitions);
numProducersCreated_++;
if (numProducersCreated_ == numPartitions) {
state_ = Ready;
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
}

// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (!assertState(Ready)) {
callback(ResultAlreadyClosed, msg.getMessageId());
return;
}

// get partition for this message from router policy
Lock producersLock(producersMutex_);
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
Expand All @@ -161,7 +201,14 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];

// if the producer is not started (lazy producer), then kick-off the start process
if (!producer->isStarted()) {
producer->start();
}

producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
}
Expand All @@ -175,6 +222,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
lock.unlock();
}

bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
Lock lock(mutex_);
return state_ == state;
}

const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
return producers_[0]->getProducerName();
Expand Down Expand Up @@ -285,7 +337,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
void PartitionedProducerImpl::triggerFlush() {
Lock producersLock(producersMutex_);
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->triggerFlush();
if ((*prod)->isStarted()) {
(*prod)->triggerFlush();
}
}
}

Expand Down Expand Up @@ -322,7 +376,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
if ((*prod)->isStarted()) {
(*prod)->flushAsync(subFlushCallback);
} else {
subFlushCallback(ResultOk);
}
}
}

Expand Down Expand Up @@ -355,8 +413,11 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));

for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
auto producer = newInternalProducer(i);
producer->start();
auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers());

if (!conf_.getLazyStartPartitionedProducers()) {
producer->start();
}
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
Expand All @@ -379,8 +440,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers_) {
if (!producer->isConnected()) {
for (const auto& producer : producers) {
if (producer->isStarted() && !producer->isConnected()) {
return false;
}
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

void createLazyPartitionProducer(const unsigned int partitionIndex);
void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
CloseCallback callback);

Expand Down Expand Up @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;

ProducerImplPtr newInternalProducer(unsigned int partition) const;
ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);

MessageRoutingPolicyPtr routerPolicy_;

Expand All @@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
bool assertState(const PartitionedProducerState state);
};

} // namespace pulsar
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void Producer::flushAsync(FlushCallback callback) {
void Producer::producerFailMessages(Result result) {
if (impl_) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
producerImpl->failPendingMessages(result);
producerImpl->failPendingMessages(result, true);
}
}

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
bool useLazyStartPartitionedProducers) {
impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
return *this;
}

bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
return impl_->useLazyStartPartitionedProducers;
}

ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
Expand Down
Loading

0 comments on commit 9577b84

Please sign in to comment.