Skip to content

Commit

Permalink
[network] Use Vec<u8> in wire spec to avoid dependency
Browse files Browse the repository at this point in the history
Closes: aptos-labs#3315
Approved by: davidiw
  • Loading branch information
bothra90 authored and bors-libra committed Apr 8, 2020
1 parent b31d012 commit 6abd90e
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 33 deletions.
9 changes: 4 additions & 5 deletions network/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
},
ProtocolId,
};
use bytes::Bytes;
use futures::{future::join, io::AsyncWriteExt, stream::StreamExt, SinkExt};
use libra_types::PeerId;
use memsocket::MemorySocket;
Expand Down Expand Up @@ -175,7 +174,7 @@ fn peer_send_message() {
let send_msg = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: Bytes::from_static(b"hello world"),
raw_msg: Vec::from("hello world"),
});
let recv_msg = send_msg.clone();

Expand Down Expand Up @@ -212,7 +211,7 @@ fn peer_recv_message() {
let send_msg = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: Bytes::from_static(b"hello world"),
raw_msg: Vec::from("hello world"),
});
let recv_msg = send_msg.clone();

Expand Down Expand Up @@ -269,12 +268,12 @@ fn peer_open_substream_simultaneous() {
let msg_a = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: Bytes::from_static(b"hello world"),
raw_msg: Vec::from("hello world"),
});
let msg_b = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: Bytes::from_static(b"namaste"),
raw_msg: Vec::from("namaste"),
});

