Skip to content

Commit

Permalink
windows: Better dispatcher (zed-industries#11485)
Browse files Browse the repository at this point in the history
This PR leverages a more modern Windows API to implement
`WindowsDispatcher`, aligning its implementation more closely with that
of the `macOS` platform. The following improvements have been made:

1. Similar to `macOS`, there is no longer a need to use `sender` and
`receiver` to dispatch a `Runnable` on the main thread.
2. There is also no longer a need to use an `Event` for synchronization.
3. Consistent with zed-industries#7506 and zed-industries#11269, `Runnable` is now executed with
high priority.

However, this PR raises the minimum Windows version requirement of
`GPUI` to Windows 10, specifically Windows 10 Fall Creators Update
(10.0.16299). However, the `alacritty_terminal` dependency in Zed relies
on `conPTY` on Windows, an API introduced in the Windows 10 Fall
Creators Update. Therefore, the impact of this PR on Zed should be
minimal. I'd like to hear your voices about this PR, especially about
the minimum Windows version bumping.

Release Notes:

- N/A
  • Loading branch information
JunkuiZhang authored May 9, 2024
1 parent ba25e37 commit 95e246a
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 155 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ snippet = { path = "crates/snippet" }
sqlez = { path = "crates/sqlez" }
sqlez_macros = { path = "crates/sqlez_macros" }
supermaven = { path = "crates/supermaven" }
supermaven_api = { path = "crates/supermaven_api"}
supermaven_api = { path = "crates/supermaven_api" }
story = { path = "crates/story" }
storybook = { path = "crates/storybook" }
sum_tree = { path = "crates/sum_tree" }
Expand Down Expand Up @@ -386,6 +386,8 @@ version = "0.53.0"
features = [
"implement",
"Foundation_Numerics",
"System",
"System_Threading",
"Wdk_System_SystemServices",
"Win32_Globalization",
"Win32_Graphics_Direct2D",
Expand All @@ -409,6 +411,7 @@ features = [
"Win32_System_SystemServices",
"Win32_System_Threading",
"Win32_System_Time",
"Win32_System_WinRT",
"Win32_UI_Controls",
"Win32_UI_HiDpi",
"Win32_UI_Input_Ime",
Expand Down
197 changes: 79 additions & 118 deletions crates/gpui/src/platform/windows/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,71 +1,97 @@
use std::{
sync::{
atomic::{AtomicIsize, Ordering},
Arc,
},
thread::{current, ThreadId},
time::Duration,
};

use async_task::Runnable;
use flume::Sender;
use parking::Parker;
use parking_lot::Mutex;
use windows::Win32::{Foundation::*, System::Threading::*};
use util::ResultExt;
use windows::{
Foundation::TimeSpan,
System::{
DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler,
Threading::{
ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
WorkItemPriority,
},
},
Win32::System::WinRT::{
CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE,
DQTYPE_THREAD_CURRENT,
},
};

use crate::{PlatformDispatcher, TaskLabel};

pub(crate) struct WindowsDispatcher {
threadpool: PTP_POOL,
main_sender: Sender<Runnable>,
controller: DispatcherQueueController,
main_queue: DispatcherQueue,
parker: Mutex<Parker>,
main_thread_id: ThreadId,
dispatch_event: HANDLE,
}

unsafe impl Send for WindowsDispatcher {}
unsafe impl Sync for WindowsDispatcher {}

impl WindowsDispatcher {
pub(crate) fn new(main_sender: Sender<Runnable>, dispatch_event: HANDLE) -> Self {
let parker = Mutex::new(Parker::new());
let threadpool = unsafe {
let ret = CreateThreadpool(None);
if ret.0 == 0 {
panic!(
"unable to initialize a thread pool: {}",
std::io::Error::last_os_error()
);
}
// set minimum 1 thread in threadpool
let _ = SetThreadpoolThreadMinimum(ret, 1)
.inspect_err(|_| log::error!("unable to configure thread pool"));

ret
pub(crate) fn new() -> Self {
let controller = unsafe {
let options = DispatcherQueueOptions {
dwSize: std::mem::size_of::<DispatcherQueueOptions>() as u32,
threadType: DQTYPE_THREAD_CURRENT,
apartmentType: DQTAT_COM_NONE,
};
CreateDispatcherQueueController(options).unwrap()
};
let main_queue = controller.DispatcherQueue().unwrap();
let parker = Mutex::new(Parker::new());
let main_thread_id = current().id();

WindowsDispatcher {
threadpool,
main_sender,
controller,
main_queue,
parker,
main_thread_id,
dispatch_event,
}
}

fn dispatch_on_threadpool(&self, runnable: Runnable) {
unsafe {
let ptr = Box::into_raw(Box::new(runnable));
let environment = get_threadpool_environment(self.threadpool);
let Ok(work) =
CreateThreadpoolWork(Some(threadpool_runner), Some(ptr as _), Some(&environment))
.inspect_err(|_| {
log::error!(
"unable to dispatch work on thread pool: {}",
std::io::Error::last_os_error()
)
})
else {
return;
};
SubmitThreadpoolWork(work);
}
let handler = {
let mut task_wrapper = Some(runnable);
WorkItemHandler::new(move |_| {
task_wrapper.take().unwrap().run();
Ok(())
})
};
ThreadPool::RunWithPriorityAndOptionsAsync(
&handler,
WorkItemPriority::High,
WorkItemOptions::TimeSliced,
)
.log_err();
}

fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
let handler = {
let mut task_wrapper = Some(runnable);
TimerElapsedHandler::new(move |_| {
task_wrapper.take().unwrap().run();
Ok(())
})
};
let delay = TimeSpan {
// A time period expressed in 100-nanosecond units.
// 10,000,000 ticks per second
Duration: (duration.as_nanos() / 100) as i64,
};
ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
}
}

impl Drop for WindowsDispatcher {
fn drop(&mut self) {
self.controller.ShutdownQueueAsync().log_err();
}
}

Expand All @@ -82,38 +108,18 @@ impl PlatformDispatcher for WindowsDispatcher {
}

fn dispatch_on_main_thread(&self, runnable: Runnable) {
self.main_sender
.send(runnable)
.inspect_err(|e| log::error!("Dispatch failed: {e}"))
.ok();
unsafe { SetEvent(self.dispatch_event) }.ok();
let handler = {
let mut task_wrapper = Some(runnable);
DispatcherQueueHandler::new(move || {
task_wrapper.take().unwrap().run();
Ok(())
})
};
self.main_queue.TryEnqueue(&handler).log_err();
}

fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) {
if duration.as_millis() == 0 {
self.dispatch_on_threadpool(runnable);
return;
}
unsafe {
let mut handle = std::mem::zeroed();
let task = Arc::new(DelayedTask::new(runnable));
let _ = CreateTimerQueueTimer(
&mut handle,
None,
Some(timer_queue_runner),
Some(Arc::into_raw(task.clone()) as _),
duration.as_millis() as u32,
0,
WT_EXECUTEONLYONCE,
)
.inspect_err(|_| {
log::error!(
"unable to dispatch delayed task: {}",
std::io::Error::last_os_error()
)
});
task.raw_timer_handle.store(handle.0, Ordering::SeqCst);
}
fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
self.dispatch_on_threadpool_after(runnable, duration);
}

fn tick(&self, _background_only: bool) -> bool {
Expand All @@ -128,48 +134,3 @@ impl PlatformDispatcher for WindowsDispatcher {
self.parker.lock().unparker()
}
}

extern "system" fn threadpool_runner(
_: PTP_CALLBACK_INSTANCE,
ptr: *mut std::ffi::c_void,
_: PTP_WORK,
) {
unsafe {
let runnable = Box::from_raw(ptr as *mut Runnable);
runnable.run();
}
}

unsafe extern "system" fn timer_queue_runner(ptr: *mut std::ffi::c_void, _: BOOLEAN) {
let task = Arc::from_raw(ptr as *mut DelayedTask);
task.runnable.lock().take().unwrap().run();
unsafe {
let timer = task.raw_timer_handle.load(Ordering::SeqCst);
let _ = DeleteTimerQueueTimer(None, HANDLE(timer), None);
}
}

struct DelayedTask {
runnable: Mutex<Option<Runnable>>,
raw_timer_handle: AtomicIsize,
}

impl DelayedTask {
pub fn new(runnable: Runnable) -> Self {
DelayedTask {
runnable: Mutex::new(Some(runnable)),
raw_timer_handle: AtomicIsize::new(0),
}
}
}

#[inline]
fn get_threadpool_environment(pool: PTP_POOL) -> TP_CALLBACK_ENVIRON_V3 {
TP_CALLBACK_ENVIRON_V3 {
Version: 3, // Win7+, otherwise this value should be 1
Pool: pool,
CallbackPriority: TP_CALLBACK_PRIORITY_NORMAL,
Size: std::mem::size_of::<TP_CALLBACK_ENVIRON_V3>() as _,
..Default::default()
}
}
3 changes: 0 additions & 3 deletions crates/gpui/src/platform/windows/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ fn handle_timer_msg(
state_ptr: Rc<WindowsWindowStatePtr>,
) -> Option<isize> {
if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID {
for runnable in state_ptr.main_receiver.drain() {
runnable.run();
}
handle_paint_msg(handle, state_ptr)
} else {
None
Expand Down
30 changes: 3 additions & 27 deletions crates/gpui/src/platform/windows/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::{

use ::util::ResultExt;
use anyhow::{anyhow, Context, Result};
use async_task::Runnable;
use copypasta::{ClipboardContext, ClipboardProvider};
use futures::channel::oneshot::{self, Receiver};
use itertools::Itertools;
Expand Down Expand Up @@ -42,11 +41,9 @@ pub(crate) struct WindowsPlatform {
raw_window_handles: RwLock<SmallVec<[HWND; 4]>>,
// The below members will never change throughout the entire lifecycle of the app.
icon: HICON,
main_receiver: flume::Receiver<Runnable>,
background_executor: BackgroundExecutor,
foreground_executor: ForegroundExecutor,
text_system: Arc<dyn PlatformTextSystem>,
dispatch_event: OwnedHandle,
}

pub(crate) struct WindowsPlatformState {
Expand Down Expand Up @@ -85,10 +82,7 @@ impl WindowsPlatform {
unsafe {
OleInitialize(None).expect("unable to initialize Windows OLE");
}
let (main_sender, main_receiver) = flume::unbounded::<Runnable>();
let dispatch_event =
OwnedHandle::new(unsafe { CreateEventW(None, false, false, None) }.unwrap());
let dispatcher = Arc::new(WindowsDispatcher::new(main_sender, dispatch_event.to_raw()));
let dispatcher = Arc::new(WindowsDispatcher::new());
let background_executor = BackgroundExecutor::new(dispatcher.clone());
let foreground_executor = ForegroundExecutor::new(dispatcher);
let text_system = if let Some(direct_write) = DirectWriteTextSystem::new().log_err() {
Expand All @@ -106,18 +100,9 @@ impl WindowsPlatform {
state,
raw_window_handles,
icon,
main_receiver,
background_executor,
foreground_executor,
text_system,
dispatch_event,
}
}

#[inline]
fn run_foreground_tasks(&self) {
for runnable in self.main_receiver.drain() {
runnable.run();
}
}

Expand Down Expand Up @@ -201,15 +186,14 @@ impl Platform for WindowsPlatform {

fn run(&self, on_finish_launching: Box<dyn 'static + FnOnce()>) {
on_finish_launching();
let dispatch_event = self.dispatch_event.to_raw();
let vsync_event = create_event().unwrap();
let timer_stop_event = create_event().unwrap();
let raw_timer_stop_event = timer_stop_event.to_raw();
begin_vsync_timer(vsync_event.to_raw(), timer_stop_event);
'a: loop {
let wait_result = unsafe {
MsgWaitForMultipleObjects(
Some(&[vsync_event.to_raw(), dispatch_event]),
Some(&[vsync_event.to_raw()]),
false,
INFINITE,
QS_ALLINPUT,
Expand All @@ -221,12 +205,8 @@ impl Platform for WindowsPlatform {
WAIT_EVENT(0) => {
self.redraw_all();
}
// foreground tasks are dispatched
WAIT_EVENT(1) => {
self.run_foreground_tasks();
}
// Windows thread messages are posted
WAIT_EVENT(2) => {
WAIT_EVENT(1) => {
let mut msg = MSG::default();
unsafe {
while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() {
Expand All @@ -245,9 +225,6 @@ impl Platform for WindowsPlatform {
}
}
}

// foreground tasks may have been queued in the message handlers
self.run_foreground_tasks();
}
_ => {
log::error!("Something went wrong while waiting {:?}", wait_result);
Expand Down Expand Up @@ -344,7 +321,6 @@ impl Platform for WindowsPlatform {
options,
self.icon,
self.foreground_executor.clone(),
self.main_receiver.clone(),
lock.settings.mouse_wheel_settings,
lock.current_cursor,
);
Expand Down
Loading

0 comments on commit 95e246a

Please sign in to comment.