Skip to content

Commit

Permalink
[Narwhal] call handlers directly for local PrimaryToWorker and Worker…
Browse files Browse the repository at this point in the history
…ToPrimary communications (MystenLabs#9821)

## Description 

We have observed the following issues in practice:
1. PrimaryToWorker and WorkerToPrimary connections sometimes break or
stay broken. The can be multiple causes, e.g. network misconfigurations,
load, fragile logic or something else.
2. There are components that are dependencies for network handlers, or
part of consensus, e.g. Synchronizer and Subscriber, that need access to
the network, but passing Network to them cannot be done at creation
time.

This change aims to address the above two issues:
1. Wire PrimaryToWorker and WorkerToPrimary handlers to the client
callsites directly, without going through the networking layer.
2. Pass a `NetworkClient` object to components that need access to the
network. Local handlers and in future remote Networks will be wired to
the `NetworkClient`, but these do not need to happen before the creation
of `NetworkClient`.

MystenLabs#10168

## Test Plan 

existing tests

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Mar 30, 2023
1 parent 0cd0702 commit ea474c0
Show file tree
Hide file tree
Showing 37 changed files with 660 additions and 450 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/mysten-common/src/sync/notify_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl NotifyOnce {
/// This method returns errors if called more then once
#[allow(clippy::result_unit_err)]
pub fn notify(&self) -> Result<(), ()> {
let Some(notify) = self.notify.lock().take()else { return Err(()) };
let Some(notify) = self.notify.lock().take() else { return Err(()) };
// At this point all `register` either registered with current notify,
// or will be returning immediately
notify.notify_waiters();
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mysten-common.workspace = true
mysten-metrics = { path = "../mysten-metrics" }
narwhal-config = { path = "../../narwhal/config" }
narwhal-executor = { path = "../../narwhal/executor" }
narwhal-network = { path = "../../narwhal/network" }
narwhal-node = { path = "../../narwhal/node" }
narwhal-crypto = { path = "../../narwhal/crypto" }
narwhal-types = { path = "../../narwhal/types" }
Expand Down
8 changes: 7 additions & 1 deletion crates/sui-core/src/narwhal_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fastcrypto::traits::KeyPair;
use mysten_metrics::RegistryService;
use narwhal_config::{Committee, Epoch, Parameters, WorkerCache, WorkerId};
use narwhal_executor::ExecutionState;
use narwhal_network::client::NetworkClient;
use narwhal_node::primary_node::PrimaryNode;
use narwhal_node::worker_node::WorkerNodes;
use narwhal_node::{CertificateStoreCacheMetrics, NodeStorage};
Expand Down Expand Up @@ -75,12 +76,12 @@ impl NarwhalManagerMetrics {
}

pub struct NarwhalManager {
storage_base_path: PathBuf,
primary_keypair: AuthorityKeyPair,
network_keypair: NetworkKeyPair,
worker_ids_and_keypairs: Vec<(WorkerId, NetworkKeyPair)>,
primary_node: PrimaryNode,
worker_nodes: WorkerNodes,
storage_base_path: PathBuf,
running: Mutex<Running>,
metrics: NarwhalManagerMetrics,
store_cache_metrics: CertificateStoreCacheMetrics,
Expand Down Expand Up @@ -141,6 +142,9 @@ impl NarwhalManager {
let store_path = self.get_store_path(committee.epoch());
let store = NodeStorage::reopen(store_path, Some(self.store_cache_metrics.clone()));

// Create a new client.
let network_client = NetworkClient::new_from_keypair(&self.network_keypair);

let name = self.primary_keypair.public().clone();

tracing::info!("Starting up Narwhal for epoch {}", committee.epoch());
Expand All @@ -156,6 +160,7 @@ impl NarwhalManager {
self.network_keypair.copy(),
committee.clone(),
worker_cache.clone(),
network_client.clone(),
&store,
execution_state.clone(),
)
Expand Down Expand Up @@ -193,6 +198,7 @@ impl NarwhalManager {
id_keypair_copy,
committee.clone(),
worker_cache.clone(),
network_client.clone(),
&store,
tx_validator.clone(),
)
Expand Down
15 changes: 9 additions & 6 deletions narwhal/benchmark/benchmark/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,18 +261,21 @@ def result(self):
consensus_tps, consensus_bps, _ = self._consensus_throughput()
end_to_end_tps, end_to_end_bps, duration = self._end_to_end_throughput()
end_to_end_latency = self._end_to_end_latency() * 1_000

# TODO: support primary and worker on different processes, and fail on
# empty log entries.
batch_creation_latency = mean(
self.batch_creation_latencies.values()) * 1000
self.batch_creation_latencies.values()) * 1000 if self.batch_creation_latencies else -1
header_creation_latency = mean(
self.header_creation_latencies .values()) * 1000
self.header_creation_latencies.values()) * 1000 if self.header_creation_latencies else -1
batch_to_header_latency = mean(
self.batch_to_header_latencies.values()) * 1000
self.batch_to_header_latencies.values()) * 1000 if self.batch_to_header_latencies else -1
header_to_cert_latency = mean(
self.header_to_cert_latencies.values()) * 1000
self.header_to_cert_latencies.values()) * 1000 if self.header_to_cert_latencies else -1
cert_commit_latency = mean(
self.cert_commit_latencies.values()) * 1000
self.cert_commit_latencies.values()) * 1000 if self.cert_commit_latencies else -1
request_vote_outbound_latency = mean(
self.request_vote_outbound_latencies)
self.request_vote_outbound_latencies) if self.request_vote_outbound_latencies else -1

