Skip to content

Commit

Permalink
[Authority Server] Chunks message inputs on a TCP connection and doe…
Browse files Browse the repository at this point in the history
…s batch signature verification (MystenLabs#920)

* Added transaction verification batching for each TCP socket
* Refactored to verify batches of input transactions and also to cache keys
* Modernize mass client
* Ignore max in flight
* Adds an is_checked flag to Transaction and Certificate, and only checks if this is false
* Refactor checks to use obligations
* Update the store to the latest contains implementation

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Mar 18, 2022
1 parent 283bc33 commit b3e78ce
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 191 deletions.
99 changes: 37 additions & 62 deletions network_utils/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::transport::*;
use bytes::Bytes;
use futures::future::FutureExt;
use bytes::{Bytes, BytesMut};
use std::{
net::TcpListener,
sync::atomic::{AtomicUsize, Ordering},
Expand All @@ -13,7 +12,11 @@ use sui_types::{error::*, serialize::*};
use tracing::*;

use std::io;
use tokio::time;
use tokio::{task::JoinError, time};

use futures::stream;
use futures::SinkExt;
use futures::StreamExt;

#[derive(Clone)]
pub struct NetworkClient {
Expand Down Expand Up @@ -76,66 +79,41 @@ impl NetworkClient {
async fn batch_send_one_chunk(
&self,
requests: Vec<Bytes>,
max_in_flight: u64,
) -> Result<Vec<Bytes>, io::Error> {
_max_in_flight: u64,
) -> Vec<Result<BytesMut, std::io::Error>> {
let address = format!("{}:{}", self.base_address, self.base_port);
let mut stream = connect(address, self.buffer_size).await?;
let mut requests = requests.iter();
let mut in_flight: u64 = 0;
let mut responses = Vec::new();

loop {
while in_flight < max_in_flight {
let request = match requests.next() {
None => {
if in_flight == 0 {
return Ok(responses);
}
// No more entries to send.
break;
}
Some(request) => request,
};
let status = time::timeout(self.send_timeout, stream.write_data(request)).await;
if let Err(error) = status {
error!("Failed to send request: {}", error);
continue;
}
in_flight += 1;
}
if requests.len() % 5000 == 0 && requests.len() > 0 {
info!("In flight {} Remaining {}", in_flight, requests.len());
}
match time::timeout(self.recv_timeout, stream.read_data()).await {
Ok(Some(Ok(buffer))) => {
in_flight -= 1;
responses.push(Bytes::from(buffer));
}
Ok(Some(Err(error))) => {
error!("Received error response: {}", error);
let stream = connect(address, self.buffer_size)
.await
.expect("Must be able to connect.");
let total = requests.len();

let (read_stream, mut write_stream) = (stream.framed_read, stream.framed_write);

let mut requests = stream::iter(requests.into_iter().map(Ok));
tokio::spawn(async move { write_stream.send_all(&mut requests).await });

let mut received = 0;
let responses: Vec<Result<BytesMut, std::io::Error>> = read_stream
.take_while(|_buf| {
received += 1;
if received % 5000 == 0 && received > 0 {
debug!("Received {}", received);
}
Ok(None) => {
info!("Socket closed by server");
return Ok(responses);
}
Err(error) => {
error!(
"Timeout while receiving response: {} (in flight: {})",
error, in_flight
);
}
}
}
let xcontinue = received < total;
futures::future::ready(xcontinue)
})
.collect()
.await;

responses
}

pub fn batch_send<I>(
pub fn batch_send(
&self,
requests: I,
requests: Vec<Bytes>,
connections: usize,
max_in_flight: u64,
) -> impl futures::stream::Stream<Item = Vec<Bytes>>
where
I: IntoIterator<Item = Bytes>,
) -> impl futures::stream::Stream<Item = Result<Vec<Result<BytesMut, std::io::Error>>, JoinError>>
{
let handles = futures::stream::FuturesUnordered::new();

Expand All @@ -150,17 +128,14 @@ impl NetworkClient {
"Sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
let responses = client
.batch_send_one_chunk(requests, max_in_flight)
.await
.unwrap_or_else(|_| Vec::new());
let responses = client.batch_send_one_chunk(requests, max_in_flight).await;
// .unwrap_or_else(|_| Vec::new());
info!(
"Done sending TCP requests to {}:{}",
client.base_address, client.base_port,
);
responses
})
.then(|x| async { x.unwrap_or_else(|_| Vec::new()) }),
}), // .then(|x| async { x.unwrap_or_else(|_| Vec::new()) }),
);
}

Expand Down
42 changes: 29 additions & 13 deletions network_utils/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use async_trait::async_trait;
use tracing::*;

use bytes::{Bytes, BytesMut};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

#[cfg(test)]
#[path = "unit_tests/transport_tests.rs"]
mod transport_tests;

/// Suggested buffer size
pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 65507;
pub const DEFAULT_MAX_DATAGRAM_SIZE_STR: &str = "65507";
pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 650000;
pub const DEFAULT_MAX_DATAGRAM_SIZE_STR: &str = "650000";

/// The handler required to create a service.
#[async_trait]
Expand Down Expand Up @@ -104,9 +104,13 @@ where
Ok(SpawnedServer { complete, handle })
}

