Fix node sync follower (MystenLabs#2741)
* Wait until we have processed all txes in a batch before updating follower store

* NodeSyncDigestHandler::handle_digest returns final status of processing tx
mystenmark authored Jun 27, 2022
1 parent 5e27acd commit b0a5da6
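
To see why the first change matters, here is a minimal model of the failure mode, not the Sui code: the FollowerStore type and its API below are illustrative assumptions, and the sketch assumes a restarted follower resumes its batch stream from the last recorded sequence number. If that watermark is persisted as soon as a batch header arrives, a crash before the batch's transactions have been processed leaves digests that are never requested again.

// Hypothetical model of the failure mode; not the actual sui-core types.
struct FollowerStore {
    next_seq: u64,
}

impl FollowerStore {
    fn record_next_sequence(&mut self, seq: u64) {
        self.next_seq = seq;
    }

    // Where a restarted follower would resume its batch stream from.
    fn resume_point(&self) -> u64 {
        self.next_seq
    }
}

fn main() {
    let mut store = FollowerStore { next_seq: 37 };

    // Old behavior: a batch header announcing next_sequence_number = 100 arrives
    // and the watermark is recorded immediately, before digests 37..100 have
    // actually been handed to the sync handler and processed.
    store.record_next_sequence(100);

    // If the node restarts here, it resumes at 100 and digests 37..100 are
    // skipped; the fix defers this write until they have all been processed.
    assert_eq!(store.resume_point(), 100);
}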
Showing 2 changed files with 46 additions and 24 deletions.
crates/sui-core/src/authority_active/gossip/mod.rs: 24 additions & 19 deletions
@@ -46,7 +46,6 @@ struct Follower<A> {
     aggregator: Arc<AuthorityAggregator<A>>,
 }

-const EACH_ITEM_DELAY_MS: u64 = 1_000;
 const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000;
 const REFRESH_FOLLOWER_PERIOD_SECS: u64 = 60;

@@ -247,6 +246,7 @@

 #[async_trait]
 trait DigestHandler<A> {
+    /// handle_digest
     async fn handle_digest(&self, follower: &Follower<A>, digest: ExecutionDigests) -> SuiResult;
 }

@@ -359,7 +359,8 @@
     ) -> SuiResult {
         // Global timeout, we do not exceed this time in this task.
         let mut timeout = Box::pin(tokio::time::sleep(duration));
-        let mut queue = FuturesOrdered::new();
+        let mut results = FuturesOrdered::new();
+        let mut batch_seq_to_record = None;

         let req = BatchInfoRequest {
             start: self.max_seq,
@@ -379,26 +380,23 @@
                     match items {
                         Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) )) => {
                             let next_seq = signed_batch.batch.next_sequence_number;
-                            self.follower_store.record_next_sequence(&self.peer_name, next_seq)?;
-                            match self.max_seq {
-                                Some(max_seq) => {
-                                    if next_seq < max_seq {
-                                        info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", next_seq, max_seq);
-                                    }
+                            batch_seq_to_record = Some(next_seq);
+                            if let Some(max_seq) = self.max_seq {
+                                if next_seq < max_seq {
+                                    info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", next_seq, max_seq);
                                 }
-                                None => {}
                             }
                         },

                         // Upon receiving a transaction digest, store it if it is not processed already.
-                        Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))))) => {
-                            if !self.state.database.effects_exists(&digest.transaction)? {
-                                queue.push(async move {
-                                    tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
-                                    digest
-                                });
-                                self.state.metrics.gossip_queued_count.inc();
-                            }
+                        Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => {
+                            let fut = handler.handle_digest(self, digest);
+                            results.push(async move {
+                                fut.await?;
+                                Ok::<TxSequenceNumber, SuiError>(seq)
+                            });
+
+                            self.state.metrics.gossip_queued_count.inc();
                         },

                         // Return any errors.
@@ -417,8 +415,15 @@
                         },
                     }
                 },
-                digest = &mut queue.next() , if !queue.is_empty() => {
-                    handler.handle_digest(self, digest.unwrap()).await?;
+
+                result = &mut results.next() , if !results.is_empty() => {
+                    let seq = result.unwrap()?;
+                    if let Some(batch_seq) = batch_seq_to_record {
+                        if seq >= batch_seq {
+                            self.follower_store.record_next_sequence(&self.peer_name, batch_seq)?;
+                            batch_seq_to_record = None;
+                        }
+                    }
                 }
             };
         }
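
The mod.rs change above relies on FuturesOrdered yielding completed futures in the order they were pushed. The sketch below shows that pattern with stand-in types rather than the actual Sui handler and store (process_digest and the recorded vector are illustrative): per-digest work is queued in sequence order, and the watermark is recorded only when a finished result's sequence number reaches the batch boundary, at which point every earlier digest is known to have completed.

// Sketch of the FuturesOrdered watermark pattern; stand-in types, not sui-core.
// Assumes a recent `futures` 0.3 and `tokio` with full features.
use futures::stream::{FuturesOrdered, StreamExt};
use std::time::Duration;

// Stand-in for handler.handle_digest(..): do the work, then report our sequence.
async fn process_digest(seq: u64) -> Result<u64, String> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(seq)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut results = FuturesOrdered::new();
    let mut recorded = Vec::new();

    // Transactions 0..=4 of a batch arrive, then the batch header whose next
    // sequence number is 5, then transaction 5 of the following batch.
    for seq in 0..5u64 {
        results.push_back(process_digest(seq));
    }
    let mut batch_seq_to_record = Some(5u64);
    results.push_back(process_digest(5));

    while let Some(result) = results.next().await {
        let seq = result?;
        if let Some(batch_seq) = batch_seq_to_record {
            if seq >= batch_seq {
                // FuturesOrdered yields in push order, so a completed digest at
                // or past the boundary means everything below it has finished.
                recorded.push(batch_seq); // stand-in for record_next_sequence
                batch_seq_to_record = None;
            }
        }
    }

    assert_eq!(recorded, vec![5]);
    Ok(())
}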
crates/sui-core/src/authority_active/gossip/node_sync.rs: 22 additions & 5 deletions
@@ -20,10 +20,10 @@ use sui_types::{
 use std::ops::Deref;
 use std::sync::{Arc, Mutex};

-use tokio::sync::{broadcast, mpsc, Semaphore};
+use tokio::sync::{broadcast, mpsc, oneshot, Semaphore};
 use tokio::task::JoinHandle;

-use tracing::{error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};

 const NODE_SYNC_QUEUE_LEN: usize = 500;

@@ -149,6 +149,7 @@
 struct DigestsMessage {
     digests: ExecutionDigests,
     peer: AuthorityName,
+    tx: oneshot::Sender<SuiResult>,
 }

 /// NodeSyncState is shared by any number of NodeSyncDigestHandler's, and receives DigestsMessage
@@ -179,16 +180,27 @@
            // https://github.com/tokio-rs/tokio/discussions/2648
            let limit = Arc::new(Semaphore::new(MAX_NODE_SYNC_CONCURRENCY));

-           while let Some(DigestsMessage { digests, peer }) = receiver.recv().await {
+           while let Some(DigestsMessage { digests, peer, tx }) = receiver.recv().await {
                let permit = Arc::clone(&limit).acquire_owned().await;
                let state = state.clone();
                tokio::spawn(async move {
                    let _permit = permit; // hold semaphore permit until task completes
                    // TODO: must send status back to follower so that it knows whether to advance
                    // the watermark.
-                   if let Err(error) = state.process_digest(peer, digests).await {
+                   let res = state.process_digest(peer, digests).await;
+                   if let Err(error) = &res {
                        error!(?digests, ?peer, "process_digest failed: {}", error);
                    }
+                   if tx.send(res).is_err() {
+                       // This will happen any time the follower times out and restarts, but
+                       // that's ok - the follower won't have marked this digest as processed so it
+                       // will be retried.
+                       debug!(
+                           ?digests,
+                           ?peer,
+                           "could not send process_digest response to caller",
+                       );
+                   }
                });
            }
        })
@@ -396,15 +408,20 @@
     A: AuthorityAPI + Send + Sync + 'static + Clone,
 {
     async fn handle_digest(&self, follower: &Follower<A>, digests: ExecutionDigests) -> SuiResult {
+        let (tx, rx) = oneshot::channel();
         self.sender
             .send(DigestsMessage {
                 digests,
                 peer: follower.peer_name,
+                tx,
             })
             .await
             .map_err(|e| SuiError::GenericAuthorityError {
                 error: e.to_string(),
-            })
+            })?;
+        rx.await.map_err(|e| SuiError::GenericAuthorityError {
+            error: e.to_string(),
+        })?
     }
 }

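
The node_sync.rs change is an instance of the oneshot request/response pattern: each queued message carries a oneshot::Sender, the worker task reports the final processing status on it, and handle_digest awaits that status before returning, which is what lets the follower advance its watermark only for digests that were actually processed. A minimal sketch of the pattern follows; DigestMsg, worker, and handle_digest here are stand-in names, not the Sui types.

// Sketch of the oneshot status-reporting pattern; stand-in types, not sui-core.
// Assumes the `tokio` crate with full features.
use tokio::sync::{mpsc, oneshot};

struct DigestMsg {
    digest: u64,
    tx: oneshot::Sender<Result<(), String>>,
}

async fn worker(mut receiver: mpsc::Receiver<DigestMsg>) {
    while let Some(DigestMsg { digest, tx }) = receiver.recv().await {
        tokio::spawn(async move {
            // Stand-in for state.process_digest(..).
            let res = if digest % 2 == 0 {
                Ok(())
            } else {
                Err(format!("failed to process digest {digest}"))
            };
            // If the caller timed out and dropped its receiver, this send fails;
            // that is fine: the digest is simply not marked processed and will
            // be retried on a later follow attempt.
            let _ = tx.send(res);
        });
    }
}

async fn handle_digest(sender: &mpsc::Sender<DigestMsg>, digest: u64) -> Result<(), String> {
    let (tx, rx) = oneshot::channel();
    sender
        .send(DigestMsg { digest, tx })
        .await
        .map_err(|e| e.to_string())?;
    // Wait for the final status instead of returning once the message is queued.
    rx.await.map_err(|e| e.to_string())?
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (sender, receiver) = mpsc::channel(16);
    tokio::spawn(worker(receiver));

    handle_digest(&sender, 2).await?; // returns only after processing finishes
    assert!(handle_digest(&sender, 3).await.is_err()); // failures propagate back
    Ok(())
}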
