Skip to content

Commit

Permalink
Merge pull request MystenLabs#304 from MystenLabs/separate-network-dir
Browse files Browse the repository at this point in the history
Add network_utils directory and move network/transport into it
  • Loading branch information
lxfind authored Jan 29, 2022
2 parents 244c0dc + 163b8cc commit e856db7
Show file tree
Hide file tree
Showing 15 changed files with 416 additions and 376 deletions.
5 changes: 1 addition & 4 deletions fastpay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ anyhow = "1.0.53"
bytes = "1.1.0"
clap = "3.0.13"
futures = "0.3.19"
net2 = "0.2.37"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
structopt = "0.3.26"
Expand All @@ -34,6 +33,7 @@ tracing-subscriber = { version = "0.3", features = ["time", "env-filter"] }
bcs = "0.1.3"
fastpay_core = { path = "../fastpay_core" }
fastx-adapter = { path = "../fastx_programmability/adapter" }
fastx-network = { path = "../network_utils" }
fastx-types = { path = "../fastx_types" }

move-package = { git = "https://github.com/diem/move", rev="62b5a5075378ae6a7102bbfc1fb33b57ba6125d2" }
Expand All @@ -51,6 +51,3 @@ path = "src/server.rs"
[[bin]]
name = "bench"
path = "src/bench.rs"

[package.metadata.cargo-udeps.ignore]
normal = ["net2"]
7 changes: 4 additions & 3 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
#![deny(warnings)]

use bytes::Bytes;
use fastpay::{network, transport};
use fastpay::{mass_client::MassClient, server_lib};
use fastpay_core::authority::*;
use fastx_network::{network, transport};
use fastx_types::FASTX_FRAMEWORK_ADDRESS;
use fastx_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
use futures::stream::StreamExt;
Expand Down Expand Up @@ -269,7 +270,7 @@ impl ClientServerBenchmark {
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
let server = network::Server::new(self.host.clone(), self.port, state, self.buffer_size);
let server = server_lib::Server::new(self.host.clone(), self.port, state, self.buffer_size);
server.spawn().await.unwrap()
}

Expand All @@ -290,7 +291,7 @@ impl ClientServerBenchmark {

info!("Sending requests.");
if self.max_in_flight > 0 {
let mass_client = network::MassClient::new(
let mass_client = MassClient::new(
self.host.clone(),
self.port,
self.buffer_size,
Expand Down
7 changes: 4 additions & 3 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

#![deny(warnings)]

use fastpay::{config::*, network, transport};
use fastpay::{config::*, mass_client::MassClient};
use fastpay_core::client::*;
use fastx_network::{network, transport};
use fastx_types::{base_types::*, committee::Committee, messages::*, serialize::*};
use move_core_types::transaction_argument::convert_txn_args;

Expand Down Expand Up @@ -47,10 +48,10 @@ fn make_authority_mass_clients(
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
max_in_flight: u64,
) -> Vec<network::MassClient> {
) -> Vec<MassClient> {
let mut authority_clients = Vec::new();
for config in &committee_config.authorities {
let client = network::MassClient::new(
let client = MassClient::new(
config.host.clone(),
config.base_port,
buffer_size,
Expand Down
4 changes: 2 additions & 2 deletions fastpay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
#![deny(warnings)]

pub mod config;
pub mod network;
pub mod transport;
pub mod mass_client;
pub mod server_lib;
129 changes: 129 additions & 0 deletions fastpay/src/mass_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

use bytes::Bytes;
use fastx_network::transport::*;
use futures::future::FutureExt;
use std::io;
use tokio::time;
use tracing::*;

#[derive(Clone)]
pub struct MassClient {
base_address: String,
base_port: u32,
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
max_in_flight: u64,
}

impl MassClient {
pub fn new(
base_address: String,
base_port: u32,
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
max_in_flight: u64,
) -> Self {
Self {
base_address,
base_port,
buffer_size,
send_timeout,
recv_timeout,
max_in_flight,
}
}

async fn run_core(&self, requests: Vec<Bytes>) -> Result<Vec<Bytes>, io::Error> {
let address = format!("{}:{}", self.base_address, self.base_port);
let mut stream = connect(address, self.buffer_size).await?;
let mut requests = requests.iter();
let mut in_flight: u64 = 0;
let mut responses = Vec::new();

loop {
while in_flight < self.max_in_flight {
let request = match requests.next() {
None => {
if in_flight == 0 {
return Ok(responses);
}
// No more entries to send.
break;
}
Some(request) => request,
};
let status = time::timeout(self.send_timeout, stream.write_data(request)).await;
if let Err(error) = status {
error!("Failed to send request: {}", error);
continue;
}
in_flight += 1;
}
if requests.len() % 5000 == 0 && requests.len() > 0 {
info!("In flight {} Remaining {}", in_flight, requests.len());
}
match time::timeout(self.recv_timeout, stream.read_data()).await {
Ok(Ok(buffer)) => {
in_flight -= 1;
responses.push(Bytes::from(buffer));
}
Ok(Err(error)) => {
if error.kind() == io::ErrorKind::UnexpectedEof {
info!("Socket closed by server");
return Ok(responses);
}
error!("Received error response: {}", error);
}
Err(error) => {
error!(
"Timeout while receiving response: {} (in flight: {})",
error, in_flight
);
}
}
}
}

/// Spin off one task on this authority client.
pub fn run<I>(
&self,
requests: I,
connections: usize,
) -> impl futures::stream::Stream<Item = Vec<Bytes>>
where
I: IntoIterator<Item = Bytes>,
{
let handles = futures::stream::FuturesUnordered::new();

let outer_requests: Vec<_> = requests.into_iter().collect();
let size = outer_requests.len() / connections;
for chunk in outer_requests[..].chunks(size) {
let requests: Vec<_> = chunk.to_vec();
let client = self.clone();
handles.push(
tokio::spawn(async move {
info!(
"Sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
let responses = client
.run_core(requests)
.await
.unwrap_or_else(|_| Vec::new());
info!(
"Done sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
responses
})
.then(|x| async { x.unwrap_or_else(|_| Vec::new()) }),
);
}

handles
}
}
Loading

0 comments on commit e856db7

Please sign in to comment.