Skip to content

Commit

Permalink
raft: (discovery) introduce leader discovery state machine
Browse files Browse the repository at this point in the history
Introduce a special state machine used to find
a leader of an existing Raft cluster or to create
a new cluster.

This state machine should be used when a new
Scylla node has no persisted Raft Group 0 configuration.

The algorithm is initialized with a list of seed
IP addresses, the IP address of this server, and
this server's Raft server id.

The IP addresses are used to construct an initial list of peers.

Then, the algorithm tries to contact each peer (excluding self) from
its peer list and share the peer list with this peer, as well as
get the peer's peer list. If this peer is already part of
some Raft cluster, this information is also shared. On a response
from a peer, the current peer's peer list is updated. The
algorithm stops when all peers have exchanged peer information or
one of the peers responds with id of a Raft group and Raft
server address of the group leader.

(If any of the peers fails to respond, the algorithm re-tries
ad infinitum with a timeout).

More formally, the algorithm stops when one of the following is true:
- it finds an instance with initialized Raft Group 0, with a leader
- all the peers have been contacted, and this server's
  Raft server id is the smallest among all contacted peers.
kostja committed Nov 25, 2021
1 parent 30e3227 commit 8ee88a9
Showing 4 changed files with 392 additions and 0 deletions.
6 changes: 6 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
@@ -563,6 +563,7 @@ def find_headers(repodir, excluded_dirs):
'test/raft/etcd_test',
'test/raft/raft_sys_table_storage_test',
'test/raft/raft_address_map_test',
'test/raft/discovery_test',
])

