Skip to content

Commit

Permalink
remove parameterization of start sequence
Browse files: browse the repository at this point in the history
  • Loading branch information
lanvidr committed Jun 10, 2022
1 parent c3bd75d commit 8f9d95a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 30 deletions.
46 changes: 19 additions & 27 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,9 @@ use super::ActiveAuthority;
pub async fn gossip_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
gossip_process_with_start_seq(active_authority, degree, None).await
}

pub async fn gossip_process_with_start_seq<A>(
_active_authority: &ActiveAuthority<A>,
degree: usize,
start_seq: Option<TxSequenceNumber>,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Make a clone of the active authority and committee, and keep using it until epoch changes.
let mut local_active = Arc::new(_active_authority.clone());
let mut local_active = Arc::new(active_authority.clone());
let mut committee = local_active.state.committee.load().deref().clone();

// Number of tasks at most "degree" and no more than committee - 1
Expand All @@ -80,14 +70,14 @@ pub async fn gossip_process_with_start_seq<A>(
let mut gossip_tasks = FuturesUnordered::new();

loop {
if _active_authority.state.committee.load().epoch != committee.epoch {
if active_authority.state.committee.load().epoch != committee.epoch {
// If epoch has changed, we need to make a new copy of the active authority,
// and update all local variables.
// We also need to remove any authority that's no longer a valid validator
// from the list of peer names.
// It's ok to keep the existing gossip tasks running even for peers that are no longer
// validators, and let them end naturally.
local_active = Arc::new(_active_authority.clone());
local_active = Arc::new(active_authority.clone());
committee = local_active.state.committee.load().deref().clone();
target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);
peer_names = peer_names
Expand Down Expand Up @@ -119,7 +109,7 @@ pub async fn gossip_process_with_start_seq<A>(
peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(name, &local_active_ref_copy, start_seq);
let peer_gossip = PeerGossip::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
peer_gossip
Expand Down Expand Up @@ -222,27 +212,21 @@ impl<A> PeerGossip<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(
peer_name: AuthorityName,
active_authority: &ActiveAuthority<A>,
start_seq: Option<TxSequenceNumber>,
) -> PeerGossip<A> {
pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority<A>) -> PeerGossip<A> {
// TODO: for validator gossip, we should always use None as the start_seq, but we should
// consult the start_seq we retrieved from the db to make sure that the peer is giving
// us new txes.
let start_seq = match active_authority
.follower_store
.get_next_sequence(&peer_name)
{
Err(e) => {
error!("Could not load next sequence from follower store, defaulting to None. Error: {}", e);
// It might seem like a good idea to return start_seq here, but if we are running
// as a full node start_seq will be Some(0), and if the gossip process is repeatedly
// restarting, we would in that case repeatedly re-request all txes from the
// beginning of the epoch which could DoS the validators we are following.
None
Err(_e) => {
// If there was no start sequence found for this peer, it is likely a new peer
// that has just joined the network, start at 0.
info!("New gossip peer has joined: {:?}", peer_name);
Some(0)
}
Ok(s) => s.or(start_seq),
Ok(s) => s.or(Some(0)),
};

Self {
Expand Down Expand Up @@ -297,6 +281,14 @@ where
self.state.metrics.gossip_queued_count.inc();

}
match self.max_seq {
Some(max_seq) => {
if seq < max_seq {
info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", seq, max_seq);
}
}
None => {}
}
self.max_seq = Some(seq + 1);
},

Expand Down
7 changes: 4 additions & 3 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use futures::TryFutureExt;
use parking_lot::Mutex;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use sui_config::NodeConfig;
use sui_core::authority_active::gossip::gossip_process;
use sui_core::authority_server::ValidatorService;
use sui_core::{
authority::{AuthorityState, AuthorityStore},
authority_active::{gossip::gossip_process_with_start_seq, ActiveAuthority},
authority_active::ActiveAuthority,
authority_client::NetworkAuthorityClient,
checkpoints::CheckpointStore,
};
Expand Down Expand Up @@ -97,12 +98,12 @@ impl SuiNode {

// Start following validators
Some(tokio::task::spawn(async move {
gossip_process_with_start_seq(
gossip_process(
&active_authority,
// listen to all authorities (note that gossip_process caps this to total minus 1.)
active_authority.state.committee.load().voting_rights.len(),
// start receiving the earliest TXes the validator has.
Some(0),
//Some(0),
)
.await;
}))
Expand Down

0 comments on commit 8f9d95a

Please sign in to comment.