From af18e2ad17aca8076909e7ff2d4eca6107759a2e Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Mon, 24 Oct 2022 14:07:54 -0700 Subject: [PATCH] narwhal: remove invalid unsafe impl of Send + Sync --- narwhal/primary/src/header_waiter.rs | 18 ++++++++++-------- narwhal/types/src/bounded_future_queue.rs | 6 ------ narwhal/types/src/lib.rs | 3 +++ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/narwhal/primary/src/header_waiter.rs b/narwhal/primary/src/header_waiter.rs index 01ff24a83a100..9e3f0f9a2e63a 100644 --- a/narwhal/primary/src/header_waiter.rs +++ b/narwhal/primary/src/header_waiter.rs @@ -8,7 +8,11 @@ use crate::{ use anyhow::Result; use config::{Committee, SharedWorkerCache, WorkerId}; use crypto::PublicKey; -use futures::future::{try_join_all, BoxFuture}; +use futures::{ + future::{try_join_all, BoxFuture}, + stream::FuturesUnordered, + StreamExt, +}; use network::{CancelOnDropHandler, P2pNetwork, ReliableNetwork, UnreliableNetwork}; use std::{ collections::HashMap, @@ -24,7 +28,6 @@ use tokio::{ }; use tracing::{debug, info, warn}; use types::{ - bounded_future_queue::BoundedFuturesUnordered, error::{DagError, DagResult}, metered_channel::{Receiver, Sender}, BatchDigest, CertificateDigest, Header, HeaderDigest, ReconfigureNotification, Round, @@ -164,9 +167,8 @@ impl HeaderWaiter { } /// Main loop listening to the `Synchronizer` messages. - async fn run(&mut self) { - let mut waiting: BoundedFuturesUnordered> = - BoundedFuturesUnordered::with_capacity(self.max_pending_header_waiter_requests()); + async fn run(mut self) { + let mut waiting: FuturesUnordered> = FuturesUnordered::new(); info!( "HeaderWaiter on node {} has started successfully.", @@ -176,7 +178,7 @@ impl HeaderWaiter { let mut attempt_garbage_collection = false; tokio::select! { - Some(message) = self.rx_header_waiter.recv(), if waiting.available_permits() > 0 => { + Some(message) = self.rx_header_waiter.recv(), if waiting.len() < self.max_pending_header_waiter_requests() => { match message { WaiterMessage::SyncBatches(missing, header) => { debug!("Synching the payload of {header}"); @@ -221,7 +223,7 @@ impl HeaderWaiter { rx_cancel); // pointer-size allocation, bounded by the # of blocks // (may eventually go away, see rust RFC #1909) - waiting.push(Box::pin(fut)).await; + waiting.push(Box::pin(fut)); } WaiterMessage::SyncParents(missing, header) => { @@ -244,7 +246,7 @@ impl HeaderWaiter { self.pending.insert(header_id, (round, tx_cancel)); let fut = Self::wait_for_parents(wait_for, self.certificate_store.clone(), header, rx_cancel); // pointer-size allocation, bounded by the # of blocks (may eventually go away, see rust RFC #1909) - waiting.push(Box::pin(fut)).await; + waiting.push(Box::pin(fut)); // Ensure we didn't already sent a sync request for these parents. // Optimistically send the sync request to the node that created the certificate. diff --git a/narwhal/types/src/bounded_future_queue.rs b/narwhal/types/src/bounded_future_queue.rs index e8cb2b8dfb121..cc659bb2e58cf 100644 --- a/narwhal/types/src/bounded_future_queue.rs +++ b/narwhal/types/src/bounded_future_queue.rs @@ -35,9 +35,6 @@ impl std::fmt::Debug for BoundedFuturesUnordered { } } -unsafe impl Sync for BoundedFuturesUnordered {} -unsafe impl Send for BoundedFuturesUnordered {} - // We expect to grow this facade over time impl BoundedFuturesUnordered { pub fn with_capacity(capacity: usize) -> Self { @@ -139,9 +136,6 @@ impl std::fmt::Debug for BoundedFuturesOrdered { } } -unsafe impl Sync for BoundedFuturesOrdered {} -unsafe impl Send for BoundedFuturesOrdered {} - // We expect to grow this facade over time impl BoundedFuturesOrdered { pub fn with_capacity(capacity: usize) -> Self { diff --git a/narwhal/types/src/lib.rs b/narwhal/types/src/lib.rs index 4137768c9198f..92d4ea81e3337 100644 --- a/narwhal/types/src/lib.rs +++ b/narwhal/types/src/lib.rs @@ -1,5 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 + +#![forbid(unsafe_code)] + // Error types #[macro_use] pub mod error;