storage_service: pass replacement_info to bootstrap
So it won't need to call get_replace_address.

Signed-off-by: Benny Halevy <[email protected]>
bhalevy committed Jan 13, 2023
1 parent b863f7a commit 08598e4
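
The change in a nutshell: join_token_ring() already obtains a replacement_info from prepare_replacement_info(), so it now passes the whole object down to bootstrap() instead of having bootstrap() re-derive the replace address from global state via get_replace_address(). Below is a minimal self-contained sketch of the resulting callee-side pattern; the type names and fields are simplified stand-ins inferred from what the diff accesses (tokens, address, dc_rack, host_id), not the real Scylla definitions:

#include <cstdio>
#include <optional>
#include <string>
#include <unordered_set>

// Simplified stand-ins for the real gms/locator types (assumptions, not Scylla's).
struct inet_address { std::string addr; };
struct node_host_id { std::string uuid; };
struct dc_rack_info { std::string dc, rack; };

// Shape inferred from the fields the diff accesses:
// ri->tokens, ri->address, ri->dc_rack, ri->host_id.
struct replacement_info {
    std::unordered_set<long> tokens;   // stand-in for std::unordered_set<token>
    inet_address address;
    dc_rack_info dc_rack;
    node_host_id host_id;
};

// A disengaged optional means "regular bootstrap"; an engaged one carries
// everything the replace path needs, so there is no is_replacing() /
// get_replace_address() round-trip and no assert() guards on the pieces.
void bootstrap(const std::optional<replacement_info>& replacement_info) {
    if (!replacement_info) {
        std::puts("regular bootstrap");
        return;
    }
    // Once the optional is engaged, every field is known-valid.
    const auto& replace_addr = replacement_info->address;
    const auto& replaced_host_id = replacement_info->host_id;
    std::printf("replacing %s (host id %s)\n",
                replace_addr.addr.c_str(), replaced_host_id.uuid.c_str());
}

int main() {
    bootstrap(std::nullopt);  // regular path
    bootstrap(replacement_info{{1, 2}, {"10.0.0.1"}, {"dc1", "rack1"}, {"uuid-of-replaced"}});
}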
Showing 2 changed files with 18 additions and 17 deletions.
service/storage_service.cc: 17 additions & 16 deletions
@@ -315,6 +315,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
 
     bool replacing_a_node_with_same_ip = false;
     bool replacing_a_node_with_diff_ip = false;
+    std::optional<replacement_info> ri;
     std::optional<gms::inet_address> replace_address;
     std::optional<locator::host_id> replaced_host_id;
     std::optional<raft_group0::replace_info> raft_replace_info;
@@ -324,21 +325,21 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
         if (_sys_ks.local().bootstrap_complete()) {
             throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
         }
-        auto ri = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
-        bootstrap_tokens = std::move(ri.tokens);
-        replace_address = ri.address;
+        ri = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
+        bootstrap_tokens = std::move(ri->tokens);
+        replace_address = ri->address;
         replacing_a_node_with_same_ip = *replace_address == get_broadcast_address();
         replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address();
 
         slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
             get_broadcast_address() == *replace_address ? "the same" : "a different",
             get_broadcast_address(), *replace_address);
-        tmptr->update_topology(*replace_address, std::move(ri.dc_rack));
+        tmptr->update_topology(*replace_address, std::move(ri->dc_rack));
         co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
-        replaced_host_id = ri.host_id;
+        replaced_host_id = ri->host_id;
         raft_replace_info = raft_group0::replace_info {
             .ip_addr = *replace_address,
-            .raft_id = raft::server_id{ri.host_id.uuid()},
+            .raft_id = raft::server_id{ri->host_id.uuid()},
         };
     } else if (should_bootstrap()) {
         co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
@@ -529,7 +530,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
         co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
         co_await mark_existing_views_as_built(sys_dist_ks);
         co_await _sys_ks.local().update_tokens(bootstrap_tokens);
-        co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id, replaced_host_id);
+        co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id, ri);
     } else {
         supervisor::notify("starting system distributed keyspace");
         co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
@@ -651,8 +652,8 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
 }
 
 // Runs inside seastar::async context
-future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<locator::host_id>& replaced_host_id) {
-    return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service, &replaced_host_id] {
+future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
+    return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service, &replacement_info] {
         auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap);
 
         set_mode(mode::BOOTSTRAP);
@@ -668,7 +669,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
             }
         }).get();
 
-        if (!is_replacing()) {
+        if (!replacement_info) {
             int retry = 0;
             while (get_token_metadata_ptr()->count_normal_token_owners() == 0) {
                 if (retry++ < 500) {
@@ -733,15 +734,15 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
                 run_bootstrap_ops(bootstrap_tokens);
             }
         } else {
-            auto replace_addr = get_replace_address();
-            assert(replace_addr);
+            auto replace_addr = replacement_info->address;
+            auto replaced_host_id = replacement_info->host_id;
 
-            slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
-            _sys_ks.local().remove_endpoint(*replace_addr).get();
+            slogger.debug("Removing replaced endpoint {} from system.peers", replace_addr);
+            _sys_ks.local().remove_endpoint(replace_addr).get();
 
-            assert(replaced_host_id);
-            auto raft_id = raft::server_id{replaced_host_id->uuid()};
-            slogger.info("Replace: removing {}/{} from group 0...", *replace_addr, raft_id);
+            auto raft_id = raft::server_id{replaced_host_id.uuid()};
+            slogger.info("Replace: removing {}/{} from group 0...", replace_addr, raft_id);
             assert(_group0);
             _group0->remove_from_group0(raft_id).get();
 
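A side note on why ri moved to the enclosing scope as a std::optional: the old `auto ri = co_await prepare_replacement_info(...)` was local to the replace branch, so only the pieces copied out of it (replace_address, replaced_host_id) survived to the bootstrap() call further down. Hoisting the optional lets one variable serve both the setup branch and the call site, with field access changing from ri. to ri-> accordingly. A compressed sketch of the scoping pattern, using hypothetical stand-ins for the Scylla calls:

#include <optional>
#include <string>

struct replacement_info { std::string address; };

// Hypothetical stand-in for prepare_replacement_info().
replacement_info prepare() { return {"10.0.0.1"}; }

void bootstrap(const std::optional<replacement_info>&) { /* branches on engagement */ }

void join_token_ring(bool replacing) {
    std::optional<replacement_info> ri;  // hoisted; disengaged by default
    if (replacing) {
        ri = prepare();                  // engage inside the branch; use ri->address
    }
    // ...much later, a single call site covers both modes:
    bootstrap(ri);
}

int main() {
    join_token_ring(true);
    join_token_ring(false);
}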
service/storage_service.hh: 1 addition & 1 deletion
@@ -368,7 +368,7 @@ private:
     // Stream data for which we become a new replica.
     // Before that, if we're not replacing another node, inform other nodes about our chosen tokens
     // and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
-    future<> bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<locator::host_id>& replaced_host_id);
+    future<> bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info);
 
 public:
     /**
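One subtlety in the new signature: bootstrap() runs its body under seastar::async and the lambda captures replacement_info by reference, so the caller has to keep the optional alive until the returned future resolves. join_token_ring() satisfies that naturally, since ri lives in its coroutine frame across the co_await; the same lifetime contract already applied to the old replaced_host_id parameter.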
