Skip to content

Commit

Permalink
util: fix panics on updating DelayQueue entries (tokio-rs#3270)
Browse files Browse the repository at this point in the history
  • Loading branch information
wabain authored Jan 21, 2021
1 parent 7d5b12c commit c4f66ed
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 18 deletions.
10 changes: 6 additions & 4 deletions tokio-util/src/time/delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ use std::task::{self, Poll, Waker};
/// # `Stream` implementation
///
/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have
/// expired, no items are returned. In this case, `NotReady` is returned and the
/// expired, no items are returned. In this case, `Pending` is returned and the
/// current task is registered to be notified once the next item's delay has
/// expired.
///
/// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll`
/// returns `Ready(None)`. This indicates that the stream has reached an end.
/// However, if a new item is inserted *after*, `poll` will once again start
/// returning items or `NotReady.
/// returning items or `Pending.
///
/// Items are returned ordered by their expirations. Items that are configured
/// to expire first will be returned first. There are no ordering guarantees
Expand Down Expand Up @@ -538,7 +538,7 @@ impl<T> DelayQueue<T> {
///
/// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
///
/// // "foo"is now scheduled to be returned in 10 seconds
/// // "foo" is now scheduled to be returned in 10 seconds
/// # }
/// ```
pub fn reset_at(&mut self, key: &Key, when: Instant) {
Expand All @@ -548,6 +548,8 @@ impl<T> DelayQueue<T> {
let when = self.normalize_deadline(when);

self.slab[key.index].when = when;
self.slab[key.index].expired = false;

self.insert_idx(when, key.index);

let next_deadline = self.next_deadline();
Expand Down Expand Up @@ -711,7 +713,7 @@ impl<T> DelayQueue<T> {
/// Returns `true` if there are no items in the queue.
///
/// Note that this function returns `false` even if all items have not yet
/// expired and a call to `poll` will return `NotReady`.
/// expired and a call to `poll` will return `Pending`.
///
/// # Examples
///
Expand Down
4 changes: 1 addition & 3 deletions tokio-util/src/time/wheel/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}

/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;

#[test]
fn test_slot_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}

Expand All @@ -252,4 +251,3 @@ mod test {
}
}
}
*/
18 changes: 14 additions & 4 deletions tokio-util/src/time/wheel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,17 @@ where
Ok(())
}

/// Remove `item` from thee timing wheel.
/// Remove `item` from the timing wheel.
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
let when = T::when(item, store);

assert!(
self.elapsed <= when,
"elapsed={}; when={}",
self.elapsed,
when
);

let level = self.level_for(when);

self.levels[level].remove_entry(when, item, store);
Expand Down Expand Up @@ -240,9 +248,11 @@ where
}

fn level_for(elapsed: u64, when: u64) -> usize {
let masked = elapsed ^ when;
const SLOT_MASK: u64 = (1 << 6) - 1;

assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
// Mask in the trailing bits ignored by the level calculation in order to cap
// the possible leading zeros
let masked = elapsed ^ when | SLOT_MASK;

let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;
Expand All @@ -255,7 +265,7 @@ mod test {

#[test]
fn test_level_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),
Expand Down
61 changes: 61 additions & 0 deletions tokio-util/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,35 @@ async fn reset_twice() {
assert!(queue.is_woken());
}

/// Regression test: Given an entry inserted with a deadline in the past, so
/// that it is placed directly on the expired queue, reset the entry to a
/// deadline in the future. Validate that this leaves the entry and queue in an
/// internally consistent state by running an additional reset on the entry
/// before polling it to completion.
#[tokio::test]
async fn repeatedly_reset_entry_inserted_as_expired() {
time::pause();
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();

let key = queue.insert_at("foo", now - ms(100));

queue.reset_at(&key, now + ms(100));
queue.reset_at(&key, now + ms(50));

assert_pending!(poll!(queue));

time::sleep_until(now + ms(60)).await;

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");

let entry = assert_ready!(poll!(queue));
assert!(entry.is_none());
}

#[tokio::test]
async fn remove_expired_item() {
time::pause();
Expand All @@ -261,6 +290,38 @@ async fn remove_expired_item() {
assert_eq!(entry.into_inner(), "foo");
}

/// Regression test: it should be possible to remove entries which fall in the
/// 0th slot of the internal timer wheel — that is, entries whose expiration
/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
/// is equal to the wheel's current elapsed time.
#[tokio::test]
async fn remove_at_timer_wheel_threshold() {
time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let key1 = queue.insert_at("foo", now + ms(64));
let key2 = queue.insert_at("bar", now + ms(64));

sleep(ms(80)).await;

let entry = assert_ready_ok!(poll!(queue)).into_inner();

match entry {
"foo" => {
let entry = queue.remove(&key2).into_inner();
assert_eq!(entry, "bar");
}
"bar" => {
let entry = queue.remove(&key1).into_inner();
assert_eq!(entry, "foo");
}
other => panic!("other: {:?}", other),
}
}

#[tokio::test]
async fn expires_before_last_insert() {
time::pause();
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/time/driver/wheel/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}

/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;

#[test]
fn test_slot_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}

Expand All @@ -274,4 +273,3 @@ mod test {
}
}
}
*/
17 changes: 13 additions & 4 deletions tokio/src/time/driver/wheel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ impl Wheel {
if when == u64::max_value() {
self.pending.remove(item);
} else {
debug_assert!(
self.elapsed <= when,
"elapsed={}; when={}",
self.elapsed,
when
);

let level = self.level_for(when);

self.levels[level].remove_entry(item);
Expand Down Expand Up @@ -281,15 +288,17 @@ impl Wheel {
}

fn level_for(elapsed: u64, when: u64) -> usize {
let mut masked = elapsed ^ when;
const SLOT_MASK: u64 = (1 << 6) - 1;

// Mask in the trailing bits ignored by the level calculation in order to cap
// the possible leading zeros
let mut masked = elapsed ^ when | SLOT_MASK;

if masked >= MAX_DURATION {
// Fudge the timer into the top level
masked = MAX_DURATION - 1;
}

assert!(masked != 0, "elapsed={}; when={}", elapsed, when);

let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;

Expand All @@ -302,7 +311,7 @@ mod test {

#[test]
fn test_level_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),
Expand Down

0 comments on commit c4f66ed

Please sign in to comment.