diff --git a/Cargo.toml b/Cargo.toml index ceab28c4fb..498ba467df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -227,7 +227,7 @@ snippet = { path = "crates/snippet" } sqlez = { path = "crates/sqlez" } sqlez_macros = { path = "crates/sqlez_macros" } supermaven = { path = "crates/supermaven" } -supermaven_api = { path = "crates/supermaven_api"} +supermaven_api = { path = "crates/supermaven_api" } story = { path = "crates/story" } storybook = { path = "crates/storybook" } sum_tree = { path = "crates/sum_tree" } @@ -386,6 +386,8 @@ version = "0.53.0" features = [ "implement", "Foundation_Numerics", + "System", + "System_Threading", "Wdk_System_SystemServices", "Win32_Globalization", "Win32_Graphics_Direct2D", @@ -409,6 +411,7 @@ features = [ "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_Time", + "Win32_System_WinRT", "Win32_UI_Controls", "Win32_UI_HiDpi", "Win32_UI_Input_Ime", diff --git a/crates/gpui/src/platform/windows/dispatcher.rs b/crates/gpui/src/platform/windows/dispatcher.rs index 5a7e2b8ee1..5932132210 100644 --- a/crates/gpui/src/platform/windows/dispatcher.rs +++ b/crates/gpui/src/platform/windows/dispatcher.rs @@ -1,71 +1,97 @@ use std::{ - sync::{ - atomic::{AtomicIsize, Ordering}, - Arc, - }, thread::{current, ThreadId}, + time::Duration, }; use async_task::Runnable; -use flume::Sender; use parking::Parker; use parking_lot::Mutex; -use windows::Win32::{Foundation::*, System::Threading::*}; +use util::ResultExt; +use windows::{ + Foundation::TimeSpan, + System::{ + DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler, + Threading::{ + ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions, + WorkItemPriority, + }, + }, + Win32::System::WinRT::{ + CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE, + DQTYPE_THREAD_CURRENT, + }, +}; use crate::{PlatformDispatcher, TaskLabel}; pub(crate) struct WindowsDispatcher { - threadpool: PTP_POOL, - main_sender: Sender, + controller: DispatcherQueueController, + main_queue: DispatcherQueue, parker: Mutex, main_thread_id: ThreadId, - dispatch_event: HANDLE, } -impl WindowsDispatcher { - pub(crate) fn new(main_sender: Sender, dispatch_event: HANDLE) -> Self { - let parker = Mutex::new(Parker::new()); - let threadpool = unsafe { - let ret = CreateThreadpool(None); - if ret.0 == 0 { - panic!( - "unable to initialize a thread pool: {}", - std::io::Error::last_os_error() - ); - } - // set minimum 1 thread in threadpool - let _ = SetThreadpoolThreadMinimum(ret, 1) - .inspect_err(|_| log::error!("unable to configure thread pool")); +unsafe impl Send for WindowsDispatcher {} +unsafe impl Sync for WindowsDispatcher {} - ret +impl WindowsDispatcher { + pub(crate) fn new() -> Self { + let controller = unsafe { + let options = DispatcherQueueOptions { + dwSize: std::mem::size_of::() as u32, + threadType: DQTYPE_THREAD_CURRENT, + apartmentType: DQTAT_COM_NONE, + }; + CreateDispatcherQueueController(options).unwrap() }; + let main_queue = controller.DispatcherQueue().unwrap(); + let parker = Mutex::new(Parker::new()); let main_thread_id = current().id(); + WindowsDispatcher { - threadpool, - main_sender, + controller, + main_queue, parker, main_thread_id, - dispatch_event, } } fn dispatch_on_threadpool(&self, runnable: Runnable) { - unsafe { - let ptr = Box::into_raw(Box::new(runnable)); - let environment = get_threadpool_environment(self.threadpool); - let Ok(work) = - CreateThreadpoolWork(Some(threadpool_runner), Some(ptr as _), Some(&environment)) - .inspect_err(|_| { - log::error!( - "unable to dispatch work on thread pool: {}", - std::io::Error::last_os_error() - ) - }) - else { - return; - }; - SubmitThreadpoolWork(work); - } + let handler = { + let mut task_wrapper = Some(runnable); + WorkItemHandler::new(move |_| { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + ThreadPool::RunWithPriorityAndOptionsAsync( + &handler, + WorkItemPriority::High, + WorkItemOptions::TimeSliced, + ) + .log_err(); + } + + fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) { + let handler = { + let mut task_wrapper = Some(runnable); + TimerElapsedHandler::new(move |_| { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + let delay = TimeSpan { + // A time period expressed in 100-nanosecond units. + // 10,000,000 ticks per second + Duration: (duration.as_nanos() / 100) as i64, + }; + ThreadPoolTimer::CreateTimer(&handler, delay).log_err(); + } +} + +impl Drop for WindowsDispatcher { + fn drop(&mut self) { + self.controller.ShutdownQueueAsync().log_err(); } } @@ -82,38 +108,18 @@ impl PlatformDispatcher for WindowsDispatcher { } fn dispatch_on_main_thread(&self, runnable: Runnable) { - self.main_sender - .send(runnable) - .inspect_err(|e| log::error!("Dispatch failed: {e}")) - .ok(); - unsafe { SetEvent(self.dispatch_event) }.ok(); + let handler = { + let mut task_wrapper = Some(runnable); + DispatcherQueueHandler::new(move || { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + self.main_queue.TryEnqueue(&handler).log_err(); } - fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) { - if duration.as_millis() == 0 { - self.dispatch_on_threadpool(runnable); - return; - } - unsafe { - let mut handle = std::mem::zeroed(); - let task = Arc::new(DelayedTask::new(runnable)); - let _ = CreateTimerQueueTimer( - &mut handle, - None, - Some(timer_queue_runner), - Some(Arc::into_raw(task.clone()) as _), - duration.as_millis() as u32, - 0, - WT_EXECUTEONLYONCE, - ) - .inspect_err(|_| { - log::error!( - "unable to dispatch delayed task: {}", - std::io::Error::last_os_error() - ) - }); - task.raw_timer_handle.store(handle.0, Ordering::SeqCst); - } + fn dispatch_after(&self, duration: Duration, runnable: Runnable) { + self.dispatch_on_threadpool_after(runnable, duration); } fn tick(&self, _background_only: bool) -> bool { @@ -128,48 +134,3 @@ impl PlatformDispatcher for WindowsDispatcher { self.parker.lock().unparker() } } - -extern "system" fn threadpool_runner( - _: PTP_CALLBACK_INSTANCE, - ptr: *mut std::ffi::c_void, - _: PTP_WORK, -) { - unsafe { - let runnable = Box::from_raw(ptr as *mut Runnable); - runnable.run(); - } -} - -unsafe extern "system" fn timer_queue_runner(ptr: *mut std::ffi::c_void, _: BOOLEAN) { - let task = Arc::from_raw(ptr as *mut DelayedTask); - task.runnable.lock().take().unwrap().run(); - unsafe { - let timer = task.raw_timer_handle.load(Ordering::SeqCst); - let _ = DeleteTimerQueueTimer(None, HANDLE(timer), None); - } -} - -struct DelayedTask { - runnable: Mutex>, - raw_timer_handle: AtomicIsize, -} - -impl DelayedTask { - pub fn new(runnable: Runnable) -> Self { - DelayedTask { - runnable: Mutex::new(Some(runnable)), - raw_timer_handle: AtomicIsize::new(0), - } - } -} - -#[inline] -fn get_threadpool_environment(pool: PTP_POOL) -> TP_CALLBACK_ENVIRON_V3 { - TP_CALLBACK_ENVIRON_V3 { - Version: 3, // Win7+, otherwise this value should be 1 - Pool: pool, - CallbackPriority: TP_CALLBACK_PRIORITY_NORMAL, - Size: std::mem::size_of::() as _, - ..Default::default() - } -} diff --git a/crates/gpui/src/platform/windows/events.rs b/crates/gpui/src/platform/windows/events.rs index 89315eac07..b79ddced5d 100644 --- a/crates/gpui/src/platform/windows/events.rs +++ b/crates/gpui/src/platform/windows/events.rs @@ -171,9 +171,6 @@ fn handle_timer_msg( state_ptr: Rc, ) -> Option { if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID { - for runnable in state_ptr.main_receiver.drain() { - runnable.run(); - } handle_paint_msg(handle, state_ptr) } else { None diff --git a/crates/gpui/src/platform/windows/platform.rs b/crates/gpui/src/platform/windows/platform.rs index a38e0d18b4..470d6be0e4 100644 --- a/crates/gpui/src/platform/windows/platform.rs +++ b/crates/gpui/src/platform/windows/platform.rs @@ -13,7 +13,6 @@ use std::{ use ::util::ResultExt; use anyhow::{anyhow, Context, Result}; -use async_task::Runnable; use copypasta::{ClipboardContext, ClipboardProvider}; use futures::channel::oneshot::{self, Receiver}; use itertools::Itertools; @@ -42,11 +41,9 @@ pub(crate) struct WindowsPlatform { raw_window_handles: RwLock>, // The below members will never change throughout the entire lifecycle of the app. icon: HICON, - main_receiver: flume::Receiver, background_executor: BackgroundExecutor, foreground_executor: ForegroundExecutor, text_system: Arc, - dispatch_event: OwnedHandle, } pub(crate) struct WindowsPlatformState { @@ -85,10 +82,7 @@ impl WindowsPlatform { unsafe { OleInitialize(None).expect("unable to initialize Windows OLE"); } - let (main_sender, main_receiver) = flume::unbounded::(); - let dispatch_event = - OwnedHandle::new(unsafe { CreateEventW(None, false, false, None) }.unwrap()); - let dispatcher = Arc::new(WindowsDispatcher::new(main_sender, dispatch_event.to_raw())); + let dispatcher = Arc::new(WindowsDispatcher::new()); let background_executor = BackgroundExecutor::new(dispatcher.clone()); let foreground_executor = ForegroundExecutor::new(dispatcher); let text_system = if let Some(direct_write) = DirectWriteTextSystem::new().log_err() { @@ -106,18 +100,9 @@ impl WindowsPlatform { state, raw_window_handles, icon, - main_receiver, background_executor, foreground_executor, text_system, - dispatch_event, - } - } - - #[inline] - fn run_foreground_tasks(&self) { - for runnable in self.main_receiver.drain() { - runnable.run(); } } @@ -201,7 +186,6 @@ impl Platform for WindowsPlatform { fn run(&self, on_finish_launching: Box) { on_finish_launching(); - let dispatch_event = self.dispatch_event.to_raw(); let vsync_event = create_event().unwrap(); let timer_stop_event = create_event().unwrap(); let raw_timer_stop_event = timer_stop_event.to_raw(); @@ -209,7 +193,7 @@ impl Platform for WindowsPlatform { 'a: loop { let wait_result = unsafe { MsgWaitForMultipleObjects( - Some(&[vsync_event.to_raw(), dispatch_event]), + Some(&[vsync_event.to_raw()]), false, INFINITE, QS_ALLINPUT, @@ -221,12 +205,8 @@ impl Platform for WindowsPlatform { WAIT_EVENT(0) => { self.redraw_all(); } - // foreground tasks are dispatched - WAIT_EVENT(1) => { - self.run_foreground_tasks(); - } // Windows thread messages are posted - WAIT_EVENT(2) => { + WAIT_EVENT(1) => { let mut msg = MSG::default(); unsafe { while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() { @@ -245,9 +225,6 @@ impl Platform for WindowsPlatform { } } } - - // foreground tasks may have been queued in the message handlers - self.run_foreground_tasks(); } _ => { log::error!("Something went wrong while waiting {:?}", wait_result); @@ -344,7 +321,6 @@ impl Platform for WindowsPlatform { options, self.icon, self.foreground_executor.clone(), - self.main_receiver.clone(), lock.settings.mouse_wheel_settings, lock.current_cursor, ); diff --git a/crates/gpui/src/platform/windows/window.rs b/crates/gpui/src/platform/windows/window.rs index 753abe56d8..55ecd0829c 100644 --- a/crates/gpui/src/platform/windows/window.rs +++ b/crates/gpui/src/platform/windows/window.rs @@ -12,7 +12,6 @@ use std::{ use ::util::ResultExt; use anyhow::Context; -use async_task::Runnable; use futures::channel::oneshot::{self, Receiver}; use itertools::Itertools; use raw_window_handle as rwh; @@ -58,7 +57,6 @@ pub(crate) struct WindowsWindowStatePtr { pub(crate) handle: AnyWindowHandle, pub(crate) hide_title_bar: bool, pub(crate) executor: ForegroundExecutor, - pub(crate) main_receiver: flume::Receiver, } impl WindowsWindowState { @@ -208,7 +206,6 @@ impl WindowsWindowStatePtr { handle: context.handle, hide_title_bar: context.hide_title_bar, executor: context.executor.clone(), - main_receiver: context.main_receiver.clone(), }) } } @@ -232,7 +229,6 @@ struct WindowCreateContext { display: WindowsDisplay, transparent: bool, executor: ForegroundExecutor, - main_receiver: flume::Receiver, mouse_wheel_settings: MouseWheelSettings, current_cursor: HCURSOR, } @@ -243,7 +239,6 @@ impl WindowsWindow { params: WindowParams, icon: HICON, executor: ForegroundExecutor, - main_receiver: flume::Receiver, mouse_wheel_settings: MouseWheelSettings, current_cursor: HCURSOR, ) -> Self { @@ -272,7 +267,6 @@ impl WindowsWindow { display: WindowsDisplay::primary_monitor().unwrap(), transparent: params.window_background != WindowBackgroundAppearance::Opaque, executor, - main_receiver, mouse_wheel_settings, current_cursor, };