Skip to content

Commit

Permalink
config: better allocation of available ports
Browse files Browse the repository at this point in the history
It's not ideal to attempt to allocate ports for a large number of nodes
up front. Generally the better thing to do would be to always bind to
port 0, allow the OS to properly allocate an available port, and then
communicate the exact port that a network interface is bound to, to
other entities that would like to speak with it inside of a test
environment.

Given that the above still requires a bit of work and plumbing, the
next best thing is to be a bit more intelligent about how we're
allocating ports when constructing test configs. This patch changes how
ports are allocated by requesting a fresh, available port from the OS by
binding on port 0, immediately establishing a connection, and then
dropping the connection and the listener. This forces the OS to put the
port into the TIME_WAIT state, ensuring that the OS won't hand out the
port for some grace period. The port can then be used later by
binding on the port using SO_REUSEADDR (which most Rust networking
libraries do by default).
  • Loading branch information
bmwill committed May 3, 2022
1 parent a6bee93 commit d1e9dc2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 53 deletions.
26 changes: 1 addition & 25 deletions crates/sui-network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
net::TcpListener,
sync::atomic::{AtomicUsize, Ordering},
};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone, Debug)]
pub struct NetworkClient {
Expand Down Expand Up @@ -86,24 +83,3 @@ impl NetworkServer {
self.user_errors.fetch_add(1, Ordering::Relaxed);
}
}

/// Hands out TCP ports on `127.0.0.1` by probing upward from a starting port.
pub struct PortAllocator {
    /// First port the next scan will probe.
    next_port: u16,
}

impl PortAllocator {
    /// Creates an allocator whose first scan begins at `starting_port`.
    pub fn new(starting_port: u16) -> Self {
        Self { next_port: starting_port }
    }

    /// Returns the lowest port in `next_port..65535` that can currently be bound on
    /// `127.0.0.1`, or `None` when no port in that range can be bound.
    ///
    /// The probing listener is dropped as soon as the check finishes, so the port is only
    /// known to have been bindable at the moment of the probe. On success the scan position
    /// moves to the port after the one returned; on failure it is left unchanged.
    pub fn next_port(&mut self) -> Option<u16> {
        let free = (self.next_port..65535)
            .find(|&candidate| TcpListener::bind(("127.0.0.1", candidate)).is_ok())?;
        // `free` is at most 65534, so this increment cannot overflow a `u16`.
        self.next_port = free + 1;
        Some(free)
    }
}
35 changes: 10 additions & 25 deletions sui/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use narwhal_config::{
Authority, Committee as ConsensusCommittee, PrimaryAddresses, Stake, WorkerAddresses,
};
use narwhal_crypto::ed25519::Ed25519PublicKey;
use once_cell::sync::Lazy;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use serde_with::{hex::Hex, serde_as};
Expand All @@ -18,26 +17,23 @@ use std::{
net::{SocketAddr, ToSocketAddrs},
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Mutex,
};
use sui_framework::DEFAULT_FRAMEWORK_PATH;
use sui_network::network::PortAllocator;
use sui_types::{
base_types::*,
committee::{Committee, EpochId},
crypto::{get_key_pair, KeyPair, PublicKeyBytes},
};
use tracing::log::trace;

pub mod utils;

const DEFAULT_WEIGHT: usize = 1;
const DEFAULT_GAS_AMOUNT: u64 = 100000;
pub const AUTHORITIES_DB_NAME: &str = "authorities_db";
pub const DEFAULT_STARTING_PORT: u16 = 10000;
pub const CONSENSUS_DB_NAME: &str = "consensus_db";

