Skip to content

Commit

Permalink
fix prio retry and FINALLY add test...
Browse files Browse the repository at this point in the history
  • Loading branch information
bold committed May 12, 2019
1 parent 6f8dcff commit e1090ea
Showing 1 changed file with 51 additions and 19 deletions.
70 changes: 51 additions & 19 deletions src/future/prio_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,32 @@ where
type Error = Error<S::Error>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.stream.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(Some(new_item))) => {
// check if we currently have a delay item
if let Some(ref mut delayed_item) = self.delayed_item {
if delayed_item.value < new_item {
loop {
match self.stream.poll() {
Ok(Async::NotReady) => { break; }
Ok(Async::Ready(Some(new_item))) => {
// check if we currently have a delay item
if let Some(ref mut delayed_item) = self.delayed_item {
if delayed_item.value < new_item {
// we have new item, this one will be yielded instantly
self.delayed_item = Some(DelayedItem::new(new_item.clone()));
return Ok(Async::Ready(Some(new_item)));
} else if delayed_item.value == new_item {
// if the current item was requeued, then we will yield it with a backoff
delayed_item.exp_backoff(self.delay_duration);
}
} else {
// we have new item, this one will be yielded instantly
self.delayed_item = Some(DelayedItem::new(new_item.clone()));
return Ok(Async::Ready(Some(new_item)));
} else if delayed_item.value == new_item {
// if the current item was requeued, then we will yield it with a backoff
delayed_item.exp_backoff(self.delay_duration);
}
} else {
// we have new item, this one will be yielded instantly
self.delayed_item = Some(DelayedItem::new(new_item.clone()));
return Ok(Async::Ready(Some(new_item)));
}
}
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(None));
}
Err(e) => {
return Err(Error(Kind::Inner(e)));
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(None));
}
Err(e) => {
return Err(Error(Kind::Inner(e)));
}
}
}

Expand All @@ -125,6 +127,36 @@ where
}
}
};

Ok(Async::NotReady)
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio;
use tokio::timer::Interval;

#[test]
fn test_prio_retry() {
let mut items = vec![0, 1, 2, 3, 3, 3, 0, 1, 2, 2, 6, 5, 7].into_iter();
let len = items.len();
let items = Interval::new(Instant::now(), Duration::from_millis(200))
.take(len as u64)
.map(move |_| {
items.next().unwrap()
})
.map_err(|e| error!("can't consume interval: {:?}", e));
let exp: Vec<i64> = vec![0, 1, 2, 3, 3, 3, 6, 7];
let stream = PrioRetry::new(items, Duration::from_millis(100));
let res = stream.collect();
tokio::run(res.then(move |res| {
match res {
Err(_) => assert!(false),
Ok(items) => assert_eq!(items, exp, "can't get expected items from prio retry"),
};
Ok(())
}));
}
}

0 comments on commit e1090ea

Please sign in to comment.