Skip to content

Commit

Permalink
[fastx net] Remove UDP and replace hand-crafted traits with async_tra…
Browse files Browse the repository at this point in the history
…it (MystenLabs#184)

* Use async trait
* Removed all traces of UDP
* Adapted new async function to async-trait syntax

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jan 20, 2022
1 parent 909e1fc commit 281dbf4
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 493 deletions.
1 change: 1 addition & 0 deletions fastpay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ base64 = "0.13.0"
ed25519-dalek = { version = "1.0.1", features = ["batch", "serde"] }
rocksdb = "0.17.0"
hex = "0.4.3"
async-trait = "0.1.52"

bcs = "0.1.3"
fastpay_core = { path = "../fastpay_core" }
Expand Down
13 changes: 1 addition & 12 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ use strum_macros::EnumString;
about = "Local end-to-end test and benchmark of the FastPay protocol"
)]
struct ClientServerBenchmark {
/// Choose a network protocol between Udp and Tcp
#[structopt(long, default_value = "tcp")]
protocol: transport::NetworkProtocol,
/// Hostname
#[structopt(long, default_value = "127.0.0.1")]
host: String,
Expand Down Expand Up @@ -266,13 +263,7 @@ impl ClientServerBenchmark {
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
let server = network::Server::new(
self.protocol,
self.host.clone(),
self.port,
state,
self.buffer_size,
);
let server = network::Server::new(self.host.clone(), self.port, state, self.buffer_size);
server.spawn().await.unwrap()
}

Expand All @@ -294,7 +285,6 @@ impl ClientServerBenchmark {
info!("Sending requests.");
if self.max_in_flight > 0 {
let mass_client = network::MassClient::new(
self.protocol,
self.host.clone(),
self.port,
self.buffer_size,
Expand Down Expand Up @@ -325,7 +315,6 @@ impl ClientServerBenchmark {
} else {
// Use actual client core
let client = network::Client::new(
self.protocol,
self.host.clone(),
self.port,
self.buffer_size,
Expand Down
2 changes: 0 additions & 2 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ fn make_authority_clients(
for config in &committee_config.authorities {
let config = config.clone();
let client = network::Client::new(
config.network_protocol,
config.host,
config.base_port,
buffer_size,
Expand All @@ -50,7 +49,6 @@ fn make_authority_mass_clients(
let mut authority_clients = Vec::new();
for config in &committee_config.authorities {
let client = network::MassClient::new(
config.network_protocol,
config.host.clone(),
config.base_port,
buffer_size,
Expand Down
2 changes: 0 additions & 2 deletions fastpay/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

use crate::transport::NetworkProtocol;
use fastpay_core::client::ClientState;
use fastx_types::{
base_types::*,
Expand All @@ -19,7 +18,6 @@ use std::{
};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthorityConfig {
pub network_protocol: NetworkProtocol,
#[serde(
serialize_with = "address_as_hex",
deserialize_with = "address_from_hex"
Expand Down
88 changes: 33 additions & 55 deletions fastpay/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::transport::*;
use fastpay_core::{authority::*, client::*};
use fastx_types::{error::*, messages::*, serialize::*};

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use log::*;
Expand All @@ -13,7 +14,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time;

pub struct Server {
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
state: AuthorityState,
Expand All @@ -25,14 +25,12 @@ pub struct Server {

impl Server {
pub fn new(
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
state: AuthorityState,
buffer_size: usize,
) -> Self {
Self {
network_protocol,
base_address,
base_port,
state,
Expand All @@ -52,16 +50,15 @@ impl Server {

pub async fn spawn(self) -> Result<SpawnedServer, io::Error> {
info!(
"Listening to {} traffic on {}:{}",
self.network_protocol, self.base_address, self.base_port
"Listening to TCP traffic on {}:{}",
self.base_address, self.base_port
);
let address = format!("{}:{}", self.base_address, self.base_port);

let buffer_size = self.buffer_size;
let protocol = self.network_protocol;
let state = RunningServerState { server: self };

// Launch server for the appropriate protocol.
protocol.spawn_server(&address, state, buffer_size).await
spawn_server(&address, state, buffer_size).await
}
}

Expand Down Expand Up @@ -147,7 +144,6 @@ impl MessageHandler for RunningServerState {

#[derive(Clone)]
pub struct Client {
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
buffer_size: usize,
Expand All @@ -157,15 +153,13 @@ pub struct Client {

impl Client {
pub fn new(
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
) -> Self {
Self {
network_protocol,
base_address,
base_port,
buffer_size,
Expand All @@ -176,10 +170,7 @@ impl Client {

async fn send_recv_bytes_internal(&self, buf: Vec<u8>) -> Result<Vec<u8>, io::Error> {
let address = format!("{}:{}", self.base_address, self.base_port);
let mut stream = self
.network_protocol
.connect(address, self.buffer_size)
.await?;
let mut stream = connect(address, self.buffer_size).await?;
// Send message
time::timeout(self.send_timeout, stream.write_data(&buf)).await??;
// Wait for reply
Expand Down Expand Up @@ -208,56 +199,48 @@ impl Client {
}
}

#[async_trait]
impl AuthorityClient for Client {
/// Initiate a new transfer to a FastPay or Primary account.
fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(serialize_order(&order), order_info_deserializer)
.await
})
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
self.send_recv_bytes(serialize_order(&order), order_info_deserializer)
.await
}

/// Confirm a transfer to a FastPay or Primary account.
fn handle_confirmation_order(
async fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> AsyncResult<'_, OrderInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(serialize_cert(&order.certificate), order_info_deserializer)
.await
})
) -> Result<OrderInfoResponse, FastPayError> {
self.send_recv_bytes(serialize_cert(&order.certificate), order_info_deserializer)
.await
}

fn handle_account_info_request(
async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> AsyncResult<'_, AccountInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(
serialize_account_info_request(&request),
account_info_deserializer,
)
.await
})
) -> Result<AccountInfoResponse, FastPayError> {
self.send_recv_bytes(
serialize_account_info_request(&request),
account_info_deserializer,
)
.await
}

fn handle_object_info_request(
async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> AsyncResult<'_, ObjectInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(
serialize_object_info_request(&request),
object_info_deserializer,
)
.await
})
) -> Result<ObjectInfoResponse, FastPayError> {
self.send_recv_bytes(
serialize_object_info_request(&request),
object_info_deserializer,
)
.await
}
}

#[derive(Clone)]
pub struct MassClient {
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
buffer_size: usize,
Expand All @@ -268,7 +251,6 @@ pub struct MassClient {

impl MassClient {
pub fn new(
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
buffer_size: usize,
Expand All @@ -277,7 +259,6 @@ impl MassClient {
max_in_flight: u64,
) -> Self {
Self {
network_protocol,
base_address,
base_port,
buffer_size,
Expand All @@ -289,10 +270,7 @@ impl MassClient {

async fn run_core(&self, requests: Vec<Bytes>) -> Result<Vec<Bytes>, io::Error> {
let address = format!("{}:{}", self.base_address, self.base_port);
let mut stream = self
.network_protocol
.connect(address, self.buffer_size)
.await?;
let mut stream = connect(address, self.buffer_size).await?;
let mut requests = requests.iter();
let mut in_flight: u64 = 0;
let mut responses = Vec::new();
Expand Down Expand Up @@ -360,16 +338,16 @@ impl MassClient {
handles.push(
tokio::spawn(async move {
info!(
"Sending {} requests to {}:{}",
client.network_protocol, client.base_address, client.base_port,
"Sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
let responses = client
.run_core(requests)
.await
.unwrap_or_else(|_| Vec::new());
info!(
"Done sending {} requests to {}:{}",
client.network_protocol, client.base_address, client.base_port,
"Done sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
responses
})
Expand Down
7 changes: 0 additions & 7 deletions fastpay/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ fn make_server(
});

network::Server::new(
server_config.authority.network_protocol,
local_ip_addr.to_string(),
server_config.authority.base_port,
state,
Expand Down Expand Up @@ -104,10 +103,6 @@ enum ServerCommands {
/// Generate a new server configuration and output its public description
#[structopt(name = "generate")]
Generate {
/// Chooses a network protocol between Udp and Tcp
#[structopt(long, default_value = "Udp")]
protocol: transport::NetworkProtocol,

/// Sets the public name of the host
#[structopt(long)]
host: String,
Expand Down Expand Up @@ -164,14 +159,12 @@ fn main() {
}

ServerCommands::Generate {
protocol,
host,
port,
database_path,
} => {
let (address, key) = get_key_pair();
let authority = AuthorityConfig {
network_protocol: protocol,
address,
host,
base_port: port,
Expand Down
Loading

0 comments on commit 281dbf4

Please sign in to comment.