Skip to content

Commit

Permalink
raft: make forwarding optional
Browse files Browse the repository at this point in the history
In absence of abort_source or timeouts in Raft API, automatic
bouncing can create too much noise during testing, especially
during network failures. Add an option to disable follower
bouncing feature, since randomized_nemesis_test has its own
bouncing which handles timeouts correctly.

Optionally disable forwarding in basic_generator_test.
  • Loading branch information
kostja committed Nov 25, 2021
1 parent c22f945 commit 6d28927
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
36 changes: 29 additions & 7 deletions raft/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class server_impl : public rpc_server, public server {
raft::server_id id() const override;
future<> stepdown(logical_clock::duration timeout) override;
future<> modify_config(std::vector<server_address> add, std::vector<server_id> del) override;
future<entry_id> add_entry_on_leader(command command);
private:
std::unique_ptr<rpc> _rpc;
std::unique_ptr<state_machine> _state_machine;
Expand Down Expand Up @@ -411,20 +412,24 @@ future<> server_impl::add_entry_internal(T command, wait_type type) {
co_return co_await wait_for_entry({.term = e.term, .idx = e.idx}, type);
}

future<entry_id> server_impl::add_entry_on_leader(command cmd) {
// Wait for a new slot to become available
co_await _fsm->wait_max_log_size();
logger.trace("Adding entry after log size limit check.");

const log_entry& e = _fsm->add_entry(std::move(cmd));

co_return entry_id{.term = e.term, .idx = e.idx};
}

future<add_entry_reply> server_impl::execute_add_entry(server_id from, command cmd) {
if (from != _id && !_fsm->get_configuration().contains(from)) {
// Do not accept entries from servers removed from the
// configuration.
co_return add_entry_reply{not_a_leader{server_id{}}};
}
try {
// Wait for a new slot to become available
co_await _fsm->wait_max_log_size();
logger.trace("Adding entry after log size limit check.");

const log_entry& e = _fsm->add_entry(std::move(cmd));

co_return add_entry_reply{entry_id{.term = e.term, .idx = e.idx}};
co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd))};
} catch (raft::not_a_leader& e) {
co_return add_entry_reply{e};
}
Expand All @@ -434,6 +439,13 @@ future<> server_impl::add_entry(command command, wait_type type) {
_stats.add_command++;
server_id leader = _fsm->current_leader();
logger.trace("An entry is submitted");
if (!_config.enable_forwarding) {
if (leader != _id) {
throw not_a_leader{leader};
}
auto eid = co_await add_entry_on_leader(std::move(command));
co_return co_await wait_for_entry(eid, type);
}
while (true) {
if (leader == server_id{}) {
logger.trace("The leader is unknown, waiting through uncertainty");
Expand Down Expand Up @@ -502,6 +514,16 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,

future<> server_impl::modify_config(std::vector<server_address> add, std::vector<server_id> del) {
server_id leader = _fsm->current_leader();
if (!_config.enable_forwarding) {
if (leader != _id) {
throw not_a_leader{leader};
}
auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del));
if (std::holds_alternative<raft::entry_id>(reply)) {
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), wait_type::committed);
}
throw raft::not_a_leader{_fsm->current_leader()};
}
while (true) {
if (leader == server_id{}) {
co_await wait_for_leader();
Expand Down
5 changes: 5 additions & 0 deletions raft/server.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public:
size_t max_log_size = 5000;
// If set to true will enable prevoting stage during election
bool enable_prevoting = true;
// If set to true, forward configuration and entries from
// follower to the leader autmatically. This guarantees
// add_entry()/modify_config() never throws not_a_leader,
// but makes timed_out_error more likely.
bool enable_forwarding = true;
};

virtual ~server() {}
Expand Down
11 changes: 8 additions & 3 deletions test/raft/randomized_nemesis_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2160,15 +2160,21 @@ SEASTAR_TEST_CASE(basic_generator_test) {
}}
}, 200'000);

auto leader_id = co_await env.new_server(true);
auto seed = tests::random::get_int<int32_t>();
std::mt19937 random_engine{seed};
raft::server::configuration srv_cfg{
.enable_forwarding = false, //std::bernoulli_distribution{0.5}(random_engine)
};

auto leader_id = co_await env.new_server(true, srv_cfg);

// Wait for the server to elect itself as a leader.
assert(co_await wait_for_leader<AppendReg>{}(env, {leader_id}, timer, timer.now() + 1000_t) == leader_id);

size_t no_all_servers = 10;
std::vector<raft::server_id> all_servers{leader_id};
for (size_t i = 1; i < no_all_servers; ++i) {
all_servers.push_back(co_await env.new_server(false));
all_servers.push_back(co_await env.new_server(false, srv_cfg));
}

size_t no_init_servers = 5;
Expand Down Expand Up @@ -2208,7 +2214,6 @@ SEASTAR_TEST_CASE(basic_generator_test) {

auto reconfig_thread = some(threads_without_nemesis);

auto seed = tests::random::get_int<int32_t>();

raft_call<AppendReg>::state_type db_call_state {
.env = env,
Expand Down

0 comments on commit 6d28927

Please sign in to comment.