// Send open substream requests to both peer_a and peer_b
Expand Down
4 changes: 2 additions & 2 deletions network/src/protocols/direct_send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl DirectSend {
.observe(data.len() as f64);
let notif = DirectSendNotification::RecvMessage(Message {
protocol,
mdata: data,
mdata: Bytes::from(data),
});
if let Err(err) = self.ds_notifs_tx.send(notif).await {
warn!(
Expand Down Expand Up @@ -181,7 +181,7 @@ impl DirectSend {
protocol_id,
// TODO: Use default priority for now. To be exposed via network API.
priority: Priority::default(),
raw_msg: msg.mdata,
raw_msg: Vec::from(msg.mdata.as_ref()),
}),
protocol_id,
)
Expand Down
13 changes: 7 additions & 6 deletions network/src/protocols/direct_send/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ use bytes::Bytes;
use futures::{sink::SinkExt, stream::StreamExt};
use libra_logger::debug;
use libra_types::PeerId;
use once_cell::sync::Lazy;
use serial_test::serial;
use tokio::runtime::{Handle, Runtime};

const PROTOCOL_1: ProtocolId = ProtocolId::ConsensusDirectSend;
const PROTOCOL_2: ProtocolId = ProtocolId::MempoolDirectSend;
static MESSAGE_1: Bytes = Bytes::from_static(b"Direct Send 1");
static MESSAGE_2: Bytes = Bytes::from_static(b"Direct Send 2");
static MESSAGE_1: Lazy<Vec<u8>> = Lazy::new(|| Vec::from("Direct Send 1"));
static MESSAGE_2: Lazy<Vec<u8>> = Lazy::new(|| Vec::from("Direct Send 2"));

// counters are static and therefore shared across tests. This can sometimes lead to
// surprising counter readings if tests are run in parallel. Since we use counter values in some
Expand Down Expand Up @@ -59,7 +60,7 @@ fn start_direct_send_actor(
async fn expect_network_provider_recv_message(
ds_notifs_rx: &mut channel::Receiver<DirectSendNotification>,
expected_protocol: ProtocolId,
expected_message: Bytes,
expected_message: Vec<u8>,
) {
match ds_notifs_rx.next().await.unwrap() {
DirectSendNotification::RecvMessage(msg) => {
Expand Down Expand Up @@ -144,7 +145,7 @@ fn test_outbound_msg() {
let f_network_provider = async move {
let msg_sent = DirectSendRequest::SendMessage(Message {
protocol: PROTOCOL_1,
mdata: MESSAGE_1.clone(),
mdata: Bytes::from(MESSAGE_1.clone()),
});
debug!("Sending message");
ds_requests_tx.send(msg_sent).await.unwrap();
Expand Down Expand Up @@ -182,15 +183,15 @@ fn test_send_failure() {
ds_requests_tx
.send(DirectSendRequest::SendMessage(Message {
protocol: PROTOCOL_1,
mdata: MESSAGE_1.clone(),
mdata: Bytes::from(MESSAGE_1.clone()),
}))
.await
.unwrap();
// Request DirectSend to send the second message
ds_requests_tx
.send(DirectSendRequest::SendMessage(Message {
protocol: PROTOCOL_1,
mdata: MESSAGE_2.clone(),
mdata: Bytes::from(MESSAGE_2.clone()),
}))
.await
.unwrap();
Expand Down
6 changes: 2 additions & 4 deletions network/src/protocols/rpc/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use crate::{
},
ProtocolId,
};
use bytes::Bytes;
use futures::{
future::{self, FutureExt},
stream::StreamExt,
};
use libra_proptest_helpers::ValueGenerator;
use libra_types::PeerId;
use proptest::{arbitrary::any, collection::vec, prop_oneof, strategy::Strategy};
use std::{io, iter::FromIterator};
use std::io;
use tokio::runtime;
use tokio_util::codec::{Encoder, LengthDelimitedCodec};

Expand Down Expand Up @@ -64,8 +63,7 @@ pub fn generate_corpus(gen: &mut ValueGenerator) -> Vec<u8> {
pub fn fuzzer(data: &[u8]) {
let (notification_tx, mut notification_rx) = channel::new_test(8);
let (peer_reqs_tx, mut peer_reqs_rx) = channel::new_test(8);
let data = Vec::from(data);
let raw_request = Bytes::from_iter(data.into_iter());
let raw_request = Vec::from(data);
let inbound_request = RpcRequest {
protocol_id: TEST_PROTOCOL,
request_id: 0,
Expand Down
8 changes: 4 additions & 4 deletions network/src/protocols/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ async fn handle_outbound_rpc_inner(
// TODO: Use default priority for now. To be exposed via network API.
priority: Priority::default(),
protocol_id: protocol,
raw_request: req_data,
raw_request: Vec::from(req_data.as_ref()),
});

// Start timer to collect RPC latency.
Expand Down Expand Up @@ -506,7 +506,7 @@ async fn handle_outbound_rpc_inner(
counters::LIBRA_NETWORK_RPC_BYTES
.with_label_values(&["response", "received"])
.observe(res_data.len() as f64);
Ok(res_data)
Ok(Bytes::from(res_data))
}

async fn handle_inbound_request_inner(
Expand Down Expand Up @@ -535,7 +535,7 @@ async fn handle_inbound_request_inner(
let (res_tx, res_rx) = oneshot::channel();
let notification = RpcNotification::RecvRpc(InboundRpcRequest {
protocol: request.protocol_id,
data: req_data,
data: Bytes::from(req_data),
res_tx,
});
notification_tx.send(notification).await?;
Expand All @@ -556,7 +556,7 @@ async fn handle_inbound_request_inner(
peer_id.short_str()
);
let response = RpcResponse {
raw_response: res_data,
raw_response: Vec::from(res_data.as_ref()),
request_id,
priority: request.priority,
};
Expand Down
13 changes: 6 additions & 7 deletions network/src/protocols/rpc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ fn create_network_request(
request_id,
protocol_id,
priority: Priority::default(),
raw_request,
raw_request: Vec::from(raw_request.as_ref()),
})
}

fn create_network_response(request_id: RequestId, raw_response: Bytes) -> NetworkMessage {
NetworkMessage::RpcResponse(RpcResponse {
request_id,
priority: Priority::default(),
raw_response,
raw_response: Vec::from(raw_response.as_ref()),
})
}

Expand Down Expand Up @@ -299,12 +299,11 @@ fn outbound_rpc_timeout() {

let protocol_id = RPC_PROTOCOL_A;
let req_data = Bytes::from_static(b"hello");
let message = NetworkMessage::RpcRequest(RpcRequest {
request_id: 0, // This is the first request.
let message = create_network_request(
0, // This is the first request.
protocol_id,
priority: Priority::default(),
raw_request: req_data.clone(),
});
req_data.clone(),
);

let f_mock_peer = expect_successful_send(&mut peer_reqs_rx, protocol_id, message);

Expand Down
7 changes: 3 additions & 4 deletions network/src/protocols/wire/messaging/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! These should serialize as per [link](TODO: Add ref).
use crate::protocols::wire::handshake::v1::MessagingProtocolVersion;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};

Expand Down Expand Up @@ -67,7 +66,7 @@ pub struct RpcRequest {
/// Request priority in the range 0..=255.
pub priority: Priority,
/// Request payload. This will be parsed by the application-level handler.
pub raw_request: Bytes,
pub raw_request: Vec<u8>,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
Expand All @@ -78,7 +77,7 @@ pub struct RpcResponse {
/// corresponding request.
pub priority: Priority,
/// Response payload.
pub raw_response: Bytes,
pub raw_response: Vec<u8>,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
Expand All @@ -88,5 +87,5 @@ pub struct DirectSendMsg {
/// Message priority in the range 0..=255.
pub priority: Priority,
/// Message payload.
pub raw_msg: Bytes,
pub raw_msg: Vec<u8>,
}
2 changes: 1 addition & 1 deletion network/src/protocols/wire/messaging/v1/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn rpc_request() -> lcs::Result<()> {
request_id: 25,
protocol_id: ProtocolId::ConsensusRpc,
priority: 0,
raw_request: Bytes::from_static(&[0, 1, 2, 3]),
raw_request: [0, 1, 2, 3].to_vec(),
};
assert_eq!(
lcs::to_bytes(&rpc_request)?,
Expand Down

0 comments on commit 6abd90e

Please sign in to comment.