Skip to content

Commit

Permalink
updated based on review
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed May 25, 2022
1 parent dacc007 commit ea7aa34
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl AuthorityAPI for ConfigurableBatchActionClient {
let name = self.state.name;
let mut items: Vec<Result<BatchInfoResponseItem, SuiError>> = Vec::new();
let mut seq = 0;
let _ = actions.into_iter().for_each(|action| {
let _ = actions.iter().for_each(|action| {
match action {
BatchActionInternal::EmitUpdateItem(test_batch) => {
let mut temp_items: Vec<Result<BatchInfoResponseItem, SuiError>> = Vec::new();
Expand Down Expand Up @@ -255,7 +255,7 @@ pub async fn init_configurable_authorities(
digests: vec![*transaction.digest()],
};
batch_action_internal.push(BatchActionInternal::EmitUpdateItem(t_b));
executed_digests.push(transaction.digest().clone());
executed_digests.push(*transaction.digest());
}
if let BatchAction::EmitError() = action {
batch_action_internal.push(BatchActionInternal::EmitError());
Expand Down
40 changes: 25 additions & 15 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
safe_client::SafeClient,
};
use async_trait::async_trait;
use futures::stream::FuturesOrdered;
use futures::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_types::{
Expand Down Expand Up @@ -35,6 +36,7 @@ struct PeerGossip<A> {
aggregator: Arc<AuthorityAggregator<A>>,
}

const EACH_ITEM_DELAY_MS: u64 = 1_000;
const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000;
const REFRESH_FOLLOWER_PERIOD_SECS: u64 = 60;

Expand Down Expand Up @@ -156,23 +158,21 @@ where
// Make sure we exit loop by limiting the number of tries to choose peer
// where n is the total number of committee members.
let mut tries_remaining = active_authority.state.committee.voting_rights.len();
loop {
while tries_remaining > 0 {
let name = active_authority.state.committee.sample();
if peer_names.contains(name)
|| *name == my_name
|| !active_authority.can_contact(*name).await
{
tries_remaining -= 1;
if tries_remaining == 0 {
return Err(SuiError::GenericAuthorityError {
error: "Could not connect to any peer".to_string(),
});
}
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
return Ok(*name);
}
Err(SuiError::GenericAuthorityError {
error: "Could not connect to any peer".to_string(),
})
}

impl<A> PeerGossip<A>
Expand All @@ -194,21 +194,22 @@ where
let result =
tokio::task::spawn(async move { self.peer_gossip_for_duration(duration).await }).await;

if result.is_err() {
return (
// todo: log e
match result {
Err(_e) => (
peer_name,
Err(SuiError::GenericAuthorityError {
error: "Gossip Join Error".to_string(),
}),
);
};

(peer_name, result.unwrap())
),
Ok(r) => (peer_name, r),
}
}

async fn peer_gossip_for_duration(&mut self, duration: Duration) -> Result<(), SuiError> {
// Global timeout, we do not exceed this time in this task.
let mut timeout = Box::pin(tokio::time::sleep(duration));
let mut queue = FuturesOrdered::new();

let req = BatchInfoRequest {
start: self.max_seq,
Expand All @@ -231,9 +232,10 @@ where
// Upon receiving a transaction digest, store it if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => {
if !self.state.database.effects_exists(&digest)? {
// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest)).await?;
self.process_response(response).await?;
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
digest
});

}
self.max_seq = Some(seq + 1);
Expand All @@ -255,6 +257,14 @@ where
},
}
},
digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state.database.effects_exists(&digest)? {
// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest)).await?;
self.process_response(response).await?;
}
}
};
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn test_gossip_error() {

//assert!(result1.is_ok());
let result = result1.unwrap();
let found_cert = result.certified_transaction.is_some();
let _found_cert = result.certified_transaction.is_some();
//assert!(found_cert); // todo this should not fail
}
}
Expand Down

0 comments on commit ea7aa34

Please sign in to comment.