From 0e67fc598d3f22f14ca51276ebb75c57b7a159af Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Wed, 12 Aug 2020 06:50:05 +0800 Subject: [PATCH] cpp: fix reference leak when reader create (#7793) ### Motivation User reports a valgrind error for `client::createReader` method: ``` ==23308== 284,826 (160 direct, 284,666 indirect) bytes in 1 blocks are definitely lost in loss record 113 of 113 ==23308== at 0x4C2A593: operator new(unsigned long) (vg_replace_malloc.c:344) ==23308== by 0x5303B4A: allocate (new_allocator.h:104) ==23308== by 0x5303B4A: allocate (alloc_traits.h:351) ==23308== by 0x5303B4A: __shared_count, std::allocator > > (shared_ptr_base.h:499) ==23308== by 0x5303B4A: __shared_ptr > > (shared_ptr_base.h:957) ==23308== by 0x5303B4A: shared_ptr > > (shared_ptr.h:316) ==23308== by 0x5303B4A: allocate_shared, std::allocator > > (shared_ptr.h:598) ==23308== by 0x5303B4A: make_shared > (shared_ptr.h:614) ==23308== by 0x5303B4A: Promise (Future.h:91) ==23308== by 0x5303B4A: pulsar::Client::createReader(std::string const&, pulsar::MessageId const&, pulsar::ReaderConfiguration const&, pulsar::Reader&) (Client.cc:142) ==23308== by 0x401DDB: main (pulsarReader.cpp:92) ==23308== ``` It seems the `ReaderImpl` has been tracked twice when call WaitForCallbackValue. this PR is to fix the issue. ### Modifications - fix WaitForCallbackValue which is changed in PR #3484. - add test for the reference issue. ### Verifying this change ut passed. valgrind found no issue: ``` ==14758== LEAK SUMMARY: ==14758== definitely lost: 0 bytes in 0 blocks ==14758== indirectly lost: 0 bytes in 0 blocks ==14758== possibly lost: 0 bytes in 0 blocks ==14758== still reachable: 12,621 bytes in 145 blocks ==14758== suppressed: 0 bytes in 0 blocks ==14758== ==14758== For lists of detected and suppressed errors, rerun with: -s ==14758== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` --- pulsar-client-cpp/lib/ReaderImpl.cc | 6 +++- pulsar-client-cpp/lib/ReaderImpl.h | 3 ++ pulsar-client-cpp/lib/Utils.h | 4 +-- pulsar-client-cpp/tests/ReaderTest.cc | 46 +++++++++++++++++++++++++++ pulsar-client-cpp/tests/ReaderTest.h | 32 +++++++++++++++++++ 5 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 pulsar-client-cpp/tests/ReaderTest.h diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index ad76493e8c8fe..4bd091aafb405 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -63,7 +63,9 @@ void ReaderImpl::start(const MessageId& startMessageId) { const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); } void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) { - readerCreatedCallback_(result, Reader(shared_from_this())); + auto self = shared_from_this(); + readerCreatedCallback_(result, Reader(self)); + readerImplWeakPtr_ = self; } ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; } @@ -111,4 +113,6 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { consumer_->seekAsync(timestamp, callback); } +ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h index ca41510df40bc..40692474c4017 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.h +++ b/pulsar-client-cpp/lib/ReaderImpl.h @@ -52,6 +52,8 @@ class ReaderImpl : public std::enable_shared_from_this { void seekAsync(const MessageId& msgId, ResultCallback callback); void seekAsync(uint64_t timestamp, ResultCallback callback); + ReaderImplWeakPtr getReaderImplWeakPtr(); + private: void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer); @@ -65,6 +67,7 @@ class ReaderImpl : public std::enable_shared_from_this { ConsumerImplPtr consumer_; ReaderCallback readerCreatedCallback_; ReaderListener readerListener_; + ReaderImplWeakPtr readerImplWeakPtr_; }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h index cd2fb6dbcd096..fd50e977cf560 100644 --- a/pulsar-client-cpp/lib/Utils.h +++ b/pulsar-client-cpp/lib/Utils.h @@ -38,9 +38,9 @@ struct WaitForCallback { template struct WaitForCallbackValue { - Promise m_promise; + Promise& m_promise; - WaitForCallbackValue(Promise promise) : m_promise(promise) {} + WaitForCallbackValue(Promise& promise) : m_promise(promise) {} void operator()(Result result, const T& value) { if (result == ResultOk) { diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index a91af57ffafd8..779331107255e 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -17,6 +17,8 @@ * under the License. */ #include +#include +#include "ReaderTest.h" #include @@ -416,3 +418,47 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { reader.close(); client.close(); } + +TEST(ReaderTest, testReferenceLeak) { + Client client(serviceUrl); + + std::string topicName = "persistent://public/default/testReferenceLeak"; + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + for (int i = 0; i < 10; i++) { + std::string content = "my-message-" + std::to_string(i); + Message msg = MessageBuilder().setContent(content).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + ReaderConfiguration readerConf; + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader); + ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader); + + LOG_INFO("1 consumer use count " << consumerPtr.use_count()); + LOG_INFO("1 reader use count " << readerPtr.use_count()); + + for (int i = 0; i < 10; i++) { + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg)); + + std::string content = msg.getDataAsString(); + std::string expected = "my-message-" + std::to_string(i); + ASSERT_EQ(expected, content); + } + + producer.close(); + reader.close(); + // will be released after exit this method. + ASSERT_EQ(1, consumerPtr.use_count()); + ASSERT_EQ(1, readerPtr.use_count()); + client.close(); + // will be released after exit this method. + ASSERT_EQ(1, consumerPtr.use_count()); + ASSERT_EQ(1, readerPtr.use_count()); +} diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h new file mode 100644 index 0000000000000..fd0387f1d0662 --- /dev/null +++ b/pulsar-client-cpp/tests/ReaderTest.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "lib/ReaderImpl.h" +#include + +using std::string; + +namespace pulsar { +class ReaderTest { + public: + static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); } + static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) { + return reader.impl_->getReaderImplWeakPtr(); + } +}; +} // namespace pulsar