diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b204a16c3e..6e133e7a21 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,7 +74,7 @@ jobs: version: v1.29.0 - uses: bufbuild/buf-breaking-action@v1 with: - input: "crates/rpc/proto/" + input: "crates/proto/proto/" against: "https://github.com/${GITHUB_REPOSITORY}.git#branch=${BUF_BASE_BRANCH},subdir=crates/rpc/proto/" macos_tests: diff --git a/Cargo.lock b/Cargo.lock index 6b8b78fbb1..55f8d35aba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7931,6 +7931,17 @@ dependencies = [ "prost", ] +[[package]] +name = "proto" +version = "0.1.0" +dependencies = [ + "anyhow", + "collections", + "prost", + "prost-build", + "serde", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -8514,8 +8525,7 @@ dependencies = [ "futures 0.3.28", "gpui", "parking_lot", - "prost", - "prost-build", + "proto", "rand 0.8.5", "rsa 0.4.0", "serde", diff --git a/Cargo.toml b/Cargo.toml index 523056e1f4..4af8ca2630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ members = [ "crates/project", "crates/project_panel", "crates/project_symbols", + "crates/proto", "crates/quick_action_bar", "crates/recent_projects", "crates/refineable", @@ -319,6 +320,7 @@ pretty_assertions = "1.3.0" prost = "0.9" prost-build = "0.9" prost-types = "0.9" +proto = { path = "./crates/proto" } pulldown-cmark = { version = "0.10.0", default-features = false } rand = "0.8.5" refineable = { path = "./crates/refineable" } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index fa4b3241b1..dc30080f8b 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1729,6 +1729,7 @@ mod tests { use gpui::{BackgroundExecutor, Context, TestAppContext}; use http::FakeHttpClient; use parking_lot::Mutex; + use proto::TypedEnvelope; use settings::SettingsStore; use std::future; diff --git a/crates/proto/Cargo.toml b/crates/proto/Cargo.toml new file mode 100644 index 0000000000..eca020a92d --- /dev/null +++ b/crates/proto/Cargo.toml @@ -0,0 +1,29 @@ +[package] +description = "Shared protocol for communication between the Zed app and the zed.dev server" +edition = "2021" +name = "proto" +version = "0.1.0" +publish = false +license = "GPL-3.0-or-later" + +[features] +test-support = ["collections/test-support"] + +[lints] +workspace = true + +[lib] +path = "src/proto.rs" +doctest = false + +[dependencies] +anyhow.workspace = true +collections.workspace = true +prost.workspace = true +serde.workspace = true + +[build-dependencies] +prost-build.workspace = true + +[dev-dependencies] +collections = { workspace = true, features = ["test-support"] } diff --git a/crates/rpc/build.rs b/crates/proto/build.rs similarity index 100% rename from crates/rpc/build.rs rename to crates/proto/build.rs diff --git a/crates/rpc/proto/buf.yaml b/crates/proto/proto/buf.yaml similarity index 100% rename from crates/rpc/proto/buf.yaml rename to crates/proto/proto/buf.yaml diff --git a/crates/rpc/proto/zed.proto b/crates/proto/proto/zed.proto similarity index 100% rename from crates/rpc/proto/zed.proto rename to crates/proto/proto/zed.proto diff --git a/crates/rpc/src/error.rs b/crates/proto/src/error.rs similarity index 88% rename from crates/rpc/src/error.rs rename to crates/proto/src/error.rs index f589863f2d..8a87d6fdc9 100644 --- a/crates/rpc/src/error.rs +++ b/crates/proto/src/error.rs @@ -31,8 +31,7 @@ /// } /// ``` /// -use crate::proto; -pub use proto::ErrorCode; +pub use crate::ErrorCode; /// ErrorCodeExt provides some helpers for structured error handling. /// @@ -53,7 +52,7 @@ pub trait ErrorCodeExt { fn with_tag(self, k: &str, v: &str) -> RpcError; } -impl ErrorCodeExt for proto::ErrorCode { +impl ErrorCodeExt for ErrorCode { fn anyhow(self) -> anyhow::Error { self.into() } @@ -75,21 +74,21 @@ impl ErrorCodeExt for proto::ErrorCode { /// what we use throughout our codebase. Though under the hood this pub trait ErrorExt { /// error_code() returns the ErrorCode (or ErrorCode::Internal if there is none) - fn error_code(&self) -> proto::ErrorCode; + fn error_code(&self) -> ErrorCode; /// error_tag() returns the value of the tag with the given key, if any. fn error_tag(&self, k: &str) -> Option<&str>; - /// to_proto() converts the error into a proto::Error - fn to_proto(&self) -> proto::Error; + /// to_proto() converts the error into a crate::Error + fn to_proto(&self) -> crate::Error; /// Clones the error and turns into an [anyhow::Error]. fn cloned(&self) -> anyhow::Error; } impl ErrorExt for anyhow::Error { - fn error_code(&self) -> proto::ErrorCode { + fn error_code(&self) -> ErrorCode { if let Some(rpc_error) = self.downcast_ref::() { rpc_error.code } else { - proto::ErrorCode::Internal + ErrorCode::Internal } } @@ -101,7 +100,7 @@ impl ErrorExt for anyhow::Error { } } - fn to_proto(&self) -> proto::Error { + fn to_proto(&self) -> crate::Error { if let Some(rpc_error) = self.downcast_ref::() { rpc_error.to_proto() } else { @@ -118,8 +117,8 @@ impl ErrorExt for anyhow::Error { } } -impl From for anyhow::Error { - fn from(value: proto::ErrorCode) -> Self { +impl From for anyhow::Error { + fn from(value: ErrorCode) -> Self { RpcError { request: None, code: value, @@ -134,7 +133,7 @@ impl From for anyhow::Error { pub struct RpcError { request: Option, msg: String, - code: proto::ErrorCode, + code: ErrorCode, tags: Vec, } @@ -146,9 +145,9 @@ pub struct RpcError { /// in the app; however it is useful for chaining .message() and .with_tag() on /// ErrorCode. impl RpcError { - /// from_proto converts a proto::Error into an anyhow::Error containing + /// from_proto converts a crate::Error into an anyhow::Error containing /// an RpcError. - pub fn from_proto(error: &proto::Error, request: &str) -> anyhow::Error { + pub fn from_proto(error: &crate::Error, request: &str) -> anyhow::Error { RpcError { request: Some(request.to_string()), code: error.code(), @@ -188,12 +187,12 @@ impl ErrorExt for RpcError { None } - fn error_code(&self) -> proto::ErrorCode { + fn error_code(&self) -> ErrorCode { self.code } - fn to_proto(&self) -> proto::Error { - proto::Error { + fn to_proto(&self) -> crate::Error { + crate::Error { code: self.code as i32, message: self.msg.clone(), tags: self.tags.clone(), @@ -225,8 +224,8 @@ impl std::fmt::Display for RpcError { } } -impl From for RpcError { - fn from(code: proto::ErrorCode) -> Self { +impl From for RpcError { + fn from(code: ErrorCode) -> Self { RpcError { request: None, code, diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs new file mode 100644 index 0000000000..35750a87c8 --- /dev/null +++ b/crates/proto/src/macros.rs @@ -0,0 +1,70 @@ +#[macro_export] +macro_rules! messages { + ($(($name:ident, $priority:ident)),* $(,)?) => { + pub fn build_typed_envelope(sender_id: PeerId, received_at: Instant, envelope: Envelope) -> Option> { + match envelope.payload { + $(Some(envelope::Payload::$name(payload)) => { + Some(Box::new(TypedEnvelope { + sender_id, + original_sender_id: envelope.original_sender_id, + message_id: envelope.id, + payload, + received_at, + })) + }, )* + _ => None + } + } + + $( + impl EnvelopedMessage for $name { + const NAME: &'static str = std::stringify!($name); + const PRIORITY: MessagePriority = MessagePriority::$priority; + + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope { + Envelope { + id, + responding_to, + original_sender_id, + payload: Some(envelope::Payload::$name(self)), + } + } + + fn from_envelope(envelope: Envelope) -> Option { + if let Some(envelope::Payload::$name(msg)) = envelope.payload { + Some(msg) + } else { + None + } + } + } + )* + }; +} + +#[macro_export] +macro_rules! request_messages { + ($(($request_name:ident, $response_name:ident)),* $(,)?) => { + $(impl RequestMessage for $request_name { + type Response = $response_name; + })* + }; +} + +#[macro_export] +macro_rules! entity_messages { + ({$id_field:ident, $entity_type:ty}, $($name:ident),* $(,)?) => { + $(impl EntityMessage for $name { + type Entity = $entity_type; + + fn remote_entity_id(&self) -> u64 { + self.$id_field + } + })* + }; +} diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs new file mode 100644 index 0000000000..3afa56f43a --- /dev/null +++ b/crates/proto/src/proto.rs @@ -0,0 +1,652 @@ +#![allow(non_snake_case)] + +pub mod error; +mod macros; +mod typed_envelope; + +pub use error::*; +pub use typed_envelope::*; + +use collections::HashMap; +pub use prost::Message; +use serde::Serialize; +use std::any::{Any, TypeId}; +use std::time::Instant; +use std::{ + cmp, + fmt::Debug, + iter, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use std::{fmt, mem}; + +include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); + +pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static { + const NAME: &'static str; + const PRIORITY: MessagePriority; + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope; + fn from_envelope(envelope: Envelope) -> Option; +} + +pub trait EntityMessage: EnvelopedMessage { + type Entity; + fn remote_entity_id(&self) -> u64; +} + +pub trait RequestMessage: EnvelopedMessage { + type Response: EnvelopedMessage; +} + +pub trait AnyTypedEnvelope: 'static + Send + Sync { + fn payload_type_id(&self) -> TypeId; + fn payload_type_name(&self) -> &'static str; + fn as_any(&self) -> &dyn Any; + fn into_any(self: Box) -> Box; + fn is_background(&self) -> bool; + fn original_sender_id(&self) -> Option; + fn sender_id(&self) -> PeerId; + fn message_id(&self) -> u32; +} + +pub enum MessagePriority { + Foreground, + Background, +} + +impl AnyTypedEnvelope for TypedEnvelope { + fn payload_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn payload_type_name(&self) -> &'static str { + T::NAME + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn into_any(self: Box) -> Box { + self + } + + fn is_background(&self) -> bool { + matches!(T::PRIORITY, MessagePriority::Background) + } + + fn original_sender_id(&self) -> Option { + self.original_sender_id + } + + fn sender_id(&self) -> PeerId { + self.sender_id + } + + fn message_id(&self) -> u32 { + self.message_id + } +} + +impl PeerId { + pub fn from_u64(peer_id: u64) -> Self { + let owner_id = (peer_id >> 32) as u32; + let id = peer_id as u32; + Self { owner_id, id } + } + + pub fn as_u64(self) -> u64 { + ((self.owner_id as u64) << 32) | (self.id as u64) + } +} + +impl Copy for PeerId {} + +impl Eq for PeerId {} + +impl Ord for PeerId { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.owner_id + .cmp(&other.owner_id) + .then_with(|| self.id.cmp(&other.id)) + } +} + +impl PartialOrd for PeerId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::hash::Hash for PeerId { + fn hash(&self, state: &mut H) { + self.owner_id.hash(state); + self.id.hash(state); + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.owner_id, self.id) + } +} + +messages!( + (Ack, Foreground), + (AckBufferOperation, Background), + (AckChannelMessage, Background), + (AddNotification, Foreground), + (AddProjectCollaborator, Foreground), + (ApplyCodeAction, Background), + (ApplyCodeActionResponse, Background), + (ApplyCompletionAdditionalEdits, Background), + (ApplyCompletionAdditionalEditsResponse, Background), + (BufferReloaded, Foreground), + (BufferSaved, Foreground), + (Call, Foreground), + (CallCanceled, Foreground), + (CancelCall, Foreground), + (ChannelMessageSent, Foreground), + (ChannelMessageUpdate, Foreground), + (CompleteWithLanguageModel, Background), + (ComputeEmbeddings, Background), + (ComputeEmbeddingsResponse, Background), + (CopyProjectEntry, Foreground), + (CountTokensWithLanguageModel, Background), + (CountTokensResponse, Background), + (CreateBufferForPeer, Foreground), + (CreateChannel, Foreground), + (CreateChannelResponse, Foreground), + (CreateProjectEntry, Foreground), + (CreateRoom, Foreground), + (CreateRoomResponse, Foreground), + (DeclineCall, Foreground), + (DeleteChannel, Foreground), + (DeleteNotification, Foreground), + (UpdateNotification, Foreground), + (DeleteProjectEntry, Foreground), + (EndStream, Foreground), + (Error, Foreground), + (ExpandProjectEntry, Foreground), + (ExpandProjectEntryResponse, Foreground), + (Follow, Foreground), + (FollowResponse, Foreground), + (FormatBuffers, Foreground), + (FormatBuffersResponse, Foreground), + (FuzzySearchUsers, Foreground), + (GetCachedEmbeddings, Background), + (GetCachedEmbeddingsResponse, Background), + (GetChannelMembers, Foreground), + (GetChannelMembersResponse, Foreground), + (GetChannelMessages, Background), + (GetChannelMessagesById, Background), + (GetChannelMessagesResponse, Background), + (GetCodeActions, Background), + (GetCodeActionsResponse, Background), + (GetCompletions, Background), + (GetCompletionsResponse, Background), + (GetDefinition, Background), + (GetDefinitionResponse, Background), + (GetDocumentHighlights, Background), + (GetDocumentHighlightsResponse, Background), + (GetHover, Background), + (GetHoverResponse, Background), + (GetNotifications, Foreground), + (GetNotificationsResponse, Foreground), + (GetPrivateUserInfo, Foreground), + (GetPrivateUserInfoResponse, Foreground), + (GetProjectSymbols, Background), + (GetProjectSymbolsResponse, Background), + (GetReferences, Background), + (GetReferencesResponse, Background), + (GetSupermavenApiKey, Background), + (GetSupermavenApiKeyResponse, Background), + (GetTypeDefinition, Background), + (GetTypeDefinitionResponse, Background), + (GetImplementation, Background), + (GetImplementationResponse, Background), + (GetUsers, Foreground), + (Hello, Foreground), + (IncomingCall, Foreground), + (InlayHints, Background), + (InlayHintsResponse, Background), + (InviteChannelMember, Foreground), + (JoinChannel, Foreground), + (JoinChannelBuffer, Foreground), + (JoinChannelBufferResponse, Foreground), + (JoinChannelChat, Foreground), + (JoinChannelChatResponse, Foreground), + (JoinProject, Foreground), + (JoinHostedProject, Foreground), + (JoinProjectResponse, Foreground), + (JoinRoom, Foreground), + (JoinRoomResponse, Foreground), + (LanguageModelResponse, Background), + (LeaveChannelBuffer, Background), + (LeaveChannelChat, Foreground), + (LeaveProject, Foreground), + (LeaveRoom, Foreground), + (MarkNotificationRead, Foreground), + (MoveChannel, Foreground), + (OnTypeFormatting, Background), + (OnTypeFormattingResponse, Background), + (OpenBufferById, Background), + (OpenBufferByPath, Background), + (OpenBufferForSymbol, Background), + (OpenBufferForSymbolResponse, Background), + (OpenBufferResponse, Background), + (PerformRename, Background), + (PerformRenameResponse, Background), + (Ping, Foreground), + (PrepareRename, Background), + (PrepareRenameResponse, Background), + (ProjectEntryResponse, Foreground), + (RefreshInlayHints, Foreground), + (RejoinChannelBuffers, Foreground), + (RejoinChannelBuffersResponse, Foreground), + (RejoinRoom, Foreground), + (RejoinRoomResponse, Foreground), + (ReloadBuffers, Foreground), + (ReloadBuffersResponse, Foreground), + (RemoveChannelMember, Foreground), + (RemoveChannelMessage, Foreground), + (UpdateChannelMessage, Foreground), + (RemoveContact, Foreground), + (RemoveProjectCollaborator, Foreground), + (RenameChannel, Foreground), + (RenameChannelResponse, Foreground), + (RenameProjectEntry, Foreground), + (RequestContact, Foreground), + (ResolveCompletionDocumentation, Background), + (ResolveCompletionDocumentationResponse, Background), + (ResolveInlayHint, Background), + (ResolveInlayHintResponse, Background), + (RespondToChannelInvite, Foreground), + (RespondToContactRequest, Foreground), + (RoomUpdated, Foreground), + (SaveBuffer, Foreground), + (SetChannelMemberRole, Foreground), + (SetChannelVisibility, Foreground), + (SearchProject, Background), + (SearchProjectResponse, Background), + (SendChannelMessage, Background), + (SendChannelMessageResponse, Background), + (ShareProject, Foreground), + (ShareProjectResponse, Foreground), + (ShowContacts, Foreground), + (StartLanguageServer, Foreground), + (SubscribeToChannels, Foreground), + (SynchronizeBuffers, Foreground), + (SynchronizeBuffersResponse, Foreground), + (TaskContextForLocation, Background), + (TaskContext, Background), + (TaskTemplates, Background), + (TaskTemplatesResponse, Background), + (Test, Foreground), + (Unfollow, Foreground), + (UnshareProject, Foreground), + (UpdateBuffer, Foreground), + (UpdateBufferFile, Foreground), + (UpdateChannelBuffer, Foreground), + (UpdateChannelBufferCollaborators, Foreground), + (UpdateChannels, Foreground), + (UpdateUserChannels, Foreground), + (UpdateContacts, Foreground), + (UpdateDiagnosticSummary, Foreground), + (UpdateDiffBase, Foreground), + (UpdateFollowers, Foreground), + (UpdateInviteInfo, Foreground), + (UpdateLanguageServer, Foreground), + (UpdateParticipantLocation, Foreground), + (UpdateProject, Foreground), + (UpdateProjectCollaborator, Foreground), + (UpdateWorktree, Foreground), + (UpdateWorktreeSettings, Foreground), + (UsersResponse, Foreground), + (LspExtExpandMacro, Background), + (LspExtExpandMacroResponse, Background), + (SetRoomParticipantRole, Foreground), + (BlameBuffer, Foreground), + (BlameBufferResponse, Foreground), + (CreateDevServerProject, Background), + (CreateDevServerProjectResponse, Foreground), + (CreateDevServer, Foreground), + (CreateDevServerResponse, Foreground), + (DevServerInstructions, Foreground), + (ShutdownDevServer, Foreground), + (ReconnectDevServer, Foreground), + (ReconnectDevServerResponse, Foreground), + (ShareDevServerProject, Foreground), + (JoinDevServerProject, Foreground), + (RejoinRemoteProjects, Foreground), + (RejoinRemoteProjectsResponse, Foreground), + (MultiLspQuery, Background), + (MultiLspQueryResponse, Background), + (DevServerProjectsUpdate, Foreground), + (ValidateDevServerProjectRequest, Background), + (DeleteDevServer, Foreground), + (DeleteDevServerProject, Foreground), + (RegenerateDevServerToken, Foreground), + (RegenerateDevServerTokenResponse, Foreground), + (RenameDevServer, Foreground), + (OpenNewBuffer, Foreground), + (RestartLanguageServers, Foreground), +); + +request_messages!( + (ApplyCodeAction, ApplyCodeActionResponse), + ( + ApplyCompletionAdditionalEdits, + ApplyCompletionAdditionalEditsResponse + ), + (Call, Ack), + (CancelCall, Ack), + (CopyProjectEntry, ProjectEntryResponse), + (CompleteWithLanguageModel, LanguageModelResponse), + (ComputeEmbeddings, ComputeEmbeddingsResponse), + (CountTokensWithLanguageModel, CountTokensResponse), + (CreateChannel, CreateChannelResponse), + (CreateProjectEntry, ProjectEntryResponse), + (CreateRoom, CreateRoomResponse), + (DeclineCall, Ack), + (DeleteChannel, Ack), + (DeleteProjectEntry, ProjectEntryResponse), + (ExpandProjectEntry, ExpandProjectEntryResponse), + (Follow, FollowResponse), + (FormatBuffers, FormatBuffersResponse), + (FuzzySearchUsers, UsersResponse), + (GetCachedEmbeddings, GetCachedEmbeddingsResponse), + (GetChannelMembers, GetChannelMembersResponse), + (GetChannelMessages, GetChannelMessagesResponse), + (GetChannelMessagesById, GetChannelMessagesResponse), + (GetCodeActions, GetCodeActionsResponse), + (GetCompletions, GetCompletionsResponse), + (GetDefinition, GetDefinitionResponse), + (GetImplementation, GetImplementationResponse), + (GetDocumentHighlights, GetDocumentHighlightsResponse), + (GetHover, GetHoverResponse), + (GetNotifications, GetNotificationsResponse), + (GetPrivateUserInfo, GetPrivateUserInfoResponse), + (GetProjectSymbols, GetProjectSymbolsResponse), + (GetReferences, GetReferencesResponse), + (GetSupermavenApiKey, GetSupermavenApiKeyResponse), + (GetTypeDefinition, GetTypeDefinitionResponse), + (GetUsers, UsersResponse), + (IncomingCall, Ack), + (InlayHints, InlayHintsResponse), + (InviteChannelMember, Ack), + (JoinChannel, JoinRoomResponse), + (JoinChannelBuffer, JoinChannelBufferResponse), + (JoinChannelChat, JoinChannelChatResponse), + (JoinHostedProject, JoinProjectResponse), + (JoinProject, JoinProjectResponse), + (JoinRoom, JoinRoomResponse), + (LeaveChannelBuffer, Ack), + (LeaveRoom, Ack), + (MarkNotificationRead, Ack), + (MoveChannel, Ack), + (OnTypeFormatting, OnTypeFormattingResponse), + (OpenBufferById, OpenBufferResponse), + (OpenBufferByPath, OpenBufferResponse), + (OpenBufferForSymbol, OpenBufferForSymbolResponse), + (OpenNewBuffer, OpenBufferResponse), + (PerformRename, PerformRenameResponse), + (Ping, Ack), + (PrepareRename, PrepareRenameResponse), + (RefreshInlayHints, Ack), + (RejoinChannelBuffers, RejoinChannelBuffersResponse), + (RejoinRoom, RejoinRoomResponse), + (ReloadBuffers, ReloadBuffersResponse), + (RemoveChannelMember, Ack), + (RemoveChannelMessage, Ack), + (UpdateChannelMessage, Ack), + (RemoveContact, Ack), + (RenameChannel, RenameChannelResponse), + (RenameProjectEntry, ProjectEntryResponse), + (RequestContact, Ack), + ( + ResolveCompletionDocumentation, + ResolveCompletionDocumentationResponse + ), + (ResolveInlayHint, ResolveInlayHintResponse), + (RespondToChannelInvite, Ack), + (RespondToContactRequest, Ack), + (SaveBuffer, BufferSaved), + (SearchProject, SearchProjectResponse), + (SendChannelMessage, SendChannelMessageResponse), + (SetChannelMemberRole, Ack), + (SetChannelVisibility, Ack), + (ShareProject, ShareProjectResponse), + (SynchronizeBuffers, SynchronizeBuffersResponse), + (TaskContextForLocation, TaskContext), + (TaskTemplates, TaskTemplatesResponse), + (Test, Test), + (UpdateBuffer, Ack), + (UpdateParticipantLocation, Ack), + (UpdateProject, Ack), + (UpdateWorktree, Ack), + (LspExtExpandMacro, LspExtExpandMacroResponse), + (SetRoomParticipantRole, Ack), + (BlameBuffer, BlameBufferResponse), + (CreateDevServerProject, CreateDevServerProjectResponse), + (CreateDevServer, CreateDevServerResponse), + (ShutdownDevServer, Ack), + (ShareDevServerProject, ShareProjectResponse), + (JoinDevServerProject, JoinProjectResponse), + (RejoinRemoteProjects, RejoinRemoteProjectsResponse), + (ReconnectDevServer, ReconnectDevServerResponse), + (ValidateDevServerProjectRequest, Ack), + (MultiLspQuery, MultiLspQueryResponse), + (DeleteDevServer, Ack), + (DeleteDevServerProject, Ack), + (RegenerateDevServerToken, RegenerateDevServerTokenResponse), + (RenameDevServer, Ack), + (RestartLanguageServers, Ack) +); + +entity_messages!( + {project_id, ShareProject}, + AddProjectCollaborator, + ApplyCodeAction, + ApplyCompletionAdditionalEdits, + BlameBuffer, + BufferReloaded, + BufferSaved, + CopyProjectEntry, + CreateBufferForPeer, + CreateProjectEntry, + DeleteProjectEntry, + ExpandProjectEntry, + FormatBuffers, + GetCodeActions, + GetCompletions, + GetDefinition, + GetImplementation, + GetDocumentHighlights, + GetHover, + GetProjectSymbols, + GetReferences, + GetTypeDefinition, + InlayHints, + JoinProject, + LeaveProject, + MultiLspQuery, + RestartLanguageServers, + OnTypeFormatting, + OpenNewBuffer, + OpenBufferById, + OpenBufferByPath, + OpenBufferForSymbol, + PerformRename, + PrepareRename, + RefreshInlayHints, + ReloadBuffers, + RemoveProjectCollaborator, + RenameProjectEntry, + ResolveCompletionDocumentation, + ResolveInlayHint, + SaveBuffer, + SearchProject, + StartLanguageServer, + SynchronizeBuffers, + TaskContextForLocation, + TaskTemplates, + UnshareProject, + UpdateBuffer, + UpdateBufferFile, + UpdateDiagnosticSummary, + UpdateDiffBase, + UpdateLanguageServer, + UpdateProject, + UpdateProjectCollaborator, + UpdateWorktree, + UpdateWorktreeSettings, + LspExtExpandMacro, +); + +entity_messages!( + {channel_id, Channel}, + ChannelMessageSent, + ChannelMessageUpdate, + RemoveChannelMessage, + UpdateChannelMessage, + UpdateChannelBuffer, + UpdateChannelBufferCollaborators, +); + +impl From for SystemTime { + fn from(val: Timestamp) -> Self { + UNIX_EPOCH + .checked_add(Duration::new(val.seconds, val.nanos)) + .unwrap() + } +} + +impl From for Timestamp { + fn from(time: SystemTime) -> Self { + let duration = time.duration_since(UNIX_EPOCH).unwrap(); + Self { + seconds: duration.as_secs(), + nanos: duration.subsec_nanos(), + } + } +} + +impl From for Nonce { + fn from(nonce: u128) -> Self { + let upper_half = (nonce >> 64) as u64; + let lower_half = nonce as u64; + Self { + upper_half, + lower_half, + } + } +} + +impl From for u128 { + fn from(nonce: Nonce) -> Self { + let upper_half = (nonce.upper_half as u128) << 64; + let lower_half = nonce.lower_half as u128; + upper_half | lower_half + } +} + +pub fn split_worktree_update( + mut message: UpdateWorktree, + max_chunk_size: usize, +) -> impl Iterator { + let mut done_files = false; + + let mut repository_map = message + .updated_repositories + .into_iter() + .map(|repo| (repo.work_directory_id, repo)) + .collect::>(); + + iter::from_fn(move || { + if done_files { + return None; + } + + let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size); + let updated_entries: Vec<_> = message + .updated_entries + .drain(..updated_entries_chunk_size) + .collect(); + + let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size); + let removed_entries = message + .removed_entries + .drain(..removed_entries_chunk_size) + .collect(); + + done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty(); + + let mut updated_repositories = Vec::new(); + + if !repository_map.is_empty() { + for entry in &updated_entries { + if let Some(repo) = repository_map.remove(&entry.id) { + updated_repositories.push(repo) + } + } + } + + let removed_repositories = if done_files { + mem::take(&mut message.removed_repositories) + } else { + Default::default() + }; + + if done_files { + updated_repositories.extend(mem::take(&mut repository_map).into_values()); + } + + Some(UpdateWorktree { + project_id: message.project_id, + worktree_id: message.worktree_id, + root_name: message.root_name.clone(), + abs_path: message.abs_path.clone(), + updated_entries, + removed_entries, + scan_id: message.scan_id, + is_last_update: done_files && message.is_last_update, + updated_repositories, + removed_repositories, + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_converting_peer_id_from_and_to_u64() { + let peer_id = PeerId { + owner_id: 10, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: 10, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + } +} diff --git a/crates/proto/src/typed_envelope.rs b/crates/proto/src/typed_envelope.rs new file mode 100644 index 0000000000..e45f17f548 --- /dev/null +++ b/crates/proto/src/typed_envelope.rs @@ -0,0 +1,43 @@ +use crate::{PeerId, RequestMessage}; +use anyhow::{anyhow, Result}; +use std::{marker::PhantomData, time::Instant}; + +pub struct Receipt { + pub sender_id: PeerId, + pub message_id: u32, + payload_type: PhantomData, +} + +impl Clone for Receipt { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for Receipt {} + +#[derive(Clone, Debug)] +pub struct TypedEnvelope { + pub sender_id: PeerId, + pub original_sender_id: Option, + pub message_id: u32, + pub payload: T, + pub received_at: Instant, +} + +impl TypedEnvelope { + pub fn original_sender_id(&self) -> Result { + self.original_sender_id + .ok_or_else(|| anyhow!("missing original_sender_id")) + } +} + +impl TypedEnvelope { + pub fn receipt(&self) -> Receipt { + Receipt { + sender_id: self.sender_id, + message_id: self.message_id, + payload_type: PhantomData, + } + } +} diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index b197073b7a..8fadc97b61 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -14,7 +14,7 @@ path = "src/rpc.rs" doctest = false [features] -test-support = ["collections/test-support", "gpui/test-support"] +test-support = ["collections/test-support", "gpui/test-support", "proto/test-support"] [dependencies] anyhow.workspace = true @@ -25,7 +25,7 @@ collections.workspace = true futures.workspace = true gpui = { workspace = true, optional = true } parking_lot.workspace = true -prost.workspace = true +proto.workspace = true rand.workspace = true rsa = "0.4" serde.workspace = true @@ -35,10 +35,8 @@ tracing = { version = "0.1.34", features = ["log"] } util.workspace = true zstd = "0.11" -[build-dependencies] -prost-build.workspace = true - [dev-dependencies] -collections.workspace = true +collections = { workspace = true, features = ["test-support"] } env_logger.workspace = true -gpui.workspace = true +gpui = { workspace = true, features = ["test-support"] } +proto = { workspace = true, features = ["test-support"] } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index c90a1cb35c..0b4af7e9c5 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,7 +1,8 @@ -use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError}; - use super::{ - proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage}, + proto::{ + self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, Receipt, RequestMessage, + TypedEnvelope, + }, Connection, }; use anyhow::{anyhow, Context, Result}; @@ -12,11 +13,11 @@ use futures::{ FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, }; use parking_lot::{Mutex, RwLock}; +use proto::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError}; use serde::{ser::SerializeStruct, Serialize}; use std::{ fmt, future, future::Future, - marker::PhantomData, sync::atomic::Ordering::SeqCst, sync::{ atomic::{self, AtomicU32}, @@ -57,46 +58,6 @@ impl fmt::Display for ConnectionId { } } -pub struct Receipt { - pub sender_id: ConnectionId, - pub message_id: u32, - payload_type: PhantomData, -} - -impl Clone for Receipt { - fn clone(&self) -> Self { - *self - } -} - -impl Copy for Receipt {} - -#[derive(Clone, Debug)] -pub struct TypedEnvelope { - pub sender_id: ConnectionId, - pub original_sender_id: Option, - pub message_id: u32, - pub payload: T, - pub received_at: Instant, -} - -impl TypedEnvelope { - pub fn original_sender_id(&self) -> Result { - self.original_sender_id - .ok_or_else(|| anyhow!("missing original_sender_id")) - } -} - -impl TypedEnvelope { - pub fn receipt(&self) -> Receipt { - Receipt { - sender_id: self.sender_id, - message_id: self.message_id, - payload_type: PhantomData, - } - } -} - pub struct Peer { epoch: AtomicU32, pub connections: RwLock>, @@ -376,9 +337,12 @@ impl Peer { "incoming stream response: requester resumed" ); } else { - let message_type = - proto::build_typed_envelope(connection_id, received_at, incoming) - .map(|p| p.payload_type_name()); + let message_type = proto::build_typed_envelope( + connection_id.into(), + received_at, + incoming, + ) + .map(|p| p.payload_type_name()); tracing::warn!( %connection_id, message_id, @@ -391,16 +355,15 @@ impl Peer { None } else { tracing::trace!(%connection_id, message_id, "incoming message: received"); - proto::build_typed_envelope(connection_id, received_at, incoming).or_else( - || { + proto::build_typed_envelope(connection_id.into(), received_at, incoming) + .or_else(|| { tracing::error!( %connection_id, message_id, "unable to construct a typed envelope" ); None - }, - ) + }) } } }); @@ -475,7 +438,7 @@ impl Peer { let (response, received_at) = response.await?; Ok(TypedEnvelope { message_id: response.id, - sender_id: receiver_id, + sender_id: receiver_id.into(), original_sender_id: response.original_sender_id, payload: T::Response::from_envelope(response) .ok_or_else(|| anyhow!("received response of the wrong type"))?, @@ -619,7 +582,7 @@ impl Peer { receipt: Receipt, response: T::Response, ) -> Result<()> { - let connection = self.connection_state(receipt.sender_id)?; + let connection = self.connection_state(receipt.sender_id.into())?; let message_id = connection .next_message_id .fetch_add(1, atomic::Ordering::SeqCst); @@ -634,7 +597,7 @@ impl Peer { } pub fn end_stream(&self, receipt: Receipt) -> Result<()> { - let connection = self.connection_state(receipt.sender_id)?; + let connection = self.connection_state(receipt.sender_id.into())?; let message_id = connection .next_message_id .fetch_add(1, atomic::Ordering::SeqCst); @@ -656,7 +619,7 @@ impl Peer { receipt: Receipt, response: proto::Error, ) -> Result<()> { - let connection = self.connection_state(receipt.sender_id)?; + let connection = self.connection_state(receipt.sender_id.into())?; let message_id = connection .next_message_id .fetch_add(1, atomic::Ordering::SeqCst); @@ -674,7 +637,7 @@ impl Peer { &self, envelope: Box, ) -> Result<()> { - let connection = self.connection_state(envelope.sender_id())?; + let connection = self.connection_state(envelope.sender_id().into())?; let response = ErrorCode::Internal .message(format!( "message {} was not handled", @@ -717,7 +680,6 @@ impl Serialize for Peer { #[cfg(test)] mod tests { use super::*; - use crate::TypedEnvelope; use async_tungstenite::tungstenite::Message as WebSocketMessage; use gpui::TestAppContext; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 4844ea6aba..28963991d1 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -1,520 +1,11 @@ #![allow(non_snake_case)] -use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope}; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; use async_tungstenite::tungstenite::Message as WebSocketMessage; -use collections::HashMap; use futures::{SinkExt as _, StreamExt as _}; -use prost::Message as _; -use serde::Serialize; -use std::any::{Any, TypeId}; +pub use proto::{Message as _, *}; use std::time::Instant; -use std::{ - cmp, - fmt::Debug, - io, iter, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; -use std::{fmt, mem}; - -include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); - -pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static { - const NAME: &'static str; - const PRIORITY: MessagePriority; - fn into_envelope( - self, - id: u32, - responding_to: Option, - original_sender_id: Option, - ) -> Envelope; - fn from_envelope(envelope: Envelope) -> Option; -} - -pub trait EntityMessage: EnvelopedMessage { - type Entity; - fn remote_entity_id(&self) -> u64; -} - -pub trait RequestMessage: EnvelopedMessage { - type Response: EnvelopedMessage; -} - -pub trait AnyTypedEnvelope: 'static + Send + Sync { - fn payload_type_id(&self) -> TypeId; - fn payload_type_name(&self) -> &'static str; - fn as_any(&self) -> &dyn Any; - fn into_any(self: Box) -> Box; - fn is_background(&self) -> bool; - fn original_sender_id(&self) -> Option; - fn sender_id(&self) -> ConnectionId; - fn message_id(&self) -> u32; -} - -pub enum MessagePriority { - Foreground, - Background, -} - -impl AnyTypedEnvelope for TypedEnvelope { - fn payload_type_id(&self) -> TypeId { - TypeId::of::() - } - - fn payload_type_name(&self) -> &'static str { - T::NAME - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn into_any(self: Box) -> Box { - self - } - - fn is_background(&self) -> bool { - matches!(T::PRIORITY, MessagePriority::Background) - } - - fn original_sender_id(&self) -> Option { - self.original_sender_id - } - - fn sender_id(&self) -> ConnectionId { - self.sender_id - } - - fn message_id(&self) -> u32 { - self.message_id - } -} - -impl PeerId { - pub fn from_u64(peer_id: u64) -> Self { - let owner_id = (peer_id >> 32) as u32; - let id = peer_id as u32; - Self { owner_id, id } - } - - pub fn as_u64(self) -> u64 { - ((self.owner_id as u64) << 32) | (self.id as u64) - } -} - -impl Copy for PeerId {} - -impl Eq for PeerId {} - -impl Ord for PeerId { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.owner_id - .cmp(&other.owner_id) - .then_with(|| self.id.cmp(&other.id)) - } -} - -impl PartialOrd for PeerId { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl std::hash::Hash for PeerId { - fn hash(&self, state: &mut H) { - self.owner_id.hash(state); - self.id.hash(state); - } -} - -impl fmt::Display for PeerId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/{}", self.owner_id, self.id) - } -} - -messages!( - (Ack, Foreground), - (AckBufferOperation, Background), - (AckChannelMessage, Background), - (AddNotification, Foreground), - (AddProjectCollaborator, Foreground), - (ApplyCodeAction, Background), - (ApplyCodeActionResponse, Background), - (ApplyCompletionAdditionalEdits, Background), - (ApplyCompletionAdditionalEditsResponse, Background), - (BufferReloaded, Foreground), - (BufferSaved, Foreground), - (Call, Foreground), - (CallCanceled, Foreground), - (CancelCall, Foreground), - (ChannelMessageSent, Foreground), - (ChannelMessageUpdate, Foreground), - (CompleteWithLanguageModel, Background), - (ComputeEmbeddings, Background), - (ComputeEmbeddingsResponse, Background), - (CopyProjectEntry, Foreground), - (CountTokensWithLanguageModel, Background), - (CountTokensResponse, Background), - (CreateBufferForPeer, Foreground), - (CreateChannel, Foreground), - (CreateChannelResponse, Foreground), - (CreateProjectEntry, Foreground), - (CreateRoom, Foreground), - (CreateRoomResponse, Foreground), - (DeclineCall, Foreground), - (DeleteChannel, Foreground), - (DeleteNotification, Foreground), - (UpdateNotification, Foreground), - (DeleteProjectEntry, Foreground), - (EndStream, Foreground), - (Error, Foreground), - (ExpandProjectEntry, Foreground), - (ExpandProjectEntryResponse, Foreground), - (Follow, Foreground), - (FollowResponse, Foreground), - (FormatBuffers, Foreground), - (FormatBuffersResponse, Foreground), - (FuzzySearchUsers, Foreground), - (GetCachedEmbeddings, Background), - (GetCachedEmbeddingsResponse, Background), - (GetChannelMembers, Foreground), - (GetChannelMembersResponse, Foreground), - (GetChannelMessages, Background), - (GetChannelMessagesById, Background), - (GetChannelMessagesResponse, Background), - (GetCodeActions, Background), - (GetCodeActionsResponse, Background), - (GetCompletions, Background), - (GetCompletionsResponse, Background), - (GetDefinition, Background), - (GetDefinitionResponse, Background), - (GetDocumentHighlights, Background), - (GetDocumentHighlightsResponse, Background), - (GetHover, Background), - (GetHoverResponse, Background), - (GetNotifications, Foreground), - (GetNotificationsResponse, Foreground), - (GetPrivateUserInfo, Foreground), - (GetPrivateUserInfoResponse, Foreground), - (GetProjectSymbols, Background), - (GetProjectSymbolsResponse, Background), - (GetReferences, Background), - (GetReferencesResponse, Background), - (GetSupermavenApiKey, Background), - (GetSupermavenApiKeyResponse, Background), - (GetTypeDefinition, Background), - (GetTypeDefinitionResponse, Background), - (GetImplementation, Background), - (GetImplementationResponse, Background), - (GetUsers, Foreground), - (Hello, Foreground), - (IncomingCall, Foreground), - (InlayHints, Background), - (InlayHintsResponse, Background), - (InviteChannelMember, Foreground), - (JoinChannel, Foreground), - (JoinChannelBuffer, Foreground), - (JoinChannelBufferResponse, Foreground), - (JoinChannelChat, Foreground), - (JoinChannelChatResponse, Foreground), - (JoinProject, Foreground), - (JoinHostedProject, Foreground), - (JoinProjectResponse, Foreground), - (JoinRoom, Foreground), - (JoinRoomResponse, Foreground), - (LanguageModelResponse, Background), - (LeaveChannelBuffer, Background), - (LeaveChannelChat, Foreground), - (LeaveProject, Foreground), - (LeaveRoom, Foreground), - (MarkNotificationRead, Foreground), - (MoveChannel, Foreground), - (OnTypeFormatting, Background), - (OnTypeFormattingResponse, Background), - (OpenBufferById, Background), - (OpenBufferByPath, Background), - (OpenBufferForSymbol, Background), - (OpenBufferForSymbolResponse, Background), - (OpenBufferResponse, Background), - (PerformRename, Background), - (PerformRenameResponse, Background), - (Ping, Foreground), - (PrepareRename, Background), - (PrepareRenameResponse, Background), - (ProjectEntryResponse, Foreground), - (RefreshInlayHints, Foreground), - (RejoinChannelBuffers, Foreground), - (RejoinChannelBuffersResponse, Foreground), - (RejoinRoom, Foreground), - (RejoinRoomResponse, Foreground), - (ReloadBuffers, Foreground), - (ReloadBuffersResponse, Foreground), - (RemoveChannelMember, Foreground), - (RemoveChannelMessage, Foreground), - (UpdateChannelMessage, Foreground), - (RemoveContact, Foreground), - (RemoveProjectCollaborator, Foreground), - (RenameChannel, Foreground), - (RenameChannelResponse, Foreground), - (RenameProjectEntry, Foreground), - (RequestContact, Foreground), - (ResolveCompletionDocumentation, Background), - (ResolveCompletionDocumentationResponse, Background), - (ResolveInlayHint, Background), - (ResolveInlayHintResponse, Background), - (RespondToChannelInvite, Foreground), - (RespondToContactRequest, Foreground), - (RoomUpdated, Foreground), - (SaveBuffer, Foreground), - (SetChannelMemberRole, Foreground), - (SetChannelVisibility, Foreground), - (SearchProject, Background), - (SearchProjectResponse, Background), - (SendChannelMessage, Background), - (SendChannelMessageResponse, Background), - (ShareProject, Foreground), - (ShareProjectResponse, Foreground), - (ShowContacts, Foreground), - (StartLanguageServer, Foreground), - (SubscribeToChannels, Foreground), - (SynchronizeBuffers, Foreground), - (SynchronizeBuffersResponse, Foreground), - (TaskContextForLocation, Background), - (TaskContext, Background), - (TaskTemplates, Background), - (TaskTemplatesResponse, Background), - (Test, Foreground), - (Unfollow, Foreground), - (UnshareProject, Foreground), - (UpdateBuffer, Foreground), - (UpdateBufferFile, Foreground), - (UpdateChannelBuffer, Foreground), - (UpdateChannelBufferCollaborators, Foreground), - (UpdateChannels, Foreground), - (UpdateUserChannels, Foreground), - (UpdateContacts, Foreground), - (UpdateDiagnosticSummary, Foreground), - (UpdateDiffBase, Foreground), - (UpdateFollowers, Foreground), - (UpdateInviteInfo, Foreground), - (UpdateLanguageServer, Foreground), - (UpdateParticipantLocation, Foreground), - (UpdateProject, Foreground), - (UpdateProjectCollaborator, Foreground), - (UpdateWorktree, Foreground), - (UpdateWorktreeSettings, Foreground), - (UsersResponse, Foreground), - (LspExtExpandMacro, Background), - (LspExtExpandMacroResponse, Background), - (SetRoomParticipantRole, Foreground), - (BlameBuffer, Foreground), - (BlameBufferResponse, Foreground), - (CreateDevServerProject, Background), - (CreateDevServerProjectResponse, Foreground), - (CreateDevServer, Foreground), - (CreateDevServerResponse, Foreground), - (DevServerInstructions, Foreground), - (ShutdownDevServer, Foreground), - (ReconnectDevServer, Foreground), - (ReconnectDevServerResponse, Foreground), - (ShareDevServerProject, Foreground), - (JoinDevServerProject, Foreground), - (RejoinRemoteProjects, Foreground), - (RejoinRemoteProjectsResponse, Foreground), - (MultiLspQuery, Background), - (MultiLspQueryResponse, Background), - (DevServerProjectsUpdate, Foreground), - (ValidateDevServerProjectRequest, Background), - (DeleteDevServer, Foreground), - (DeleteDevServerProject, Foreground), - (RegenerateDevServerToken, Foreground), - (RegenerateDevServerTokenResponse, Foreground), - (RenameDevServer, Foreground), - (OpenNewBuffer, Foreground), - (RestartLanguageServers, Foreground), -); - -request_messages!( - (ApplyCodeAction, ApplyCodeActionResponse), - ( - ApplyCompletionAdditionalEdits, - ApplyCompletionAdditionalEditsResponse - ), - (Call, Ack), - (CancelCall, Ack), - (CopyProjectEntry, ProjectEntryResponse), - (CompleteWithLanguageModel, LanguageModelResponse), - (ComputeEmbeddings, ComputeEmbeddingsResponse), - (CountTokensWithLanguageModel, CountTokensResponse), - (CreateChannel, CreateChannelResponse), - (CreateProjectEntry, ProjectEntryResponse), - (CreateRoom, CreateRoomResponse), - (DeclineCall, Ack), - (DeleteChannel, Ack), - (DeleteProjectEntry, ProjectEntryResponse), - (ExpandProjectEntry, ExpandProjectEntryResponse), - (Follow, FollowResponse), - (FormatBuffers, FormatBuffersResponse), - (FuzzySearchUsers, UsersResponse), - (GetCachedEmbeddings, GetCachedEmbeddingsResponse), - (GetChannelMembers, GetChannelMembersResponse), - (GetChannelMessages, GetChannelMessagesResponse), - (GetChannelMessagesById, GetChannelMessagesResponse), - (GetCodeActions, GetCodeActionsResponse), - (GetCompletions, GetCompletionsResponse), - (GetDefinition, GetDefinitionResponse), - (GetImplementation, GetImplementationResponse), - (GetDocumentHighlights, GetDocumentHighlightsResponse), - (GetHover, GetHoverResponse), - (GetNotifications, GetNotificationsResponse), - (GetPrivateUserInfo, GetPrivateUserInfoResponse), - (GetProjectSymbols, GetProjectSymbolsResponse), - (GetReferences, GetReferencesResponse), - (GetSupermavenApiKey, GetSupermavenApiKeyResponse), - (GetTypeDefinition, GetTypeDefinitionResponse), - (GetUsers, UsersResponse), - (IncomingCall, Ack), - (InlayHints, InlayHintsResponse), - (InviteChannelMember, Ack), - (JoinChannel, JoinRoomResponse), - (JoinChannelBuffer, JoinChannelBufferResponse), - (JoinChannelChat, JoinChannelChatResponse), - (JoinHostedProject, JoinProjectResponse), - (JoinProject, JoinProjectResponse), - (JoinRoom, JoinRoomResponse), - (LeaveChannelBuffer, Ack), - (LeaveRoom, Ack), - (MarkNotificationRead, Ack), - (MoveChannel, Ack), - (OnTypeFormatting, OnTypeFormattingResponse), - (OpenBufferById, OpenBufferResponse), - (OpenBufferByPath, OpenBufferResponse), - (OpenBufferForSymbol, OpenBufferForSymbolResponse), - (OpenNewBuffer, OpenBufferResponse), - (PerformRename, PerformRenameResponse), - (Ping, Ack), - (PrepareRename, PrepareRenameResponse), - (RefreshInlayHints, Ack), - (RejoinChannelBuffers, RejoinChannelBuffersResponse), - (RejoinRoom, RejoinRoomResponse), - (ReloadBuffers, ReloadBuffersResponse), - (RemoveChannelMember, Ack), - (RemoveChannelMessage, Ack), - (UpdateChannelMessage, Ack), - (RemoveContact, Ack), - (RenameChannel, RenameChannelResponse), - (RenameProjectEntry, ProjectEntryResponse), - (RequestContact, Ack), - ( - ResolveCompletionDocumentation, - ResolveCompletionDocumentationResponse - ), - (ResolveInlayHint, ResolveInlayHintResponse), - (RespondToChannelInvite, Ack), - (RespondToContactRequest, Ack), - (SaveBuffer, BufferSaved), - (SearchProject, SearchProjectResponse), - (SendChannelMessage, SendChannelMessageResponse), - (SetChannelMemberRole, Ack), - (SetChannelVisibility, Ack), - (ShareProject, ShareProjectResponse), - (SynchronizeBuffers, SynchronizeBuffersResponse), - (TaskContextForLocation, TaskContext), - (TaskTemplates, TaskTemplatesResponse), - (Test, Test), - (UpdateBuffer, Ack), - (UpdateParticipantLocation, Ack), - (UpdateProject, Ack), - (UpdateWorktree, Ack), - (LspExtExpandMacro, LspExtExpandMacroResponse), - (SetRoomParticipantRole, Ack), - (BlameBuffer, BlameBufferResponse), - (CreateDevServerProject, CreateDevServerProjectResponse), - (CreateDevServer, CreateDevServerResponse), - (ShutdownDevServer, Ack), - (ShareDevServerProject, ShareProjectResponse), - (JoinDevServerProject, JoinProjectResponse), - (RejoinRemoteProjects, RejoinRemoteProjectsResponse), - (ReconnectDevServer, ReconnectDevServerResponse), - (ValidateDevServerProjectRequest, Ack), - (MultiLspQuery, MultiLspQueryResponse), - (DeleteDevServer, Ack), - (DeleteDevServerProject, Ack), - (RegenerateDevServerToken, RegenerateDevServerTokenResponse), - (RenameDevServer, Ack), - (RestartLanguageServers, Ack) -); - -entity_messages!( - {project_id, ShareProject}, - AddProjectCollaborator, - ApplyCodeAction, - ApplyCompletionAdditionalEdits, - BlameBuffer, - BufferReloaded, - BufferSaved, - CopyProjectEntry, - CreateBufferForPeer, - CreateProjectEntry, - DeleteProjectEntry, - ExpandProjectEntry, - FormatBuffers, - GetCodeActions, - GetCompletions, - GetDefinition, - GetImplementation, - GetDocumentHighlights, - GetHover, - GetProjectSymbols, - GetReferences, - GetTypeDefinition, - InlayHints, - JoinProject, - LeaveProject, - MultiLspQuery, - RestartLanguageServers, - OnTypeFormatting, - OpenNewBuffer, - OpenBufferById, - OpenBufferByPath, - OpenBufferForSymbol, - PerformRename, - PrepareRename, - RefreshInlayHints, - ReloadBuffers, - RemoveProjectCollaborator, - RenameProjectEntry, - ResolveCompletionDocumentation, - ResolveInlayHint, - SaveBuffer, - SearchProject, - StartLanguageServer, - SynchronizeBuffers, - TaskContextForLocation, - TaskTemplates, - UnshareProject, - UpdateBuffer, - UpdateBufferFile, - UpdateDiagnosticSummary, - UpdateDiffBase, - UpdateLanguageServer, - UpdateProject, - UpdateProjectCollaborator, - UpdateWorktree, - UpdateWorktreeSettings, - LspExtExpandMacro, -); - -entity_messages!( - {channel_id, Channel}, - ChannelMessageSent, - ChannelMessageUpdate, - RemoveChannelMessage, - UpdateChannelMessage, - UpdateChannelBuffer, - UpdateChannelBufferCollaborators, -); +use std::{fmt::Debug, io}; const KIB: usize = 1024; const MIB: usize = KIB * 1024; @@ -615,109 +106,6 @@ where } } -impl From for SystemTime { - fn from(val: Timestamp) -> Self { - UNIX_EPOCH - .checked_add(Duration::new(val.seconds, val.nanos)) - .unwrap() - } -} - -impl From for Timestamp { - fn from(time: SystemTime) -> Self { - let duration = time.duration_since(UNIX_EPOCH).unwrap(); - Self { - seconds: duration.as_secs(), - nanos: duration.subsec_nanos(), - } - } -} - -impl From for Nonce { - fn from(nonce: u128) -> Self { - let upper_half = (nonce >> 64) as u64; - let lower_half = nonce as u64; - Self { - upper_half, - lower_half, - } - } -} - -impl From for u128 { - fn from(nonce: Nonce) -> Self { - let upper_half = (nonce.upper_half as u128) << 64; - let lower_half = nonce.lower_half as u128; - upper_half | lower_half - } -} - -pub fn split_worktree_update( - mut message: UpdateWorktree, - max_chunk_size: usize, -) -> impl Iterator { - let mut done_files = false; - - let mut repository_map = message - .updated_repositories - .into_iter() - .map(|repo| (repo.work_directory_id, repo)) - .collect::>(); - - iter::from_fn(move || { - if done_files { - return None; - } - - let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size); - let updated_entries: Vec<_> = message - .updated_entries - .drain(..updated_entries_chunk_size) - .collect(); - - let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size); - let removed_entries = message - .removed_entries - .drain(..removed_entries_chunk_size) - .collect(); - - done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty(); - - let mut updated_repositories = Vec::new(); - - if !repository_map.is_empty() { - for entry in &updated_entries { - if let Some(repo) = repository_map.remove(&entry.id) { - updated_repositories.push(repo) - } - } - } - - let removed_repositories = if done_files { - mem::take(&mut message.removed_repositories) - } else { - Default::default() - }; - - if done_files { - updated_repositories.extend(mem::take(&mut repository_map).into_values()); - } - - Some(UpdateWorktree { - project_id: message.project_id, - worktree_id: message.worktree_id, - root_name: message.root_name.clone(), - abs_path: message.abs_path.clone(), - updated_entries, - removed_entries, - scan_id: message.scan_id, - is_last_update: done_files && message.is_last_update, - updated_repositories, - removed_repositories, - }) - }) -} - #[cfg(test)] mod tests { use super::*; @@ -753,28 +141,4 @@ mod tests { stream.read().await.unwrap(); assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN); } - - #[gpui::test] - fn test_converting_peer_id_from_and_to_u64() { - let peer_id = PeerId { - owner_id: 10, - id: 3, - }; - assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); - let peer_id = PeerId { - owner_id: u32::MAX, - id: 3, - }; - assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); - let peer_id = PeerId { - owner_id: 10, - id: u32::MAX, - }; - assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); - let peer_id = PeerId { - owner_id: u32::MAX, - id: u32::MAX, - }; - assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); - } } diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index 880102e8d3..b8741fd805 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -1,16 +1,15 @@ pub mod auth; mod conn; -mod error; mod extension; mod notification; mod peer; pub mod proto; pub use conn::Connection; -pub use error::*; pub use extension::*; pub use notification::*; pub use peer::*; +pub use proto::{error::*, Receipt, TypedEnvelope}; mod macros; pub const PROTOCOL_VERSION: u32 = 68;