Skip to content

Commit

Permalink
Merge 'loading_cache: force minimum size of unprivileged ' from Piotr…
Browse files Browse the repository at this point in the history
… Grabowski

This series enforces a minimum size of the unprivileged section when
performing `shrink()` operation.

When the cache is shrunk, we still drop entries first from unprivileged
section (as before this commit), however, if this section is already small
(smaller than `max_size / 2`), we will drop entries from the privileged
section.

This is necessary, as before this change the unprivileged section could
be starved. For example if the cache could store at most 50 entries and
there are 49 entries in privileged section, after adding 5 entries (that would
go to unprivileged section) 4 of them would get evicted and only the 5th one
would stay. This caused problems with BATCH statements where all
prepared statements in the batch have to stay in cache at the same time
for the batch to correctly execute.

To correctly check if the unprivileged section might get too small after
dropping an entry, `_current_size` variable, which tracked the overall size
of cache, is changed to two variables: `_unprivileged_section_size` and
`_privileged_section_size`, tracking section sizes separately.

New tests are added to check this new behavior and bookkeeping of the section
sizes. A test is added, that sets up a CQL environment with a very small
prepared statement cache, reproduces issue in scylladb#10440 and stresses the cache.

Fixes scylladb#10440.

Closes scylladb#10456

* github.com:scylladb/scylla:
  loading_cache_test: test prepared stmts cache
  loading_cache: force minimum size of unprivileged
  loading_cache: extract dropping entries to lambdas
  loading_cache: separately track size of sections
  loading_cache: fix typo in 'privileged'
  • Loading branch information
avikivity committed May 1, 2022
2 parents 325eb9b + 6537dc6 commit 5169ce4
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 17 deletions.
240 changes: 240 additions & 0 deletions test/boost/loading_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "test/lib/tmpdir.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"

#include <vector>
#include <numeric>
Expand Down Expand Up @@ -428,6 +429,163 @@ SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged) {
});
}

SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged_minimum_size) {
return seastar::async([] {
// Test that unprivileged section is not starved.
//
// This scenario is tested: cache max_size is 50 and there are 49 entries in
// privileged section. After adding 5 elements (that go to unprivileged
// section) all of them should stay in unprivileged section and elements
// in privileged section should get evicted.
//
// Wrong handling of this situation caused problems with BATCH statements
// where all prepared statements in the batch have to stay in cache at
// the same time for the batch to correctly execute.

using namespace std::chrono;
utils::loading_cache<int, sstring, 1> loading_cache(50, 1h, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });

prepare().get();

// Add 49 elements to privileged section
for (int i = 0; i < 49; i++) {
// Touch the value with the key "i" twice
loading_cache.get_ptr(i, loader).discard_result().get();
loading_cache.find(i);
}

// Add 5 elements to unprivileged section
for (int i = 50; i < 55; i++) {
loading_cache.get_ptr(i, loader).discard_result().get();
}

// Make sure that none of 5 elements were evicted
for (int i = 50; i < 55; i++) {
BOOST_REQUIRE(loading_cache.find(i) != nullptr);
}

BOOST_REQUIRE_EQUAL(loading_cache.size(), 50);
});
}

struct sstring_length_entry_size {
size_t operator()(const sstring& val) {
return val.size();
}
};

SEASTAR_TEST_CASE(test_loading_cache_section_size_correctly_calculated) {
return seastar::async([] {
auto load_len1 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(1)); };
auto load_len5 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(5)); };
auto load_len10 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(10)); };
auto load_len95 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(95)); };

using namespace std::chrono;
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::no, sstring_length_entry_size> loading_cache(100, 1h, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });

BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);

loading_cache.get_ptr(1, load_len1).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);

loading_cache.get_ptr(2, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);

// Move "2" to privileged section by touching it the second time.
loading_cache.get_ptr(2, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);

loading_cache.get_ptr(3, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);

// Move "1" to privileged section. load_len10 should not get executed, as "1"
// is already in the cache.
loading_cache.get_ptr(1, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 10);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);

// Flood cache with elements of size 10,
// unprivileged. "1" and "2" should stay in the privileged section.
for (int i = 11; i < 30; i++) {
loading_cache.get_ptr(i, load_len10).discard_result().get();
}
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
// We shrink the cache BEFORE adding element,
// so after adding the element, the cache
// can exceed max_size...
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 100);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);

// Flood cache with elements of size 10, privileged.
for (int i = 11; i < 30; i++) {
loading_cache.get_ptr(i, load_len10).discard_result().get();
loading_cache.get_ptr(i, load_len10).discard_result().get();
}
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 100);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);

// Add one new unprivileged entry.
loading_cache.get_ptr(31, load_len1).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);

// Add another unprivileged entry, privileged entry should get evicted.
loading_cache.get_ptr(32, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);

