Skip to content

Commit

Permalink
remove unnecessary allocs and unsafe
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <[email protected]>
andrewwhitehead committed Jul 14, 2022
1 parent 9e0eb7c commit 502cec9
Showing 6 changed files with 87 additions and 55 deletions.
39 changes: 24 additions & 15 deletions src/codec/command.rs
Original file line number Diff line number Diff line change
@@ -4,21 +4,28 @@ use crate::SocketType;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::Display;

#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Copy, Clone)]
pub enum ZmqCommandName {
READY,
}

impl From<ZmqCommandName> for String {
fn from(c_name: ZmqCommandName) -> Self {
match c_name {
ZmqCommandName::READY => "READY".into(),
impl ZmqCommandName {
pub const fn as_str(&self) -> &'static str {
match self {
ZmqCommandName::READY => "READY",
}
}
}

impl Display for ZmqCommandName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

#[derive(Debug, Clone)]
pub struct ZmqCommand {
pub name: ZmqCommandName,
@@ -28,7 +35,7 @@ pub struct ZmqCommand {
impl ZmqCommand {
pub fn ready(socket: SocketType) -> Self {
let mut properties = HashMap::new();
properties.insert("Socket-Type".into(), format!("{}", socket).into());
properties.insert("Socket-Type".into(), socket.as_str().into());
Self {
name: ZmqCommandName::READY,
properties,
@@ -46,27 +53,29 @@ impl ZmqCommand {
}
}

impl TryFrom<BytesMut> for ZmqCommand {
impl TryFrom<Bytes> for ZmqCommand {
type Error = CodecError;

fn try_from(mut buf: BytesMut) -> Result<Self, Self::Error> {
fn try_from(mut buf: Bytes) -> Result<Self, Self::Error> {
let command_len = buf.get_u8() as usize;
// command-name-char = ALPHA according to https://rfc.zeromq.org/spec:23/ZMTP/
let command_name =
unsafe { String::from_utf8_unchecked(buf.split_to(command_len).to_vec()) };
let command = match command_name.as_str() {
"READY" => ZmqCommandName::READY,
_ => return Err(CodecError::Command("Uknown command received")),
let command = match &buf[..command_len] {
b"READY" => ZmqCommandName::READY,
_ => return Err(CodecError::Command("Unknown command received")),
};
buf.advance(command_len);
let mut properties = HashMap::new();

while !buf.is_empty() {
// Collect command properties
let prop_len = buf.get_u8() as usize;
let property = unsafe { String::from_utf8_unchecked(buf.split_to(prop_len).to_vec()) };
let property = match String::from_utf8(buf.split_to(prop_len).to_vec()) {
Ok(p) => p,
Err(_) => return Err(CodecError::Decode("Invalid property identifier")),
};

let prop_val_len = buf.get_u32() as usize;
let prop_value = buf.split_to(prop_val_len).freeze();
let prop_value = buf.split_to(prop_val_len);
properties.insert(property, prop_value);
}
Ok(Self {
@@ -80,7 +89,7 @@ impl From<ZmqCommand> for BytesMut {
fn from(command: ZmqCommand) -> Self {
let mut message_len = 0;

let command_name: String = command.name.into();
let command_name = command.name.as_str();
message_len += command_name.len() + 1;
for (prop, val) in command.properties.iter() {
message_len += prop.len() + 1;
10 changes: 5 additions & 5 deletions src/codec/greeting.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ impl Default for ZmqGreeting {
fn default() -> Self {
Self {
version: (3, 0),
mechanism: ZmqMechanism::NULL,
mechanism: ZmqMechanism::default(),
as_server: false,
}
}
@@ -27,12 +27,12 @@ impl TryFrom<Bytes> for ZmqGreeting {
type Error = CodecError;

fn try_from(value: Bytes) -> Result<Self, Self::Error> {
if !(value[0] == 0xff && value[9] == 0x7f) {
if value.len() != 64 || !(value[0] == 0xff && value[9] == 0x7f) {
return Err(CodecError::Greeting("Failed to parse greeting"));
}
Ok(ZmqGreeting {
version: (value[10], value[11]),
mechanism: ZmqMechanism::try_from(value[12..32].to_vec())?,
mechanism: ZmqMechanism::try_from(&value[12..32])?,
as_server: value[32] == 0x01,
})
}
@@ -45,10 +45,10 @@ impl From<ZmqGreeting> for BytesMut {
data[9] = 0x7f;
data[10] = greet.version.0;
data[11] = greet.version.1;
let mech = format!("{}", greet.mechanism);
let mech = greet.mechanism.as_str();
data[12..12 + mech.len()].copy_from_slice(mech.as_bytes());
data[32] = greet.as_server.into();
let mut bytes = BytesMut::new();
let mut bytes = BytesMut::with_capacity(64);
bytes.extend_from_slice(&data);
bytes
}
37 changes: 24 additions & 13 deletions src/codec/mechanism.rs
Original file line number Diff line number Diff line change
@@ -11,29 +11,40 @@ pub enum ZmqMechanism {
CURVE,
}

impl Display for ZmqMechanism {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Default for ZmqMechanism {
fn default() -> Self {
ZmqMechanism::NULL
}
}

impl ZmqMechanism {
pub const fn as_str(&self) -> &'static str {
match self {
ZmqMechanism::NULL => write!(f, "NULL"),
ZmqMechanism::PLAIN => write!(f, "PLAIN"),
ZmqMechanism::CURVE => write!(f, "CURVE"),
ZmqMechanism::NULL => "NULL",
ZmqMechanism::PLAIN => "PLAIN",
ZmqMechanism::CURVE => "CURVE",
}
}
}

impl TryFrom<Vec<u8>> for ZmqMechanism {
impl Display for ZmqMechanism {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

impl TryFrom<&[u8]> for ZmqMechanism {
type Error = CodecError;

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let mech = value.split(|x| *x == 0x0).next().unwrap_or(b"");
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let mech = value.split(|x| *x == 0x0).next().unwrap_or_default();
// mechanism-char = "A"-"Z" | DIGIT
// | "-" | "_" | "." | "+" | %x0
// according to https://rfc.zeromq.org/spec:23/ZMTP/
let mechanism = unsafe { String::from_utf8_unchecked(mech.to_vec()) };
match mechanism.as_str() {
"NULL" => Ok(ZmqMechanism::NULL),
"PLAIN" => Ok(ZmqMechanism::PLAIN),
"CURVE" => Ok(ZmqMechanism::CURVE),
match mech {
b"NULL" => Ok(ZmqMechanism::NULL),
b"PLAIN" => Ok(ZmqMechanism::PLAIN),
b"CURVE" => Ok(ZmqMechanism::CURVE),
_ => Err(CodecError::Mechanism("Failed to parse ZmqMechanism")),
}
}
6 changes: 3 additions & 3 deletions src/codec/zmq_codec.rs
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ pub struct ZmqCodec {
state: DecoderState,
waiting_for: usize, // Number of bytes needed to decode frame
// Needed to store incoming multipart message
// This allows to incapsulate it's processing inside codec and not expose
// This allows to encapsulate its processing inside codec and not expose
// internal details to higher levels
buffered_message: Option<ZmqMessage>,
}
@@ -37,7 +37,7 @@ impl ZmqCodec {
pub fn new() -> Self {
Self {
state: DecoderState::Greeting,
waiting_for: 64, // len of the greeting frame,
waiting_for: 64, // len of the greeting frame
buffered_message: None,
}
}
@@ -95,7 +95,7 @@ impl Decoder for ZmqCodec {
self.state = DecoderState::FrameHeader;
self.waiting_for = 1;
if frame.command {
return Ok(Some(Message::Command(ZmqCommand::try_from(data)?)));
return Ok(Some(Message::Command(ZmqCommand::try_from(data.freeze())?)));
}

// process incoming message frame
16 changes: 11 additions & 5 deletions src/endpoint/transport.rs
Original file line number Diff line number Diff line change
@@ -13,6 +13,15 @@ pub enum Transport {
Ipc,
}

impl Transport {
pub const fn as_str(&self) -> &'static str {
match self {
Transport::Tcp => "tcp",
Transport::Ipc => "ipc",
}
}
}

impl FromStr for Transport {
type Err = EndpointError;

@@ -25,6 +34,7 @@ impl FromStr for Transport {
Ok(result)
}
}

impl TryFrom<&str> for Transport {
type Error = EndpointError;

@@ -35,10 +45,6 @@ impl TryFrom<&str> for Transport {

impl fmt::Display for Transport {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
let s = match self {
Transport::Tcp => "tcp",
Transport::Ipc => "ipc",
};
write!(f, "{}", s)
f.write_str(self.as_str())
}
}
34 changes: 20 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -72,6 +72,25 @@ pub enum SocketType {
STREAM = 11,
}

impl SocketType {
pub const fn as_str(&self) -> &'static str {
match self {
SocketType::PAIR => "PAIR",
SocketType::PUB => "PUB",
SocketType::SUB => "SUB",
SocketType::REQ => "REQ",
SocketType::REP => "REP",
SocketType::DEALER => "DEALER",
SocketType::ROUTER => "ROUTER",
SocketType::PULL => "PULL",
SocketType::PUSH => "PUSH",
SocketType::XPUB => "XPUB",
SocketType::XSUB => "XSUB",
SocketType::STREAM => "STREAM",
}
}
}

impl TryFrom<&str> for SocketType {
type Error = ZmqError;

@@ -96,20 +115,7 @@ impl TryFrom<&str> for SocketType {

impl Display for SocketType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SocketType::PAIR => write!(f, "PAIR"),
SocketType::PUB => write!(f, "PUB"),
SocketType::SUB => write!(f, "SUB"),
SocketType::REQ => write!(f, "REQ"),
SocketType::REP => write!(f, "REP"),
SocketType::DEALER => write!(f, "DEALER"),
SocketType::ROUTER => write!(f, "ROUTER"),
SocketType::PULL => write!(f, "PULL"),
SocketType::PUSH => write!(f, "PUSH"),
SocketType::XPUB => write!(f, "XPUB"),
SocketType::XSUB => write!(f, "XSUB"),
SocketType::STREAM => write!(f, "STREAM"),
}
f.write_str(self.as_str())
}
}

0 comments on commit 502cec9

Please sign in to comment.