Skip to content

Commit

Permalink
cpp: fix reference leak when reader create (apache#7793)
Browse files Browse the repository at this point in the history
### Motivation
User reports a valgrind error for `client::createReader` method:

```
==23308== 284,826 (160 direct, 284,666 indirect) bytes in 1 blocks are definitely lost in loss record 113 of 113
==23308==    at 0x4C2A593: operator new(unsigned long) (vg_replace_malloc.c:344)
==23308==    by 0x5303B4A: allocate (new_allocator.h:104)
==23308==    by 0x5303B4A: allocate (alloc_traits.h:351)
==23308==    by 0x5303B4A: __shared_count<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:499)
==23308==    by 0x5303B4A: __shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:957)
==23308==    by 0x5303B4A: shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:316)
==23308==    by 0x5303B4A: allocate_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:598)
==23308==    by 0x5303B4A: make_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader> > (shared_ptr.h:614)
==23308==    by 0x5303B4A: Promise (Future.h:91)
==23308==    by 0x5303B4A: pulsar::Client::createReader(std::string const&, pulsar::MessageId const&, pulsar::ReaderConfiguration const&, pulsar::Reader&) (Client.cc:142)
==23308==    by 0x401DDB: main (pulsarReader.cpp:92)
==23308== 
```
It seems the `ReaderImpl` has been tracked twice when call WaitForCallbackValue. this PR is to fix the issue.

### Modifications

- fix WaitForCallbackValue which is changed in PR apache#3484.
- add test for the reference issue.

### Verifying this change
ut passed.
valgrind found no issue:
```
==14758== LEAK SUMMARY:
==14758==    definitely lost: 0 bytes in 0 blocks
==14758==    indirectly lost: 0 bytes in 0 blocks
==14758==      possibly lost: 0 bytes in 0 blocks
==14758==    still reachable: 12,621 bytes in 145 blocks
==14758==         suppressed: 0 bytes in 0 blocks
==14758== 
==14758== For lists of detected and suppressed errors, rerun with: -s
==14758== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
  • Loading branch information
jiazhai authored Aug 11, 2020
1 parent a4a12d1 commit 0e67fc5
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 3 deletions.
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ void ReaderImpl::start(const MessageId& startMessageId) {
const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }

void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
readerCreatedCallback_(result, Reader(shared_from_this()));
auto self = shared_from_this();
readerCreatedCallback_(result, Reader(self));
readerImplWeakPtr_ = self;
}

ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
Expand Down Expand Up @@ -111,4 +113,6 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
consumer_->seekAsync(timestamp, callback);
}

ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }

} // namespace pulsar
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ReaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
void seekAsync(const MessageId& msgId, ResultCallback callback);
void seekAsync(uint64_t timestamp, ResultCallback callback);

ReaderImplWeakPtr getReaderImplWeakPtr();

private:
void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);

Expand All @@ -65,6 +67,7 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
ConsumerImplPtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
ReaderImplWeakPtr readerImplWeakPtr_;
};
} // namespace pulsar

Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ struct WaitForCallback {

template <typename T>
struct WaitForCallbackValue {
Promise<Result, T> m_promise;
Promise<Result, T>& m_promise;

WaitForCallbackValue(Promise<Result, T> promise) : m_promise(promise) {}
WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}

void operator()(Result result, const T& value) {
if (result == ResultOk) {
Expand Down
46 changes: 46 additions & 0 deletions pulsar-client-cpp/tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/
#include <pulsar/Client.h>
#include <pulsar/Reader.h>
#include "ReaderTest.h"

#include <gtest/gtest.h>

Expand Down Expand Up @@ -416,3 +418,47 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
reader.close();
client.close();
}

TEST(ReaderTest, testReferenceLeak) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/testReferenceLeak";

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Message msg = MessageBuilder().setContent(content).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);

LOG_INFO("1 consumer use count " << consumerPtr.use_count());
LOG_INFO("1 reader use count " << readerPtr.use_count());

for (int i = 0; i < 10; i++) {
Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg));

std::string content = msg.getDataAsString();
std::string expected = "my-message-" + std::to_string(i);
ASSERT_EQ(expected, content);
}

producer.close();
reader.close();
// will be released after exit this method.
ASSERT_EQ(1, consumerPtr.use_count());
ASSERT_EQ(1, readerPtr.use_count());
client.close();
// will be released after exit this method.
ASSERT_EQ(1, consumerPtr.use_count());
ASSERT_EQ(1, readerPtr.use_count());
}
32 changes: 32 additions & 0 deletions pulsar-client-cpp/tests/ReaderTest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "lib/ReaderImpl.h"
#include <string>

using std::string;

namespace pulsar {
class ReaderTest {
public:
static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
return reader.impl_->getReaderImplWeakPtr();
}
};
} // namespace pulsar

0 comments on commit 0e67fc5

Please sign in to comment.