Skip to content

Commit

Permalink
Merge pull request AleoNet#1884 from ljedrz/post_dev_tweaks
Browse files Browse the repository at this point in the history
Further tweaks
  • Loading branch information
raychu86 authored Aug 26, 2022
2 parents 796d9f4 + 4d732e6 commit f8c0d95
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 107 deletions.
18 changes: 11 additions & 7 deletions snarkos/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mod server;
pub use server::*;

use crate::{handle_dispatch_error, BlockDB, ProgramDB};
use crate::{handle_dispatch_error, BlockDB, Data, ProgramDB};
use snarkvm::prelude::*;

use colored::Colorize;
Expand Down Expand Up @@ -149,24 +149,28 @@ impl<N: Network> Ledger<N> {
.await??;

// Add the next block to the ledger.
self.add_next_block(&next_block).await?;
self.add_next_block(next_block.clone()).await?;

// Serialize the block ahead of time to not do it for each peer.
let serialized_block = Data::Object(next_block.clone()).serialize().await?;

// Broadcast the block to all peers.
let peers = self.peers().read().clone();
for (_, sender) in peers.iter() {
let _ = sender.send(crate::Message::<N>::BlockBroadcast(next_block.clone())).await;
let _ = sender
.send(crate::Message::<N>::BlockBroadcast(Data::Buffer(serialized_block.clone())))
.await;
}

// Return the next block.
Ok(next_block)
}

