Skip to content

Commit

Permalink
async: use shutdown to implement server graceful shutdown.
Browse files Browse the repository at this point in the history
Using shutdown can be used instead of channel composition.

Signed-off-by: wllenyj <[email protected]>
  • Loading branch information
wllenyj committed Aug 10, 2022
1 parent 21b8e1a commit 54ab070
Showing 1 changed file with 38 additions and 41 deletions.
79 changes: 38 additions & 41 deletions src/asynchronous/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tokio::{
net::UnixListener,
select, spawn,
sync::mpsc::{channel, Receiver, Sender},
sync::watch,
time::timeout,
};
#[cfg(target_os = "linux")]
Expand All @@ -33,16 +32,20 @@ use crate::common::{self, Domain};
use crate::context;
use crate::error::{get_status, Error, Result};
use crate::proto::{Code, MessageHeader, Status, MESSAGE_TYPE_REQUEST};
use crate::r#async::shutdown;
use crate::r#async::utils;
use crate::r#async::{MethodHandler, TtrpcContext};

const DEFAULT_CONN_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(5000);
const DEFAULT_SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000);

/// A ttrpc Server (async).
pub struct Server {
listeners: Vec<RawFd>,
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
domain: Option<Domain>,
disconnect_tx: Option<watch::Sender<i32>>,
all_conn_done_rx: Option<Receiver<i32>>,

shutdown: shutdown::Notifier,
stop_listen_tx: Option<Sender<Sender<RawFd>>>,
}

Expand All @@ -52,8 +55,7 @@ impl Default for Server {
listeners: Vec::with_capacity(1),
methods: Arc::new(HashMap::new()),
domain: None,
disconnect_tx: None,
all_conn_done_rx: None,
shutdown: shutdown::with_timeout(DEFAULT_SERVER_SHUTDOWN_TIMEOUT).0,
stop_listen_tx: None,
}
}
Expand Down Expand Up @@ -151,11 +153,7 @@ impl Server {
{
let methods = self.methods.clone();

let (disconnect_tx, close_conn_rx) = watch::channel(0);
self.disconnect_tx = Some(disconnect_tx);

let (conn_done_tx, all_conn_done_rx) = channel::<i32>(1);
self.all_conn_done_rx = Some(all_conn_done_rx);
let shutdown_waiter = self.shutdown.subscribe();

let (stop_listen_tx, mut stop_listen_rx) = channel(1);
self.stop_listen_tx = Some(stop_listen_tx);
Expand All @@ -174,8 +172,7 @@ impl Server {
fd,
stream,
methods.clone(),
close_conn_rx.clone(),
conn_done_tx.clone()
shutdown_waiter.clone(),
).await;
}
Err(e) => {
Expand All @@ -201,7 +198,6 @@ impl Server {
}
}
}
drop(conn_done_tx);
});
Ok(())
}
Expand All @@ -214,13 +210,16 @@ impl Server {
}

pub async fn disconnect(&mut self) {
if let Some(tx) = self.disconnect_tx.take() {
tx.send(1).ok();
}
self.shutdown.shutdown();

if let Some(mut rx) = self.all_conn_done_rx.take() {
rx.recv().await;
}
self.shutdown
.wait_all_exit()
.await
.map_err(|e| {
trace!("wait connection exit error: {}", e);
})
.ok();
trace!("wait connection exit.");
}

pub async fn stop_listen(&mut self) {
Expand All @@ -239,17 +238,17 @@ async fn spawn_connection_handler<S>(
fd: RawFd,
stream: S,
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
mut close_conn_rx: watch::Receiver<i32>,
conn_done_tx: Sender<i32>,
shutdown_waiter: shutdown::Waiter,
) where
S: AsyncRead + AsyncWrite + AsRawFd + Send + 'static,
{
let (req_done_tx, mut all_req_done_rx) = channel::<i32>(1);

spawn(async move {
let (mut reader, mut writer) = split(stream);
let (tx, mut rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = channel(100);
let (client_disconnected_tx, client_disconnected_rx) = watch::channel(false);

let server_shutdown = shutdown_waiter.clone();
let (disconnect_notifier, disconnect_waiter) =
shutdown::with_timeout(DEFAULT_CONN_SHUTDOWN_TIMEOUT);

spawn(async move {
while let Some(buf) = rx.recv().await {
Expand All @@ -262,8 +261,7 @@ async fn spawn_connection_handler<S>(
loop {
let tx = tx.clone();
let methods = methods.clone();
let req_done_tx2 = req_done_tx.clone();
let mut client_disconnected_rx2 = client_disconnected_rx.clone();
let handler_shutdown_waiter = disconnect_waiter.clone();

select! {
resp = receive(&mut reader) => {
Expand All @@ -272,33 +270,32 @@ async fn spawn_connection_handler<S>(
spawn(async move {
select! {
_ = handle_request(tx, fd, methods, message) => {}
_ = client_disconnected_rx2.changed() => {}
_ = handler_shutdown_waiter.wait_shutdown() => {}
}

drop(req_done_tx2);
});
}
Err(e) => {
let _ = client_disconnected_tx.send(true);
disconnect_notifier.shutdown();
trace!("error {:?}", e);
break;
}
}
}
v = close_conn_rx.changed() => {
// 0 is the init value of this watch, not a valid signal
// is_err means the tx was dropped.
if v.is_err() || *close_conn_rx.borrow() != 0 {
info!("Stop accepting new connections.");
break;
}
_ = server_shutdown.wait_shutdown() => {
trace!("Receive shutdown.");
break;
}
}
}

drop(req_done_tx);
all_req_done_rx.recv().await;
drop(conn_done_tx);
// TODO: Don't disconnect_notifier.shutdown();
// Wait pedding request/stream to exit.
disconnect_notifier
.wait_all_exit()
.await
.map_err(|e| {
trace!("wait handler exit error: {}", e);
})
.ok();
});
}

Expand Down

0 comments on commit 54ab070

Please sign in to comment.