Skip to content

Commit

Permalink
feat: use To/FromBytes instead of bincode to serialize the Message
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Oct 3, 2023
1 parent 0c01473 commit 1dfa8d0
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 43 deletions.
3 changes: 0 additions & 3 deletions node/router/messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ test = [ ]
[dependencies.anyhow]
version = "1.0"

[dependencies.bincode]
version = "1.0"

[dependencies.bytes]
version = "1"

Expand Down
11 changes: 6 additions & 5 deletions node/router/messages/src/block_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,18 @@ impl MessageTrait for BlockRequest {
/// Serializes the message into the buffer.
#[inline]
fn serialize<W: Write>(&self, writer: &mut W) -> Result<()> {
Ok(bincode::serialize_into(writer, &(self.start_height, self.end_height))?)
self.start_height.write_le(&mut *writer)?;
self.end_height.write_le(writer)?;
Ok(())
}

/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
let mut reader = bytes.reader();
Ok(Self {
start_height: bincode::deserialize_from(&mut reader)?,
end_height: bincode::deserialize_from(&mut reader)?,
})
let start_height = u32::read_le(&mut reader)?;
let end_height = u32::read_le(&mut reader)?;
Ok(Self { start_height, end_height })
}
}

Expand Down
8 changes: 2 additions & 6 deletions node/router/messages/src/block_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ impl<N: Network> MessageTrait for BlockResponse<N> {
/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
let mut reader = bytes.reader();
let request = BlockRequest {
start_height: bincode::deserialize_from(&mut reader)?,
end_height: bincode::deserialize_from(&mut reader)?,
};
let blocks = Data::Buffer(reader.into_inner().freeze());
let request = BlockRequest::deserialize(bytes.clone())?;
let blocks = Data::Buffer(bytes.freeze());
Ok(Self { request, blocks })
}
}
21 changes: 13 additions & 8 deletions node/router/messages/src/challenge_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use super::*;

use bincode::Options;
use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -36,18 +35,24 @@ impl<N: Network> MessageTrait for ChallengeRequest<N> {
/// Serializes the message into the buffer.
#[inline]
fn serialize<W: Write>(&self, writer: &mut W) -> Result<()> {
Ok(bincode::serialize_into(
writer,
&(self.version, self.listener_port, self.node_type, self.address, self.nonce),
)?)
self.version.write_le(&mut *writer)?;
self.listener_port.write_le(&mut *writer)?;
self.node_type.write_le(&mut *writer)?;
self.address.write_le(&mut *writer)?;
self.nonce.write_le(&mut *writer)?;
Ok(())
}

/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
let options =
bincode::options().with_limit(MAXIMUM_MESSAGE_SIZE as u64).with_fixint_encoding().allow_trailing_bytes();
let (version, listener_port, node_type, address, nonce) = options.deserialize_from(&mut bytes.reader())?;
let mut reader = bytes.reader();
let version = u32::read_le(&mut reader)?;
let listener_port = u16::read_le(&mut reader)?;
let node_type = NodeType::read_le(&mut reader)?;
let address = Address::<N>::read_le(&mut reader)?;
let nonce = u64::read_le(&mut reader)?;

Ok(Self { version, listener_port, node_type, address, nonce })
}
}
Expand Down
11 changes: 3 additions & 8 deletions node/router/messages/src/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,13 @@ impl MessageTrait for Disconnect {
/// Serializes the message into the buffer.
#[inline]
fn serialize<W: Write>(&self, writer: &mut W) -> Result<()> {
Ok(bincode::serialize_into(writer, &self.reason)?)
Ok(self.reason.write_le(writer)?)
}

/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
if bytes.remaining() == 0 {
Ok(Self { reason: DisconnectReason::NoReasonGiven })
} else if let Ok(reason) = bincode::deserialize_from(&mut bytes.reader()) {
Ok(Self { reason })
} else {
bail!("Invalid 'Disconnect' message");
}
let mut reader = bytes.reader();
Ok(Disconnect { reason: DisconnectReason::read_le(&mut reader)? })
}
}
60 changes: 58 additions & 2 deletions node/router/messages/src/helpers/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};
use snarkvm::prelude::{error, FromBytes, ToBytes};

use std::io;

/// The reason behind the node disconnecting from a peer.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DisconnectReason {
/// The fork length limit was exceeded.
ExceededForkRange,
Expand Down Expand Up @@ -48,3 +50,57 @@ pub enum DisconnectReason {
/// The peer's listening port is closed.
YourPortIsClosed(u16),
}

impl ToBytes for DisconnectReason {
fn write_le<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
match self {
Self::ExceededForkRange => 0u8.write_le(writer),
Self::InvalidChallengeResponse => 1u8.write_le(writer),
Self::InvalidForkDepth => 2u8.write_le(writer),
Self::INeedToSyncFirst => 3u8.write_le(writer),
Self::NoReasonGiven => 4u8.write_le(writer),
Self::ProtocolViolation => 5u8.write_le(writer),
Self::OutdatedClientVersion => 6u8.write_le(writer),
Self::PeerHasDisconnected => 7u8.write_le(writer),
Self::PeerRefresh => 8u8.write_le(writer),
Self::ShuttingDown => 9u8.write_le(writer),
Self::SyncComplete => 10u8.write_le(writer),
Self::TooManyFailures => 11u8.write_le(writer),
Self::TooManyPeers => 12u8.write_le(writer),
Self::YouNeedToSyncFirst => 13u8.write_le(writer),
Self::YourPortIsClosed(port) => {
14u8.write_le(&mut writer)?;
port.write_le(writer)
}
}
}
}

