Skip to content

Commit

Permalink
Noise handshake tests (exonum#702)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Mukhanov authored and stanislav-tkach committed Jun 18, 2018
1 parent 452aecd commit fa60009
Show file tree
Hide file tree
Showing 4 changed files with 439 additions and 97 deletions.
11 changes: 5 additions & 6 deletions exonum/src/events/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ use std::{cell::RefCell, collections::HashMap, io, net::SocketAddr, rc::Rc, time

use super::{error::{into_other, log_error, other_error, result_ok},
to_box};
use events::noise::{Handshake, HandshakeParams, NoiseHandshake};
use helpers::Milliseconds;
use messages::{Any, Connect, Message, RawMessage};

use events::noise::{HandshakeParams, NoiseHandshake};

const OUTGOING_CHANNEL_SIZE: usize = 10;

#[derive(Debug)]
Expand Down Expand Up @@ -148,9 +147,8 @@ impl ConnectionsPool {
Ok(sock)
})
.and_then(move |sock| {
NoiseHandshake::send(&handshake_params, sock).and_then(|framed|{
Ok(framed)
})
let handshake = NoiseHandshake::initiator(&handshake_params);
handshake.send(sock)
})
// Connect socket with the outgoing channel
.and_then(move |stream| {
Expand Down Expand Up @@ -364,7 +362,8 @@ impl Listener {
trace!("Accepted incoming connection with peer={}", addr);
let network_tx = network_tx.clone();

let stream = NoiseHandshake::listen(&handshake_params, sock).flatten_stream();
let handshake = NoiseHandshake::responder(&handshake_params);
let stream = handshake.listen(sock).flatten_stream();

let connection_handler = stream
.into_future()
Expand Down
148 changes: 86 additions & 62 deletions exonum/src/events/noise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use byteorder::{ByteOrder, LittleEndian};
use futures::future::{done, Future};
use tokio_core::net::TcpStream;
use tokio_io::{codec::Framed,
io::{read_exact, write_all},
AsyncRead};
AsyncRead,
AsyncWrite};

use std::io;

use crypto::{PublicKey, SecretKey};
use events::noise::wrapper::NOISE_MAX_HANDSHAKE_MESSAGE_LENGTH;
use events::{codec::MessagesCodec,
noise::wrapper::{NoiseWrapper, HANDSHAKE_HEADER_LENGTH}};

Expand All @@ -30,7 +30,7 @@ pub mod wrapper;
#[cfg(test)]
mod tests;

type HandshakeResult = Box<Future<Item = Framed<TcpStream, MessagesCodec>, Error = io::Error>>;
type HandshakeResult<S> = Box<Future<Item = Framed<S, MessagesCodec>, Error = io::Error>>;

#[derive(Debug, Clone)]
/// Params needed to establish secured connection using Noise Protocol.
Expand All @@ -40,80 +40,104 @@ pub struct HandshakeParams {
pub max_message_len: u32,
}

pub trait Handshake {
fn listen<S: AsyncRead + AsyncWrite + 'static>(self, stream: S) -> HandshakeResult<S>;
fn send<S: AsyncRead + AsyncWrite + 'static>(self, stream: S) -> HandshakeResult<S>;
}

#[derive(Debug)]
pub struct NoiseHandshake {}
pub struct NoiseHandshake {
noise: NoiseWrapper,
max_message_len: u32,
}

impl NoiseHandshake {
pub fn listen(params: &HandshakeParams, stream: TcpStream) -> HandshakeResult {
listen_handshake(stream, params)
pub fn initiator(params: &HandshakeParams) -> Self {
let noise = NoiseWrapper::initiator(params);
NoiseHandshake {
noise,
max_message_len: params.max_message_len,
}
}

pub fn send(params: &HandshakeParams, stream: TcpStream) -> HandshakeResult {
send_handshake(stream, params)
pub fn responder(params: &HandshakeParams) -> Self {
let noise = NoiseWrapper::responder(params);
NoiseHandshake {
noise,
max_message_len: params.max_message_len,
}
}
}

fn listen_handshake(stream: TcpStream, params: &HandshakeParams) -> HandshakeResult {
let max_message_len = params.max_message_len;
let mut noise = NoiseWrapper::responder(params);
let framed = read(stream).and_then(move |(stream, msg)| {
let _buf = noise.read_handshake_msg(&msg);
write_handshake_msg(&mut noise)
fn read_handshake_msg<S: AsyncRead + 'static>(
mut self,
stream: S,
) -> impl Future<Item = (S, Self), Error = io::Error> {
read(stream).and_then(move |(stream, msg)| {
self.noise.read_handshake_msg(&msg)?;
Ok((stream, self))
})
}

fn write_handshake_msg<S: AsyncWrite + 'static>(
mut self,
stream: S,
) -> impl Future<Item = (S, Self), Error = io::Error> {
done(self.noise.write_handshake_msg())
.map_err(|e| e.into())
.and_then(|(len, buf)| write(stream, &buf, len))
.and_then(|(stream, _msg)| read(stream))
.and_then(move |(stream, msg)| {
let _buf = noise.read_handshake_msg(&msg);
let noise = noise.into_transport_mode()?;
let framed = stream.framed(MessagesCodec::new(max_message_len, noise));
Ok(framed)
})
});

Box::new(framed)
.map(move |(stream, _)| (stream, self))
}

fn finalize<S: AsyncRead + AsyncWrite + 'static>(
self,
stream: S,
) -> Result<Framed<S, MessagesCodec>, io::Error> {
let noise = self.noise.into_transport_mode()?;
let framed = stream.framed(MessagesCodec::new(self.max_message_len, noise));
Ok(framed)
}
}

fn send_handshake(stream: TcpStream, params: &HandshakeParams) -> HandshakeResult {
let max_message_len = params.max_message_len;
let mut noise = NoiseWrapper::initiator(params);
let framed = write_handshake_msg(&mut noise)
.and_then(|(len, buf)| write(stream, &buf, len))
.and_then(|(stream, _msg)| read(stream))
.and_then(move |(stream, msg)| {
let _buf = noise.read_handshake_msg(&msg);
write_handshake_msg(&mut noise)
.and_then(|(len, buf)| write(stream, &buf, len))
.and_then(move |(stream, _msg)| {
let noise = noise.into_transport_mode()?;
let framed = stream.framed(MessagesCodec::new(max_message_len, noise));
Ok(framed)
})
});

Box::new(framed)
impl Handshake for NoiseHandshake {
fn listen<S>(self, stream: S) -> HandshakeResult<S>
where
S: AsyncRead + AsyncWrite + 'static,
{
let framed = self.read_handshake_msg(stream)
.and_then(|(stream, handshake)| handshake.write_handshake_msg(stream))
.and_then(|(stream, handshake)| handshake.read_handshake_msg(stream))
.and_then(|(stream, handshake)| handshake.finalize(stream));
Box::new(framed)
}

fn send<S>(self, stream: S) -> HandshakeResult<S>
where
S: AsyncRead + AsyncWrite + 'static,
{
let framed = self.write_handshake_msg(stream)
.and_then(|(stream, handshake)| handshake.read_handshake_msg(stream))
.and_then(|(stream, handshake)| handshake.write_handshake_msg(stream))
.and_then(|(stream, handshake)| handshake.finalize(stream));
Box::new(framed)
}
}

fn read(sock: TcpStream) -> Box<Future<Item = (TcpStream, Vec<u8>), Error = io::Error>> {
fn read<S: AsyncRead + 'static>(sock: S) -> impl Future<Item = (S, Vec<u8>), Error = io::Error> {
let buf = vec![0u8; HANDSHAKE_HEADER_LENGTH];
Box::new(
read_exact(sock, buf)
.and_then(|(stream, msg)| read_exact(stream, vec![0u8; msg[0] as usize])),
)
// First byte of handshake message is payload length, remaining bytes [1; len] is
// the handshake payload. Therefore, we need to read first byte and after that
// remaining payload.
read_exact(sock, buf).and_then(|(stream, msg)| read_exact(stream, vec![0u8; msg[0] as usize]))
}

fn write(
sock: TcpStream,
fn write<S: AsyncWrite + 'static>(
sock: S,
buf: &[u8],
len: usize,
) -> Box<Future<Item = (TcpStream, Vec<u8>), Error = io::Error>> {
let mut message = vec![0u8; HANDSHAKE_HEADER_LENGTH];
LittleEndian::write_u16(&mut message, len as u16);
message.extend_from_slice(&buf[0..len]);
Box::new(write_all(sock, message))
}
) -> impl Future<Item = (S, Vec<u8>), Error = io::Error> {
debug_assert!(len < NOISE_MAX_HANDSHAKE_MESSAGE_LENGTH);

fn write_handshake_msg(
noise: &mut NoiseWrapper,
) -> Box<Future<Item = (usize, Vec<u8>), Error = io::Error>> {
let res = noise.write_handshake_msg();
Box::new(done(res.map_err(|e| e.into())))
let mut message = vec![len as u8; HANDSHAKE_HEADER_LENGTH];
message.extend_from_slice(&buf[0..len]);
write_all(sock, message)
}
Loading

0 comments on commit fa60009

Please sign in to comment.