Skip to content

Commit

Permalink
refined error model (actix#2253)
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede authored Jun 17, 2021
1 parent bb0331a commit 532f7b9
Show file tree
Hide file tree
Showing 69 changed files with 1,491 additions and 894 deletions.
5 changes: 3 additions & 2 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[alias]
chk = "hack check --workspace --all-features --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"
chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --tests --examples"
ci-min = "hack check --workspace --no-default-features"
ci-min-test = "hack check --workspace --no-default-features --tests --examples"
ci-default = "hack check --workspace"
ci-full = "check --workspace --bins --examples --tests"
ci-test = "test --workspace --all-features --no-fail-fast"
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* Update `language-tags` to `0.3`.
* `ServiceResponse::take_body`. [#2201]
* `ServiceResponse::map_body` closure receives and returns `B` instead of `ResponseBody<B>` types. [#2201]
* All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2253]
* All error trait bounds in message body and stream impls changed from `Into<Error>` to `Into<Box<dyn std::error::Error>>`. [#2253]
* `HttpServer::{listen_rustls(), bind_rustls()}` now honor the ALPN protocols in the configuation parameter. [#2226]
* `middleware::normalize` now will not try to normalize URIs with no valid path [#2246]

Expand All @@ -21,6 +23,7 @@

[#2200]: https://github.com/actix/actix-web/pull/2200
[#2201]: https://github.com/actix/actix-web/pull/2201
[#2253]: https://github.com/actix/actix-web/pull/2253
[#2246]: https://github.com/actix/actix-web/pull/2246


Expand Down
1 change: 1 addition & 0 deletions actix-files/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ path = "src/lib.rs"

[dependencies]
actix-web = { version = "4.0.0-beta.6", default-features = false }
actix-http = "3.0.0-beta.6"
actix-service = "2.0.0"
actix-utils = "3.0.0"

Expand Down
4 changes: 4 additions & 0 deletions actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

### Changed
* The `MessageBody` trait now has an associated `Error` type. [#2183]
* All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2253]
* All error trait bounds in message body and stream impls changed from `Into<Error>` to `Into<Box<dyn std::error::Error>>`. [#2253]
* Places in `Response` where `ResponseBody<B>` was received or returned now simply use `B`. [#2201]
* `header` mod is now public. [#2171]
* `uri` mod is now public. [#2171]
* Update `language-tags` to `0.3`.
* Reduce the level from `error` to `debug` for the log line that is emitted when a `500 Internal Server Error` is built using `HttpResponse::from_error`. [#2201]
* `ResponseBuilder::message_body` now returns a `Result`. [#2201]
* Remove `Unpin` bound on `ResponseBuilder::streaming`. [#2253]
* `HttpServer::{listen_rustls(), bind_rustls()}` now honor the ALPN protocols in the configuation parameter. [#2226]

### Removed
Expand All @@ -37,6 +40,7 @@
[#2201]: https://github.com/actix/actix-web/pull/2201
[#2205]: https://github.com/actix/actix-web/pull/2205
[#2215]: https://github.com/actix/actix-web/pull/2215
[#2253]: https://github.com/actix/actix-web/pull/2253
[#2244]: https://github.com/actix/actix-web/pull/2244


Expand Down
3 changes: 2 additions & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ trust-dns-resolver = { version = "0.20.0", optional = true }
actix-server = "2.0.0-beta.3"
actix-http-test = { version = "3.0.0-beta.4", features = ["openssl"] }
actix-tls = { version = "3.0.0-beta.5", features = ["openssl"] }
criterion = "0.3"
async-stream = "0.3"
criterion = { version = "0.3", features = ["html_reports"] }
env_logger = "0.8"
rcgen = "0.8"
serde = { version = "1.0", features = ["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions actix-http/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use actix_server::Server;
use bytes::BytesMut;
use futures_util::StreamExt as _;
use http::header::HeaderValue;
use log::info;

#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

Server::build()
.bind("echo", "127.0.0.1:8080", || {
.bind("echo", ("127.0.0.1", 8080), || {
HttpService::build()
.client_timeout(1000)
.client_disconnect(1000)
Expand All @@ -22,7 +21,8 @@ async fn main() -> io::Result<()> {
body.extend_from_slice(&item?);
}

info!("request body: {:?}", body);
log::info!("request body: {:?}", body);

Ok::<_, Error>(
Response::build(StatusCode::OK)
.insert_header((
Expand Down
6 changes: 3 additions & 3 deletions actix-http/examples/echo2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use actix_http::{Error, HttpService, Request, Response};
use actix_server::Server;
use bytes::BytesMut;
use futures_util::StreamExt as _;
use log::info;

async fn handle_request(mut req: Request) -> Result<Response<Body>, Error> {
let mut body = BytesMut::new();
while let Some(item) = req.payload().next().await {
body.extend_from_slice(&item?)
}

info!("request body: {:?}", body);
log::info!("request body: {:?}", body);

Ok(Response::build(StatusCode::OK)
.insert_header(("x-head", HeaderValue::from_static("dummy value!")))
.body(body))
Expand All @@ -24,7 +24,7 @@ async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

Server::build()
.bind("echo", "127.0.0.1:8080", || {
.bind("echo", ("127.0.0.1", 8080), || {
HttpService::build().finish(handle_request).tcp()
})?
.run()
Expand Down
14 changes: 7 additions & 7 deletions actix-http/examples/hello-world.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
use std::io;
use std::{convert::Infallible, io};

use actix_http::{http::StatusCode, HttpService, Response};
use actix_server::Server;
use actix_utils::future;
use http::header::HeaderValue;
use log::info;

#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

Server::build()
.bind("hello-world", "127.0.0.1:8080", || {
.bind("hello-world", ("127.0.0.1", 8080), || {
HttpService::build()
.client_timeout(1000)
.client_disconnect(1000)
.finish(|_req| {
info!("{:?}", _req);
.finish(|req| async move {
log::info!("{:?}", req);

let mut res = Response::build(StatusCode::OK);
res.insert_header((
"x-head",
HeaderValue::from_static("dummy value!"),
));
future::ok::<_, ()>(res.body("Hello world!"))

Ok::<_, Infallible>(res.body("Hello world!"))
})
.tcp()
})?
Expand Down
40 changes: 40 additions & 0 deletions actix-http/examples/streaming-error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! Example showing response body (chunked) stream erroring.
//!
//! Test using `nc` or `curl`.
//! ```sh
//! $ curl -vN 127.0.0.1:8080
//! $ echo 'GET / HTTP/1.1\n\n' | nc 127.0.0.1 8080
//! ```
use std::{convert::Infallible, io, time::Duration};

use actix_http::{body::BodyStream, HttpService, Response};
use actix_server::Server;
use async_stream::stream;
use bytes::Bytes;

#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

Server::build()
.bind("streaming-error", ("127.0.0.1", 8080), || {
HttpService::build()
.finish(|req| async move {
log::info!("{:?}", req);
let res = Response::ok();

Ok::<_, Infallible>(res.set_body(BodyStream::new(stream! {
yield Ok(Bytes::from("123"));
yield Ok(Bytes::from("456"));

actix_rt::time::sleep(Duration::from_millis(1000)).await;

yield Err(io::Error::new(io::ErrorKind::Other, ""));
})))
})
.tcp()
})?
.run()
.await
}
13 changes: 8 additions & 5 deletions actix-http/src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ impl MessageBody for AnyBody {

// TODO: MSRV 1.51: poll_map_err
AnyBody::Message(body) => match ready!(body.as_pin_mut().poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Err(err)) => {
Poll::Ready(Some(Err(Error::new_body().with_cause(err))))
}
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
},
Expand Down Expand Up @@ -162,9 +164,10 @@ impl From<BytesMut> for AnyBody {
}
}

impl<S> From<SizedStream<S>> for AnyBody
impl<S, E> From<SizedStream<S>> for AnyBody
where
S: Stream<Item = Result<Bytes, Error>> + 'static,
S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Box<dyn StdError>> + 'static,
{
fn from(s: SizedStream<S>) -> Body {
AnyBody::from_message(s)
Expand All @@ -174,7 +177,7 @@ where
impl<S, E> From<BodyStream<S>> for AnyBody
where
S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Error> + 'static,
E: Into<Box<dyn StdError>> + 'static,
{
fn from(s: BodyStream<S>) -> Body {
AnyBody::from_message(s)
Expand Down Expand Up @@ -222,7 +225,7 @@ impl MessageBody for BoxAnyBody {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
// TODO: MSRV 1.51: poll_map_err
match ready!(self.0.as_mut().poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new_body().with_cause(err)))),
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
}
Expand Down
80 changes: 70 additions & 10 deletions actix-http/src/body/body_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
error::Error as StdError,
pin::Pin,
task::{Context, Poll},
};
Expand All @@ -7,8 +8,6 @@ use bytes::Bytes;
use futures_core::{ready, Stream};
use pin_project_lite::pin_project;

use crate::error::Error;

use super::{BodySize, MessageBody};

pin_project! {
Expand All @@ -24,7 +23,7 @@ pin_project! {
impl<S, E> BodyStream<S>
where
S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>,
E: Into<Box<dyn StdError>> + 'static,
{
pub fn new(stream: S) -> Self {
BodyStream { stream }
Expand All @@ -34,9 +33,9 @@ where
impl<S, E> MessageBody for BodyStream<S>
where
S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>,
E: Into<Box<dyn StdError>> + 'static,
{
type Error = Error;
type Error = E;

fn size(&self) -> BodySize {
BodySize::Stream
Expand All @@ -56,7 +55,7 @@ where

let chunk = match ready!(stream.poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
opt => opt.map(|res| res.map_err(Into::into)),
opt => opt,
};

return Poll::Ready(chunk);
Expand All @@ -66,9 +65,16 @@ where

#[cfg(test)]
mod tests {
use actix_rt::pin;
use std::{convert::Infallible, time::Duration};

use actix_rt::{
pin,
time::{sleep, Sleep},
};
use actix_utils::future::poll_fn;
use futures_util::stream;
use derive_more::{Display, Error};
use futures_core::ready;
use futures_util::{stream, FutureExt as _};

use super::*;
use crate::body::to_bytes;
Expand All @@ -78,7 +84,7 @@ mod tests {
let body = BodyStream::new(stream::iter(
["1", "", "2"]
.iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
.map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
));
pin!(body);

Expand All @@ -103,9 +109,63 @@ mod tests {
let body = BodyStream::new(stream::iter(
["1", "", "2"]
.iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
.map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
));

assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
}
#[derive(Debug, Display, Error)]
#[display(fmt = "stream error")]
struct StreamErr;

#[actix_rt::test]
async fn stream_immediate_error() {
let body = BodyStream::new(stream::once(async { Err(StreamErr) }));
assert!(matches!(to_bytes(body).await, Err(StreamErr)));
}

#[actix_rt::test]
async fn stream_delayed_error() {
let body =
BodyStream::new(stream::iter(vec![Ok(Bytes::from("1")), Err(StreamErr)]));
assert!(matches!(to_bytes(body).await, Err(StreamErr)));

#[pin_project::pin_project(project = TimeDelayStreamProj)]
#[derive(Debug)]
enum TimeDelayStream {
Start,
Sleep(Pin<Box<Sleep>>),
Done,
}

impl Stream for TimeDelayStream {
type Item = Result<Bytes, StreamErr>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.as_mut().get_mut() {
TimeDelayStream::Start => {
let sleep = sleep(Duration::from_millis(1));
self.as_mut().set(TimeDelayStream::Sleep(Box::pin(sleep)));
cx.waker().wake_by_ref();
Poll::Pending
}

TimeDelayStream::Sleep(ref mut delay) => {
ready!(delay.poll_unpin(cx));
self.set(TimeDelayStream::Done);
cx.waker().wake_by_ref();
Poll::Pending
}

TimeDelayStream::Done => Poll::Ready(Some(Err(StreamErr))),
}
}
}

let body = BodyStream::new(TimeDelayStream::Start);
assert!(matches!(to_bytes(body).await, Err(StreamErr)));
}
}
6 changes: 5 additions & 1 deletion actix-http/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,15 @@ mod tests {
}

#[actix_rt::test]
async fn test_box() {
async fn test_box_and_pin() {
let val = Box::new(());
pin!(val);
assert_eq!(val.size(), BodySize::Empty);
assert!(poll_fn(|cx| val.as_mut().poll_next(cx)).await.is_none());

let mut val = Box::pin(());
assert_eq!(val.size(), BodySize::Empty);
assert!(poll_fn(|cx| val.as_mut().poll_next(cx)).await.is_none());
}

#[actix_rt::test]
Expand Down
Loading

0 comments on commit 532f7b9

Please sign in to comment.