Skip to content

Commit

Permalink
state-sync: verify that a peer is on the same chain as us
Browse files Browse the repository at this point in the history
Now that all nodes are required to have the genesis checkpoint we can
make use of this fact to ensure that a peer is on the same chain/network
as us by checking that they share the same checkpoint 0 as us.
  • Loading branch information
bmwill committed Jan 12, 2023
1 parent 2080b11 commit 508e592
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 95 deletions.
2 changes: 1 addition & 1 deletion crates/sui-network/src/state_sync/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
checkpoint_event_sender: checkpoint_event_sender.clone(),
};
let peer_heights = PeerHeights {
heights: HashMap::new(),
peers: HashMap::new(),
unprocessed_checkpoints: HashMap::new(),
sequence_number_to_digest: HashMap::new(),
}
Expand Down
230 changes: 156 additions & 74 deletions crates/sui-network/src/state_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@
//! channel will always be made in order. StateSync will also send out a notification to its peers
//! of the newly synchronized checkpoint so that it can help other peers synchronize.
// TODO
// * When querying a peer make sure that we're sending to peers that are on the same "network" as
// us. this means verifying their genesis or something else

use anemo::{rpc::Status, types::PeerEvent, PeerId, Request, Response, Result};
use anemo::{types::PeerEvent, PeerId, Request, Response, Result};
use anyhow::anyhow;
use futures::{FutureExt, StreamExt};
use std::{
Expand Down Expand Up @@ -132,50 +128,75 @@ impl Handle {

struct PeerHeights {
/// Table used to track the highest checkpoint for each of our peers.
///
/// Today we don't have the concept of a "genesis checkpoint" so when a node starts up with an
/// empty db it won't have any checkpoints, the None case indicates this. If a node shows up in
/// this map then they support state-sync.
heights: HashMap<PeerId, Option<CheckpointSequenceNumber>>,
peers: HashMap<PeerId, PeerStateSyncInfo>,
unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
/// The digest of the Peer's genesis checkpoint.
genesis_checkpoint_digest: CheckpointDigest,
/// Indicates if this Peer is on the same chain as us.
on_same_chain_as_us: bool,
/// Highest checkpoint sequence number we know of for this Peer.
height: CheckpointSequenceNumber,
}

impl PeerHeights {
pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
self.heights
.values()
.max()
.and_then(Clone::clone)
self.highest_known_checkpoint_sequence_number()
.and_then(|s| self.sequence_number_to_digest.get(&s))
.and_then(|digest| self.unprocessed_checkpoints.get(digest))
}

pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
self.heights.values().max().and_then(Clone::clone)
self.peers
.values()
.filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
.max()
}

pub fn update_peer_height(&mut self, peer_id: PeerId, checkpoint: Option<Checkpoint>) {
use std::collections::hash_map::Entry;
pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
self.peers
.iter()
.filter(|(_peer_id, info)| info.on_same_chain_as_us)
}

// Returns a bool that indicates if the update was done successfully.
//
// This will return false if the given peer doesn't have an entry or is not on the same chain
// as us
pub fn update_peer_info(&mut self, peer_id: PeerId, checkpoint: Checkpoint) -> bool {
let info = match self.peers.get_mut(&peer_id) {
Some(info) if info.on_same_chain_as_us => info,
_ => return false,
};

let latest = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.sequence_number());
info.height = std::cmp::max(checkpoint.sequence_number(), info.height);
self.insert_checkpoint(checkpoint);

match self.heights.entry(peer_id) {
true
}

pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
use std::collections::hash_map::Entry;

match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
if latest > *entry.get() {
entry.insert(latest);
// If there's already an entry and the genesis checkpoint digests match then update
// the maximum height. Otherwise we'll use the more recent one
let entry = entry.get_mut();
if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
entry.height = std::cmp::max(entry.height, info.height);
} else {
*entry = info;
}
}
Entry::Vacant(entry) => {
entry.insert(latest);
entry.insert(info);
}
}

