From 281dbf4663033431aa766f3c32db8e041232c6fc Mon Sep 17 00:00:00 2001 From: George Danezis Date: Thu, 20 Jan 2022 18:58:03 +0000 Subject: [PATCH] [fastx net] Remove UDP and replace hand-crafted traits with async_trait (#184) * Use async trait * Removed all traces of UDP * Adapted new async function to async-trait syntax Co-authored-by: George Danezis --- fastpay/Cargo.toml | 1 + fastpay/src/bench.rs | 13 +- fastpay/src/client.rs | 2 - fastpay/src/config.rs | 2 - fastpay/src/network.rs | 88 ++--- fastpay/src/server.rs | 7 - fastpay/src/transport.rs | 309 +++++------------- fastpay/src/unit_tests/transport_tests.rs | 20 +- fastpay_core/Cargo.toml | 1 + fastpay_core/src/client.rs | 280 ++++++++-------- fastpay_core/src/downloader.rs | 6 +- fastpay_core/src/unit_tests/client_tests.rs | 38 ++- .../src/unit_tests/downloader_tests.rs | 7 +- scripts/bench.sh | 2 +- scripts/fabfile.py | 4 +- 15 files changed, 287 insertions(+), 493 deletions(-) diff --git a/fastpay/Cargo.toml b/fastpay/Cargo.toml index 45c19e301f175..baeceb2fa28e2 100644 --- a/fastpay/Cargo.toml +++ b/fastpay/Cargo.toml @@ -28,6 +28,7 @@ base64 = "0.13.0" ed25519-dalek = { version = "1.0.1", features = ["batch", "serde"] } rocksdb = "0.17.0" hex = "0.4.3" +async-trait = "0.1.52" bcs = "0.1.3" fastpay_core = { path = "../fastpay_core" } diff --git a/fastpay/src/bench.rs b/fastpay/src/bench.rs index a2e490346b309..a378b09121783 100644 --- a/fastpay/src/bench.rs +++ b/fastpay/src/bench.rs @@ -32,9 +32,6 @@ use strum_macros::EnumString; about = "Local end-to-end test and benchmark of the FastPay protocol" )] struct ClientServerBenchmark { - /// Choose a network protocol between Udp and Tcp - #[structopt(long, default_value = "tcp")] - protocol: transport::NetworkProtocol, /// Hostname #[structopt(long, default_value = "127.0.0.1")] host: String, @@ -266,13 +263,7 @@ impl ClientServerBenchmark { } async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer { - let server = network::Server::new( - self.protocol, - self.host.clone(), - self.port, - state, - self.buffer_size, - ); + let server = network::Server::new(self.host.clone(), self.port, state, self.buffer_size); server.spawn().await.unwrap() } @@ -294,7 +285,6 @@ impl ClientServerBenchmark { info!("Sending requests."); if self.max_in_flight > 0 { let mass_client = network::MassClient::new( - self.protocol, self.host.clone(), self.port, self.buffer_size, @@ -325,7 +315,6 @@ impl ClientServerBenchmark { } else { // Use actual client core let client = network::Client::new( - self.protocol, self.host.clone(), self.port, self.buffer_size, diff --git a/fastpay/src/client.rs b/fastpay/src/client.rs index ee3b2f213212e..32b22da197582 100644 --- a/fastpay/src/client.rs +++ b/fastpay/src/client.rs @@ -28,7 +28,6 @@ fn make_authority_clients( for config in &committee_config.authorities { let config = config.clone(); let client = network::Client::new( - config.network_protocol, config.host, config.base_port, buffer_size, @@ -50,7 +49,6 @@ fn make_authority_mass_clients( let mut authority_clients = Vec::new(); for config in &committee_config.authorities { let client = network::MassClient::new( - config.network_protocol, config.host.clone(), config.base_port, buffer_size, diff --git a/fastpay/src/config.rs b/fastpay/src/config.rs index 2d9a644684e9d..091b327759aa9 100644 --- a/fastpay/src/config.rs +++ b/fastpay/src/config.rs @@ -1,7 +1,6 @@ // Copyright (c) Facebook, Inc. and its affiliates. // SPDX-License-Identifier: Apache-2.0 -use crate::transport::NetworkProtocol; use fastpay_core::client::ClientState; use fastx_types::{ base_types::*, @@ -19,7 +18,6 @@ use std::{ }; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AuthorityConfig { - pub network_protocol: NetworkProtocol, #[serde( serialize_with = "address_as_hex", deserialize_with = "address_from_hex" diff --git a/fastpay/src/network.rs b/fastpay/src/network.rs index e70fee76ff9c5..619e4aa241caa 100644 --- a/fastpay/src/network.rs +++ b/fastpay/src/network.rs @@ -5,6 +5,7 @@ use crate::transport::*; use fastpay_core::{authority::*, client::*}; use fastx_types::{error::*, messages::*, serialize::*}; +use async_trait::async_trait; use bytes::Bytes; use futures::future::FutureExt; use log::*; @@ -13,7 +14,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time; pub struct Server { - network_protocol: NetworkProtocol, base_address: String, base_port: u32, state: AuthorityState, @@ -25,14 +25,12 @@ pub struct Server { impl Server { pub fn new( - network_protocol: NetworkProtocol, base_address: String, base_port: u32, state: AuthorityState, buffer_size: usize, ) -> Self { Self { - network_protocol, base_address, base_port, state, @@ -52,16 +50,15 @@ impl Server { pub async fn spawn(self) -> Result { info!( - "Listening to {} traffic on {}:{}", - self.network_protocol, self.base_address, self.base_port + "Listening to TCP traffic on {}:{}", + self.base_address, self.base_port ); let address = format!("{}:{}", self.base_address, self.base_port); - let buffer_size = self.buffer_size; - let protocol = self.network_protocol; let state = RunningServerState { server: self }; + // Launch server for the appropriate protocol. - protocol.spawn_server(&address, state, buffer_size).await + spawn_server(&address, state, buffer_size).await } } @@ -147,7 +144,6 @@ impl MessageHandler for RunningServerState { #[derive(Clone)] pub struct Client { - network_protocol: NetworkProtocol, base_address: String, base_port: u32, buffer_size: usize, @@ -157,7 +153,6 @@ pub struct Client { impl Client { pub fn new( - network_protocol: NetworkProtocol, base_address: String, base_port: u32, buffer_size: usize, @@ -165,7 +160,6 @@ impl Client { recv_timeout: std::time::Duration, ) -> Self { Self { - network_protocol, base_address, base_port, buffer_size, @@ -176,10 +170,7 @@ impl Client { async fn send_recv_bytes_internal(&self, buf: Vec) -> Result, io::Error> { let address = format!("{}:{}", self.base_address, self.base_port); - let mut stream = self - .network_protocol - .connect(address, self.buffer_size) - .await?; + let mut stream = connect(address, self.buffer_size).await?; // Send message time::timeout(self.send_timeout, stream.write_data(&buf)).await??; // Wait for reply @@ -208,56 +199,48 @@ impl Client { } } +#[async_trait] impl AuthorityClient for Client { /// Initiate a new transfer to a FastPay or Primary account. - fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError> { - Box::pin(async move { - self.send_recv_bytes(serialize_order(&order), order_info_deserializer) - .await - }) + async fn handle_order(&mut self, order: Order) -> Result { + self.send_recv_bytes(serialize_order(&order), order_info_deserializer) + .await } /// Confirm a transfer to a FastPay or Primary account. - fn handle_confirmation_order( + async fn handle_confirmation_order( &mut self, order: ConfirmationOrder, - ) -> AsyncResult<'_, OrderInfoResponse, FastPayError> { - Box::pin(async move { - self.send_recv_bytes(serialize_cert(&order.certificate), order_info_deserializer) - .await - }) + ) -> Result { + self.send_recv_bytes(serialize_cert(&order.certificate), order_info_deserializer) + .await } - fn handle_account_info_request( + async fn handle_account_info_request( &self, request: AccountInfoRequest, - ) -> AsyncResult<'_, AccountInfoResponse, FastPayError> { - Box::pin(async move { - self.send_recv_bytes( - serialize_account_info_request(&request), - account_info_deserializer, - ) - .await - }) + ) -> Result { + self.send_recv_bytes( + serialize_account_info_request(&request), + account_info_deserializer, + ) + .await } - fn handle_object_info_request( + async fn handle_object_info_request( &self, request: ObjectInfoRequest, - ) -> AsyncResult<'_, ObjectInfoResponse, FastPayError> { - Box::pin(async move { - self.send_recv_bytes( - serialize_object_info_request(&request), - object_info_deserializer, - ) - .await - }) + ) -> Result { + self.send_recv_bytes( + serialize_object_info_request(&request), + object_info_deserializer, + ) + .await } } #[derive(Clone)] pub struct MassClient { - network_protocol: NetworkProtocol, base_address: String, base_port: u32, buffer_size: usize, @@ -268,7 +251,6 @@ pub struct MassClient { impl MassClient { pub fn new( - network_protocol: NetworkProtocol, base_address: String, base_port: u32, buffer_size: usize, @@ -277,7 +259,6 @@ impl MassClient { max_in_flight: u64, ) -> Self { Self { - network_protocol, base_address, base_port, buffer_size, @@ -289,10 +270,7 @@ impl MassClient { async fn run_core(&self, requests: Vec) -> Result, io::Error> { let address = format!("{}:{}", self.base_address, self.base_port); - let mut stream = self - .network_protocol - .connect(address, self.buffer_size) - .await?; + let mut stream = connect(address, self.buffer_size).await?; let mut requests = requests.iter(); let mut in_flight: u64 = 0; let mut responses = Vec::new(); @@ -360,16 +338,16 @@ impl MassClient { handles.push( tokio::spawn(async move { info!( - "Sending {} requests to {}:{}", - client.network_protocol, client.base_address, client.base_port, + "Sending TCP requests to {}:{}", + client.base_address, client.base_port, ); let responses = client .run_core(requests) .await .unwrap_or_else(|_| Vec::new()); info!( - "Done sending {} requests to {}:{}", - client.network_protocol, client.base_address, client.base_port, + "Done sending TCP requests to {}:{}", + client.base_address, client.base_port, ); responses }) diff --git a/fastpay/src/server.rs b/fastpay/src/server.rs index ad3746bf58459..4fb6ce116294f 100644 --- a/fastpay/src/server.rs +++ b/fastpay/src/server.rs @@ -60,7 +60,6 @@ fn make_server( }); network::Server::new( - server_config.authority.network_protocol, local_ip_addr.to_string(), server_config.authority.base_port, state, @@ -104,10 +103,6 @@ enum ServerCommands { /// Generate a new server configuration and output its public description #[structopt(name = "generate")] Generate { - /// Chooses a network protocol between Udp and Tcp - #[structopt(long, default_value = "Udp")] - protocol: transport::NetworkProtocol, - /// Sets the public name of the host #[structopt(long)] host: String, @@ -164,14 +159,12 @@ fn main() { } ServerCommands::Generate { - protocol, host, port, database_path, } => { let (address, key) = get_key_pair(); let authority = AuthorityConfig { - network_protocol: protocol, address, host, base_port: port, diff --git a/fastpay/src/transport.rs b/fastpay/src/transport.rs index 29d46340f0c6e..41a0701fd221d 100644 --- a/fastpay/src/transport.rs +++ b/fastpay/src/transport.rs @@ -1,17 +1,15 @@ // Copyright (c) Facebook, Inc. and its affiliates. // SPDX-License-Identifier: Apache-2.0 -use clap::arg_enum; use futures::future; use log::*; -use serde::{Deserialize, Serialize}; use std::io::ErrorKind; use std::{collections::HashMap, convert::TryInto, io, sync::Arc}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpSocket; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::{TcpListener, TcpStream, UdpSocket}, + net::{TcpListener, TcpStream}, }; #[cfg(test)] @@ -21,33 +19,6 @@ mod transport_tests; /// Suggested buffer size pub const DEFAULT_MAX_DATAGRAM_SIZE: &str = "65507"; -// Supported transport protocols. -arg_enum! { - #[derive(Clone, Copy, Debug, Serialize, Deserialize)] - pub enum NetworkProtocol { - Udp, - Tcp, - } -} - -/// How to send and obtain data packets over an "active socket". -pub trait DataStream: Send { - fn write_data<'a>( - &'a mut self, - buffer: &'a [u8], - ) -> future::BoxFuture<'a, Result<(), std::io::Error>>; - fn read_data(&mut self) -> future::BoxFuture<'_, Result, std::io::Error>>; -} - -/// A pool of (outgoing) data streams. -pub trait DataStreamPool: Send { - fn send_data_to<'a>( - &'a mut self, - buffer: &'a [u8], - address: &'a str, - ) -> future::BoxFuture<'a, Result<(), io::Error>>; -} - /// The handler required to create a service. pub trait MessageHandler { fn handle_message<'a>(&'a self, buffer: &'a [u8]) -> future::BoxFuture<'a, Option>>; @@ -73,157 +44,42 @@ impl SpawnedServer { } } -impl NetworkProtocol { - /// Create a DataStream for this protocol. - pub async fn connect( - self, - address: String, - max_data_size: usize, - ) -> Result, std::io::Error> { - let stream: Box = match self { - NetworkProtocol::Udp => Box::new(UdpDataStream::connect(address, max_data_size).await?), - NetworkProtocol::Tcp => Box::new(TcpDataStream::connect(address, max_data_size).await?), - }; - Ok(stream) - } - - /// Create a DataStreamPool for this protocol. - pub async fn make_outgoing_connection_pool( - self, - ) -> Result, std::io::Error> { - let pool: Box = match self { - Self::Udp => Box::new(UdpDataStreamPool::new().await?), - Self::Tcp => Box::new(TcpDataStreamPool::new().await?), - }; - Ok(pool) - } - - /// Run a server for this protocol and the given message handler. - pub async fn spawn_server( - self, - address: &str, - state: S, - buffer_size: usize, - ) -> Result - where - S: MessageHandler + Send + Sync + 'static, - { - let (complete, receiver) = futures::channel::oneshot::channel(); - let handle = match self { - Self::Udp => { - let socket = UdpSocket::bind(&address).await?; - tokio::spawn(Self::run_udp_server(socket, state, receiver, buffer_size)) - } - Self::Tcp => { - // see https://fly.io/blog/the-tokio-1-x-upgrade/#tcplistener-from_std-needs-to-be-set-to-nonblocking - let std_listener = std::net::TcpListener::bind(address)?; - std_listener.set_nonblocking(true)?; - let listener = TcpListener::from_std(std_listener)?; - - tokio::spawn(Self::run_tcp_server(listener, state, receiver, buffer_size)) - } - }; - Ok(SpawnedServer { complete, handle }) - } -} - -/// An implementation of DataStream based on UDP. -struct UdpDataStream { - socket: UdpSocket, +/// Create a DataStream for this protocol. +pub async fn connect( address: String, - buffer: Vec, -} - -impl UdpDataStream { - async fn connect(address: String, max_data_size: usize) -> Result { - let socket = UdpSocket::bind(&"0.0.0.0:0").await?; - let buffer = vec![0u8; max_data_size]; - Ok(Self { - socket, - address, - buffer, - }) - } -} - -impl DataStream for UdpDataStream { - fn write_data<'a>( - &'a mut self, - buffer: &'a [u8], - ) -> future::BoxFuture<'a, Result<(), std::io::Error>> { - Box::pin(async move { - self.socket.send_to(buffer, &*self.address).await?; - Ok(()) - }) - } - - fn read_data(&mut self) -> future::BoxFuture<'_, Result, std::io::Error>> { - Box::pin(async move { - let size = self.socket.recv(&mut self.buffer).await?; - Ok(self.buffer[..size].into()) - }) - } + max_data_size: usize, +) -> Result { + Ok(TcpDataStream::connect(address, max_data_size).await?) } -/// An implementation of DataStreamPool based on UDP. -struct UdpDataStreamPool { - socket: UdpSocket, +/// Create a DataStreamPool for this protocol. +pub async fn make_outgoing_connection_pool() -> Result { + Ok(TcpDataStreamPool::new().await?) } -impl UdpDataStreamPool { - async fn new() -> Result { - let socket = UdpSocket::bind(&"0.0.0.0:0").await?; - Ok(Self { socket }) - } -} +/// Run a server for this protocol and the given message handler. +pub async fn spawn_server( + address: &str, + state: S, + buffer_size: usize, +) -> Result +where + S: MessageHandler + Send + Sync + 'static, +{ + let (complete, receiver) = futures::channel::oneshot::channel(); + let handle = { + // see https://fly.io/blog/the-tokio-1-x-upgrade/#tcplistener-from_std-needs-to-be-set-to-nonblocking + let std_listener = std::net::TcpListener::bind(address)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; -impl DataStreamPool for UdpDataStreamPool { - fn send_data_to<'a>( - &'a mut self, - buffer: &'a [u8], - address: &'a str, - ) -> future::BoxFuture<'a, Result<(), std::io::Error>> { - Box::pin(async move { - self.socket.send_to(buffer, address).await?; - Ok(()) - }) - } -} - -// Server implementation for UDP. -impl NetworkProtocol { - async fn run_udp_server( - socket: UdpSocket, - state: S, - mut exit_future: futures::channel::oneshot::Receiver<()>, - buffer_size: usize, - ) -> Result<(), std::io::Error> - where - S: MessageHandler + Send + 'static, - { - let mut buffer = vec![0; buffer_size]; - loop { - let (size, peer) = - match future::select(exit_future, Box::pin(socket.recv_from(&mut buffer))).await { - future::Either::Left(_) => break, - future::Either::Right((value, new_exit_future)) => { - exit_future = new_exit_future; - value? - } - }; - if let Some(reply) = state.handle_message(&buffer[..size]).await { - let status = socket.send_to(&reply[..], &peer).await; - if let Err(error) = status { - error!("Failed to send query response: {}", error); - } - } - } - Ok(()) - } + tokio::spawn(run_tcp_server(listener, state, receiver, buffer_size)) + }; + Ok(SpawnedServer { complete, handle }) } /// An implementation of DataStream based on TCP. -struct TcpDataStream { +pub struct TcpDataStream { stream: TcpStream, max_data_size: usize, } @@ -278,21 +134,18 @@ impl TcpDataStream { } } -impl DataStream for TcpDataStream { - fn write_data<'a>( - &'a mut self, - buffer: &'a [u8], - ) -> future::BoxFuture<'a, Result<(), std::io::Error>> { - Box::pin(Self::tcp_write_data(&mut self.stream, buffer)) +impl TcpDataStream { + pub async fn write_data<'a>(&'a mut self, buffer: &'a [u8]) -> Result<(), std::io::Error> { + Self::tcp_write_data(&mut self.stream, buffer).await } - fn read_data(&mut self) -> future::BoxFuture<'_, Result, std::io::Error>> { - Box::pin(Self::tcp_read_data(&mut self.stream, self.max_data_size)) + pub async fn read_data(&mut self) -> Result, std::io::Error> { + Self::tcp_read_data(&mut self.stream, self.max_data_size).await } } /// An implementation of DataStreamPool based on TCP. -struct TcpDataStreamPool { +pub struct TcpDataStreamPool { streams: HashMap, } @@ -318,65 +171,59 @@ impl TcpDataStreamPool { } } -impl DataStreamPool for TcpDataStreamPool { - fn send_data_to<'a>( +impl TcpDataStreamPool { + pub async fn send_data_to<'a>( &'a mut self, buffer: &'a [u8], address: &'a str, - ) -> future::BoxFuture<'a, Result<(), std::io::Error>> { - Box::pin(async move { - let stream = self.get_stream(address).await?; - TcpDataStream::tcp_write_data(stream, buffer).await - }) + ) -> Result<(), std::io::Error> { + let stream = self.get_stream(address).await?; + TcpDataStream::tcp_write_data(stream, buffer).await } } // Server implementation for TCP. -impl NetworkProtocol { - async fn run_tcp_server( - listener: TcpListener, - state: S, - mut exit_future: futures::channel::oneshot::Receiver<()>, - buffer_size: usize, - ) -> Result<(), std::io::Error> - where - S: MessageHandler + Send + Sync + 'static, - { - let guarded_state = Arc::new(Box::new(state)); - loop { - let (mut stream, _) = - match future::select(exit_future, Box::pin(listener.accept())).await { - future::Either::Left(_) => break, - future::Either::Right((value, new_exit_future)) => { - exit_future = new_exit_future; - value? - } - }; +async fn run_tcp_server( + listener: TcpListener, + state: S, + mut exit_future: futures::channel::oneshot::Receiver<()>, + buffer_size: usize, +) -> Result<(), std::io::Error> +where + S: MessageHandler + Send + Sync + 'static, +{ + let guarded_state = Arc::new(Box::new(state)); + loop { + let (mut stream, _) = match future::select(exit_future, Box::pin(listener.accept())).await { + future::Either::Left(_) => break, + future::Either::Right((value, new_exit_future)) => { + exit_future = new_exit_future; + value? + } + }; - let guarded_state = guarded_state.clone(); - tokio::spawn(async move { - loop { - let buffer = match TcpDataStream::tcp_read_data(&mut stream, buffer_size).await - { - Ok(buffer) => buffer, - Err(err) => { - // We expect an EOF error at the end. - if err.kind() != io::ErrorKind::UnexpectedEof { - error!("Error while reading TCP stream: {}", err); - } - break; + let guarded_state = guarded_state.clone(); + tokio::spawn(async move { + loop { + let buffer = match TcpDataStream::tcp_read_data(&mut stream, buffer_size).await { + Ok(buffer) => buffer, + Err(err) => { + // We expect an EOF error at the end. + if err.kind() != io::ErrorKind::UnexpectedEof { + error!("Error while reading TCP stream: {}", err); } - }; + break; + } + }; - if let Some(reply) = guarded_state.handle_message(&buffer[..]).await { - let status = TcpDataStream::tcp_write_data(&mut stream, &reply[..]).await; - if let Err(error) = status { - error!("Failed to send query response: {}", error); - } - }; - } - }); - } - Ok(()) + if let Some(reply) = guarded_state.handle_message(&buffer[..]).await { + let status = TcpDataStream::tcp_write_data(&mut stream, &reply[..]).await; + if let Err(error) = status { + error!("Failed to send query response: {}", error); + } + }; + } + }); } + Ok(()) } diff --git a/fastpay/src/unit_tests/transport_tests.rs b/fastpay/src/unit_tests/transport_tests.rs index 8d93dde37fc1e..531720b3897cf 100644 --- a/fastpay/src/unit_tests/transport_tests.rs +++ b/fastpay/src/unit_tests/transport_tests.rs @@ -32,24 +32,22 @@ impl MessageHandler for TestService { } } -async fn test_server(protocol: NetworkProtocol) -> Result<(usize, usize), std::io::Error> { +async fn test_server() -> Result<(usize, usize), std::io::Error> { let address = get_new_local_address().await.unwrap(); let counter = Arc::new(AtomicUsize::new(0)); let mut received = 0; - let server = protocol - .spawn_server(&address, TestService::new(counter.clone()), 100) - .await?; + let server = spawn_server(&address, TestService::new(counter.clone()), 100).await?; - let mut client = protocol.connect(address.clone(), 1000).await?; + let mut client = connect(address.clone(), 1000).await?; client.write_data(b"abcdef").await?; received += client.read_data().await?.len(); client.write_data(b"abcd").await?; received += client.read_data().await?.len(); // Use a second connection (here pooled). - let mut pool = protocol.make_outgoing_connection_pool().await?; + let mut pool = make_outgoing_connection_pool().await?; pool.send_data_to(b"abc", &address).await?; // Try to read data on the first connection (should fail). @@ -72,18 +70,10 @@ async fn test_server(protocol: NetworkProtocol) -> Result<(usize, usize), std::i Ok((counter.load(Ordering::Relaxed), received)) } -#[test] -fn udp_server() { - let rt = Runtime::new().unwrap(); - let (processed, received) = rt.block_on(test_server(NetworkProtocol::Udp)).unwrap(); - assert_eq!(processed, 13); - assert_eq!(received, 10); -} - #[test] fn tcp_server() { let rt = Runtime::new().unwrap(); - let (processed, received) = rt.block_on(test_server(NetworkProtocol::Tcp)).unwrap(); + let (processed, received) = rt.block_on(test_server()).unwrap(); // Active TCP connections are allowed to finish before the server is gracefully killed. assert_eq!(processed, 17); assert_eq!(received, 14); diff --git a/fastpay_core/Cargo.toml b/fastpay_core/Cargo.toml index 29187e483203b..de3d6b68ccafc 100644 --- a/fastpay_core/Cargo.toml +++ b/fastpay_core/Cargo.toml @@ -16,6 +16,7 @@ serde = { version = "1.0.133", features = ["derive"] } tokio = { version = "1.15.0", features = ["full"] } parking_lot = "0.11.2" itertools = "0.10.3" +async-trait = "0.1.52" fastx-adapter = { path = "../fastx_programmability/adapter" } fastx-framework = { path = "../fastx_programmability/framework" } diff --git a/fastpay_core/src/client.rs b/fastpay_core/src/client.rs index e4835f669fb1f..62eb36fd27268 100644 --- a/fastpay_core/src/client.rs +++ b/fastpay_core/src/client.rs @@ -3,6 +3,7 @@ use crate::downloader::*; use anyhow::{bail, ensure}; +use async_trait::async_trait; use fastx_types::messages::Address::FastPay; use fastx_types::{ base_types::*, committee::Committee, error::FastPayError, fp_ensure, messages::*, @@ -25,27 +26,28 @@ const AUTHORITY_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result>; +#[async_trait] pub trait AuthorityClient { /// Initiate a new order to a FastPay or Primary account. - fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError>; + async fn handle_order(&mut self, order: Order) -> Result; /// Confirm an order to a FastPay or Primary account. - fn handle_confirmation_order( + async fn handle_confirmation_order( &mut self, order: ConfirmationOrder, - ) -> AsyncResult<'_, OrderInfoResponse, FastPayError>; + ) -> Result; /// Handle Account information requests for this account. - fn handle_account_info_request( + async fn handle_account_info_request( &self, request: AccountInfoRequest, - ) -> AsyncResult<'_, AccountInfoResponse, FastPayError>; + ) -> Result; /// Handle Object information requests for this account. - fn handle_object_info_request( + async fn handle_object_info_request( &self, request: ObjectInfoRequest, - ) -> AsyncResult<'_, ObjectInfoResponse, FastPayError>; + ) -> Result; } pub struct ClientState { @@ -70,39 +72,36 @@ pub struct ClientState { } // Operations are considered successful when they successfully reach a quorum of authorities. +#[async_trait] pub trait Client { /// Send object to a FastX account. - fn transfer_object( + async fn transfer_object( &mut self, object_id: ObjectID, gas_payment: ObjectID, recipient: FastPayAddress, - ) -> AsyncResult<'_, CertifiedOrder, anyhow::Error>; + ) -> Result; /// Receive object from FastX. - fn receive_object(&mut self, certificate: CertifiedOrder) - -> AsyncResult<'_, (), anyhow::Error>; + async fn receive_object(&mut self, certificate: CertifiedOrder) -> Result<(), anyhow::Error>; /// Send object to a FastX account. /// Do not confirm the transaction. - fn transfer_to_fastx_unsafe_unconfirmed( + async fn transfer_to_fastx_unsafe_unconfirmed( &mut self, recipient: FastPayAddress, object_id: ObjectID, gas_payment: ObjectID, - ) -> AsyncResult<'_, CertifiedOrder, anyhow::Error>; + ) -> Result; /// Synchronise client state with a random authorities, updates all object_ids and certificates, request only goes out to one authority. /// this method doesn't guarantee data correctness, client will have to handle potential byzantine authority - fn sync_client_state_with_random_authority( + async fn sync_client_state_with_random_authority( &mut self, - ) -> AsyncResult<'_, AuthorityName, anyhow::Error>; - - /// Get all object we own. - fn get_owned_objects(&self) -> AsyncResult<'_, Vec, anyhow::Error>; + ) -> Result; /// Call move functions in the module in the given package, with args supplied - fn move_call( + async fn move_call( &mut self, package_object_ref: ObjectRef, module: Identifier, @@ -112,13 +111,16 @@ pub trait Client { object_arguments: Vec, pure_arguments: Vec>, gas_budget: u64, - ) -> AsyncResult<'_, (CertifiedOrder, OrderEffects), anyhow::Error>; + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error>; /// Get the object information - fn get_object_info( + async fn get_object_info( &mut self, object_info_req: ObjectInfoRequest, - ) -> AsyncResult<'_, ObjectInfoResponse, anyhow::Error>; + ) -> Result; + + /// Get all object we own. + async fn get_owned_objects(&self) -> Result, anyhow::Error>; } impl ClientState { @@ -205,6 +207,7 @@ impl CertificateRequester { } } +#[async_trait] impl Requester for CertificateRequester where A: AuthorityClient + Send + Sync + 'static + Clone, @@ -213,39 +216,35 @@ where type Value = Result; /// Try to find a certificate for the given sender and sequence number. - fn query( + async fn query( &mut self, sequence_number: SequenceNumber, - ) -> AsyncResult<'_, CertifiedOrder, FastPayError> { - Box::pin(async move { - let request = ObjectInfoRequest { - object_id: self.object_id, - request_sequence_number: Some(sequence_number), - request_received_transfers_excluding_first_nth: None, - }; - // Sequentially try each authority in random order. - // TODO: Improve shuffle, different authorities might different amount of stake. - self.authority_clients.shuffle(&mut rand::thread_rng()); - for client in self.authority_clients.iter_mut() { - let result = client.handle_object_info_request(request.clone()).await; - if let Ok(response) = result { - let certificate = response.requested_certificate.unwrap(); - if certificate.check(&self.committee).is_ok() { - let order = &certificate.order; - if let Some(sender) = self.sender { - if order.sender() == &sender - && order.sequence_number() == sequence_number - { - return Ok(certificate.clone()); - } - } else { + ) -> Result { + let request = ObjectInfoRequest { + object_id: self.object_id, + request_sequence_number: Some(sequence_number), + request_received_transfers_excluding_first_nth: None, + }; + // Sequentially try each authority in random order. + // TODO: Improve shuffle, different authorities might different amount of stake. + self.authority_clients.shuffle(&mut rand::thread_rng()); + for client in self.authority_clients.iter_mut() { + let result = client.handle_object_info_request(request.clone()).await; + if let Ok(response) = result { + let certificate = response.requested_certificate.unwrap(); + if certificate.check(&self.committee).is_ok() { + let order = &certificate.order; + if let Some(sender) = self.sender { + if order.sender() == &sender && order.sequence_number() == sequence_number { return Ok(certificate.clone()); } + } else { + return Ok(certificate.clone()); } } } - Err(FastPayError::ErrorWhileRequestingCertificate) - }) + } + Err(FastPayError::ErrorWhileRequestingCertificate) } } @@ -895,123 +894,114 @@ where } } +#[async_trait] impl Client for ClientState where A: AuthorityClient + Send + Sync + Clone + 'static, { - fn transfer_object( + async fn transfer_object( &mut self, object_id: ObjectID, gas_payment: ObjectID, recipient: FastPayAddress, - ) -> AsyncResult<'_, CertifiedOrder, anyhow::Error> { - Box::pin(self.transfer(object_id, gas_payment, Address::FastPay(recipient))) + ) -> Result { + self.transfer(object_id, gas_payment, Address::FastPay(recipient)) + .await } - fn receive_object( - &mut self, - certificate: CertifiedOrder, - ) -> AsyncResult<'_, (), anyhow::Error> { - Box::pin(async move { - certificate.check(&self.committee)?; - match &certificate.order.kind { - OrderKind::Transfer(transfer) => { - ensure!( - transfer.recipient == Address::FastPay(self.address), - "Transfer should be received by us." - ); - self.communicate_transfers( - transfer.sender, - *certificate.order.object_id(), - vec![certificate.clone()], - CommunicateAction::SynchronizeNextSequenceNumber( - transfer.object_ref.1.increment(), - ), - ) - .await?; - // Everything worked: update the local balance. - if let Entry::Vacant(entry) = - self.certificates.entry(certificate.order.digest()) - { - self.object_ids - .insert(transfer.object_ref.0, transfer.object_ref.1.increment()); - self.object_certs - .entry(transfer.object_ref.0) - .or_default() - .push(certificate.order.digest()); - entry.insert(certificate); - } - Ok(()) - } - OrderKind::Publish(_) | OrderKind::Call(_) => { - unimplemented!("receiving (?) Move call or publish") + async fn receive_object(&mut self, certificate: CertifiedOrder) -> Result<(), anyhow::Error> { + certificate.check(&self.committee)?; + match &certificate.order.kind { + OrderKind::Transfer(transfer) => { + ensure!( + transfer.recipient == Address::FastPay(self.address), + "Transfer should be received by us." + ); + self.communicate_transfers( + transfer.sender, + *certificate.order.object_id(), + vec![certificate.clone()], + CommunicateAction::SynchronizeNextSequenceNumber( + transfer.object_ref.1.increment(), + ), + ) + .await?; + // Everything worked: update the local balance. + if let Entry::Vacant(entry) = self.certificates.entry(certificate.order.digest()) { + self.object_ids + .insert(transfer.object_ref.0, transfer.object_ref.1.increment()); + self.object_certs + .entry(transfer.object_ref.0) + .or_default() + .push(certificate.order.digest()); + entry.insert(certificate); } + Ok(()) + } + OrderKind::Publish(_) | OrderKind::Call(_) => { + unimplemented!("receiving (?) Move call or publish") } - }) + } } - fn transfer_to_fastx_unsafe_unconfirmed( + async fn transfer_to_fastx_unsafe_unconfirmed( &mut self, recipient: FastPayAddress, object_id: ObjectID, gas_payment: ObjectID, - ) -> AsyncResult<'_, CertifiedOrder, anyhow::Error> { - Box::pin(async move { - let transfer = Transfer { - object_ref: ( - object_id, - self.next_sequence_number(&object_id)?, - // TODO(https://github.com/MystenLabs/fastnft/issues/123): Include actual object digest here - ObjectDigest::new([0; 32]), - ), - sender: self.address, - recipient: Address::FastPay(recipient), - gas_payment: ( - gas_payment, - self.next_sequence_number(&gas_payment)?, - // TODO(https://github.com/MystenLabs/fastnft/issues/123): Include actual object digest here - ObjectDigest::new([0; 32]), - ), - }; - let order = Order::new_transfer(transfer, &self.secret); - let new_certificate = self - .execute_transfer(order, /* with_confirmation */ false) - .await?; - Ok(new_certificate) - }) + ) -> Result { + let transfer = Transfer { + object_ref: ( + object_id, + self.next_sequence_number(&object_id)?, + // TODO(https://github.com/MystenLabs/fastnft/issues/123): Include actual object digest here + ObjectDigest::new([0; 32]), + ), + sender: self.address, + recipient: Address::FastPay(recipient), + gas_payment: ( + gas_payment, + self.next_sequence_number(&gas_payment)?, + // TODO(https://github.com/MystenLabs/fastnft/issues/123): Include actual object digest here + ObjectDigest::new([0; 32]), + ), + }; + let order = Order::new_transfer(transfer, &self.secret); + let new_certificate = self + .execute_transfer(order, /* with_confirmation */ false) + .await?; + Ok(new_certificate) } - fn sync_client_state_with_random_authority( + async fn sync_client_state_with_random_authority( &mut self, - ) -> AsyncResult<'_, AuthorityName, anyhow::Error> { - Box::pin(async move { - if let Some(order) = self.pending_transfer.clone() { - // Finish executing the previous transfer. - self.execute_transfer(order, /* with_confirmation */ false) - .await?; - } - // update object_ids. - self.object_ids.clear(); + ) -> Result { + if let Some(order) = self.pending_transfer.clone() { + // Finish executing the previous transfer. + self.execute_transfer(order, /* with_confirmation */ false) + .await?; + } + // update object_ids. + self.object_ids.clear(); - let (authority_name, object_ids) = self.download_own_object_ids().await?; - for (object_id, sequence_number, _) in object_ids { - self.object_ids.insert(object_id, sequence_number); - } - // Recover missing certificates. - let new_certificates = self.download_certificates().await?; + let (authority_name, object_ids) = self.download_own_object_ids().await?; + for (object_id, sequence_number, _) in object_ids { + self.object_ids.insert(object_id, sequence_number); + } + // Recover missing certificates. + let new_certificates = self.download_certificates().await?; - for (id, certs) in new_certificates { - self.update_certificates(&id, &certs)?; - } - Ok(authority_name) - }) + for (id, certs) in new_certificates { + self.update_certificates(&id, &certs)?; + } + Ok(authority_name) } - fn get_owned_objects(&self) -> AsyncResult<'_, Vec, anyhow::Error> { - Box::pin(async move { Ok(self.object_ids.keys().copied().collect()) }) + async fn get_owned_objects(&self) -> Result, anyhow::Error> { + Ok(self.object_ids.keys().copied().collect()) } - fn move_call( + async fn move_call( &mut self, package_object_ref: ObjectRef, module: Identifier, @@ -1021,8 +1011,8 @@ where object_arguments: Vec, pure_arguments: Vec>, gas_budget: u64, - ) -> AsyncResult<'_, (CertifiedOrder, OrderEffects), anyhow::Error> { - Box::pin(self.call( + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { + self.call( package_object_ref, module, function, @@ -1031,12 +1021,14 @@ where object_arguments, pure_arguments, gas_budget, - )) + ) + .await } - fn get_object_info( + + async fn get_object_info( &mut self, object_info_req: ObjectInfoRequest, - ) -> AsyncResult<'_, ObjectInfoResponse, anyhow::Error> { - Box::pin(self.get_object_info_execute(object_info_req)) + ) -> Result { + self.get_object_info_execute(object_info_req).await } } diff --git a/fastpay_core/src/downloader.rs b/fastpay_core/src/downloader.rs index 9ae3cf7fbe826..4c970cda8dcda 100644 --- a/fastpay_core/src/downloader.rs +++ b/fastpay_core/src/downloader.rs @@ -1,9 +1,10 @@ // Copyright (c) Facebook, Inc. and its affiliates. // SPDX-License-Identifier: Apache-2.0 +use async_trait::async_trait; use futures::{ channel::{mpsc, oneshot}, - future, SinkExt, StreamExt, + SinkExt, StreamExt, }; use std::collections::BTreeMap; @@ -24,12 +25,13 @@ pub struct Downloader { } /// The underlying data-fetching mechanism to be provided by the user. +#[async_trait] pub trait Requester { type Key: std::cmp::Ord + Send + Sync + Clone + 'static; type Value: std::fmt::Debug + Send + Clone + 'static; /// Request the value corresponding to the given key. - fn query(&mut self, key: Self::Key) -> future::BoxFuture<'_, Self::Value>; + async fn query(&mut self, key: Self::Key) -> Self::Value; } /// Channel for using code to send requests and stop the downloader task. diff --git a/fastpay_core/src/unit_tests/client_tests.rs b/fastpay_core/src/unit_tests/client_tests.rs index 9736a645944dc..4c58ac1d2e995 100644 --- a/fastpay_core/src/unit_tests/client_tests.rs +++ b/fastpay_core/src/unit_tests/client_tests.rs @@ -33,40 +33,44 @@ fn max_files_client_tests() -> i32 { #[derive(Clone)] struct LocalAuthorityClient(Arc>); +#[async_trait] impl AuthorityClient for LocalAuthorityClient { - fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError> { + async fn handle_order(&mut self, order: Order) -> Result { let state = self.0.clone(); - Box::pin(async move { state.lock().await.handle_order(order).await }) + let result = state.lock().await.handle_order(order).await; + result } - fn handle_confirmation_order( + async fn handle_confirmation_order( &mut self, order: ConfirmationOrder, - ) -> AsyncResult<'_, OrderInfoResponse, FastPayError> { + ) -> Result { let state = self.0.clone(); - Box::pin(async move { state.lock().await.handle_confirmation_order(order).await }) + let result = state.lock().await.handle_confirmation_order(order).await; + result } - fn handle_account_info_request( + async fn handle_account_info_request( &self, request: AccountInfoRequest, - ) -> AsyncResult<'_, AccountInfoResponse, FastPayError> { + ) -> Result { let state = self.0.clone(); - Box::pin(async move { - state - .lock() - .await - .handle_account_info_request(request) - .await - }) + + let result = state + .lock() + .await + .handle_account_info_request(request) + .await; + result } - fn handle_object_info_request( + async fn handle_object_info_request( &self, request: ObjectInfoRequest, - ) -> AsyncResult<'_, ObjectInfoResponse, FastPayError> { + ) -> Result { let state = self.0.clone(); - Box::pin(async move { state.lock().await.handle_object_info_request(request).await }) + let x = state.lock().await.handle_object_info_request(request).await; + x } } diff --git a/fastpay_core/src/unit_tests/downloader_tests.rs b/fastpay_core/src/unit_tests/downloader_tests.rs index dae645f98be98..5bcda6acd2c12 100644 --- a/fastpay_core/src/unit_tests/downloader_tests.rs +++ b/fastpay_core/src/unit_tests/downloader_tests.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; -use futures::future; +use async_trait::async_trait; use std::sync::{ atomic::{AtomicU32, Ordering}, Arc, @@ -18,12 +18,13 @@ impl LocalRequester { } } +#[async_trait] impl Requester for LocalRequester { type Key = &'static str; type Value = u32; - fn query(&mut self, _key: Self::Key) -> future::BoxFuture<'_, Self::Value> { - Box::pin(future::ready(self.0.fetch_add(1, Ordering::Relaxed))) + async fn query(&mut self, _key: Self::Key) -> Self::Value { + self.0.fetch_add(1, Ordering::Relaxed) } } diff --git a/scripts/bench.sh b/scripts/bench.sh index 54ad575b393ee..d79029cf60e03 100755 --- a/scripts/bench.sh +++ b/scripts/bench.sh @@ -6,7 +6,7 @@ num_shards=15 num_accounts=500000 max_in_flight=700 committee_size=4 -protocol=UDP +protocol=TCP if [ "$1" != "" ]; then num_shards=$1 diff --git a/scripts/fabfile.py b/scripts/fabfile.py index 25f81e156591f..fe560d6b85c0f 100644 --- a/scripts/fabfile.py +++ b/scripts/fabfile.py @@ -359,7 +359,7 @@ def update(): @roles('throughput') @parallel -def tps(tps_shards=65, tps_accounts=1000000, tps_max_in_flight=1000, tps_committee=4, tps_protocol='UDP', log_file=None): +def tps(tps_shards=65, tps_accounts=1000000, tps_max_in_flight=1000, tps_committee=4, tps_protocol='TCP', log_file=None): shards = int(tps_shards) accounts = int(tps_accounts) max_in_flight = int(tps_max_in_flight) @@ -378,7 +378,7 @@ def tps(tps_shards=65, tps_accounts=1000000, tps_max_in_flight=1000, tps_committ tps_accounts = 1000000 tps_in_flights = 1000 tps_committee = 4 -tps_protocol = 'UDP' +tps_protocol = 'TCP' base_tps_log_file = 'tps_logs.txt'