#[cfg(windows)] use crate::os::windows::event::EventHandle; #[cfg(target_os = "macos")] use core_foundation::runloop::*; use failure::Fallible; use promise::{BasicExecutor, SpawnFunc}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; #[cfg(all(unix, not(target_os = "macos")))] use { filedescriptor::{FileDescriptor, Pipe}, mio::unix::EventedFd, mio::{Evented, Poll, PollOpt, Ready, Token}, std::os::unix::io::AsRawFd, }; lazy_static::lazy_static! { pub(crate) static ref SPAWN_QUEUE: Arc = Arc::new(SpawnQueue::new().expect("failed to create SpawnQueue")); } pub(crate) struct SpawnQueue { spawned_funcs: Mutex>, spawned_funcs_low_pri: Mutex>, #[cfg(windows)] pub event_handle: EventHandle, #[cfg(all(unix, not(target_os = "macos")))] write: Mutex, #[cfg(all(unix, not(target_os = "macos")))] read: Mutex, } impl SpawnQueue { pub fn new() -> Fallible { Self::new_impl() } pub fn spawn(&self, f: SpawnFunc) { self.spawn_impl(f, true) } pub fn run(&self) -> bool { self.run_impl() } // This needs to be a separate function from the loop in `run` // in order for the lock to be released before we call the // returned function fn pop_func(&self) -> Option { if let Some(func) = self.spawned_funcs.lock().unwrap().pop_front() { Some(func) } else { self.spawned_funcs_low_pri.lock().unwrap().pop_front() } } fn queue_func(&self, f: SpawnFunc, high_pri: bool) { if high_pri { self.spawned_funcs.lock().unwrap() } else { self.spawned_funcs_low_pri.lock().unwrap() } .push_back(f); } fn has_any_queued(&self) -> bool { !self.spawned_funcs.lock().unwrap().is_empty() || !self.spawned_funcs_low_pri.lock().unwrap().is_empty() } } #[cfg(windows)] impl SpawnQueue { fn new_impl() -> Fallible { let spawned_funcs = Mutex::new(VecDeque::new()); let spawned_funcs_low_pri = Mutex::new(VecDeque::new()); let event_handle = EventHandle::new_manual_reset().expect("EventHandle creation failed"); Ok(Self { spawned_funcs, spawned_funcs_low_pri, event_handle, }) } fn spawn_impl(&self, f: SpawnFunc, high_pri: bool) { self.queue_func(f, high_pri); self.event_handle.set_event(); } fn run_impl(&self) -> bool { self.event_handle.reset_event(); while let Some(func) = self.pop_func() { func(); } self.has_any_queued() } } #[cfg(all(unix, not(target_os = "macos")))] impl SpawnQueue { fn new_impl() -> Fallible { // On linux we have a slightly sloppy wakeup mechanism; // we have a non-blocking pipe that we can use to get // woken up after some number of enqueues. We don't // guarantee a 1:1 enqueue to wakeup with this mechanism // but in practical terms it does guarantee a wakeup // if the main thread is asleep and we enqueue some // number of items. // We can't affort to use a blocking pipe for the wakeup // because the write needs to hold a mutex and that // can block reads as well as other writers. let pipe = Pipe::new()?; let on = 1; unsafe { libc::ioctl(pipe.write.as_raw_fd(), libc::FIONBIO, &on); libc::ioctl(pipe.read.as_raw_fd(), libc::FIONBIO, &on); } Ok(Self { spawned_funcs: Mutex::new(VecDeque::new()), spawned_funcs_low_pri: Mutex::new(VecDeque::new()), write: Mutex::new(pipe.write), read: Mutex::new(pipe.read), }) } fn spawn_impl(&self, f: SpawnFunc, high_pri: bool) { use std::io::Write; self.queue_func(f, high_pri); self.write.lock().unwrap().write(b"x").ok(); } fn run_impl(&self) -> bool { // On linux we only ever process one at at time, so that // we can return to the main loop and process messages // from the X server if let Some(func) = self.pop_func() { func(); } // try to drain the pipe. // We do this regardless of whether we popped an item // so that we avoid being in a perpetually signalled state. // It is ok if we completely drain the pipe because the // main loop uses the return value to set the sleep // interval and will unconditionally call us on each // iteration. let mut byte = [0u8; 64]; use std::io::Read; self.read.lock().unwrap().read(&mut byte).ok(); self.has_any_queued() } } #[cfg(all(unix, not(target_os = "macos")))] impl Evented for SpawnQueue { fn register( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> std::io::Result<()> { EventedFd(&self.read.lock().unwrap().as_raw_fd()).register(poll, token, interest, opts) } fn reregister( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> std::io::Result<()> { EventedFd(&self.read.lock().unwrap().as_raw_fd()).reregister(poll, token, interest, opts) } fn deregister(&self, poll: &Poll) -> std::io::Result<()> { EventedFd(&self.read.lock().unwrap().as_raw_fd()).deregister(poll) } } #[cfg(target_os = "macos")] impl SpawnQueue { fn new_impl() -> Fallible { let spawned_funcs = Mutex::new(VecDeque::new()); let spawned_funcs_low_pri = Mutex::new(VecDeque::new()); let observer = unsafe { CFRunLoopObserverCreate( std::ptr::null(), kCFRunLoopAllActivities, 1, 0, SpawnQueue::trigger, std::ptr::null_mut(), ) }; unsafe { CFRunLoopAddObserver(CFRunLoopGetMain(), observer, kCFRunLoopCommonModes); } Ok(Self { spawned_funcs, spawned_funcs_low_pri, }) } extern "C" fn trigger( _observer: *mut __CFRunLoopObserver, _: CFRunLoopActivity, _: *mut std::ffi::c_void, ) { if SPAWN_QUEUE.run() { Self::queue_wakeup(); } } fn queue_wakeup() { unsafe { CFRunLoopWakeUp(CFRunLoopGetMain()); } } fn spawn_impl(&self, f: SpawnFunc, high_pri: bool) { self.queue_func(f, high_pri); Self::queue_wakeup(); } fn run_impl(&self) -> bool { if let Some(func) = self.pop_func() { func(); } self.has_any_queued() } } pub struct SpawnQueueExecutor; impl BasicExecutor for SpawnQueueExecutor { fn execute(&self, f: SpawnFunc) { SPAWN_QUEUE.spawn(f) } } pub struct LowPriSpawnQueueExecutor; impl BasicExecutor for LowPriSpawnQueueExecutor { fn execute(&self, f: SpawnFunc) { SPAWN_QUEUE.spawn_impl(f, false) } }