Skip to content

Commit

Permalink
[node] Add plumbing that allows authorities to connect to other autho…
Browse files Browse the repository at this point in the history
…rities (MystenLabs#1610)

* Add plumbing to allows authorities to connect to other authorities

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Apr 28, 2022
1 parent 27dff72 commit 3b6cd6b
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sui/src/bin/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,16 @@ async fn main() -> Result<(), anyhow::Error> {
.join(CONSENSUS_DB_NAME)
.join(encode_bytes_hex(net_cfg.key_pair.public_key_bytes()));

// Pass in the newtwork parameters of all authorities
let net = network_config.get_authority_infos();
if let Err(e) = make_server(
net_cfg,
&Committee::from(&network_config),
network_config.buffer_size,
&consensus_committee,
&consensus_store_path,
&consensus_parameters,
Some(net),
)
.await
.unwrap()
Expand Down
36 changes: 35 additions & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::config::{make_default_narwhal_committee, CONSENSUS_DB_NAME};
use crate::config::{make_default_narwhal_committee, AuthorityInfo, CONSENSUS_DB_NAME};
use crate::config::{
AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, PersistedConfig, WalletConfig,
};
Expand All @@ -20,11 +20,15 @@ use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_adapter::adapter::generate_package_id;
use sui_adapter::genesis;
use sui_core::authority::{AuthorityState, AuthorityStore};
use sui_core::authority_active::ActiveAuthority;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_core::authority_server::AuthorityServer;
use sui_core::consensus_adapter::ConsensusListener;
use sui_network::network::NetworkClient;
use sui_network::transport::SpawnedServer;
use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE;
use sui_types::base_types::decode_bytes_hex;
Expand Down Expand Up @@ -287,6 +291,9 @@ impl SuiNetwork {
let consensus_committee = make_default_narwhal_committee(&config.authorities)?;
let consensus_parameters = ConsensusParameters::default();

// Pass in the newtwork parameters of all authorities
let net = config.get_authority_infos();

let mut spawned_authorities = Vec::new();
for authority in &config.authorities {
let consensus_store_path = sui_config_dir()?
Expand All @@ -300,6 +307,7 @@ impl SuiNetwork {
&consensus_committee,
&consensus_store_path,
&consensus_parameters,
Some(net.clone()),
)
.await?;
spawned_authorities.push(server.spawn().await?);
Expand Down Expand Up @@ -445,6 +453,7 @@ pub async fn make_server(
consensus_committee: &ConsensusCommittee<Ed25519PublicKey>,
consensus_store_path: &Path,
consensus_parameters: &ConsensusParameters,
net_parameters: Option<Vec<AuthorityInfo>>,
) -> SuiResult<AuthorityServer> {
let store = Arc::new(AuthorityStore::open(&authority.db_path, None));
let name = *authority.key_pair.public_key_bytes();
Expand All @@ -463,6 +472,7 @@ pub async fn make_server(
consensus_committee,
consensus_store_path,
consensus_parameters,
net_parameters,
)
.await
}
Expand Down Expand Up @@ -512,6 +522,7 @@ pub async fn make_authority(
consensus_committee: &ConsensusCommittee<Ed25519PublicKey>,
consensus_store_path: &Path,
consensus_parameters: &ConsensusParameters,
net_parameters: Option<Vec<AuthorityInfo>>,
) -> SuiResult<AuthorityServer> {
let (tx_consensus_to_sui, rx_consensus_to_sui) = channel(1_000);
let (tx_sui_to_consensus, rx_sui_to_consensus) = channel(1_000);
Expand Down Expand Up @@ -544,6 +555,29 @@ pub async fn make_authority(
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);

// If we have network information make authority clients
// to all authorities in the system.
let _active_authority = if let Some(network) = net_parameters {
let mut authority_clients = BTreeMap::new();
for info in &network {
let client = NetworkAuthorityClient::new(NetworkClient::new(
info.host.clone(),
info.base_port,
buffer_size,
Duration::from_secs(5),
Duration::from_secs(5),
));
authority_clients.insert(info.name, client);
}

let active_authority = ActiveAuthority::new(authority_state.clone(), authority_clients)?;

let join_handle = active_authority.spawn_all_active_processes().await;
Some(join_handle)
} else {
None
};

// Return new authority server. It listen to users transactions and send back replies.
Ok(AuthorityServer::new(
authority.host.clone(),
Expand Down
70 changes: 70 additions & 0 deletions sui_core/src/authority_active.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/*
Authorities have a passive component (in AuthorityState), but can also have active
components to perform a number of functions such as:
(1) Share transactions received with other authorities, to complete their execution
in case clients fail before sharing a trasnaction with sufficient authorities.
(2) Share certificates with other authorities in case clients fail before a
certificate has its executon finalized.
(3) Gossip executed certificates digests with other authorities through following
each other and using push / pull to execute certificates.
(4) Perform the active operations necessary to progress the periodic checkpointing
protocol.
This component manages the root of all these active processes. It spawns services
and tasks that activelly initiate network operations to progess all these
processes.
Some ground rules:
- The logic here does nothing "privileged", namely any process that could not
have been performed over the public authority interface by an untrusted
client.
- All logic here should be safe to the ActiveAuthority state being transient
and multiple instances running in parallel per authority, or at untrusted
clients. Or Authority state being stopped, without its state being saved
(loss of store), and then restarted some time later.
*/

use std::{collections::BTreeMap, sync::Arc};

use sui_types::{base_types::AuthorityName, error::SuiResult};

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
};

pub struct ActiveAuthority<A> {
// The local authority state
pub authority: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: AuthorityAggregator<A>,
}

impl<A> ActiveAuthority<A> {
pub fn new(
authority: Arc<AuthorityState>,
authority_clients: BTreeMap<AuthorityName, A>,
) -> SuiResult<Self> {
let committee = authority.committee.clone();

Ok(ActiveAuthority {
authority,
net: AuthorityAggregator::new(committee, authority_clients),
})
}
}

impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// TODO: Active tasks go here + logic to spawn them all
pub async fn spawn_all_active_processes(self) -> Option<()> {
None
}
}
1 change: 1 addition & 0 deletions sui_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

pub mod authority;
pub mod authority_active;
pub mod authority_aggregator;
pub mod authority_batch;
pub mod authority_client;
Expand Down
1 change: 1 addition & 0 deletions test_utils/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ where
&consensus_committee,
/* consensus_store_path */ tempfile::tempdir().unwrap().path(),
&ConsensusParameters::default(),
None,
)
.await
.unwrap()
Expand Down

0 comments on commit 3b6cd6b

Please sign in to comment.