apps = set([
@@ -1018,6 +1019,7 @@ def find_headers(repodir, excluded_dirs):
'service/raft/raft_rpc.cc',
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1279,6 +1281,10 @@ def find_headers(repodir, excluded_dirs):
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_core + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_core
deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
'test/raft/helpers.cc',
'test/lib/log.cc',
'service/raft/discovery.cc'] + scylla_raft_dependencies

deps['utils/gz/gen_crc_combine_table'] = ['utils/gz/gen_crc_combine_table.cc']

146 changes: 146 additions & 0 deletions service/raft/discovery.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/raft/discovery.hh"

namespace service {

// Verify that a peer entry can take part in discovery.
//
// A peer is usable only if its `info` field (the Internet address)
// is non-empty; otherwise std::logic_error is thrown.
void check_peer(const raft::server_address& peer) {
    const bool has_address = peer.info.size() != 0;
    if (has_address) {
        return;
    }
    throw std::logic_error("Discovery requires peer internet address to be set");
}

// Build the discovery state machine for server `self`, starting from
// the given seed list.
//
// Both `self` and every seed are validated with check_peer(), so
// std::logic_error is thrown if any of them has an empty Internet
// address. The seeds are then fed into step() exactly like a peer list
// received from the network.
discovery::discovery(raft::server_address self, const peer_list& seeds)
        : _self(std::move(self)) {

    // Validate our own address first, then each seed in order.
    check_peer(_self);
    for (auto seed = seeds.begin(); seed != seeds.end(); ++seed) {
        check_peer(*seed);
    }

    // Before any peer is known, the list we share consists of this
    // server only; step() rebuilds it when it learns about peers.
    _peer_list.push_back(_self);

    step(seeds);
}

void discovery::step(const peer_list& peers) {

if (_is_leader) {
return;
}

peer_set new_peers;
// Set to true if we learned about a new peer or
// received Raft server ID for one of the seeds.
bool refresh_peer_list = false;

for (const auto& addr : peers) {
// peer must have a non-empty Internet address
if (addr.info == _self.info) {
// do not include _self into _peers
continue;
}
auto it = _peers.find(addr);
// Update peer information if it's a new peer or provides
// a Raft ID for an existing peer.
if (it == _peers.end() || it->id == raft::server_id{}) {
refresh_peer_list = true;
if (it == _peers.end()) {
_peers.emplace(addr);
new_peers.emplace(addr);
} else {
// Update Raft ID
_peers.erase(it);
_peers.emplace(addr);
}
} else {
// If we have this peer, its ID must be the
// same as we know (with the exceptions of seeds,
// for which servers might not know ids at first).
assert(it == _peers.end() || it->id == addr.id || addr.id == raft::server_id{});
}
}
if (refresh_peer_list) {
_peer_list = {_peers.begin(), _peers.end()};
_peer_list.push_back(_self);
}
maybe_become_leader();
if (_is_leader) {
return;
}
for (const auto& peer : new_peers) {
_requests.push_back(std::make_pair(peer, _peer_list));
}
}

// Elect this node a leader if every known peer has responded and this
// node's entry is the smallest one in _peer_list.
void discovery::maybe_become_leader() {
    // _responded is a subset of _peers, so the election can only run
    // once the two sets have the same size, i.e. all contacted peers
    // have answered.
    const bool awaiting_responses = _responded.size() < _peers.size();
    if (awaiting_responses) {
        return;
    }
    // NOTE(review): relies on operator< of raft::server_address, which
    // is presumably ordered by Raft id - confirm in raft/raft.hh.
    const auto smallest = std::min_element(_peer_list.begin(), _peer_list.end());
    if (smallest == _peer_list.end()) {
        return;
    }
    if (smallest->id == _self.id) {
        _is_leader = true;
    }
}

// Handle a discovery request carrying the sender's peer list: merge
// the list into our state, then reply with a copy of our own
// (possibly just updated) peer list.
discovery::peer_list discovery::request(const peer_list& peers) {
    step(peers);
    peer_list reply = _peer_list;
    return reply;
}

// Handle the reply of peer `from` to one of our requests: record that
// the peer has responded and merge the peer list it sent back.
//
// `from` must already be a known peer; this is asserted.
void discovery::response(raft::server_address from, const peer_list& peers) {
    const bool is_known_peer = _peers.find(from) != _peers.end();
    assert(is_known_peer);
    _responded.insert(from);
    step(peers);
}

// Produce the next action for the caller driving this state machine:
//  - i_am_leader{} if this node has elected itself a leader;
//  - otherwise the queued requests, if there are any (the queue is
//    handed over to the caller);
//  - otherwise pause{}, after queueing a fresh batch of requests which
//    will be returned by the next call.
discovery::output discovery::get_output() {
    if (_is_leader) {
        return i_am_leader{};
    }
    if (!_requests.empty()) {
        return std::move(_requests);
    }

    // Nothing is queued: prepare the next round of requests.
    const bool everyone_responded = _responded.size() == _peers.size();
    for (const auto& peer : _peers) {
        if (everyone_responded) {
            // All have responded, but we're not a leader. Poll every
            // peer to find out who is; an empty peer list keeps the
            // message small.
            _requests.emplace_back(peer, peer_list{});
        } else if (!_responded.contains(peer)) {
            // Re-send our peer list to peers which are still silent.
            _requests.emplace_back(peer, _peer_list);
        }
    }
    return pause{};
}

} // end of namespace service
128 changes: 128 additions & 0 deletions service/raft/discovery.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "raft/raft.hh"

namespace service {

// Raft leader discovery FSM
// https://github.com/kbr-/scylla-raft-boot/blob/master/boot.tla
//
// Contact all known peers, extending the transitive closure of
// the known peers, sharing this server's Raft Id and the list of
// its peers. Once the transitive closure of peers has been built,
// select the peer with the smallest Raft Id to be the leader. To
// be used during initial setup of Raft Group 0.
class discovery {
public:
// During discovery, peers are identified based on their Internet
// address, not Raft server id.
struct server_address_hash {
size_t operator()(const raft::server_address& address) const {
return std::hash<bytes>{}(address.info);
}
};
struct server_address_equal {
bool operator()(const raft::server_address& rhs, const raft::server_address&lhs) const {
return rhs.info == lhs.info;
}
};

// When a fresh cluster is bootstrapping, peer list is
// used to build a transitive closure of all cluster members
// and select an initial Raft configuration of the cluster.
using peer_list = std::vector<raft::server_address>;
using peer_set = std::unordered_set<raft::server_address, server_address_hash, server_address_equal>;
struct i_am_leader {};
struct pause {};
using request_list = std::vector<std::pair<raft::server_address, peer_list>>;
// @sa discovery::get_output()
using output = std::variant<i_am_leader, pause, request_list>;
private:
raft::server_address _self;
// Assigned if this server elects itself a leader.
bool _is_leader = false;
// _seeds + all peers we've discovered, excludes _self
peer_set _peers;
// A subset of _peers which have responded to our requests, excludes _self.
peer_set _responded;
// _peers + self - the peer list we're sharing; if this node
// is a leader, empty list to save bandwidth
peer_list _peer_list;
// outstanding messages
request_list _requests;
private:
// Update this state machine with new peer data and
// create outbound messages if necessary.
void step(const peer_list& peers);
// Check if we can run election and then elect itself
// a leader.
void maybe_become_leader();
public:
// For construction, pass this server's Internet address and
// Raft id - and a set of seed Internet addresses. It's OK to
// leave Raft ids of seed peers unset, they will be updated as
// these peers respond.
//
// For discovery to work correctly the following must hold:
//
// - this server's Raft id must survive restarts.
// The opposite would be a Byzantine failure: imagine
// we generate and share a big id first, so another node
// elects itself a leader. Then this node restarts, generates
// the smallest known id and elects itself a leader too.
//
// - the seed graph must contain a vertex which is reachable from
// every other vertex, for example it can be be fully
// connected, with either each server having at least one
// common seed or seed connections forming a loop. A rule of
// thumb is to use the same seed list everywhere.
//
discovery(raft::server_address self, const peer_list& seeds);

// To be used on the receiving peer to generate a reply
// while the discovery protocol is in progress. Always
// returns a peer list, even if this node is a leader,
// since leader state must be persisted first.
peer_list request(const peer_list& peers);

// Submit a reply from one of the peers to this discovery
// state machine. If this node is a leader, resposne is
// ignored.
void response(raft::server_address from, const peer_list& peers);

// Until all peers answer, returns a list of messages for the
// peers which haven't replied yet. As soon as all peers have
// replied, returns a pause{}, to allow some node to become
// a leader, and then a list of messages for all peers which
// can be used to find the leader. If this node is a leader,
// returns leader{}.
discovery::output get_output();

// A helper for testing.
bool is_leader() { return _is_leader; }

// A helper used for testing
raft::server_id id() const { return _self.id; }
};

} // namespace service

112 changes: 112 additions & 0 deletions test/raft/discovery_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_MODULE raft
#include "test/raft/helpers.hh"
#include "service/raft/discovery.hh"

using namespace raft;

// A simulated network for the tests below: maps a Raft server id to the
// discovery state machine of the node with that id. The map stores plain
// pointers; nothing in this file deletes through them.
using discovery_network = std::unordered_map<server_id, service::discovery*>;

using service::discovery;

// Drive all state machines in `network` until one of them reports
// that it is a leader.
//
// Each round polls every node for its output. Requests are delivered
// synchronously: the destination's request() produces a reply which is
// immediately fed into the sender's response(). Messages for ids not
// present in `network` are dropped.
void
run_discovery_impl(discovery_network& network) {
    for (;;) {
        for (auto entry : network) {
            discovery& sender = *entry.second;
            auto out = sender.get_output();
            if (std::holds_alternative<discovery::i_am_leader>(out)) {
                return;
            }
            if (std::holds_alternative<discovery::pause>(out)) {
                // Nothing to send for this node in this round.
                continue;
            }
            for (auto&& msg : std::get<discovery::request_list>(out)) {
                const auto dest = network.find(msg.first.id);
                if (dest == network.end()) {
                    // The node is not available, drop the message
                    continue;
                }
                discovery& receiver = *(dest->second);
                auto reply = receiver.request(msg.second);
                sender.response(msg.first, reply);
            }
        }
    }
}

// Register every given discovery instance in a fresh simulated network,
// keyed by its Raft server id, and run discovery until a leader appears.
template <typename... Args>
void run_discovery(Args&&... args) {
    discovery_network network;
    const auto register_node = [&network](discovery& node) -> void {
        network.emplace(node.id(), &node);
    };
    (register_node(args), ...);
    run_discovery_impl(network);
}

// Constructor validation and the single-node case.
BOOST_AUTO_TEST_CASE(test_basic) {

    // An address with a Raft id but no Internet address.
    server_address without_ip = {.id = id()};

    // Must supply an Internet address for self
    BOOST_CHECK_THROW(discovery(without_ip, {}), std::logic_error);

    server_address with_ip = {.id = id(), .info = "192.168.1.2"};
    BOOST_CHECK_NO_THROW(discovery(with_ip, {}));

    // Must supply an Internet address for each peer
    BOOST_CHECK_THROW(discovery(with_ip, {without_ip}), std::logic_error);

    // OK to include self into peers
    BOOST_CHECK_NO_THROW(discovery(with_ip, {with_ip}));

    // With a single peer, discovery immediately finds a leader,
    // whether the seed list is empty...
    discovery single(with_ip, {});
    BOOST_CHECK(single.is_leader());

    // ...or contains only the server itself.
    single = discovery(with_ip, {with_ip});
    BOOST_CHECK(single.is_leader());
}


// Two nodes, each seeded with the other one: exactly one of them
// must become a leader.
BOOST_AUTO_TEST_CASE(test_discovery) {

    server_address first = {.id = id(), .info = "192.168.1.1"};
    server_address second = {.id = id(), .info = "192.168.1.2"};

    discovery node1(first, {second});
    discovery node2(second, {first});
    run_discovery(node1, node2);

    // For two boolean operands XOR means "exactly one".
    BOOST_CHECK(node1.is_leader() ^ node2.is_leader());
}

// Three nodes sharing the same seed list (which includes every node):
// exactly one of them must become a leader.
BOOST_AUTO_TEST_CASE(test_discovery_fullmesh) {

    server_address addr1 = {.id = id(), .info = "127.0.0.13"};
    server_address addr2 = {.id = id(), .info = "127.0.0.19"};
    server_address addr3 = {.id = id(), .info = "127.0.0.21"};

    auto seeds = std::vector<server_address>({addr1, addr2, addr3});

    discovery d1(addr1, seeds);
    discovery d2(addr2, seeds);
    discovery d3(addr3, seeds);
    run_discovery(d1, d2, d3);

    // Count the leaders explicitly: a three-way XOR would also be true
    // if all three nodes elected themselves, which is exactly the
    // failure this test must catch.
    const int leaders = static_cast<int>(d1.is_leader())
            + static_cast<int>(d2.is_leader())
            + static_cast<int>(d3.is_leader());
    BOOST_CHECK(leaders == 1);
}

0 comments on commit 8ee88a9

Please sign in to comment.