Skip to content

Commit

Permalink
refactor thread state
Browse files Browse the repository at this point in the history
  • Loading branch information
wangrunji0408 committed Aug 14, 2020
1 parent ebec009 commit a3323c2
Showing 1 changed file with 70 additions and 70 deletions.
140 changes: 70 additions & 70 deletions zircon-object/src/task/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,34 +132,38 @@ struct ThreadInner {
}

impl ThreadInner {
fn get_state(&self) -> ThreadState {
if self.suspend_count == 0 || self.state == ThreadState::BlockedException {
fn state(&self) -> ThreadState {
// Dying > Exception > Suspend > Blocked
if self.suspend_count == 0
|| self.state == ThreadState::BlockedException
|| self.state == ThreadState::Dying
|| self.state == ThreadState::Dead
{
self.state
} else {
ThreadState::Suspended
}
}

fn update_signal(&self, base: &KObjectBase) {
if self.state == ThreadState::Dead {
base.signal_change(
/// Change state and update signal.
fn change_state(&mut self, state: ThreadState, base: &KObjectBase) {
self.state = state;
match self.state() {
ThreadState::Dead => base.signal_change(
Signal::THREAD_RUNNING | Signal::THREAD_SUSPENDED,
Signal::THREAD_TERMINATED,
);
} else if self.state == ThreadState::New || self.state == ThreadState::Dying {
base.signal_clear(
),
ThreadState::New | ThreadState::Dying => base.signal_clear(
Signal::THREAD_RUNNING | Signal::THREAD_SUSPENDED | Signal::THREAD_TERMINATED,
);
} else if self.suspend_count == 0 || self.state == ThreadState::BlockedException {
base.signal_change(
Signal::THREAD_TERMINATED | Signal::THREAD_SUSPENDED,
Signal::THREAD_RUNNING,
);
} else {
base.signal_change(
),
ThreadState::Suspended => base.signal_change(
Signal::THREAD_RUNNING | Signal::THREAD_TERMINATED,
Signal::THREAD_SUSPENDED,
);
),
_ => base.signal_change(
Signal::THREAD_TERMINATED | Signal::THREAD_SUSPENDED,
Signal::THREAD_RUNNING,
),
}
}
}
Expand Down Expand Up @@ -237,8 +241,7 @@ impl Thread {
context.general.x0 = arg1;
context.general.x1 = arg2;
}
inner.state = ThreadState::Running;
inner.update_signal(&self.base);
inner.change_state(ThreadState::Running, &self.base);
}
spawn_fn(CurrentThread(self.clone()));
Ok(())
Expand All @@ -258,9 +261,7 @@ impl Thread {
{
context.general.rflags |= 0x3202;
}
inner.state = ThreadState::Running;
inner.update_signal(&self.base);
self.base.signal_set(Signal::THREAD_RUNNING);
inner.change_state(ThreadState::Running, &self.base);
}
spawn_fn(CurrentThread(self.clone()));
Ok(())
Expand All @@ -287,10 +288,7 @@ impl Thread {
}
return;
}
inner.state = ThreadState::Dying;
// For suspended thread, wake it and clear suspend count
inner.suspend_count = 0;
inner.update_signal(&self.base);
inner.change_state(ThreadState::Dying, &self.base);
if let Some(waker) = inner.waker.take() {
waker.wake();
}
Expand Down Expand Up @@ -319,7 +317,7 @@ impl Thread {
pub fn get_thread_info(&self) -> ThreadInfo {
let inner = self.inner.lock();
ThreadInfo {
state: inner.get_state() as u32,
state: inner.state() as u32,
wait_exception_channel_type: inner
.exception
.as_ref()
Expand All @@ -331,7 +329,7 @@ impl Thread {
/// Get the thread's exception report.
pub fn get_thread_exception_info(&self) -> ZxResult<ExceptionReport> {
let inner = self.inner.lock();
if inner.get_state() != ThreadState::BlockedException {
if inner.state() != ThreadState::BlockedException {
return Err(ZxError::BAD_STATE);
}
let report = inner.exception.as_ref().ok_or(ZxError::BAD_STATE)?.report();
Expand All @@ -340,7 +338,7 @@ impl Thread {

/// Get the thread state.
pub fn state(&self) -> ThreadState {
self.inner.lock().get_state()
self.inner.lock().state()
}

/// Add the parameter to the time this thread has run on cpu.
Expand Down Expand Up @@ -382,23 +380,17 @@ impl Task for Thread {
fn suspend(&self) {
let mut inner = self.inner.lock();
inner.suspend_count += 1;
inner.update_signal(&self.base);
info!(
"thread {:?} suspend: count={}",
self.base.name(),
inner.suspend_count
);
let state = inner.state;
inner.change_state(state, &self.base);
}

fn resume(&self) {
let mut inner = self.inner.lock();
// assert_ne!(inner.suspend_count, 0);
if inner.suspend_count == 0 {
return;
}
assert_ne!(inner.suspend_count, 0);
inner.suspend_count -= 1;
if inner.suspend_count == 0 {
inner.update_signal(&self.base);
let state = inner.state;
inner.change_state(state, &self.base);
if let Some(waker) = inner.waker.take() {
waker.wake();
}
Expand Down Expand Up @@ -438,8 +430,7 @@ impl Drop for CurrentThread {
fn drop(&mut self) {
let mut inner = self.inner.lock();
self.exceptionate.shutdown();
inner.state = ThreadState::Dead;
inner.update_signal(&self.base);
inner.change_state(ThreadState::Dead, &self.base);
self.proc().remove_thread(self.base.id);
}
}
Expand All @@ -465,7 +456,7 @@ impl CurrentThread {

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.thread.inner.lock();
if inner.suspend_count == 0 {
if inner.state() != ThreadState::Suspended {
// resume: return the context token from thread object
Poll::Ready(inner.context.take().unwrap())
} else {
Expand Down Expand Up @@ -499,13 +490,13 @@ impl CurrentThread {
{
let (old_state, killed) = {
let mut inner = self.inner.lock();
if inner.get_state() == ThreadState::Dying {
if inner.state() == ThreadState::Dying {
return Err(ZxError::STOP);
}
let (sender, receiver) = channel();
inner.killer = Some(sender);
let old_state = core::mem::replace(&mut inner.state, state);
inner.update_signal(&self.base);
let old_state = inner.state;
inner.change_state(state, &self.base);
(old_state, receiver)
};
let ret = if let Some(cancel_token) = cancel_token {
Expand All @@ -524,12 +515,11 @@ impl CurrentThread {
};
let mut inner = self.inner.lock();
inner.killer = None;
if inner.state == ThreadState::Dying {
if inner.state() == ThreadState::Dying {
return ret;
}
assert_eq!(inner.state, state);
inner.state = old_state;
inner.update_signal(&self.base);
inner.change_state(old_state, &self.base);
ret
}

Expand Down Expand Up @@ -571,7 +561,7 @@ impl CurrentThread {
{
let killed = {
let mut inner = self.inner.lock();
if inner.get_state() == ThreadState::Dead || inner.killed {
if inner.killed {
return Err(ZxError::STOP);
}
let (sender, receiver) = channel::<()>();
Expand Down Expand Up @@ -825,28 +815,38 @@ mod tests {
let root_job = Job::root();
let proc = Process::create(&root_job, "proc").expect("failed to create process");
let thread = Thread::create(&proc, "thread").expect("failed to create thread");
let thread = CurrentThread(thread);

// without suspend
let context = thread.wait_for_run().await;
thread.end_running(context);
assert_eq!(thread.state(), ThreadState::New);

// with suspend
thread.suspend();
thread.suspend();
assert_eq!(thread.state(), ThreadState::Suspended);
async_std::task::spawn({
let thread = thread.clone();
async move {
async_std::task::sleep(Duration::from_millis(10)).await;
thread.resume();
async_std::task::sleep(Duration::from_millis(10)).await;
thread.resume();
}
});
let time = timer_now();
let _context = thread.wait_for_run().await;
assert!(timer_now() - time >= Duration::from_millis(20));
thread.start(0, 0, 0, 0, spawn).unwrap();
fn spawn(thread: CurrentThread) {
async_std::task::spawn(async move {
assert_eq!(thread.state(), ThreadState::Running);

// without suspend
let context = thread.wait_for_run().await;
thread.end_running(context);

// with suspend
thread.suspend();
thread.suspend();
assert_eq!(thread.state(), ThreadState::Suspended);
async_std::task::spawn({
let thread = (*thread).clone();
async move {
async_std::task::sleep(Duration::from_millis(10)).await;
thread.resume();
async_std::task::sleep(Duration::from_millis(10)).await;
thread.resume();
}
});
let time = timer_now();
let _context = thread.wait_for_run().await;
assert!(timer_now() - time >= Duration::from_millis(20));
});
}
let thread: Arc<dyn KernelObject> = thread;
thread.wait_signal(Signal::THREAD_TERMINATED).await;
}

#[test]
Expand Down

0 comments on commit a3323c2

Please sign in to comment.