return (
'\n'
Expand Down
2 changes: 2 additions & 0 deletions narwhal/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ async-trait = "0.1.61"
backoff = { version = "0.4.0", features = ["tokio"] }
bytes = "1.3.0"
futures = "0.3.24"
parking_lot = "0.12.1"
prometheus = "0.13.3"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio = { workspace = true, features = ["rt", "net", "sync", "macros", "time"] }
tracing = "0.1.36"
types = { path = "../types", package = "narwhal-types" }
crypto = { path = "../crypto", package = "narwhal-crypto" }
mysten-common = { path = "../../crates/mysten-common" }
mysten-metrics = { path = "../../crates/mysten-metrics" }

workspace-hack = { version = "0.1", path = "../../crates/workspace-hack" }
Expand Down
193 changes: 193 additions & 0 deletions narwhal/network/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc, time::Duration};

use anemo::{PeerId, Request};
use async_trait::async_trait;
use crypto::{traits::KeyPair, NetworkKeyPair, NetworkPublicKey};
use mysten_common::sync::notify_once::NotifyOnce;
use parking_lot::RwLock;
use tokio::{select, time::sleep};
use tracing::debug;
use types::{
error::LocalClientError, PrimaryToWorker, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerSynchronizeMessage, WorkerToPrimary,
};

use crate::traits::{PrimaryToWorkerClient, WorkerToPrimaryClient};

/// NetworkClient provides the interface to send requests to other nodes, and call other components
/// directly if they live in the same process. It is used by both primary and worker(s).
///
/// Currently this only supports local direct calls, and it will be extended to support remote
/// network calls.
///
/// TODO: investigate splitting this into Primary and Worker specific clients.
#[derive(Clone)]
pub struct NetworkClient {
inner: Arc<RwLock<Inner>>,
shutdown_notify: Arc<NotifyOnce>,
}

struct Inner {
// The private-public network key pair of this authority.
primary_peer_id: PeerId,
worker_to_primary_handler: Option<Arc<dyn WorkerToPrimary>>,
primary_to_worker_handler: BTreeMap<PeerId, Arc<dyn PrimaryToWorker>>,
shutdown: bool,
}

impl NetworkClient {
const GET_CLIENT_RETRIES: usize = 50;
const GET_CLIENT_INTERVAL: Duration = Duration::from_millis(100);

pub fn new(primary_peer_id: PeerId) -> Self {
Self {
inner: Arc::new(RwLock::new(Inner {
primary_peer_id,
worker_to_primary_handler: None,
primary_to_worker_handler: BTreeMap::new(),
shutdown: false,
})),
shutdown_notify: Arc::new(NotifyOnce::new()),
}
}

pub fn new_from_keypair(primary_network_keypair: &NetworkKeyPair) -> Self {
Self::new(PeerId(primary_network_keypair.public().0.into()))
}

pub fn new_with_empty_id() -> Self {
// ED25519_PUBLIC_KEY_LENGTH is 32 bytes.
Self::new(empty_peer_id())
}

pub fn set_worker_to_primary_local_handler(&self, handler: Arc<dyn WorkerToPrimary>) {
let mut inner = self.inner.write();
inner.worker_to_primary_handler = Some(handler);
}

pub fn set_primary_to_worker_local_handler(
&self,
worker_id: PeerId,
handler: Arc<dyn PrimaryToWorker>,
) {
let mut inner = self.inner.write();
inner.primary_to_worker_handler.insert(worker_id, handler);
}

pub fn shutdown(&self) {
let mut inner = self.inner.write();
if inner.shutdown {
return;
}
inner.worker_to_primary_handler = None;
inner.primary_to_worker_handler = BTreeMap::new();
inner.shutdown = true;
let _ = self.shutdown_notify.notify();
}

async fn get_primary_to_worker_handler(
&self,
peer_id: PeerId,
) -> Result<Arc<dyn PrimaryToWorker>, LocalClientError> {
for _ in 0..Self::GET_CLIENT_RETRIES {
{
let inner = self.inner.read();
if inner.shutdown {
return Err(LocalClientError::ShuttingDown);
}
if let Some(handler) = inner.primary_to_worker_handler.get(&peer_id) {
return Ok(handler.clone());
}
}
sleep(Self::GET_CLIENT_INTERVAL).await;
}
Err(LocalClientError::WorkerNotStarted(peer_id))
}

async fn get_worker_to_primary_handler(
&self,
) -> Result<Arc<dyn WorkerToPrimary>, LocalClientError> {
for _ in 0..Self::GET_CLIENT_RETRIES {
{
let inner = self.inner.read();
if inner.shutdown {
return Err(LocalClientError::ShuttingDown);
}
if let Some(handler) = &inner.worker_to_primary_handler {
debug!("Found primary {}", inner.primary_peer_id);
return Ok(handler.clone());
}
}
sleep(Self::GET_CLIENT_INTERVAL).await;
}
Err(LocalClientError::PrimaryNotStarted(
self.inner.read().primary_peer_id,
))
}
}

