From 5a558e0d8e20eb5b5d474e0f27fd51f4c633dd80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Tue, 31 Aug 2021 16:48:59 +0900 Subject: [PATCH] lsp: Delay requests & notifications until initialization is complete --- helix-lsp/src/client.rs | 15 ++++--- helix-lsp/src/lib.rs | 11 +++-- helix-lsp/src/transport.rs | 89 ++++++++++++++++++++++++++++++++------ 3 files changed, 90 insertions(+), 25 deletions(-) diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 87078c699..02cd57477 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -9,13 +9,16 @@ use serde_json::Value; use std::future::Future; use std::process::Stdio; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use tokio::{ io::{BufReader, BufWriter}, process::{Child, Command}, sync::{ mpsc::{channel, UnboundedReceiver, UnboundedSender}, - OnceCell, + Notify, OnceCell, }, }; @@ -31,12 +34,13 @@ pub struct Client { } impl Client { + #[allow(clippy::type_complexity)] pub fn start( cmd: &str, args: &[String], config: Option, id: usize, - ) -> Result<(Self, UnboundedReceiver<(usize, Call)>)> { + ) -> Result<(Self, UnboundedReceiver<(usize, Call)>, Arc)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -53,7 +57,8 @@ pub fn start( let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (server_rx, server_tx) = Transport::start(reader, writer, stderr, id); + let (server_rx, server_tx, initialize_notify) = + Transport::start(reader, writer, stderr, id); let client = Self { id, @@ -65,7 +70,7 @@ pub fn start( config, }; - Ok((client, server_rx)) + Ok((client, server_rx, initialize_notify)) } pub fn id(&self) -> usize { diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index a118239fb..3a761ad02 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -312,7 +312,7 @@ pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result { // initialize a new client let id = self.counter.fetch_add(1, Ordering::Relaxed); - let (client, incoming) = Client::start( + let (client, incoming, initialize_notify) = Client::start( &config.command, &config.args, serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), @@ -322,9 +322,9 @@ pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result Result(lsp::InitializedParams {}) .await .unwrap(); - }); - // TODO: remove this block - futures_executor::block_on(initialize).map_err(|_| anyhow::anyhow!("bail"))?; + initialize_notify.notify_one(); + }); entry.insert((id, client.clone())); Ok(client) diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 9353de20c..071c5b937 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,7 +1,7 @@ use crate::{Error, Result}; use anyhow::Context; use jsonrpc_core as jsonrpc; -use log::{debug, error, info, warn}; +use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -11,7 +11,7 @@ process::{ChildStderr, ChildStdin, ChildStdout}, sync::{ mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, - Mutex, + Mutex, Notify, }, }; @@ -51,9 +51,11 @@ pub fn start( ) -> ( UnboundedReceiver<(usize, jsonrpc::Call)>, UnboundedSender, + Arc, ) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); + let notify = Arc::new(Notify::new()); let transport = Self { id, @@ -64,9 +66,14 @@ pub fn start( tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::err(transport.clone(), server_stderr)); - tokio::spawn(Self::send(transport, server_stdin, client_rx)); + tokio::spawn(Self::send( + transport, + server_stdin, + client_rx, + notify.clone(), + )); - (rx, tx) + (rx, tx, notify) } async fn recv_server_message( @@ -82,7 +89,8 @@ async fn recv_server_message( // debug!("<- header {:?}", buffer); - if header.is_empty() { + if buffer == "\r\n" { + // look for an empty CRLF line break; } @@ -99,7 +107,8 @@ async fn recv_server_message( // Workaround: Some non-conformant language servers will output logging and other garbage // into the same stream as JSON-RPC messages. This can also happen from shell scripts that spawn // the server. Skip such lines and log a warning. - warn!("Failed to parse header: {:?}", header); + + // warn!("Failed to parse header: {:?}", header); } } } @@ -261,15 +270,67 @@ async fn send( transport: Arc, mut server_stdin: BufWriter, mut client_rx: UnboundedReceiver, + initialize_notify: Arc, ) { - while let Some(msg) = client_rx.recv().await { - match transport - .send_payload_to_server(&mut server_stdin, msg) - .await - { - Ok(_) => {} - Err(err) => { - error!("err: <- {:?}", err); + let mut pending_messages: Vec = Vec::new(); + let mut is_pending = true; + + // Determine if a message is allowed to be sent early + fn is_initialize(payload: &Payload) -> bool { + use lsp_types::{ + notification::{Initialized, Notification}, + request::{Initialize, Request}, + }; + match payload { + Payload::Request { + value: jsonrpc::MethodCall { method, .. }, + .. + } if method == Initialize::METHOD => true, + Payload::Notification(jsonrpc::Notification { method, .. }) + if method == Initialized::METHOD => + { + true + } + _ => false, + } + } + + // TODO: events that use capabilities need to do the right thing + + loop { + tokio::select! { + biased; + _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe + // server successfully initialized + is_pending = false; + // drain the pending queue and send payloads to server + for msg in pending_messages.drain(..) { + log::info!("Draining pending message {:?}", msg); + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } + msg = client_rx.recv() => { + if let Some(msg) = msg { + if is_pending && !is_initialize(&msg) { + log::info!("Language server not initialized, delaying request"); + pending_messages.push(msg); + } else { + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } else { + // channel closed + break; + } } } }