From 54e7e2f59db54e053dbde4e8f2d3b6c932d5b676 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Wed, 30 Aug 2023 17:38:30 +0300 Subject: [PATCH] Capture language servers' stderr into server logs --- crates/language_tools/src/lsp_log.rs | 74 ++++++++++++++---------- crates/lsp/src/lsp.rs | 85 ++++++++++++++++++++++------ 2 files changed, 112 insertions(+), 47 deletions(-) diff --git a/crates/language_tools/src/lsp_log.rs b/crates/language_tools/src/lsp_log.rs index 51bdb4c5ce..60c4e41666 100644 --- a/crates/language_tools/src/lsp_log.rs +++ b/crates/language_tools/src/lsp_log.rs @@ -12,6 +12,7 @@ use gpui::{ ViewHandle, WeakModelHandle, }; use language::{Buffer, LanguageServerId, LanguageServerName}; +use lsp::IoKind; use project::{Project, Worktree}; use std::{borrow::Cow, sync::Arc}; use theme::{ui, Theme}; @@ -26,7 +27,7 @@ const RECEIVE_LINE: &str = "// Receive:\n"; pub struct LogStore { projects: HashMap, ProjectState>, - io_tx: mpsc::UnboundedSender<(WeakModelHandle, LanguageServerId, bool, String)>, + io_tx: mpsc::UnboundedSender<(WeakModelHandle, LanguageServerId, IoKind, String)>, } struct ProjectState { @@ -37,12 +38,12 @@ struct ProjectState { struct LanguageServerState { log_buffer: ModelHandle, rpc_state: Option, + _subscription: Option, } struct LanguageServerRpcState { buffer: ModelHandle, last_message_kind: Option, - _subscription: lsp::Subscription, } pub struct LspLogView { @@ -118,11 +119,11 @@ impl LogStore { io_tx, }; cx.spawn_weak(|this, mut cx| async move { - while let Some((project, server_id, is_output, mut message)) = io_rx.next().await { + while let Some((project, server_id, io_kind, mut message)) = io_rx.next().await { if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| { message.push('\n'); - this.on_io(project, server_id, is_output, &message, cx); + this.on_io(project, server_id, io_kind, &message, cx); }); } } @@ -168,22 +169,29 @@ impl LogStore { cx: &mut ModelContext, ) -> Option> { let project_state = self.projects.get_mut(&project.downgrade())?; - Some( - project_state - .servers - .entry(id) - .or_insert_with(|| { - cx.notify(); - LanguageServerState { - rpc_state: None, - log_buffer: cx - .add_model(|cx| Buffer::new(0, cx.model_id() as u64, "")) - .clone(), - } - }) - .log_buffer - .clone(), - ) + let server_state = project_state.servers.entry(id).or_insert_with(|| { + cx.notify(); + LanguageServerState { + rpc_state: None, + log_buffer: cx + .add_model(|cx| Buffer::new(0, cx.model_id() as u64, "")) + .clone(), + _subscription: None, + } + }); + + let server = project.read(cx).language_server_for_id(id); + let weak_project = project.downgrade(); + let io_tx = self.io_tx.clone(); + server_state._subscription = server.map(|server| { + server.on_io(move |io_kind, message| { + io_tx + .unbounded_send((weak_project, id, io_kind, message.to_string())) + .ok(); + }) + }); + + Some(server_state.log_buffer.clone()) } fn add_language_server_log( @@ -230,7 +238,7 @@ impl LogStore { Some(server_state.log_buffer.clone()) } - pub fn enable_rpc_trace_for_language_server( + fn enable_rpc_trace_for_language_server( &mut self, project: &ModelHandle, server_id: LanguageServerId, @@ -239,9 +247,7 @@ impl LogStore { let weak_project = project.downgrade(); let project_state = self.projects.get_mut(&weak_project)?; let server_state = project_state.servers.get_mut(&server_id)?; - let server = project.read(cx).language_server_for_id(server_id)?; let rpc_state = server_state.rpc_state.get_or_insert_with(|| { - let io_tx = self.io_tx.clone(); let language = project.read(cx).languages().language_for_name("JSON"); let buffer = cx.add_model(|cx| Buffer::new(0, cx.model_id() as u64, "")); cx.spawn_weak({ @@ -258,11 +264,6 @@ impl LogStore { LanguageServerRpcState { buffer, last_message_kind: None, - _subscription: server.on_io(move |is_received, json| { - io_tx - .unbounded_send((weak_project, server_id, is_received, json.to_string())) - .ok(); - }), } }); Some(rpc_state.buffer.clone()) @@ -285,10 +286,25 @@ impl LogStore { &mut self, project: WeakModelHandle, language_server_id: LanguageServerId, - is_received: bool, + io_kind: IoKind, message: &str, cx: &mut AppContext, ) -> Option<()> { + let is_received = match io_kind { + IoKind::StdOut => true, + IoKind::StdIn => false, + IoKind::StdErr => { + let project = project.upgrade(cx)?; + project.update(cx, |_, cx| { + cx.emit(project::Event::LanguageServerLog( + language_server_id, + format!("stderr: {}\n", message.trim()), + )) + }); + return Some(()); + } + }; + let state = self .projects .get_mut(&project)? diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index d49dafff2f..2abe0baefa 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -35,7 +35,14 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: "; type NotificationHandler = Box, &str, AsyncAppContext)>; type ResponseHandler = Box)>; -type IoHandler = Box; +type IoHandler = Box; + +#[derive(Debug, Clone, Copy)] +pub enum IoKind { + StdOut, + StdIn, + StdErr, +} #[derive(Debug, Clone, Deserialize)] pub struct LanguageServerBinary { @@ -144,16 +151,18 @@ impl LanguageServer { .args(binary.arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) + .stderr(Stdio::piped()) .kill_on_drop(true) .spawn()?; let stdin = server.stdin.take().unwrap(); - let stout = server.stdout.take().unwrap(); + let stdout = server.stdout.take().unwrap(); + let stderr = server.stderr.take().unwrap(); let mut server = Self::new_internal( server_id.clone(), stdin, - stout, + stdout, + stderr, Some(server), root_path, code_action_kinds, @@ -181,10 +190,11 @@ impl LanguageServer { Ok(server) } - fn new_internal( + fn new_internal( server_id: LanguageServerId, stdin: Stdin, stdout: Stdout, + stderr: Stderr, server: Option, root_path: &Path, code_action_kinds: Option>, @@ -194,7 +204,8 @@ impl LanguageServer { where Stdin: AsyncWrite + Unpin + Send + 'static, Stdout: AsyncRead + Unpin + Send + 'static, - F: FnMut(AnyNotification) + 'static + Send, + Stderr: AsyncRead + Unpin + Send + 'static, + F: FnMut(AnyNotification) + 'static + Send + Clone, { let (outbound_tx, outbound_rx) = channel::unbounded::(); let (output_done_tx, output_done_rx) = barrier::channel(); @@ -203,17 +214,26 @@ impl LanguageServer { let response_handlers = Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default()))); let io_handlers = Arc::new(Mutex::new(HashMap::default())); - let input_task = cx.spawn(|cx| { - Self::handle_input( - stdout, - on_unhandled_notification, - notification_handlers.clone(), - response_handlers.clone(), - io_handlers.clone(), - cx, - ) + + let stdout_input_task = cx.spawn(|cx| { + { + Self::handle_input( + stdout, + on_unhandled_notification.clone(), + notification_handlers.clone(), + response_handlers.clone(), + io_handlers.clone(), + cx, + ) + } .log_err() }); + let stderr_input_task = + cx.spawn(|_| Self::handle_stderr(stderr, io_handlers.clone()).log_err()); + let input_task = cx.spawn(|_| async move { + let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task); + stdout.or(stderr) + }); let output_task = cx.background().spawn({ Self::handle_output( stdin, @@ -284,7 +304,7 @@ impl LanguageServer { if let Ok(message) = str::from_utf8(&buffer) { log::trace!("incoming message:{}", message); for handler in io_handlers.lock().values_mut() { - handler(true, message); + handler(IoKind::StdOut, message); } } @@ -327,6 +347,30 @@ impl LanguageServer { } } + async fn handle_stderr( + stderr: Stderr, + io_handlers: Arc>>, + ) -> anyhow::Result<()> + where + Stderr: AsyncRead + Unpin + Send + 'static, + { + let mut stderr = BufReader::new(stderr); + let mut buffer = Vec::new(); + loop { + buffer.clear(); + stderr.read_until(b'\n', &mut buffer).await?; + if let Ok(message) = str::from_utf8(&buffer) { + log::trace!("incoming stderr message:{message}"); + for handler in io_handlers.lock().values_mut() { + handler(IoKind::StdErr, message); + } + } + + // Don't starve the main thread when receiving lots of messages at once. + smol::future::yield_now().await; + } + } + async fn handle_output( stdin: Stdin, outbound_rx: channel::Receiver, @@ -348,7 +392,7 @@ impl LanguageServer { while let Ok(message) = outbound_rx.recv().await { log::trace!("outgoing message:{}", message); for handler in io_handlers.lock().values_mut() { - handler(false, &message); + handler(IoKind::StdIn, &message); } content_len_buffer.clear(); @@ -532,7 +576,7 @@ impl LanguageServer { #[must_use] pub fn on_io(&self, f: F) -> Subscription where - F: 'static + Send + FnMut(bool, &str), + F: 'static + Send + FnMut(IoKind, &str), { let id = self.next_id.fetch_add(1, SeqCst); self.io_handlers.lock().insert(id, Box::new(f)); @@ -845,12 +889,16 @@ impl LanguageServer { ) -> (Self, FakeLanguageServer) { let (stdin_writer, stdin_reader) = async_pipe::pipe(); let (stdout_writer, stdout_reader) = async_pipe::pipe(); + // writers will be dropped after we exit, so readers will also be noop for the fake servers + let (_stderr_writer, stderr_reader) = async_pipe::pipe(); + let (_stderr_writer_2, stderr_reader_2) = async_pipe::pipe(); let (notifications_tx, notifications_rx) = channel::unbounded(); let server = Self::new_internal( LanguageServerId(0), stdin_writer, stdout_reader, + stderr_reader, None, Path::new("/"), None, @@ -862,6 +910,7 @@ impl LanguageServer { LanguageServerId(0), stdout_writer, stdin_reader, + stderr_reader_2, None, Path::new("/"), None,