Skip to content

Commit

Permalink
Merge "test: raft: randomized_nemesis_test: introduce server stop/cra…
Browse files Browse the repository at this point in the history
…sh nemesis" from Kamil

We begin by preparing the `persistence` class so that the storage can be
reused across different Raft server instances: the test keeps a shared
pointer to the storage so that when a server stops, a new server with
the same ID can be reconstructed with this storage.

We then modify `environment` so that server instances can be removed and
replaced in middle of operations.

Finally we prepare a nemesis operation which gracefully stops or
immediately crashes a randomly picked server and run this operation
periodically in `basic_generator_test`.

One important change that changes the API of `raft::server` is included:
the metrics are not automatically registered in `start()`. This is
because metric registration modifies global data structures, which
cannot be done twice with the same set of metrics (and we would do it
when we restart a server with the same ID). Instead,
`register_metrics()` is exposed in the `raft::server` interface to be
called when running servers in production.

* kbr/crashes-v3:
  raft: server: print the ID of aborted server
  test: raft: randomized_nemesis_test: run stop_crash nemesis in `basic_generator_test`
  test: raft: randomized_nemesis_test: introduce `stop_crash` operation
  test: raft: randomized_nemesis_test: environment: implement server `stop` and `crash`
  raft: server: don't register metrics in `start()`
  test: raft: randomized_nemesis_test: raft_server: return `stopped_error` when called during abort
  test: raft: randomized_nemesis_test: handle `raft::stopped_error`
  test: raft: randomized_nemesis_test: handle missing servers in `environment` call functions
  test: raft: randomized_nemesis_test: environment: split `new_server` into `new_node` and `start_server`
  test: raft: randomized_nemesis_test: remove `environment::get_server`
  test: raft: randomized_nemesis_test: construct `persistence_proxy` outside `raft_server<M>::create`
  test: raft: randomized_nemesis_test: persistence_proxy: store a shared pointer to `persistence`
  test: raft: randomized_nemesis_test: persistence: split into two classes
  test: raft: logical_timer: introduce `sleep_until`
  • Loading branch information
tgrabiec committed Dec 7, 2021
2 parents bb0f8c3 + 75bab2b commit 2a36377
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 128 deletions.
6 changes: 2 additions & 4 deletions raft/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class server_impl : public rpc_server, public server {
future<> stepdown(logical_clock::duration timeout) override;
future<> modify_config(std::vector<server_address> add, std::vector<server_id> del) override;
future<entry_id> add_entry_on_leader(command command);
void register_metrics() override;
private:
std::unique_ptr<rpc> _rpc;
std::unique_ptr<state_machine> _state_machine;
Expand Down Expand Up @@ -228,7 +229,6 @@ class server_impl : public rpc_server, public server {
future<> _applier_status = make_ready_future<>();
future<> _io_status = make_ready_future<>();

void register_metrics();
seastar::metrics::metric_groups _metrics;

// Server address set to be used by RPC module to maintain its address
Expand Down Expand Up @@ -323,8 +323,6 @@ future<> server_impl::start() {
// start fiber to apply committed entries
_applier_status = applier_fiber();

// Metrics access _fsm, so create them only after the pointer is populated
register_metrics();
co_return;
}

Expand Down Expand Up @@ -1082,7 +1080,7 @@ void server_impl::abort_snapshot_transfers() {
}

future<> server_impl::abort() {
logger.trace("abort() called");
logger.trace("[{}]: abort() called", _id);
_fsm->stop();
_apply_entries.abort(std::make_exception_ptr(stop_apply_fiber()));

Expand Down
5 changes: 5 additions & 0 deletions raft/server.hh
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public:
// In case of a timeout returns timeout_error.
virtual future<> stepdown(logical_clock::duration timeout) = 0;

// Register metrics for this server. Metric are global but their names
// depend on the server's ID, so it is possible to register metrics
// of two servers iff their IDs are different.
virtual void register_metrics() = 0;

// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;
virtual future<> wait_election_done() = 0;
Expand Down
1 change: 1 addition & 0 deletions service/raft/raft_group_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ future<> raft_group_registry::start_server_for_group(raft_server_for_group new_g
// start the server instance prior to arming the ticker timer.
// By the time the tick() is executed the server should already be initialized.
co_await new_grp.server->start();
new_grp.server->register_metrics();
} catch (...) {
on_internal_error(rslog, std::current_exception());
}
Expand Down
14 changes: 14 additions & 0 deletions test/raft/generator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ thread_id some(const thread_set& s) {
return *s.begin();
}

template <size_t... I>
auto take_impl(const std::vector<thread_id>& vec, std::index_sequence<I...>) {
return std::make_tuple(vec[I]...);
}

template <size_t N>
auto take(const thread_set& s) {
assert(N <= s.size());
auto end = s.begin();
std::advance(end, N);
std::vector<thread_id> vec{s.begin(), end};
return take_impl(vec, std::make_index_sequence<N>{});
}

// Make a set of `n` threads.
thread_set make_thread_set(size_t n) {
thread_set s;
Expand Down
19 changes: 13 additions & 6 deletions test/raft/logical_timer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,11 @@ public:
return res;
}

// Returns a future that resolves after a number of `tick()`s represented by `d`.
// Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
// Returns a future that resolves at logical time point `tp` (according to this timer's clock).
// Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
// per one `sleep` call on average.
future<> sleep(raft::logical_clock::duration d) {
if (d == raft::logical_clock::duration{0}) {
// per one `sleep_until` call on average.
future<> sleep_until(raft::logical_clock::time_point tp) {
if (tp <= now()) {
return make_ready_future<>();
}

Expand All @@ -190,14 +189,22 @@ public:
auto s = make_shared<sched>();
auto f = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = now() + d,
._at = tp,
._impl = std::move(s)
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);

return f;
}

// Returns a future that resolves after a number of `tick()`s represented by `d`.
// Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
// Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
// per one `sleep` call on average.
future<> sleep(raft::logical_clock::duration d) {
return sleep_until(now() + d);
}

// Schedule `f` to be called at logical time point `tp` (according to this timer's clock).
template <typename F>
void schedule(raft::logical_clock::time_point tp, F f) {
Expand Down
Loading

0 comments on commit 2a36377

Please sign in to comment.