use tokio::net::tcp::OwnedReadHalf;
use tokio::net::tcp::OwnedWriteHalf;

/// An implementation of DataStream based on TCP.
pub struct TcpDataStream {
framed: Framed<TcpStream, LengthDelimitedCodec>,
pub framed_read: FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
pub framed_write: FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
}

impl TcpDataStream {
Expand All @@ -123,37 +127,49 @@ impl TcpDataStream {
}

fn from_tcp_stream(stream: TcpStream, max_data_size: usize) -> TcpDataStream {
let framed = Framed::new(
stream,
let (read_half, write_half) = stream.into_split();

let framed_read = FramedRead::new(
read_half,
LengthDelimitedCodec::builder()
.max_frame_length(max_data_size)
.new_codec(),
);

let framed_write = FramedWrite::new(
write_half,
LengthDelimitedCodec::builder()
.max_frame_length(max_data_size)
.new_codec(),
);

Self { framed }
Self {
framed_read,
framed_write,
}
}

// TODO: Eliminate vecs and use Byte, ByteBuf

pub async fn write_data<'a>(&'a mut self, buffer: &'a [u8]) -> Result<(), std::io::Error> {
self.framed.send(buffer.to_vec().into()).await
self.framed_write.send(buffer.to_vec().into()).await
}

pub async fn read_data(&mut self) -> Option<Result<Vec<u8>, std::io::Error>> {
let result = self.framed.next().await;
let result = self.framed_read.next().await;
result.map(|v| v.map(|w| w.to_vec()))
}
}

impl<'a> RwChannel<'a> for TcpDataStream {
type W = Framed<TcpStream, LengthDelimitedCodec>;
type R = Framed<TcpStream, LengthDelimitedCodec>;
type W = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;
type R = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;

fn sink(&mut self) -> &mut Self::W {
&mut self.framed
&mut self.framed_write
}
fn stream(&mut self) -> &mut Self::R {
&mut self.framed
&mut self.framed_read
}
}

