Skip to content

Commit

Permalink
fix master branch build. change web::block output type. (actix#1957)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow authored Feb 6, 2021
1 parent 83fb497 commit 20cf009
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 110 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* `ServiceRequest::into_parts` and `ServiceRequest::from_parts` would not fail.
`ServiceRequest::from_request` would not fail and no payload would be generated [#1893]
* Our `Either` type now uses `Left`/`Right` variants (instead of `A`/`B`) [#1894]
* `web::block` accept any closure that has an output bound to `Send` and `'static`. [#1957]

### Fixed
* Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906]
Expand All @@ -28,6 +29,7 @@
[#1894]: https://github.com/actix/actix-web/pull/1894
[#1869]: https://github.com/actix/actix-web/pull/1869
[#1906]: https://github.com/actix/actix-web/pull/1906
[#1957]: https://github.com/actix/actix-web/pull/1957


## 4.0.0-beta.1 - 2021-01-07
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ required-features = ["rustls"]

[dependencies]
actix-codec = "0.4.0-beta.1"
actix-macros = "0.1.0"
actix-macros = "=0.2.0-beta.1"
actix-router = "0.2.4"
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-server = "2.0.0-beta.2"
actix-service = "2.0.0-beta.3"
actix-service = "=2.0.0-beta.3"
actix-utils = "3.0.0-beta.1"
actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true }

Expand Down
2 changes: 1 addition & 1 deletion actix-files/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ percent-encoding = "2.1"
v_htmlescape = "0.12"

[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-web = "4.0.0-beta.1"
98 changes: 58 additions & 40 deletions actix-files/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use actix_web::{
error::{Error, ErrorInternalServerError},
error::{BlockingError, Error},
rt::task::{spawn_blocking, JoinHandle},
};
use bytes::Bytes;
Expand All @@ -18,11 +18,26 @@ use futures_core::{ready, Stream};
/// A helper created from a `std::fs::File` which reads the file
/// chunk-by-chunk on a `ThreadPool`.
pub struct ChunkedReadFile {
pub(crate) size: u64,
pub(crate) offset: u64,
pub(crate) file: Option<File>,
pub(crate) fut: Option<JoinHandle<Result<(File, Bytes), io::Error>>>,
pub(crate) counter: u64,
size: u64,
offset: u64,
state: ChunkedReadFileState,
counter: u64,
}

enum ChunkedReadFileState {
File(Option<File>),
Future(JoinHandle<Result<(File, Bytes), io::Error>>),
}

impl ChunkedReadFile {
pub(crate) fn new(size: u64, offset: u64, file: File) -> Self {
Self {
size,
offset,
state: ChunkedReadFileState::File(Some(file)),
counter: 0,
}
}
}

impl fmt::Debug for ChunkedReadFile {
Expand All @@ -38,49 +53,52 @@ impl Stream for ChunkedReadFile {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut {
let res = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok((file, bytes))) => {
self.fut.take();
self.file = Some(file);

self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64;

Ok(bytes)
}
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(ErrorInternalServerError("Unexpected error")),
};
return Poll::Ready(Some(res));
}
let this = self.as_mut().get_mut();
match this.state {
ChunkedReadFileState::File(ref mut file) => {
let size = this.size;
let offset = this.offset;
let counter = this.counter;

let size = self.size;
let offset = self.offset;
let counter = self.counter;
if size == counter {
Poll::Ready(None)
} else {
let mut file = file
.take()
.expect("ChunkedReadFile polled after completion");

if size == counter {
Poll::Ready(None)
} else {
let mut file = self.file.take().expect("Use after completion");
let fut = spawn_blocking(move || {
let max_bytes =
cmp::min(size.saturating_sub(counter), 65_536) as usize;

self.fut = Some(spawn_blocking(move || {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;

let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = file
.by_ref()
.take(max_bytes as u64)
.read_to_end(&mut buf)?;

let n_bytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}

if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
Ok((file, Bytes::from(buf)))
});
this.state = ChunkedReadFileState::Future(fut);
self.poll_next(cx)
}
}
ChunkedReadFileState::Future(ref mut fut) => {
let (file, bytes) =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
this.state = ChunkedReadFileState::File(Some(file));

Ok((file, Bytes::from(buf)))
}));
this.offset += bytes.len() as u64;
this.counter += bytes.len() as u64;

self.poll_next(cx)
Poll::Ready(Some(Ok(bytes)))
}
}
}
}
16 changes: 2 additions & 14 deletions actix-files/src/named.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,7 @@ impl NamedFile {
res.encoding(current_encoding);
}

let reader = ChunkedReadFile {
size: self.md.len(),
offset: 0,
file: Some(self.file),
fut: None,
counter: 0,
};
let reader = ChunkedReadFile::new(self.md.len(), 0, self.file);

return res.streaming(reader);
}
Expand Down Expand Up @@ -426,13 +420,7 @@ impl NamedFile {
return resp.status(StatusCode::NOT_MODIFIED).finish();
}

let reader = ChunkedReadFile {
offset,
size: length,
file: Some(self.file),
fut: None,
counter: 0,
};
let reader = ChunkedReadFile::new(length, offset, self.file);

if offset != 0 || length != self.md.len() {
resp.status(StatusCode::PARTIAL_CONTENT);
Expand Down
2 changes: 1 addition & 1 deletion actix-http-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ actix-service = "2.0.0-beta.3"
actix-codec = "0.4.0-beta.1"
actix-tls = "3.0.0-beta.2"
actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-server = "2.0.0-beta.2"
awc = "3.0.0-beta.1"

Expand Down
2 changes: 2 additions & 0 deletions actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* `Extensions::insert` returns Option of replaced item. [#1904]
* Remove `HttpResponseBuilder::json2()` and make `HttpResponseBuilder::json()` take a value by
reference. [#1903]
* Simplify `BlockingError` type to a struct. It's only triggered with blocking thread pool is dead. [#1957]

### Removed
* `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869]
Expand All @@ -29,6 +30,7 @@
[#1903]: https://github.com/actix/actix-web/pull/1903
[#1904]: https://github.com/actix/actix-web/pull/1904
[#1912]: https://github.com/actix/actix-web/pull/1912
[#1957]: https://github.com/actix/actix-web/pull/1957


## 3.0.0-beta.1 - 2021-01-07
Expand Down
2 changes: 1 addition & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ actors = ["actix"]
actix-service = "2.0.0-beta.3"
actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-tls = "3.0.0-beta.2"
actix = { version = "0.11.0-beta.1", optional = true }

Expand Down
11 changes: 2 additions & 9 deletions actix-http/src/encoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,8 @@ where
) -> Poll<Option<Self::Item>> {
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
}
};
let (chunk, decoder) =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
self.decoder = Some(decoder);
self.fut.take();
if let Some(chunk) = chunk {
Expand Down
13 changes: 2 additions & 11 deletions actix-http/src/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,8 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
}

if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(
BlockingError::<io::Error>::Canceled.into(),
)))
}
};
let mut encoder =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
let chunk = encoder.take();
*this.encoder = Some(encoder);
this.fut.take();
Expand Down
27 changes: 10 additions & 17 deletions actix-http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,13 @@ impl From<httparse::Error> for ParseError {

/// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
#[display(fmt = "Blocking thread pool is gone")]
pub struct BlockingError;

impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
impl std::error::Error for BlockingError {}

/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
impl ResponseError for BlockingError {}

#[derive(Display, Debug)]
/// A set of errors that can occur during payload parsing
Expand Down Expand Up @@ -372,15 +368,12 @@ impl From<io::Error> for PayloadError {
}
}

impl From<BlockingError<io::Error>> for PayloadError {
fn from(err: BlockingError<io::Error>) -> Self {
match err {
BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
)),
}
impl From<BlockingError> for PayloadError {
fn from(_: BlockingError) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion actix-multipart/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ mime = "0.3"
twoway = "0.2"

[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-http = "3.0.0-beta.1"
2 changes: 1 addition & 1 deletion actix-web-actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ pin-project = "1.0.0"
tokio = { version = "1", features = ["sync"] }

[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
env_logger = "0.7"
futures-util = { version = "0.3.7", default-features = false }
2 changes: 1 addition & 1 deletion actix-web-codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] }
proc-macro2 = "1"

[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"
actix-web = "4.0.0-beta.1"
futures-util = { version = "0.3.7", default-features = false }
trybuild = "1"
Expand Down
2 changes: 1 addition & 1 deletion awc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ compress = ["actix-http/compress"]
actix-codec = "0.4.0-beta.1"
actix-service = "2.0.0-beta.3"
actix-http = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.2"
actix-rt = "=2.0.0-beta.2"

base64 = "0.13"
bytes = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ mod tests {
match res {
Ok(value) => Ok(HttpResponse::Ok()
.content_type("text/plain")
.body(format!("Async with block value: {}", value))),
.body(format!("Async with block value: {:?}", value))),
Err(_) => panic!("Unexpected"),
}
}
Expand Down
13 changes: 5 additions & 8 deletions src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,11 @@ pub fn service<T: IntoPattern>(path: T) -> WebService {

/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub async fn block<F, I, E>(f: F) -> Result<I, BlockingError<E>>
pub fn block<F, R>(f: F) -> impl Future<Output = Result<R, BlockingError>>
where
F: FnOnce() -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: Send + std::fmt::Debug + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
match actix_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}
let fut = actix_rt::task::spawn_blocking(f);
async { fut.await.map_err(|_| BlockingError) }
}

0 comments on commit 20cf009

Please sign in to comment.