Skip to content

Commit

Permalink
drop unpin constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 21, 2019
1 parent 687884f commit 1ffa7d1
Show file tree
Hide file tree
Showing 31 changed files with 2,174 additions and 2,179 deletions.
3 changes: 2 additions & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ language-tags = "0.2"
log = "0.4"
mime = "0.3"
percent-encoding = "2.1"
pin-project = "0.4.5"
rand = "0.7"
regex = "1.0"
serde = "1.0"
Expand Down Expand Up @@ -107,7 +108,7 @@ webpki-roots = { version = "0.18", optional = true }

[dev-dependencies]
actix-rt = "1.0.0-alpha.1"
actix-server = { version = "0.8.0-alpha.1", features=["openssl"] }
actix-server = { version = "0.8.0-alpha.1", features=["openssl", "rustls"] }
actix-connect = { version = "1.0.0-alpha.1", features=["openssl"] }
actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] }
env_logger = "0.6"
Expand Down
34 changes: 18 additions & 16 deletions actix-http/examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{env, io};

use actix_http::{error::PayloadError, HttpService, Request, Response};
use actix_http::{Error, HttpService, Request, Response};
use actix_server::Server;
use bytes::BytesMut;
use futures::{Future, Stream};
use futures::StreamExt;
use http::header::HeaderValue;
use log::info;

Expand All @@ -17,20 +17,22 @@ fn main() -> io::Result<()> {
.client_timeout(1000)
.client_disconnect(1000)
.finish(|mut req: Request| {
req.take_payload()
.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
.and_then(|bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header(
"x-head",
HeaderValue::from_static("dummy value!"),
);
Ok(res.body(bytes))
})
async move {
let mut body = BytesMut::new();
while let Some(item) = req.payload().next().await {
body.extend_from_slice(&item?);
}

info!("request body: {:?}", body);
Ok::<_, Error>(
Response::Ok()
.header(
"x-head",
HeaderValue::from_static("dummy value!"),
)
.body(body),
)
}
})
})?
.run()
Expand Down
29 changes: 13 additions & 16 deletions actix-http/examples/echo2.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
use std::{env, io};

use actix_http::http::HeaderValue;
use actix_http::{error::PayloadError, Error, HttpService, Request, Response};
use actix_http::{Error, HttpService, Request, Response};
use actix_server::Server;
use bytes::BytesMut;
use futures::{Future, Stream};
use futures::StreamExt;
use log::info;

fn handle_request(mut req: Request) -> impl Future<Item = Response, Error = Error> {
req.take_payload()
.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
.from_err()
.and_then(|bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));
Ok(res.body(bytes))
})
async fn handle_request(mut req: Request) -> Result<Response, Error> {
let mut body = BytesMut::new();
while let Some(item) = req.payload().next().await {
body.extend_from_slice(&item?)
}

info!("request body: {:?}", body);
Ok(Response::Ok()
.header("x-head", HeaderValue::from_static("dummy value!"))
.body(body))
}

fn main() -> io::Result<()> {
Expand All @@ -28,7 +25,7 @@ fn main() -> io::Result<()> {

Server::build()
.bind("echo", "127.0.0.1:8080", || {
HttpService::build().finish(|_req: Request| handle_request(_req))
HttpService::build().finish(handle_request)
})?
.run()
}
73 changes: 44 additions & 29 deletions actix-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt, mem};

use bytes::{Bytes, BytesMut};
use futures::Stream;
use pin_project::{pin_project, project};

use crate::error::Error;

Expand All @@ -31,7 +32,7 @@ impl BodySize {
}

