Skip to content

Commit

Permalink
Use a local connection between ConsensusAdapter and Narwhal (MystenLa…
Browse files Browse the repository at this point in the history
…bs#9694)

## Description 

We are having operational and potentially throughput issues with using
gRPC for Sui -> Narwhal submissions. Switching to a local interface
should simplify debugging.

## Test Plan 

Existing unit tests. Deploy to private testnet.
This change seems to have sped up reconfig simtests, sometimes cutting
the time by half.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Mar 24, 2023
1 parent a1314eb commit 018aeb7
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 2 additions & 8 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use anyhow::Result;
use async_trait::async_trait;
use mysten_metrics::spawn_monitored_task;
use narwhal_types::TransactionsClient;
use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry, Histogram, IntCounter,
Registry,
Expand All @@ -26,7 +25,7 @@ use tap::TapFallible;
use tokio::task::JoinHandle;
use tracing::{error_span, info, Instrument};

use crate::consensus_adapter::ConnectionMonitorStatusForTests;
use crate::consensus_adapter::{ConnectionMonitorStatusForTests, LazyNarwhalClient};
use crate::{
authority::{AuthorityState, MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH},
consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
Expand Down Expand Up @@ -79,13 +78,8 @@ impl AuthorityServer {
state: Arc<AuthorityState>,
consensus_address: Multiaddr,
) -> Self {
let consensus_client = Box::new(TransactionsClient::new(
mysten_network::client::connect_lazy(&consensus_address)
.expect("Failed to connect to consensus"),
));

let consensus_adapter = Arc::new(ConsensusAdapter::new(
consensus_client,
Box::new(LazyNarwhalClient::new(consensus_address)),
state.name,
Box::new(Arc::new(ConnectionMonitorStatusForTests {})),
100_000,
Expand Down
78 changes: 76 additions & 2 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use arc_swap::ArcSwap;
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
use futures::future::{select, Either};
use futures::FutureExt;
use itertools::Itertools;
use mysten_network::Multiaddr;
use narwhal_types::{TransactionProto, TransactionsClient};
use narwhal_worker::LocalNarwhalClient;
use parking_lot::{Mutex, RwLockReadGuard};
use prometheus::IntGauge;
use prometheus::Registry;
Expand Down Expand Up @@ -36,7 +38,7 @@ use sui_types::{
use tap::prelude::*;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::{self, sleep, timeout};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator};
Expand Down Expand Up @@ -184,6 +186,78 @@ impl SubmitToConsensus for TransactionsClient<sui_network::tonic::transport::Cha
}
}

/// A Narwhal client that instantiates LocalNarwhalClient lazily.
///
/// The underlying client is not resolved at construction time; it is looked up via
/// `LocalNarwhalClient::get_global` the first time a transaction is submitted.
pub struct LazyNarwhalClient {
/// Outer ArcSwapOption allows initialization after the first connection to Narwhal.
/// Inner ArcSwap allows Narwhal restarts across epoch changes.
client: ArcSwapOption<ArcSwap<LocalNarwhalClient>>,
/// Address of the Narwhal worker; used only as the key for the global client lookup.
addr: Multiaddr,
}

impl LazyNarwhalClient {
    /// Creates a client for the Narwhal worker at `addr`.
    ///
    /// No lookup happens here: the underlying `LocalNarwhalClient` is resolved later,
    /// using `addr` as the key.
    pub fn new(addr: Multiaddr) -> Self {
        let client = ArcSwapOption::empty();
        Self { client, addr }
    }

    /// Resolves the globally registered `LocalNarwhalClient` handle for `self.addr`.
    ///
    /// Narwhal may not have started (and registered its client) yet, so the registry is
    /// polled every 100ms. Retries should only happen on Sui process start.
    ///
    /// # Panics
    ///
    /// Panics if no client has been registered for the address within 30 seconds.
    async fn get(&self) -> Arc<ArcSwap<LocalNarwhalClient>> {
        let wait_for_client = async {
            loop {
                if let Some(registered) = LocalNarwhalClient::get_global(&self.addr) {
                    return registered;
                }
                sleep(Duration::from_millis(100)).await;
            }
        };

        match timeout(Duration::from_secs(30), wait_for_client).await {
            Ok(registered) => registered,
            Err(_) => panic!("Timed out waiting for Narwhal to start on {}!", self.addr),
        }
    }
}

#[async_trait::async_trait]
impl SubmitToConsensus for LazyNarwhalClient {
    /// Serializes `transaction` with BCS and submits it to the local Narwhal worker.
    ///
    /// On the first call the Narwhal client handle is resolved through `Self::get` (which
    /// may wait for Narwhal to start, and panics on timeout) and cached for later calls.
    ///
    /// # Errors
    ///
    /// Returns `SuiError::FailedToSubmitToConsensus` carrying the debug rendering of the
    /// Narwhal error when the submission fails.
    async fn submit_to_consensus(
        &self,
        transaction: &ConsensusTransaction,
        _epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let transaction =
            bcs::to_bytes(transaction).expect("Serializing consensus transaction cannot fail");

        // `load_full()` is used instead of `load()` throughout: the values below are kept
        // alive across `.await` points, and arc-swap guards are only meant for short-lived
        // borrows on the stack, not for state stored inside a suspended future.
        let handle = match self.client.load_full() {
            Some(handle) => handle,
            None => {
                // Concurrent first callers may race here; they all resolve and store the
                // handle registered for the same address, so the duplicate store is harmless.
                let handle = self.get().await;
                self.client.store(Some(Arc::clone(&handle)));
                handle
            }
        };

        // The retrieved LocalNarwhalClient can be from the past epoch. Submit would fail after
        // Narwhal shuts down, so there should be no correctness issue.
        let client = handle.load_full();
        client
            .submit_transaction(transaction)
            .await
            .map_err(|e| SuiError::FailedToSubmitToConsensus(format!("{:?}", e)))
            .tap_err(|r| {
                // Will be logged by caller as well.
                warn!("Submit transaction failed with: {:?}", r);
            })?;
        Ok(())
    }
}

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
/// The network client connecting to the consensus node of this authority.
Expand Down
25 changes: 5 additions & 20 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::Result;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use prometheus::Registry;
use sui_core::consensus_adapter::SubmitToConsensus;
use sui_core::consensus_adapter::{LazyNarwhalClient, SubmitToConsensus};
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::sync::broadcast;
Expand All @@ -34,7 +34,6 @@ use mysten_metrics::{spawn_monitored_task, RegistryService};
use mysten_network::server::ServerBuilder;
use narwhal_network::metrics::MetricsMakeCallbackHandler;
use narwhal_network::metrics::{NetworkConnectionMetrics, NetworkMetrics};
use narwhal_types::TransactionsClient;
use sui_config::node::DBCheckpointConfig;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
Expand Down Expand Up @@ -77,7 +76,7 @@ use sui_macros::fail_point_async;
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::discovery::TrustedPeerChangeEvent;
use sui_network::{state_sync, DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_HTTP2_KEEPALIVE_SEC};
use sui_network::state_sync;
use sui_protocol_config::{ProtocolConfig, SupportedProtocolVersions};
use sui_storage::IndexStore;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
Expand Down Expand Up @@ -759,27 +758,13 @@ impl SuiNode {
connection_monitor_status: Arc<ConnectionMonitorStatus>,
prometheus_registry: &Registry,
) -> ConsensusAdapter {
const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);

let consensus_address = consensus_config.address().to_owned();
let client_config = mysten_network::config::Config {
connect_timeout: Some(DEFAULT_CONNECT_TIMEOUT_SEC),
http2_keepalive_interval: Some(DEFAULT_HTTP2_KEEPALIVE_SEC),
request_timeout: Some(REQUEST_TIMEOUT),
..Default::default()
};

let consensus_client = TransactionsClient::new(
client_config
.connect_lazy(&consensus_address)
.expect("Failed to connect to consensus"),
);

let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
// The consensus adapter allows the authority to send user certificates through consensus.

ConsensusAdapter::new(
Box::new(consensus_client),
Box::new(LazyNarwhalClient::new(
consensus_config.address().to_owned(),
)),
authority,
Box::new(connection_monitor_status),
consensus_config.max_pending_transactions(),
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ pub enum SuiError {
FullNodeInvalidTxRangeQuery { error: String },

// Errors related to the authority-consensus interface.
#[error("Failed to submit transaction to consensus: {0}")]
FailedToSubmitToConsensus(String),
#[error("Failed to connect with consensus node: {0}")]
ConsensusConnectionBroken(String),
#[error("Failed to hear back from consensus: {0}")]
Expand Down
3 changes: 2 additions & 1 deletion narwhal/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ edition = "2021"
publish = false

[dependencies]
arc-swap = "1.5.1"
async-trait = "0.1.61"
byteorder = "1.4.3"
bytes = "1.3.0"
futures = "0.3.24"
governor = "0.5.1"
rand = { version = "0.8.5", features = ["small_rng"] }
tap = "1.0.1"
thiserror = "1.0.35"
tokio = { workspace = true, features = ["sync", "rt", "macros"] }
tonic = "0.8.2"
tower = "0.4.13"
Expand All @@ -27,7 +29,6 @@ types = { path = "../types", package = "narwhal-types" }
prometheus = "0.13.3"
store = { path = "../../crates/typed-store", package = "typed-store" }
mysten-network = { path = "../../crates/mysten-network"}

mysten-metrics = { path = "../../crates/mysten-metrics" }

anemo.workspace = true
Expand Down
106 changes: 106 additions & 0 deletions narwhal/worker/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{btree_map::Entry, BTreeMap},
net::Ipv4Addr,
sync::{Arc, Mutex},
};

use arc_swap::ArcSwap;
use mysten_network::{multiaddr::Protocol, Multiaddr};
use thiserror::Error;
use types::{metered_channel::Sender, Transaction, TxResponse};

/// Process-wide registry of local Narwhal clients, keyed by canonicalized worker address.
/// Uses a map to allow running multiple Narwhal instances in the same process.
/// The slot holds `None` until the first `LocalNarwhalClient::set_global` call creates the map.
/// TODO: after Rust 1.66, use BTreeMap::new() instead of wrapping it in an Option.
static LOCAL_NARWHAL_CLIENTS: Mutex<Option<BTreeMap<Multiaddr, Arc<ArcSwap<LocalNarwhalClient>>>>> =
Mutex::new(None);

/// The maximum allowed size of transactions into Narwhal.
/// Compared against `transaction.len()` in `LocalNarwhalClient::submit_transaction`;
/// this is 6 MiB assuming that length is counted in bytes (TODO confirm).
/// TODO: maybe move to TxValidator?
pub const MAX_ALLOWED_TRANSACTION_SIZE: usize = 6 * 1024 * 1024;

/// Errors returned to clients submitting transactions to Narwhal.
#[derive(Clone, Debug, Error)]
pub enum NarwhalError {
/// The response channel for the transaction was closed before a result was delivered.
#[error("Failed to include transaction in a header!")]
TransactionNotIncludedInHeader,

/// The transaction could not be sent to the batch maker channel.
#[error("Narwhal is shutting down!")]
ShuttingDown,

/// The transaction exceeds the size limit; the fields are `(actual size, limit)`.
#[error("Transaction is too large: size={0} limit={1}")]
TransactionTooLarge(usize, usize),
}

/// TODO: add NarwhalClient trait and implement RemoteNarwhalClient with grpc.
/// A client that connects to Narwhal locally.
#[derive(Clone)]
pub struct LocalNarwhalClient {
/// Sender used to hand each transaction, paired with its response notifier, to the batch maker.
/// TODO: maybe use tx_batch_maker for load shedding.
tx_batch_maker: Sender<(Transaction, TxResponse)>,
}

impl LocalNarwhalClient {
    /// Builds a reference-counted client around the batch maker's sender.
    pub fn new(tx_batch_maker: Sender<(Transaction, TxResponse)>) -> Arc<Self> {
        let client = Self { tx_batch_maker };
        Arc::new(client)
    }

    /// Sets the instance of LocalNarwhalClient for the local address.
    /// Address is only used as the key.
    ///
    /// If a client is already registered under the same (canonicalized) address, it is
    /// swapped in place so that existing holders of the handle observe the new instance.
    pub fn set_global(addr: Multiaddr, instance: Arc<Self>) {
        let key = Self::canonicalize_address_key(addr);
        let mut guard = LOCAL_NARWHAL_CLIENTS.lock().unwrap();
        // The registry map is created lazily on first registration.
        let registry = guard.get_or_insert_with(BTreeMap::new);
        match registry.entry(key) {
            Entry::Occupied(slot) => slot.get().store(instance),
            Entry::Vacant(slot) => {
                slot.insert(Arc::new(ArcSwap::from(instance)));
            }
        }
    }

    /// Gets the instance of LocalNarwhalClient for the local address.
    /// Address is only used as the key.
    ///
    /// Returns `None` when nothing has been registered for the address yet.
    pub fn get_global(addr: &Multiaddr) -> Option<Arc<ArcSwap<Self>>> {
        let key = Self::canonicalize_address_key(addr.clone());
        let guard = LOCAL_NARWHAL_CLIENTS.lock().unwrap();
        guard
            .as_ref()
            .and_then(|registry| registry.get(&key))
            .cloned()
    }

    /// Submits a transaction to the local Narwhal worker.
    ///
    /// # Errors
    ///
    /// * `NarwhalError::TransactionTooLarge` if the transaction is longer than
    ///   `MAX_ALLOWED_TRANSACTION_SIZE`.
    /// * `NarwhalError::ShuttingDown` if the transaction cannot be sent to the batch maker.
    /// * `NarwhalError::TransactionNotIncludedInHeader` if the response channel is closed
    ///   before a result arrives.
    pub async fn submit_transaction(&self, transaction: Transaction) -> Result<(), NarwhalError> {
        let size = transaction.len();
        if size > MAX_ALLOWED_TRANSACTION_SIZE {
            return Err(NarwhalError::TransactionTooLarge(
                size,
                MAX_ALLOWED_TRANSACTION_SIZE,
            ));
        }

        // Send the transaction to the batch maker, together with the channel on which
        // the outcome is reported back.
        let (notifier, when_done) = tokio::sync::oneshot::channel();
        let sent = self.tx_batch_maker.send((transaction, notifier)).await;
        if sent.is_err() {
            return Err(NarwhalError::ShuttingDown);
        }

        match when_done.await {
            Ok(_digest) => Ok(()),
            Err(_) => Err(NarwhalError::TransactionNotIncludedInHeader),
        }
    }

    /// Ensures getter and setter use the same key for the same network address.
    /// This is needed because TxServer serves from 0.0.0.0.
    ///
    /// The first protocol component of `address` is replaced with the unspecified IPv4
    /// address. Panics if `replace` returns `None` for index 0.
    fn canonicalize_address_key(address: Multiaddr) -> Multiaddr {
        let unspecified = Protocol::Ip4(Ipv4Addr::UNSPECIFIED);
        address
            .replace(0, |_protocol| Some(unspecified))
            .unwrap()
    }
}
5 changes: 4 additions & 1 deletion narwhal/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
)]

mod batch_maker;
mod client;
mod handlers;
pub mod metrics;
mod primary_connector;
mod quorum_waiter;
mod transactions_server;
mod tx_validator;
mod worker;

pub mod metrics;

pub use crate::client::LocalNarwhalClient;
pub use crate::tx_validator::{TransactionValidator, TrivialTransactionValidator};
pub use crate::worker::Worker;

Expand Down
Loading

0 comments on commit 018aeb7

Please sign in to comment.