Skip to content

Commit

Permalink
raft: (service) manage Raft configuration during topology changes
Browse files Browse the repository at this point in the history
Operations of adding or removing a node to Raft configuration
are made idempotent: they do nothing if already done, and
they are safe to resume after a failure.

However, since topology changes are not transactional, if a
bootstrap or removal procedure fails midway, Raft group 0
configuration may go out of sync with topology state as seen by
gossip.

In future we must change gossip to avoid making any persistent
changes to the cluster: all changes to persistent topology state
will be done exclusively through Raft Group 0.

Specifically, instead of persisting the tokens by advertising
them through gossip, the bootstrap will commit a change to a system
table using Raft group 0. nodetool will switch from looking at
gossip-managed tables to consulting with Raft Group 0 configuration
or Raft-managed tables.
Once this transformation is done, naturally, adding a node to Raft
configuration (perhaps as a non-voting member at first) will become the
first persistent change to ring state applied when a node joins;
removing a node from the Raft Group 0 configuration will become the last
action when removing a node.

Until this is done, do our best to avoid a cluster state when
a removed node or a node which addition failed is stuck in Raft
configuration, but the node is no longer present in gossip-managed
system tables. In other words, keep the gossip the primary source of
truth. For this purpose, carefully chose the timing when we
join and leave Raft group 0:

Join the Raft group 0 only after we've advertised our tokens, so the
cluster is aware of this node, it's visible in nodetool status,
but before node state jumps to "normal", i.e. before it accepts
queries. Since the operation is idempotent, invoke it on each
restart.

Remove the node from Group 0 *before* its tokens are removed
from gossip-managed system tables. This guarantees
that if removal from Raft group 0 fails for whatever reason,
the node stays in the ring, so nodetool removenode and
friends are re-tried.

Add tracing.
kostja committed Nov 25, 2021
1 parent 96e2594 commit c22f945
Showing 19 changed files with 688 additions and 110 deletions.
2 changes: 2 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
@@ -1020,6 +1020,7 @@ def find_headers(repodir, excluded_dirs):
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
'service/raft/raft_group0.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1119,6 +1120,7 @@ def find_headers(repodir, excluded_dirs):
'idl/messaging_service.idl.hh',
'idl/paxos.idl.hh',
'idl/raft.idl.hh',
'idl/group0.idl.hh',
'idl/hinted_handoff.idl.hh',
]

33 changes: 33 additions & 0 deletions idl/group0.idl.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

namespace service {

struct group0_info {
raft::group_id group0_id;
raft::server_address addr;
};

struct group0_peer_exchange {
std::variant<std::monostate, service::group0_info, std::vector<raft::server_address>> info;
};

} // namespace raft
27 changes: 12 additions & 15 deletions main.cc
Original file line number Diff line number Diff line change
@@ -816,11 +816,20 @@ int main(int ac, char** av) {
});
gossiper.invoke_on_all(&gms::gossiper::start).get();

supervisor::notify("starting Raft service");
raft_gr.start(std::ref(messaging), std::ref(gossiper), std::ref(qp)).get();

raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT),
std::ref(messaging), std::ref(gossiper)).get();
// XXX: stop_raft has to happen before query_processor
// is stopped, since some groups keep using the query
// processor until are stopped inside stop_raft.
auto stop_raft = defer_verbose_shutdown("Raft", [&raft_gr] {
raft_gr.stop().get();
});
if (cfg->check_experimental(db::experimental_features_t::RAFT)) {
supervisor::notify("starting Raft Group Registry service");
}
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
@@ -1028,18 +1037,6 @@ int main(int ac, char** av) {
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
});

const bool raft_enabled = cfg->check_experimental(db::experimental_features_t::RAFT);
if (raft_enabled) {
supervisor::notify("starting Raft RPC");
raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
}
auto stop_raft_rpc = defer_verbose_shutdown("Raft RPC", [&raft_gr] {
raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
});
if (!raft_enabled) {
stop_raft_rpc->cancel();
}

debug::the_stream_manager = &stream_manager;
supervisor::notify("starting streaming service");
stream_manager.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper)).get();
@@ -1173,7 +1170,7 @@ int main(int ac, char** av) {
}).get();

with_scheduling_group(maintenance_scheduling_group, [&] {
return ss.local().init_server();
return ss.local().init_server(qp.local());
}).get();

gossiper.local().wait_for_gossip_to_settle().get();
28 changes: 28 additions & 0 deletions message/messaging_service.cc
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@
#include "idl/messaging_service.dist.hh"
#include "idl/paxos.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/group0.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -87,6 +88,7 @@
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
#include "idl/group0.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
@@ -441,6 +443,10 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
case messaging_verb::GET_SCHEMA_VERSION:
// Raft peer exchange is mainly running at boot, but still
// should not be blocked by any data requests.
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -1436,6 +1442,28 @@ future<raft::add_entry_reply> messaging_service::send_raft_modify_config(msg_add
return send_message_timeout<raft::add_entry_reply>(this, messaging_verb::RAFT_MODIFY_CONFIG, std::move(id), timeout, std::move(gid), std::move(from_id), std::move(dst_id), add, del);
}

