Skip to content

Commit

Permalink
Added a test for issue of non closing connection when closing sender.
Browse files Browse the repository at this point in the history
  • Loading branch information
realcr committed Apr 3, 2019
1 parent 6a7e7ac commit 90fa588
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions components/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ futures-preview = {version = "0.3.0-alpha.13", features = ["compat"] }
log = "0.4"

bytes = "0.4"

[dev-dependencies]

env_logger = "0.6.0"
82 changes: 81 additions & 1 deletion components/net/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use std::convert::TryInto;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use bytes::Bytes;

use env_logger;

use futures::executor::ThreadPool;
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};
use futures::{FutureExt, SinkExt, StreamExt};
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures_01::stream::Stream as Stream01;
use futures_01::sink::Sink as Sink01;


use common::conn::{FutTransform, Listener};
use proto::net::messages::NetAddress;
Expand All @@ -13,6 +21,8 @@ use crate::tcp_connector::TcpConnector;
use crate::tcp_listener::TcpListener;

use tokio::net::TcpListener as TokioTcpListener;
use tokio::net::TcpStream;
use tokio::codec::{Framed, LengthDelimitedCodec};

/// Get an available port we can listen on
fn get_available_port_v4() -> u16 {
Expand Down Expand Up @@ -110,6 +120,7 @@ async fn task_net_connector_v4_drop_sender<S>(spawner: S)
where
S: Spawn + Clone + Send + 'static,
{

let available_port = get_available_port_v4();
let loopback = Ipv4Addr::new(127, 0, 0, 1);
let socket_addr = SocketAddr::new(IpAddr::V4(loopback), available_port);
Expand All @@ -136,6 +147,75 @@ where

#[test]
fn test_net_connector_v4_drop_sender() {
env_logger::init();
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_net_connector_v4_drop_sender(thread_pool.clone()));
}


fn tcp_stream_to_conn_pair_01(
tcp_stream: TcpStream,
max_frame_length: usize,
) -> (impl Sink01<SinkItem=Vec<u8>>, impl Stream01<Item=Vec<u8>>)
{
let mut codec = LengthDelimitedCodec::new();
codec.set_max_frame_length(max_frame_length);
let (sender_01, receiver_01) = Framed::new(tcp_stream, codec).split();

// Conversion layer between Vec<u8> to Bytes:
let sender_01 = sender_01
.sink_map_err(|_| {
info!("tcp_stream_to_conn_pair(): sender_01 error!");
()
})
.with(|vec: Vec<u8>| -> Result<Bytes, ()> { Ok(Bytes::from(vec)) });

let receiver_01 = receiver_01.map(|bytes| bytes.to_vec());

(sender_01, receiver_01)
}


async fn task_tcp_stream_to_conn_pair_drop_sender<S>(_spawner: S)
where
S: Spawn + Clone + Send + 'static,
{

let max_frame_length = 0x100;

let available_port = get_available_port_v4();
let loopback = Ipv4Addr::new(127, 0, 0, 1);
let socket_addr = SocketAddr::new(IpAddr::V4(loopback), available_port);

// Set up server side (listen):
let listener = TokioTcpListener::bind(&socket_addr).unwrap();

let mut incoming_conns = listener.incoming().compat();
let fut_server_tcp_stream = incoming_conns.next();

// Set up client side (connect):
let fut_client_tcp_stream = TcpStream::connect(&socket_addr).compat();

let (opt_server_tcp_stream, opt_client_tcp_stream) =
await!(fut_server_tcp_stream
.join(fut_client_tcp_stream));

let (_server_sender, server_receiver) = tcp_stream_to_conn_pair_01(opt_server_tcp_stream.unwrap().unwrap(), max_frame_length);
let (client_sender, _client_receiver) = tcp_stream_to_conn_pair_01(opt_client_tcp_stream.unwrap(), max_frame_length);

// Close the client sender:
drop(client_sender);

// Expect the server to notice that the connection was closed:
let mut server_receiver = server_receiver.compat();
let opt_message = await!(server_receiver.next());
assert!(opt_message.is_none());
}

#[test]
fn test_tcp_stream_to_conn_pair_drop_sender() {
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_tcp_stream_to_conn_pair_drop_sender(thread_pool.clone()));
}


0 comments on commit 90fa588

Please sign in to comment.