/* * Copyright (c) 2023, Andreas Kling * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace Core { struct ThreadData; namespace { thread_local ThreadData* s_thread_data; } struct EventLoopTimer { int timer_id { 0 }; Time interval; Time fire_time; bool should_reload { false }; TimerShouldFireWhenNotVisible fire_when_not_visible { TimerShouldFireWhenNotVisible::No }; WeakPtr owner; void reload(Time const& now) { fire_time = now + interval; } bool has_expired(Time const& now) const { return now > fire_time; } }; struct ThreadData { static ThreadData& the() { if (!s_thread_data) { // FIXME: Don't leak this. s_thread_data = new ThreadData; } return *s_thread_data; } ThreadData() { pid = getpid(); initialize_wake_pipe(); } void initialize_wake_pipe() { if (wake_pipe_fds[0] != -1) close(wake_pipe_fds[0]); if (wake_pipe_fds[1] != -1) close(wake_pipe_fds[1]); #if defined(SOCK_NONBLOCK) int rc = pipe2(wake_pipe_fds, O_CLOEXEC); #else int rc = pipe(wake_pipe_fds); fcntl(wake_pipe_fds[0], F_SETFD, FD_CLOEXEC); fcntl(wake_pipe_fds[1], F_SETFD, FD_CLOEXEC); #endif VERIFY(rc == 0); } // Each thread has its own timers, notifiers and a wake pipe. HashMap> timers; HashTable notifiers; // The wake pipe is used to notify another event loop that someone has called wake(), or a signal has been received. // wake() writes 0i32 into the pipe, signals write the signal number (guaranteed non-zero). int wake_pipe_fds[2] { -1, -1 }; pid_t pid { 0 }; IDAllocator id_allocator; }; EventLoopImplementationUnix::EventLoopImplementationUnix() : m_wake_pipe_fds(&ThreadData::the().wake_pipe_fds) { } EventLoopImplementationUnix::~EventLoopImplementationUnix() = default; int EventLoopImplementationUnix::exec() { for (;;) { if (m_exit_requested) return m_exit_code; pump(PumpMode::WaitForEvents); } VERIFY_NOT_REACHED(); } size_t EventLoopImplementationUnix::pump(PumpMode mode) { static_cast(EventLoopManager::the()).wait_for_events(mode); return ThreadEventQueue::current().process(); } void EventLoopImplementationUnix::quit(int code) { m_exit_requested = true; m_exit_code = code; } void EventLoopImplementationUnix::unquit() { m_exit_requested = false; m_exit_code = 0; } bool EventLoopImplementationUnix::was_exit_requested() const { return m_exit_requested; } void EventLoopImplementationUnix::post_event(Object& receiver, NonnullOwnPtr&& event) { m_thread_event_queue.post_event(receiver, move(event)); if (&m_thread_event_queue != &ThreadEventQueue::current()) wake(); } void EventLoopImplementationUnix::wake() { int wake_event = 0; MUST(Core::System::write((*m_wake_pipe_fds)[1], { &wake_event, sizeof(wake_event) })); } void EventLoopManagerUnix::wait_for_events(EventLoopImplementation::PumpMode mode) { auto& thread_data = ThreadData::the(); fd_set read_fds {}; fd_set write_fds {}; retry: int max_fd = 0; auto add_fd_to_set = [&max_fd](int fd, fd_set& set) { FD_SET(fd, &set); if (fd > max_fd) max_fd = fd; }; int max_fd_added = -1; // The wake pipe informs us of POSIX signals as well as manual calls to wake() add_fd_to_set(thread_data.wake_pipe_fds[0], read_fds); max_fd = max(max_fd, max_fd_added); for (auto& notifier : thread_data.notifiers) { if (notifier->type() == Notifier::Type::Read) add_fd_to_set(notifier->fd(), read_fds); if (notifier->type() == Notifier::Type::Write) add_fd_to_set(notifier->fd(), write_fds); if (notifier->type() == Notifier::Type::Exceptional) TODO(); } bool has_pending_events = ThreadEventQueue::current().has_pending_events(); // Figure out how long to wait at maximum. // This mainly depends on the PumpMode and whether we have pending events, but also the next expiring timer. Time now; struct timeval timeout = { 0, 0 }; bool should_wait_forever = false; if (mode == EventLoopImplementation::PumpMode::WaitForEvents && !has_pending_events) { auto next_timer_expiration = get_next_timer_expiration(); if (next_timer_expiration.has_value()) { now = Time::now_monotonic_coarse(); auto computed_timeout = next_timer_expiration.value() - now; if (computed_timeout.is_negative()) computed_timeout = Time::zero(); timeout = computed_timeout.to_timeval(); } else { should_wait_forever = true; } } try_select_again: // select() and wait for file system events, calls to wake(), POSIX signals, or timer expirations. int marked_fd_count = select(max_fd + 1, &read_fds, &write_fds, nullptr, should_wait_forever ? nullptr : &timeout); // Because POSIX, we might spuriously return from select() with EINTR; just select again. if (marked_fd_count < 0) { int saved_errno = errno; if (saved_errno == EINTR) goto try_select_again; dbgln("EventLoopImplementationUnix::wait_for_events: {} ({}: {})", marked_fd_count, saved_errno, strerror(saved_errno)); VERIFY_NOT_REACHED(); } // We woke up due to a call to wake() or a POSIX signal. // Handle signals and see whether we need to handle events as well. if (FD_ISSET(thread_data.wake_pipe_fds[0], &read_fds)) { int wake_events[8]; ssize_t nread; // We might receive another signal while read()ing here. The signal will go to the handle_signal properly, // but we get interrupted. Therefore, just retry while we were interrupted. do { errno = 0; nread = read(thread_data.wake_pipe_fds[0], wake_events, sizeof(wake_events)); if (nread == 0) break; } while (nread < 0 && errno == EINTR); if (nread < 0) { perror("EventLoopImplementationUnix::wait_for_events: read from wake pipe"); VERIFY_NOT_REACHED(); } VERIFY(nread > 0); bool wake_requested = false; int event_count = nread / sizeof(wake_events[0]); for (int i = 0; i < event_count; i++) { if (wake_events[i] != 0) dispatch_signal(wake_events[i]); else wake_requested = true; } if (!wake_requested && nread == sizeof(wake_events)) goto retry; } if (!thread_data.timers.is_empty()) { now = Time::now_monotonic_coarse(); } // Handle expired timers. for (auto& it : thread_data.timers) { auto& timer = *it.value; if (!timer.has_expired(now)) continue; auto owner = timer.owner.strong_ref(); if (timer.fire_when_not_visible == TimerShouldFireWhenNotVisible::No && owner && !owner->is_visible_for_timer_purposes()) { continue; } if (owner) ThreadEventQueue::current().post_event(*owner, make(timer.timer_id)); if (timer.should_reload) { timer.reload(now); } else { // FIXME: Support removing expired timers that don't want to reload. VERIFY_NOT_REACHED(); } } if (!marked_fd_count) return; // Handle file system notifiers by making them normal events. for (auto& notifier : thread_data.notifiers) { if (notifier->type() == Notifier::Type::Read && FD_ISSET(notifier->fd(), &read_fds)) { ThreadEventQueue::current().post_event(*notifier, make(notifier->fd())); } if (notifier->type() == Notifier::Type::Write && FD_ISSET(notifier->fd(), &write_fds)) { ThreadEventQueue::current().post_event(*notifier, make(notifier->fd())); } } } class SignalHandlers : public RefCounted { AK_MAKE_NONCOPYABLE(SignalHandlers); AK_MAKE_NONMOVABLE(SignalHandlers); public: SignalHandlers(int signal_number, void (*handle_signal)(int)); ~SignalHandlers(); void dispatch(); int add(Function&& handler); bool remove(int handler_id); bool is_empty() const { if (m_calling_handlers) { for (auto& handler : m_handlers_pending) { if (handler.value) return false; // an add is pending } } return m_handlers.is_empty(); } bool have(int handler_id) const { if (m_calling_handlers) { auto it = m_handlers_pending.find(handler_id); if (it != m_handlers_pending.end()) { if (!it->value) return false; // a deletion is pending } } return m_handlers.contains(handler_id); } int m_signal_number; void (*m_original_handler)(int); // TODO: can't use sighandler_t? HashMap> m_handlers; HashMap> m_handlers_pending; bool m_calling_handlers { false }; }; struct SignalHandlersInfo { HashMap> signal_handlers; int next_signal_id { 0 }; }; static Singleton s_signals; template inline SignalHandlersInfo* signals_info() { return s_signals.ptr(); } void EventLoopManagerUnix::dispatch_signal(int signal_number) { auto& info = *signals_info(); auto handlers = info.signal_handlers.find(signal_number); if (handlers != info.signal_handlers.end()) { // Make sure we bump the ref count while dispatching the handlers! // This allows a handler to unregister/register while the handlers // are being called! auto handler = handlers->value; handler->dispatch(); } } void EventLoopImplementationUnix::notify_forked_and_in_child() { auto& thread_data = ThreadData::the(); thread_data.timers.clear(); thread_data.notifiers.clear(); thread_data.initialize_wake_pipe(); if (auto* info = signals_info()) { info->signal_handlers.clear(); info->next_signal_id = 0; } thread_data.pid = getpid(); } Optional