Skip to content

Commit

Permalink
SafeClient doesn't need to implement AuthorityAPI (MystenLabs#2393)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 2, 2022
1 parent 87e0756 commit 9f138b8
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 50 deletions.
12 changes: 5 additions & 7 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn checkpoint_active_flow_happy_path() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.authority_clients.clone();
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.authority_clients.clone();
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
Expand All @@ -112,14 +112,13 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
.expect("Unexpected crash");

// Send it only to 1 random node
use crate::authority_client::AuthorityAPI;
let sample_authority = sender_aggregator.committee.sample();
let client: SafeClient<LocalAuthorityClient> =
sender_aggregator.authority_clients[sample_authority].clone();
let _response = client
.handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate))
.await
.expect("Problem processing certificare");
.expect("Problem processing certificate");

// Check whether this is a success?
assert!(matches!(
Expand Down Expand Up @@ -177,7 +176,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.authority_clients.clone();
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state =
ActiveAuthority::new(inner_state.authority.clone(), clients).unwrap();
Expand All @@ -196,14 +195,13 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
.expect("Unexpected crash");

// Send it only to 1 random node
use crate::authority_client::AuthorityAPI;
let sample_authority = sender_aggregator.committee.sample();
let client: SafeClient<LocalAuthorityClient> =
sender_aggregator.authority_clients[sample_authority].clone();
let _response = client
.handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate))
.await
.expect("Problem processing certificare");
.expect("Problem processing certificate");

// Check whether this is a success?
assert!(matches!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::authority::AuthorityState;
use crate::authority::AuthorityStore;
use crate::authority_aggregator::authority_aggregator_tests::*;
use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use crate::safe_client::SafeClient;
use async_trait::async_trait;
use std::borrow::Borrow;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -242,7 +243,7 @@ pub async fn init_configurable_authorities(
}
states.push(client.state.clone());
names.push(authority_name);
clients.push(client);
clients.push(SafeClient::new(client, committee.clone(), authority_name));
}

// Execute transactions for every EmitUpdateItem Action, use the digest of the transaction to
Expand Down Expand Up @@ -300,9 +301,15 @@ pub async fn init_configurable_authorities(
_ = do_cert(cert_client, &cert1).await;

// Register the internal actions to client
cert_client.register_action_sequence(batch_action_internal.clone());
cert_client
.authority_client_mut()
.register_action_sequence(batch_action_internal.clone());
}
}

let authority_clients = authority_clients
.into_iter()
.map(|(name, client)| (name, client.authority_client().clone()))
.collect();
(authority_clients, states, executed_digests)
}
20 changes: 16 additions & 4 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ impl<A> AuthorityAggregator<A> {
{
self.authority_clients[name].clone()
}

pub fn clone_inner_clients(&self) -> BTreeMap<AuthorityName, A>
where
A: Clone,
{
let mut clients = BTreeMap::new();
for (name, client) in &self.authority_clients {
clients.insert(*name, client.authority_client().clone());
}
clients
}
}

pub enum ReduceOutput<S> {
Expand Down Expand Up @@ -1065,17 +1076,18 @@ where
// - we try to update the authority with the cert, and on error return Err.
// - we try to re-process the certificate and return the result.

let handle = if contains_shared_object {
let res = if contains_shared_object {
client.handle_consensus_transaction(ConsensusTransaction::UserTransaction(Box::new(cert_ref.clone())))
.instrument(tracing::trace_span!("handle_consensus_cert", authority =? name))
.await
} else {
client
.handle_confirmation_transaction(ConfirmationTransaction::new(
cert_ref.clone(),
))
.instrument(tracing::trace_span!("handle_cert", authority =? name))
.await
};
let res = handle
.instrument(tracing::trace_span!("handle_cert", authority =? name))
.await;

if res.is_ok() {
// We got an ok answer, so returning the result of processing
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ pub async fn checkpoint_tests_setup(num_objects: usize, batch_interval: Duration
/* total_batches */ 100, /* total_transactions */ 100,
);
}
println!("CHANEL EXIT.");
println!("CHANNEL EXIT.");
});

// Now make an authority aggregator
Expand All @@ -1460,8 +1460,6 @@ pub async fn checkpoint_tests_setup(num_objects: usize, batch_interval: Duration
}
}

use crate::authority_client::AuthorityAPI;

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn checkpoint_messaging_flow_bug() {
let mut setup = checkpoint_tests_setup(5, Duration::from_millis(500)).await;
Expand Down
29 changes: 13 additions & 16 deletions crates/sui-core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use async_trait::async_trait;
use futures::StreamExt;
use sui_types::batch::{AuthorityBatch, SignedBatch, TxSequenceNumber, UpdateItem};
use sui_types::crypto::PublicKeyBytes;
Expand Down Expand Up @@ -31,8 +30,12 @@ impl<C> SafeClient<C> {
}
}

pub fn authority_client(&self) -> &C {
&self.authority_client
}

#[cfg(test)]
pub fn authority_client(&mut self) -> &mut C {
pub fn authority_client_mut(&mut self) -> &mut C {
&mut self.authority_client
}

Expand Down Expand Up @@ -274,15 +277,9 @@ where

Ok(new_stream)
}
}

