From 9f138b8a49c32b5859e3b1844789547bcd3cada5 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Thu, 2 Jun 2022 15:30:33 -0700 Subject: [PATCH] SafeClient doesn't need to implement AuthorityAPI (#2393) --- .../checkpoint_driver/tests.rs | 12 +++--- .../configurable_batch_action_client.rs | 11 +++++- crates/sui-core/src/authority_aggregator.rs | 20 ++++++++-- .../src/checkpoints/tests/checkpoint_tests.rs | 4 +- crates/sui-core/src/safe_client.rs | 29 +++++++-------- .../unit_tests/authority_aggregator_tests.rs | 37 +++++++++++++------ crates/test-utils/src/authority.rs | 9 +---- 7 files changed, 72 insertions(+), 50 deletions(-) diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs index 10238c71741a0..a217088da1749 100644 --- a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs +++ b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs @@ -27,7 +27,7 @@ async fn checkpoint_active_flow_happy_path() { // Start active part of authority. for inner_state in authorities.clone() { - let clients = aggregator.authority_clients.clone(); + let clients = aggregator.clone_inner_clients(); let _active_handle = tokio::task::spawn(async move { let active_state = ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap(); @@ -93,7 +93,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() { // Start active part of authority. for inner_state in authorities.clone() { - let clients = aggregator.authority_clients.clone(); + let clients = aggregator.clone_inner_clients(); let _active_handle = tokio::task::spawn(async move { let active_state = ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap(); @@ -112,14 +112,13 @@ async fn checkpoint_active_flow_crash_client_with_gossip() { .expect("Unexpected crash"); // Send it only to 1 random node - use crate::authority_client::AuthorityAPI; let sample_authority = sender_aggregator.committee.sample(); let client: SafeClient = sender_aggregator.authority_clients[sample_authority].clone(); let _response = client .handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate)) .await - .expect("Problem processing certificare"); + .expect("Problem processing certificate"); // Check whether this is a success? assert!(matches!( @@ -177,7 +176,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() { // Start active part of authority. for inner_state in authorities.clone() { - let clients = aggregator.authority_clients.clone(); + let clients = aggregator.clone_inner_clients(); let _active_handle = tokio::task::spawn(async move { let active_state = ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap(); @@ -196,14 +195,13 @@ async fn checkpoint_active_flow_crash_client_no_gossip() { .expect("Unexpected crash"); // Send it only to 1 random node - use crate::authority_client::AuthorityAPI; let sample_authority = sender_aggregator.committee.sample(); let client: SafeClient = sender_aggregator.authority_clients[sample_authority].clone(); let _response = client .handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate)) .await - .expect("Problem processing certificare"); + .expect("Problem processing certificate"); // Check whether this is a success? assert!(matches!( diff --git a/crates/sui-core/src/authority_active/gossip/configurable_batch_action_client.rs b/crates/sui-core/src/authority_active/gossip/configurable_batch_action_client.rs index 9f358ca67e5ba..df1e5a7c4f7f3 100644 --- a/crates/sui-core/src/authority_active/gossip/configurable_batch_action_client.rs +++ b/crates/sui-core/src/authority_active/gossip/configurable_batch_action_client.rs @@ -6,6 +6,7 @@ use crate::authority::AuthorityState; use crate::authority::AuthorityStore; use crate::authority_aggregator::authority_aggregator_tests::*; use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream}; +use crate::safe_client::SafeClient; use async_trait::async_trait; use std::borrow::Borrow; use std::collections::BTreeMap; @@ -242,7 +243,7 @@ pub async fn init_configurable_authorities( } states.push(client.state.clone()); names.push(authority_name); - clients.push(client); + clients.push(SafeClient::new(client, committee.clone(), authority_name)); } // Execute transactions for every EmitUpdateItem Action, use the digest of the transaction to @@ -300,9 +301,15 @@ pub async fn init_configurable_authorities( _ = do_cert(cert_client, &cert1).await; // Register the internal actions to client - cert_client.register_action_sequence(batch_action_internal.clone()); + cert_client + .authority_client_mut() + .register_action_sequence(batch_action_internal.clone()); } } + let authority_clients = authority_clients + .into_iter() + .map(|(name, client)| (name, client.authority_client().clone())) + .collect(); (authority_clients, states, executed_digests) } diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index 21832162c48ab..14c05523adfc5 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -65,6 +65,17 @@ impl AuthorityAggregator { { self.authority_clients[name].clone() } + + pub fn clone_inner_clients(&self) -> BTreeMap + where + A: Clone, + { + let mut clients = BTreeMap::new(); + for (name, client) in &self.authority_clients { + clients.insert(*name, client.authority_client().clone()); + } + clients + } } pub enum ReduceOutput { @@ -1065,17 +1076,18 @@ where // - we try to update the authority with the cert, and on error return Err. // - we try to re-process the certificate and return the result. - let handle = if contains_shared_object { + let res = if contains_shared_object { client.handle_consensus_transaction(ConsensusTransaction::UserTransaction(Box::new(cert_ref.clone()))) + .instrument(tracing::trace_span!("handle_consensus_cert", authority =? name)) + .await } else { client .handle_confirmation_transaction(ConfirmationTransaction::new( cert_ref.clone(), )) + .instrument(tracing::trace_span!("handle_cert", authority =? name)) + .await }; - let res = handle - .instrument(tracing::trace_span!("handle_cert", authority =? name)) - .await; if res.is_ok() { // We got an ok answer, so returning the result of processing diff --git a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs index 58e458f037fb0..7d0562c47c06d 100644 --- a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs +++ b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs @@ -1435,7 +1435,7 @@ pub async fn checkpoint_tests_setup(num_objects: usize, batch_interval: Duration /* total_batches */ 100, /* total_transactions */ 100, ); } - println!("CHANEL EXIT."); + println!("CHANNEL EXIT."); }); // Now make an authority aggregator @@ -1460,8 +1460,6 @@ pub async fn checkpoint_tests_setup(num_objects: usize, batch_interval: Duration } } -use crate::authority_client::AuthorityAPI; - #[tokio::test(flavor = "current_thread", start_paused = true)] async fn checkpoint_messaging_flow_bug() { let mut setup = checkpoint_tests_setup(5, Duration::from_millis(500)).await; diff --git a/crates/sui-core/src/safe_client.rs b/crates/sui-core/src/safe_client.rs index f9cdbac368df4..ca43ffab4fcd3 100644 --- a/crates/sui-core/src/safe_client.rs +++ b/crates/sui-core/src/safe_client.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream}; -use async_trait::async_trait; use futures::StreamExt; use sui_types::batch::{AuthorityBatch, SignedBatch, TxSequenceNumber, UpdateItem}; use sui_types::crypto::PublicKeyBytes; @@ -31,8 +30,12 @@ impl SafeClient { } } + pub fn authority_client(&self) -> &C { + &self.authority_client + } + #[cfg(test)] - pub fn authority_client(&mut self) -> &mut C { + pub fn authority_client_mut(&mut self) -> &mut C { &mut self.authority_client } @@ -274,15 +277,9 @@ where Ok(new_stream) } -} -#[async_trait] -impl AuthorityAPI for SafeClient -where - C: AuthorityAPI + Send + Sync + Clone + 'static, -{ /// Initiate a new transfer to a Sui or Primary account. - async fn handle_transaction( + pub async fn handle_transaction( &self, transaction: Transaction, ) -> Result { @@ -299,7 +296,7 @@ where } /// Confirm a transfer to a Sui or Primary account. - async fn handle_confirmation_transaction( + pub async fn handle_confirmation_transaction( &self, transaction: ConfirmationTransaction, ) -> Result { @@ -316,7 +313,7 @@ where Ok(transaction_info) } - async fn handle_consensus_transaction( + pub async fn handle_consensus_transaction( &self, transaction: ConsensusTransaction, ) -> Result { @@ -326,7 +323,7 @@ where .await } - async fn handle_account_info_request( + pub async fn handle_account_info_request( &self, request: AccountInfoRequest, ) -> Result { @@ -335,7 +332,7 @@ where .await } - async fn handle_object_info_request( + pub async fn handle_object_info_request( &self, request: ObjectInfoRequest, ) -> Result { @@ -351,7 +348,7 @@ where } /// Handle Object information requests for this account. - async fn handle_transaction_info_request( + pub async fn handle_transaction_info_request( &self, request: TransactionInfoRequest, ) -> Result { @@ -368,7 +365,7 @@ where Ok(transaction_info) } - async fn handle_checkpoint( + pub async fn handle_checkpoint( &self, request: CheckpointRequest, ) -> Result { @@ -378,7 +375,7 @@ where } /// Handle Batch information requests for this authority. - async fn handle_batch_stream( + pub async fn handle_batch_stream( &self, request: BatchInfoRequest, ) -> Result { diff --git a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs index ffc6c5f1c14b2..4cebe8f42f46b 100644 --- a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs @@ -85,7 +85,7 @@ pub fn get_local_client( clients.next(); i += 1; } - clients.next().unwrap().authority_client() + clients.next().unwrap().authority_client_mut() } pub fn transfer_coin_transaction( @@ -219,18 +219,24 @@ pub fn to_transaction(data: TransactionData, signer: &dyn Signer) -> Transaction::new(data, signature) } -pub async fn do_transaction(authority: &A, transaction: &Transaction) { +pub async fn do_transaction(authority: &SafeClient, transaction: &Transaction) +where + A: AuthorityAPI + Send + Sync + Clone + 'static, +{ authority .handle_transaction(transaction.clone()) .await .unwrap(); } -pub async fn extract_cert( - authorities: &[&A], +pub async fn extract_cert( + authorities: &[&SafeClient], committee: &Committee, transaction_digest: &TransactionDigest, -) -> CertifiedTransaction { +) -> CertifiedTransaction +where + A: AuthorityAPI + Send + Sync + Clone + 'static, +{ let mut votes = vec![]; let mut transaction: Option = None; for authority in authorities { @@ -263,10 +269,13 @@ pub async fn extract_cert( ) } -pub async fn do_cert( - authority: &A, +pub async fn do_cert( + authority: &SafeClient, cert: &CertifiedTransaction, -) -> TransactionEffects { +) -> TransactionEffects +where + A: AuthorityAPI + Send + Sync + Clone + 'static, +{ authority .handle_confirmation_transaction(ConfirmationTransaction::new(cert.clone())) .await @@ -276,7 +285,10 @@ pub async fn do_cert( .effects } -pub async fn do_cert_configurable(authority: &A, cert: &CertifiedTransaction) { +pub async fn do_cert_configurable(authority: &A, cert: &CertifiedTransaction) +where + A: AuthorityAPI + Send + Sync + Clone + 'static, +{ let result = authority .handle_confirmation_transaction(ConfirmationTransaction::new(cert.clone())) .await; @@ -285,7 +297,10 @@ pub async fn do_cert_configurable(authority: &A, cert: &Certifi } } -pub async fn get_latest_ref(authority: &A, object_id: ObjectID) -> ObjectRef { +pub async fn get_latest_ref(authority: &SafeClient, object_id: ObjectID) -> ObjectRef +where + A: AuthorityAPI + Send + Sync + Clone + 'static, +{ if let Ok(ObjectInfoResponse { requested_object_reference: Some(object_ref), .. @@ -331,7 +346,7 @@ async fn execute_transaction_with_fault_configs( .await?; for client in authorities.authority_clients.values_mut() { - client.authority_client().fault_config.reset(); + client.authority_client_mut().fault_config.reset(); } for (index, config) in configs_before_process_certificate { get_local_client(&mut authorities, *index).fault_config = *config; diff --git a/crates/test-utils/src/authority.rs b/crates/test-utils/src/authority.rs index c2784bc363fa7..d5e33d85ddff9 100644 --- a/crates/test-utils/src/authority.rs +++ b/crates/test-utils/src/authority.rs @@ -8,7 +8,6 @@ use std::time::Duration; use sui_config::{NetworkConfig, ValidatorInfo}; use sui_core::{ authority_aggregator::AuthorityAggregator, authority_client::NetworkAuthorityClient, - safe_client::SafeClient, }; use sui_node::SuiNode; use sui_types::{committee::Committee, object::Object}; @@ -54,7 +53,7 @@ where pub fn create_authority_aggregator( authority_configs: &[ValidatorInfo], -) -> AuthorityAggregator> { +) -> AuthorityAggregator { let voting_rights: BTreeMap<_, _> = authority_configs .iter() .map(|config| (config.public_key(), config.stake())) @@ -65,11 +64,7 @@ pub fn create_authority_aggregator( .map(|config| { ( config.public_key(), - SafeClient::new( - NetworkAuthorityClient::connect_lazy(config.network_address()).unwrap(), - committee.clone(), - config.public_key(), - ), + NetworkAuthorityClient::connect_lazy(config.network_address()).unwrap(), ) }) .collect();