Skip to content

Commit

Permalink
Merge "allow early aborts through abort sources." from Glauber
Browse files Browse the repository at this point in the history
"
The shutdown process of compaction manager starts with an explicit call
from the database object. However, that can only happen after everything is
already initialized. This works well today, but I am soon to change
the resharding process to operate before the node is fully ready.

One can still stop the database in this case, but reshardings will
have to finish before the abort signal is processed.

This patch passes the existing abort source to the construction of the
compaction_manager and subscribes to it. If the abort source is
triggered, the compaction manager will react to it firing and all
compactions it manages will be stopped.

We still want the database object to be able to wait for the compaction
manager, since the database is the object that owns the lifetime of
the compaction manager. To make that possible we'll use a future
that is returned from stop(): no matter what triggered the abort, either
an early abort during initial resharding or a database-level event like
drain, everything will shut down in the right order.

The abort source is passed to the database, which is responsible for
constructing the compaction manager.

Tests: unit (debug), manual start+stop, manual drain + stop, previously
       failing dtests.
"
  • Loading branch information
avikivity committed May 17, 2020
2 parents 777d5e8 + 7423ccc commit b155eef
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 61 deletions.
11 changes: 5 additions & 6 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const

// Builds the compaction manager for a database shard, wiring in the abort
// source so an early abort (before the database is fully initialized) still
// stops all managed compactions.
//
// If the config pins static compaction shares (> 0), the controller is created
// with fixed shares; otherwise the dynamically-controlled variant is used.
inline
std::unique_ptr<compaction_manager>
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
    if (cfg.compaction_static_shares() > 0) {
        return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
    }
    return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
}

lw_shared_ptr<keyspace_metadata>
Expand Down Expand Up @@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {

utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});

