Skip to content

Commit

Permalink
Persist gossip state (next expected sequence num) to disk (MystenLabs…
Browse files Browse the repository at this point in the history
…#2445)

* FollowerStore for storing the next expected sequence from a gossip peer

* Persist next expected sequence number from gossip peers

* Fix tests

* Add TODO
  • Loading branch information
mystenmark authored Jun 6, 2022
1 parent eb4c334 commit 2f4366f
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 19 deletions.
15 changes: 15 additions & 0 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::{
sync::Arc,
time::Duration,
};
use sui_storage::follower_store::FollowerStore;
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tracing::error;
Expand Down Expand Up @@ -100,6 +101,7 @@ impl AuthorityHealth {
pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
pub follower_store: Arc<FollowerStore>,
// The network interfaces to other authorities
pub net: ArcSwap<AuthorityAggregator<A>>,
// Network health
Expand All @@ -109,6 +111,7 @@ pub struct ActiveAuthority<A> {
impl<A> ActiveAuthority<A> {
pub fn new(
authority: Arc<AuthorityState>,
follower_store: Arc<FollowerStore>,
authority_clients: BTreeMap<AuthorityName, A>,
) -> SuiResult<Self> {
let committee = authority.clone_committee();
Expand All @@ -122,13 +125,24 @@ impl<A> ActiveAuthority<A> {
.collect(),
)),
state: authority,
follower_store,
net: ArcSwap::from(Arc::new(AuthorityAggregator::new(
committee,
authority_clients,
))),
})
}

#[cfg(test)]
pub fn new_with_ephemeral_follower_store(
authority: Arc<AuthorityState>,
authority_clients: BTreeMap<AuthorityName, A>,
) -> SuiResult<Self> {
let working_dir = tempfile::tempdir().unwrap();
let follower_store = Arc::new(FollowerStore::open(&working_dir).expect("cannot open db"));
Self::new(authority, follower_store, authority_clients)
}

/// Returns the amount of time we should wait to be able to contact at least
/// 2/3 of the nodes in the committee according to the `no_contact_before`
/// instant stored in the authority health records. A network needs 2/3 stake
Expand Down Expand Up @@ -179,6 +193,7 @@ impl<A> Clone for ActiveAuthority<A> {
fn clone(&self) -> Self {
ActiveAuthority {
state: self.state.clone(),
follower_store: self.follower_store.clone(),
net: ArcSwap::from(self.net.load().clone()),
health: self.health.clone(),
}
Expand Down
21 changes: 15 additions & 6 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ async fn checkpoint_active_flow_happy_path() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
active_state.spawn_all_active_processes().await
});
}
Expand Down Expand Up @@ -95,8 +98,11 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
// Spin the gossip service.
active_state.spawn_active_processes(true, true).await;
});
Expand Down Expand Up @@ -178,8 +184,11 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
// Spin the gossip service.
active_state.spawn_active_processes(false, true).await;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ConfigurableBatchActionClient {
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

let store = Arc::new(AuthorityStore::open(path, None));
let store = Arc::new(AuthorityStore::open(path.clone(), None));
let state = AuthorityState::new(
committee.clone(),
address,
Expand Down
34 changes: 29 additions & 5 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_storage::follower_store::FollowerStore;
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::AuthorityName,
Expand All @@ -35,6 +36,7 @@ struct PeerGossip<A> {
peer_name: AuthorityName,
client: SafeClient<A>,
state: Arc<AuthorityState>,
follower_store: Arc<FollowerStore>,
max_seq: Option<TxSequenceNumber>,
aggregator: Arc<AuthorityAggregator<A>>,
}
Expand Down Expand Up @@ -119,7 +121,7 @@ pub async fn gossip_process_with_start_seq<A>(
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(name, &local_active_ref_copy, start_seq);
// Add more duration if we make more than 1 to ensure overlap
debug!("Starting gossip from peer {:?}", name);
debug!(peer = ?name, "Starting gossip from peer");
peer_gossip
.start(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
.await
Expand Down Expand Up @@ -160,15 +162,15 @@ async fn wait_for_one_gossip_task_to_finish<A>(
if let Err(err) = _result {
active_authority.set_failure_backoff(finished_name).await;
active_authority.state.metrics.gossip_task_error_count.inc();
error!("Peer {:?} returned error: {:?}", finished_name, err);
error!(peer = ?finished_name, "Peer returned error: {:?}", err);
} else {
active_authority.set_success_backoff(finished_name).await;
active_authority
.state
.metrics
.gossip_task_success_count
.inc();
debug!("End gossip from peer {:?}", finished_name);
debug!(peer = ?finished_name, "End gossip from peer");
}
peer_names.remove(&finished_name);
}
Expand Down Expand Up @@ -225,10 +227,29 @@ where
active_authority: &ActiveAuthority<A>,
start_seq: Option<TxSequenceNumber>,
) -> PeerGossip<A> {
PeerGossip {
// TODO: for validator gossip, we should always use None as the start_seq, but we should
// consult the start_seq we retrieved from the db to make sure that the peer is giving
// us new txes.
let start_seq = match active_authority
.follower_store
.get_next_sequence(&peer_name)
{
Err(e) => {
error!("Could not load next sequence from follower store, defaulting to None. Error: {}", e);
// It might seem like a good idea to return start_seq here, but if we are running
// as a full node start_seq will be Some(0), and if the gossip process is repeatedly
// restarting, we would in that case repeatedly re-request all txes from the
// beginning of the epoch which could DoS the validators we are following.
None
}
Ok(s) => s.or(start_seq),
};

Self {
peer_name,
client: active_authority.net.load().authority_clients[&peer_name].clone(),
state: active_authority.state.clone(),
follower_store: active_authority.follower_store.clone(),
max_seq: start_seq,
aggregator: active_authority.net.load().clone(),
}
Expand Down Expand Up @@ -261,7 +282,10 @@ where

items = &mut streamx.next() => {
match items {
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch)) )) => {},
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) )) => {
let next_seq = signed_batch.batch.next_sequence_number;
self.follower_store.record_next_sequence(&self.peer_name, next_seq)?;
},

