Skip to content

Commit

Permalink
perf: introduce deferred deserialization
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Aug 26, 2022
1 parent 9830008 commit e8a3b47
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 38 deletions.
6 changes: 4 additions & 2 deletions snarkos/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mod server;
pub use server::*;

use crate::{handle_dispatch_error, BlockDB, ProgramDB};
use crate::{handle_dispatch_error, BlockDB, Data, ProgramDB};
use snarkvm::prelude::*;

use colored::Colorize;
Expand Down Expand Up @@ -154,7 +154,9 @@ impl<N: Network> Ledger<N> {
// Broadcast the block to all peers.
let peers = self.peers().read().clone();
for (_, sender) in peers.iter() {
let _ = sender.send(crate::Message::<N>::BlockBroadcast(next_block.clone())).await;
let _ = sender
.send(crate::Message::<N>::BlockBroadcast(Data::Object(next_block.clone())))
.await;
}

// Return the next block.
Expand Down
95 changes: 65 additions & 30 deletions snarkos/network/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,58 @@

use snarkvm::prelude::*;

use ::bytes::{Buf, BufMut, BytesMut};
use ::bytes::{Buf, BufMut, Bytes, BytesMut};
use std::marker::PhantomData;
use tokio::task;
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};

/// This object enables deferred deserialization / ahead-of-time serialization for objects that
/// take a while to deserialize / serialize, in order to allow these operations to be non-blocking.
#[derive(Clone, Debug)]
pub enum Data<T: FromBytes + ToBytes + Send + 'static> {
Object(T),
Buffer(Bytes),
}

impl<T: FromBytes + ToBytes + Send + 'static> Data<T> {
pub fn deserialize_blocking(self) -> Result<T> {
match self {
Self::Object(x) => Ok(x),
Self::Buffer(bytes) => T::from_bytes_le(&bytes),
}
}

pub async fn deserialize(self) -> Result<T> {
match self {
Self::Object(x) => Ok(x),
Self::Buffer(bytes) => match task::spawn_blocking(move || T::from_bytes_le(&bytes)).await {
Ok(x) => x,
Err(err) => Err(err.into()),
},
}
}

pub fn serialize_blocking_into<W: Write>(&self, writer: &mut W) -> Result<()> {
match self {
Self::Object(x) => {
let bytes = x.to_bytes_le()?;
Ok(writer.write_all(&bytes)?)
}
Self::Buffer(bytes) => Ok(writer.write_all(bytes)?),
}
}

pub async fn serialize(self) -> Result<Bytes> {
match self {
Self::Object(x) => match task::spawn_blocking(move || x.to_bytes_le()).await {
Ok(bytes) => bytes.map(|vec| vec.into()),
Err(err) => Err(err.into()),
},
Self::Buffer(bytes) => Ok(bytes),
}
}
}

