Skip to content

Commit

Permalink
fix(http): Connection checks for spurious timeouts
Browse files Browse the repository at this point in the history
We've been seeing a strange number of timeouts in our benchmarking.
Handling spurious timeouts as in this patch seems to fix it!

Note that managing the `timeout_start` needs to be done carefully. If
the current time is provided in the wrong place, it's possible requests
would never timeout.
  • Loading branch information
jwilm committed Oct 8, 2016
1 parent 588ef9d commit 934f2c4
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 13 deletions.
9 changes: 7 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,13 @@ where C: Connect,
trace!("connected and writable {:?}", seed.0);
rotor::Response::ok(
ClientFsm::Socket(
http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier())
.keep_alive(scope.keep_alive)
http::Conn::new(
seed.0,
seed.1,
Next::write().timeout(scope.connect_timeout),
scope.notifier(),
scope.now()
).keep_alive(scope.keep_alive)
)
)
} else {
Expand Down
95 changes: 85 additions & 10 deletions src/http/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::marker::PhantomData;
use std::mem;
use std::time::Duration;

use rotor::{self, EventSet, PollOpt, Scope};
use rotor::{self, EventSet, PollOpt, Scope, Time};

use http::{self, h1, Http1Message, Encoder, Decoder, Next, Next_, Reg, Control};
use http::channel;
Expand Down Expand Up @@ -176,13 +176,15 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
let next = handler.on_incoming(head, &self.transport);
trace!("handler.on_incoming() -> {:?}", next);

let now = scope.now();
match next.interest {
Next_::Read => self.read(scope, State::Http1(Http1 {
handler: handler,
reading: Reading::Body(decoder),
writing: Writing::Init,
keep_alive: keep_alive,
timeout: next.timeout,
timeout_start: Some(now),
_marker: PhantomData,
})),
Next_::Write => State::Http1(Http1 {
Expand All @@ -199,6 +201,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
writing: Writing::Head,
keep_alive: keep_alive,
timeout: next.timeout,
timeout_start: Some(now),
_marker: PhantomData,
}),
Next_::ReadWrite => self.read(scope, State::Http1(Http1 {
Expand All @@ -207,6 +210,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
writing: Writing::Head,
keep_alive: keep_alive,
timeout: next.timeout,
timeout_start: Some(now),
_marker: PhantomData,
})),
Next_::Wait => State::Http1(Http1 {
Expand All @@ -215,6 +219,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
writing: Writing::Init,
keep_alive: keep_alive,
timeout: next.timeout,
timeout_start: Some(now),
_marker: PhantomData,
}),
Next_::End |
Expand Down Expand Up @@ -288,7 +293,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
};
let mut s = State::Http1(http1);
if let Some(next) = next {
s.update(next, &**scope);
s.update(next, &**scope, Some(scope.now()));
}
trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s);

Expand Down Expand Up @@ -354,6 +359,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
handler: handler,
keep_alive: keep_alive,
timeout: interest.timeout,
timeout_start: Some(scope.now()),
_marker: PhantomData,
})
}
Expand Down Expand Up @@ -447,7 +453,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
};

if let Some(next) = next {
state.update(next, &**scope);
state.update(next, &**scope, Some(scope.now()));
}
state
}
Expand All @@ -467,7 +473,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
State::Http1(ref mut http1) => http1.handler.on_error(err),
State::Closed => Next::remove(),
};
self.state.update(next, factory);
self.state.update(next, factory, None);
}

fn on_readable<F>(&mut self, scope: &mut Scope<F>)
Expand Down Expand Up @@ -502,7 +508,13 @@ pub enum ReadyResult<C> {
}

impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
pub fn new(key: K, transport: T, next: Next, notify: rotor::Notifier) -> Conn<K, T, H> {
pub fn new(
key: K,
transport: T,
next: Next,
notify: rotor::Notifier,
now: Time
) -> Conn<K, T, H> {
Conn(Box::new(ConnInner {
buf: Buffer::new(),
ctrl: channel::new(notify),
Expand All @@ -511,6 +523,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
state: State::Init {
interest: next.interest,
timeout: next.timeout,
timeout_start: Some(now),
},
transport: transport,
}))
Expand Down Expand Up @@ -615,7 +628,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
where F: MessageHandlerFactory<K, T, Output=H> {
while let Ok(next) = self.0.ctrl.1.try_recv() {
trace!("woke up with {:?}", next);
self.0.state.update(next, &**scope);
let timeout_start = self.0.state.timeout_start();
self.0.state.update(next, &**scope, timeout_start);
}

let mut conn = Some(self);
Expand All @@ -629,8 +643,11 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {

pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
where F: MessageHandlerFactory<K, T, Output=H> {
//TODO: check if this was a spurious timeout?
self.0.on_error(::Error::Timeout, &**scope);
// Run error handler if timeout has elapsed
if self.0.state.timeout_elapsed(scope.now()) {
self.0.on_error(::Error::Timeout, &**scope);
}

let mut conn = Some(self);
loop {
match conn.take().unwrap().ready(EventSet::none(), scope) {
Expand Down Expand Up @@ -667,6 +684,7 @@ enum State<H: MessageHandler<T>, T: Transport> {
Init {
interest: Next_,
timeout: Option<Duration>,
timeout_start: Option<Time>,
},
/// Http1 will only ever use a connection to send and receive a single
/// message at a time. Once a H1 status has been determined, we will either
Expand All @@ -681,6 +699,27 @@ enum State<H: MessageHandler<T>, T: Transport> {
Closed,
}

/// Given two rotor::Time and a duration, see if the duration has elapsed.
///
/// The rotor::Time type only implements Add<Duration>, doesn't provide an API for comparing
/// itself with other rotor::Time, and it doesn't implement arithmetic operations with itself.
///
/// `Time` is just a newtype around (u64). Since there's no other way to compare them, we'll just
/// use this knowledge to actually do a comparison.
fn timeout_elapsed(timeout: Duration, start: Time, now: Time) -> bool {
// type annotation for sanity
let timeout_at: rotor::Time = start + timeout;

let timeout_at: u64 = unsafe { mem::transmute(timeout_at) };
let now: u64 = unsafe { mem::transmute(now) };

if now >= timeout_at {
true
} else {
false
}
}


impl<H: MessageHandler<T>, T: Transport> State<H, T> {
fn timeout(&self) -> Option<Duration> {
Expand All @@ -690,14 +729,37 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
State::Closed => None,
}
}

fn timeout_start(&self) -> Option<Time> {
match *self {
State::Init { timeout_start, .. } => timeout_start,
State::Http1(ref http1) => http1.timeout_start,
State::Closed => None,
}
}

fn timeout_elapsed(&self, now: Time) -> bool {
match *self {
State::Init { timeout, timeout_start, .. } => {
if let (Some(timeout), Some(start)) = (timeout, timeout_start) {
timeout_elapsed(timeout, start, now)
} else {
false
}
},
State::Http1(ref http1) => http1.timeout_elapsed(now),
State::Closed => false,
}
}
}

impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
State::Init { interest, timeout } => f.debug_struct("Init")
State::Init { interest, timeout, timeout_start } => f.debug_struct("Init")
.field("interest", &interest)
.field("timeout", &timeout)
.field("timeout_start", &timeout_start)
.finish(),
State::Http1(ref h1) => f.debug_tuple("Http1")
.field(h1)
Expand All @@ -708,7 +770,7 @@ impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> {
}

impl<H: MessageHandler<T>, T: Transport> State<H, T> {
fn update<F, K>(&mut self, next: Next, factory: &F)
fn update<F, K>(&mut self, next: Next, factory: &F, timeout_start: Option<Time>)
where F: MessageHandlerFactory<K, T>,
K: Key
{
Expand All @@ -723,6 +785,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
State::Init {
interest: e,
timeout: timeout,
timeout_start: timeout_start,
});
}
(State::Http1(mut http1), next_) => {
Expand Down Expand Up @@ -782,6 +845,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
State::Init {
interest: next.interest,
timeout: next.timeout,
timeout_start: timeout_start,
});
return;
}
Expand Down Expand Up @@ -913,9 +977,20 @@ struct Http1<H, T> {
writing: Writing,
keep_alive: bool,
timeout: Option<Duration>,
timeout_start: Option<Time>,
_marker: PhantomData<T>,
}

impl<H, T> Http1<H, T> {
fn timeout_elapsed(&self, now: Time) -> bool {
if let (Some(timeout), Some(start)) = (self.timeout, self.timeout_start) {
timeout_elapsed(timeout, start, now)
} else {
false
}
}
}

impl<H, T> fmt::Debug for Http1<H, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Http1")
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where A: Accept,
rotor_try!(scope.register(&seed, EventSet::readable(), PollOpt::level()));
rotor::Response::ok(
ServerFsm::Conn(
http::Conn::new((), seed, Next::read(), scope.notifier())
http::Conn::new((), seed, Next::read(), scope.notifier(), scope.now())
.keep_alive(scope.keep_alive)
)
)
Expand Down

0 comments on commit 934f2c4

Please sign in to comment.