forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite_buffer_manager.cc
126 lines (116 loc) · 4.9 KB
/
write_buffer_manager.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
// Charge, in bytes, of each dummy entry inserted into the cache (1 MiB).
const size_t kSizeDummyEntry = static_cast<size_t>(1) << 20;
// Length of the fixed prefix of every dummy-entry cache key. The resulting
// keys are longer than the keys used for blocks in SST files, so the two
// kinds of keys won't conflict.
const size_t kCacheKeyPrefix = 4 * kMaxVarint64Length + 1;
}  // namespace
// State used to charge memory against a cache by inserting fixed-size dummy
// entries into it.
struct WriteBufferManager::CacheRep {
  // Cache that the dummy entries are inserted into.
  std::shared_ptr<Cache> cache_;
  // Held by ReserveMemWithCache()/FreeMemWithCache() while they update the
  // fields below.
  std::mutex cache_mutex_;
  // Sum of the charges of the dummy entries accounted for so far.
  std::atomic<size_t> cache_allocated_size_;
  // Key buffer: a fixed prefix of kCacheKeyPrefix bytes followed by room for
  // one varint64. The non-prefix part will be updated according to the ID to
  // use.
  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
  // ID encoded into the next key handed out by GetNextCacheKey().
  uint64_t next_cache_key_id_ = 0;
  // Handles of the dummy entries, in insertion order.
  std::vector<Cache::Handle*> dummy_handles_;

  // Stores `cache` and builds the key prefix from the address of this object;
  // the rest of the prefix is zero-filled.
  explicit CacheRep(std::shared_ptr<Cache> cache)
      : cache_(cache), cache_allocated_size_(0) {
    memset(cache_key_, 0, kCacheKeyPrefix);
    size_t pointer_size = sizeof(const void*);
    assert(pointer_size <= kCacheKeyPrefix);
    // Copy the pointer value itself. Passing `this` as the memcpy source
    // would copy the first bytes of the object's contents instead of its
    // address, so the prefix would not identify this instance.
    const void* self = static_cast<const void*>(this);
    memcpy(cache_key_, &self, pointer_size);
  }

  // Returns a key made of the fixed prefix followed by the varint64 encoding
  // of the next ID, and advances the ID. The returned Slice points into
  // cache_key_, so it is overwritten by the next call.
  Slice GetNextCacheKey() {
    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
    char* end =
        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
  }
};
#else
// In ROCKSDB_LITE builds the cache-charging code in this file is compiled
// out, so the representation is an empty placeholder.
struct WriteBufferManager::CacheRep {};
#endif // ROCKSDB_LITE
// Records `_buffer_size` as buffer_size_, sets mutable_limit_ to 7/8 of it and
// starts both usage counters at zero. When `cache` is non-null (and this is
// not a ROCKSDB_LITE build) a CacheRep is created for it; otherwise
// cache_rep_ stays null.
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
  if (cache) {
    // Construct the cache key using the pointer to this.
    cache_rep_.reset(new CacheRep(cache));
  }
#else
  // The cache is not used in LITE builds; silence the unused-parameter
  // warning.
  (void)cache;
#endif  // ROCKSDB_LITE
}
// Releases every dummy entry still held in the cache, if a cache is in use.
WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
  if (cache_rep_) {
    for (auto* handle : cache_rep_->dummy_handles_) {
      // A null handle means the matching Insert() did not produce an entry,
      // so there is nothing to release for it.
      if (handle != nullptr) {
        cache_rep_->cache_->Release(handle, true);
      }
    }
  }
#endif  // ROCKSDB_LITE
}
// Adds `mem` to memory_used_ and inserts dummy entries into the cache until
// the charged size is at least the new usage.
// Should only be called from write thread
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it ends up with a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  while (new_mem_used > cache_rep_->cache_allocated_size_) {
    // Expand size by at least 1MB.
    // Add a dummy record to the cache
    // Start from null so that a failed Insert() cannot leave an indeterminate
    // pointer in dummy_handles_.
    Cache::Handle* handle = nullptr;
    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
                               kSizeDummyEntry, nullptr, &handle);
    // The handle is recorded even when it is null so that dummy_handles_
    // keeps one element per kSizeDummyEntry counted in cache_allocated_size_.
    cache_rep_->dummy_handles_.push_back(handle);
    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
  }
#else
  // Nothing to charge in LITE builds; silence the unused-parameter warning.
  (void)mem;
#endif  // ROCKSDB_LITE
}
void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
assert(cache_rep_ != nullptr);
// Use a mutex to protect various data structures. Can be optimzied to a
// lock-free solution if it ends up with a performance bottleneck.
std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
// Gradually shrink memory costed in the block cache if the actual
// usage is less than 3/4 of what we reserve from the block cache.
// We do this becausse:
// 1. we don't pay the cost of the block cache immediately a memtable is
// freed, as block cache insert is expensive;
// 2. eventually, if we walk away from a temporary memtable size increase,
// we make sure shrink the memory costed in block cache over time.
// In this way, we only shrink costed memory showly even there is enough
// margin.
if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
assert(!cache_rep_->dummy_handles_.empty());
cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
cache_rep_->dummy_handles_.pop_back();
cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
}
#endif // ROCKSDB_LITE
}
} // namespace rocksdb