/// Attempts to add the given block to the ledger.
pub(crate) async fn add_next_block(self: &Arc<Self>, next_block: &Block<N>) -> Result<()> {
pub(crate) async fn add_next_block(self: &Arc<Self>, next_block: Block<N>) -> Result<()> {
// Add the next block to the ledger.
let self_clone = self.clone();
let next_block_clone = next_block.clone();
if let Err(error) = task::spawn_blocking(move || self_clone.ledger.write().add_next_block(&next_block_clone)).await? {
if let Err(error) = task::spawn_blocking(move || self_clone.ledger.write().add_next_block(&next_block)).await? {
// Log the error.
warn!("{error}");
return Err(error);
Expand Down Expand Up @@ -256,7 +260,7 @@ impl<N: Network> Ledger<N> {
// Internal operations.
impl<N: Network> Ledger<N> {
/// Syncs the ledger with the network.
pub(crate) async fn initial_sync_with_network(self: &Arc<Self>, leader_ip: &IpAddr) -> Result<()> {
pub(crate) async fn initial_sync_with_network(self: &Arc<Self>, leader_ip: IpAddr) -> Result<()> {
/// The number of concurrent requests with the network.
const CONCURRENT_REQUESTS: usize = 100;
/// Url to fetch the blocks from.
Expand Down
125 changes: 62 additions & 63 deletions snarkos/network/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,58 @@

use snarkvm::prelude::*;

use ::bytes::{Buf, BufMut, BytesMut};
use ::bytes::{Buf, BufMut, Bytes, BytesMut};
use std::marker::PhantomData;
use tokio::task;
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};

/// This object enables deferred deserialization / ahead-of-time serialization for objects that
/// take a while to deserialize / serialize, in order to allow these operations to be non-blocking.
#[derive(Clone, Debug)]
pub enum Data<T: FromBytes + ToBytes + Send + 'static> {
Object(T),
Buffer(Bytes),
}

impl<T: FromBytes + ToBytes + Send + 'static> Data<T> {
pub fn deserialize_blocking(self) -> Result<T> {
match self {
Self::Object(x) => Ok(x),
Self::Buffer(bytes) => T::from_bytes_le(&bytes),
}
}

pub async fn deserialize(self) -> Result<T> {
match self {
Self::Object(x) => Ok(x),
Self::Buffer(bytes) => match task::spawn_blocking(move || T::from_bytes_le(&bytes)).await {
Ok(x) => x,
Err(err) => Err(err.into()),
},
}
}

pub fn serialize_blocking_into<W: Write>(&self, writer: &mut W) -> Result<()> {
match self {
Self::Object(x) => {
let bytes = x.to_bytes_le()?;
Ok(writer.write_all(&bytes)?)
}
Self::Buffer(bytes) => Ok(writer.write_all(bytes)?),
}
}

pub async fn serialize(self) -> Result<Bytes> {
match self {
Self::Object(x) => match task::spawn_blocking(move || x.to_bytes_le()).await {
Ok(bytes) => bytes.map(|vec| vec.into()),
Err(err) => Err(err.into()),
},
Self::Buffer(bytes) => Ok(bytes),
}
}
}

#[derive(Clone)]
pub enum Message<N: Network> {
/// Ping with the current block height.
Expand All @@ -29,11 +77,11 @@ pub enum Message<N: Network> {
/// Request a block for a given height.
BlockRequest(u32),
/// A response to a `BlockRequest`.
BlockResponse(Block<N>),
BlockResponse(Data<Block<N>>),
/// A message containing a transaction to be broadcast.
TransactionBroadcast(Transaction<N>),
TransactionBroadcast(Data<Transaction<N>>),
/// A message containing a new block to be broadcast.
BlockBroadcast(Block<N>),
BlockBroadcast(Data<Block<N>>),
}

impl<N: Network> Message<N> {
Expand Down Expand Up @@ -72,95 +120,46 @@ impl<N: Network> Message<N> {
Self::Ping => Ok(()),
Self::Pong(block_height) => Ok(writer.write_all(&block_height.to_le_bytes())?),
Self::BlockRequest(block_height) => Ok(writer.write_all(&block_height.to_le_bytes())?),
Self::BlockResponse(block) => Ok(writer.write_all(&block.to_bytes_le()?)?),
Self::TransactionBroadcast(transaction) => Ok(writer.write_all(&transaction.to_bytes_le()?)?),
Self::BlockBroadcast(block) => Ok(writer.write_all(&block.to_bytes_le()?)?),
Self::BlockResponse(block) | Self::BlockBroadcast(block) => block.serialize_blocking_into(writer),
Self::TransactionBroadcast(transaction) => transaction.serialize_blocking_into(writer),
}
}

/// Deserialize the given buffer into a message.
fn deserialize(mut bytes: BytesMut) -> Result<Self> {
if bytes.remaining() < 1 {
if bytes.remaining() < 2 {
bail!("Missing message ID");
}

// Read the message ID.
let id: u16 = bytes.get_u16_le();

// Deserialize the data field.

match id {
let message = match id {
0 => {
if bytes.remaining() != 0 {
bail!("Unexpected data for Ping");
}
Ok(Message::<N>::Ping)
Message::<N>::Ping
}
1 => {
let mut reader = bytes.reader();
let message = Message::<N>::Pong(bincode::deserialize_from(&mut reader)?);
Ok(message)
Message::<N>::Pong(bincode::deserialize_from(&mut reader)?)
}
2 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockRequest(bincode::deserialize_from(&mut reader)?);
Ok(message)
}
3 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockResponse(Block::read_le(&mut reader)?);
Ok(message)
}
4 => {
let mut reader = bytes.reader();
let message = Message::<N>::TransactionBroadcast(Transaction::read_le(&mut reader)?);
Ok(message)
}
5 => {
let mut reader = bytes.reader();
let message = Message::<N>::BlockBroadcast(Block::read_le(&mut reader)?);
Ok(message)
Message::<N>::BlockRequest(bincode::deserialize_from(&mut reader)?)
}
3 => Message::<N>::BlockResponse(Data::Buffer(bytes.freeze())),
4 => Message::<N>::TransactionBroadcast(Data::Buffer(bytes.freeze())),
5 => Message::<N>::BlockBroadcast(Data::Buffer(bytes.freeze())),
_ => bail!("Unknown message ID"),
}
}
}

impl<N: Network> FromBytes for Message<N> {
/// Reads the message from a buffer.
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
let id = u16::read_le(&mut reader)?;

let message = match id {
0 => Self::Ping,
1 => Self::Pong(u32::read_le(&mut reader)?),
2 => Self::BlockRequest(u32::read_le(&mut reader)?),
3 => Self::BlockResponse(Block::read_le(&mut reader)?),
4 => Self::TransactionBroadcast(Transaction::read_le(&mut reader)?),
5 => Self::BlockBroadcast(Block::read_le(&mut reader)?),
6.. => return Err(error(format!("Failed to decode message id {id}"))),
};

Ok(message)
}
}

impl<N: Network> ToBytes for Message<N> {
/// Writes the message to a buffer.
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
self.id().write_le(&mut writer)?;

match self {
Message::Ping => Ok(()),
Message::Pong(height) => height.write_le(&mut writer),
Message::BlockRequest(height) => height.write_le(&mut writer),
Message::BlockResponse(block) => block.write_le(&mut writer),
Message::TransactionBroadcast(transaction) => transaction.write_le(&mut writer),
Message::BlockBroadcast(block) => block.write_le(&mut writer),
}
}
}

/// The maximum size of a message that can be transmitted in the network.
const MAXIMUM_MESSAGE_SIZE: usize = 128 * 1024 * 1024; // 128 MiB

Expand Down
Loading

0 comments on commit f8c0d95

Please sign in to comment.