diff --git a/Cargo.lock b/Cargo.lock index c514dbfe11..a9e631b175 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2958,6 +2958,7 @@ dependencies = [ "gpui", "lsp-types", "parking_lot", + "postage", "serde 1.0.125", "serde_json 1.0.64", "smol", diff --git a/crates/lsp/Cargo.toml b/crates/lsp/Cargo.toml index 897c8c8224..3c1f08bb5f 100644 --- a/crates/lsp/Cargo.toml +++ b/crates/lsp/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0" futures = "0.3" lsp-types = "0.91" parking_lot = "0.11" +postage = { version = "0.4.1", features = ["futures-traits"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["raw_value"] } smol = "1.2" diff --git a/crates/lsp/src/lib.rs b/crates/lsp/src/lib.rs index c2fdf75182..2cc2426d2f 100644 --- a/crates/lsp/src/lib.rs +++ b/crates/lsp/src/lib.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Context, Result}; use gpui::{executor, AppContext, Task}; use parking_lot::Mutex; +use postage::{barrier, prelude::Stream}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use smol::{ @@ -29,6 +30,7 @@ pub struct LanguageServer { response_handlers: Arc>>, _input_task: Task>, _output_task: Task>, + initialized: barrier::Receiver, } type ResponseHandler = Box)>; @@ -151,32 +153,51 @@ impl LanguageServer { .log_err(), ); + let (initialized_tx, initialized_rx) = barrier::channel(); let this = Arc::new(Self { response_handlers, next_id: Default::default(), outbound_tx, _input_task, _output_task, + initialized: initialized_rx, }); - background.spawn(this.clone().init().log_err()).detach(); + + background + .spawn({ + let this = this.clone(); + async move { + this.init().log_err().await; + drop(initialized_tx); + } + }) + .detach(); Ok(this) } async fn init(self: Arc) -> Result<()> { - self.request::(lsp_types::InitializeParams { - process_id: Default::default(), - root_path: Default::default(), - root_uri: Default::default(), - initialization_options: Default::default(), - capabilities: Default::default(), - trace: Default::default(), - workspace_folders: Default::default(), - client_info: Default::default(), - locale: Default::default(), - }) + let res = self + .request_internal::( + lsp_types::InitializeParams { + process_id: Default::default(), + root_path: Default::default(), + root_uri: Default::default(), + initialization_options: Default::default(), + capabilities: Default::default(), + trace: Default::default(), + workspace_folders: Default::default(), + client_info: Default::default(), + locale: Default::default(), + }, + false, + ) + .await?; + self.notify_internal::( + lsp_types::InitializedParams {}, + false, + ) .await?; - self.notify::(lsp_types::InitializedParams {})?; Ok(()) } @@ -184,6 +205,17 @@ impl LanguageServer { self: &Arc, params: T::Params, ) -> impl Future> + where + T::Result: 'static + Send, + { + self.request_internal::(params, true) + } + + fn request_internal( + self: &Arc, + params: T::Params, + wait_for_initialization: bool, + ) -> impl Future> where T::Result: 'static + Send, { @@ -210,25 +242,43 @@ impl LanguageServer { }), ); - let outbound_tx = self.outbound_tx.clone(); + let this = self.clone(); async move { - outbound_tx.send(message).await?; + if wait_for_initialization { + this.initialized.clone().recv().await; + } + this.outbound_tx.send(message).await?; rx.recv().await? } } pub fn notify( - &self, + self: &Arc, params: T::Params, - ) -> Result<()> { + ) -> impl Future> { + self.notify_internal::(params, true) + } + + fn notify_internal( + self: &Arc, + params: T::Params, + wait_for_initialization: bool, + ) -> impl Future> { let message = serde_json::to_vec(&OutboundNotification { jsonrpc: JSON_RPC_VERSION, method: T::METHOD, params, }) .unwrap(); - smol::block_on(self.outbound_tx.send(message))?; - Ok(()) + + let this = self.clone(); + async move { + if wait_for_initialization { + this.initialized.clone().recv().await; + } + this.outbound_tx.send(message).await?; + Ok(()) + } } }