database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
: _stats(make_lw_shared<db_stats>())
, _cl_stats(std::make_unique<cell_locker_stats>())
, _cfg(cfg)
Expand Down Expand Up @@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _mutation_query_stage()
, _apply_stage("db_apply", &database::do_apply)
, _version(empty_version)
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
, _enable_incremental_backups(cfg.incremental_backups())
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
Expand Down Expand Up @@ -1751,7 +1751,6 @@ future<> database::stop_large_data_handler() {
future<>
database::stop() {
assert(!_large_data_handler->running());
assert(_compaction_manager->stopped());

// try to ensure that CL has done disk flushing
future<> maybe_shutdown_commitlog = _commitlog != nullptr ? _commitlog->shutdown() : make_ready_future<>();
Expand Down
2 changes: 1 addition & 1 deletion database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@ public:
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }

future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
database(database&&) = delete;
~database();

Expand Down
2 changes: 1 addition & 1 deletion distributed_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
return seastar::async([&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable {
global_column_family_ptr cf(db, ks_name, cf_name);

if (cf->get_compaction_manager().stopped()) {
if (!cf->get_compaction_manager().enabled()) {
return;
}
// fast path to detect that this column family doesn't need reshard.
Expand Down
10 changes: 2 additions & 8 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ int main(int ac, char** av) {
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.available_memory = memory::stats().total_memory();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
start_large_data_handler(db).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
Expand Down Expand Up @@ -916,7 +916,7 @@ int main(int ac, char** av) {
}

db.invoke_on_all([&proxy] (database& db) {
db.get_compaction_manager().start();
db.get_compaction_manager().enable();
}).get();

// If the same sstable is shared by several shards, it cannot be
Expand Down Expand Up @@ -1188,12 +1188,6 @@ int main(int ac, char** av) {
}
});

auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
db.invoke_on_all([](auto& db) {
return db.get_compaction_manager().stop();
}).get();
});

auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
if (cfg->redis_port() || cfg->redis_ssl_port()) {
redis.stop().get();
Expand Down
2 changes: 1 addition & 1 deletion service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2610,7 +2610,7 @@ future<> storage_service::drain() {

// Interrupt on going compaction and shutdown to prevent further compaction
ss.db().invoke_on_all([] (auto& db) {
return db.get_compaction_manager().stop();
return db.get_compaction_manager().drain();
}).get();

ss.set_mode(mode::DRAINING, "flushing column families", false);
Expand Down
102 changes: 73 additions & 29 deletions sstables/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class user_initiated_backlog_tracker final : public compaction_backlog_tracker::
};

future<> compaction_manager::submit_major_compaction(column_family* cf) {
if (_stopped) {
if (_state != state::enabled) {
return make_ready_future<>();
}
auto task = make_lw_shared<compaction_manager::task>();
Expand Down Expand Up @@ -310,7 +310,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
}

future<> compaction_manager::run_resharding_job(column_family* cf, std::function<future<>()> job) {
if (_stopped) {
if (_state != state::enabled) {
return make_ready_future<>();
}

Expand Down Expand Up @@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
});
}

compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
auto b = backlog() / available_memory;
// This means we are using an unimplemented strategy
Expand All @@ -372,23 +372,39 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
, _backlog_manager(_compaction_controller)
, _scheduling_group(_compaction_controller.sg())
, _available_memory(available_memory)
{}
, _early_abort_subscription(as.subscribe([this] {
do_stop();
}))
{
register_metrics();
}

// Static-shares constructor: the compaction controller runs with a fixed
// number of scheduling-group shares instead of the backlog-driven controller.
// Subscribes to the abort source so that triggering it shuts the manager down
// (do_stop) even before the database object gets a chance to call stop().
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
    : _compaction_controller(sg, iop, shares)
    , _backlog_manager(_compaction_controller)
    , _scheduling_group(_compaction_controller.sg())
    , _available_memory(available_memory)
    , _early_abort_subscription(as.subscribe([this] {
        do_stop();
    }))
{
    register_metrics();
}

// Default constructor for the testing infrastructure: default scheduling
// group, default I/O priority and a token 1-byte memory budget. It does not
// subscribe to any abort source.
compaction_manager::compaction_manager()
    : _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
    , _backlog_manager(_compaction_controller)
    , _scheduling_group(_compaction_controller.sg())
    , _available_memory(1)
{
    // No metric registration because this constructor is supposed to be used only by the testing
    // infrastructure.
}

compaction_manager::~compaction_manager() {
    // Assert that compaction manager was explicitly stopped, if started.
    // Otherwise, fiber(s) will be alive after the object is stopped.
    assert(_state == state::none || _state == state::stopped);
}

void compaction_manager::register_metrics() {
Expand All @@ -402,13 +418,19 @@ void compaction_manager::register_metrics() {
});
}

void compaction_manager::start() {
_stopped = false;
register_metrics();
void compaction_manager::enable() {
assert(_state == state::none || _state == state::disabled);
_state = state::enabled;
_compaction_submission_timer.arm(periodic_compaction_submission_interval());
postponed_compactions_reevaluation();
}

// Stop accepting new compactions without tearing the manager down; a later
// enable() call may move it back to the enabled state.
void compaction_manager::disable() {
    assert(_state == state::none || _state == state::enabled);
    _compaction_submission_timer.cancel();
    _state = state::disabled;
}

std::function<void()> compaction_manager::compaction_submission_callback() {
return [this] () mutable {
for (auto& e: _compaction_locks) {
Expand All @@ -420,7 +442,7 @@ std::function<void()> compaction_manager::compaction_submission_callback() {
void compaction_manager::postponed_compactions_reevaluation() {
_waiting_reevalution = repeat([this] {
return _postponed_reevaluation.wait().then([this] {
if (_stopped) {
if (_state != state::enabled) {
_postponed.clear();
return stop_iteration::yes;
}
Expand All @@ -445,38 +467,61 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
_postponed.push_back(cf);
}

future<> compaction_manager::stop() {
if (_stopped) {
return make_ready_future<>();
}
cmlog.info("Asked to stop");
_stopped = true;
// Reset the metrics registry
_metrics.clear();
// Requests every in-flight compaction to stop with the given reason and
// returns a future that resolves once all task fibers have finished.
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
    cmlog.info("Stopping {} ongoing compactions", _compactions.size());

    // Stop all ongoing compaction.
    for (auto& info : _compactions) {
        info->stop(reason);
    }

    // Wait for each task handler to stop. Copy list because task remove itself
    // from the list when done.
    auto tasks = _tasks;
    return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
        return parallel_for_each(tasks, [this] (auto& task) {
            return this->task_stop(task);
        });
    });
}

// Cancels all running compactions and moves the compaction manager into the
// disabled state. Unlike stop(), the manager stays alive afterwards: it will
// not accept new compactions unless it is moved back to enabled.
future<> compaction_manager::drain() {
    _state = state::disabled;
    return stop_ongoing_compactions("drain");
}

// Final teardown entry point used by the database shutdown path. Triggers
// do_stop() (a no-op if an abort already did so) and returns the future that
// resolves when all compaction fibers have exited.
future<> compaction_manager::stop() {
    if (_state == state::none) {
        // never started, so there is nothing to wait for
        return make_ready_future<>();
    }
    do_stop();
    return std::move(*_stop_future);
}

// Initiates shutdown without waiting: stops ongoing compactions, drains the
// postponed-compaction fiber and shuts the controller down, stashing the
// resulting future in _stop_future for stop() to await. Safe to call multiple
// times (subsequent calls are no-ops), which is why both the abort-source
// subscription and stop() can funnel through it.
void compaction_manager::do_stop() {
    if (_state == state::none || _state == state::stopped) {
        return;
    }

    _state = state::stopped;
    cmlog.info("Asked to stop");
    // Reset the metrics registry
    _metrics.clear();
    _stop_future.emplace(stop_ongoing_compactions("shutdown").then([this] () mutable {
        reevaluate_postponed_compactions();
        return std::move(_waiting_reevalution);
    }).then([this] {
        _weight_tracker.clear();
        _compaction_submission_timer.cancel();
        cmlog.info("Stopped");
        return _compaction_controller.shutdown();
    }));
}

// A task may proceed only while the manager is enabled and the task itself
// has not been asked to stop.
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
    return (_state == state::enabled) && !task->stopping;
}

inline future<> compaction_manager::put_task_to_sleep(lw_shared_ptr<task>& task) {
Expand Down Expand Up @@ -505,8 +550,7 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
} catch (storage_io_error& e) {
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
retry = false;
// FIXME discarded future.
(void)stop();
do_stop();
} catch (...) {
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
retry = true;
Expand Down
45 changes: 37 additions & 8 deletions sstables/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/abort_source.hh>
#include "log.hh"
#include "utils/exponential_backoff_retry.hh"
#include <vector>
Expand Down Expand Up @@ -67,8 +68,22 @@ private:
// compaction manager may have N fibers to allow parallel compaction per shard.
std::list<lw_shared_ptr<task>> _tasks;

// Used to assert that compaction_manager was explicitly stopped, if started.
bool _stopped = true;
// Possible states in which the compaction manager can be found.
//
// none: started, but not yet enabled. Once the compaction manager moves out of "none", it can
// never legally move back
// stopped: stop() was called. The compaction_manager will never be enabled or disabled again
// and can no longer be used (although it is possible to still grab metrics, stats,
// etc)
// enabled: accepting compactions
// disabled: not accepting compactions
//
// Moving the compaction manager to and from enabled and disable states is legal, as many times
// as necessary.
enum class state { none, stopped, disabled, enabled };
state _state = state::none;

std::optional<future<>> _stop_future;

stats _stats;
seastar::metrics::metric_groups _metrics;
Expand Down Expand Up @@ -149,21 +164,35 @@ private:
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;

future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);

future<> stop_ongoing_compactions(sstring reason);
optimized_optional<abort_source::subscription> _early_abort_subscription;
public:
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
compaction_manager();
~compaction_manager();

void register_metrics();

// Start compaction manager.
void start();
// enable/disable compaction manager.
void enable();
void disable();

// Stop all fibers. Ongoing compactions will be waited.
// Stop all fibers. Ongoing compactions will be waited. Should only be called
// once, from main teardown path.
future<> stop();

bool stopped() const { return _stopped; }
// cancels all running compactions and moves the compaction manager into disabled state.
// The compaction manager is still alive after drain but it will not accept new compactions
// unless it is moved back to enabled state.
future<> drain();

// FIXME: should not be public. It's not anyone's business if we are enabled.
// distributed_loader.cc uses for resharding, remove this when the new resharding series lands.
bool enabled() const { return _state == state::enabled; }
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop();

// Submit a column family to be compacted.
void submit(column_family* cf);
Expand Down
4 changes: 2 additions & 2 deletions test/boost/gossip_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });

db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
db.invoke_on_all([] (database& db) {
db.get_compaction_manager().start();
db.get_compaction_manager().enable();
}).get();

auto stop_db = defer([&] { db.stop().get(); });
Expand Down
Loading

0 comments on commit b155eef

Please sign in to comment.