Skip to content

Commit

Permalink
compaction: TWCS: wire up compaction_strategy_state
Browse files Browse the repository at this point in the history
TWCS no longer keeps internal state, and will now rely on state
managed by each compaction group through compaction::table_state.

Signed-off-by: Raphael S. Carvalho <[email protected]>
  • Loading branch information
raphaelsc committed Mar 28, 2023
1 parent 233fe6d commit 989afbf
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
26 changes: 17 additions & 9 deletions compaction/time_window_compaction_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "leveled_manifest.hh"
#include "mutation_writer/timestamp_based_splitting_writer.hh"
#include "mutation/mutation_source_metadata.hh"
#include "compaction_strategy_state.hh"

#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/remove_if.hpp>
Expand All @@ -21,6 +22,10 @@ namespace sstables {

extern logging::logger clogger;

// Returns the time_window_compaction_strategy_state stored inside the
// compaction strategy state that table_s exposes.
time_window_compaction_strategy_state& time_window_compaction_strategy::get_state(table_state& table_s) const {
    // Name the per-table strategy state first, then pull out the TWCS view of it.
    auto&& strategy_state = table_s.get_compaction_strategy_state();
    return strategy_state.get<time_window_compaction_strategy_state>();
}

time_window_compaction_strategy_options::time_window_compaction_strategy_options(const std::map<sstring, sstring>& options) {
std::chrono::seconds window_unit = DEFAULT_COMPACTION_WINDOW_UNIT;
int window_size = DEFAULT_COMPACTION_WINDOW_SIZE;
Expand Down Expand Up @@ -218,15 +223,16 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i

compaction_descriptor
time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidates) {
auto& state = get_state(table_s);
auto compaction_time = gc_clock::now();

if (candidates.empty()) {
_state.estimated_remaining_tasks = 0;
state.estimated_remaining_tasks = 0;
return compaction_descriptor();
}

auto now = db_clock::now();
if (now - _state.last_expired_check > _options.expired_sstable_check_frequency) {
if (now - state.last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("[{}] TWCS expired check sufficiently far in the past, checking for fully expired SSTables", fmt::ptr(this));

// Find fully expired SSTables. Those will be included no matter what.
Expand All @@ -238,7 +244,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
// Keep checking for fully_expired_sstables until we don't find
// any among the candidates, meaning they are either already compacted
// or registered for compaction.
_state.last_expired_check = now;
state.last_expired_check = now;
} else {
clogger.debug("[{}] TWCS skipping check for fully expired SSTables", fmt::ptr(this));
}
Expand Down Expand Up @@ -290,14 +296,15 @@ time_window_compaction_strategy::get_next_non_expired_sstables(table_state& tabl

std::vector<shared_sstable>
time_window_compaction_strategy::get_compaction_candidates(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidate_sstables) {
auto& state = get_state(table_s);
auto p = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
_state.highest_window_seen = std::max(_state.highest_window_seen, p.second);
state.highest_window_seen = std::max(state.highest_window_seen, p.second);

update_estimated_compaction_by_tasks(_state, p.first, table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold());
update_estimated_compaction_by_tasks(state, p.first, table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold());

return newest_bucket(table_s, control, std::move(p.first), table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold(),
_state.highest_window_seen);
state.highest_window_seen);
}

timestamp_type
Expand Down Expand Up @@ -341,6 +348,7 @@ static std::ostream& operator<<(std::ostream& os, const std::map<timestamp_type,
std::vector<shared_sstable>
time_window_compaction_strategy::newest_bucket(table_state& table_s, strategy_control& control, std::map<timestamp_type, std::vector<shared_sstable>> buckets,
int min_threshold, int max_threshold, timestamp_type now) {
auto& state = get_state(table_s);
clogger.debug("time_window_compaction_strategy::newest_bucket:\n now {}\n{}", now, buckets);

for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
Expand All @@ -349,9 +357,9 @@ time_window_compaction_strategy::newest_bucket(table_state& table_s, strategy_co

bool last_active_bucket = is_last_active_bucket(key, now);
if (last_active_bucket) {
_state.recent_active_windows.insert(key);
state.recent_active_windows.insert(key);
}
switch (compaction_mode(_state, bucket, key, now, min_threshold)) {
switch (compaction_mode(state, bucket, key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered: {
// If we're in the newest bucket, we'll use STCS to prioritize sstables.
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, _stcs_options);
Expand All @@ -375,7 +383,7 @@ time_window_compaction_strategy::newest_bucket(table_state& table_s, strategy_co
// after that, they will fall into default mode where we'll stop considering them as a recent window
// which needs major. That's to avoid terrible writeamp as streaming may push data into older windows.
if (!last_active_bucket) {
_state.recent_active_windows.erase(key);
state.recent_active_windows.erase(key);
}
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
break;
Expand Down
5 changes: 3 additions & 2 deletions compaction/time_window_compaction_strategy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ struct time_window_compaction_strategy_state {
class time_window_compaction_strategy : public compaction_strategy_impl {
time_window_compaction_strategy_options _options;
size_tiered_compaction_strategy_options _stcs_options;
time_window_compaction_strategy_state _state;
public:
// The maximum amount of buckets we segregate data into when writing into sstables.
// To prevent an explosion in the number of sstables we cap it.
Expand All @@ -91,6 +90,8 @@ public:

virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const override;
private:
time_window_compaction_strategy_state& get_state(table_state& table_s) const;

static timestamp_type
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
switch (resolution) {
Expand Down Expand Up @@ -151,7 +152,7 @@ private:
friend class time_window_backlog_tracker;
public:
virtual int64_t estimated_pending_compactions(table_state& table_s) const override {
return _state.estimated_remaining_tasks;
return get_state(table_s).estimated_remaining_tasks;
}

virtual compaction_strategy_type type() const override {
Expand Down
12 changes: 8 additions & 4 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1475,9 +1475,11 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
using namespace std::chrono;

return test_env::do_with_async([] (test_env& env) {
auto s = schema_builder("tests", "time_window_strategy")
auto builder = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
.with_column("value", int32_type);
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
auto s = builder.build();

auto make_insert = [&] (partition_key key, api::timestamp_type t) {
mutation m(s, key);
Expand Down Expand Up @@ -1572,9 +1574,11 @@ SEASTAR_TEST_CASE(time_window_strategy_size_tiered_behavior_correctness) {
using namespace std::chrono;

return test_env::do_with_async([] (test_env& env) {
auto s = schema_builder("tests", "time_window_strategy")
auto builder = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
.with_column("value", int32_type);
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
auto s = builder.build();

auto sst_gen = env.make_sst_factory(s);

Expand Down

0 comments on commit 989afbf

Please sign in to comment.