Skip to content

Commit

Permalink
Use single send socket in UdpTpuConnection (solana-labs#26105)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 authored Jun 21, 2022
1 parent 19eea3a commit 43ff65e
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 56 deletions.
5 changes: 3 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
solana_client::connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE},
solana_core::banking_stage::BankingStage,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand Down Expand Up @@ -341,7 +341,8 @@ fn main() {
SocketAddrSpace::Unspecified,
);
let cluster_info = Arc::new(cluster_info);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let tpu_use_quic = UseQUIC::new(matches.is_present("tpu_use_quic"))
.expect("Failed to initialize QUIC flags");
let banking_stage = BankingStage::new_num_threads(
&cluster_info,
&poh_recorder,
Expand Down
8 changes: 5 additions & 3 deletions bench-tps/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
keypairs::get_keypairs,
},
solana_client::{
connection_cache::ConnectionCache,
connection_cache::{ConnectionCache, UseQUIC},
rpc_client::RpcClient,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
Expand Down Expand Up @@ -103,8 +103,9 @@ fn main() {
do_bench_tps(client, cli_config, keypairs);
}
ExternalClientType::ThinClient => {
let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags");
let connection_cache =
Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size));
Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size));

let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) {
let rpc = rpc_addr.parse().unwrap_or_else(|e| {
Expand Down Expand Up @@ -175,8 +176,9 @@ fn main() {
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let use_quic = UseQUIC::new(*use_quic).expect("Failed to initialize QUIC flags");
let connection_cache =
Arc::new(ConnectionCache::new(*use_quic, *tpu_connection_pool_size));
Arc::new(ConnectionCache::new(use_quic, *tpu_connection_pool_size));

let client = Arc::new(
TpuClient::new_with_connection_cache(
Expand Down
51 changes: 39 additions & 12 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ use {
solana_measure::measure::Measure,
solana_sdk::{quic::QUIC_PORT_OFFSET, timing::AtomicInterval},
std::{
net::SocketAddr,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
},
tokio::io,
};

// Should be non-zero
Expand Down Expand Up @@ -217,11 +218,28 @@ impl ConnectionCacheStats {
}
}

pub enum UseQUIC {
Yes,
No(Arc<UdpSocket>),
}

impl UseQUIC {
pub fn new(use_quic: bool) -> io::Result<Self> {
if use_quic {
Ok(UseQUIC::Yes)
} else {
let socket =
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))?;
Ok(UseQUIC::No(Arc::new(socket)))
}
}
}

pub struct ConnectionCache {
map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
use_quic: bool,
use_quic: UseQUIC,
connection_pool_size: usize,
}

Expand Down Expand Up @@ -251,7 +269,7 @@ impl ConnectionPool {
}

impl ConnectionCache {
pub fn new(use_quic: bool, connection_pool_size: usize) -> Self {
pub fn new(use_quic: UseQUIC, connection_pool_size: usize) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
Self {
Expand All @@ -262,11 +280,14 @@ impl ConnectionCache {
}

pub fn get_use_quic(&self) -> bool {
self.use_quic
match self.use_quic {
UseQUIC::Yes => true,
UseQUIC::No(_) => false,
}
}

fn create_endpoint(&self) -> Option<Arc<QuicLazyInitializedEndpoint>> {
if self.use_quic {
if self.get_use_quic() {
Some(Arc::new(QuicLazyInitializedEndpoint::new()))
} else {
None
Expand Down Expand Up @@ -299,15 +320,16 @@ impl ConnectionCache {

let (cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
if to_create_connection {
let connection: Connection = if self.use_quic {
QuicTpuConnection::new(
let connection: Connection = match &self.use_quic {
UseQUIC::Yes => QuicTpuConnection::new(
endpoint.as_ref().unwrap().clone(),
*addr,
self.stats.clone(),
)
.into()
} else {
UdpTpuConnection::new(*addr, self.stats.clone()).into()
.into(),
UseQUIC::No(socket) => {
UdpTpuConnection::new(socket.clone(), *addr, self.stats.clone()).into()
}
};

let connection = Arc::new(connection);
Expand Down Expand Up @@ -363,7 +385,11 @@ impl ConnectionCache {
let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop();

let port_offset = if self.use_quic { QUIC_PORT_OFFSET } else { 0 };
let port_offset = if self.get_use_quic() {
QUIC_PORT_OFFSET
} else {
0
};

let addr = SocketAddr::new(addr.ip(), addr.port() + port_offset);

Expand Down Expand Up @@ -470,11 +496,12 @@ impl ConnectionCache {

impl Default for ConnectionCache {
fn default() -> Self {
let use_quic = UseQUIC::new(DEFAULT_TPU_USE_QUIC).expect("Failed to initialize QUIC flags");
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
use_quic: DEFAULT_TPU_USE_QUIC,
use_quic,
connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
}
}
Expand Down
30 changes: 5 additions & 25 deletions client/src/nonblocking/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
//! an interface for sending transactions
use {
crate::nonblocking::tpu_connection::TpuConnection,
async_trait::async_trait,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send,
std::net::{IpAddr, Ipv4Addr, SocketAddr},
crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait,
core::iter::repeat, solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
tokio::net::UdpSocket,
};

Expand All @@ -17,14 +14,8 @@ pub struct UdpTpuConnection {
}

impl UdpTpuConnection {
pub fn new(tpu_addr: SocketAddr) -> Self {
let socket =
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap();
pub fn new(tpu_addr: SocketAddr, socket: std::net::UdpSocket) -> Self {
socket.set_nonblocking(true).unwrap();
Self::new_with_std_socket(tpu_addr, socket)
}

pub fn new_with_std_socket(tpu_addr: SocketAddr, socket: std::net::UdpSocket) -> Self {
let socket = UdpSocket::from_std(socket).unwrap();
Self {
socket,
Expand Down Expand Up @@ -92,20 +83,9 @@ mod tests {
async fn test_send_from_addr() {
let addr_str = "0.0.0.0:50100";
let addr = addr_str.parse().unwrap();
let connection = UdpTpuConnection::new(addr);
let reader = UdpSocket::bind(addr_str).await.expect("bind");
check_send_one(&connection, &reader).await;
check_send_batch(&connection, &reader).await;
}

#[tokio::test]
async fn test_send_from_socket() {
let addr_str = "0.0.0.0:50101";
let addr = addr_str.parse().unwrap();
let socket =
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap();
socket.set_nonblocking(true).unwrap();
let connection = UdpTpuConnection::new_with_std_socket(addr, socket);
let connection = UdpTpuConnection::new(addr, socket);
let reader = UdpSocket::bind(addr_str).await.expect("bind");
check_send_one(&connection, &reader).await;
check_send_batch(&connection, &reader).await;
Expand Down
18 changes: 10 additions & 8 deletions client/src/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,30 @@ use {
solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
net::{SocketAddr, UdpSocket},
sync::Arc,
},
};

pub struct UdpTpuConnection {
socket: UdpSocket,
socket: Arc<UdpSocket>,
addr: SocketAddr,
}

impl UdpTpuConnection {
pub fn new_from_addr(tpu_addr: SocketAddr) -> Self {
let socket =
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))).unwrap();
pub fn new_from_addr(local_socket: Arc<UdpSocket>, tpu_addr: SocketAddr) -> Self {
Self {
socket,
socket: local_socket,
addr: tpu_addr,
}
}

pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
Self::new_from_addr(tpu_addr)
pub fn new(
local_socket: Arc<UdpSocket>,
tpu_addr: SocketAddr,
_connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
Self::new_from_addr(local_socket, tpu_addr)
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
},
crossbeam_channel::{bounded, unbounded, Receiver},
rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache,
solana_client::connection_cache::{ConnectionCache, UseQUIC},
solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_gossip::{
Expand Down Expand Up @@ -753,6 +753,7 @@ impl Validator {
};
let poh_recorder = Arc::new(Mutex::new(poh_recorder));

let use_quic = UseQUIC::new(use_quic).expect("Failed to initialize QUIC flags");
let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size));

let rpc_override_health_check = Arc::new(AtomicBool::new(false));
Expand Down
7 changes: 5 additions & 2 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use {
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{
connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE},
rpc_client::RpcClient,
tpu_connection::TpuConnection,
},
Expand Down Expand Up @@ -423,6 +423,7 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>(
//let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
//let udp_client = UdpTpuConnection::new(target, connection_cache_stats);

let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags");
let connection_cache = ConnectionCache::new(tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE);
let connection = connection_cache.get_connection(&target);

Expand Down Expand Up @@ -621,8 +622,10 @@ fn main() {
exit(1);
});

let tpu_use_quic =
UseQUIC::new(cmd_params.tpu_use_quic).expect("Failed to initialize QUIC flags");
let connection_cache = Arc::new(ConnectionCache::new(
cmd_params.tpu_use_quic,
tpu_use_quic,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
));
let (client, num_clients) =
Expand Down
6 changes: 4 additions & 2 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
log::*,
solana_client::{
connection_cache::{
ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
},
thin_client::ThinClient,
},
Expand Down Expand Up @@ -277,13 +277,15 @@ impl LocalCluster {

validators.insert(leader_pubkey, cluster_leader);

let tpu_use_quic =
UseQUIC::new(config.tpu_use_quic).expect("Failed to initialize QUIC flags");
let mut cluster = Self {
funding_keypair: mint_keypair,
entry_point_info: leader_contact_info,
validators,
genesis_config,
connection_cache: Arc::new(ConnectionCache::new(
config.tpu_use_quic,
tpu_use_quic,
config.tpu_connection_pool_size,
)),
};
Expand Down
3 changes: 2 additions & 1 deletion rpc-test/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_account_decoder::UiAccount,
solana_client::{
client_error::{ClientErrorKind, Result as ClientResult},
connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
connection_cache::{ConnectionCache, UseQUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE},
nonblocking::pubsub_client::PubsubClient,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
Expand Down Expand Up @@ -420,6 +420,7 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) {
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
let tpu_use_quic = UseQUIC::new(tpu_use_quic).expect("Failed to initialize QUIC flags");
let connection_cache = Arc::new(ConnectionCache::new(
tpu_use_quic,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
Expand Down

0 comments on commit 43ff65e

Please sign in to comment.