// TODO: extract common logic for shutdown.

#[async_trait]
impl PrimaryToWorkerClient for NetworkClient {
async fn synchronize(
&self,
worker_peer: NetworkPublicKey,
request: WorkerSynchronizeMessage,
) -> Result<(), LocalClientError> {
let c = self
.get_primary_to_worker_handler(PeerId(worker_peer.0.into()))
.await?;
select! {
resp = c.synchronize(Request::new(request)) => {
resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?;
Ok(())
},
() = self.shutdown_notify.wait() => {
Err(LocalClientError::ShuttingDown)
},
}
}
}

#[async_trait]
impl WorkerToPrimaryClient for NetworkClient {
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError> {
let c = self.get_worker_to_primary_handler().await?;
select! {
resp = c.report_our_batch(Request::new(request)) => {
resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?;
Ok(())
},
() = self.shutdown_notify.wait() => {
Err(LocalClientError::ShuttingDown)
},
}
}

async fn report_others_batch(
&self,
request: WorkerOthersBatchMessage,
) -> Result<(), LocalClientError> {
let c = self.get_worker_to_primary_handler().await?;
select! {
resp = c.report_others_batch(Request::new(request)) => {
resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?;
Ok(())
},
() = self.shutdown_notify.wait() => {
Err(LocalClientError::ShuttingDown)
},
}
}
}

fn empty_peer_id() -> PeerId {
PeerId([0u8; 32])
}
4 changes: 3 additions & 1 deletion narwhal/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

pub mod admin;
pub mod anemo_ext;
pub mod client;
pub mod connectivity;
pub mod epoch_filter;
pub mod failpoints;
Expand All @@ -22,7 +23,8 @@ mod traits;
pub use crate::{
retry::RetryConfig,
traits::{
PrimaryToPrimaryRpc, PrimaryToWorkerRpc, ReliableNetwork, UnreliableNetwork, WorkerRpc,
PrimaryToPrimaryRpc, PrimaryToWorkerClient, PrimaryToWorkerRpc, ReliableNetwork,
UnreliableNetwork, WorkerRpc, WorkerToPrimaryClient,
},
};

Expand Down
28 changes: 26 additions & 2 deletions narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use async_trait::async_trait;
use crypto::NetworkPublicKey;
use tokio::task::JoinHandle;
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, RequestBatchesRequest, RequestBatchesResponse,
error::LocalClientError, Batch, BatchDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse,
RequestBatchesRequest, RequestBatchesResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerSynchronizeMessage,
};

pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
Expand Down Expand Up @@ -78,6 +80,28 @@ pub trait PrimaryToWorkerRpc {
-> Result<()>;
}

#[async_trait]
pub trait PrimaryToWorkerClient {
async fn synchronize(
&self,
worker_name: NetworkPublicKey,
request: WorkerSynchronizeMessage,
) -> Result<(), LocalClientError>;
}

#[async_trait]
pub trait WorkerToPrimaryClient {
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError>;

async fn report_others_batch(
&self,
request: WorkerOthersBatchMessage,
) -> Result<(), LocalClientError>;
}

#[async_trait]
pub trait WorkerRpc {
async fn request_batch(
Expand Down
Loading

0 comments on commit ea474c0

Please sign in to comment.