Skip to content

Commit

Permalink
add body::to_bytes_limited (actix#3000
Browse files Browse the repository at this point in the history
* add body::to_body_limit

* rename to_bytes_limited
  • Loading branch information
robjtede authored Mar 13, 2023
1 parent dfaca18 commit 9e7a6fe
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 13 deletions.
5 changes: 5 additions & 0 deletions actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased - 2023-xx-xx

### Added

- Add `body::to_body_limit()` function.
- Add `body::BodyLimitExceeded` error type.

## 3.3.1 - 2023-03-02

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion actix-http/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ pub(crate) use self::message_body::MessageBodyMapErr;
pub use self::none::None;
pub use self::size::BodySize;
pub use self::sized_stream::SizedStream;
pub use self::utils::to_bytes;
pub use self::utils::{to_bytes, to_bytes_limited, BodyLimitExceeded};
141 changes: 129 additions & 12 deletions actix-http/src/body/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,71 +7,188 @@ use futures_core::ready;

use super::{BodySize, MessageBody};

/// Collects the body produced by a `MessageBody` implementation into `Bytes`.
/// Collects all the bytes produced by `body`.
///
/// Any errors produced by the body stream are returned immediately.
///
/// Consider using [`to_bytes_limited`] instead to protect against memory exhaustion.
///
/// # Examples
///
/// ```
/// use actix_http::body::{self, to_bytes};
/// use bytes::Bytes;
///
/// # async fn test_to_bytes() {
/// # actix_rt::System::new().block_on(async {
/// let body = body::None::new();
/// let bytes = to_bytes(body).await.unwrap();
/// assert!(bytes.is_empty());
///
/// let body = Bytes::from_static(b"123");
/// let bytes = to_bytes(body).await.unwrap();
/// assert_eq!(bytes, b"123"[..]);
/// # }
/// assert_eq!(bytes, "123");
/// # });
/// ```
pub async fn to_bytes<B: MessageBody>(body: B) -> Result<Bytes, B::Error> {
to_bytes_limited(body, usize::MAX)
.await
.expect("body should never overflow usize::MAX")
}

/// Error type returned from [`to_bytes_limited`] when body produced exceeds limit.
#[derive(Debug)]
#[non_exhaustive]
pub struct BodyLimitExceeded;

/// Collects the bytes produced by `body`, up to `limit` bytes.
///
/// If a chunk read from `poll_next` causes the total number of bytes read to exceed `limit`, an
/// `Err(BodyLimitExceeded)` is returned.
///
/// Any errors produced by the body stream are returned immediately as `Ok(Err(B::Error))`.
///
/// # Examples
///
/// ```
/// use actix_http::body::{self, to_bytes_limited};
/// use bytes::Bytes;
///
/// # actix_rt::System::new().block_on(async {
/// let body = body::None::new();
/// let bytes = to_bytes_limited(body, 10).await.unwrap().unwrap();
/// assert!(bytes.is_empty());
///
/// let body = Bytes::from_static(b"123");
/// let bytes = to_bytes_limited(body, 10).await.unwrap().unwrap();
/// assert_eq!(bytes, "123");
///
/// let body = Bytes::from_static(b"123");
/// assert!(to_bytes_limited(body, 2).await.is_err());
/// # });
/// ```
pub async fn to_bytes_limited<B: MessageBody>(
body: B,
limit: usize,
) -> Result<Result<Bytes, B::Error>, BodyLimitExceeded> {
let cap = match body.size() {
BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()),
BodySize::None | BodySize::Sized(0) => return Ok(Ok(Bytes::new())),
BodySize::Sized(size) if size as usize > limit => return Err(BodyLimitExceeded),
BodySize::Sized(size) => size as usize,
// good enough first guess for chunk size
BodySize::Stream => 32_768,
};

let mut exceeded_limit = false;
let mut buf = BytesMut::with_capacity(cap);

pin!(body);

poll_fn(|cx| loop {
match poll_fn(|cx| loop {
let body = body.as_mut();

match ready!(body.poll_next(cx)) {
Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
Some(Ok(bytes)) => {
// if limit is exceeded...
if buf.len() + bytes.len() > limit {
// ...set flag to true and break out of poll_fn
exceeded_limit = true;
return Poll::Ready(Ok(()));
}

buf.extend_from_slice(&bytes)
}
None => return Poll::Ready(Ok(())),
Some(Err(err)) => return Poll::Ready(Err(err)),
}
})
.await?;
.await
{
// propagate error returned from body poll
Err(err) => Ok(Err(err)),

Ok(buf.freeze())
// limit was exceeded while reading body
Ok(()) if exceeded_limit => Err(BodyLimitExceeded),

// otherwise return body buffer
Ok(()) => Ok(Ok(buf.freeze())),
}
}

#[cfg(test)]
mod test {
mod tests {
use std::io;

use futures_util::{stream, StreamExt as _};

use super::*;
use crate::{body::BodyStream, Error};
use crate::{
body::{BodyStream, SizedStream},
Error,
};

#[actix_rt::test]
async fn test_to_bytes() {
async fn to_bytes_complete() {
let bytes = to_bytes(()).await.unwrap();
assert!(bytes.is_empty());

let body = Bytes::from_static(b"123");
let bytes = to_bytes(body).await.unwrap();
assert_eq!(bytes, b"123"[..]);
}

#[actix_rt::test]
async fn to_bytes_streams() {
let stream = stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")])
.map(Ok::<_, Error>);
let body = BodyStream::new(stream);
let bytes = to_bytes(body).await.unwrap();
assert_eq!(bytes, b"123abc"[..]);
}

#[actix_rt::test]
async fn to_bytes_limited_complete() {
let bytes = to_bytes_limited((), 0).await.unwrap().unwrap();
assert!(bytes.is_empty());

let bytes = to_bytes_limited((), 1).await.unwrap().unwrap();
assert!(bytes.is_empty());

assert!(to_bytes_limited(Bytes::from_static(b"12"), 0)
.await
.is_err());
assert!(to_bytes_limited(Bytes::from_static(b"12"), 1)
.await
.is_err());
assert!(to_bytes_limited(Bytes::from_static(b"12"), 2).await.is_ok());
assert!(to_bytes_limited(Bytes::from_static(b"12"), 3).await.is_ok());
}

#[actix_rt::test]
async fn to_bytes_limited_streams() {
// hinting a larger body fails
let body = SizedStream::new(8, stream::empty().map(Ok::<_, Error>));
assert!(to_bytes_limited(body, 3).await.is_err());

// hinting a smaller body is okay
let body = SizedStream::new(3, stream::empty().map(Ok::<_, Error>));
assert!(to_bytes_limited(body, 3).await.unwrap().unwrap().is_empty());

// hinting a smaller body then returning a larger one fails
let stream = stream::iter(vec![Bytes::from_static(b"1234")]).map(Ok::<_, Error>);
let body = SizedStream::new(3, stream);
assert!(to_bytes_limited(body, 3).await.is_err());

let stream = stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")])
.map(Ok::<_, Error>);
let body = BodyStream::new(stream);
assert!(to_bytes_limited(body, 3).await.is_err());
}

#[actix_rt::test]
async fn to_body_limit_error() {
let err_stream = stream::once(async { Err(io::Error::new(io::ErrorKind::Other, "")) });
let body = SizedStream::new(8, err_stream);
// not too big, but propagates error from body stream
assert!(to_bytes_limited(body, 10).await.unwrap().is_err());
}
}

0 comments on commit 9e7a6fe

Please sign in to comment.