if let Some(checkpoint) = checkpoint {
self.insert_checkpoint(checkpoint);
}
}

pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
Expand Down Expand Up @@ -398,7 +419,7 @@ where
self.spawn_get_latest_from_peer(peer_id);
}
Ok(PeerEvent::LostPeer(peer_id, _)) => {
self.peer_heights.write().unwrap().heights.remove(&peer_id);
self.peer_heights.write().unwrap().peers.remove(&peer_id);
}

Err(RecvError::Closed) => {
Expand All @@ -413,7 +434,14 @@ where

fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
if let Some(peer) = self.network.peer(peer_id) {
let task = get_latest_from_peer(peer, self.peer_heights.clone());
let genesis_checkpoint_digest = self
.store
.get_checkpoint_by_sequence_number(0)
.expect("store operation should not fail")
.expect("store should contain genesis checkpoint")
.digest();
let task =
get_latest_from_peer(genesis_checkpoint_digest, peer, self.peer_heights.clone());
self.tasks.spawn(task);
}
}
Expand Down Expand Up @@ -528,11 +556,11 @@ async fn notify_peers_of_checkpoint(
let futs = peer_heights
.read()
.unwrap()
.heights
.iter()
.peers_on_same_chain()
// Filter out any peers who we know already have a checkpoint higher than this one
.filter(|(_peer_id, &height)| Some(checkpoint.sequence_number()) > height)
.map(|(peer_id, _height)| peer_id)
.filter_map(|(peer_id, info)| {
(checkpoint.sequence_number() > info.height).then_some(peer_id)
})
// Filter out any peers who we aren't connected with
.flat_map(|peer_id| network.peer(*peer_id))
.map(StateSyncClient::new)
Expand All @@ -544,33 +572,84 @@ async fn notify_peers_of_checkpoint(
futures::future::join_all(futs).await;
}

async fn get_latest_from_peer(peer: anemo::Peer, peer_heights: Arc<RwLock<PeerHeights>>) {
async fn get_latest_from_peer(
our_genesis_checkpoint_digest: CheckpointDigest,
peer: anemo::Peer,
peer_heights: Arc<RwLock<PeerHeights>>,
) {
let peer_id = peer.peer_id();
let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(DEFAULT_TIMEOUT);
let response = StateSyncClient::new(peer)
.get_checkpoint_summary(request)
.await
.map(Response::into_inner);
update_peer_height(&peer_heights, peer_id, &response);
}
let mut client = StateSyncClient::new(peer);

fn update_peer_height(
peer_heights: &RwLock<PeerHeights>,
peer_id: PeerId,
response: &Result<Option<Checkpoint>, Status>,
) {
match response {
Ok(latest) => {
let info = {
let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

if let Some(info) = maybe_info {
info
} else {
// TODO do we want to create a new API just for querying a node's chainid?
//
// We need to query this node's genesis checkpoint to see if they're on the same chain
// as us
let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
.with_timeout(DEFAULT_TIMEOUT);
let response = client
.get_checkpoint_summary(request)
.await
.map(Response::into_inner);

let info = match response {
Ok(Some(checkpoint)) => {
let digest = checkpoint.digest();
PeerStateSyncInfo {
genesis_checkpoint_digest: digest,
on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
height: checkpoint.sequence_number(),
}
}
Ok(None) => PeerStateSyncInfo {
genesis_checkpoint_digest: CheckpointDigest::default(),
on_same_chain_as_us: false,
height: CheckpointSequenceNumber::default(),
},
Err(status) => {
trace!("get_latest_checkpoint_summary request failed: {status:?}");
return;
}
};
peer_heights
.write()
.unwrap()
.update_peer_height(peer_id, latest.clone());
}
Err(status) => {
trace!("get_latest_checkpoint_summary request failed: {status:?}");
peer_heights.write().unwrap().heights.remove(&peer_id);
.insert_peer_info(peer_id, info);
info
}
};

// Bail early if this node isn't on the same chain as us
if !info.on_same_chain_as_us {
return;
}

let checkpoint = {
let request =
Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(DEFAULT_TIMEOUT);
let response = client
.get_checkpoint_summary(request)
.await
.map(Response::into_inner);
match response {
Ok(Some(checkpoint)) => checkpoint,
Ok(None) => return,
Err(status) => {
trace!("get_latest_checkpoint_summary request failed: {status:?}");
return;
}
}
};

peer_heights
.write()
.unwrap()
.update_peer_info(peer_id, checkpoint);
}

async fn query_peers_for_their_latest_checkpoint(
Expand All @@ -582,32 +661,37 @@ async fn query_peers_for_their_latest_checkpoint(
let futs = peer_heights
.read()
.unwrap()
.heights
.keys()
.peers_on_same_chain()
// Filter out any peers who we aren't connected with
.flat_map(|peer_id| network.peer(*peer_id))
.flat_map(|(peer_id, _info)| network.peer(*peer_id))
.map(|peer| {
let peer_id = peer.peer_id();
let mut client = StateSyncClient::new(peer);

let request =
Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(DEFAULT_TIMEOUT);
async move {
let request =
Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(DEFAULT_TIMEOUT);
let response = client
.get_checkpoint_summary(request)
.await
.map(Response::into_inner);
update_peer_height(peer_heights, peer_id, &response);
response
match response {
Ok(Some(checkpoint)) => peer_heights
.write()
.unwrap()
.update_peer_info(peer_id, checkpoint.clone())
.then_some(checkpoint),
Ok(None) => None,
Err(status) => {
trace!("get_latest_checkpoint_summary request failed: {status:?}");
None
}
}
}
})
.collect::<Vec<_>>();

let checkpoints = futures::future::join_all(futs)
.await
.into_iter()
.flatten()
.flatten();
let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();

let highest_checkpoint = checkpoints.max_by_key(|checkpoint| checkpoint.sequence_number());

Expand Down Expand Up @@ -658,11 +742,10 @@ where
let peers = peer_heights
.read()
.unwrap()
.heights
.iter()
.peers_on_same_chain()
// Filter out any peers who can't help
.filter(|(_peer_id, &height)| height > Some(current.sequence_number()))
.map(|(&peer_id, &height)| (peer_id, height))
.filter(|(_peer_id, info)| info.height > current.sequence_number())
.map(|(&peer_id, &info)| (peer_id, info))
.collect::<Vec<_>>();

// range of the next sequence_numbers to fetch
Expand All @@ -672,7 +755,7 @@ where
let mut peers = peers
.iter()
// Filter out any peers who can't help with this particular checkpoint
.filter(|(_peer_id, height)| height >= &Some(next))
.filter(|(_peer_id, info)| info.height >= next)
// Filter out any peers who we aren't connected with
.flat_map(|(peer_id, _height)| network.peer(*peer_id))
.map(StateSyncClient::new)
Expand Down Expand Up @@ -866,10 +949,9 @@ where
let mut peers = peer_heights
.read()
.unwrap()
.heights
.iter()
.peers_on_same_chain()
// Filter out any peers who can't help with this particular checkpoint
.filter(|(_peer_id, &height)| height >= Some(checkpoint.sequence_number()))
.filter(|(_peer_id, info)| info.height >= checkpoint.sequence_number())
// Filter out any peers who we aren't connected with
.flat_map(|(peer_id, _height)| network.peer(*peer_id))
.map(StateSyncClient::new)
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-network/src/state_sync/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ where

let checkpoint = request.into_inner();

self.peer_heights
if !self
.peer_heights
.write()
.unwrap()
.update_peer_height(peer_id, Some(checkpoint.clone()));
.update_peer_info(peer_id, checkpoint.clone())
{
return Ok(Response::new(()));
}

let highest_verified_checkpoint = self
.store
Expand Down
Loading

0 comments on commit 508e592

Please sign in to comment.