/// Type that provides this trait can be streamed to a peer.
pub trait MessageBody: Unpin {
pub trait MessageBody {
fn size(&self) -> BodySize;

fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>>;
Expand All @@ -57,6 +58,7 @@ impl<T: MessageBody> MessageBody for Box<T> {
}
}

#[pin_project]
pub enum ResponseBody<B> {
Body(B),
Other(Body),
Expand Down Expand Up @@ -106,8 +108,13 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
impl<B: MessageBody> Stream for ResponseBody<B> {
type Item = Result<Bytes, Error>;

#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.get_mut().poll_next(cx)
#[project]
match self.project() {
ResponseBody::Body(ref mut body) => body.poll_next(cx),
ResponseBody::Other(ref mut body) => body.poll_next(cx),
}
}
}

Expand Down Expand Up @@ -243,7 +250,7 @@ impl From<serde_json::Value> for Body {

impl<S> From<SizedStream<S>> for Body
where
S: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
S: Stream<Item = Result<Bytes, Error>> + 'static,
{
fn from(s: SizedStream<S>) -> Body {
Body::from_message(s)
Expand All @@ -252,7 +259,7 @@ where

impl<S, E> From<BodyStream<S, E>> for Body
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Error> + 'static,
{
fn from(s: BodyStream<S, E>) -> Body {
Expand Down Expand Up @@ -350,7 +357,9 @@ impl MessageBody for String {

/// Type represent streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
#[pin_project]
pub struct BodyStream<S, E> {
#[pin]
stream: S,
_t: PhantomData<E>,
}
Expand All @@ -368,33 +377,30 @@ where
}
}

impl<S, E> Unpin for BodyStream<S, E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>,
{
}

impl<S, E> MessageBody for BodyStream<S, E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>,
{
fn size(&self) -> BodySize {
BodySize::Stream
}

fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
Pin::new(&mut self.stream)
unsafe { Pin::new_unchecked(self) }
.project()
.stream
.poll_next(cx)
.map(|res| res.map(|res| res.map_err(std::convert::Into::into)))
}
}

/// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding.
#[pin_project]
pub struct SizedStream<S> {
size: u64,
#[pin]
stream: S,
}

Expand All @@ -409,20 +415,25 @@ where

impl<S> MessageBody for SizedStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
S: Stream<Item = Result<Bytes, Error>>,
{
fn size(&self) -> BodySize {
BodySize::Sized64(self.size)
}

fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
unsafe { Pin::new_unchecked(self) }
.project()
.stream
.poll_next(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use actix_http_test::block_on;
use futures::future::{lazy, poll_fn};

impl Body {
pub(crate) fn get_ref(&self) -> &[u8] {
Expand Down Expand Up @@ -450,8 +461,8 @@ mod tests {

assert_eq!("test".size(), BodySize::Sized(4));
assert_eq!(
"test".poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| "test".poll_next(cx))).unwrap().ok(),
Some(Bytes::from("test"))
);
}

Expand All @@ -467,8 +478,10 @@ mod tests {

assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
assert_eq!(
(&b"test"[..]).poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| (&b"test"[..]).poll_next(cx)))
.unwrap()
.ok(),
Some(Bytes::from("test"))
);
}

Expand All @@ -479,8 +492,10 @@ mod tests {

assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
assert_eq!(
Vec::from("test").poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| Vec::from("test").poll_next(cx)))
.unwrap()
.ok(),
Some(Bytes::from("test"))
);
}

Expand All @@ -492,8 +507,8 @@ mod tests {

assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| b.poll_next(cx))).unwrap().ok(),
Some(Bytes::from("test"))
);
}

Expand All @@ -505,8 +520,8 @@ mod tests {

assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| b.poll_next(cx))).unwrap().ok(),
Some(Bytes::from("test"))
);
}

Expand All @@ -520,22 +535,22 @@ mod tests {

assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
block_on(poll_fn(|cx| b.poll_next(cx))).unwrap().ok(),
Some(Bytes::from("test"))
);
}

#[test]
fn test_unit() {
assert_eq!(().size(), BodySize::Empty);
assert_eq!(().poll_next().unwrap(), Async::Ready(None));
assert!(block_on(poll_fn(|cx| ().poll_next(cx))).is_none());
}

#[test]
fn test_box() {
let mut val = Box::new(());
assert_eq!(val.size(), BodySize::Empty);
assert_eq!(val.poll_next().unwrap(), Async::Ready(None));
assert!(block_on(poll_fn(|cx| val.poll_next(cx))).is_none());
}

#[test]
Expand Down
Loading

0 comments on commit 1ffa7d1

Please sign in to comment.