Skip to content

Commit

Permalink
Consensus Adapter (MystenLabs#1449)
Browse files Browse the repository at this point in the history
Introduce the consensus adapter module
  • Loading branch information
asonnino authored Apr 20, 2022
1 parent de86f44 commit cf53ad5
Show file tree
Hide file tree
Showing 17 changed files with 457 additions and 30 deletions.
6 changes: 6 additions & 0 deletions sui/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ pub async fn spawn_authority_server(
network_server: NetworkServer,
state: AuthorityState,
) -> transport::SpawnedServer<AuthorityServer> {
// The following two fields are only needed for shared objects (not by this bench).
let consensus_address = "127.0.0.1:0".parse().unwrap();
let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1);

let server = AuthorityServer::new(
network_server.base_address,
network_server.base_port,
network_server.buffer_size,
state,
consensus_address,
tx_consensus_listener,
);
server.spawn().await.unwrap()
}
Expand Down
18 changes: 15 additions & 3 deletions sui/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use serde_with::serde_as;
use sui_types::committee::Committee;
use tracing::log::trace;

use crate::gateway::GatewayType;
use crate::keystore::KeystoreType;
use std::net::SocketAddr;
use sui_framework::DEFAULT_FRAMEWORK_PATH;
use sui_network::network::PortAllocator;
use sui_types::base_types::*;
use sui_types::crypto::{get_key_pair, KeyPair};

use crate::gateway::GatewayType;
use crate::keystore::KeystoreType;

const DEFAULT_WEIGHT: usize = 1;
const DEFAULT_GAS_AMOUNT: u64 = 100000;
pub const AUTHORITIES_DB_NAME: &str = "authorities_db";
Expand All @@ -50,6 +50,7 @@ pub struct AuthorityPrivateInfo {
pub port: u16,
pub db_path: PathBuf,
pub stake: usize,
pub consensus_address: SocketAddr,
}

// Custom deserializer with optional default fields
Expand Down Expand Up @@ -92,13 +93,24 @@ impl<'de> Deserialize<'de> for AuthorityPrivateInfo {
} else {
DEFAULT_WEIGHT
};
let consensus_address = if let Some(val) = json.get("consensus_address") {
SocketAddr::deserialize(val).map_err(serde::de::Error::custom)?
} else {
let port = PORT_ALLOCATOR
.lock()
.map_err(serde::de::Error::custom)?
.next_port()
.ok_or_else(|| serde::de::Error::custom("No available port."))?;
format!("127.0.0.1:{port}").parse().unwrap()
};

Ok(AuthorityPrivateInfo {
key_pair,
host,
port,
db_path,
stake,
consensus_address,
})
}
}
Expand Down
45 changes: 32 additions & 13 deletions sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, bail};
use base64ct::{Base64, Encoding};
use clap::*;
use futures::future::join_all;
use move_binary_format::CompiledModule;
use move_package::BuildConfig;
use tracing::{error, info};

use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use sui_adapter::adapter::generate_package_id;
use sui_adapter::genesis;
use sui_core::authority::{AuthorityState, AuthorityStore};
use sui_core::authority_server::AuthorityServer;
use sui_core::consensus_adapter::ConsensusListener;
use sui_network::transport::SpawnedServer;
use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE;
use sui_types::base_types::decode_bytes_hex;
use sui_types::base_types::{SequenceNumber, SuiAddress, TxContext};
use sui_types::committee::Committee;
use sui_types::error::SuiResult;
use sui_types::object::Object;
use tokio::sync::mpsc::channel;
use tracing::{error, info};

use crate::config::{
AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, PersistedConfig, WalletConfig,
Expand Down Expand Up @@ -430,12 +430,8 @@ pub async fn make_server(
store,
)
.await;
Ok(AuthorityServer::new(
authority.host.clone(),
authority.port,
buffer_size,
state,
))

make_authority(authority, buffer_size, state).await
}

async fn make_server_with_genesis_ctx(
Expand Down Expand Up @@ -463,10 +459,33 @@ async fn make_server_with_genesis_ctx(
state.insert_genesis_object(object.clone()).await;
}

make_authority(authority, buffer_size, state).await
}

/// Spawn all the subsystems run by a Sui authority: a consensus node, a sui authority server,
/// and a consensus listener bridging the consensus node and the sui authority.
async fn make_authority(
authority: &AuthorityPrivateInfo,
buffer_size: usize,
state: AuthorityState,
) -> SuiResult<AuthorityServer> {
let (tx_consensus_to_sui, rx_consensus_to_sui) = channel(1_000);
let (tx_sui_to_consensus, rx_sui_to_consensus) = channel(1_000);

// TODO [issue #633]: Spawn the consensus node of this authority.
let _tx_consensus_to_sui = tx_consensus_to_sui;

// Spawn a consensus listener. It listen for consensus outputs and notifies the
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);

// Return new authority server. It listen to users transactions and send back replies.
Ok(AuthorityServer::new(
authority.host.clone(),
authority.port,
buffer_size,
state,
authority.consensus_address,
/* tx_consensus_listener */ tx_sui_to_consensus,
))
}
1 change: 1 addition & 0 deletions sui/src/unit_tests/sui_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub async fn start_test_network(
port: 0,
db_path: info.db_path.clone(),
stake: info.stake,
consensus_address: info.consensus_address,
})
.collect();
genesis_config.authorities = authorities;
Expand Down
31 changes: 28 additions & 3 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority::AuthorityState;

use futures::{SinkExt, StreamExt};
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::{io, sync::Arc};
use sui_network::{
network::NetworkServer,
Expand All @@ -13,12 +14,12 @@ use sui_network::{
use sui_types::{
batch::UpdateItem, crypto::VerificationObligation, error::*, messages::*, serialize::*,
};

use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc::Sender;

use std::time::Duration;
use tracing::{error, info, warn, Instrument};

use crate::consensus_adapter::{ConsensusInput, ConsensusSubmitter};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use tokio::sync::broadcast::error::RecvError;
Expand All @@ -40,6 +41,7 @@ const MAX_DELAY_MILLIS: u64 = 5_000; // 5 sec
pub struct AuthorityServer {
server: NetworkServer,
pub state: AuthorityState,
consensus_submitter: ConsensusSubmitter,
}

impl AuthorityServer {
Expand All @@ -48,10 +50,19 @@ impl AuthorityServer {
base_port: u16,
buffer_size: usize,
state: AuthorityState,
consensus_address: SocketAddr,
tx_consensus_listener: Sender<ConsensusInput>,
) -> Self {
let consensus_submitter = ConsensusSubmitter::new(
consensus_address,
buffer_size,
state.committee.clone(),
tx_consensus_listener,
);
Self {
server: NetworkServer::new(base_address, base_port, buffer_size),
state,
consensus_submitter,
}
}

Expand Down Expand Up @@ -244,6 +255,20 @@ impl AuthorityServer {
.handle_batch_streaming(*message, channel)
.await
.map(|_| None),
SerializedMessage::ConsensusTransaction(message) => {
match self.consensus_submitter.submit(&message).await {
Ok(()) => match *message {
ConsensusTransaction::UserTransaction(certificate) => {
let confirmation_transaction = ConfirmationTransaction { certificate };
self.state
.handle_confirmation_transaction(confirmation_transaction)
.await
.map(|info| Some(serialize_transaction_info(&info)))
}
},
Err(e) => Err(e),
}
}

_ => Err(SuiError::UnexpectedMessage),
};
Expand Down
Loading

0 comments on commit cf53ad5

Please sign in to comment.