Skip to content

Commit

Permalink
[fix][C++ client] Fix the close of Client might stuck or return a wro…
Browse files Browse the repository at this point in the history
…ng result (apache#16285)

Fixes apache#15976

### Motivation

Currently even if the producer, consumer, or reader failed to
create, it would still be added to the producers or consumers in
`Client`. `Client::close` first closes the internal producers and
consumers, if the producers or consumers to close include failed
producers or consumers, `Client::close` would return
`ResultAlreadyClosed`. Even worse, closing a failed partitioned producer
might stuck.

It also makes the Python test `test_listener_name_client` flaky because
`client.close()` will throw an exception if the underlying
`Client::close` call in C++ client doesn't return `ResultOk`.

### Modifications

- Only adding the created producer or consumer to the internal list of
  `Client` after the creation succeeded.
- Add `ClientTest.testWrongListener` to verify when producer, consumer,
  reader failed to create, the internal producer list and consumer list
  are both empty. And `client.close()` would return `ResultOk`.
  • Loading branch information
BewareMyPower authored Jul 6, 2022
1 parent 4c958a9 commit e23d312
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 29 deletions.
41 changes: 24 additions & 17 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, producer));
Lock lock(mutex_);
producers_.push_back(producer);
lock.unlock();
producer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on "
Expand All @@ -198,7 +195,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul

void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
callback(result, Producer(producer));
if (result == ResultOk) {
Lock lock(mutex_);
producers_.push_back(producer);
lock.unlock();
callback(result, Producer(producer));
} else {
callback(result, {});
}
}

void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
Expand Down Expand Up @@ -241,10 +245,13 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat

ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
getListenerExecutorProvider()->get(), callback);
reader->start(startMessageId);

Lock lock(mutex_);
consumers_.push_back(reader->getConsumer());
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
Lock lock(mutex_);
consumers_.push_back(weakConsumerPtr);
lock.unlock();
});
}

void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
Expand Down Expand Up @@ -291,9 +298,6 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
consumer->start();
} else {
LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result);
Expand All @@ -317,6 +321,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
return;
}
}
lock.unlock();

if (topicNamePtr) {
std::string randomName = generateRandomName();
Expand All @@ -331,8 +336,6 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
consumers_.push_back(consumer);
lock.unlock();
consumer->start();
}

Expand Down Expand Up @@ -389,9 +392,6 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
consumer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString()
Expand All @@ -402,7 +402,14 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr

void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
callback(result, Consumer(consumer));
if (result == ResultOk) {
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
callback(result, Consumer(consumer));
} else {
callback(result, {});
}
}

Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
Expand Down
21 changes: 12 additions & 9 deletions pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}

void ReaderImpl::start(const MessageId& startMessageId) {
void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
Expand Down Expand Up @@ -79,19 +80,21 @@ void ReaderImpl::start(const MessageId& startMessageId) {
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2));
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
if (result == ResultOk) {
readerCreatedCallback_(result, Reader(self));
callback(weakConsumerPtr);
} else {
readerCreatedCallback_(result, {});
}
});
consumer_->start();
}

const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }

void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
auto self = shared_from_this();
readerCreatedCallback_(result, Reader(self));
}

Result ReaderImpl::readNext(Message& msg) {
Result res = consumer_->receive(msg);
acknowledgeIfNecessary(res, msg);
Expand Down
4 changes: 1 addition & 3 deletions pulsar-client-cpp/lib/ReaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);

void start(const MessageId& startMessageId);
void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);

const std::string& getTopic() const;

Expand All @@ -65,8 +65,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
bool isConnected() const;

private:
void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);

void messageListener(Consumer consumer, const Message& msg);

void acknowledgeIfNecessary(Result result, const Message& msg);
Expand Down
32 changes: 32 additions & 0 deletions pulsar-client-cpp/tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,35 @@ TEST(ClientTest, testReferenceCount) {
ASSERT_EQ(readerWeakPtr.use_count(), 0);
client.close();
}

TEST(ClientTest, testWrongListener) {
const std::string topic = "client-test-wrong-listener-" + std::to_string(time(nullptr));
auto httpCode = makePutRequest(
"http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", "3");
LOG_INFO("create " << topic << ": " << httpCode);

Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
Producer producer;
ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
ASSERT_EQ(ResultProducerNotInitialized, producer.close());

Consumer consumer;
ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());

ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
ASSERT_EQ(ResultOk, client.close());

// The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
// creation of Reader would fail with ResultConnectError.
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));

// Currently Reader can only read a non-partitioned topic in C++ client
Reader reader;
ASSERT_EQ(ResultServiceUnitNotReady,
client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
ASSERT_EQ(ResultOk, client.close());
}

0 comments on commit e23d312

Please sign in to comment.