#[async_trait]
impl<C> AuthorityAPI for SafeClient<C>
where
C: AuthorityAPI + Send + Sync + Clone + 'static,
{
/// Initiate a new transfer to a Sui or Primary account.
async fn handle_transaction(
pub async fn handle_transaction(
&self,
transaction: Transaction,
) -> Result<TransactionInfoResponse, SuiError> {
Expand All @@ -299,7 +296,7 @@ where
}

/// Confirm a transfer to a Sui or Primary account.
async fn handle_confirmation_transaction(
pub async fn handle_confirmation_transaction(
&self,
transaction: ConfirmationTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
Expand All @@ -316,7 +313,7 @@ where
Ok(transaction_info)
}

async fn handle_consensus_transaction(
pub async fn handle_consensus_transaction(
&self,
transaction: ConsensusTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
Expand All @@ -326,7 +323,7 @@ where
.await
}

async fn handle_account_info_request(
pub async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, SuiError> {
Expand All @@ -335,7 +332,7 @@ where
.await
}

async fn handle_object_info_request(
pub async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError> {
Expand All @@ -351,7 +348,7 @@ where
}

/// Handle Object information requests for this account.
async fn handle_transaction_info_request(
pub async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError> {
Expand All @@ -368,7 +365,7 @@ where
Ok(transaction_info)
}

async fn handle_checkpoint(
pub async fn handle_checkpoint(
&self,
request: CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
Expand All @@ -378,7 +375,7 @@ where
}

/// Handle Batch information requests for this authority.
async fn handle_batch_stream(
pub async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<BatchInfoResponseItemStream, SuiError> {
Expand Down
37 changes: 26 additions & 11 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn get_local_client(
clients.next();
i += 1;
}
clients.next().unwrap().authority_client()
clients.next().unwrap().authority_client_mut()
}

pub fn transfer_coin_transaction(
Expand Down Expand Up @@ -219,18 +219,24 @@ pub fn to_transaction(data: TransactionData, signer: &dyn Signer<Signature>) ->
Transaction::new(data, signature)
}

pub async fn do_transaction<A: AuthorityAPI>(authority: &A, transaction: &Transaction) {
pub async fn do_transaction<A>(authority: &SafeClient<A>, transaction: &Transaction)
where
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
authority
.handle_transaction(transaction.clone())
.await
.unwrap();
}

pub async fn extract_cert<A: AuthorityAPI>(
authorities: &[&A],
pub async fn extract_cert<A>(
authorities: &[&SafeClient<A>],
committee: &Committee,
transaction_digest: &TransactionDigest,
) -> CertifiedTransaction {
) -> CertifiedTransaction
where
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
let mut votes = vec![];
let mut transaction: Option<SignedTransaction> = None;
for authority in authorities {
Expand Down Expand Up @@ -263,10 +269,13 @@ pub async fn extract_cert<A: AuthorityAPI>(
)
}

pub async fn do_cert<A: AuthorityAPI>(
authority: &A,
pub async fn do_cert<A>(
authority: &SafeClient<A>,
cert: &CertifiedTransaction,
) -> TransactionEffects {
) -> TransactionEffects
where
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
authority
.handle_confirmation_transaction(ConfirmationTransaction::new(cert.clone()))
.await
Expand All @@ -276,7 +285,10 @@ pub async fn do_cert<A: AuthorityAPI>(
.effects
}

pub async fn do_cert_configurable<A: AuthorityAPI>(authority: &A, cert: &CertifiedTransaction) {
pub async fn do_cert_configurable<A>(authority: &A, cert: &CertifiedTransaction)
where
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
let result = authority
.handle_confirmation_transaction(ConfirmationTransaction::new(cert.clone()))
.await;
Expand All @@ -285,7 +297,10 @@ pub async fn do_cert_configurable<A: AuthorityAPI>(authority: &A, cert: &Certifi
}
}

pub async fn get_latest_ref<A: AuthorityAPI>(authority: &A, object_id: ObjectID) -> ObjectRef {
pub async fn get_latest_ref<A>(authority: &SafeClient<A>, object_id: ObjectID) -> ObjectRef
where
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
if let Ok(ObjectInfoResponse {
requested_object_reference: Some(object_ref),
..
Expand Down Expand Up @@ -331,7 +346,7 @@ async fn execute_transaction_with_fault_configs(
.await?;

for client in authorities.authority_clients.values_mut() {
client.authority_client().fault_config.reset();
client.authority_client_mut().fault_config.reset();
}
for (index, config) in configs_before_process_certificate {
get_local_client(&mut authorities, *index).fault_config = *config;
Expand Down
9 changes: 2 additions & 7 deletions crates/test-utils/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::time::Duration;
use sui_config::{NetworkConfig, ValidatorInfo};
use sui_core::{
authority_aggregator::AuthorityAggregator, authority_client::NetworkAuthorityClient,
safe_client::SafeClient,
};
use sui_node::SuiNode;
use sui_types::{committee::Committee, object::Object};
Expand Down Expand Up @@ -54,7 +53,7 @@ where

pub fn create_authority_aggregator(
authority_configs: &[ValidatorInfo],
) -> AuthorityAggregator<SafeClient<NetworkAuthorityClient>> {
) -> AuthorityAggregator<NetworkAuthorityClient> {
let voting_rights: BTreeMap<_, _> = authority_configs
.iter()
.map(|config| (config.public_key(), config.stake()))
Expand All @@ -65,11 +64,7 @@ pub fn create_authority_aggregator(
.map(|config| {
(
config.public_key(),
SafeClient::new(
NetworkAuthorityClient::connect_lazy(config.network_address()).unwrap(),
committee.clone(),
config.public_key(),
),
NetworkAuthorityClient::connect_lazy(config.network_address()).unwrap(),
)
})
.collect();
Expand Down

0 comments on commit 9f138b8

Please sign in to comment.