// Upon receiving a transaction digest, store it if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => {
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub async fn test_gossip() {
let inner_clients = clients.clone();

let handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new(inner_state, inner_clients).unwrap();
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
active_state.spawn_all_active_processes().await;
});

Expand Down Expand Up @@ -65,7 +67,9 @@ pub async fn test_gossip_error() {
let inner_clients = clients.clone();

let handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new(inner_state, inner_clients).unwrap();
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
active_state.spawn_all_active_processes().await;
});
active_authorities.push(handle);
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ async fn test_start_epoch_change() {
})
.unwrap();
// Create an active authority for the first authority state.
let active = ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap();
let active = ActiveAuthority::new_with_ephemeral_follower_store(
state.clone(),
net.clone_inner_clients(),
)
.unwrap();
// Make the high watermark differ from low watermark.
let ticket = state.batch_notifier.ticket().unwrap();

Expand Down Expand Up @@ -170,7 +174,13 @@ async fn test_finish_epoch_change() {
.await;
let actives: Vec<_> = states
.iter()
.map(|state| ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap())
.map(|state| {
ActiveAuthority::new_with_ephemeral_follower_store(
state.clone(),
net.clone_inner_clients(),
)
.unwrap()
})
.collect();
let results: Vec<_> = states
.iter()
Expand Down
7 changes: 5 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sui_gateway::bcs_api::BcsApiImpl;
use sui_gateway::json_rpc::JsonRpcServerBuilder;
use sui_gateway::read_api::{FullNodeApi, ReadApi};
use sui_network::api::ValidatorServer;
use sui_storage::IndexStore;
use sui_storage::{follower_store::FollowerStore, IndexStore};
use tracing::info;

pub struct SuiNode {
Expand Down Expand Up @@ -58,6 +58,8 @@ impl SuiNode {
)))
};

