Skip to content

Commit

Permalink
narwhal: remove invalid unsafe impl of Send + Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Oct 24, 2022
1 parent e6fe290 commit af18e2a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
18 changes: 10 additions & 8 deletions narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use crate::{
use anyhow::Result;
use config::{Committee, SharedWorkerCache, WorkerId};
use crypto::PublicKey;
use futures::future::{try_join_all, BoxFuture};
use futures::{
future::{try_join_all, BoxFuture},
stream::FuturesUnordered,
StreamExt,
};
use network::{CancelOnDropHandler, P2pNetwork, ReliableNetwork, UnreliableNetwork};
use std::{
collections::HashMap,
Expand All @@ -24,7 +28,6 @@ use tokio::{
};
use tracing::{debug, info, warn};
use types::{
bounded_future_queue::BoundedFuturesUnordered,
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, CertificateDigest, Header, HeaderDigest, ReconfigureNotification, Round,
Expand Down Expand Up @@ -164,9 +167,8 @@ impl HeaderWaiter {
}

/// Main loop listening to the `Synchronizer` messages.
async fn run(&mut self) {
let mut waiting: BoundedFuturesUnordered<BoxFuture<'_, _>> =
BoundedFuturesUnordered::with_capacity(self.max_pending_header_waiter_requests());
async fn run(mut self) {
let mut waiting: FuturesUnordered<BoxFuture<'static, _>> = FuturesUnordered::new();

info!(
"HeaderWaiter on node {} has started successfully.",
Expand All @@ -176,7 +178,7 @@ impl HeaderWaiter {
let mut attempt_garbage_collection = false;

tokio::select! {
Some(message) = self.rx_header_waiter.recv(), if waiting.available_permits() > 0 => {
Some(message) = self.rx_header_waiter.recv(), if waiting.len() < self.max_pending_header_waiter_requests() => {
match message {
WaiterMessage::SyncBatches(missing, header) => {
debug!("Synching the payload of {header}");
Expand Down Expand Up @@ -221,7 +223,7 @@ impl HeaderWaiter {
rx_cancel);
// pointer-size allocation, bounded by the # of blocks
// (may eventually go away, see rust RFC #1909)
waiting.push(Box::pin(fut)).await;
waiting.push(Box::pin(fut));
}

WaiterMessage::SyncParents(missing, header) => {
Expand All @@ -244,7 +246,7 @@ impl HeaderWaiter {
self.pending.insert(header_id, (round, tx_cancel));
let fut = Self::wait_for_parents(wait_for, self.certificate_store.clone(), header, rx_cancel);
// pointer-size allocation, bounded by the # of blocks (may eventually go away, see rust RFC #1909)
waiting.push(Box::pin(fut)).await;
waiting.push(Box::pin(fut));

// Ensure we didn't already sent a sync request for these parents.
// Optimistically send the sync request to the node that created the certificate.
Expand Down
6 changes: 0 additions & 6 deletions narwhal/types/src/bounded_future_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ impl<T: Future> std::fmt::Debug for BoundedFuturesUnordered<T> {
}
}

unsafe impl<T: Future> Sync for BoundedFuturesUnordered<T> {}
unsafe impl<T: Future> Send for BoundedFuturesUnordered<T> {}

// We expect to grow this facade over time
impl<T: Future> BoundedFuturesUnordered<T> {
pub fn with_capacity(capacity: usize) -> Self {
Expand Down Expand Up @@ -139,9 +136,6 @@ impl<T: Future> std::fmt::Debug for BoundedFuturesOrdered<T> {
}
}

unsafe impl<T: Future> Sync for BoundedFuturesOrdered<T> {}
unsafe impl<T: Future> Send for BoundedFuturesOrdered<T> {}

// We expect to grow this facade over time
impl<T: Future> BoundedFuturesOrdered<T> {
pub fn with_capacity(capacity: usize) -> Self {
Expand Down
3 changes: 3 additions & 0 deletions narwhal/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

// Error types
#[macro_use]
pub mod error;
Expand Down

0 comments on commit af18e2a

Please sign in to comment.