diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index efaaaea52e..cef4e6867c 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -1,6 +1,7 @@ use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope}; use anyhow::{anyhow, Result}; use async_tungstenite::tungstenite::Message as WebSocketMessage; +use collections::HashMap; use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; @@ -485,11 +486,15 @@ pub fn split_worktree_update( max_chunk_size: usize, ) -> impl Iterator { let mut done_files = false; - let mut done_statuses = false; - let mut repository_index = 0; - let mut root_repo_found = false; + + let mut repository_map = message + .updated_repositories + .into_iter() + .map(|repo| (repo.work_directory_id, repo)) + .collect::>(); + iter::from_fn(move || { - if done_files && done_statuses { + if done_files { return None; } @@ -499,25 +504,6 @@ pub fn split_worktree_update( .drain(..updated_entries_chunk_size) .collect(); - let mut updated_repositories: Vec<_> = Default::default(); - - if !root_repo_found { - for entry in updated_entries.iter() { - if let Some(repo) = message.updated_repositories.get(0) { - if repo.work_directory_id == entry.id { - root_repo_found = true; - updated_repositories.push(RepositoryEntry { - work_directory_id: repo.work_directory_id, - branch: repo.branch.clone(), - removed_repo_paths: Default::default(), - updated_statuses: Default::default(), - }); - break; - } - } - } - } - let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size); let removed_entries = message .removed_entries @@ -526,64 +512,25 @@ pub fn split_worktree_update( done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty(); - // Wait to send repositories until after we've guaranteed that their associated entries - // will be read - if done_files { - let mut total_statuses = 0; - while total_statuses < max_chunk_size - && repository_index < message.updated_repositories.len() - { - let updated_statuses_chunk_size = cmp::min( - message.updated_repositories[repository_index] - .updated_statuses - .len(), - max_chunk_size - total_statuses, - ); + let mut updated_repositories = Vec::new(); - let updated_statuses: Vec<_> = message.updated_repositories[repository_index] - .updated_statuses - .drain(..updated_statuses_chunk_size) - .collect(); - - total_statuses += updated_statuses.len(); - - let done_this_repo = message.updated_repositories[repository_index] - .updated_statuses - .is_empty(); - - let removed_repo_paths = if done_this_repo { - mem::take( - &mut message.updated_repositories[repository_index].removed_repo_paths, - ) - } else { - Default::default() - }; - - updated_repositories.push(RepositoryEntry { - work_directory_id: message.updated_repositories[repository_index] - .work_directory_id, - branch: message.updated_repositories[repository_index] - .branch - .clone(), - updated_statuses, - removed_repo_paths, - }); - - if done_this_repo { - repository_index += 1; + if !repository_map.is_empty() { + for entry in &updated_entries { + if let Some(repo) = repository_map.remove(&entry.id) { + updated_repositories.push(repo) } } - } else { - Default::default() - }; + } - let removed_repositories = if done_files && done_statuses { + let removed_repositories = if done_files { mem::take(&mut message.removed_repositories) } else { Default::default() }; - done_statuses = repository_index >= message.updated_repositories.len(); + if done_files { + updated_repositories.extend(mem::take(&mut repository_map).into_values()); + } Some(UpdateWorktree { project_id: message.project_id,