// Make it privileged by touching it again.
loading_cache.get_ptr(32, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);

// Add another unprivileged entry.
loading_cache.get_ptr(33, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
// We shrink the cache BEFORE adding element,
// so after adding the element, the cache
// can exceed max_size...
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);

// Add another unprivileged entry, privileged entry should get evicted.
loading_cache.get_ptr(34, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 85);
// We shrink the cache BEFORE adding element,
// so after adding the element, the cache
// can exceed max_size...
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 21);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);

// Add a big unprivileged entry, filling almost entire cache.
loading_cache.get_ptr(35, load_len95).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 75);
// We shrink the cache BEFORE adding element,
// so after adding the element, the cache
// can exceed max_size...
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 95 + 21);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
});
}

SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
return seastar::async([] {
using namespace std::chrono;
Expand Down Expand Up @@ -533,3 +691,85 @@ SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind)
ptr2 = nullptr;
}
}

SEASTAR_TEST_CASE(test_prepared_statement_small_cache) {
// CQL prepared statement cache uses loading_cache
// internally.
constexpr auto CACHE_SIZE = 950000;

cql_test_config small_cache_config;
small_cache_config.qp_mcfg = {CACHE_SIZE, CACHE_SIZE};
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("CREATE TABLE tbl1 (a int, b int, PRIMARY KEY (a))").get();

auto current_uid = 0;

// Prepare 100 queries and execute them twice,
// filling "privileged section" of loading_cache.
std::vector<cql3::prepared_cache_key_type> prepared_ids_privileged;
for (int i = 0; i < 100; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
e.execute_prepared(prepared_id, {}).get();
e.execute_prepared(prepared_id, {}).get();
prepared_ids_privileged.push_back(prepared_id);
}

int how_many_in_cache = 0;
for (auto& prepared_id : prepared_ids_privileged) {
if (e.local_qp().get_prepared(prepared_id)) {
how_many_in_cache++;
}
}

// Assumption: CACHE_SIZE should hold at least 50 queries,
// but not more than 99 queries. Other checks in this
// test rely on that fact.
BOOST_REQUIRE(how_many_in_cache >= 50);
BOOST_REQUIRE(how_many_in_cache <= 99);

// Then prepare 5 queries and execute them one time,
// which will occupy "unprivileged section" of loading_cache.
std::vector<cql3::prepared_cache_key_type> prepared_ids_unprivileged;
for (int i = 0; i < 5; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
e.execute_prepared(prepared_id, {}).get();
prepared_ids_unprivileged.push_back(prepared_id);
}

// Check that all of those prepared queries can still be
// executed. This simulates as if you wanted to execute
// a BATCH with all of them, which requires all of those
// prepared statements to be executable (in the cache).
for (auto& prepared_id : prepared_ids_unprivileged) {
e.execute_prepared(prepared_id, {}).get();
}

// Deterministic random for reproducibility.
testing::local_random_engine.seed(12345);

// Prepare 500 queries and execute them a random number of times.
for (int i = 0; i < 500; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
auto times = rand_int(4);
for (int j = 0; j < times; j++) {
e.execute_prepared(prepared_id, {}).get();
}
}

// Prepare 100 simulated "batches" and execute them
// a random number of times.
for (int i = 0; i < 100; i++) {
std::vector<cql3::prepared_cache_key_type> prepared_ids_batch;
for (int j = 0; j < 5; j++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
prepared_ids_batch.push_back(prepared_id);
}
auto times = rand_int(4);
for (int j = 0; j < times; j++) {
for (auto& prepared_id : prepared_ids_batch) {
e.execute_prepared(prepared_id, {}).get();
}
}
}
}, small_cache_config);
}
7 changes: 6 additions & 1 deletion test/lib/cql_test_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,12 @@ class single_node_cql_env : public cql_test_env {
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr), std::ref(sys_ks)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
cql3::query_processor::memory_config qp_mcfg;
if (cfg_in.qp_mcfg) {
qp_mcfg = *cfg_in.qp_mcfg;
} else {
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
}
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db));
qp.start(std::ref(proxy), std::ref(forward_service), std::move(local_data_dict), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });
Expand Down
2 changes: 2 additions & 0 deletions test/lib/cql_test_env.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "cql3/query_options_fwd.hh"
#include "cql3/values.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/query_processor.hh"
#include "bytes.hh"
#include "schema.hh"
#include "test/lib/eventually.hh"
Expand Down Expand Up @@ -85,6 +86,7 @@ public:
// Scheduling groups are overwritten unconditionally, see get_scheduling_groups().
std::optional<replica::database_config> dbcfg;
std::set<sstring> disabled_features;
std::optional<cql3::query_processor::memory_config> qp_mcfg;

cql_test_config();
cql_test_config(const cql_test_config&);
Expand Down
Loading

0 comments on commit 5169ce4

Please sign in to comment.