Skip to content

Commit

Permalink
Added some load testing for the deucalion server
Browse files Browse the repository at this point in the history
The named pipe server should be able to handle high bursts of traffic
even with default settings.

Nevertheless, the namedpipe stream implementation was extracted from
parity-ipc-tokio just to give ourselves more control over it.
  • Loading branch information
ff14wed committed Mar 8, 2023
1 parent dca795a commit 29a530d
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 5 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ name = "deucalion"
crate-type = ["dylib"]

[dependencies]
tokio = { version = "1.25.0", features = ["io-util", "sync", "rt", "rt-multi-thread", "macros"] }
tokio = { version = "1.25.0", features = ["io-util", "sync", "rt", "rt-multi-thread", "net", "time", "macros"] }
tokio-util = { version = "0.7.4", features = ["codec", "compat"] }
parity-tokio-ipc = "0.9"
futures = "0.3"
anyhow = "1.0"
thiserror = "1.0"
Expand All @@ -30,3 +29,7 @@ once_cell = "1.17"
winapi = { version = "0.3", features = [
"minwindef", "minwinbase", "processthreadsapi", "libloaderapi", "consoleapi", "wincon"
] }

[dev-dependencies]
ntest = "*"
rand = "0.8.5"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::select;
use tokio::sync::{mpsc, oneshot, Mutex};

mod hook;
mod namedpipe;
mod procloader;
mod rpc;
mod server;
Expand Down
184 changes: 184 additions & 0 deletions src/namedpipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
This implementation was adapted from
https://github.com/paritytech/parity-tokio-ipc/ under the following license:
Copyright (c) 2017 Nikolay Volf
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
*/

use winapi::shared::winerror::ERROR_PIPE_BUSY;

use futures::Stream;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};

use tokio::net::windows::named_pipe;

enum NamedPipe {
Server(named_pipe::NamedPipeServer),
Client(named_pipe::NamedPipeClient),
}

const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5);

/// Endpoint implementation for windows
pub struct Endpoint {
path: String,
created_listener: bool,
}

impl Endpoint {
/// Stream of incoming connections
pub fn incoming(
mut self,
) -> io::Result<impl Stream<Item = io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
let pipe = self.create_listener()?;

let stream =
futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move {
let () = listener.connect().await?;

let new_listener = endpoint.create_listener()?;

let conn = Connection::wrap(NamedPipe::Server(listener));

Ok(Some((conn, (new_listener, endpoint))))
});

Ok(stream)
}

fn create_listener(&mut self) -> io::Result<named_pipe::NamedPipeServer> {
let server = named_pipe::ServerOptions::new()
.first_pipe_instance(!self.created_listener)
.reject_remote_clients(true)
.access_inbound(true)
.access_outbound(true)
.in_buffer_size(65536)
.out_buffer_size(65536)
.create(&self.path)?;
self.created_listener = true;

Ok(server)
}

/// Make new connection using the provided path and running event pool.
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
let path = path.as_ref();

// There is not async equivalent of waiting for a named pipe in Windows,
// so we keep trying or sleeping for a bit, until we hit a timeout
let attempt_start = Instant::now();
let client = loop {
match named_pipe::ClientOptions::new()
.read(true)
.write(true)
.open(path)
{
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {
if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT {
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
} else {
return Err(e);
}
}
Err(e) => return Err(e),
}
};

Ok(Connection::wrap(NamedPipe::Client(client)))
}

/// New IPC endpoint at the given path
pub fn new(path: String) -> Self {
Endpoint {
path,
created_listener: false,
}
}
}

/// IPC connection.
pub struct Connection {
inner: NamedPipe,
}

impl Connection {
/// Wraps an existing named pipe
fn wrap(pipe: NamedPipe) -> Self {
Self { inner: pipe }
}
}

impl AsyncRead for Connection {
fn poll_read(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::into_inner(self);
match this.inner {
NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf),
NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf),
}
}
}

impl AsyncWrite for Connection {
fn poll_write(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let this = Pin::into_inner(self);
match this.inner {
NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf),
NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf),
}
}

fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = Pin::into_inner(self);
match this.inner {
NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx),
NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx),
}
}

fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = Pin::into_inner(self);
match this.inner {
NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx),
NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx),
}
}
}
94 changes: 91 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::{format_err, Error, Result};

use futures::{SinkExt, Stream, StreamExt};

use parity_tokio_ipc::{Endpoint, SecurityAttributes};
use crate::namedpipe::Endpoint;
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::Framed;

Expand Down Expand Up @@ -263,8 +263,7 @@ where
{
let (trigger, tripwire) = Tripwire::new();

let mut endpoint = Endpoint::new(pipe_name);
endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create()?);
let endpoint = Endpoint::new(pipe_name);

let incoming = endpoint.incoming()?.take_until(tripwire);

Expand Down Expand Up @@ -293,3 +292,92 @@ where
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use ntest::timeout;
use rand::Rng;
use winapi::um::processthreadsapi;

#[tokio::test(flavor = "multi_thread")]
#[timeout(10_000)]
async fn named_pipe_load_test() {
let (signal_tx, signal_rx) = mpsc::channel(1);
let state = Arc::new(Mutex::new(Shared::new(signal_tx)));
let state_clone = state.clone();

let pid = unsafe { processthreadsapi::GetCurrentProcessId() };
let pipe_name = format!(r"\\.\pipe\deucalion-test-{}", pid);
let pipe_name_clone = pipe_name.clone();

tokio::spawn(async move {
if let Err(e) = run(pipe_name_clone, state, signal_rx, move |_: rpc::Payload| {
Ok(())
})
.await
{
panic!("Server should not fail to run: {:?}", e);
}
});

// Give the server some time to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let client = Endpoint::connect(&pipe_name)
.await
.expect("Failed to connect client to server");

// Create a frame decoder that processes the client stream
let codec = rpc::PayloadCodec::new();
let frames = Framed::new(client, codec);
// This state isn't really used for anything
let (dummy_signal_tx, _) = mpsc::channel(1);
let dummy_state = Arc::new(Mutex::new(Shared::new(dummy_signal_tx)));
let mut peer = Peer::new(dummy_state, frames).await.unwrap();

// Handle the SERVER_HELLO message
let peer_message = peer.next().await.unwrap();
if let Ok(Message::Request(payload)) = peer_message {
assert_eq!(payload.ctx, 9000);
} else {
panic!("Did not properly receive Server Hello");
}

// Synchronously send many packets before the client can process them
const NUM_PACKETS: u32 = 10000;
for i in 0..NUM_PACKETS {
let mut dummy_data = Vec::from([0u8; 5000]);
rand::thread_rng().fill(&mut dummy_data[..]);

state_clone
.lock()
.await
.broadcast(rpc::Payload {
op: rpc::MessageOps::Debug,
ctx: i,
data: dummy_data,
})
.await;
}

// Test that every packet was received in order
let mut num_received = 0u32;
while let Some(result) = peer.next().await {
match result {
// A request was received from the current user
Ok(Message::Request(payload)) => {
assert_eq!(
payload.ctx, num_received,
"Received data from pipe does not match expected index!"
);
num_received += 1;
if num_received >= NUM_PACKETS {
return;
}
}
_ => (),
}
}
}
}

0 comments on commit 29a530d

Please sign in to comment.