Skip to content

Commit

Permalink
BufferSlice: stats of total BufferSlices size
Browse files Browse the repository at this point in the history
GitOrigin-RevId: df712161ba00c4f3d6eae9b6459c69ee046a9bda
  • Loading branch information
arseny30 committed Aug 4, 2020
1 parent f74d8ba commit e75860b
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 4 deletions.
6 changes: 3 additions & 3 deletions tddb/td/db/binlog/Binlog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,9 @@ void Binlog::do_reindex() {
<< fd_size_ << ' ' << detail::file_size(path_) << ' ' << fd_events_ << ' ' << path_;

double ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1);
LOG(INFO) << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
<< tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
<< tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events);
LOG(ERROR) << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
<< tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
<< tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events);

buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
Expand Down
2 changes: 1 addition & 1 deletion tdutils/td/utils/ThreadSafeCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ThreadSafeMultiCounter {
int64 sum(size_t index) const {
CHECK(index < N);
int64 res = 0;
tls_.for_each([&res](auto &value) { res += value[index].load(std::memory_order_relaxed); });
tls_.for_each([&res, &index](auto &value) { res += value[index].load(std::memory_order_relaxed); });
return res;
}
void clear() {
Expand Down
12 changes: 12 additions & 0 deletions tdutils/td/utils/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "td/utils/port/thread_local.h"

#include "td/utils/ThreadSafeCounter.h"

#include <cstddef>
#include <new>

Expand All @@ -24,6 +26,16 @@ TD_THREAD_LOCAL BufferAllocator::BufferRawTls *BufferAllocator::buffer_raw_tls;

std::atomic<size_t> BufferAllocator::buffer_mem;

static td::ThreadSafeCounter buffer_slice_size_;

int64 BufferAllocator::get_buffer_slice_size() {
return buffer_slice_size_.sum();
}

void BufferAllocator::track_buffer_slice(int64 size) {
buffer_slice_size_.add(size);
}

size_t BufferAllocator::get_buffer_mem() {
return buffer_mem;
}
Expand Down
37 changes: 37 additions & 0 deletions tdutils/td/utils/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ class BufferAllocator {
static ReaderPtr create_reader(const ReaderPtr &raw);

static size_t get_buffer_mem();
static int64 get_buffer_slice_size();

static void clear_thread_local();

private:
friend class BufferSlice;

static void track_buffer_slice(int64 size);

static ReaderPtr create_reader_fast(size_t size);

static WriterPtr create_writer_exact(size_t size);
Expand Down Expand Up @@ -104,16 +109,29 @@ class BufferSlice {
return;
}
begin_ = buffer_->begin_;
end_ = begin_;
sync_with_writer();
}
BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
: buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
debug_track();
}
BufferSlice(BufferSlice &&other) : BufferSlice(std::move(other.buffer_), other.begin_, other.end_) {
debug_untrack(); // yes, debug_untrack
}
BufferSlice &operator=(BufferSlice &&other) {
debug_untrack();
buffer_ = std::move(other.buffer_);
begin_ = other.begin_;
end_ = other.end_;
return *this;
}

explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
end_ = buffer_->end_.load(std::memory_order_relaxed);
begin_ = end_ - ((size + 7) & -8);
end_ = begin_ + size;
debug_track();
}

explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
Expand All @@ -123,6 +141,17 @@ class BufferSlice {
BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
}

~BufferSlice() {
debug_untrack();
}

void debug_track() {
BufferAllocator::track_buffer_slice(static_cast<int64>(size()));
}
void debug_untrack() {
BufferAllocator::track_buffer_slice(-static_cast<int64>(size()));
}

BufferSlice clone() const {
if (is_null()) {
return BufferSlice(BufferReaderPtr(), begin_, end_);
Expand Down Expand Up @@ -166,21 +195,26 @@ class BufferSlice {
}

bool confirm_read(size_t size) {
debug_untrack();
begin_ += size;
CHECK(begin_ <= end_);
debug_track();
return begin_ == end_;
}

void truncate(size_t limit) {
if (size() > limit) {
debug_untrack();
end_ = begin_ + limit;
debug_track();
}
}

BufferSlice from_slice(Slice slice) const {
auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
res.begin_ = static_cast<size_t>(slice.ubegin() - buffer_->data_);
res.end_ = static_cast<size_t>(slice.uend() - buffer_->data_);
res.debug_track();
CHECK(buffer_->begin_ <= res.begin_);
CHECK(res.begin_ <= res.end_);
CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
Expand Down Expand Up @@ -220,16 +254,19 @@ class BufferSlice {

// set end_ into writer's end_
size_t sync_with_writer() {
debug_untrack();
CHECK(!is_null());
auto old_end = end_;
end_ = buffer_->end_.load(std::memory_order_acquire);
debug_track();
return end_ - old_end;
}
bool is_writer_alive() const {
CHECK(!is_null());
return buffer_->has_writer_.load(std::memory_order_acquire);
}
void clear() {
debug_untrack();
begin_ = 0;
end_ = 0;
buffer_ = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions test/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ TEST(Http, reader) {
clear_thread_locals();
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto start_mem = BufferAllocator::get_buffer_mem();
auto start_size = BufferAllocator::get_buffer_slice_size();
for (int i = 0; i < 20; i++) {
td::ChainBufferWriter input_writer;
auto input = input_writer.extract_reader();
Expand Down Expand Up @@ -184,6 +185,7 @@ TEST(Http, reader) {
}
clear_thread_locals();
ASSERT_EQ(start_mem, BufferAllocator::get_buffer_mem());
ASSERT_EQ(start_size, BufferAllocator::get_buffer_slice_size());
}

TEST(Http, gzip_bomb) {
Expand Down
34 changes: 34 additions & 0 deletions test/tqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "td/db/binlog/BinlogHelper.h"
#include "td/db/TQueue.h"

#include "td/utils/Random.h"
#include "td/utils/buffer.h"
#include "td/utils/int_types.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
Expand Down Expand Up @@ -192,3 +194,35 @@ TEST(TQueue, random) {
steps.step(rnd);
}
}

TEST(TQueue, memory_leak) {
//return;
auto tqueue = td::TQueue::create();
auto tqueue_binlog = td::make_unique<td::TQueueBinlog<td::Binlog>>();
std::string binlog_path = "test_tqueue.binlog";
td::Binlog::destroy(binlog_path).ensure();
auto binlog = std::make_shared<td::Binlog>();
binlog->init(binlog_path, [&](const td::BinlogEvent &event) { UNREACHABLE(); }).ensure();
tqueue_binlog->set_binlog(std::move(binlog));
tqueue->set_callback(std::move(tqueue_binlog));

double now = 0;
std::vector<td::TQueue::EventId> ids;
td::Random::Xorshift128plus rnd(123);
int i = 0;
while (true) {
auto id = tqueue->push(1, "a", now + 600000, 0, {}).move_as_ok();
ids.push_back(id);
if (ids.size() > rnd() % 100000) {
auto it = rnd() % ids.size();
std::swap(ids.back(), ids[it]);
tqueue->forget(1, ids.back());
ids.pop_back();
}
now += 1;
if (i++ % 100000 == 0) {
LOG(ERROR) << td::BufferAllocator::get_buffer_mem() << " " << tqueue->get_size(1) << " "
<< td::BufferAllocator::get_buffer_slice_size();
}
}
}

0 comments on commit e75860b

Please sign in to comment.