Skip to content

Commit

Permalink
fixed logic bug - increment ref count when a new task is scheduled
Browse files Browse the repository at this point in the history
  • Loading branch information
senyosimpson committed Feb 13, 2022
1 parent 46acf92 commit a3f51e3
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions woi/src/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use crate::task::header::Header;
use crate::task::state::State;
use crate::task::task::Task;
use super::header::TaskId;

// The C representation means we have guarantees on
// the memory layout of the task
Expand Down Expand Up @@ -45,7 +46,6 @@ pub struct TaskLayout {
pub struct TaskVTable {
pub(crate) poll: unsafe fn(*const ()),
pub(crate) get_output: unsafe fn(*const (), *mut ()),
pub(crate) schedule: unsafe fn(*const ()),
pub(crate) drop_join_handle: unsafe fn(*const ())
}

Expand Down Expand Up @@ -80,14 +80,15 @@ where
};

let raw = Self::from_ptr(ptr.as_ptr());
let id = TaskId::new();

let header = Header {
state: State::new(),
id,
state: State::new(id),
waker: None,
vtable: &TaskVTable {
poll: Self::poll,
get_output: Self::get_output,
schedule: Self::schedule,
drop_join_handle: Self::drop_join_handle
},
};
Expand Down Expand Up @@ -137,6 +138,11 @@ where
}

pub unsafe fn dealloc(ptr: *const()) {
let raw = Self::from_ptr(ptr);
let header = &*(raw.header as *mut Header);

tracing::debug!("Task {}: Deallocating", header.id);

let layout = Self::layout();
// TODO: Investigate if I need to use .drop_in_place()
alloc::dealloc(ptr as *mut u8, layout.layout);
Expand Down Expand Up @@ -169,21 +175,6 @@ where
tracing::debug!("Waking raw task");
let raw = Self::from_ptr(ptr);
let header = &mut *(raw.header as *mut Header);

// Commenting these checks out for now. Since we only have one thread,
// the state at this point is deterministic (running and scheduled unset)

// // Task is complete so just consume the waker
// if state.is_complete() {
// Self::drop_waker(ptr);
// }

// // If the task has already been scheduled, we don't need to do
// // anything. Again, consume the waker
// if state.is_scheduled() {
// Self::drop_waker(ptr);
// }


// TODO: We need to hold a reference count if we have to schedule
// the task otherwise we will cause UB. This is likely to require
Expand All @@ -209,10 +200,15 @@ where

unsafe fn schedule(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let header = &mut *(raw.header as *mut Header);

let task = Task {
raw: NonNull::new_unchecked(ptr as *mut ()),
};
// When we create a new task, we need to increment its reference
// count since we now have another 'thing' holding a reference
// to the raw task
header.state.ref_incr();

let scheduler = &*raw.scheduler;
scheduler.schedule(task)
Expand All @@ -222,6 +218,7 @@ where
unsafe fn poll(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let header = &mut *(raw.header as *mut Header);
let id = header.id;

let waker = Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE));
let cx = &mut Context::from_waker(&waker);
Expand All @@ -241,7 +238,7 @@ where
let future = Pin::new_unchecked(future);
match future.poll(cx) {
Poll::Ready(out) => {
tracing::debug!("Task ready");
tracing::debug!("Task {}: ready", id);
header.state.transition_to_complete();
if header.state.has_join_waker() {
header.wake_join_handle();
Expand Down

0 comments on commit a3f51e3

Please sign in to comment.