impl FromBytes for DisconnectReason {
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self>
where
Self: Sized,
{
match u8::read_le(&mut reader)? {
0 => Ok(Self::ExceededForkRange),
1 => Ok(Self::InvalidChallengeResponse),
2 => Ok(Self::InvalidForkDepth),
3 => Ok(Self::INeedToSyncFirst),
4 => Ok(Self::NoReasonGiven),
5 => Ok(Self::ProtocolViolation),
6 => Ok(Self::OutdatedClientVersion),
7 => Ok(Self::PeerHasDisconnected),
8 => Ok(Self::PeerRefresh),
9 => Ok(Self::ShuttingDown),
10 => Ok(Self::SyncComplete),
11 => Ok(Self::TooManyFailures),
12 => Ok(Self::TooManyPeers),
13 => Ok(Self::YouNeedToSyncFirst),
14 => {
let port = u16::read_le(reader)?;
Ok(Self::YourPortIsClosed(port))
}
_ => Err(error("Invalid disconnect reason")),
}
}
}
1 change: 0 additions & 1 deletion node/router/messages/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

mod codec;
pub use codec::MessageCodec;
pub(crate) use codec::MAXIMUM_MESSAGE_SIZE;

mod data;
pub use data::Data;
Expand Down
23 changes: 23 additions & 0 deletions node/router/messages/src/helpers/node_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use snarkvm::prelude::{error, FromBytes, ToBytes};

use serde::{Deserialize, Serialize};
use std::io;

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
#[repr(u8)]
Expand Down Expand Up @@ -60,3 +63,23 @@ impl core::fmt::Display for NodeType {
})
}
}

impl ToBytes for NodeType {
fn write_le<W: io::Write>(&self, writer: W) -> io::Result<()> {
(*self as u8).write_le(writer)
}
}

impl FromBytes for NodeType {
fn read_le<R: io::Read>(reader: R) -> io::Result<Self>
where
Self: Sized,
{
match u8::read_le(reader)? {
0 => Ok(Self::Client),
1 => Ok(Self::Prover),
2 => Ok(Self::Validator),
_ => Err(error("Invalid node type")),
}
}
}
21 changes: 16 additions & 5 deletions node/router/messages/src/peer_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

use super::*;

use bincode::Options;
use snarkvm::prelude::{FromBytes, ToBytes};

use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -32,14 +33,24 @@ impl MessageTrait for PeerResponse {
/// Serializes the message into the buffer.
#[inline]
fn serialize<W: Write>(&self, writer: &mut W) -> Result<()> {
Ok(bincode::serialize_into(writer, &self.peers)?)
(self.peers.len().min(u8::MAX as usize) as u8).write_le(&mut *writer)?;
for peer in &self.peers {
peer.write_le(&mut *writer)?;
}

Ok(())
}

/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
let options =
bincode::options().with_limit(MAXIMUM_MESSAGE_SIZE as u64).with_fixint_encoding().allow_trailing_bytes();
Ok(Self { peers: options.deserialize_from(&mut bytes.reader())? })
let mut reader = bytes.reader();
let count = u8::read_le(&mut reader)?;
let mut peers = Vec::with_capacity(count as usize);
for _ in 0..count {
peers.push(SocketAddr::read_le(&mut reader)?);
}

Ok(Self { peers })
}
}
53 changes: 48 additions & 5 deletions node/router/messages/src/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use super::*;

use bincode::Options;
use indexmap::IndexMap;
use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -34,16 +34,59 @@ impl<N: Network> MessageTrait for Ping<N> {
/// Serializes the message into the buffer.
#[inline]
fn serialize<W: Write>(&self, writer: &mut W) -> Result<()> {
Ok(bincode::serialize_into(&mut *writer, &(self.version, self.node_type, &self.block_locators))?)
self.version.write_le(&mut *writer)?;
self.node_type.write_le(&mut *writer)?;
if let Some(locators) = &self.block_locators {
1u8.write_le(&mut *writer)?;

(locators.recents.len().min(u32::MAX as usize) as u32).write_le(&mut *writer)?;
for (height, hash) in locators.recents.iter() {
height.write_le(&mut *writer)?;
hash.write_le(&mut *writer)?;
}

(locators.checkpoints.len().min(u32::MAX as usize) as u32).write_le(&mut *writer)?;
for (height, hash) in locators.checkpoints.iter() {
height.write_le(&mut *writer)?;
hash.write_le(&mut *writer)?;
}
} else {
0u8.write_le(&mut *writer)?;
}

Ok(())
}

/// Deserializes the given buffer into a message.
#[inline]
fn deserialize(bytes: BytesMut) -> Result<Self> {
let options =
bincode::options().with_limit(MAXIMUM_MESSAGE_SIZE as u64).with_fixint_encoding().allow_trailing_bytes();
let mut reader = bytes.reader();
let (version, node_type, block_locators) = options.deserialize_from(&mut reader)?;

let version = u32::read_le(&mut reader)?;
let node_type = NodeType::read_le(&mut reader)?;

if u8::read_le(&mut reader)? == 0 {
return Ok(Self { version, node_type, block_locators: None });
}

let mut recents = IndexMap::new();
let num_recents = u32::read_le(&mut reader)?;
for _ in 0..num_recents {
let height = u32::read_le(&mut reader)?;
let hash = N::BlockHash::read_le(&mut reader)?;
recents.insert(height, hash);
}

let mut checkpoints = IndexMap::new();
let num_checkpoints = u32::read_le(&mut reader)?;
for _ in 0..num_checkpoints {
let height = u32::read_le(&mut reader)?;
let hash = N::BlockHash::read_le(&mut reader)?;
checkpoints.insert(height, hash);
}

let block_locators = Some(BlockLocators { recents, checkpoints });

Ok(Self { version, node_type, block_locators })
}
}
Expand Down

0 comments on commit 1dfa8d0

Please sign in to comment.