/// Process-wide port allocator, created lazily on first access and guarded by a mutex.
/// Its scan starts at `DEFAULT_STARTING_PORT`.
pub static PORT_ALLOCATOR: Lazy<Mutex<PortAllocator>> =
Lazy::new(|| Mutex::new(PortAllocator::new(DEFAULT_STARTING_PORT)));

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AuthorityInfo {
#[serde(serialize_with = "bytes_as_hex", deserialize_with = "bytes_from_hex")]
Expand Down Expand Up @@ -91,11 +87,7 @@ impl<'de> Deserialize<'de> for AuthorityPrivateInfo {
let port = if let Some(val) = json.get("port") {
u16::deserialize(val).map_err(serde::de::Error::custom)?
} else {
PORT_ALLOCATOR
.lock()
.map_err(serde::de::Error::custom)?
.next_port()
.ok_or_else(|| serde::de::Error::custom("No available port."))?
utils::get_available_port()
};
let db_path = if let Some(val) = json.get("db_path") {
PathBuf::deserialize(val).map_err(serde::de::Error::custom)?
Expand All @@ -112,11 +104,7 @@ impl<'de> Deserialize<'de> for AuthorityPrivateInfo {
let consensus_address = if let Some(val) = json.get("consensus_address") {
SocketAddr::deserialize(val).map_err(serde::de::Error::custom)?
} else {
let port = PORT_ALLOCATOR
.lock()
.map_err(serde::de::Error::custom)?
.next_port()
.ok_or_else(|| serde::de::Error::custom("No available port."))?;
let port = utils::get_available_port();
socket_addr_from_hostport("127.0.0.1", port)
};

Expand Down Expand Up @@ -431,15 +419,12 @@ pub fn make_default_narwhal_committee(
) -> Result<ConsensusCommittee<Ed25519PublicKey>, anyhow::Error> {
let mut ports = Vec::new();
for _ in authorities {
let mut authority_ports = Vec::new();
for _ in 0..4 {
let port = PORT_ALLOCATOR
.lock()
.map_err(|e| anyhow::anyhow!("{e}"))?
.next_port()
.ok_or_else(|| anyhow::anyhow!("No available ports"))?;
authority_ports.push(port + 100);
}
let authority_ports = [
utils::get_available_port(),
utils::get_available_port(),
utils::get_available_port(),
utils::get_available_port(),
];
ports.push(authority_ports);
}

Expand Down
33 changes: 33 additions & 0 deletions sui/src/config/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::net::{TcpListener, TcpStream};

/// Return an ephemeral, available port. On unix systems, the port returned will be in the
/// TIME_WAIT state ensuring that the OS won't hand out this port for some grace period.
/// Callers should be able to bind to this port given they use SO_REUSEADDR.
///
/// Each attempt delegates to `get_ephemeral_port`; the first port obtained is returned and
/// at most `MAX_PORT_RETRIES` attempts are made.
///
/// # Panics
///
/// Panics if every attempt fails. The panic message ends with the I/O error reported by
/// the last attempt so the underlying cause is visible.
pub fn get_available_port() -> u16 {
    const MAX_PORT_RETRIES: u32 = 1000;

    // Keep the most recent failure instead of discarding it, so the panic can explain
    // why no port could be obtained.
    let mut last_error = None;
    for _ in 0..MAX_PORT_RETRIES {
        match get_ephemeral_port() {
            Ok(port) => return port,
            Err(error) => last_error = Some(error),
        }
    }

    match last_error {
        Some(error) => panic!("Error: could not find an available port: {error}"),
        None => panic!("Error: could not find an available port"),
    }
}

/// Asks the OS for a free TCP port on the IPv4 loopback address, opens and drops one
/// connection to it, and returns the port number.
///
/// # Errors
///
/// Returns the underlying I/O error if binding, reading the local address, connecting,
/// or accepting fails.
fn get_ephemeral_port() -> ::std::io::Result<u16> {
    // Request a random available port from the OS. The IPv4 loopback address is bound
    // explicitly: `localhost` goes through name resolution and may yield `::1`, in which
    // case the port would be exercised on a different address than the `127.0.0.1` that
    // the config code pairs the returned port with.
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let addr = listener.local_addr()?;

    // Create and accept a connection (which we'll promptly drop) in order to force the port
    // into the TIME_WAIT state, ensuring that the port will be reserved for some limited
    // amount of time (roughly 60s on some Linux systems)
    let _sender = TcpStream::connect(addr)?;
    let _incoming = listener.accept()?;

    // Locals are dropped in reverse declaration order, so the accepted stream is closed
    // before the connecting stream and the listener. Keep this declaration order: the
    // TIME_WAIT reservation described above is expected to depend on the listener's side
    // of the connection closing first.
    Ok(addr.port())
}
6 changes: 3 additions & 3 deletions test_utils/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{test_committee, test_keys};
use narwhal_config::Parameters as ConsensusParameters;
use std::{path::PathBuf, sync::Arc};
use sui::{
config::{make_default_narwhal_committee, AuthorityPrivateInfo, PORT_ALLOCATOR},
config::{make_default_narwhal_committee, utils::get_available_port, AuthorityPrivateInfo},
sui_commands::make_authority,
};
use sui_adapter::genesis;
Expand Down Expand Up @@ -33,8 +33,8 @@ pub fn test_authority_configs() -> (Vec<AuthorityPrivateInfo>, Vec<KeyPair>) {
let authorities = test_keys
.into_iter()
.map(|(address, key)| {
let authority_port = PORT_ALLOCATOR.lock().unwrap().next_port().unwrap();
let consensus_port = PORT_ALLOCATOR.lock().unwrap().next_port().unwrap();
let authority_port = get_available_port();
let consensus_port = get_available_port();

AuthorityPrivateInfo {
address,
Expand Down

0 comments on commit d1e9dc2

Please sign in to comment.