#[derive(Clone)]
pub enum Message<N: Network> {
/// Ping with the current block height.
Expand All @@ -29,11 +77,11 @@ pub enum Message<N: Network> {
/// Request a block for a given height.
BlockRequest(u32),
/// A response to a `BlockRequest`.
BlockResponse(Block<N>),
BlockResponse(Data<Block<N>>),
/// A message containing a transaction to be broadcast.
TransactionBroadcast(Transaction<N>),
TransactionBroadcast(Data<Transaction<N>>),
/// A message containing a new block to be broadcast.
BlockBroadcast(Block<N>),
BlockBroadcast(Data<Block<N>>),
}

impl<N: Network> Message<N> {
Expand Down Expand Up @@ -72,56 +120,43 @@ impl<N: Network> Message<N> {
Self::Ping => Ok(()),
Self::Pong(block_height) => Ok(writer.write_all(&block_height.to_le_bytes())?),
Self::BlockRequest(block_height) => Ok(writer.write_all(&block_height.to_le_bytes())?),
Self::BlockResponse(block) | Self::BlockBroadcast(block) => Ok(writer.write_all(&block.to_bytes_le()?)?),
Self::TransactionBroadcast(transaction) => Ok(writer.write_all(&transaction.to_bytes_le()?)?),
Self::BlockResponse(block) | Self::BlockBroadcast(block) => block.serialize_blocking_into(writer),
Self::TransactionBroadcast(transaction) => transaction.serialize_blocking_into(writer),
}
}

/// Deserialize the given buffer into a message.
fn deserialize(mut bytes: BytesMut) -> Result<Self> {
if bytes.remaining() < 1 {
if bytes.remaining() < 2 {
bail!("Missing message ID");
}

// Read the message ID.
let id: u16 = bytes.get_u16_le();

// Deserialize the data field.

match id {
let message = match id {
0 => {
if bytes.remaining() != 0 {
bail!("Unexpected data for Ping");
}
Ok(Message::<N>::Ping)
Message::<N>::Ping
}
1 => {
let mut reader = bytes.reader();
let message = Message::<N>::Pong(bincode::deserialize_from(&mut reader)?);
Ok(message)
Message::<N>::Pong(bincode::deserialize_from(&mut reader)?)
}
2 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockRequest(bincode::deserialize_from(&mut reader)?);
Ok(message)
}
3 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockResponse(Block::read_le(&mut reader)?);
Ok(message)
}
4 => {
let mut reader = bytes.reader();
let message = Message::<N>::TransactionBroadcast(Transaction::read_le(&mut reader)?);
Ok(message)
}
5 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockBroadcast(Block::read_le(&mut reader)?);
Ok(message)
Message::<N>::BlockRequest(bincode::deserialize_from(&mut reader)?)
}
3 => Message::<N>::BlockResponse(Data::Buffer(bytes.freeze())),
4 => Message::<N>::TransactionBroadcast(Data::Buffer(bytes.freeze())),
5 => Message::<N>::BlockBroadcast(Data::Buffer(bytes.freeze())),
_ => bail!("Unknown message ID"),
}
};

Ok(message)
}
}

Expand Down
21 changes: 15 additions & 6 deletions snarkos/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,15 @@ pub(crate) async fn handle_peer<N: Network>(
trace!("Peer requested block {height}, which is greater than the current height {latest_height}");
} else {
let block = ledger.ledger().read().get_block(height)?;
let response = Message::BlockResponse(block);
let response = Message::BlockResponse(Data::Object(block));

peer.outbound.send(response).await?;
}
},
Message::BlockResponse(block) => {
Message::BlockResponse(block_bytes) => {
// Perform deferred deserialization.
let block = block_bytes.deserialize().await?;

// Check if the block can be added to the ledger.
if block.height() == ledger.ledger().read().latest_height() + 1 {
// Attempt to add the block to the ledger.
Expand All @@ -134,7 +137,10 @@ pub(crate) async fn handle_peer<N: Network>(
trace!("Skipping block {} (height: {})", block.hash(), block.height());
}
},
Message::TransactionBroadcast(transaction) => {
Message::TransactionBroadcast(transaction_bytes) => {
// Perform deferred deserialization.
let transaction = transaction_bytes.deserialize().await?;

let transaction_id = transaction.id();

// Check that the transaction doesn't already exist in the ledger or mempool.
Expand All @@ -146,7 +152,7 @@ pub(crate) async fn handle_peer<N: Network>(
let peers = ledger.peers().read().clone();
tokio::spawn(async move {
for (_, sender) in peers.iter().filter(|(ip, _)| *ip != &peer.ip) {
let _ = sender.send(Message::<N>::TransactionBroadcast(transaction.clone())).await;
let _ = sender.send(Message::<N>::TransactionBroadcast(Data::Object(transaction.clone()))).await;
}
});

Expand All @@ -161,7 +167,10 @@ pub(crate) async fn handle_peer<N: Network>(
}
}
},
Message::BlockBroadcast(block) => {
Message::BlockBroadcast(block_bytes) => {
// Perform deferred deserialization.
let block = block_bytes.deserialize().await?;

// Check if the block can be added to the ledger.
if block.height() == ledger.ledger().read().latest_height() + 1 {
// Attempt to add the block to the ledger.
Expand All @@ -173,7 +182,7 @@ pub(crate) async fn handle_peer<N: Network>(
let peers = ledger.peers().read().clone();
tokio::spawn(async move {
for (_, sender) in peers.iter().filter(|(ip, _)| *ip != &peer.ip) {
let _ = sender.send(Message::<N>::BlockBroadcast(block.clone())).await;
let _ = sender.send(Message::<N>::BlockBroadcast(Data::Object(block.clone()))).await;
}
});
},
Expand Down

0 comments on commit e8a3b47

Please sign in to comment.