Skip to content

Commit

Permalink
swap network information at end of reconfiguration if A is type with …
Browse files Browse the repository at this point in the history
…network
  • Loading branch information
lanvidr committed Jun 30, 2022
1 parent efc36d7 commit c7f87e7
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 14 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ pub async fn generate_genesis_system_object<S: Eq + Serialize + for<'de> Deseria
CallArg::Pure(bcs::to_bytes(&pubkeys).unwrap()),
CallArg::Pure(bcs::to_bytes(&sui_addresses).unwrap()),
CallArg::Pure(bcs::to_bytes(&names).unwrap()),
// TODO Laura: below is netaddress, for now just use names as we don't yet want to expose them.
// TODO: below is netaddress, for now just use names as we don't yet want to expose them.
CallArg::Pure(bcs::to_bytes(&names).unwrap()),
CallArg::Pure(bcs::to_bytes(&stakes).unwrap()),
],
Expand Down
23 changes: 23 additions & 0 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use sui_types::{
object::Object,
};

use crate::epoch::reconfiguration::Reconfigurable;
#[cfg(test)]
use sui_config::genesis::Genesis;
use sui_network::tonic::transport::Channel;

#[async_trait]
pub trait AuthorityAPI {
Expand Down Expand Up @@ -102,6 +104,17 @@ impl NetworkAuthorityClient {
}
}

#[async_trait]
impl Reconfigurable for NetworkAuthorityClient {
fn needs_network_recreation() -> bool {
true
}

fn recreate(channel: tonic::transport::Channel) -> Self {
NetworkAuthorityClient::new(channel)
}
}

#[async_trait]
impl AuthorityAPI for NetworkAuthorityClient {
/// Initiate a new transfer to a Sui or Primary account.
Expand Down Expand Up @@ -221,6 +234,16 @@ pub struct LocalAuthorityClient {
pub fault_config: LocalAuthorityClientFaultConfig,
}

/// [`Reconfigurable`] implementation for the local authority client.
///
/// This client type reports that it does not need network recreation.
impl Reconfigurable for LocalAuthorityClient {
    /// Always returns `false` for this client type.
    fn needs_network_recreation() -> bool {
        false
    }

    /// Not expected to be called for this client type; panics if it is.
    fn recreate(_channel: Channel) -> Self {
        // This function should not get called because
        // `needs_network_recreation` returns false.
        unreachable!()
    }
}

#[async_trait]
impl AuthorityAPI for LocalAuthorityClient {
async fn handle_transaction(
Expand Down
90 changes: 77 additions & 13 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,35 @@
use crate::authority_active::ActiveAuthority;
use crate::authority_aggregator::AuthorityAggregator;
use crate::authority_client::AuthorityAPI;
use async_trait::async_trait;
use multiaddr::Multiaddr;
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use sui_network::tonic;
use sui_types::committee::Committee;
use sui_types::crypto::PublicKeyBytes;
use sui_types::error::SuiResult;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{ConfirmationTransaction, SignedTransaction};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use typed_store::Map;

/// Describes how an authority client type is handled when the set of clients
/// is replaced at the end of reconfiguration.
#[async_trait]
pub trait Reconfigurable {
    /// Returns whether clients of this type must be rebuilt on fresh channels.
    /// When this returns `false`, the existing clients are reused instead.
    fn needs_network_recreation() -> bool;

    /// Builds a client of this type on top of the given channel.
    fn recreate(channel: tonic::transport::Channel) -> Self;
}

// TODO: Make last checkpoint number of each epoch more flexible.
/// Per its name, the number of checkpoints in each epoch; currently fixed at 200.
pub const CHECKPOINT_COUNT_PER_EPOCH: u64 = 200;

/// A 300 ms delay. Judging by the name, this is the wait between retries of the
/// epoch transaction query; the use site is not visible here (TODO confirm).
const WAIT_BETWEEN_EPOCH_TX_QUERY_RETRY: Duration = Duration::from_millis(300);

impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
A: AuthorityAPI + Send + Sync + 'static + Clone + Reconfigurable,
{
/// This function should be called by the active checkpoint process, when it finishes processing
/// all transactions from the second-to-last checkpoint of the epoch. It's called by a
Expand Down Expand Up @@ -94,18 +105,69 @@ where
.collect();
let new_committee = Committee::new(next_epoch, votes)?;
self.state.insert_new_epoch_info(&new_committee)?;
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
self.net.load().clone_inner_clients(),
self.gateway_metrics.clone(),
));
self.net.store(new_net.clone());
// TODO Laura: Also reconnect network if changed.
// This is blocked for now since we are not storing network info on-chain yet.

// Reconnect the network if we have a type of AuthorityClient that has a network.
if A::needs_network_recreation() {
let mut new_clients = BTreeMap::new();
let sui_system_state = self
.state
.get_sui_system_state_object()
.await
.map_err(|e| SuiError::GenericAuthorityError {
error: e.to_string(),
})
.unwrap();
let next_epoch_validators = sui_system_state.validators.next_epoch_validators;

let mut net_config = mysten_network::config::Config::new();
net_config.connect_timeout = Some(Duration::from_secs(5));
net_config.request_timeout = Some(Duration::from_secs(5));
net_config.http2_keepalive_interval = Some(Duration::from_secs(5));

for validator in next_epoch_validators {
let net_addr: &[u8] = &validator.net_address.clone();
let str_addr =
std::str::from_utf8(net_addr).map_err(|e| SuiError::GenericAuthorityError {
error: e.to_string(),
});
let address: Multiaddr = str_addr
.unwrap()
.parse()
.map_err(|e: multiaddr::Error| SuiError::GenericAuthorityError {
error: e.to_string(),
})
.unwrap();

let channel = net_config
.connect_lazy(&address)
.map_err(|e| SuiError::GenericAuthorityError {
error: e.to_string(),
})
.unwrap();
let client: A = A::recreate(channel);
let name: &[u8] = &validator.name;
let public_key_bytes = PublicKeyBytes::try_from(name)?;
new_clients.insert(public_key_bytes, client);
}

// Replace the clients in the authority aggregator with new clients.
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
new_clients,
self.gateway_metrics.clone(),
));
self.net.store(new_net);
} else {
// update the authorities with the new committee
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
self.net.load().clone_inner_clients(),
self.gateway_metrics.clone(),
));
self.net.store(new_net.clone());
}
// TODO: Update all committee in all components safely,
// potentially restart some authority clients.
// Including: self.net, narwhal committee/consensus adapter,
// potentially restart narwhal committee/consensus adapter,
// all active processes, maybe batch service.
// We should also reduce the amount of committee passed around.

Expand All @@ -124,7 +186,9 @@ where
// Collect a certificate for this system transaction that changes epoch,
// and execute it locally.
loop {
if let Ok(certificate) = new_net
if let Ok(certificate) = self
.net
.load()
.process_transaction(advance_epoch_tx.clone().to_transaction())
.await
{
Expand Down

0 comments on commit c7f87e7

Please sign in to comment.