From a6ffcdd0cf2d82f5106dcf24ff94bd3532ed9519 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 2 Jan 2023 20:12:00 -0700 Subject: [PATCH] Track open buffers when handling sync requests When a host sends a buffer to a guest for the first time, they record that they have done so in a set tied to that guest's peer id. When the guest reconnects and syncs buffers, they do so under a different peer id, so we need to be sure we track which buffers we have sent them to avoid sending them the same buffer twice, which violates the guest's assumptions. --- crates/fs/src/fs.rs | 4 ++-- crates/gpui/src/executor.rs | 11 ++++++----- crates/project/src/project.rs | 28 +++++++++++++++++++++------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index d585195ab4..184b2f2e5a 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -23,7 +23,7 @@ use std::{ time::{Duration, SystemTime}, }; use tempfile::NamedTempFile; -use util::{post_inc, ResultExt}; +use util::ResultExt; #[cfg(any(test, feature = "test-support"))] use collections::{btree_map, BTreeMap}; @@ -840,7 +840,7 @@ impl Fs for FakeFs { let target = normalize_path(target); let mut state = self.state.lock().await; let mtime = state.next_mtime; - let inode = post_inc(&mut state.next_inode); + let inode = util::post_inc(&mut state.next_inode); state.next_mtime += Duration::from_nanos(1); let source_entry = state.read_path(&source).await?; let content = source_entry.lock().await.file_content(&source)?.clone(); diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index faf2a9729f..16afa987e9 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -4,7 +4,7 @@ use futures::channel::mpsc; use smol::{channel, prelude::*, Executor}; use std::{ any::Any, - fmt::{self, Display, Write as _}, + fmt::{self, Display}, marker::PhantomData, mem, pin::Pin, @@ -17,8 +17,7 @@ use std::{ use crate::{ platform::{self, Dispatcher}, - util::{self, CwdBacktrace}, - MutableAppContext, + util, MutableAppContext, }; pub enum Foreground { @@ -549,6 +548,8 @@ impl Future for Timer { #[cfg(any(test, feature = "test-support"))] impl DeterministicState { fn push_to_history(&mut self, event: ExecutorEvent) { + use std::fmt::Write as _; + self.poll_history.push(event); if let Some(prev_history) = &self.previous_poll_history { let ix = self.poll_history.len() - 1; @@ -560,7 +561,7 @@ impl DeterministicState { "current runnable backtrace:\n{:?}", self.runnable_backtraces.get_mut(&event.id()).map(|trace| { trace.resolve(); - CwdBacktrace(trace) + util::CwdBacktrace(trace) }) ) .unwrap(); @@ -571,7 +572,7 @@ impl DeterministicState { .get_mut(&prev_event.id()) .map(|trace| { trace.resolve(); - CwdBacktrace(trace) + util::CwdBacktrace(trace) }) ) .unwrap(); diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index d9dd42dae1..be6dc18b35 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -62,7 +62,7 @@ use std::{ time::Instant, }; use terminal::{Terminal, TerminalBuilder}; -use util::{defer, post_inc, ResultExt, TryFutureExt as _}; +use util::{debug_panic, defer, post_inc, ResultExt, TryFutureExt as _}; pub use fs::*; pub use worktree::*; @@ -1501,16 +1501,20 @@ impl Project { } Some(OpenBuffer::Weak(existing_handle)) => { if existing_handle.upgrade(cx).is_some() { + debug_panic!("already registered buffer with remote id {}", remote_id); Err(anyhow!( "already registered buffer with remote id {}", remote_id ))? } } - Some(OpenBuffer::Strong(_)) => Err(anyhow!( - "already registered buffer with remote id {}", - remote_id - ))?, + Some(OpenBuffer::Strong(_)) => { + debug_panic!("already registered buffer with remote id {}", remote_id); + Err(anyhow!( + "already registered buffer with remote id {}", + remote_id + ))? + } } cx.subscribe(buffer, |this, buffer, event, cx| { this.on_buffer_event(buffer, event, cx); @@ -5150,18 +5154,28 @@ impl Project { this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: AsyncAppContext, + mut cx: AsyncAppContext, ) -> Result { let project_id = envelope.payload.project_id; let mut response = proto::SynchronizeBuffersResponse { buffers: Default::default(), }; - this.read_with(&cx, |this, cx| { + this.update(&mut cx, |this, cx| { + let Some(guest_id) = envelope.original_sender_id else { + log::error!("missing original_sender_id on SynchronizeBuffers request"); + return; + }; + for buffer in envelope.payload.buffers { let buffer_id = buffer.id; let remote_version = language::proto::deserialize_version(buffer.version); if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { + this.shared_buffers + .entry(guest_id) + .or_default() + .insert(buffer_id); + let buffer = buffer.read(cx); response.buffers.push(proto::BufferVersion { id: buffer_id,