Charge block cache for cache internal usage (facebook#5797)
Summary:
For our default block cache, each additional entry carries extra memory overhead: an LRUHandle (currently 72 bytes) and the cache key (two varint64s: file id and offset). This usage is not negligible; for example, with block_size=4k the overhead accounts for an extra 2% of the cache's memory usage. This patch charges the cache for that extra usage, reducing untracked memory consumption outside the block cache. The feature is enabled by default and can be disabled by passing kDontChargeCacheMetadata to the cache constructor.
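As a rough sanity check of the 2% figure, and to show how the policy is selected, here is a minimal sketch against the public API. It assumes the `LRUCacheOptions::metadata_charge_policy` field added by this patch and an approximately 16-byte cache key; it is an illustration, not code from this change.

```cpp
#include <memory>
#include "rocksdb/cache.h"

int main() {
  // Back-of-the-envelope per-entry overhead (assumed sizes): a 72-byte
  // LRUHandle plus a ~16-byte key (two varint64s) is roughly 88 bytes,
  // and 88 / 4096 is about 2.1% extra per 4 KiB block -- the overhead
  // that is now charged against the cache capacity.
  rocksdb::LRUCacheOptions opts;
  opts.capacity = 8 << 20;  // 8 MiB cache, for illustration only

  // Default after this patch: metadata is charged to the cache.
  opts.metadata_charge_policy = rocksdb::kFullChargeCacheMetadata;
  std::shared_ptr<rocksdb::Cache> charged = rocksdb::NewLRUCache(opts);

  // Opting out restores the previous accounting.
  opts.metadata_charge_policy = rocksdb::kDontChargeCacheMetadata;
  std::shared_ptr<rocksdb::Cache> legacy = rocksdb::NewLRUCache(opts);
  return 0;
}
```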
This PR builds on facebook#4258.
Pull Request resolved: facebook#5797

Test Plan:
- Existing tests are updated to either disable the feature when the test depends too heavily on the old usage accounting, or to increase the cache capacity to account for the additional metadata charge.
- The Usage tests in cache_test.cc are augmented to test the cache usage under kFullChargeCacheMetadata.
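
For reference, the gist of the augmented Usage checks, as a standalone, hypothetical sketch using only the public API (the real tests in cache_test.cc below are more thorough):

```cpp
#include <cassert>
#include <memory>
#include <string>
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"

int main() {
  using namespace rocksdb;
  LRUCacheOptions opts;
  opts.capacity = 100000;
  opts.metadata_charge_policy = kDontChargeCacheMetadata;
  std::shared_ptr<Cache> plain = NewLRUCache(opts);
  opts.metadata_charge_policy = kFullChargeCacheMetadata;
  std::shared_ptr<Cache> precise = NewLRUCache(opts);

  std::string key = "some-block-key";
  static char value[10] = "abcdef";
  const size_t kCharge = key.size() + 5;
  auto noop_deleter = [](const Slice& /*k*/, void* /*v*/) {};

  plain->Insert(key, value, kCharge, noop_deleter);
  precise->Insert(key, value, kCharge, noop_deleter);

  // Only the declared charge is tracked without metadata charging...
  assert(plain->GetUsage() == kCharge);
  // ...while full charging also accounts for the handle and key overhead.
  assert(precise->GetUsage() > kCharge);
  return 0;
}
```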

Differential Revision: D17396833

Pulled By: maysamyabandeh

fbshipit-source-id: 7684ccb9f8a40ca595e4f5efcdb03623afea0c6f
Maysam Yabandeh authored and facebook-github-bot committed Sep 16, 2019
1 parent 94d62d7 commit 638d239
Showing 25 changed files with 289 additions and 120 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -11,6 +11,7 @@
* When a user sets options.force_consistency_check in RocksDB, we now pass the error back to the user instead of crashing the process.
* Add an option `memtable_insert_hint_per_batch` to WriteOptions. If it is true, each WriteBatch will maintain its own insert hints for each memtable in concurrent write. See include/rocksdb/options.h for more details.
* The `sst_dump` command line tool's `recompress` command now displays how many blocks were compressed and how many were not, in particular how many were not compressed because the compression ratio was not met (12.5% threshold for GoodCompressionRatio), as seen in the `number.block.not_compressed` counter stat since version 6.0.0.
* Block cache usage now takes into account the per-entry metadata overhead. This results in more accurate management of memory. A side effect of this feature is that fewer items fit into a block cache of the same size, which can lead to higher cache miss rates. This can be remedied by increasing the block cache size or by passing kDontChargeCacheMetadata to its constructor to restore the old behavior.
### Public API Change
* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables.
* Added a lightweight API GetCurrentWalFile() to get last live WAL filename and size. Meant to be used as a helper for backup/restore tooling in a larger ecosystem such as MySQL with a MyRocks storage engine.
84 changes: 72 additions & 12 deletions cache/cache_test.cc
@@ -86,14 +86,22 @@ class CacheTest : public testing::TestWithParam<std::string> {
return nullptr;
}

std::shared_ptr<Cache> NewCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit) {
std::shared_ptr<Cache> NewCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
CacheMetadataChargePolicy charge_policy = kDontChargeCacheMetadata) {
auto type = GetParam();
if (type == kLRU) {
return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, 0.0);
LRUCacheOptions co;
co.capacity = capacity;
co.num_shard_bits = num_shard_bits;
co.strict_capacity_limit = strict_capacity_limit;
co.high_pri_pool_ratio = 0;
co.metadata_charge_policy = charge_policy;
return NewLRUCache(co);
}
if (type == kClock) {
return NewClockCache(capacity, num_shard_bits, strict_capacity_limit);
return NewClockCache(capacity, num_shard_bits, strict_capacity_limit,
charge_policy);
}
return nullptr;
}
@@ -143,10 +151,15 @@ class CacheTest : public testing::TestWithParam<std::string> {
};
CacheTest* CacheTest::current_;

class LRUCacheTest : public CacheTest {};

TEST_P(CacheTest, UsageTest) {
// cache is std::shared_ptr and will be automatically cleaned up.
const uint64_t kCapacity = 100000;
auto cache = NewCache(kCapacity, 8, false);
auto cache = NewCache(kCapacity, 8, false, kDontChargeCacheMetadata);
auto precise_cache = NewCache(kCapacity, 0, false, kFullChargeCacheMetadata);
ASSERT_EQ(0, cache->GetUsage());
ASSERT_EQ(0, precise_cache->GetUsage());

size_t usage = 0;
char value[10] = "abcdef";
@@ -155,72 +168,118 @@ TEST_P(CacheTest, UsageTest) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), kv_size,
dumbDeleter);
usage += kv_size;
ASSERT_EQ(usage, cache->GetUsage());
ASSERT_LT(usage, precise_cache->GetUsage());
}

cache->EraseUnRefEntries();
precise_cache->EraseUnRefEntries();
ASSERT_EQ(0, cache->GetUsage());
ASSERT_EQ(0, precise_cache->GetUsage());

// make sure the cache will be overloaded
for (uint64_t i = 1; i < kCapacity; ++i) {
auto key = ToString(i);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}

// the usage should be close to the capacity
ASSERT_GT(kCapacity, cache->GetUsage());
ASSERT_GT(kCapacity, precise_cache->GetUsage());
ASSERT_LT(kCapacity * 0.95, cache->GetUsage());
ASSERT_LT(kCapacity * 0.95, precise_cache->GetUsage());
}

TEST_P(CacheTest, PinnedUsageTest) {
// cache is std::shared_ptr and will be automatically cleaned up.
const uint64_t kCapacity = 100000;
auto cache = NewCache(kCapacity, 8, false);
const uint64_t kCapacity = 200000;
auto cache = NewCache(kCapacity, 8, false, kDontChargeCacheMetadata);
auto precise_cache = NewCache(kCapacity, 8, false, kFullChargeCacheMetadata);

size_t pinned_usage = 0;
char value[10] = "abcdef";

std::forward_list<Cache::Handle*> unreleased_handles;
std::forward_list<Cache::Handle*> unreleased_handles_in_precise_cache;

// Add entries. Unpin some of them after insertion. Then, pin some of them
// again. Check GetPinnedUsage().
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
Cache::Handle* handle;
Cache::Handle* handle_in_precise_cache;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
&handle);
assert(handle);
precise_cache->Insert(key, reinterpret_cast<void*>(value), kv_size,
dumbDeleter, &handle_in_precise_cache);
assert(handle_in_precise_cache);
pinned_usage += kv_size;
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_LT(pinned_usage, precise_cache->GetPinnedUsage());
if (i % 2 == 0) {
cache->Release(handle);
precise_cache->Release(handle_in_precise_cache);
pinned_usage -= kv_size;
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_LT(pinned_usage, precise_cache->GetPinnedUsage());
} else {
unreleased_handles.push_front(handle);
unreleased_handles_in_precise_cache.push_front(handle_in_precise_cache);
}
if (i % 3 == 0) {
unreleased_handles.push_front(cache->Lookup(key));
auto x = precise_cache->Lookup(key);
assert(x);
unreleased_handles_in_precise_cache.push_front(x);
// If i % 2 == 0, then the entry was unpinned before Lookup, so pinned
// usage increased
if (i % 2 == 0) {
pinned_usage += kv_size;
}
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_LT(pinned_usage, precise_cache->GetPinnedUsage());
}
}
auto precise_cache_pinned_usage = precise_cache->GetPinnedUsage();
ASSERT_LT(pinned_usage, precise_cache_pinned_usage);

// check that overloading the cache does not change the pinned usage
for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
auto key = ToString(i);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_EQ(precise_cache_pinned_usage, precise_cache->GetPinnedUsage());

cache->EraseUnRefEntries();
precise_cache->EraseUnRefEntries();
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_EQ(precise_cache_pinned_usage, precise_cache->GetPinnedUsage());

// release handles for pinned entries to prevent memory leaks
for (auto handle : unreleased_handles) {
cache->Release(handle);
}
for (auto handle : unreleased_handles_in_precise_cache) {
precise_cache->Release(handle);
}
ASSERT_EQ(0, cache->GetPinnedUsage());
ASSERT_EQ(0, precise_cache->GetPinnedUsage());
cache->EraseUnRefEntries();
precise_cache->EraseUnRefEntries();
ASSERT_EQ(0, cache->GetUsage());
ASSERT_EQ(0, precise_cache->GetUsage());
}

TEST_P(CacheTest, HitAndMiss) {
@@ -550,10 +609,10 @@ TEST_P(CacheTest, SetCapacity) {
}
}

TEST_P(CacheTest, SetStrictCapacityLimit) {
TEST_P(LRUCacheTest, SetStrictCapacityLimit) {
// test1: set the flag to false. Insert more keys than capacity. See if they
// all go through.
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
std::shared_ptr<Cache> cache = NewCache(5, 0, false);
std::vector<Cache::Handle*> handles(10);
Status s;
for (size_t i = 0; i < 10; i++) {
@@ -579,7 +638,7 @@ TEST_P(CacheTest, SetStrictCapacityLimit) {
}

// test3: init with flag being true.
std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true);
std::shared_ptr<Cache> cache2 = NewCache(5, 0, true);
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i + 1);
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
@@ -697,13 +756,14 @@ TEST_P(CacheTest, GetCharge) {
}

#ifdef SUPPORT_CLOCK_CACHE
std::shared_ptr<Cache> (*new_clock_cache_func)(size_t, int,
bool) = NewClockCache;
std::shared_ptr<Cache> (*new_clock_cache_func)(
size_t, int, bool, CacheMetadataChargePolicy) = NewClockCache;
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest,
testing::Values(kLRU, kClock));
#else
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest, testing::Values(kLRU));
#endif // SUPPORT_CLOCK_CACHE
INSTANTIATE_TEST_CASE_P(CacheTestInstance, LRUCacheTest, testing::Values(kLRU));

} // namespace rocksdb

59 changes: 46 additions & 13 deletions cache/clock_cache.cc
@@ -13,8 +13,9 @@

namespace rocksdb {

std::shared_ptr<Cache> NewClockCache(size_t /*capacity*/, int /*num_shard_bits*/,
bool /*strict_capacity_limit*/) {
std::shared_ptr<Cache> NewClockCache(
size_t /*capacity*/, int /*num_shard_bits*/, bool /*strict_capacity_limit*/,
CacheMetadataChargePolicy /*metadata_charge_policy*/) {
// Clock cache not supported.
return nullptr;
}
@@ -35,6 +36,7 @@ std::shared_ptr<Cache> NewClockCache(size_t /*capacity*/, int /*num_shard_bits*/
#include "tbb/concurrent_hash_map.h"

#include "cache/sharded_cache.h"
#include "port/malloc.h"
#include "port/port.h"
#include "util/autovector.h"
#include "util/mutexlock.h"
@@ -202,6 +204,27 @@ struct CacheHandle {
deleter = a.deleter;
return *this;
}

inline static size_t CalcTotalCharge(
Slice key, size_t charge,
CacheMetadataChargePolicy metadata_charge_policy) {
size_t meta_charge = 0;
if (metadata_charge_policy == kFullChargeCacheMetadata) {
meta_charge += sizeof(CacheHandle);
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
meta_charge +=
malloc_usable_size(static_cast<void*>(const_cast<char*>(key.data())));
#else
meta_charge += key.size();
#endif
}
return charge + meta_charge;
}

inline size_t CalcTotalCharge(
CacheMetadataChargePolicy metadata_charge_policy) {
return CalcTotalCharge(key, charge, metadata_charge_policy);
}
};

// Key of hash map. We store hash value with the key for convenience.
@@ -404,11 +427,12 @@ void ClockCacheShard::RecycleHandle(CacheHandle* handle,
assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0);
context->to_delete_key.push_back(handle->key.data());
context->to_delete_value.emplace_back(*handle);
size_t total_charge = handle->CalcTotalCharge(metadata_charge_policy_);
handle->key.clear();
handle->value = nullptr;
handle->deleter = nullptr;
recycle_.push_back(handle);
usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
usage_.fetch_sub(total_charge, std::memory_order_relaxed);
}

void ClockCacheShard::Cleanup(const CleanupContext& context) {
Expand All @@ -434,7 +458,8 @@ bool ClockCacheShard::Ref(Cache::Handle* h) {
std::memory_order_relaxed)) {
if (CountRefs(flags) == 0) {
// No reference count before the operation.
pinned_usage_.fetch_add(handle->charge, std::memory_order_relaxed);
size_t total_charge = handle->CalcTotalCharge(metadata_charge_policy_);
pinned_usage_.fetch_add(total_charge, std::memory_order_relaxed);
}
return true;
}
@@ -454,7 +479,8 @@ bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage,
assert(CountRefs(flags) > 0);
if (CountRefs(flags) == 1) {
// this is the last reference.
pinned_usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
size_t total_charge = handle->CalcTotalCharge(metadata_charge_policy_);
pinned_usage_.fetch_sub(total_charge, std::memory_order_relaxed);
// Cleanup if it is the last reference.
if (!InCache(flags)) {
MutexLock l(&mutex_);
@@ -539,8 +565,10 @@ CacheHandle* ClockCacheShard::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), bool hold_reference,
CleanupContext* context) {
size_t total_charge =
CacheHandle::CalcTotalCharge(key, charge, metadata_charge_policy_);
MutexLock l(&mutex_);
bool success = EvictFromCache(charge, context);
bool success = EvictFromCache(total_charge, context);
bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
if (!success && (strict || !hold_reference)) {
context->to_delete_key.push_back(key.data());
@@ -575,9 +603,9 @@ CacheHandle* ClockCacheShard::Insert(
}
table_.insert(HashTable::value_type(CacheKey(key, hash), handle));
if (hold_reference) {
pinned_usage_.fetch_add(charge, std::memory_order_relaxed);
pinned_usage_.fetch_add(total_charge, std::memory_order_relaxed);
}
usage_.fetch_add(charge, std::memory_order_relaxed);
usage_.fetch_add(total_charge, std::memory_order_relaxed);
return handle;
}

@@ -674,10 +702,14 @@ void ClockCacheShard::EraseUnRefEntries() {

class ClockCache final : public ShardedCache {
public:
ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit)
ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
CacheMetadataChargePolicy metadata_charge_policy)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
int num_shards = 1 << num_shard_bits;
shards_ = new ClockCacheShard[num_shards];
for (int i = 0; i < num_shards; i++) {
shards_[i].set_metadata_charge_policy(metadata_charge_policy);
}
SetCapacity(capacity);
SetStrictCapacityLimit(strict_capacity_limit);
}
@@ -714,13 +746,14 @@ class ClockCache final : public ShardedCache {

} // end anonymous namespace

std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit) {
std::shared_ptr<Cache> NewClockCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
CacheMetadataChargePolicy metadata_charge_policy) {
if (num_shard_bits < 0) {
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<ClockCache>(capacity, num_shard_bits,
strict_capacity_limit);
return std::make_shared<ClockCache>(
capacity, num_shard_bits, strict_capacity_limit, metadata_charge_policy);
}

} // namespace rocksdb