everywhere: Replace engine().cpu_id() with this_shard_id()
This is a bit simpler and might allow removing a few includes of
reactor.hh.

Signed-off-by: Rafael Ávila de Espíndola <[email protected]>
Message-Id: <[email protected]>
espindola authored and avikivity committed Mar 27, 2020
1 parent c639a5e commit c5795e8
Showing 58 changed files with 161 additions and 161 deletions.
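The change is mechanical: every call site that previously asked the reactor for its shard via engine().cpu_id() now calls the free function this_shard_id(). A minimal before/after sketch of the most common pattern in this diff ("run this only on shard 0") follows; it is illustrative only — do_init() is a hypothetical task, and the idea that this_shard_id() is reachable through a lighter header such as seastar/core/smp.hh (rather than reactor.hh) is the assumption the commit message hints at, not something this diff proves.

#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>   // assumed to provide this_shard_id() without dragging in reactor.hh

seastar::future<> do_init();     // hypothetical per-node initialization task

// Before: if (engine().cpu_id() == 0) { return do_init(); }   // needed reactor.hh for engine()
seastar::future<> init_once() {
    if (seastar::this_shard_id() == 0) {    // shard 0 does the work...
        return do_init();
    }
    return seastar::make_ready_future<>();  // ...all other shards are a no-op
}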
4 changes: 2 additions & 2 deletions alternator/executor.cc
@@ -1192,7 +1192,7 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
// find the appropriate shard to run it on:
auto token = dht::get_token(*_schema, _pk);
auto desired_shard = service::storage_proxy::cas_shard(*_schema, token);
- if (desired_shard == engine().cpu_id()) {
+ if (desired_shard == this_shard_id()) {
return {};
}
return desired_shard;
@@ -1597,7 +1597,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::storage_proxy::cas_shard(*e.first.schema, e.first.dk.token());
- if (desired_shard == engine().cpu_id()) {
+ if (desired_shard == this_shard_id()) {
return cas_write(proxy, e.first.schema, e.first.dk, std::move(e.second), client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
2 changes: 1 addition & 1 deletion auth/common.hh
@@ -61,7 +61,7 @@ extern const sstring AUTH_PACKAGE_NAME;

template <class Task>
future<> once_among_shards(Task&& f) {
- if (engine().cpu_id() == 0u) {
+ if (this_shard_id() == 0u) {
return f();
}

2 changes: 1 addition & 1 deletion cql3/statements/batch_statement.cc
@@ -378,7 +378,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
}

auto shard = service::storage_proxy::cas_shard(*_statements[0].statement->s, request->key()[0].start()->value().as_decorated_key().token());
- if (shard != engine().cpu_id()) {
+ if (shard != this_shard_id()) {
proxy.get_stats().replica_cross_shard_ops++;
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
2 changes: 1 addition & 1 deletion cql3/statements/modification_statement.cc
@@ -346,7 +346,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);

auto shard = service::storage_proxy::cas_shard(*s, request->key()[0].start()->value().as_decorated_key().token());
- if (shard != engine().cpu_id()) {
+ if (shard != this_shard_id()) {
proxy.get_stats().replica_cross_shard_ops++;
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
2 changes: 1 addition & 1 deletion cql3/statements/select_statement.cc
@@ -344,7 +344,7 @@ select_statement::do_execute(service::storage_proxy& proxy,
"SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
}
unsigned shard = dht::shard_of(*_schema, key_ranges[0].start()->value().as_decorated_key().token());
- if (engine().cpu_id() != shard) {
+ if (this_shard_id() != shard) {
proxy.get_stats().replica_cross_shard_ops++;
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
6 changes: 3 additions & 3 deletions database.cc
@@ -524,7 +524,7 @@ database::setup_metrics() {
sm::make_total_operations("total_view_updates_failed_remote", _cf_stats.total_view_updates_failed_remote,
sm::description("Total number of view updates generated for tables and failed to be sent to remote replicas.")),
});
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
_metrics.add_group("database", {
sm::make_derive("schema_changed", _schema_change_count,
sm::description("The number of times the schema changed")),
@@ -2016,7 +2016,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
const io_priority_class& pc,
tracing::trace_state_ptr,
mutation_reader::forwarding fwd_mr) override {
- const auto shard = engine().cpu_id();
+ const auto shard = this_shard_id();
auto& cf = _db.local().find_column_family(schema);

_contexts[shard].range = make_foreign(std::make_unique<const dht::partition_range>(range));
Expand All @@ -2036,7 +2036,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
});
}
virtual reader_concurrency_semaphore& semaphore() override {
- return *_contexts[engine().cpu_id()].semaphore;
+ return *_contexts[this_shard_id()].semaphore;
}
};
auto ms = mutation_source([&db] (schema_ptr s,
2 changes: 1 addition & 1 deletion database.hh
@@ -597,7 +597,7 @@ private:
assert(_sstable_generation);
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
- return (*_sstable_generation)++ * smp::count + engine().cpu_id();
+ return (*_sstable_generation)++ * smp::count + this_shard_id();
}

// inverse of calculate_generation_for_new_table(), used to determine which
4 changes: 2 additions & 2 deletions db/batchlog_manager.cc
@@ -116,7 +116,7 @@ future<> db::batchlog_manager::start() {
// we use the _timer and _sem on shard zero only. Replaying batchlog can
// generate a lot of work, so we distrute the real work on all cpus with
// round-robin scheduling.
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
_timer.set_callback([this] {
// Do it in the background.
(void)do_batch_log_replay().handle_exception([] (auto ep) {
@@ -295,7 +295,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
};

return seastar::with_gate(_gate, [this, batch = std::move(batch)] {
- blogger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id());
+ blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id());

typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
sstring query = format("SELECT id, data, written_at, version FROM {}.{} LIMIT {:d}", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
4 changes: 2 additions & 2 deletions db/commitlog/commitlog.cc
@@ -1149,10 +1149,10 @@ future<> db::commitlog::segment_manager::init() {
}

// base id counter is [ <shard> | <base> ]
- _ids = replay_position(engine().cpu_id(), id).id;
+ _ids = replay_position(this_shard_id(), id).id;
// always run the timer now, since we need to handle segment pre-alloc etc as well.
_timer.set_callback(std::bind(&segment_manager::on_timer, this));
- auto delay = engine().cpu_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
+ auto delay = this_shard_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
clogger.trace("Delaying timer loop {} ms", delay);
// We need to wait until we have scanned all other segments to actually start serving new
// segments. We are ready now
2 changes: 1 addition & 1 deletion db/hints/manager.cc
@@ -51,7 +51,7 @@ const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::secon
const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10);

manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, resource_manager& res_manager, distributed<database>& db)
- : _hints_dir(fs::path(hints_directory) / format("{:d}", engine().cpu_id()))
+ : _hints_dir(fs::path(hints_directory) / format("{:d}", this_shard_id()))
, _hinted_dcs(hinted_dcs.begin(), hinted_dcs.end())
, _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr())
, _max_hint_window_us(max_hint_window_ms * 1000)
2 changes: 1 addition & 1 deletion db/size_estimates_virtual_reader.cc
@@ -143,7 +143,7 @@ static std::vector<sstring> get_keyspaces(const schema& s, const database& db, d
// If this is a range query, results are divided between shards by the partition key (keyspace_name).
return shard_of(s, dht::get_token(s,
partition_key::from_single_value(s, utf8_type->decompose(ks))))
- == engine().cpu_id();
+ == this_shard_id();
})
);
}
2 changes: 1 addition & 1 deletion db/system_distributed_keyspace.cc
@@ -119,7 +119,7 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor&
}

future<> system_distributed_keyspace::start() {
- if (engine().cpu_id() != 0) {
+ if (this_shard_id() != 0) {
return make_ready_future<>();
}

6 changes: 3 additions & 3 deletions db/system_keyspace.cc
@@ -2111,7 +2111,7 @@ future<> register_view_for_building(sstring ks_name, sstring view_name, const dh
std::move(ks_name),
std::move(view_name),
0,
- int32_t(engine().cpu_id()),
+ int32_t(this_shard_id()),
token.to_sstring()).discard_result();
}

@@ -2123,7 +2123,7 @@ future<> update_view_build_progress(sstring ks_name, sstring view_name, const dh
std::move(ks_name),
std::move(view_name),
token.to_sstring(),
- int32_t(engine().cpu_id())).discard_result();
+ int32_t(this_shard_id())).discard_result();
}

future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
@@ -2138,7 +2138,7 @@ future<> remove_view_build_progress(sstring ks_name, sstring view_name) {
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
std::move(ks_name),
std::move(view_name),
- int32_t(engine().cpu_id())).discard_result();
+ int32_t(this_shard_id())).discard_result();
}

future<> mark_view_as_built(sstring ks_name, sstring view_name) {
12 changes: 6 additions & 6 deletions db/view/view.cc
@@ -1393,7 +1393,7 @@ future<> view_builder::calculate_shard_build_step(
} catch (const no_such_column_family&) {
// Fall-through
}
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second));
bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second));
bookkeeping_ops->push_back(
@@ -1413,7 +1413,7 @@ future<> view_builder::calculate_shard_build_step(
for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) {
if (auto view = maybe_fetch_view(view_name)) {
if (built_views.find(view->id()) != built_views.end()) {
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] {
//FIXME: discarded future.
(void)system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
@@ -1448,7 +1448,7 @@ future<> view_builder::calculate_shard_build_step(
if (view_build_status_per_shard.size() != smp::count) {
reshard(std::move(view_build_status_per_shard), loaded_views);
} else if (!view_build_status_per_shard.empty()) {
- for (auto& status : view_build_status_per_shard[engine().cpu_id()]) {
+ for (auto& status : view_build_status_per_shard[this_shard_id()]) {
load_view_status(std::move(status), loaded_views);
}
}
@@ -1485,7 +1485,7 @@ future<> view_builder::calculate_shard_build_step(
future<> view_builder::add_new_view(view_ptr view, build_step& step) {
vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token());
step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
- auto f = engine().cpu_id() == 0 ? _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()) : make_ready_future<>();
+ auto f = this_shard_id() == 0 ? _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()) : make_ready_future<>();
return when_all_succeed(
std::move(f),
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()));
@@ -1564,7 +1564,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}
}
})();
- if (engine().cpu_id() != 0) {
+ if (this_shard_id() != 0) {
// Shard 0 can't remove the entry in the build progress system table on behalf of the
// current shard, since shard 0 may have already processed the notification, and this
// shard may since have updated the system table if the drop happened concurrently
@@ -1815,7 +1815,7 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t
}).then([this, view, next_token = std::move(next_token)] (bool built) {
if (built) {
return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
- if (builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) {
+ if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) {
return make_ready_future<>();
}
auto view = builder._db.find_schema(view_id);
8 changes: 4 additions & 4 deletions distributed_loader.cc
@@ -367,7 +367,7 @@ static std::vector<sstables::shared_sstable> sstables_for_shard(const std::vecto
}

void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sstring cf_name) {
- assert(engine().cpu_id() == 0); // NOTE: should always run on shard 0!
+ assert(this_shard_id() == 0); // NOTE: should always run on shard 0!

// ensures that only one column family is resharded at a time (that's okay because
// actual resharding is parallelized), and that's needed to prevent the same column
@@ -728,7 +728,7 @@ future<> distributed_loader::do_populate_column_family(distributed<database>& db
sstables::sstable::version_types version = v.second.version;
sstables::sstable::format_types format = v.second.format;

- if (engine().cpu_id() != 0) {
+ if (this_shard_id() != 0) {
dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
return make_ready_future<>();
}
@@ -747,7 +747,7 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] {
// First pass, cleanup temporary sstable directories and sstables pending delete.
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
cleanup_column_family_temp_sst_dirs(sstdir).get();
auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename();
auto exists = file_exists(pending_delete_dir).get0();
@@ -804,7 +804,7 @@ future<> distributed_loader::init_system_keyspace(distributed<database>& db) {
return db.init_commitlog();
}).get();
db.invoke_on_all([] (database& db) {
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
return make_ready_future<>();
}
return db.init_commitlog();
2 changes: 1 addition & 1 deletion gms/feature_service.cc
@@ -212,7 +212,7 @@ feature& feature::operator=(feature&& other) {

void feature::enable() {
if (!_enabled) {
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
logger.info("Feature {} is enabled", name());
}
_enabled = true;
18 changes: 9 additions & 9 deletions gms/gossiper.cc
@@ -132,7 +132,7 @@ gossiper::gossiper(abort_source& as, feature_service& features, locator::token_m
std::chrono::milliseconds(cfg.fd_initial_value_ms()),
std::chrono::milliseconds(cfg.fd_max_interval_ms())) {
// Gossiper's stuff below runs only on CPU0
- if (engine().cpu_id() != 0) {
+ if (this_shard_id() != 0) {
return;
}

@@ -823,7 +823,7 @@ void gossiper::run() {

_the_gossiper.invoke_on_all([this, live_endpoint_changed, unreachable_endpoint_changed, es = endpoint_state_map] (gossiper& local_gossiper) {
// Don't copy gossiper(CPU0) maps into themselves!
- if (engine().cpu_id() != 0) {
+ if (this_shard_id() != 0) {
if (live_endpoint_changed) {
local_gossiper._live_endpoints = _shadow_live_endpoints;
}
@@ -1041,16 +1041,16 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
}

future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
- return container().invoke_on_all([ep, es, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
- if (engine().cpu_id() != orig) {
+ return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
+ if (this_shard_id() != orig) {
g.endpoint_state_map[ep].add_application_state(es);
}
});
}

future<> gossiper::replicate(inet_address ep, const std::map<application_state, versioned_value>& src, const utils::chunked_vector<application_state>& changed) {
- return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
- if (engine().cpu_id() != orig) {
+ return container().invoke_on_all([ep, &src, &changed, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
+ if (this_shard_id() != orig) {
for (auto&& key : changed) {
g.endpoint_state_map[ep].add_application_state(key, src.at(key));
}
@@ -1059,8 +1059,8 @@ future<> gossiper::replicate(inet_address ep, const std::map<application_state,
}

future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
- return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
- if (engine().cpu_id() != orig) {
+ return container().invoke_on_all([ep, key, &value, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
+ if (this_shard_id() != orig) {
g.endpoint_state_map[ep].add_application_state(key, value);
}
});
@@ -1960,7 +1960,7 @@ future<> gossiper::do_stop_gossiping() {
seastar::with_semaphore(_callback_running, 1, [] {
logger.info("Disable and wait for gossip loop finished");
return get_gossiper().invoke_on_all([] (gossiper& g) {
- if (engine().cpu_id() == 0) {
+ if (this_shard_id() == 0) {
g.fd().unregister_failure_detection_event_listener(&g);
}
g._features_condvar.broken();
4 changes: 2 additions & 2 deletions locator/ec2_multi_region_snitch.cc
@@ -50,7 +50,7 @@ future<> ec2_multi_region_snitch::start() {

return seastar::async([this] {
ec2_snitch::load_config().get();
- if (engine().cpu_id() == io_cpu_id()) {
+ if (this_shard_id() == io_cpu_id()) {
sstring pub_addr;
inet_address local_public_address;

@@ -81,7 +81,7 @@ future<> ec2_multi_region_snitch::start() {
// going to invoke gossiper_starting() method.
//
_my_distributed->invoke_on(0, [this] (snitch_ptr& local_s) {
- if (engine().cpu_id() != io_cpu_id()) {
+ if (this_shard_id() != io_cpu_id()) {
local_s->set_local_private_addr(_local_private_address);
}
}).get();
(Diffs for the remaining changed files were not loaded on this page.)
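Many of the hunks above also share a routing pattern: compute the shard that owns a token (via service::storage_proxy::cas_shard or dht::shard_of), compare it with this_shard_id(), and either execute locally or hand the request to the owning shard. A hedged sketch of that idea is below, under stated assumptions: owning_shard_for() is a hypothetical stand-in for the real routing call, and the statements in this commit return a bounce_to_shard result message to the client rather than calling smp::submit_to() themselves.

#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>

unsigned owning_shard_for(unsigned token);   // hypothetical; stands in for e.g. storage_proxy::cas_shard()

// Run `work` on the shard that owns `token`: directly if we are already on it,
// otherwise by submitting the work to that shard.
template <typename Func>
seastar::future<> run_on_owning_shard(unsigned token, Func work) {
    const unsigned shard = owning_shard_for(token);
    if (shard == seastar::this_shard_id()) {
        return work();
    }
    return seastar::smp::submit_to(shard, std::move(work));
}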
