Skip to content

Commit

Permalink
Merge "wire up schema raft state machine" from Gleb
Browse files Browse the repository at this point in the history
This series wires up the schema state machine to process raft commands
and transfer snapshots. The series assumes that raft group zero is used
for schema transfer only and that single raft command contains single
schema change in a form of canonical_mutation array. Both assumptions
may change in which case the code will be changed accordingly, but we
need to start somewhere.

* scylla-dev/gleb/schema-raft-sm-v2:
  schema raft sm: request schema sync on schema_state_machine snapshot transfer
  raft service: delegate snapshot transfer to a state machine implementation
  schema raft sm: pass migration manager to schema_raft_state_machine and merge schema on apply()
  • Loading branch information
tgrabiec committed Dec 8, 2021
2 parents 92e7fbe + cab1a1c commit 5eaca85
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 8 deletions.
4 changes: 2 additions & 2 deletions service/raft/raft_group0.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
raft::server_address my_addr) {

_raft_gr.address_map().set(my_addr);
auto rpc = std::make_unique<raft_rpc>(_ms, _raft_gr.address_map(), gid, my_addr.id);
auto state_machine = std::make_unique<schema_raft_state_machine>(_mm);
auto rpc = std::make_unique<raft_rpc>(*state_machine, _ms, _raft_gr.address_map(), gid, my_addr.id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
auto storage = std::make_unique<raft_sys_table_storage>(_qp, gid, my_addr.id);
auto& persistence_ref = *storage;
auto state_machine = std::make_unique<schema_raft_state_machine>();
auto server = raft::create_server(my_addr.id, std::move(rpc), std::move(state_machine),
std::move(storage), _raft_gr.failure_detector(), raft::server::configuration());

Expand Down
8 changes: 5 additions & 3 deletions service/raft/raft_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "service/raft/raft_rpc.hh"
#include "gms/inet_address.hh"
#include "gms/inet_address_serializer.hh"
Expand All @@ -30,8 +31,8 @@ namespace service {

static seastar::logger rlogger("raft_rpc");

raft_rpc::raft_rpc(netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id)
: _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms), _address_map(address_map)
raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id)
: _sm(sm), _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms), _address_map(address_map)
{}

future<raft::snapshot_reply> raft_rpc::send_snapshot(raft::server_id id, const raft::install_snapshot& snap, seastar::abort_source& as) {
Expand Down Expand Up @@ -193,7 +194,8 @@ future<raft::read_barrier_reply> raft_rpc::execute_read_barrier(raft::server_id
}

future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) {
return _client->apply_snapshot(from, std::move(snp));
co_await _sm.transfer_snapshot(_address_map.get_inet_address(from), snp.snp.id);
co_return co_await _client->apply_snapshot(from, std::move(snp));
}

future<raft::add_entry_reply> raft_rpc::execute_add_entry(raft::server_id from, raft::command cmd) {
Expand Down
4 changes: 3 additions & 1 deletion service/raft/raft_rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "message/messaging_service_fwd.hh"
#include "utils/UUID.hh"
#include "service/raft/raft_address_map.hh"
#include "service/raft/raft_state_machine.hh"

namespace service {

Expand All @@ -43,6 +44,7 @@ inet_addr_to_raft_addr(gms::inet_address addr) {
// Uses `netw::messaging_service` as an underlying implementation for
// actually sending RPC messages.
class raft_rpc : public raft::rpc {
raft_state_machine& _sm;
raft::group_id _group_id;
raft::server_id _server_id;
netw::messaging_service& _messaging;
Expand All @@ -54,7 +56,7 @@ class raft_rpc : public raft::rpc {
}

public:
explicit raft_rpc(netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id);
explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id);

future<raft::snapshot_reply> send_snapshot(raft::server_id server_id, const raft::install_snapshot& snap, seastar::abort_source& as) override;
future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) override;
Expand Down
35 changes: 35 additions & 0 deletions service/raft/raft_state_machine.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once

#include "gms/inet_address.hh"
#include "raft/raft.hh"

namespace service {

// Scylla specific extention for raft state machine
// Snapshot transfer is delegated to a state machine implementation
class raft_state_machine : public raft::state_machine {
public:
virtual future<> transfer_snapshot(gms::inet_address from, raft::snapshot_id snp) = 0;
};

} // end of namespace service
33 changes: 32 additions & 1 deletion service/raft/schema_raft_state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,35 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/raft/schema_raft_state_machine.hh"
#include <seastar/core/coroutine.hh>
#include "service/migration_manager.hh"
#include "message/messaging_service.hh"
#include "canonical_mutation.hh"
#include "schema_mutations.hh"
#include "frozen_schema.hh"
#include "serialization_visitors.hh"
#include "serializer.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/uuid.dist.hh"
#include "serializer_impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/uuid.dist.impl.hh"
#include "service/migration_manager.hh"

namespace service {

static logging::logger slogger("schema_raft_sm");

future<> schema_raft_state_machine::apply(std::vector<raft::command_cref> command) {
return make_ready_future<>();
slogger.trace("apply() is called");
for (auto&& c : command) {
auto is = ser::as_input_stream(c);
std::vector<canonical_mutation> mutations =
ser::deserialize(is, boost::type<std::vector<canonical_mutation>>());

slogger.trace("merging schema mutations");
co_await _mm.merge_schema_from(netw::messaging_service::msg_addr(gms::inet_address{}), std::move(mutations));
}
}

future<raft::snapshot_id> schema_raft_state_machine::take_snapshot() {
Expand All @@ -38,6 +62,13 @@ future<> schema_raft_state_machine::load_snapshot(raft::snapshot_id id) {
return make_ready_future<>();
}

future<> schema_raft_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_id snp) {
// Note that this may bring newer state than the schema state machine raft's
// log, so some raft entries may be double applied, but since the state
// machine idempotent it is not a problem.
return _mm.submit_migration_task(from, false);
}

future<> schema_raft_state_machine::abort() {
return make_ready_future<>();
}
Expand Down
9 changes: 8 additions & 1 deletion service/raft/schema_raft_state_machine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@

#include "raft/raft.hh"
#include "utils/UUID_gen.hh"
#include "service/raft/raft_state_machine.hh"

namespace service {
class migration_manager;

class migration_manager;

// Raft state machine implementation for managing schema changes.
// NOTE: schema raft server is always instantiated on shard 0.
class schema_raft_state_machine : public raft::state_machine {
class schema_raft_state_machine : public raft_state_machine {
migration_manager& _mm;
public:
schema_raft_state_machine(migration_manager& mm) : _mm(mm) {}
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;
future<> load_snapshot(raft::snapshot_id id) override;
future<> transfer_snapshot(gms::inet_address from, raft::snapshot_id snp) override;
future<> abort() override;
};

Expand Down

0 comments on commit 5eaca85

Please sign in to comment.