Expand Down
23 changes: 13 additions & 10 deletions sui/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct ClientServerBenchmark {
#[structopt(long, default_value = "10")]
committee_size: usize,
/// Maximum number of requests in flight (0 for blocking client)
#[structopt(long, default_value = "1000")]
#[structopt(long, default_value = "0")]
max_in_flight: usize,
/// Number of accounts and transactions used in the benchmark
#[structopt(long, default_value = "40000")]
Expand All @@ -59,7 +59,7 @@ struct ClientServerBenchmark {
#[structopt(long, default_value = "4000000")]
send_timeout_us: u64,
/// Timeout for receiving responses (us)
#[structopt(long, default_value = "4000000")]
#[structopt(long, default_value = "40000000")]
recv_timeout_us: u64,
/// Maximum size of datagrams received and sent (bytes)
#[structopt(long, default_value = transport::DEFAULT_MAX_DATAGRAM_SIZE_STR)]
Expand Down Expand Up @@ -117,7 +117,7 @@ fn main() {
thread::spawn(move || {
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(15 * 1024 * 1024)
.thread_stack_size(32 * 1024 * 1024)
.build()
.unwrap();

Expand All @@ -132,7 +132,8 @@ fn main() {
// Make a multi-threaded runtime for the client.
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(15 * 1024 * 1024)
.thread_stack_size(32 * 1024 * 1024)
.worker_threads(usize::min(num_cpus::get(), 24))
.build()
.unwrap();
runtime.block_on(benchmark.launch_client(connections, transactions));
Expand Down Expand Up @@ -183,7 +184,7 @@ impl ClientServerBenchmark {
// Seed user accounts.
let rt = Runtime::new().unwrap();

println!("Init Authority.");
info!("Init Authority.");
let state = rt.block_on(async {
AuthorityState::new(
committee.clone(),
Expand All @@ -196,7 +197,7 @@ impl ClientServerBenchmark {
.await
});

println!("Generate empty store with Genesis.");
info!("Generate empty store with Genesis.");
let (address, keypair) = get_key_pair();

let account_gas_objects: Vec<_> = (0u64..(self.num_accounts as u64))
Expand Down Expand Up @@ -327,9 +328,10 @@ impl ClientServerBenchmark {
let items_number = transactions.len() / transaction_len_factor;
let mut elapsed_time: u128 = 0;

let max_in_flight = self.max_in_flight / connections as usize;
if self.max_in_flight != 0 {
warn!("Option max-in-flight is now ignored.")
}
info!("Number of TCP connections: {}", connections);
info!("Max_in_flight: {}", max_in_flight);

info!("Sending requests.");
if self.max_in_flight > 0 {
Expand All @@ -343,15 +345,16 @@ impl ClientServerBenchmark {

let time_start = Instant::now();
let responses = mass_client
.batch_send(transactions, connections, max_in_flight as u64)
.batch_send(transactions, connections, 0)
.map(|x| x.unwrap())
.concat()
.await;
elapsed_time = time_start.elapsed().as_micros();

info!("Received {} responses.", responses.len(),);
// Check the responses for errors
for resp in &responses {
let reply_message = deserialize_message(&resp[..]);
let reply_message = deserialize_message(&(resp.as_ref().unwrap())[..]);
match reply_message {
Ok(SerializedMessage::TransactionResp(res)) => {
if let Some(e) = res.signed_effects {
Expand Down
4 changes: 2 additions & 2 deletions sui_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ move-core-types = { git = "https://github.com/diem/move", rev = "2ef516919d5bbc7
move-package = { git = "https://github.com/diem/move", rev = "2ef516919d5bbc728a57a2c0073b85c46d9fcf5a" }
move-vm-runtime = { git = "https://github.com/diem/move", rev = "2ef516919d5bbc728a57a2c0073b85c46d9fcf5a" }

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "97a056f85555fa2afe497d6abb7cf6bf8faa63cf"}
typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "e44bca4513a6ff6c97399cd79e82e4bc00571ac3"}

[dev-dependencies]
fdlimit = "0.2.1"
Expand All @@ -49,4 +49,4 @@ assert-str = "0.1.0"
[[example]]
name = "generate-format"
path = "src/generate_format.rs"
test = false
test = false
5 changes: 3 additions & 2 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ impl AuthorityState {
confirmation_transaction: ConfirmationTransaction,
) -> SuiResult<TransactionInfoResponse> {
let transaction_digest = *confirmation_transaction.certificate.digest();
let certificate = &confirmation_transaction.certificate;

// Ensure an idempotent answer.
if self._database.signed_effects_exists(&transaction_digest)? {
Expand All @@ -351,7 +350,9 @@ impl AuthorityState {
}

// Check the certificate and retrieve the transfer data.
certificate.check(&self.committee)?;
confirmation_transaction
.certificate
.check(&self.committee)?;

self.process_certificate(confirmation_transaction).await
}
Expand Down
Loading

0 comments on commit b3e78ce

Please sign in to comment.