diff --git a/Cargo.lock b/Cargo.lock index cbbcbc7914..558cb84535 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1507,7 +1507,7 @@ dependencies = [ "parking_lot 0.11.2", "postage", "rand 0.8.5", - "rpc", + "rpc2", "schemars", "serde", "serde_derive", @@ -4285,7 +4285,6 @@ dependencies = [ "collections", "ctor", "env_logger 0.9.3", - "fs", "futures 0.3.28", "fuzzy2", "git", @@ -4299,7 +4298,7 @@ dependencies = [ "postage", "rand 0.8.5", "regex", - "rpc", + "rpc2", "schemars", "serde", "serde_derive", @@ -6087,7 +6086,7 @@ dependencies = [ "pretty_assertions", "rand 0.8.5", "regex", - "rpc", + "rpc2", "schemars", "serde", "serde_derive", @@ -6839,6 +6838,35 @@ dependencies = [ "zstd", ] +[[package]] +name = "rpc2" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-lock", + "async-tungstenite", + "base64 0.13.1", + "clock", + "collections", + "ctor", + "env_logger 0.9.3", + "futures 0.3.28", + "gpui2", + "parking_lot 0.11.2", + "prost 0.8.0", + "prost-build", + "rand 0.8.5", + "rsa 0.4.0", + "serde", + "serde_derive", + "smol", + "smol-timeout", + "tempdir", + "tracing", + "util", + "zstd", +] + [[package]] name = "rsa" version = "0.4.0" @@ -10806,7 +10834,7 @@ dependencies = [ "project2", "rand 0.8.5", "regex", - "rpc", + "rpc2", "rsa 0.4.0", "rust-embed", "schemars", diff --git a/Cargo.toml b/Cargo.toml index f8ce95ea6b..82af9265dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ members = [ "crates/recent_projects", "crates/rope", "crates/rpc", + "crates/rpc2", "crates/search", "crates/settings", "crates/settings2", diff --git a/crates/client2/Cargo.toml b/crates/client2/Cargo.toml index 3203b71819..8a6edbb428 100644 --- a/crates/client2/Cargo.toml +++ b/crates/client2/Cargo.toml @@ -9,14 +9,14 @@ path = "src/client2.rs" doctest = false [features] -test-support = ["collections/test-support", "gpui2/test-support", "rpc/test-support"] +test-support = ["collections/test-support", "gpui2/test-support", "rpc2/test-support"] [dependencies] collections = { path = "../collections" } db2 = { path = "../db2" } gpui2 = { path = "../gpui2" } util = { path = "../util" } -rpc = { path = "../rpc" } +rpc2 = { path = "../rpc2" } text = { path = "../text" } settings2 = { path = "../settings2" } feature_flags2 = { path = "../feature_flags2" } @@ -47,6 +47,6 @@ url = "2.2" [dev-dependencies] collections = { path = "../collections", features = ["test-support"] } gpui2 = { path = "../gpui2", features = ["test-support"] } -rpc = { path = "../rpc", features = ["test-support"] } +rpc2 = { path = "../rpc2", features = ["test-support"] } settings = { path = "../settings", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } diff --git a/crates/client2/src/client2.rs b/crates/client2/src/client2.rs index 6072f47397..215fcdf567 100644 --- a/crates/client2/src/client2.rs +++ b/crates/client2/src/client2.rs @@ -21,7 +21,7 @@ use lazy_static::lazy_static; use parking_lot::RwLock; use postage::watch; use rand::prelude::*; -use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; +use rpc2::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use settings2::Settings; @@ -43,7 +43,7 @@ use util::channel::ReleaseChannel; use util::http::HttpClient; use util::{ResultExt, TryFutureExt}; -pub use rpc::*; +pub use rpc2::*; pub use telemetry::ClickhouseEvent; pub use user::*; @@ -975,7 +975,7 @@ impl Client { "Authorization", format!("{} {}", credentials.user_id, credentials.access_token), ) - .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION); + .header("x-zed-protocol-version", rpc2::PROTOCOL_VERSION); let http = self.http.clone(); cx.executor().spawn(async move { @@ -1025,7 +1025,7 @@ impl Client { // zed server to encrypt the user's access token, so that it can'be intercepted by // any other app running on the user's device. let (public_key, private_key) = - rpc::auth::keypair().expect("failed to generate keypair for auth"); + rpc2::auth::keypair().expect("failed to generate keypair for auth"); let public_key_string = String::try_from(public_key).expect("failed to serialize public key for auth"); diff --git a/crates/client2/src/user.rs b/crates/client2/src/user.rs index 2aaae6dc85..41cf46ea8f 100644 --- a/crates/client2/src/user.rs +++ b/crates/client2/src/user.rs @@ -5,7 +5,7 @@ use feature_flags2::FeatureFlagAppExt; use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt}; use gpui2::{AsyncAppContext, EventEmitter, Handle, ImageData, ModelContext, Task}; use postage::{sink::Sink, watch}; -use rpc::proto::{RequestMessage, UsersResponse}; +use rpc2::proto::{RequestMessage, UsersResponse}; use std::sync::{Arc, Weak}; use text::ReplicaId; use util::http::HttpClient; diff --git a/crates/gpui2/src/platform/test/dispatcher.rs b/crates/gpui2/src/platform/test/dispatcher.rs index 746a5ed0c0..02873f8e6e 100644 --- a/crates/gpui2/src/platform/test/dispatcher.rs +++ b/crates/gpui2/src/platform/test/dispatcher.rs @@ -1,6 +1,6 @@ use crate::PlatformDispatcher; use async_task::Runnable; -use collections::{BTreeMap, HashMap, VecDeque}; +use collections::{HashMap, VecDeque}; use parking_lot::Mutex; use rand::prelude::*; use std::{ @@ -24,7 +24,7 @@ struct TestDispatcherState { random: StdRng, foreground: HashMap>, background: Vec, - delayed: BTreeMap, + delayed: Vec<(Instant, Runnable)>, time: Instant, is_main_thread: bool, next_id: TestDispatcherId, @@ -36,7 +36,7 @@ impl TestDispatcher { random, foreground: HashMap::default(), background: Vec::new(), - delayed: BTreeMap::new(), + delayed: Vec::new(), time: Instant::now(), is_main_thread: true, next_id: TestDispatcherId(1), @@ -112,17 +112,20 @@ impl PlatformDispatcher for TestDispatcher { fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) { let mut state = self.state.lock(); let next_time = state.time + duration; - state.delayed.insert(next_time, runnable); + let ix = match state.delayed.binary_search_by_key(&next_time, |e| e.0) { + Ok(ix) | Err(ix) => ix, + }; + state.delayed.insert(ix, (next_time, runnable)); } fn poll(&self) -> bool { let mut state = self.state.lock(); - while let Some((deadline, _)) = state.delayed.first_key_value() { + while let Some((deadline, _)) = state.delayed.first() { if *deadline > state.time { break; } - let (_, runnable) = state.delayed.pop_first().unwrap(); + let (_, runnable) = state.delayed.remove(0); state.background.push(runnable); } @@ -134,8 +137,10 @@ impl PlatformDispatcher for TestDispatcher { let background_len = state.background.len(); if foreground_len == 0 && background_len == 0 { + eprintln!("no runnables to poll"); return false; } + eprintln!("runnables {} {}", foreground_len, background_len); let main_thread = state.random.gen_ratio( foreground_len as u32, @@ -145,6 +150,7 @@ impl PlatformDispatcher for TestDispatcher { state.is_main_thread = main_thread; let runnable = if main_thread { + eprintln!("running next main thread"); let state = &mut *state; let runnables = state .foreground @@ -155,6 +161,7 @@ impl PlatformDispatcher for TestDispatcher { runnables.pop_front().unwrap() } else { let ix = state.random.gen_range(0..background_len); + eprintln!("running background thread {ix}"); state.background.swap_remove(ix) }; diff --git a/crates/language2/Cargo.toml b/crates/language2/Cargo.toml index 77b195293b..a3e1bf3e2a 100644 --- a/crates/language2/Cargo.toml +++ b/crates/language2/Cargo.toml @@ -25,11 +25,10 @@ test-support = [ clock = { path = "../clock" } collections = { path = "../collections" } fuzzy2 = { path = "../fuzzy2" } -fs = { path = "../fs" } git = { path = "../git" } gpui2 = { path = "../gpui2" } lsp2 = { path = "../lsp2" } -rpc = { path = "../rpc" } +rpc2 = { path = "../rpc2" } settings2 = { path = "../settings2" } sum_tree = { path = "../sum_tree" } text = { path = "../text" } diff --git a/crates/language2/src/buffer.rs b/crates/language2/src/buffer.rs index a8b764abc9..54425cec47 100644 --- a/crates/language2/src/buffer.rs +++ b/crates/language2/src/buffer.rs @@ -226,7 +226,7 @@ pub trait File: Send + Sync { fn as_any(&self) -> &dyn Any; - fn to_proto(&self) -> rpc::proto::File; + fn to_proto(&self) -> rpc2::proto::File; } pub trait LocalFile: File { @@ -375,7 +375,7 @@ impl Buffer { file, ); this.text.set_line_ending(proto::deserialize_line_ending( - rpc::proto::LineEnding::from_i32(message.line_ending) + rpc2::proto::LineEnding::from_i32(message.line_ending) .ok_or_else(|| anyhow!("missing line_ending"))?, )); this.saved_version = proto::deserialize_version(&message.saved_version); diff --git a/crates/language2/src/language2.rs b/crates/language2/src/language2.rs index d21ebd572a..9edaa16046 100644 --- a/crates/language2/src/language2.rs +++ b/crates/language2/src/language2.rs @@ -1862,111 +1862,112 @@ pub fn range_from_lsp(range: lsp2::Range) -> Range> { start..end } -// #[cfg(test)] -// mod tests { -// use super::*; -// use gpui::TestAppContext; +#[cfg(test)] +mod tests { + use super::*; + use gpui2::TestAppContext; -// #[gpui::test(iterations = 10)] -// async fn test_first_line_pattern(cx: &mut TestAppContext) { -// let mut languages = LanguageRegistry::test(); -// languages.set_executor(cx.background()); -// let languages = Arc::new(languages); -// languages.register( -// "/javascript", -// LanguageConfig { -// name: "JavaScript".into(), -// path_suffixes: vec!["js".into()], -// first_line_pattern: Some(Regex::new(r"\bnode\b").unwrap()), -// ..Default::default() -// }, -// tree_sitter_typescript::language_tsx(), -// vec![], -// |_| Default::default(), -// ); + #[gpui2::test(iterations = 10)] + async fn test_first_line_pattern(cx: &mut TestAppContext) { + let mut languages = LanguageRegistry::test(); -// languages -// .language_for_file("the/script", None) -// .await -// .unwrap_err(); -// languages -// .language_for_file("the/script", Some(&"nothing".into())) -// .await -// .unwrap_err(); -// assert_eq!( -// languages -// .language_for_file("the/script", Some(&"#!/bin/env node".into())) -// .await -// .unwrap() -// .name() -// .as_ref(), -// "JavaScript" -// ); -// } + languages.set_executor(cx.executor().clone()); + let languages = Arc::new(languages); + languages.register( + "/javascript", + LanguageConfig { + name: "JavaScript".into(), + path_suffixes: vec!["js".into()], + first_line_pattern: Some(Regex::new(r"\bnode\b").unwrap()), + ..Default::default() + }, + tree_sitter_typescript::language_tsx(), + vec![], + |_| Default::default(), + ); -// #[gpui::test(iterations = 10)] -// async fn test_language_loading(cx: &mut TestAppContext) { -// let mut languages = LanguageRegistry::test(); -// languages.set_executor(cx.background()); -// let languages = Arc::new(languages); -// languages.register( -// "/JSON", -// LanguageConfig { -// name: "JSON".into(), -// path_suffixes: vec!["json".into()], -// ..Default::default() -// }, -// tree_sitter_json::language(), -// vec![], -// |_| Default::default(), -// ); -// languages.register( -// "/rust", -// LanguageConfig { -// name: "Rust".into(), -// path_suffixes: vec!["rs".into()], -// ..Default::default() -// }, -// tree_sitter_rust::language(), -// vec![], -// |_| Default::default(), -// ); -// assert_eq!( -// languages.language_names(), -// &[ -// "JSON".to_string(), -// "Plain Text".to_string(), -// "Rust".to_string(), -// ] -// ); + languages + .language_for_file("the/script", None) + .await + .unwrap_err(); + languages + .language_for_file("the/script", Some(&"nothing".into())) + .await + .unwrap_err(); + assert_eq!( + languages + .language_for_file("the/script", Some(&"#!/bin/env node".into())) + .await + .unwrap() + .name() + .as_ref(), + "JavaScript" + ); + } -// let rust1 = languages.language_for_name("Rust"); -// let rust2 = languages.language_for_name("Rust"); + #[gpui2::test(iterations = 10)] + async fn test_language_loading(cx: &mut TestAppContext) { + let mut languages = LanguageRegistry::test(); + languages.set_executor(cx.executor().clone()); + let languages = Arc::new(languages); + languages.register( + "/JSON", + LanguageConfig { + name: "JSON".into(), + path_suffixes: vec!["json".into()], + ..Default::default() + }, + tree_sitter_json::language(), + vec![], + |_| Default::default(), + ); + languages.register( + "/rust", + LanguageConfig { + name: "Rust".into(), + path_suffixes: vec!["rs".into()], + ..Default::default() + }, + tree_sitter_rust::language(), + vec![], + |_| Default::default(), + ); + assert_eq!( + languages.language_names(), + &[ + "JSON".to_string(), + "Plain Text".to_string(), + "Rust".to_string(), + ] + ); -// // Ensure language is still listed even if it's being loaded. -// assert_eq!( -// languages.language_names(), -// &[ -// "JSON".to_string(), -// "Plain Text".to_string(), -// "Rust".to_string(), -// ] -// ); + let rust1 = languages.language_for_name("Rust"); + let rust2 = languages.language_for_name("Rust"); -// let (rust1, rust2) = futures::join!(rust1, rust2); -// assert!(Arc::ptr_eq(&rust1.unwrap(), &rust2.unwrap())); + // Ensure language is still listed even if it's being loaded. + assert_eq!( + languages.language_names(), + &[ + "JSON".to_string(), + "Plain Text".to_string(), + "Rust".to_string(), + ] + ); -// // Ensure language is still listed even after loading it. -// assert_eq!( -// languages.language_names(), -// &[ -// "JSON".to_string(), -// "Plain Text".to_string(), -// "Rust".to_string(), -// ] -// ); + let (rust1, rust2) = futures::join!(rust1, rust2); + assert!(Arc::ptr_eq(&rust1.unwrap(), &rust2.unwrap())); -// // Loading an unknown language returns an error. -// assert!(languages.language_for_name("Unknown").await.is_err()); -// } -// } + // Ensure language is still listed even after loading it. + assert_eq!( + languages.language_names(), + &[ + "JSON".to_string(), + "Plain Text".to_string(), + "Rust".to_string(), + ] + ); + + // Loading an unknown language returns an error. + assert!(languages.language_for_name("Unknown").await.is_err()); + } +} diff --git a/crates/language2/src/proto.rs b/crates/language2/src/proto.rs index e23711e328..f90bb94742 100644 --- a/crates/language2/src/proto.rs +++ b/crates/language2/src/proto.rs @@ -5,7 +5,7 @@ use crate::{ use anyhow::{anyhow, Result}; use clock::ReplicaId; use lsp2::{DiagnosticSeverity, LanguageServerId}; -use rpc::proto; +use rpc2::proto; use std::{ops::Range, sync::Arc}; use text::*; diff --git a/crates/project2/Cargo.toml b/crates/project2/Cargo.toml index 28b9826bd4..98bf9b62be 100644 --- a/crates/project2/Cargo.toml +++ b/crates/project2/Cargo.toml @@ -34,7 +34,7 @@ language2 = { path = "../language2" } lsp2 = { path = "../lsp2" } node_runtime = { path = "../node_runtime" } prettier2 = { path = "../prettier2" } -rpc = { path = "../rpc" } +rpc2 = { path = "../rpc2" } settings2 = { path = "../settings2" } sum_tree = { path = "../sum_tree" } terminal2 = { path = "../terminal2" } @@ -78,7 +78,7 @@ lsp2 = { path = "../lsp2", features = ["test-support"] } settings2 = { path = "../settings2", features = ["test-support"] } prettier2 = { path = "../prettier2", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } -rpc = { path = "../rpc", features = ["test-support"] } +rpc2 = { path = "../rpc2", features = ["test-support"] } git2.workspace = true tempdir.workspace = true unindent.workspace = true diff --git a/crates/project2/src/worktree.rs b/crates/project2/src/worktree.rs index c094f21db4..c1b762640b 100644 --- a/crates/project2/src/worktree.rs +++ b/crates/project2/src/worktree.rs @@ -2646,8 +2646,8 @@ impl language2::File for File { self } - fn to_proto(&self) -> rpc::proto::File { - rpc::proto::File { + fn to_proto(&self) -> rpc2::proto::File { + rpc2::proto::File { worktree_id: self.worktree.entity_id().as_u64(), entry_id: self.entry_id.to_proto(), path: self.path.to_string_lossy().into(), @@ -2713,7 +2713,7 @@ impl File { } pub fn from_proto( - proto: rpc::proto::File, + proto: rpc2::proto::File, worktree: Handle, cx: &AppContext, ) -> Result { diff --git a/crates/rpc2/Cargo.toml b/crates/rpc2/Cargo.toml new file mode 100644 index 0000000000..f108af3d3f --- /dev/null +++ b/crates/rpc2/Cargo.toml @@ -0,0 +1,44 @@ +[package] +description = "Shared logic for communication between the Zed app and the zed.dev server" +edition = "2021" +name = "rpc2" +version = "0.1.0" +publish = false + +[lib] +path = "src/rpc.rs" +doctest = false + +[features] +test-support = ["collections/test-support", "gpui2/test-support"] + +[dependencies] +clock = { path = "../clock" } +collections = { path = "../collections" } +gpui2 = { path = "../gpui2", optional = true } +util = { path = "../util" } +anyhow.workspace = true +async-lock = "2.4" +async-tungstenite = "0.16" +base64 = "0.13" +futures.workspace = true +parking_lot.workspace = true +prost.workspace = true +rand.workspace = true +rsa = "0.4" +serde.workspace = true +serde_derive.workspace = true +smol-timeout = "0.6" +tracing = { version = "0.1.34", features = ["log"] } +zstd = "0.11" + +[build-dependencies] +prost-build = "0.9" + +[dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +smol.workspace = true +tempdir.workspace = true +ctor.workspace = true +env_logger.workspace = true diff --git a/crates/rpc2/build.rs b/crates/rpc2/build.rs new file mode 100644 index 0000000000..66b289f1db --- /dev/null +++ b/crates/rpc2/build.rs @@ -0,0 +1,8 @@ +fn main() { + let mut build = prost_build::Config::new(); + // build.protoc_arg("--experimental_allow_proto3_optional"); + build + .type_attribute(".", "#[derive(serde::Serialize)]") + .compile_protos(&["proto/zed.proto"], &["proto"]) + .unwrap(); +} diff --git a/crates/rpc2/proto/zed.proto b/crates/rpc2/proto/zed.proto new file mode 100644 index 0000000000..3501e70e6a --- /dev/null +++ b/crates/rpc2/proto/zed.proto @@ -0,0 +1,1559 @@ +syntax = "proto3"; +package zed.messages; + +// Looking for a number? Search "// Current max" + +message PeerId { + uint32 owner_id = 1; + uint32 id = 2; +} + +message Envelope { + uint32 id = 1; + optional uint32 responding_to = 2; + optional PeerId original_sender_id = 3; + oneof payload { + Hello hello = 4; + Ack ack = 5; + Error error = 6; + Ping ping = 7; + Test test = 8; + + CreateRoom create_room = 9; + CreateRoomResponse create_room_response = 10; + JoinRoom join_room = 11; + JoinRoomResponse join_room_response = 12; + RejoinRoom rejoin_room = 13; + RejoinRoomResponse rejoin_room_response = 14; + LeaveRoom leave_room = 15; + Call call = 16; + IncomingCall incoming_call = 17; + CallCanceled call_canceled = 18; + CancelCall cancel_call = 19; + DeclineCall decline_call = 20; + UpdateParticipantLocation update_participant_location = 21; + RoomUpdated room_updated = 22; + + ShareProject share_project = 23; + ShareProjectResponse share_project_response = 24; + UnshareProject unshare_project = 25; + JoinProject join_project = 26; + JoinProjectResponse join_project_response = 27; + LeaveProject leave_project = 28; + AddProjectCollaborator add_project_collaborator = 29; + UpdateProjectCollaborator update_project_collaborator = 30; + RemoveProjectCollaborator remove_project_collaborator = 31; + + GetDefinition get_definition = 32; + GetDefinitionResponse get_definition_response = 33; + GetTypeDefinition get_type_definition = 34; + GetTypeDefinitionResponse get_type_definition_response = 35; + GetReferences get_references = 36; + GetReferencesResponse get_references_response = 37; + GetDocumentHighlights get_document_highlights = 38; + GetDocumentHighlightsResponse get_document_highlights_response = 39; + GetProjectSymbols get_project_symbols = 40; + GetProjectSymbolsResponse get_project_symbols_response = 41; + OpenBufferForSymbol open_buffer_for_symbol = 42; + OpenBufferForSymbolResponse open_buffer_for_symbol_response = 43; + + UpdateProject update_project = 44; + UpdateWorktree update_worktree = 45; + + CreateProjectEntry create_project_entry = 46; + RenameProjectEntry rename_project_entry = 47; + CopyProjectEntry copy_project_entry = 48; + DeleteProjectEntry delete_project_entry = 49; + ProjectEntryResponse project_entry_response = 50; + ExpandProjectEntry expand_project_entry = 51; + ExpandProjectEntryResponse expand_project_entry_response = 52; + + UpdateDiagnosticSummary update_diagnostic_summary = 53; + StartLanguageServer start_language_server = 54; + UpdateLanguageServer update_language_server = 55; + + OpenBufferById open_buffer_by_id = 56; + OpenBufferByPath open_buffer_by_path = 57; + OpenBufferResponse open_buffer_response = 58; + CreateBufferForPeer create_buffer_for_peer = 59; + UpdateBuffer update_buffer = 60; + UpdateBufferFile update_buffer_file = 61; + SaveBuffer save_buffer = 62; + BufferSaved buffer_saved = 63; + BufferReloaded buffer_reloaded = 64; + ReloadBuffers reload_buffers = 65; + ReloadBuffersResponse reload_buffers_response = 66; + SynchronizeBuffers synchronize_buffers = 67; + SynchronizeBuffersResponse synchronize_buffers_response = 68; + FormatBuffers format_buffers = 69; + FormatBuffersResponse format_buffers_response = 70; + GetCompletions get_completions = 71; + GetCompletionsResponse get_completions_response = 72; + ApplyCompletionAdditionalEdits apply_completion_additional_edits = 73; + ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 74; + GetCodeActions get_code_actions = 75; + GetCodeActionsResponse get_code_actions_response = 76; + GetHover get_hover = 77; + GetHoverResponse get_hover_response = 78; + ApplyCodeAction apply_code_action = 79; + ApplyCodeActionResponse apply_code_action_response = 80; + PrepareRename prepare_rename = 81; + PrepareRenameResponse prepare_rename_response = 82; + PerformRename perform_rename = 83; + PerformRenameResponse perform_rename_response = 84; + SearchProject search_project = 85; + SearchProjectResponse search_project_response = 86; + + UpdateContacts update_contacts = 87; + UpdateInviteInfo update_invite_info = 88; + ShowContacts show_contacts = 89; + + GetUsers get_users = 90; + FuzzySearchUsers fuzzy_search_users = 91; + UsersResponse users_response = 92; + RequestContact request_contact = 93; + RespondToContactRequest respond_to_contact_request = 94; + RemoveContact remove_contact = 95; + + Follow follow = 96; + FollowResponse follow_response = 97; + UpdateFollowers update_followers = 98; + Unfollow unfollow = 99; + GetPrivateUserInfo get_private_user_info = 100; + GetPrivateUserInfoResponse get_private_user_info_response = 101; + UpdateDiffBase update_diff_base = 102; + + OnTypeFormatting on_type_formatting = 103; + OnTypeFormattingResponse on_type_formatting_response = 104; + + UpdateWorktreeSettings update_worktree_settings = 105; + + InlayHints inlay_hints = 106; + InlayHintsResponse inlay_hints_response = 107; + ResolveInlayHint resolve_inlay_hint = 108; + ResolveInlayHintResponse resolve_inlay_hint_response = 109; + RefreshInlayHints refresh_inlay_hints = 110; + + CreateChannel create_channel = 111; + CreateChannelResponse create_channel_response = 112; + InviteChannelMember invite_channel_member = 113; + RemoveChannelMember remove_channel_member = 114; + RespondToChannelInvite respond_to_channel_invite = 115; + UpdateChannels update_channels = 116; + JoinChannel join_channel = 117; + DeleteChannel delete_channel = 118; + GetChannelMembers get_channel_members = 119; + GetChannelMembersResponse get_channel_members_response = 120; + SetChannelMemberAdmin set_channel_member_admin = 121; + RenameChannel rename_channel = 122; + RenameChannelResponse rename_channel_response = 123; + + JoinChannelBuffer join_channel_buffer = 124; + JoinChannelBufferResponse join_channel_buffer_response = 125; + UpdateChannelBuffer update_channel_buffer = 126; + LeaveChannelBuffer leave_channel_buffer = 127; + UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 128; + RejoinChannelBuffers rejoin_channel_buffers = 129; + RejoinChannelBuffersResponse rejoin_channel_buffers_response = 130; + AckBufferOperation ack_buffer_operation = 143; + + JoinChannelChat join_channel_chat = 131; + JoinChannelChatResponse join_channel_chat_response = 132; + LeaveChannelChat leave_channel_chat = 133; + SendChannelMessage send_channel_message = 134; + SendChannelMessageResponse send_channel_message_response = 135; + ChannelMessageSent channel_message_sent = 136; + GetChannelMessages get_channel_messages = 137; + GetChannelMessagesResponse get_channel_messages_response = 138; + RemoveChannelMessage remove_channel_message = 139; + AckChannelMessage ack_channel_message = 144; + + LinkChannel link_channel = 140; + UnlinkChannel unlink_channel = 141; + MoveChannel move_channel = 142; // current max: 144 + } +} + +// Messages + +message Hello { + PeerId peer_id = 1; +} + +message Ping {} + +message Ack {} + +message Error { + string message = 1; +} + +message Test { + uint64 id = 1; +} + +message CreateRoom {} + +message CreateRoomResponse { + Room room = 1; + optional LiveKitConnectionInfo live_kit_connection_info = 2; +} + +message JoinRoom { + uint64 id = 1; +} + +message JoinRoomResponse { + Room room = 1; + optional uint64 channel_id = 2; + optional LiveKitConnectionInfo live_kit_connection_info = 3; +} + +message RejoinRoom { + uint64 id = 1; + repeated UpdateProject reshared_projects = 2; + repeated RejoinProject rejoined_projects = 3; +} + +message RejoinProject { + uint64 id = 1; + repeated RejoinWorktree worktrees = 2; +} + +message RejoinWorktree { + uint64 id = 1; + uint64 scan_id = 2; +} + +message RejoinRoomResponse { + Room room = 1; + repeated ResharedProject reshared_projects = 2; + repeated RejoinedProject rejoined_projects = 3; +} + +message ResharedProject { + uint64 id = 1; + repeated Collaborator collaborators = 2; +} + +message RejoinedProject { + uint64 id = 1; + repeated WorktreeMetadata worktrees = 2; + repeated Collaborator collaborators = 3; + repeated LanguageServer language_servers = 4; +} + +message LeaveRoom {} + +message Room { + uint64 id = 1; + repeated Participant participants = 2; + repeated PendingParticipant pending_participants = 3; + repeated Follower followers = 4; + string live_kit_room = 5; +} + +message Participant { + uint64 user_id = 1; + PeerId peer_id = 2; + repeated ParticipantProject projects = 3; + ParticipantLocation location = 4; + uint32 participant_index = 5; +} + +message PendingParticipant { + uint64 user_id = 1; + uint64 calling_user_id = 2; + optional uint64 initial_project_id = 3; +} + +message ParticipantProject { + uint64 id = 1; + repeated string worktree_root_names = 2; +} + +message Follower { + PeerId leader_id = 1; + PeerId follower_id = 2; + uint64 project_id = 3; +} + +message ParticipantLocation { + oneof variant { + SharedProject shared_project = 1; + UnsharedProject unshared_project = 2; + External external = 3; + } + + message SharedProject { + uint64 id = 1; + } + + message UnsharedProject {} + + message External {} +} + +message Call { + uint64 room_id = 1; + uint64 called_user_id = 2; + optional uint64 initial_project_id = 3; +} + +message IncomingCall { + uint64 room_id = 1; + uint64 calling_user_id = 2; + repeated uint64 participant_user_ids = 3; + optional ParticipantProject initial_project = 4; +} + +message CallCanceled { + uint64 room_id = 1; +} + +message CancelCall { + uint64 room_id = 1; + uint64 called_user_id = 2; +} + +message DeclineCall { + uint64 room_id = 1; +} + +message UpdateParticipantLocation { + uint64 room_id = 1; + ParticipantLocation location = 2; +} + +message RoomUpdated { + Room room = 1; +} + +message LiveKitConnectionInfo { + string server_url = 1; + string token = 2; +} + +message ShareProject { + uint64 room_id = 1; + repeated WorktreeMetadata worktrees = 2; +} + +message ShareProjectResponse { + uint64 project_id = 1; +} + +message UnshareProject { + uint64 project_id = 1; +} + +message UpdateProject { + uint64 project_id = 1; + repeated WorktreeMetadata worktrees = 2; +} + +message JoinProject { + uint64 project_id = 1; +} + +message JoinProjectResponse { + uint32 replica_id = 1; + repeated WorktreeMetadata worktrees = 2; + repeated Collaborator collaborators = 3; + repeated LanguageServer language_servers = 4; +} + +message LeaveProject { + uint64 project_id = 1; +} + +message UpdateWorktree { + uint64 project_id = 1; + uint64 worktree_id = 2; + string root_name = 3; + repeated Entry updated_entries = 4; + repeated uint64 removed_entries = 5; + repeated RepositoryEntry updated_repositories = 6; + repeated uint64 removed_repositories = 7; + uint64 scan_id = 8; + bool is_last_update = 9; + string abs_path = 10; +} + +message UpdateWorktreeSettings { + uint64 project_id = 1; + uint64 worktree_id = 2; + string path = 3; + optional string content = 4; +} + +message CreateProjectEntry { + uint64 project_id = 1; + uint64 worktree_id = 2; + string path = 3; + bool is_directory = 4; +} + +message RenameProjectEntry { + uint64 project_id = 1; + uint64 entry_id = 2; + string new_path = 3; +} + +message CopyProjectEntry { + uint64 project_id = 1; + uint64 entry_id = 2; + string new_path = 3; +} + +message DeleteProjectEntry { + uint64 project_id = 1; + uint64 entry_id = 2; +} + +message ExpandProjectEntry { + uint64 project_id = 1; + uint64 entry_id = 2; +} + +message ExpandProjectEntryResponse { + uint64 worktree_scan_id = 1; +} + +message ProjectEntryResponse { + Entry entry = 1; + uint64 worktree_scan_id = 2; +} + +message AddProjectCollaborator { + uint64 project_id = 1; + Collaborator collaborator = 2; +} + +message UpdateProjectCollaborator { + uint64 project_id = 1; + PeerId old_peer_id = 2; + PeerId new_peer_id = 3; +} + +message RemoveProjectCollaborator { + uint64 project_id = 1; + PeerId peer_id = 2; +} + +message UpdateChannelBufferCollaborators { + uint64 channel_id = 1; + repeated Collaborator collaborators = 2; +} + +message GetDefinition { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; + } + +message GetDefinitionResponse { + repeated LocationLink links = 1; +} + +message GetTypeDefinition { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; + } + +message GetTypeDefinitionResponse { + repeated LocationLink links = 1; +} + +message GetReferences { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; + } + +message GetReferencesResponse { + repeated Location locations = 1; +} + +message GetDocumentHighlights { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; + } + +message GetDocumentHighlightsResponse { + repeated DocumentHighlight highlights = 1; +} + +message Location { + uint64 buffer_id = 1; + Anchor start = 2; + Anchor end = 3; +} + +message LocationLink { + optional Location origin = 1; + Location target = 2; +} + +message DocumentHighlight { + Kind kind = 1; + Anchor start = 2; + Anchor end = 3; + + enum Kind { + Text = 0; + Read = 1; + Write = 2; + } +} + +message GetProjectSymbols { + uint64 project_id = 1; + string query = 2; +} + +message GetProjectSymbolsResponse { + repeated Symbol symbols = 4; +} + +message Symbol { + uint64 source_worktree_id = 1; + uint64 worktree_id = 2; + string language_server_name = 3; + string name = 4; + int32 kind = 5; + string path = 6; + // Cannot use generate anchors for unopened files, + // so we are forced to use point coords instead + PointUtf16 start = 7; + PointUtf16 end = 8; + bytes signature = 9; +} + +message OpenBufferForSymbol { + uint64 project_id = 1; + Symbol symbol = 2; +} + +message OpenBufferForSymbolResponse { + uint64 buffer_id = 1; +} + +message OpenBufferByPath { + uint64 project_id = 1; + uint64 worktree_id = 2; + string path = 3; +} + +message OpenBufferById { + uint64 project_id = 1; + uint64 id = 2; +} + +message OpenBufferResponse { + uint64 buffer_id = 1; +} + +message CreateBufferForPeer { + uint64 project_id = 1; + PeerId peer_id = 2; + oneof variant { + BufferState state = 3; + BufferChunk chunk = 4; + } +} + +message UpdateBuffer { + uint64 project_id = 1; + uint64 buffer_id = 2; + repeated Operation operations = 3; +} + +message UpdateChannelBuffer { + uint64 channel_id = 1; + repeated Operation operations = 2; +} + +message UpdateBufferFile { + uint64 project_id = 1; + uint64 buffer_id = 2; + File file = 3; +} + +message SaveBuffer { + uint64 project_id = 1; + uint64 buffer_id = 2; + repeated VectorClockEntry version = 3; +} + +message BufferSaved { + uint64 project_id = 1; + uint64 buffer_id = 2; + repeated VectorClockEntry version = 3; + Timestamp mtime = 4; + string fingerprint = 5; +} + +message BufferReloaded { + uint64 project_id = 1; + uint64 buffer_id = 2; + repeated VectorClockEntry version = 3; + Timestamp mtime = 4; + string fingerprint = 5; + LineEnding line_ending = 6; +} + +message ReloadBuffers { + uint64 project_id = 1; + repeated uint64 buffer_ids = 2; +} + +message ReloadBuffersResponse { + ProjectTransaction transaction = 1; +} + +message SynchronizeBuffers { + uint64 project_id = 1; + repeated BufferVersion buffers = 2; +} + +message SynchronizeBuffersResponse { + repeated BufferVersion buffers = 1; +} + +message BufferVersion { + uint64 id = 1; + repeated VectorClockEntry version = 2; +} + +message ChannelBufferVersion { + uint64 channel_id = 1; + repeated VectorClockEntry version = 2; + uint64 epoch = 3; +} + +enum FormatTrigger { + Save = 0; + Manual = 1; +} + +message FormatBuffers { + uint64 project_id = 1; + FormatTrigger trigger = 2; + repeated uint64 buffer_ids = 3; +} + +message FormatBuffersResponse { + ProjectTransaction transaction = 1; +} + +message GetCompletions { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; +} + +message GetCompletionsResponse { + repeated Completion completions = 1; + repeated VectorClockEntry version = 2; +} + +message ApplyCompletionAdditionalEdits { + uint64 project_id = 1; + uint64 buffer_id = 2; + Completion completion = 3; +} + +message ApplyCompletionAdditionalEditsResponse { + Transaction transaction = 1; +} + +message Completion { + Anchor old_start = 1; + Anchor old_end = 2; + string new_text = 3; + uint64 server_id = 4; + bytes lsp_completion = 5; +} + +message GetCodeActions { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor start = 3; + Anchor end = 4; + repeated VectorClockEntry version = 5; +} + +message GetCodeActionsResponse { + repeated CodeAction actions = 1; + repeated VectorClockEntry version = 2; +} + +message GetHover { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 5; +} + +message GetHoverResponse { + optional Anchor start = 1; + optional Anchor end = 2; + repeated HoverBlock contents = 3; +} + +message HoverBlock { + string text = 1; + optional string language = 2; + bool is_markdown = 3; +} + +message ApplyCodeAction { + uint64 project_id = 1; + uint64 buffer_id = 2; + CodeAction action = 3; +} + +message ApplyCodeActionResponse { + ProjectTransaction transaction = 1; +} + +message PrepareRename { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + repeated VectorClockEntry version = 4; +} + +message PrepareRenameResponse { + bool can_rename = 1; + Anchor start = 2; + Anchor end = 3; + repeated VectorClockEntry version = 4; +} + +message PerformRename { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + string new_name = 4; + repeated VectorClockEntry version = 5; +} + +message OnTypeFormatting { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor position = 3; + string trigger = 4; + repeated VectorClockEntry version = 5; +} + +message OnTypeFormattingResponse { + Transaction transaction = 1; +} + +message InlayHints { + uint64 project_id = 1; + uint64 buffer_id = 2; + Anchor start = 3; + Anchor end = 4; + repeated VectorClockEntry version = 5; +} + +message InlayHintsResponse { + repeated InlayHint hints = 1; + repeated VectorClockEntry version = 2; +} + +message InlayHint { + Anchor position = 1; + InlayHintLabel label = 2; + optional string kind = 3; + bool padding_left = 4; + bool padding_right = 5; + InlayHintTooltip tooltip = 6; + ResolveState resolve_state = 7; +} + +message InlayHintLabel { + oneof label { + string value = 1; + InlayHintLabelParts label_parts = 2; + } +} + +message InlayHintLabelParts { + repeated InlayHintLabelPart parts = 1; +} + +message InlayHintLabelPart { + string value = 1; + InlayHintLabelPartTooltip tooltip = 2; + optional string location_url = 3; + PointUtf16 location_range_start = 4; + PointUtf16 location_range_end = 5; + optional uint64 language_server_id = 6; +} + +message InlayHintTooltip { + oneof content { + string value = 1; + MarkupContent markup_content = 2; + } +} + +message InlayHintLabelPartTooltip { + oneof content { + string value = 1; + MarkupContent markup_content = 2; + } +} + +message ResolveState { + State state = 1; + LspResolveState lsp_resolve_state = 2; + + enum State { + Resolved = 0; + CanResolve = 1; + Resolving = 2; + } + + message LspResolveState { + string value = 1; + uint64 server_id = 2; + } +} + +message ResolveInlayHint { + uint64 project_id = 1; + uint64 buffer_id = 2; + uint64 language_server_id = 3; + InlayHint hint = 4; +} + +message ResolveInlayHintResponse { + InlayHint hint = 1; +} + +message RefreshInlayHints { + uint64 project_id = 1; +} + +message MarkupContent { + bool is_markdown = 1; + string value = 2; +} + +message PerformRenameResponse { + ProjectTransaction transaction = 2; +} + +message SearchProject { + uint64 project_id = 1; + string query = 2; + bool regex = 3; + bool whole_word = 4; + bool case_sensitive = 5; + string files_to_include = 6; + string files_to_exclude = 7; +} + +message SearchProjectResponse { + repeated Location locations = 1; +} + +message CodeAction { + uint64 server_id = 1; + Anchor start = 2; + Anchor end = 3; + bytes lsp_action = 4; +} + +message ProjectTransaction { + repeated uint64 buffer_ids = 1; + repeated Transaction transactions = 2; +} + +message Transaction { + LamportTimestamp id = 1; + repeated LamportTimestamp edit_ids = 2; + repeated VectorClockEntry start = 3; +} + +message LamportTimestamp { + uint32 replica_id = 1; + uint32 value = 2; +} + +message LanguageServer { + uint64 id = 1; + string name = 2; +} + +message StartLanguageServer { + uint64 project_id = 1; + LanguageServer server = 2; +} + +message UpdateDiagnosticSummary { + uint64 project_id = 1; + uint64 worktree_id = 2; + DiagnosticSummary summary = 3; +} + +message DiagnosticSummary { + string path = 1; + uint64 language_server_id = 2; + uint32 error_count = 3; + uint32 warning_count = 4; +} + +message UpdateLanguageServer { + uint64 project_id = 1; + uint64 language_server_id = 2; + oneof variant { + LspWorkStart work_start = 3; + LspWorkProgress work_progress = 4; + LspWorkEnd work_end = 5; + LspDiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 6; + LspDiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 7; + } +} + +message LspWorkStart { + string token = 1; + optional string message = 2; + optional uint32 percentage = 3; +} + +message LspWorkProgress { + string token = 1; + optional string message = 2; + optional uint32 percentage = 3; +} + +message LspWorkEnd { + string token = 1; +} + +message LspDiskBasedDiagnosticsUpdating {} + +message LspDiskBasedDiagnosticsUpdated {} + +message UpdateChannels { + repeated Channel channels = 1; + repeated ChannelEdge insert_edge = 2; + repeated ChannelEdge delete_edge = 3; + repeated uint64 delete_channels = 4; + repeated Channel channel_invitations = 5; + repeated uint64 remove_channel_invitations = 6; + repeated ChannelParticipants channel_participants = 7; + repeated ChannelPermission channel_permissions = 8; + repeated UnseenChannelMessage unseen_channel_messages = 9; + repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10; +} + +message UnseenChannelMessage { + uint64 channel_id = 1; + uint64 message_id = 2; +} + +message UnseenChannelBufferChange { + uint64 channel_id = 1; + uint64 epoch = 2; + repeated VectorClockEntry version = 3; +} + +message ChannelEdge { + uint64 channel_id = 1; + uint64 parent_id = 2; +} + +message ChannelPermission { + uint64 channel_id = 1; + bool is_admin = 2; +} + +message ChannelParticipants { + uint64 channel_id = 1; + repeated uint64 participant_user_ids = 2; +} + +message JoinChannel { + uint64 channel_id = 1; +} + +message DeleteChannel { + uint64 channel_id = 1; +} + +message GetChannelMembers { + uint64 channel_id = 1; +} + +message GetChannelMembersResponse { + repeated ChannelMember members = 1; +} + +message ChannelMember { + uint64 user_id = 1; + bool admin = 2; + Kind kind = 3; + + enum Kind { + Member = 0; + Invitee = 1; + AncestorMember = 2; + } +} + +message CreateChannel { + string name = 1; + optional uint64 parent_id = 2; +} + +message CreateChannelResponse { + Channel channel = 1; + optional uint64 parent_id = 2; +} + +message InviteChannelMember { + uint64 channel_id = 1; + uint64 user_id = 2; + bool admin = 3; +} + +message RemoveChannelMember { + uint64 channel_id = 1; + uint64 user_id = 2; +} + +message SetChannelMemberAdmin { + uint64 channel_id = 1; + uint64 user_id = 2; + bool admin = 3; +} + +message RenameChannel { + uint64 channel_id = 1; + string name = 2; +} + +message RenameChannelResponse { + Channel channel = 1; +} + +message JoinChannelChat { + uint64 channel_id = 1; +} + +message JoinChannelChatResponse { + repeated ChannelMessage messages = 1; + bool done = 2; +} + +message LeaveChannelChat { + uint64 channel_id = 1; +} + +message SendChannelMessage { + uint64 channel_id = 1; + string body = 2; + Nonce nonce = 3; +} + +message RemoveChannelMessage { + uint64 channel_id = 1; + uint64 message_id = 2; +} + +message AckChannelMessage { + uint64 channel_id = 1; + uint64 message_id = 2; +} + +message SendChannelMessageResponse { + ChannelMessage message = 1; +} + +message ChannelMessageSent { + uint64 channel_id = 1; + ChannelMessage message = 2; +} + +message GetChannelMessages { + uint64 channel_id = 1; + uint64 before_message_id = 2; +} + +message GetChannelMessagesResponse { + repeated ChannelMessage messages = 1; + bool done = 2; +} + +message LinkChannel { + uint64 channel_id = 1; + uint64 to = 2; +} + +message UnlinkChannel { + uint64 channel_id = 1; + uint64 from = 2; +} + +message MoveChannel { + uint64 channel_id = 1; + uint64 from = 2; + uint64 to = 3; +} + +message JoinChannelBuffer { + uint64 channel_id = 1; +} + +message ChannelMessage { + uint64 id = 1; + string body = 2; + uint64 timestamp = 3; + uint64 sender_id = 4; + Nonce nonce = 5; +} + +message RejoinChannelBuffers { + repeated ChannelBufferVersion buffers = 1; +} + +message RejoinChannelBuffersResponse { + repeated RejoinedChannelBuffer buffers = 1; +} + +message AckBufferOperation { + uint64 buffer_id = 1; + uint64 epoch = 2; + repeated VectorClockEntry version = 3; +} + +message JoinChannelBufferResponse { + uint64 buffer_id = 1; + uint32 replica_id = 2; + string base_text = 3; + repeated Operation operations = 4; + repeated Collaborator collaborators = 5; + uint64 epoch = 6; +} + +message RejoinedChannelBuffer { + uint64 channel_id = 1; + repeated VectorClockEntry version = 2; + repeated Operation operations = 3; + repeated Collaborator collaborators = 4; +} + +message LeaveChannelBuffer { + uint64 channel_id = 1; +} + +message RespondToChannelInvite { + uint64 channel_id = 1; + bool accept = 2; +} + +message GetUsers { + repeated uint64 user_ids = 1; +} + +message FuzzySearchUsers { + string query = 1; +} + +message UsersResponse { + repeated User users = 1; +} + +message RequestContact { + uint64 responder_id = 1; +} + +message RemoveContact { + uint64 user_id = 1; +} + +message RespondToContactRequest { + uint64 requester_id = 1; + ContactRequestResponse response = 2; +} + +enum ContactRequestResponse { + Accept = 0; + Decline = 1; + Block = 2; + Dismiss = 3; +} + +message UpdateContacts { + repeated Contact contacts = 1; + repeated uint64 remove_contacts = 2; + repeated IncomingContactRequest incoming_requests = 3; + repeated uint64 remove_incoming_requests = 4; + repeated uint64 outgoing_requests = 5; + repeated uint64 remove_outgoing_requests = 6; +} + +message UpdateInviteInfo { + string url = 1; + uint32 count = 2; +} + +message ShowContacts {} + +message IncomingContactRequest { + uint64 requester_id = 1; + bool should_notify = 2; +} + +message UpdateDiagnostics { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + uint64 server_id = 3; + repeated Diagnostic diagnostics = 4; +} + +message Follow { + uint64 room_id = 1; + optional uint64 project_id = 2; + PeerId leader_id = 3; +} + +message FollowResponse { + optional ViewId active_view_id = 1; + repeated View views = 2; +} + +message UpdateFollowers { + uint64 room_id = 1; + optional uint64 project_id = 2; + repeated PeerId follower_ids = 3; + oneof variant { + UpdateActiveView update_active_view = 4; + View create_view = 5; + UpdateView update_view = 6; + } +} + +message Unfollow { + uint64 room_id = 1; + optional uint64 project_id = 2; + PeerId leader_id = 3; +} + +message GetPrivateUserInfo {} + +message GetPrivateUserInfoResponse { + string metrics_id = 1; + bool staff = 2; + repeated string flags = 3; +} + +// Entities + +message ViewId { + PeerId creator = 1; + uint64 id = 2; +} + +message UpdateActiveView { + optional ViewId id = 1; + optional PeerId leader_id = 2; +} + +message UpdateView { + ViewId id = 1; + optional PeerId leader_id = 2; + + oneof variant { + Editor editor = 3; + } + + message Editor { + repeated ExcerptInsertion inserted_excerpts = 1; + repeated uint64 deleted_excerpts = 2; + repeated Selection selections = 3; + optional Selection pending_selection = 4; + EditorAnchor scroll_top_anchor = 5; + float scroll_x = 6; + float scroll_y = 7; + } +} + +message View { + ViewId id = 1; + optional PeerId leader_id = 2; + + oneof variant { + Editor editor = 3; + ChannelView channel_view = 4; + } + + message Editor { + bool singleton = 1; + optional string title = 2; + repeated Excerpt excerpts = 3; + repeated Selection selections = 4; + optional Selection pending_selection = 5; + EditorAnchor scroll_top_anchor = 6; + float scroll_x = 7; + float scroll_y = 8; + } + + message ChannelView { + uint64 channel_id = 1; + Editor editor = 2; + } +} + +message Collaborator { + PeerId peer_id = 1; + uint32 replica_id = 2; + uint64 user_id = 3; +} + +message User { + uint64 id = 1; + string github_login = 2; + string avatar_url = 3; +} + +message File { + uint64 worktree_id = 1; + uint64 entry_id = 2; + string path = 3; + Timestamp mtime = 4; + bool is_deleted = 5; +} + +message Entry { + uint64 id = 1; + bool is_dir = 2; + string path = 3; + uint64 inode = 4; + Timestamp mtime = 5; + bool is_symlink = 6; + bool is_ignored = 7; + bool is_external = 8; + optional GitStatus git_status = 9; +} + +message RepositoryEntry { + uint64 work_directory_id = 1; + optional string branch = 2; +} + +message StatusEntry { + string repo_path = 1; + GitStatus status = 2; +} + +enum GitStatus { + Added = 0; + Modified = 1; + Conflict = 2; +} + +message BufferState { + uint64 id = 1; + optional File file = 2; + string base_text = 3; + optional string diff_base = 4; + LineEnding line_ending = 5; + repeated VectorClockEntry saved_version = 6; + string saved_version_fingerprint = 7; + Timestamp saved_mtime = 8; +} + +message BufferChunk { + uint64 buffer_id = 1; + repeated Operation operations = 2; + bool is_last = 3; +} + +enum LineEnding { + Unix = 0; + Windows = 1; +} + +message Selection { + uint64 id = 1; + EditorAnchor start = 2; + EditorAnchor end = 3; + bool reversed = 4; +} + +message EditorAnchor { + uint64 excerpt_id = 1; + Anchor anchor = 2; +} + +enum CursorShape { + CursorBar = 0; + CursorBlock = 1; + CursorUnderscore = 2; + CursorHollow = 3; +} + +message ExcerptInsertion { + Excerpt excerpt = 1; + optional uint64 previous_excerpt_id = 2; +} + +message Excerpt { + uint64 id = 1; + uint64 buffer_id = 2; + Anchor context_start = 3; + Anchor context_end = 4; + Anchor primary_start = 5; + Anchor primary_end = 6; +} + +message Anchor { + uint32 replica_id = 1; + uint32 timestamp = 2; + uint64 offset = 3; + Bias bias = 4; + optional uint64 buffer_id = 5; +} + +enum Bias { + Left = 0; + Right = 1; +} + +message Diagnostic { + Anchor start = 1; + Anchor end = 2; + optional string source = 3; + Severity severity = 4; + string message = 5; + optional string code = 6; + uint64 group_id = 7; + bool is_primary = 8; + bool is_valid = 9; + bool is_disk_based = 10; + bool is_unnecessary = 11; + + enum Severity { + None = 0; + Error = 1; + Warning = 2; + Information = 3; + Hint = 4; + } +} + +message Operation { + oneof variant { + Edit edit = 1; + Undo undo = 2; + UpdateSelections update_selections = 3; + UpdateDiagnostics update_diagnostics = 4; + UpdateCompletionTriggers update_completion_triggers = 5; + } + + message Edit { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated VectorClockEntry version = 3; + repeated Range ranges = 4; + repeated string new_text = 5; + } + + message Undo { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated VectorClockEntry version = 3; + repeated UndoCount counts = 4; + } + + message UpdateSelections { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated Selection selections = 3; + bool line_mode = 4; + CursorShape cursor_shape = 5; + } + + message UpdateCompletionTriggers { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated string triggers = 3; + } +} + +message UndoMapEntry { + uint32 replica_id = 1; + uint32 local_timestamp = 2; + repeated UndoCount counts = 3; +} + +message UndoCount { + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + uint32 count = 3; +} + +message VectorClockEntry { + uint32 replica_id = 1; + uint32 timestamp = 2; +} + +message Timestamp { + uint64 seconds = 1; + uint32 nanos = 2; +} + +message Range { + uint64 start = 1; + uint64 end = 2; +} + +message PointUtf16 { + uint32 row = 1; + uint32 column = 2; +} + +message Nonce { + uint64 upper_half = 1; + uint64 lower_half = 2; +} + +message Channel { + uint64 id = 1; + string name = 2; +} + +message Contact { + uint64 user_id = 1; + bool online = 2; + bool busy = 3; + bool should_notify = 4; +} + +message WorktreeMetadata { + uint64 id = 1; + string root_name = 2; + bool visible = 3; + string abs_path = 4; +} + +message UpdateDiffBase { + uint64 project_id = 1; + uint64 buffer_id = 2; + optional string diff_base = 3; +} diff --git a/crates/rpc2/src/auth.rs b/crates/rpc2/src/auth.rs new file mode 100644 index 0000000000..ac7bbcebec --- /dev/null +++ b/crates/rpc2/src/auth.rs @@ -0,0 +1,136 @@ +use anyhow::{Context, Result}; +use rand::{thread_rng, Rng as _}; +use rsa::{PublicKey as _, PublicKeyEncoding, RSAPrivateKey, RSAPublicKey}; +use std::convert::TryFrom; + +pub struct PublicKey(RSAPublicKey); + +pub struct PrivateKey(RSAPrivateKey); + +/// Generate a public and private key for asymmetric encryption. +pub fn keypair() -> Result<(PublicKey, PrivateKey)> { + let mut rng = thread_rng(); + let bits = 1024; + let private_key = RSAPrivateKey::new(&mut rng, bits)?; + let public_key = RSAPublicKey::from(&private_key); + Ok((PublicKey(public_key), PrivateKey(private_key))) +} + +/// Generate a random 64-character base64 string. +pub fn random_token() -> String { + let mut rng = thread_rng(); + let mut token_bytes = [0; 48]; + for byte in token_bytes.iter_mut() { + *byte = rng.gen(); + } + base64::encode_config(token_bytes, base64::URL_SAFE) +} + +impl PublicKey { + /// Convert a string to a base64-encoded string that can only be decoded with the corresponding + /// private key. + pub fn encrypt_string(&self, string: &str) -> Result { + let mut rng = thread_rng(); + let bytes = string.as_bytes(); + let encrypted_bytes = self + .0 + .encrypt(&mut rng, PADDING_SCHEME, bytes) + .context("failed to encrypt string with public key")?; + let encrypted_string = base64::encode_config(&encrypted_bytes, base64::URL_SAFE); + Ok(encrypted_string) + } +} + +impl PrivateKey { + /// Decrypt a base64-encoded string that was encrypted by the corresponding public key. + pub fn decrypt_string(&self, encrypted_string: &str) -> Result { + let encrypted_bytes = base64::decode_config(encrypted_string, base64::URL_SAFE) + .context("failed to base64-decode encrypted string")?; + let bytes = self + .0 + .decrypt(PADDING_SCHEME, &encrypted_bytes) + .context("failed to decrypt string with private key")?; + let string = String::from_utf8(bytes).context("decrypted content was not valid utf8")?; + Ok(string) + } +} + +impl TryFrom for String { + type Error = anyhow::Error; + fn try_from(key: PublicKey) -> Result { + let bytes = key.0.to_pkcs1().context("failed to serialize public key")?; + let string = base64::encode_config(&bytes, base64::URL_SAFE); + Ok(string) + } +} + +impl TryFrom for PublicKey { + type Error = anyhow::Error; + fn try_from(value: String) -> Result { + let bytes = base64::decode_config(&value, base64::URL_SAFE) + .context("failed to base64-decode public key string")?; + let key = Self(RSAPublicKey::from_pkcs1(&bytes).context("failed to parse public key")?); + Ok(key) + } +} + +const PADDING_SCHEME: rsa::PaddingScheme = rsa::PaddingScheme::PKCS1v15Encrypt; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_encrypt_and_decrypt_token() { + // CLIENT: + // * generate a keypair for asymmetric encryption + // * serialize the public key to send it to the server. + let (public, private) = keypair().unwrap(); + let public_string = String::try_from(public).unwrap(); + assert_printable(&public_string); + + // SERVER: + // * parse the public key + // * generate a random token. + // * encrypt the token using the public key. + let public = PublicKey::try_from(public_string).unwrap(); + let token = random_token(); + let encrypted_token = public.encrypt_string(&token).unwrap(); + assert_eq!(token.len(), 64); + assert_ne!(encrypted_token, token); + assert_printable(&token); + assert_printable(&encrypted_token); + + // CLIENT: + // * decrypt the token using the private key. + let decrypted_token = private.decrypt_string(&encrypted_token).unwrap(); + assert_eq!(decrypted_token, token); + } + + #[test] + fn test_tokens_are_always_url_safe() { + for _ in 0..5 { + let token = random_token(); + let (public_key, _) = keypair().unwrap(); + let encrypted_token = public_key.encrypt_string(&token).unwrap(); + let public_key_str = String::try_from(public_key).unwrap(); + + assert_printable(&token); + assert_printable(&public_key_str); + assert_printable(&encrypted_token); + } + } + + fn assert_printable(token: &str) { + for c in token.chars() { + assert!( + c.is_ascii_graphic(), + "token {:?} has non-printable char {}", + token, + c + ); + assert_ne!(c, '/', "token {:?} is not URL-safe", token); + assert_ne!(c, '&', "token {:?} is not URL-safe", token); + } + } +} diff --git a/crates/rpc2/src/conn.rs b/crates/rpc2/src/conn.rs new file mode 100644 index 0000000000..902e9822d5 --- /dev/null +++ b/crates/rpc2/src/conn.rs @@ -0,0 +1,108 @@ +use async_tungstenite::tungstenite::Message as WebSocketMessage; +use futures::{SinkExt as _, StreamExt as _}; + +pub struct Connection { + pub(crate) tx: + Box>, + pub(crate) rx: Box< + dyn 'static + + Send + + Unpin + + futures::Stream>, + >, +} + +impl Connection { + pub fn new(stream: S) -> Self + where + S: 'static + + Send + + Unpin + + futures::Sink + + futures::Stream>, + { + let (tx, rx) = stream.split(); + Self { + tx: Box::new(tx), + rx: Box::new(rx), + } + } + + pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), anyhow::Error> { + self.tx.send(message).await + } + + #[cfg(any(test, feature = "test-support"))] + pub fn in_memory( + executor: gpui2::Executor, + ) -> (Self, Self, std::sync::Arc) { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; + + let killed = Arc::new(AtomicBool::new(false)); + let (a_tx, a_rx) = channel(killed.clone(), executor.clone()); + let (b_tx, b_rx) = channel(killed.clone(), executor); + return ( + Self { tx: a_tx, rx: b_rx }, + Self { tx: b_tx, rx: a_rx }, + killed, + ); + + #[allow(clippy::type_complexity)] + fn channel( + killed: Arc, + executor: gpui2::Executor, + ) -> ( + Box>, + Box>>, + ) { + use anyhow::anyhow; + use futures::channel::mpsc; + use std::io::{Error, ErrorKind}; + + let (tx, rx) = mpsc::unbounded::(); + + let tx = tx.sink_map_err(|error| anyhow!(error)).with({ + let killed = killed.clone(); + let executor = executor.clone(); + move |msg| { + let killed = killed.clone(); + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + + // Writes to a half-open TCP connection will error. + if killed.load(SeqCst) { + std::io::Result::Err(Error::new(ErrorKind::Other, "connection lost"))?; + } + + Ok(msg) + }) + } + }); + + let rx = rx.then({ + let killed = killed; + let executor = executor.clone(); + move |msg| { + let killed = killed.clone(); + let executor = executor.clone(); + Box::pin(async move { + executor.simulate_random_delay().await; + + // Reads from a half-open TCP connection will hang. + if killed.load(SeqCst) { + futures::future::pending::<()>().await; + } + + Ok(msg) + }) + } + }); + + (Box::new(tx), Box::new(rx)) + } + } +} diff --git a/crates/rpc2/src/macros.rs b/crates/rpc2/src/macros.rs new file mode 100644 index 0000000000..89e605540d --- /dev/null +++ b/crates/rpc2/src/macros.rs @@ -0,0 +1,70 @@ +#[macro_export] +macro_rules! messages { + ($(($name:ident, $priority:ident)),* $(,)?) => { + pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option> { + match envelope.payload { + $(Some(envelope::Payload::$name(payload)) => { + Some(Box::new(TypedEnvelope { + sender_id, + original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId { + owner_id: original_sender.owner_id, + id: original_sender.id + }), + message_id: envelope.id, + payload, + })) + }, )* + _ => None + } + } + + $( + impl EnvelopedMessage for $name { + const NAME: &'static str = std::stringify!($name); + const PRIORITY: MessagePriority = MessagePriority::$priority; + + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope { + Envelope { + id, + responding_to, + original_sender_id, + payload: Some(envelope::Payload::$name(self)), + } + } + + fn from_envelope(envelope: Envelope) -> Option { + if let Some(envelope::Payload::$name(msg)) = envelope.payload { + Some(msg) + } else { + None + } + } + } + )* + }; +} + +#[macro_export] +macro_rules! request_messages { + ($(($request_name:ident, $response_name:ident)),* $(,)?) => { + $(impl RequestMessage for $request_name { + type Response = $response_name; + })* + }; +} + +#[macro_export] +macro_rules! entity_messages { + ($id_field:ident, $($name:ident),* $(,)?) => { + $(impl EntityMessage for $name { + fn remote_entity_id(&self) -> u64 { + self.$id_field + } + })* + }; +} diff --git a/crates/rpc2/src/peer.rs b/crates/rpc2/src/peer.rs new file mode 100644 index 0000000000..6dfb170f4c --- /dev/null +++ b/crates/rpc2/src/peer.rs @@ -0,0 +1,933 @@ +use super::{ + proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage}, + Connection, +}; +use anyhow::{anyhow, Context, Result}; +use collections::HashMap; +use futures::{ + channel::{mpsc, oneshot}, + stream::BoxStream, + FutureExt, SinkExt, StreamExt, TryFutureExt, +}; +use parking_lot::{Mutex, RwLock}; +use serde::{ser::SerializeStruct, Serialize}; +use std::{fmt, sync::atomic::Ordering::SeqCst}; +use std::{ + future::Future, + marker::PhantomData, + sync::{ + atomic::{self, AtomicU32}, + Arc, + }, + time::Duration, +}; +use tracing::instrument; + +#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)] +pub struct ConnectionId { + pub owner_id: u32, + pub id: u32, +} + +impl Into for ConnectionId { + fn into(self) -> PeerId { + PeerId { + owner_id: self.owner_id, + id: self.id, + } + } +} + +impl From for ConnectionId { + fn from(peer_id: PeerId) -> Self { + Self { + owner_id: peer_id.owner_id, + id: peer_id.id, + } + } +} + +impl fmt::Display for ConnectionId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.owner_id, self.id) + } +} + +pub struct Receipt { + pub sender_id: ConnectionId, + pub message_id: u32, + payload_type: PhantomData, +} + +impl Clone for Receipt { + fn clone(&self) -> Self { + Self { + sender_id: self.sender_id, + message_id: self.message_id, + payload_type: PhantomData, + } + } +} + +impl Copy for Receipt {} + +#[derive(Clone, Debug)] +pub struct TypedEnvelope { + pub sender_id: ConnectionId, + pub original_sender_id: Option, + pub message_id: u32, + pub payload: T, +} + +impl TypedEnvelope { + pub fn original_sender_id(&self) -> Result { + self.original_sender_id + .ok_or_else(|| anyhow!("missing original_sender_id")) + } +} + +impl TypedEnvelope { + pub fn receipt(&self) -> Receipt { + Receipt { + sender_id: self.sender_id, + message_id: self.message_id, + payload_type: PhantomData, + } + } +} + +pub struct Peer { + epoch: AtomicU32, + pub connections: RwLock>, + next_connection_id: AtomicU32, +} + +#[derive(Clone, Serialize)] +pub struct ConnectionState { + #[serde(skip)] + outgoing_tx: mpsc::UnboundedSender, + next_message_id: Arc, + #[allow(clippy::type_complexity)] + #[serde(skip)] + response_channels: + Arc)>>>>>, +} + +const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); +const WRITE_TIMEOUT: Duration = Duration::from_secs(2); +pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(10); + +impl Peer { + pub fn new(epoch: u32) -> Arc { + Arc::new(Self { + epoch: AtomicU32::new(epoch), + connections: Default::default(), + next_connection_id: Default::default(), + }) + } + + pub fn epoch(&self) -> u32 { + self.epoch.load(SeqCst) + } + + #[instrument(skip_all)] + pub fn add_connection( + self: &Arc, + connection: Connection, + create_timer: F, + ) -> ( + ConnectionId, + impl Future> + Send, + BoxStream<'static, Box>, + ) + where + F: Send + Fn(Duration) -> Fut, + Fut: Send + Future, + Out: Send, + { + // For outgoing messages, use an unbounded channel so that application code + // can always send messages without yielding. For incoming messages, use a + // bounded channel so that other peers will receive backpressure if they send + // messages faster than this peer can process them. + #[cfg(any(test, feature = "test-support"))] + const INCOMING_BUFFER_SIZE: usize = 1; + #[cfg(not(any(test, feature = "test-support")))] + const INCOMING_BUFFER_SIZE: usize = 64; + let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); + + let connection_id = ConnectionId { + owner_id: self.epoch.load(SeqCst), + id: self.next_connection_id.fetch_add(1, SeqCst), + }; + let connection_state = ConnectionState { + outgoing_tx, + next_message_id: Default::default(), + response_channels: Arc::new(Mutex::new(Some(Default::default()))), + }; + let mut writer = MessageStream::new(connection.tx); + let mut reader = MessageStream::new(connection.rx); + + let this = self.clone(); + let response_channels = connection_state.response_channels.clone(); + let handle_io = async move { + tracing::trace!(%connection_id, "handle io future: start"); + + let _end_connection = util::defer(|| { + response_channels.lock().take(); + this.connections.write().remove(&connection_id); + tracing::trace!(%connection_id, "handle io future: end"); + }); + + // Send messages on this frequency so the connection isn't closed. + let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse(); + futures::pin_mut!(keepalive_timer); + + // Disconnect if we don't receive messages at least this frequently. + let receive_timeout = create_timer(RECEIVE_TIMEOUT).fuse(); + futures::pin_mut!(receive_timeout); + + loop { + tracing::trace!(%connection_id, "outer loop iteration start"); + let read_message = reader.read().fuse(); + futures::pin_mut!(read_message); + + loop { + tracing::trace!(%connection_id, "inner loop iteration start"); + futures::select_biased! { + outgoing = outgoing_rx.next().fuse() => match outgoing { + Some(outgoing) => { + tracing::trace!(%connection_id, "outgoing rpc message: writing"); + futures::select_biased! { + result = writer.write(outgoing).fuse() => { + tracing::trace!(%connection_id, "outgoing rpc message: done writing"); + result.context("failed to write RPC message")?; + tracing::trace!(%connection_id, "keepalive interval: resetting after sending message"); + keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); + } + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::trace!(%connection_id, "outgoing rpc message: writing timed out"); + Err(anyhow!("timed out writing message"))?; + } + } + } + None => { + tracing::trace!(%connection_id, "outgoing rpc message: channel closed"); + return Ok(()) + }, + }, + _ = keepalive_timer => { + tracing::trace!(%connection_id, "keepalive interval: pinging"); + futures::select_biased! { + result = writer.write(proto::Message::Ping).fuse() => { + tracing::trace!(%connection_id, "keepalive interval: done pinging"); + result.context("failed to send keepalive")?; + tracing::trace!(%connection_id, "keepalive interval: resetting after pinging"); + keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); + } + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::trace!(%connection_id, "keepalive interval: pinging timed out"); + Err(anyhow!("timed out sending keepalive"))?; + } + } + } + incoming = read_message => { + let incoming = incoming.context("error reading rpc message from socket")?; + tracing::trace!(%connection_id, "incoming rpc message: received"); + tracing::trace!(%connection_id, "receive timeout: resetting"); + receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); + if let proto::Message::Envelope(incoming) = incoming { + tracing::trace!(%connection_id, "incoming rpc message: processing"); + futures::select_biased! { + result = incoming_tx.send(incoming).fuse() => match result { + Ok(_) => { + tracing::trace!(%connection_id, "incoming rpc message: processed"); + } + Err(_) => { + tracing::trace!(%connection_id, "incoming rpc message: channel closed"); + return Ok(()) + } + }, + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::trace!(%connection_id, "incoming rpc message: processing timed out"); + Err(anyhow!("timed out processing incoming message"))? + } + } + } + break; + }, + _ = receive_timeout => { + tracing::trace!(%connection_id, "receive timeout: delay between messages too long"); + Err(anyhow!("delay between messages too long"))? + } + } + } + } + }; + + let response_channels = connection_state.response_channels.clone(); + self.connections + .write() + .insert(connection_id, connection_state); + + let incoming_rx = incoming_rx.filter_map(move |incoming| { + let response_channels = response_channels.clone(); + async move { + let message_id = incoming.id; + tracing::trace!(?incoming, "incoming message future: start"); + let _end = util::defer(move || { + tracing::trace!(%connection_id, message_id, "incoming message future: end"); + }); + + if let Some(responding_to) = incoming.responding_to { + tracing::trace!( + %connection_id, + message_id, + responding_to, + "incoming response: received" + ); + let channel = response_channels.lock().as_mut()?.remove(&responding_to); + if let Some(tx) = channel { + let requester_resumed = oneshot::channel(); + if let Err(error) = tx.send((incoming, requester_resumed.0)) { + tracing::trace!( + %connection_id, + message_id, + responding_to = responding_to, + ?error, + "incoming response: request future dropped", + ); + } + + tracing::trace!( + %connection_id, + message_id, + responding_to, + "incoming response: waiting to resume requester" + ); + let _ = requester_resumed.1.await; + tracing::trace!( + %connection_id, + message_id, + responding_to, + "incoming response: requester resumed" + ); + } else { + tracing::warn!( + %connection_id, + message_id, + responding_to, + "incoming response: unknown request" + ); + } + + None + } else { + tracing::trace!(%connection_id, message_id, "incoming message: received"); + proto::build_typed_envelope(connection_id, incoming).or_else(|| { + tracing::error!( + %connection_id, + message_id, + "unable to construct a typed envelope" + ); + None + }) + } + } + }); + (connection_id, handle_io, incoming_rx.boxed()) + } + + #[cfg(any(test, feature = "test-support"))] + pub fn add_test_connection( + self: &Arc, + connection: Connection, + executor: gpui2::Executor, + ) -> ( + ConnectionId, + impl Future> + Send, + BoxStream<'static, Box>, + ) { + let executor = executor.clone(); + self.add_connection(connection, move |duration| executor.timer(duration)) + } + + pub fn disconnect(&self, connection_id: ConnectionId) { + self.connections.write().remove(&connection_id); + } + + pub fn reset(&self, epoch: u32) { + self.teardown(); + self.next_connection_id.store(0, SeqCst); + self.epoch.store(epoch, SeqCst); + } + + pub fn teardown(&self) { + self.connections.write().clear(); + } + + pub fn request( + &self, + receiver_id: ConnectionId, + request: T, + ) -> impl Future> { + self.request_internal(None, receiver_id, request) + .map_ok(|envelope| envelope.payload) + } + + pub fn request_envelope( + &self, + receiver_id: ConnectionId, + request: T, + ) -> impl Future>> { + self.request_internal(None, receiver_id, request) + } + + pub fn forward_request( + &self, + sender_id: ConnectionId, + receiver_id: ConnectionId, + request: T, + ) -> impl Future> { + self.request_internal(Some(sender_id), receiver_id, request) + .map_ok(|envelope| envelope.payload) + } + + pub fn request_internal( + &self, + original_sender_id: Option, + receiver_id: ConnectionId, + request: T, + ) -> impl Future>> { + let (tx, rx) = oneshot::channel(); + let send = self.connection_state(receiver_id).and_then(|connection| { + let message_id = connection.next_message_id.fetch_add(1, SeqCst); + connection + .response_channels + .lock() + .as_mut() + .ok_or_else(|| anyhow!("connection was closed"))? + .insert(message_id, tx); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(request.into_envelope( + message_id, + None, + original_sender_id.map(Into::into), + ))) + .map_err(|_| anyhow!("connection was closed"))?; + Ok(()) + }); + async move { + send?; + let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?; + + if let Some(proto::envelope::Payload::Error(error)) = &response.payload { + Err(anyhow!( + "RPC request {} failed - {}", + T::NAME, + error.message + )) + } else { + Ok(TypedEnvelope { + message_id: response.id, + sender_id: receiver_id, + original_sender_id: response.original_sender_id, + payload: T::Response::from_envelope(response) + .ok_or_else(|| anyhow!("received response of the wrong type"))?, + }) + } + } + } + + pub fn send(&self, receiver_id: ConnectionId, message: T) -> Result<()> { + let connection = self.connection_state(receiver_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope( + message.into_envelope(message_id, None, None), + ))?; + Ok(()) + } + + pub fn forward_send( + &self, + sender_id: ConnectionId, + receiver_id: ConnectionId, + message: T, + ) -> Result<()> { + let connection = self.connection_state(receiver_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(message.into_envelope( + message_id, + None, + Some(sender_id.into()), + )))?; + Ok(()) + } + + pub fn respond( + &self, + receipt: Receipt, + response: T::Response, + ) -> Result<()> { + let connection = self.connection_state(receipt.sender_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(response.into_envelope( + message_id, + Some(receipt.message_id), + None, + )))?; + Ok(()) + } + + pub fn respond_with_error( + &self, + receipt: Receipt, + response: proto::Error, + ) -> Result<()> { + let connection = self.connection_state(receipt.sender_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(response.into_envelope( + message_id, + Some(receipt.message_id), + None, + )))?; + Ok(()) + } + + pub fn respond_with_unhandled_message( + &self, + envelope: Box, + ) -> Result<()> { + let connection = self.connection_state(envelope.sender_id())?; + let response = proto::Error { + message: format!("message {} was not handled", envelope.payload_type_name()), + }; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(response.into_envelope( + message_id, + Some(envelope.message_id()), + None, + )))?; + Ok(()) + } + + fn connection_state(&self, connection_id: ConnectionId) -> Result { + let connections = self.connections.read(); + let connection = connections + .get(&connection_id) + .ok_or_else(|| anyhow!("no such connection: {}", connection_id))?; + Ok(connection.clone()) + } +} + +impl Serialize for Peer { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("Peer", 2)?; + state.serialize_field("connections", &*self.connections.read())?; + state.end() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::TypedEnvelope; + use async_tungstenite::tungstenite::Message as WebSocketMessage; + use gpui2::TestAppContext; + + #[ctor::ctor] + fn init_logger() { + if std::env::var("RUST_LOG").is_ok() { + env_logger::init(); + } + } + + #[gpui2::test(iterations = 50)] + async fn test_request_response(cx: &mut TestAppContext) { + let executor = cx.executor(); + + // create 2 clients connected to 1 server + let server = Peer::new(0); + let client1 = Peer::new(0); + let client2 = Peer::new(0); + + let (client1_to_server_conn, server_to_client_1_conn, _kill) = + Connection::in_memory(cx.executor().clone()); + let (client1_conn_id, io_task1, client1_incoming) = + client1.add_test_connection(client1_to_server_conn, cx.executor().clone()); + let (_, io_task2, server_incoming1) = + server.add_test_connection(server_to_client_1_conn, cx.executor().clone()); + + let (client2_to_server_conn, server_to_client_2_conn, _kill) = + Connection::in_memory(cx.executor().clone()); + let (client2_conn_id, io_task3, client2_incoming) = + client2.add_test_connection(client2_to_server_conn, cx.executor().clone()); + let (_, io_task4, server_incoming2) = + server.add_test_connection(server_to_client_2_conn, cx.executor().clone()); + + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + executor.spawn(io_task3).detach(); + executor.spawn(io_task4).detach(); + executor + .spawn(handle_messages(server_incoming1, server.clone())) + .detach(); + executor + .spawn(handle_messages(client1_incoming, client1.clone())) + .detach(); + executor + .spawn(handle_messages(server_incoming2, server.clone())) + .detach(); + executor + .spawn(handle_messages(client2_incoming, client2.clone())) + .detach(); + + assert_eq!( + client1 + .request(client1_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); + + assert_eq!( + client2 + .request(client2_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); + + assert_eq!( + client1 + .request(client1_conn_id, proto::Test { id: 1 },) + .await + .unwrap(), + proto::Test { id: 1 } + ); + + assert_eq!( + client2 + .request(client2_conn_id, proto::Test { id: 2 }) + .await + .unwrap(), + proto::Test { id: 2 } + ); + + client1.disconnect(client1_conn_id); + client2.disconnect(client1_conn_id); + + async fn handle_messages( + mut messages: BoxStream<'static, Box>, + peer: Arc, + ) -> Result<()> { + while let Some(envelope) = messages.next().await { + let envelope = envelope.into_any(); + if let Some(envelope) = envelope.downcast_ref::>() { + let receipt = envelope.receipt(); + peer.respond(receipt, proto::Ack {})? + } else if let Some(envelope) = envelope.downcast_ref::>() + { + peer.respond(envelope.receipt(), envelope.payload.clone())? + } else { + panic!("unknown message type"); + } + } + + Ok(()) + } + } + + #[gpui2::test(iterations = 50)] + async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) { + let executor = cx.executor(); + let server = Peer::new(0); + let client = Peer::new(0); + + let (client_to_server_conn, server_to_client_conn, _kill) = + Connection::in_memory(executor.clone()); + let (client_to_server_conn_id, io_task1, mut client_incoming) = + client.add_test_connection(client_to_server_conn, executor.clone()); + + let (server_to_client_conn_id, io_task2, mut server_incoming) = + server.add_test_connection(server_to_client_conn, executor.clone()); + + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + + executor + .spawn(async move { + let future = server_incoming.next().await; + let request = future + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 1".to_string(), + }, + ) + .unwrap(); + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 2".to_string(), + }, + ) + .unwrap(); + server.respond(request.receipt(), proto::Ack {}).unwrap(); + + // Prevent the connection from being dropped + server_incoming.next().await; + }) + .detach(); + + let events = Arc::new(Mutex::new(Vec::new())); + + let response = client.request(client_to_server_conn_id, proto::Ping {}); + let response_task = executor.spawn({ + let events = events.clone(); + async move { + response.await.unwrap(); + events.lock().push("response".to_string()); + } + }); + + executor + .spawn({ + let events = events.clone(); + async move { + let incoming1 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming1.payload.message); + let incoming2 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming2.payload.message); + + // Prevent the connection from being dropped + client_incoming.next().await; + } + }) + .detach(); + + response_task.await; + assert_eq!( + &*events.lock(), + &[ + "message 1".to_string(), + "message 2".to_string(), + "response".to_string() + ] + ); + } + + #[gpui2::test(iterations = 50)] + async fn test_dropping_request_before_completion(cx: &mut TestAppContext) { + let executor = cx.executor().clone(); + let server = Peer::new(0); + let client = Peer::new(0); + + let (client_to_server_conn, server_to_client_conn, _kill) = + Connection::in_memory(cx.executor().clone()); + let (client_to_server_conn_id, io_task1, mut client_incoming) = + client.add_test_connection(client_to_server_conn, cx.executor().clone()); + let (server_to_client_conn_id, io_task2, mut server_incoming) = + server.add_test_connection(server_to_client_conn, cx.executor().clone()); + + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + + executor + .spawn(async move { + let request1 = server_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + let request2 = server_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 1".to_string(), + }, + ) + .unwrap(); + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 2".to_string(), + }, + ) + .unwrap(); + server.respond(request1.receipt(), proto::Ack {}).unwrap(); + server.respond(request2.receipt(), proto::Ack {}).unwrap(); + + // Prevent the connection from being dropped + server_incoming.next().await; + }) + .detach(); + + let events = Arc::new(Mutex::new(Vec::new())); + + let request1 = client.request(client_to_server_conn_id, proto::Ping {}); + let request1_task = executor.spawn(request1); + let request2 = client.request(client_to_server_conn_id, proto::Ping {}); + let request2_task = executor.spawn({ + let events = events.clone(); + async move { + request2.await.unwrap(); + events.lock().push("response 2".to_string()); + } + }); + + executor + .spawn({ + let events = events.clone(); + async move { + let incoming1 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming1.payload.message); + let incoming2 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming2.payload.message); + + // Prevent the connection from being dropped + client_incoming.next().await; + } + }) + .detach(); + + // Allow the request to make some progress before dropping it. + cx.executor().simulate_random_delay().await; + drop(request1_task); + + request2_task.await; + assert_eq!( + &*events.lock(), + &[ + "message 1".to_string(), + "message 2".to_string(), + "response 2".to_string() + ] + ); + } + + #[gpui2::test(iterations = 50)] + async fn test_disconnect(cx: &mut TestAppContext) { + let executor = cx.executor(); + + let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone()); + + let client = Peer::new(0); + let (connection_id, io_handler, mut incoming) = + client.add_test_connection(client_conn, executor.clone()); + + let (io_ended_tx, io_ended_rx) = oneshot::channel(); + executor + .spawn(async move { + io_handler.await.ok(); + io_ended_tx.send(()).unwrap(); + }) + .detach(); + + let (messages_ended_tx, messages_ended_rx) = oneshot::channel(); + executor + .spawn(async move { + incoming.next().await; + messages_ended_tx.send(()).unwrap(); + }) + .detach(); + + client.disconnect(connection_id); + + let _ = io_ended_rx.await; + let _ = messages_ended_rx.await; + assert!(server_conn + .send(WebSocketMessage::Binary(vec![])) + .await + .is_err()); + } + + #[gpui2::test(iterations = 50)] + async fn test_io_error(cx: &mut TestAppContext) { + let executor = cx.executor(); + let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone()); + + let client = Peer::new(0); + let (connection_id, io_handler, mut incoming) = + client.add_test_connection(client_conn, executor.clone()); + executor.spawn(io_handler).detach(); + executor + .spawn(async move { incoming.next().await }) + .detach(); + + let response = executor.spawn(client.request(connection_id, proto::Ping {})); + let _request = server_conn.rx.next().await.unwrap().unwrap(); + + drop(server_conn); + assert_eq!( + response.await.unwrap_err().to_string(), + "connection was closed" + ); + } +} diff --git a/crates/rpc2/src/proto.rs b/crates/rpc2/src/proto.rs new file mode 100644 index 0000000000..c1a7af3e4d --- /dev/null +++ b/crates/rpc2/src/proto.rs @@ -0,0 +1,674 @@ +#![allow(non_snake_case)] + +use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope}; +use anyhow::{anyhow, Result}; +use async_tungstenite::tungstenite::Message as WebSocketMessage; +use collections::HashMap; +use futures::{SinkExt as _, StreamExt as _}; +use prost::Message as _; +use serde::Serialize; +use std::any::{Any, TypeId}; +use std::{ + cmp, + fmt::Debug, + io, iter, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use std::{fmt, mem}; + +include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); + +pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static { + const NAME: &'static str; + const PRIORITY: MessagePriority; + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope; + fn from_envelope(envelope: Envelope) -> Option; +} + +pub trait EntityMessage: EnvelopedMessage { + fn remote_entity_id(&self) -> u64; +} + +pub trait RequestMessage: EnvelopedMessage { + type Response: EnvelopedMessage; +} + +pub trait AnyTypedEnvelope: 'static + Send + Sync { + fn payload_type_id(&self) -> TypeId; + fn payload_type_name(&self) -> &'static str; + fn as_any(&self) -> &dyn Any; + fn into_any(self: Box) -> Box; + fn is_background(&self) -> bool; + fn original_sender_id(&self) -> Option; + fn sender_id(&self) -> ConnectionId; + fn message_id(&self) -> u32; +} + +pub enum MessagePriority { + Foreground, + Background, +} + +impl AnyTypedEnvelope for TypedEnvelope { + fn payload_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn payload_type_name(&self) -> &'static str { + T::NAME + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn into_any(self: Box) -> Box { + self + } + + fn is_background(&self) -> bool { + matches!(T::PRIORITY, MessagePriority::Background) + } + + fn original_sender_id(&self) -> Option { + self.original_sender_id + } + + fn sender_id(&self) -> ConnectionId { + self.sender_id + } + + fn message_id(&self) -> u32 { + self.message_id + } +} + +impl PeerId { + pub fn from_u64(peer_id: u64) -> Self { + let owner_id = (peer_id >> 32) as u32; + let id = peer_id as u32; + Self { owner_id, id } + } + + pub fn as_u64(self) -> u64 { + ((self.owner_id as u64) << 32) | (self.id as u64) + } +} + +impl Copy for PeerId {} + +impl Eq for PeerId {} + +impl Ord for PeerId { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.owner_id + .cmp(&other.owner_id) + .then_with(|| self.id.cmp(&other.id)) + } +} + +impl PartialOrd for PeerId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::hash::Hash for PeerId { + fn hash(&self, state: &mut H) { + self.owner_id.hash(state); + self.id.hash(state); + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.owner_id, self.id) + } +} + +messages!( + (Ack, Foreground), + (AddProjectCollaborator, Foreground), + (ApplyCodeAction, Background), + (ApplyCodeActionResponse, Background), + (ApplyCompletionAdditionalEdits, Background), + (ApplyCompletionAdditionalEditsResponse, Background), + (BufferReloaded, Foreground), + (BufferSaved, Foreground), + (Call, Foreground), + (CallCanceled, Foreground), + (CancelCall, Foreground), + (CopyProjectEntry, Foreground), + (CreateBufferForPeer, Foreground), + (CreateChannel, Foreground), + (CreateChannelResponse, Foreground), + (ChannelMessageSent, Foreground), + (CreateProjectEntry, Foreground), + (CreateRoom, Foreground), + (CreateRoomResponse, Foreground), + (DeclineCall, Foreground), + (DeleteProjectEntry, Foreground), + (Error, Foreground), + (ExpandProjectEntry, Foreground), + (Follow, Foreground), + (FollowResponse, Foreground), + (FormatBuffers, Foreground), + (FormatBuffersResponse, Foreground), + (FuzzySearchUsers, Foreground), + (GetCodeActions, Background), + (GetCodeActionsResponse, Background), + (GetHover, Background), + (GetHoverResponse, Background), + (GetChannelMessages, Background), + (GetChannelMessagesResponse, Background), + (SendChannelMessage, Background), + (SendChannelMessageResponse, Background), + (GetCompletions, Background), + (GetCompletionsResponse, Background), + (GetDefinition, Background), + (GetDefinitionResponse, Background), + (GetTypeDefinition, Background), + (GetTypeDefinitionResponse, Background), + (GetDocumentHighlights, Background), + (GetDocumentHighlightsResponse, Background), + (GetReferences, Background), + (GetReferencesResponse, Background), + (GetProjectSymbols, Background), + (GetProjectSymbolsResponse, Background), + (GetUsers, Foreground), + (Hello, Foreground), + (IncomingCall, Foreground), + (InviteChannelMember, Foreground), + (UsersResponse, Foreground), + (JoinProject, Foreground), + (JoinProjectResponse, Foreground), + (JoinRoom, Foreground), + (JoinRoomResponse, Foreground), + (JoinChannelChat, Foreground), + (JoinChannelChatResponse, Foreground), + (LeaveChannelChat, Foreground), + (LeaveProject, Foreground), + (LeaveRoom, Foreground), + (OpenBufferById, Background), + (OpenBufferByPath, Background), + (OpenBufferForSymbol, Background), + (OpenBufferForSymbolResponse, Background), + (OpenBufferResponse, Background), + (PerformRename, Background), + (PerformRenameResponse, Background), + (OnTypeFormatting, Background), + (OnTypeFormattingResponse, Background), + (InlayHints, Background), + (InlayHintsResponse, Background), + (ResolveInlayHint, Background), + (ResolveInlayHintResponse, Background), + (RefreshInlayHints, Foreground), + (Ping, Foreground), + (PrepareRename, Background), + (PrepareRenameResponse, Background), + (ExpandProjectEntryResponse, Foreground), + (ProjectEntryResponse, Foreground), + (RejoinRoom, Foreground), + (RejoinRoomResponse, Foreground), + (RemoveContact, Foreground), + (RemoveChannelMember, Foreground), + (RemoveChannelMessage, Foreground), + (ReloadBuffers, Foreground), + (ReloadBuffersResponse, Foreground), + (RemoveProjectCollaborator, Foreground), + (RenameProjectEntry, Foreground), + (RequestContact, Foreground), + (RespondToContactRequest, Foreground), + (RespondToChannelInvite, Foreground), + (JoinChannel, Foreground), + (RoomUpdated, Foreground), + (SaveBuffer, Foreground), + (RenameChannel, Foreground), + (RenameChannelResponse, Foreground), + (SetChannelMemberAdmin, Foreground), + (SearchProject, Background), + (SearchProjectResponse, Background), + (ShareProject, Foreground), + (ShareProjectResponse, Foreground), + (ShowContacts, Foreground), + (StartLanguageServer, Foreground), + (SynchronizeBuffers, Foreground), + (SynchronizeBuffersResponse, Foreground), + (RejoinChannelBuffers, Foreground), + (RejoinChannelBuffersResponse, Foreground), + (Test, Foreground), + (Unfollow, Foreground), + (UnshareProject, Foreground), + (UpdateBuffer, Foreground), + (UpdateBufferFile, Foreground), + (UpdateContacts, Foreground), + (DeleteChannel, Foreground), + (MoveChannel, Foreground), + (LinkChannel, Foreground), + (UnlinkChannel, Foreground), + (UpdateChannels, Foreground), + (UpdateDiagnosticSummary, Foreground), + (UpdateFollowers, Foreground), + (UpdateInviteInfo, Foreground), + (UpdateLanguageServer, Foreground), + (UpdateParticipantLocation, Foreground), + (UpdateProject, Foreground), + (UpdateProjectCollaborator, Foreground), + (UpdateWorktree, Foreground), + (UpdateWorktreeSettings, Foreground), + (UpdateDiffBase, Foreground), + (GetPrivateUserInfo, Foreground), + (GetPrivateUserInfoResponse, Foreground), + (GetChannelMembers, Foreground), + (GetChannelMembersResponse, Foreground), + (JoinChannelBuffer, Foreground), + (JoinChannelBufferResponse, Foreground), + (LeaveChannelBuffer, Background), + (UpdateChannelBuffer, Foreground), + (UpdateChannelBufferCollaborators, Foreground), + (AckBufferOperation, Background), + (AckChannelMessage, Background), +); + +request_messages!( + (ApplyCodeAction, ApplyCodeActionResponse), + ( + ApplyCompletionAdditionalEdits, + ApplyCompletionAdditionalEditsResponse + ), + (Call, Ack), + (CancelCall, Ack), + (CopyProjectEntry, ProjectEntryResponse), + (CreateProjectEntry, ProjectEntryResponse), + (CreateRoom, CreateRoomResponse), + (CreateChannel, CreateChannelResponse), + (DeclineCall, Ack), + (DeleteProjectEntry, ProjectEntryResponse), + (ExpandProjectEntry, ExpandProjectEntryResponse), + (Follow, FollowResponse), + (FormatBuffers, FormatBuffersResponse), + (GetCodeActions, GetCodeActionsResponse), + (GetHover, GetHoverResponse), + (GetCompletions, GetCompletionsResponse), + (GetDefinition, GetDefinitionResponse), + (GetTypeDefinition, GetTypeDefinitionResponse), + (GetDocumentHighlights, GetDocumentHighlightsResponse), + (GetReferences, GetReferencesResponse), + (GetPrivateUserInfo, GetPrivateUserInfoResponse), + (GetProjectSymbols, GetProjectSymbolsResponse), + (FuzzySearchUsers, UsersResponse), + (GetUsers, UsersResponse), + (InviteChannelMember, Ack), + (JoinProject, JoinProjectResponse), + (JoinRoom, JoinRoomResponse), + (JoinChannelChat, JoinChannelChatResponse), + (LeaveRoom, Ack), + (RejoinRoom, RejoinRoomResponse), + (IncomingCall, Ack), + (OpenBufferById, OpenBufferResponse), + (OpenBufferByPath, OpenBufferResponse), + (OpenBufferForSymbol, OpenBufferForSymbolResponse), + (Ping, Ack), + (PerformRename, PerformRenameResponse), + (PrepareRename, PrepareRenameResponse), + (OnTypeFormatting, OnTypeFormattingResponse), + (InlayHints, InlayHintsResponse), + (ResolveInlayHint, ResolveInlayHintResponse), + (RefreshInlayHints, Ack), + (ReloadBuffers, ReloadBuffersResponse), + (RequestContact, Ack), + (RemoveChannelMember, Ack), + (RemoveContact, Ack), + (RespondToContactRequest, Ack), + (RespondToChannelInvite, Ack), + (SetChannelMemberAdmin, Ack), + (SendChannelMessage, SendChannelMessageResponse), + (GetChannelMessages, GetChannelMessagesResponse), + (GetChannelMembers, GetChannelMembersResponse), + (JoinChannel, JoinRoomResponse), + (RemoveChannelMessage, Ack), + (DeleteChannel, Ack), + (RenameProjectEntry, ProjectEntryResponse), + (RenameChannel, RenameChannelResponse), + (LinkChannel, Ack), + (UnlinkChannel, Ack), + (MoveChannel, Ack), + (SaveBuffer, BufferSaved), + (SearchProject, SearchProjectResponse), + (ShareProject, ShareProjectResponse), + (SynchronizeBuffers, SynchronizeBuffersResponse), + (RejoinChannelBuffers, RejoinChannelBuffersResponse), + (Test, Test), + (UpdateBuffer, Ack), + (UpdateParticipantLocation, Ack), + (UpdateProject, Ack), + (UpdateWorktree, Ack), + (JoinChannelBuffer, JoinChannelBufferResponse), + (LeaveChannelBuffer, Ack) +); + +entity_messages!( + project_id, + AddProjectCollaborator, + ApplyCodeAction, + ApplyCompletionAdditionalEdits, + BufferReloaded, + BufferSaved, + CopyProjectEntry, + CreateBufferForPeer, + CreateProjectEntry, + DeleteProjectEntry, + ExpandProjectEntry, + FormatBuffers, + GetCodeActions, + GetCompletions, + GetDefinition, + GetTypeDefinition, + GetDocumentHighlights, + GetHover, + GetReferences, + GetProjectSymbols, + JoinProject, + LeaveProject, + OpenBufferById, + OpenBufferByPath, + OpenBufferForSymbol, + PerformRename, + OnTypeFormatting, + InlayHints, + ResolveInlayHint, + RefreshInlayHints, + PrepareRename, + ReloadBuffers, + RemoveProjectCollaborator, + RenameProjectEntry, + SaveBuffer, + SearchProject, + StartLanguageServer, + SynchronizeBuffers, + UnshareProject, + UpdateBuffer, + UpdateBufferFile, + UpdateDiagnosticSummary, + UpdateLanguageServer, + UpdateProject, + UpdateProjectCollaborator, + UpdateWorktree, + UpdateWorktreeSettings, + UpdateDiffBase +); + +entity_messages!( + channel_id, + ChannelMessageSent, + UpdateChannelBuffer, + RemoveChannelMessage, + UpdateChannelBufferCollaborators, +); + +const KIB: usize = 1024; +const MIB: usize = KIB * 1024; +const MAX_BUFFER_LEN: usize = MIB; + +/// A stream of protobuf messages. +pub struct MessageStream { + stream: S, + encoding_buffer: Vec, +} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum Message { + Envelope(Envelope), + Ping, + Pong, +} + +impl MessageStream { + pub fn new(stream: S) -> Self { + Self { + stream, + encoding_buffer: Vec::new(), + } + } + + pub fn inner_mut(&mut self) -> &mut S { + &mut self.stream + } +} + +impl MessageStream +where + S: futures::Sink + Unpin, +{ + pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> { + #[cfg(any(test, feature = "test-support"))] + const COMPRESSION_LEVEL: i32 = -7; + + #[cfg(not(any(test, feature = "test-support")))] + const COMPRESSION_LEVEL: i32 = 4; + + match message { + Message::Envelope(message) => { + self.encoding_buffer.reserve(message.encoded_len()); + message + .encode(&mut self.encoding_buffer) + .map_err(io::Error::from)?; + let buffer = + zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL) + .unwrap(); + + self.encoding_buffer.clear(); + self.encoding_buffer.shrink_to(MAX_BUFFER_LEN); + self.stream.send(WebSocketMessage::Binary(buffer)).await?; + } + Message::Ping => { + self.stream + .send(WebSocketMessage::Ping(Default::default())) + .await?; + } + Message::Pong => { + self.stream + .send(WebSocketMessage::Pong(Default::default())) + .await?; + } + } + + Ok(()) + } +} + +impl MessageStream +where + S: futures::Stream> + Unpin, +{ + pub async fn read(&mut self) -> Result { + while let Some(bytes) = self.stream.next().await { + match bytes? { + WebSocketMessage::Binary(bytes) => { + zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap(); + let envelope = Envelope::decode(self.encoding_buffer.as_slice()) + .map_err(io::Error::from)?; + + self.encoding_buffer.clear(); + self.encoding_buffer.shrink_to(MAX_BUFFER_LEN); + return Ok(Message::Envelope(envelope)); + } + WebSocketMessage::Ping(_) => return Ok(Message::Ping), + WebSocketMessage::Pong(_) => return Ok(Message::Pong), + WebSocketMessage::Close(_) => break, + _ => {} + } + } + Err(anyhow!("connection closed")) + } +} + +impl From for SystemTime { + fn from(val: Timestamp) -> Self { + UNIX_EPOCH + .checked_add(Duration::new(val.seconds, val.nanos)) + .unwrap() + } +} + +impl From for Timestamp { + fn from(time: SystemTime) -> Self { + let duration = time.duration_since(UNIX_EPOCH).unwrap(); + Self { + seconds: duration.as_secs(), + nanos: duration.subsec_nanos(), + } + } +} + +impl From for Nonce { + fn from(nonce: u128) -> Self { + let upper_half = (nonce >> 64) as u64; + let lower_half = nonce as u64; + Self { + upper_half, + lower_half, + } + } +} + +impl From for u128 { + fn from(nonce: Nonce) -> Self { + let upper_half = (nonce.upper_half as u128) << 64; + let lower_half = nonce.lower_half as u128; + upper_half | lower_half + } +} + +pub fn split_worktree_update( + mut message: UpdateWorktree, + max_chunk_size: usize, +) -> impl Iterator { + let mut done_files = false; + + let mut repository_map = message + .updated_repositories + .into_iter() + .map(|repo| (repo.work_directory_id, repo)) + .collect::>(); + + iter::from_fn(move || { + if done_files { + return None; + } + + let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size); + let updated_entries: Vec<_> = message + .updated_entries + .drain(..updated_entries_chunk_size) + .collect(); + + let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size); + let removed_entries = message + .removed_entries + .drain(..removed_entries_chunk_size) + .collect(); + + done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty(); + + let mut updated_repositories = Vec::new(); + + if !repository_map.is_empty() { + for entry in &updated_entries { + if let Some(repo) = repository_map.remove(&entry.id) { + updated_repositories.push(repo) + } + } + } + + let removed_repositories = if done_files { + mem::take(&mut message.removed_repositories) + } else { + Default::default() + }; + + if done_files { + updated_repositories.extend(mem::take(&mut repository_map).into_values()); + } + + Some(UpdateWorktree { + project_id: message.project_id, + worktree_id: message.worktree_id, + root_name: message.root_name.clone(), + abs_path: message.abs_path.clone(), + updated_entries, + removed_entries, + scan_id: message.scan_id, + is_last_update: done_files && message.is_last_update, + updated_repositories, + removed_repositories, + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[gpui2::test] + async fn test_buffer_size() { + let (tx, rx) = futures::channel::mpsc::unbounded(); + let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!(""))); + sink.write(Message::Envelope(Envelope { + payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree { + root_name: "abcdefg".repeat(10), + ..Default::default() + })), + ..Default::default() + })) + .await + .unwrap(); + assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN); + sink.write(Message::Envelope(Envelope { + payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree { + root_name: "abcdefg".repeat(1000000), + ..Default::default() + })), + ..Default::default() + })) + .await + .unwrap(); + assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN); + + let mut stream = MessageStream::new(rx.map(anyhow::Ok)); + stream.read().await.unwrap(); + assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN); + stream.read().await.unwrap(); + assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN); + } + + #[gpui2::test] + fn test_converting_peer_id_from_and_to_u64() { + let peer_id = PeerId { + owner_id: 10, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: 10, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + } +} diff --git a/crates/rpc2/src/rpc.rs b/crates/rpc2/src/rpc.rs new file mode 100644 index 0000000000..942672b94b --- /dev/null +++ b/crates/rpc2/src/rpc.rs @@ -0,0 +1,9 @@ +pub mod auth; +mod conn; +mod peer; +pub mod proto; +pub use conn::Connection; +pub use peer::*; +mod macros; + +pub const PROTOCOL_VERSION: u32 = 64; diff --git a/crates/zed2/Cargo.toml b/crates/zed2/Cargo.toml index bc56e31457..1d90536a33 100644 --- a/crates/zed2/Cargo.toml +++ b/crates/zed2/Cargo.toml @@ -57,7 +57,7 @@ project2 = { path = "../project2" } # project_symbols = { path = "../project_symbols" } # quick_action_bar = { path = "../quick_action_bar" } # recent_projects = { path = "../recent_projects" } -rpc = { path = "../rpc" } +rpc2 = { path = "../rpc2" } settings2 = { path = "../settings2" } feature_flags = { path = "../feature_flags" } sum_tree = { path = "../sum_tree" }