void messaging_service::register_group0_peer_exchange(std::function<future<service::group0_peer_exchange>(const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func) {
register_handler(this, netw::messaging_verb::GROUP0_PEER_EXCHANGE, std::move(func));
}
future<> messaging_service::unregister_group0_peer_exchange() {
return unregister_handler(netw::messaging_verb::GROUP0_PEER_EXCHANGE);
}
future<service::group0_peer_exchange> messaging_service::send_group0_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers) {
return send_message_timeout<service::group0_peer_exchange>(this, messaging_verb::GROUP0_PEER_EXCHANGE, std::move(id), timeout, peers);
}

void messaging_service::register_group0_modify_config(std::function<future<>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func) {
register_handler(this, netw::messaging_verb::GROUP0_MODIFY_CONFIG, std::move(func));
}

future<> messaging_service::unregister_group0_modify_config() {
return unregister_handler(netw::messaging_verb::GROUP0_MODIFY_CONFIG);
}

future<> messaging_service::send_group0_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del) {
return send_message_timeout<void>(this, messaging_verb::GROUP0_MODIFY_CONFIG, std::move(id), timeout, std::move(gid), add, del);
}

void init_messaging_service(sharded<messaging_service>& ms,
messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg, const db::config& db_config) {
using encrypt_what = messaging_service::encrypt_what;
13 changes: 12 additions & 1 deletion message/messaging_service.hh
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
#include "cache_temperature.hh"
#include "service/paxos/prepare_response.hh"
#include "raft/raft.hh"
#include "service/raft/messaging.hh"

#include <list>
#include <vector>
@@ -158,7 +159,9 @@ enum class messaging_verb : int32_t {
RAFT_EXECUTE_READ_BARRIER_ON_LEADER = 54,
RAFT_ADD_ENTRY = 55,
RAFT_MODIFY_CONFIG = 56,
LAST = 57,
GROUP0_PEER_EXCHANGE = 57,
GROUP0_MODIFY_CONFIG = 58,
LAST = 59,
};

} // namespace netw
@@ -601,6 +604,14 @@ public:
future<> unregister_raft_modify_config();
future<raft::add_entry_reply> send_raft_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del);

void register_group0_peer_exchange(std::function<future<service::group0_peer_exchange> (const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func);
future<> unregister_group0_peer_exchange();
future<service::group0_peer_exchange> send_group0_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers);

void register_group0_modify_config(std::function<future<>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func);
future<> unregister_group0_modify_config();
future<> send_group0_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del);

void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
1 change: 0 additions & 1 deletion raft/fsm.cc
Original file line number Diff line number Diff line change
@@ -53,7 +53,6 @@ future<> fsm::wait_max_log_size() {
}

const configuration& fsm::get_configuration() const {
check_is_leader();
return _log.get_configuration();
}

48 changes: 48 additions & 0 deletions service/raft/messaging.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "raft/raft.hh"
/////////////////////////////////////////
// Discovery RPC supporting message types

namespace service {

// Used in a bootstrapped Scylla cluster, provides group 0
// identifier and the current group leader address.
struct group0_info {
raft::group_id group0_id;
raft::server_address addr;
bool operator==(const group0_info& rhs) const {
return rhs.group0_id == group0_id && rhs.addr == addr;
}
};

// If the peer has no cluster discovery running, it returns
// std::monostate, which means the caller needs to retry
// contacting this server after a pause. Otherwise it returns
// its leader data or a list of peers.
struct group0_peer_exchange {
std::variant<std::monostate, group0_info, std::vector<raft::server_address>> info;
};

/////////////////////////////////////////
} // end of namespace service

19 changes: 19 additions & 0 deletions service/raft/raft_address_map.hh
Original file line number Diff line number Diff line change
@@ -281,6 +281,17 @@ public:
}
return set_it->_addr;
}
// Linear search for id based on inet address. Used when
// removing a node which id is unknown. Do not return self
// - we need to remove id of the node self is replacing.
std::optional<raft::server_id> find_replace_id(gms::inet_address addr, raft::server_id self) const {
for (auto it : _set) {
if (it._addr == addr && it._id != self) {
return it._id;
}
}
return {};
}
// Inserts a new mapping or updates the existing one.
// The function verifies that if the mapping exists, then its inet_address
// and the provided one match.
@@ -329,6 +340,14 @@ public:
}
// No action needed when a regular entry is updated
}

// A shortcut to setting a new permanent address
void set(raft::server_address addr) {
return set(addr.id,
ser::deserialize_from_buffer(addr.info, boost::type<gms::inet_address>{}),
false);
}

// Erase an entry from the server address map.
// Does nothing if an element with a given id does not exist.
void erase(raft::server_id id) {
Loading

0 comments on commit c22f945

Please sign in to comment.