diff --git a/crates/channel/src/channel.rs b/crates/channel/src/channel.rs index 67c560a1fc..15631b7dd3 100644 --- a/crates/channel/src/channel.rs +++ b/crates/channel/src/channel.rs @@ -1,7 +1,14 @@ mod channel_store; pub mod channel_buffer; +use std::sync::Arc; + pub use channel_store::*; +use client::Client; #[cfg(test)] mod channel_store_tests; + +pub fn init(client: &Arc) { + channel_buffer::init(client); +} diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index d88810ff56..a59fec1553 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -6,30 +6,34 @@ use rpc::{proto, TypedEnvelope}; use std::sync::Arc; use util::ResultExt; -// Open the channel document -// ChannelDocumentView { ChannelDocument, Editor } -> On clone, clones internal ChannelDocument handle, instantiates new editor -// Produces a view which is: (ChannelDocument, Editor), ChannelDocument manages subscriptions -// ChannelDocuments -> Buffers -> Editor with that buffer - -// ChannelDocuments { -// ChannleBuffers: HashMap> -// } - -type BufferId = u64; +pub(crate) fn init(client: &Arc) { + client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer); + client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator); + client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator); +} pub struct ChannelBuffer { channel_id: ChannelId, - buffer_id: BufferId, + collaborators: Vec, buffer: ModelHandle, client: Arc, + _subscription: client::Subscription, } impl Entity for ChannelBuffer { type Event = (); + + fn release(&mut self, _: &mut AppContext) { + self.client + .send(proto::LeaveChannelBuffer { + channel_id: self.channel_id, + }) + .log_err(); + } } impl ChannelBuffer { - pub fn for_channel( + pub fn join_channel( channel_id: ChannelId, client: Arc, cx: &mut AppContext, @@ -45,19 +49,24 @@ impl ChannelBuffer { .into_iter() .map(language::proto::deserialize_operation) .collect::, _>>()?; - let buffer_id = response.buffer_id; - let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx)); + let collaborators = response.collaborators; + + let buffer = + cx.add_model(|cx| language::Buffer::new(response.replica_id as u16, base_text, cx)); buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?; + let subscription = client.subscribe_to_entity(channel_id)?; + anyhow::Ok(cx.add_model(|cx| { cx.subscribe(&buffer, Self::on_buffer_update).detach(); - client.add_model_message_handler(Self::handle_update_channel_buffer); + Self { - buffer_id, buffer, client, channel_id, + collaborators, + _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()), } })) }) @@ -77,6 +86,7 @@ impl ChannelBuffer { .collect::, _>>()?; this.update(&mut cx, |this, cx| { + cx.notify(); this.buffer .update(cx, |buffer, cx| buffer.apply_ops(ops, cx)) })?; @@ -84,6 +94,49 @@ impl ChannelBuffer { Ok(()) } + async fn handle_add_channel_buffer_collaborator( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let collaborator = envelope.payload.collaborator.ok_or_else(|| { + anyhow::anyhow!( + "Should have gotten a collaborator in the AddChannelBufferCollaborator message" + ) + })?; + + this.update(&mut cx, |this, cx| { + this.collaborators.push(collaborator); + cx.notify(); + }); + + Ok(()) + } + + async fn handle_remove_channel_buffer_collaborator( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.collaborators.retain(|collaborator| { + if collaborator.peer_id == message.payload.peer_id { + this.buffer.update(cx, |buffer, cx| { + buffer.remove_peer(collaborator.replica_id as u16, cx) + }); + false + } else { + true + } + }); + cx.notify(); + }); + + Ok(()) + } + fn on_buffer_update( &mut self, _: ModelHandle, @@ -94,7 +147,7 @@ impl ChannelBuffer { let operation = language::proto::serialize_operation(operation); self.client .send(proto::UpdateChannelBuffer { - buffer_id: self.buffer_id, + channel_id: self.channel_id, operations: vec![operation], }) .log_err(); @@ -104,4 +157,8 @@ impl ChannelBuffer { pub fn buffer(&self) -> ModelHandle { self.buffer.clone() } + + pub fn collaborators(&self) -> &[proto::Collaborator] { + &self.collaborators + } } diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 473dd1afe9..7f0e5a75f0 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -11,7 +11,6 @@ impl Database { self.transaction(|tx| async move { let tx = tx; - // Get or create buffer from channel self.check_user_is_channel_member(channel_id, user_id, &tx) .await?; @@ -116,6 +115,7 @@ impl Database { Ok(proto::JoinChannelBufferResponse { buffer_id: buffer.id.to_proto(), + replica_id: replica_id.to_proto() as u32, base_text, operations, collaborators: collaborators @@ -137,67 +137,128 @@ impl Database { connection: ConnectionId, ) -> Result> { self.transaction(|tx| async move { - let result = channel_buffer_collaborator::Entity::delete_many() - .filter( - Condition::all() - .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)) - .add( - channel_buffer_collaborator::Column::ConnectionId - .eq(connection.id as i32), - ) - .add( - channel_buffer_collaborator::Column::ConnectionServerId - .eq(connection.owner_id as i32), - ), - ) - .exec(&*tx) - .await?; - if result.rows_affected == 0 { - Err(anyhow!("not a collaborator on this project"))?; - } - - let mut connections = Vec::new(); - let mut rows = channel_buffer_collaborator::Entity::find() - .filter( - Condition::all() - .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)), - ) - .stream(&*tx) - .await?; - while let Some(row) = rows.next().await { - let row = row?; - connections.push(ConnectionId { - id: row.connection_id as u32, - owner_id: row.connection_server_id.0 as u32, - }); - } - - Ok(connections) + self.leave_channel_buffer_internal(channel_id, connection, &*tx) + .await }) .await } + pub async fn leave_channel_buffer_internal( + &self, + channel_id: ChannelId, + connection: ConnectionId, + tx: &DatabaseTransaction, + ) -> Result> { + let result = channel_buffer_collaborator::Entity::delete_many() + .filter( + Condition::all() + .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)) + .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + channel_buffer_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) + .exec(&*tx) + .await?; + if result.rows_affected == 0 { + Err(anyhow!("not a collaborator on this project"))?; + } + + let mut connections = Vec::new(); + let mut rows = channel_buffer_collaborator::Entity::find() + .filter( + Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)), + ) + .stream(&*tx) + .await?; + while let Some(row) = rows.next().await { + let row = row?; + connections.push(ConnectionId { + id: row.connection_id as u32, + owner_id: row.connection_server_id.0 as u32, + }); + } + + Ok(connections) + } + pub async fn leave_channel_buffers( &self, connection: ConnectionId, - ) -> Result> { - // + ) -> Result)>> { + self.transaction(|tx| async move { + #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)] + enum QueryChannelIds { + ChannelId, + } + + let channel_ids: Vec = channel_buffer_collaborator::Entity::find() + .select_only() + .column(channel_buffer_collaborator::Column::ChannelId) + .filter(Condition::all().add( + channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32), + )) + .into_values::<_, QueryChannelIds>() + .all(&*tx) + .await?; + + let mut result = Vec::new(); + for channel_id in channel_ids { + let collaborators = self + .leave_channel_buffer_internal(channel_id, connection, &*tx) + .await?; + result.push((channel_id, collaborators)); + } + + Ok(result) + }) + .await } - pub async fn get_channel_buffer_collaborators(&self, channel_id: ChannelId) -> Result<()> { - todo!() + #[cfg(debug_assertions)] + pub async fn get_channel_buffer_collaborators( + &self, + channel_id: ChannelId, + ) -> Result> { + self.transaction(|tx| async move { + #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)] + enum QueryUserIds { + UserId, + } + + let users: Vec = channel_buffer_collaborator::Entity::find() + .select_only() + .column(channel_buffer_collaborator::Column::UserId) + .filter( + Condition::all() + .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)), + ) + .into_values::<_, QueryUserIds>() + .all(&*tx) + .await?; + + Ok(users) + }) + .await } pub async fn update_channel_buffer( &self, - buffer_id: BufferId, + channel_id: ChannelId, + user: UserId, operations: &[proto::Operation], - ) -> Result<()> { + ) -> Result> { self.transaction(|tx| async move { - let buffer = buffer::Entity::find_by_id(buffer_id) + self.check_user_is_channel_member(channel_id, user, &*tx) + .await?; + + let buffer = buffer::Entity::find() + .filter(buffer::Column::ChannelId.eq(channel_id)) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such buffer"))?; + let buffer_id = buffer.id; buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| { match operation.variant.as_ref()? { proto::operation::Variant::Edit(operation) => { @@ -237,7 +298,23 @@ impl Database { .exec(&*tx) .await?; - Ok(()) + let mut connections = Vec::new(); + let mut rows = channel_buffer_collaborator::Entity::find() + .filter( + Condition::all() + .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)), + ) + .stream(&*tx) + .await?; + while let Some(row) = rows.next().await { + let row = row?; + connections.push(ConnectionId { + id: row.connection_id as u32, + owner_id: row.connection_server_id.0 as u32, + }); + } + + Ok(connections) }) .await } diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index c25071e1a2..08252e382e 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -66,11 +66,10 @@ async fn test_channel_buffers(db: &Arc) { .unwrap(); let connection_id_a = ConnectionId { owner_id, id: 1 }; - let buffer_response_a = db + let _ = db .join_channel_buffer(zed_id, a_id, connection_id_a) .await .unwrap(); - let buffer_id = BufferId::from_proto(buffer_response_a.buffer_id); let mut buffer_a = Buffer::new(0, 0, "".to_string()); let mut operations = Vec::new(); @@ -85,7 +84,7 @@ async fn test_channel_buffers(db: &Arc) { .map(|op| proto::serialize_operation(&language::Operation::Buffer(op))) .collect::>(); - db.update_channel_buffer(buffer_id, &operations) + db.update_channel_buffer(zed_id, a_id, &operations) .await .unwrap(); @@ -115,7 +114,7 @@ async fn test_channel_buffers(db: &Arc) { .await .is_err()); - //Ensure that both collaborators have shown up + // Ensure that both collaborators have shown up assert_eq!( buffer_response_b.collaborators, &[ @@ -132,6 +131,10 @@ async fn test_channel_buffers(db: &Arc) { ] ); + // Ensure that get_channel_buffer_collaborators works + let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap(); + assert_eq!(zed_collaborats, &[a_id, b_id]); + let collaborators = db .leave_channel_buffer(zed_id, connection_id_b) .await @@ -139,7 +142,18 @@ async fn test_channel_buffers(db: &Arc) { assert_eq!(collaborators, &[connection_id_a],); - db.connection_lost(connection_id_a).await.unwrap(); - // assert!() - // Test buffer epoch incrementing? + let cargo_id = db.create_root_channel("cargo", "2", a_id).await.unwrap(); + let _ = db + .join_channel_buffer(cargo_id, a_id, connection_id_a) + .await + .unwrap(); + + db.leave_channel_buffers(connection_id_a).await.unwrap(); + + let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap(); + let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap(); + assert_eq!(zed_collaborators, &[]); + assert_eq!(cargo_collaborators, &[]); + + // TODO: test buffer epoch incrementing } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index da5e7e6398..2bd39c861d 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,10 +2,7 @@ mod connection_pool; use crate::{ auth, - db::{ - self, BufferId, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, - UserId, - }, + db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId}, executor::Executor, AppState, Result, }; @@ -38,8 +35,8 @@ use lazy_static::lazy_static; use prometheus::{register_int_gauge, IntGauge}; use rpc::{ proto::{ - self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo, - RequestMessage, + self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, + LiveKitConnectionInfo, RequestMessage, }, Connection, ConnectionId, Peer, Receipt, TypedEnvelope, }; @@ -860,6 +857,7 @@ async fn connection_lost( futures::select_biased! { _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => { leave_room_for_session(&session).await.trace_err(); + leave_channel_buffers_for_session(&session).await.trace_err(); if !session .connection_pool() @@ -872,6 +870,8 @@ async fn connection_lost( } } update_user_contacts(session.user_id, &session).await?; + + } _ = teardown.changed().fuse() => {} } @@ -2496,8 +2496,51 @@ async fn join_channel_buffer( .join_channel_buffer(channel_id, session.user_id, session.connection_id) .await?; + let replica_id = open_response.replica_id; + let collaborators = open_response.collaborators.clone(); + response.send(open_response)?; + let update = AddChannelBufferCollaborator { + channel_id: channel_id.to_proto(), + collaborator: Some(proto::Collaborator { + user_id: session.user_id.to_proto(), + peer_id: Some(session.connection_id.into()), + replica_id, + }), + }; + channel_buffer_updated( + session.connection_id, + collaborators + .iter() + .filter_map(|collaborator| Some(collaborator.peer_id?.into())), + &update, + &session.peer, + ); + + Ok(()) +} + +async fn update_channel_buffer( + request: proto::UpdateChannelBuffer, + session: Session, +) -> Result<()> { + let db = session.db().await; + let channel_id = ChannelId::from_proto(request.channel_id); + + let collaborators = db + .update_channel_buffer(channel_id, session.user_id, &request.operations) + .await?; + + channel_buffer_updated( + session.connection_id, + collaborators, + &proto::UpdateChannelBuffer { + channel_id: channel_id.to_proto(), + operations: request.operations, + }, + &session.peer, + ); Ok(()) } @@ -2515,18 +2558,28 @@ async fn leave_channel_buffer( response.send(Ack {})?; + channel_buffer_updated( + session.connection_id, + collaborators_to_notify, + &proto::RemoveChannelBufferCollaborator { + channel_id: channel_id.to_proto(), + peer_id: Some(session.connection_id.into()), + }, + &session.peer, + ); + Ok(()) } -async fn update_channel_buffer( - request: proto::UpdateChannelBuffer, - session: Session, -) -> Result<()> { - let db = session.db().await; - - // TODO: Broadcast to buffer members - - Ok(()) +fn channel_buffer_updated( + sender_id: ConnectionId, + collaborators: impl IntoIterator, + message: &T, + peer: &Peer, +) { + broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| { + peer.send(peer_id.into(), message.clone()) + }); } async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> { @@ -2854,6 +2907,28 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { Ok(()) } +async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> { + let left_channel_buffers = session + .db() + .await + .leave_channel_buffers(session.connection_id) + .await?; + + for (channel_id, connections) in left_channel_buffers { + channel_buffer_updated( + session.connection_id, + connections, + &proto::RemoveChannelBufferCollaborator { + channel_id: channel_id.to_proto(), + peer_id: Some(session.connection_id.into()), + }, + &session.peer, + ); + } + + Ok(()) +} + fn project_left(project: &db::LeftProject, session: &Session) { for connection_id in &project.connection_ids { if project.host_user_id == session.user_id { diff --git a/crates/collab/src/tests.rs b/crates/collab/src/tests.rs index 831bccbb72..25f059c0aa 100644 --- a/crates/collab/src/tests.rs +++ b/crates/collab/src/tests.rs @@ -211,6 +211,7 @@ impl TestServer { workspace::init(app_state.clone(), cx); audio::init((), cx); call::init(client.clone(), user_store.clone(), cx); + channel::init(&client); }); client diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index c41f5de803..d9880496f6 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -1,11 +1,13 @@ -use crate::tests::TestServer; +use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer}; use channel::channel_buffer::ChannelBuffer; +use client::UserId; use gpui::{executor::Deterministic, ModelHandle, TestAppContext}; -use std::{ops::Range, sync::Arc}; +use rpc::{proto, RECEIVE_TIMEOUT}; +use std::sync::Arc; #[gpui::test] -async fn test_channel_buffers( +async fn test_core_channel_buffers( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, @@ -19,60 +21,103 @@ async fn test_channel_buffers( .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)]) .await; + // Client A joins the channel buffer let channel_buffer_a = cx_a - .update(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx)) + .update(|cx| ChannelBuffer::join_channel(zed_id, client_a.client().to_owned(), cx)) .await .unwrap(); + // Client A edits the buffer let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer()); - edit_channel_buffer(&buffer_a, cx_a, [(0..0, "hello world")]); - edit_channel_buffer(&buffer_a, cx_a, [(5..5, ", cruel")]); - edit_channel_buffer(&buffer_a, cx_a, [(0..5, "goodbye")]); - undo_channel_buffer(&buffer_a, cx_a); + buffer_a.update(cx_a, |buffer, cx| { + buffer.edit([(0..0, "hello world")], None, cx) + }); + buffer_a.update(cx_a, |buffer, cx| { + buffer.edit([(5..5, ", cruel")], None, cx) + }); + buffer_a.update(cx_a, |buffer, cx| { + buffer.edit([(0..5, "goodbye")], None, cx) + }); + buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx)); + deterministic.run_until_parked(); - assert_eq!(channel_buffer_text(&buffer_a, cx_a), "hello, cruel world"); + assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world"); + // Client B joins the channel buffer let channel_buffer_b = cx_b - .update(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx)) + .update(|cx| ChannelBuffer::join_channel(zed_id, client_b.client().to_owned(), cx)) .await .unwrap(); + channel_buffer_b.read_with(cx_b, |buffer, _| { + assert_collaborators( + buffer.collaborators(), + &[client_a.user_id(), client_b.user_id()], + ); + }); + + // Client B sees the correct text, and then edits it let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer()); + assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world"); + buffer_b.update(cx_b, |buffer, cx| { + buffer.edit([(7..12, "beautiful")], None, cx) + }); - assert_eq!(channel_buffer_text(&buffer_b, cx_b), "hello, cruel world"); - - edit_channel_buffer(&buffer_b, cx_b, [(7..12, "beautiful")]); + // Both A and B see the new edit + deterministic.run_until_parked(); + assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world"); + assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world"); + // Client A closes the channel buffer. + cx_a.update(|_| drop(channel_buffer_a)); deterministic.run_until_parked(); + // Client B sees that client A is gone from the channel buffer. + channel_buffer_b.read_with(cx_b, |buffer, _| { + assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]); + }); + + // Client A rejoins the channel buffer + let _channel_buffer_a = cx_a + .update(|cx| ChannelBuffer::join_channel(zed_id, client_a.client().to_owned(), cx)) + .await + .unwrap(); + deterministic.run_until_parked(); + + // Sanity test, make sure we saw A rejoining + channel_buffer_b.read_with(cx_b, |buffer, _| { + assert_collaborators( + &buffer.collaborators(), + &[client_b.user_id(), client_a.user_id()], + ); + }); + + // Client A loses connection. + server.forbid_connections(); + server.disconnect_client(client_a.peer_id().unwrap()); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + + // Client B observes A disconnect + channel_buffer_b.read_with(cx_b, |buffer, _| { + assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]); + }); + + // TODO: + // - Test synchronizing offline updates, what happens to A's channel buffer? +} + +#[track_caller] +fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option]) { assert_eq!( - channel_buffer_text(&buffer_a, cx_a), - "hello, beautiful world" - ); - assert_eq!( - channel_buffer_text(&buffer_b, cx_b), - "hello, beautiful world" + collaborators + .into_iter() + .map(|collaborator| collaborator.user_id) + .collect::>(), + ids.into_iter().map(|id| id.unwrap()).collect::>() ); } -fn edit_channel_buffer( - channel_buffer: &ModelHandle, - cx: &mut TestAppContext, - edits: I, -) where - I: IntoIterator, &'static str)>, -{ - channel_buffer.update(cx, |buffer, cx| buffer.edit(edits, None, cx)); -} - -fn undo_channel_buffer(channel_buffer: &ModelHandle, cx: &mut TestAppContext) { - channel_buffer.update(cx, |buffer, cx| buffer.undo(cx)); -} - -fn channel_buffer_text( - channel_buffer: &ModelHandle, - cx: &mut TestAppContext, -) -> String { +fn buffer_text(channel_buffer: &ModelHandle, cx: &mut TestAppContext) -> String { channel_buffer.read_with(cx, |buffer, _| buffer.text()) } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 88ad46abc7..b97feff06b 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -147,6 +147,8 @@ message Envelope { JoinChannelBufferResponse join_channel_buffer_response = 132; UpdateChannelBuffer update_channel_buffer = 133; LeaveChannelBuffer leave_channel_buffer = 134; + AddChannelBufferCollaborator add_channel_buffer_collaborator = 135; + RemoveChannelBufferCollaborator remove_channel_buffer_collaborator = 136; } } @@ -416,6 +418,16 @@ message RemoveProjectCollaborator { PeerId peer_id = 2; } +message AddChannelBufferCollaborator { + uint64 channel_id = 1; + Collaborator collaborator = 2; +} + +message RemoveChannelBufferCollaborator { + uint64 channel_id = 1; + PeerId peer_id = 2; +} + message GetDefinition { uint64 project_id = 1; uint64 buffer_id = 2; @@ -546,8 +558,8 @@ message UpdateBuffer { } message UpdateChannelBuffer { - uint64 buffer_id = 2; - repeated Operation operations = 3; + uint64 channel_id = 1; + repeated Operation operations = 2; } message UpdateBufferFile { @@ -964,9 +976,10 @@ message JoinChannelBuffer { message JoinChannelBufferResponse { uint64 buffer_id = 1; - string base_text = 2; - repeated Operation operations = 3; - repeated Collaborator collaborators = 4; + uint32 replica_id = 2; + string base_text = 3; + repeated Operation operations = 4; + repeated Collaborator collaborators = 5; } message LeaveChannelBuffer { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 68219d3ad8..f0f49c6230 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -252,7 +252,9 @@ messages!( (JoinChannelBuffer, Foreground), (JoinChannelBufferResponse, Foreground), (LeaveChannelBuffer, Background), - (UpdateChannelBuffer, Foreground) + (UpdateChannelBuffer, Foreground), + (RemoveChannelBufferCollaborator, Foreground), + (AddChannelBufferCollaborator, Foreground), ); request_messages!( @@ -376,7 +378,12 @@ entity_messages!( UpdateDiffBase ); -entity_messages!(buffer_id, UpdateChannelBuffer); +entity_messages!( + channel_id, + UpdateChannelBuffer, + RemoveChannelBufferCollaborator, + AddChannelBufferCollaborator +); const KIB: usize = 1024; const MIB: usize = KIB * 1024; diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index b905c1d37b..3b1fccb927 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -158,6 +158,7 @@ fn main() { outline::init(cx); project_symbols::init(cx); project_panel::init(Assets, cx); + channel::init(&client); diagnostics::init(cx); search::init(cx); semantic_index::init(fs.clone(), http.clone(), languages.clone(), cx);