From 0a40a21c74083555d847ed8ca5e762871976adc8 Mon Sep 17 00:00:00 2001 From: Julia Date: Tue, 20 Feb 2024 10:26:48 -0500 Subject: [PATCH] Timeout while waiting for server to shutdown and kill it --- crates/lsp/src/lsp.rs | 35 ++++++++++++----- crates/project/src/project.rs | 74 ++++++++++++++++++++++++----------- 2 files changed, 78 insertions(+), 31 deletions(-) diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index 478b650168..342475e3f6 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -4,7 +4,7 @@ pub use lsp_types::*; use anyhow::{anyhow, Context, Result}; use collections::HashMap; -use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite, FutureExt}; +use futures::{channel::oneshot, io::BufWriter, select, AsyncRead, AsyncWrite, FutureExt}; use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task}; use parking_lot::Mutex; use postage::{barrier, prelude::Stream}; @@ -35,6 +35,7 @@ const HEADER_DELIMITER: &'static [u8; 4] = b"\r\n\r\n"; const JSON_RPC_VERSION: &str = "2.0"; const CONTENT_LEN_HEADER: &str = "Content-Length: "; const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2); +const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); type NotificationHandler = Box, &str, AsyncAppContext)>; type ResponseHandler = Box)>; @@ -61,7 +62,7 @@ pub struct LanguageServer { server_id: LanguageServerId, next_id: AtomicI32, outbound_tx: channel::Sender, - name: String, + name: Arc, capabilities: ServerCapabilities, code_action_kinds: Option>, notification_handlers: Arc>>, @@ -72,7 +73,7 @@ pub struct LanguageServer { io_tasks: Mutex>, Task>)>>, output_done_rx: Mutex>, root_path: PathBuf, - _server: Option>, + server: Arc>>, } /// Identifies a running language server. @@ -217,7 +218,7 @@ impl LanguageServer { ); if let Some(name) = binary.path.file_name() { - server.name = name.to_string_lossy().to_string(); + server.name = name.to_string_lossy().into(); } Ok(server) @@ -293,7 +294,7 @@ impl LanguageServer { notification_handlers, response_handlers, io_handlers, - name: Default::default(), + name: "".into(), capabilities: Default::default(), code_action_kinds, next_id: Default::default(), @@ -302,7 +303,7 @@ impl LanguageServer { io_tasks: Mutex::new(Some((input_task, output_task))), output_done_rx: Mutex::new(Some(output_done_rx)), root_path: root_path.to_path_buf(), - _server: server.map(|server| Mutex::new(server)), + server: Arc::new(Mutex::new(server.map(|server| server))), } } @@ -618,7 +619,7 @@ impl LanguageServer { cx.spawn(|_| async move { let response = self.request::(params).await?; if let Some(info) = response.server_info { - self.name = info.name; + self.name = info.name.into(); } self.capabilities = response.capabilities; @@ -644,14 +645,30 @@ impl LanguageServer { ); let exit = Self::notify_internal::(&outbound_tx, ()); outbound_tx.close(); + + let server = self.server.clone(); + let name = self.name.clone(); + let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse(); Some( async move { log::debug!("language server shutdown started"); - shutdown_request.await?; + + select! { + request_result = shutdown_request.fuse() => { + request_result?; + } + + _ = timer => { + log::info!("timeout waiting for language server {name} to shutdown"); + }, + } + response_handlers.lock().take(); exit?; output_done.recv().await; + server.lock().take().map(|mut child| child.kill()); log::debug!("language server shutdown finished"); + drop(tasks); anyhow::Ok(()) } @@ -931,7 +948,7 @@ impl LanguageServer { }); let method = T::METHOD; - futures::select! { + select! { response = rx.fuse() => { let elapsed = started.elapsed(); log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}"); diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index b4253ec059..fba14e285b 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -23,6 +23,7 @@ use debounced_delay::DebouncedDelay; use futures::{ channel::mpsc::{self, UnboundedReceiver}, future::{try_join_all, Shared}, + select, stream::FuturesUnordered, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, }; @@ -97,6 +98,7 @@ pub use runnable_inventory::Inventory; pub use worktree::*; const MAX_SERVER_REINSTALL_ATTEMPT_COUNT: u64 = 4; +const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); pub trait Item { fn entry_id(&self, cx: &AppContext) -> Option; @@ -3361,7 +3363,8 @@ impl Project { ) -> Task> { let key = (worktree_id, adapter_name); if let Some(server_id) = self.language_server_ids.remove(&key) { - log::info!("stopping language server {}", key.1 .0); + let name = key.1 .0; + log::info!("stopping language server {name}"); // Remove other entries for this language server as well let mut orphaned_worktrees = vec![worktree_id]; @@ -3395,27 +3398,8 @@ impl Project { let server_state = self.language_servers.remove(&server_id); cx.emit(Event::LanguageServerRemoved(server_id)); - cx.spawn(move |this, mut cx| async move { - let server = match server_state { - Some(LanguageServerState::Starting(task)) => task.await, - Some(LanguageServerState::Running { server, .. }) => Some(server), - None => None, - }; - - if let Some(server) = server { - if let Some(shutdown) = server.shutdown() { - shutdown.await; - } - } - - if let Some(this) = this.upgrade() { - this.update(&mut cx, |this, cx| { - this.language_server_statuses.remove(&server_id); - cx.notify(); - }) - .ok(); - } - + cx.spawn(move |this, cx| async move { + Self::shutdown_language_server(this, server_state, name, server_id, cx).await; orphaned_worktrees }) } else { @@ -3423,6 +3407,52 @@ impl Project { } } + async fn shutdown_language_server( + this: WeakModel, + server_state: Option, + name: Arc, + server_id: LanguageServerId, + mut cx: AsyncAppContext, + ) { + let server = match server_state { + Some(LanguageServerState::Starting(task)) => { + let mut timer = cx + .background_executor() + .timer(SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT) + .fuse(); + + select! { + server = task.fuse() => server, + _ = timer => { + log::info!( + "timeout waiting for language server {} to finish launching before stopping", + name + ); + None + }, + } + } + + Some(LanguageServerState::Running { server, .. }) => Some(server), + + None => None, + }; + + if let Some(server) = server { + if let Some(shutdown) = server.shutdown() { + shutdown.await; + } + } + + if let Some(this) = this.upgrade() { + this.update(&mut cx, |this, cx| { + this.language_server_statuses.remove(&server_id); + cx.notify(); + }) + .ok(); + } + } + pub fn restart_language_servers_for_buffers( &mut self, buffers: impl IntoIterator>,