let follower_store = Arc::new(FollowerStore::open(config.db_path().join("follower_db"))?);

let state = Arc::new(
AuthorityState::new(
genesis.committee(),
Expand Down Expand Up @@ -87,7 +89,8 @@ impl SuiNode {
authority_clients.insert(validator.public_key(), client);
}

let active_authority = ActiveAuthority::new(state.clone(), authority_clients)?;
let active_authority =
ActiveAuthority::new(state.clone(), follower_store, authority_clients)?;

// Start following validators
Some(tokio::task::spawn(async move {
Expand Down
89 changes: 89 additions & 0 deletions crates/sui-storage/src/follower_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::path::Path;
use sui_types::{
base_types::AuthorityName,
batch::TxSequenceNumber,
error::{SuiError, SuiResult},
};
use typed_store::rocks::DBMap;
use typed_store::{reopen, traits::Map};

use crate::default_db_options;

use tracing::debug;

/// FollowerStore tracks the next tx sequence numbers that we should expect after the previous
/// batch.
pub struct FollowerStore {
next_sequence: DBMap<AuthorityName, TxSequenceNumber>,
}

impl FollowerStore {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, SuiError> {
let (options, _) = default_db_options(None);

let db = {
let path = &path;
let db_options = Some(options.clone());
let opt_cfs: &[(&str, &rocksdb::Options)] = &[("next_sequence", &options)];
typed_store::rocks::open_cf_opts(path, db_options, opt_cfs)
}
.map_err(SuiError::StorageError)?;

let next_sequence = reopen!(&db, "next_sequence";<AuthorityName, TxSequenceNumber>);

Ok(Self { next_sequence })
}

pub fn get_next_sequence(&self, name: &AuthorityName) -> SuiResult<Option<TxSequenceNumber>> {
self.next_sequence.get(name).map_err(SuiError::StorageError)
}

pub fn record_next_sequence(&self, name: &AuthorityName, seq: TxSequenceNumber) -> SuiResult {
debug!(peer = ?name, ?seq, "record_next_sequence");
self.next_sequence
.insert(name, &seq)
.map_err(SuiError::StorageError)
}
}

#[cfg(test)]
mod test {
use crate::follower_store::FollowerStore;
use sui_types::crypto::get_key_pair;

#[test]
fn test_follower_store() {
let working_dir = tempfile::tempdir().unwrap();

let follower_store = FollowerStore::open(&working_dir).expect("cannot open db");

let (_, key_pair) = get_key_pair();
let val_name = key_pair.public_key_bytes();

let seq = follower_store
.get_next_sequence(val_name)
.expect("read error");
assert!(seq.is_none());

follower_store
.record_next_sequence(val_name, 42)
.expect("write error");

let seq = follower_store
.get_next_sequence(val_name)
.expect("read error");
assert_eq!(seq.unwrap(), 42);

follower_store
.record_next_sequence(val_name, 43)
.expect("write error");

let seq = follower_store
.get_next_sequence(val_name)
.expect("read error");
assert_eq!(seq.unwrap(), 43);
}
}
1 change: 1 addition & 0 deletions crates/sui-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod indexes;
pub use indexes::IndexStore;

pub mod event_store;
pub mod follower_store;
pub mod mutex_table;
pub mod write_ahead_log;

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-types/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl BcsSignable for TransactionBatch {}

#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Default, Debug, Serialize, Deserialize)]
pub struct AuthorityBatch {
// TODO: Add epoch
// TODO: Add epoch, and update follower_store.rs to store the epoch of the last seen batch
/// The next sequence number after the end of this batch
pub next_sequence_number: u64,

Expand Down

0 comments on commit 2f4366f

Please sign in to comment.