Update contacts when peers join/leave and when project status changes

This commit is contained in:
Antonio Scandurra 2022-05-09 15:05:30 +02:00
parent 3319e0a613
commit 95d29c4a7b
2 changed files with 313 additions and 345 deletions

View File

@ -22,7 +22,7 @@ use axum::{
routing::get, routing::get,
Extension, Router, TypedHeader, Extension, Router, TypedHeader,
}; };
use collections::{HashMap, HashSet}; use collections::HashMap;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt}; use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rpc::{ use rpc::{
@ -49,7 +49,7 @@ use tokio::{
time::Sleep, time::Sleep,
}; };
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tracing::{info_span, instrument, Instrument}; use tracing::{info_span, Instrument};
type MessageHandler = type MessageHandler =
Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>; Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
@ -335,14 +335,10 @@ impl Server {
for (project_id, project) in removed_connection.hosted_projects { for (project_id, project) in removed_connection.hosted_projects {
if let Some(share) = project.share { if let Some(share) = project.share {
broadcast( broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
connection_id, self.peer
share.guests.keys().copied().collect(), .send(conn_id, proto::UnshareProject { project_id })
|conn_id| { });
self.peer
.send(conn_id, proto::UnshareProject { project_id })
},
);
} }
} }
@ -363,14 +359,12 @@ impl Server {
.db .db
.get_contacts(removed_connection.user_id) .get_contacts(removed_connection.user_id)
.await?; .await?;
let mut update = proto::UpdateContacts::default();
update.contacts.push(proto::Contact {
user_id: removed_connection.user_id.to_proto(),
projects: Default::default(),
online: false,
});
let store = self.store().await; let store = self.store().await;
let mut update = proto::UpdateContacts::default();
update
.contacts
.push(store.contact_for_user(removed_connection.user_id));
for user_id in contacts_to_update.current { for user_id in contacts_to_update.current {
for connection_id in store.connection_ids_for_user(user_id) { for connection_id in store.connection_ids_for_user(user_id) {
self.peer.send(connection_id, update.clone()).trace_err(); self.peer.send(connection_id, update.clone()).trace_err();
@ -407,10 +401,13 @@ impl Server {
self: Arc<Server>, self: Arc<Server>,
request: TypedEnvelope<proto::UnregisterProject>, request: TypedEnvelope<proto::UnregisterProject>,
) -> Result<()> { ) -> Result<()> {
let mut state = self.store_mut().await; let user_id = {
let project = state.unregister_project(request.payload.project_id, request.sender_id)?; let mut state = self.store_mut().await;
// TODO state.unregister_project(request.payload.project_id, request.sender_id)?;
// self.update_contacts_for_users(&*state, &project.authorized_user_ids()); state.user_id_for_connection(request.sender_id)?
};
self.update_user_contacts(user_id).await?;
Ok(()) Ok(())
} }
@ -419,27 +416,55 @@ impl Server {
request: TypedEnvelope<proto::ShareProject>, request: TypedEnvelope<proto::ShareProject>,
response: Response<proto::ShareProject>, response: Response<proto::ShareProject>,
) -> Result<()> { ) -> Result<()> {
let mut state = self.store_mut().await; let user_id = {
let project = state.share_project(request.payload.project_id, request.sender_id)?; let mut state = self.store_mut().await;
// TODO state.share_project(request.payload.project_id, request.sender_id)?;
// self.update_contacts_for_users(&mut *state, &project.authorized_user_ids); state.user_id_for_connection(request.sender_id)?
};
self.update_user_contacts(user_id).await?;
response.send(proto::Ack {})?; response.send(proto::Ack {})?;
Ok(()) Ok(())
} }
async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
let contacts = self.app_state.db.get_contacts(user_id).await?;
let store = self.store().await;
let updated_contact = store.contact_for_user(user_id);
for contact_user_id in contacts.current {
for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
self.peer
.send(
contact_conn_id,
proto::UpdateContacts {
contacts: vec![updated_contact.clone()],
remove_contacts: Default::default(),
incoming_requests: Default::default(),
remove_incoming_requests: Default::default(),
outgoing_requests: Default::default(),
remove_outgoing_requests: Default::default(),
},
)
.trace_err();
}
}
Ok(())
}
async fn unshare_project( async fn unshare_project(
self: Arc<Server>, self: Arc<Server>,
request: TypedEnvelope<proto::UnshareProject>, request: TypedEnvelope<proto::UnshareProject>,
) -> Result<()> { ) -> Result<()> {
let project_id = request.payload.project_id; let project_id = request.payload.project_id;
let mut state = self.store_mut().await; let project;
let project = state.unshare_project(project_id, request.sender_id)?; {
broadcast(request.sender_id, project.connection_ids, |conn_id| { let mut state = self.store_mut().await;
self.peer project = state.unshare_project(project_id, request.sender_id)?;
.send(conn_id, proto::UnshareProject { project_id }) broadcast(request.sender_id, project.connection_ids, |conn_id| {
}); self.peer
// TODO .send(conn_id, proto::UnshareProject { project_id })
// self.update_contacts_for_users(&mut *state, &project.authorized_user_ids); });
}
self.update_user_contacts(project.host_user_id).await?;
Ok(()) Ok(())
} }
@ -449,74 +474,74 @@ impl Server {
response: Response<proto::JoinProject>, response: Response<proto::JoinProject>,
) -> Result<()> { ) -> Result<()> {
let project_id = request.payload.project_id; let project_id = request.payload.project_id;
let response_payload;
let state = &mut *self.store_mut().await; let host_user_id;
let user_id = state.user_id_for_connection(request.sender_id)?; {
let (response_payload, connection_ids, contact_user_ids) = state let state = &mut *self.store_mut().await;
.join_project(request.sender_id, user_id, project_id) let user_id = state.user_id_for_connection(request.sender_id)?;
.and_then(|joined| { let joined = state.join_project(request.sender_id, user_id, project_id)?;
let share = joined.project.share()?; let share = joined.project.share()?;
let peer_count = share.guests.len(); let peer_count = share.guests.len();
let mut collaborators = Vec::with_capacity(peer_count); let mut collaborators = Vec::with_capacity(peer_count);
collaborators.push(proto::Collaborator { collaborators.push(proto::Collaborator {
peer_id: joined.project.host_connection_id.0, peer_id: joined.project.host_connection_id.0,
replica_id: 0, replica_id: 0,
user_id: joined.project.host_user_id.to_proto(), user_id: joined.project.host_user_id.to_proto(),
}); });
let worktrees = share let worktrees = share
.worktrees .worktrees
.iter() .iter()
.filter_map(|(id, shared_worktree)| { .filter_map(|(id, shared_worktree)| {
let worktree = joined.project.worktrees.get(&id)?; let worktree = joined.project.worktrees.get(&id)?;
Some(proto::Worktree { Some(proto::Worktree {
id: *id, id: *id,
root_name: worktree.root_name.clone(), root_name: worktree.root_name.clone(),
entries: shared_worktree.entries.values().cloned().collect(), entries: shared_worktree.entries.values().cloned().collect(),
diagnostic_summaries: shared_worktree diagnostic_summaries: shared_worktree
.diagnostic_summaries .diagnostic_summaries
.values() .values()
.cloned() .cloned()
.collect(), .collect(),
visible: worktree.visible, visible: worktree.visible,
scan_id: shared_worktree.scan_id, scan_id: shared_worktree.scan_id,
})
}) })
.collect(); })
for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests { .collect();
if *peer_conn_id != request.sender_id { for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
collaborators.push(proto::Collaborator { if *peer_conn_id != request.sender_id {
peer_id: peer_conn_id.0, collaborators.push(proto::Collaborator {
replica_id: *peer_replica_id as u32, peer_id: peer_conn_id.0,
user_id: peer_user_id.to_proto(), replica_id: *peer_replica_id as u32,
}); user_id: peer_user_id.to_proto(),
} });
} }
let response = proto::JoinProjectResponse { }
worktrees, response_payload = proto::JoinProjectResponse {
replica_id: joined.replica_id as u32, worktrees,
collaborators, replica_id: joined.replica_id as u32,
language_servers: joined.project.language_servers.clone(), collaborators,
}; language_servers: joined.project.language_servers.clone(),
let connection_ids = joined.project.connection_ids(); };
let contact_user_ids = joined.project.authorized_user_ids(); host_user_id = joined.project.host_user_id;
Ok((response, connection_ids, contact_user_ids)) broadcast(
})?; request.sender_id,
joined.project.connection_ids(),
broadcast(request.sender_id, connection_ids, |conn_id| { |conn_id| {
self.peer.send( self.peer.send(
conn_id, conn_id,
proto::AddProjectCollaborator { proto::AddProjectCollaborator {
project_id, project_id,
collaborator: Some(proto::Collaborator { collaborator: Some(proto::Collaborator {
peer_id: request.sender_id.0, peer_id: request.sender_id.0,
replica_id: response_payload.replica_id, replica_id: response_payload.replica_id,
user_id: user_id.to_proto(), user_id: user_id.to_proto(),
}), }),
},
)
}, },
) );
}); }
// TODO self.update_user_contacts(host_user_id).await?;
// self.update_contacts_for_users(state, &contact_user_ids);
response.send(response_payload)?; response.send(response_payload)?;
Ok(()) Ok(())
} }
@ -527,19 +552,21 @@ impl Server {
) -> Result<()> { ) -> Result<()> {
let sender_id = request.sender_id; let sender_id = request.sender_id;
let project_id = request.payload.project_id; let project_id = request.payload.project_id;
let mut state = self.store_mut().await; let project;
let worktree = state.leave_project(sender_id, project_id)?; {
broadcast(sender_id, worktree.connection_ids, |conn_id| { let mut state = self.store_mut().await;
self.peer.send( project = state.leave_project(sender_id, project_id)?;
conn_id, broadcast(sender_id, project.connection_ids, |conn_id| {
proto::RemoveProjectCollaborator { self.peer.send(
project_id, conn_id,
peer_id: sender_id.0, proto::RemoveProjectCollaborator {
}, project_id,
) peer_id: sender_id.0,
}); },
// TODO )
// self.update_contacts_for_users(&*state, &worktree.authorized_user_ids); });
}
self.update_user_contacts(project.host_user_id).await?;
Ok(()) Ok(())
} }
@ -548,37 +575,30 @@ impl Server {
request: TypedEnvelope<proto::RegisterWorktree>, request: TypedEnvelope<proto::RegisterWorktree>,
response: Response<proto::RegisterWorktree>, response: Response<proto::RegisterWorktree>,
) -> Result<()> { ) -> Result<()> {
let mut contact_user_ids = HashSet::default(); let host_user_id;
for github_login in &request.payload.authorized_logins { {
let contact_user_id = self.app_state.db.create_user(github_login, false).await?; let mut state = self.store_mut().await;
contact_user_ids.insert(contact_user_id); host_user_id = state.user_id_for_connection(request.sender_id)?;
let guest_connection_ids = state
.read_project(request.payload.project_id, request.sender_id)?
.guest_connection_ids();
state.register_worktree(
request.payload.project_id,
request.payload.worktree_id,
request.sender_id,
Worktree {
root_name: request.payload.root_name.clone(),
visible: request.payload.visible,
},
)?;
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
});
} }
self.update_user_contacts(host_user_id).await?;
let mut state = self.store_mut().await;
let host_user_id = state.user_id_for_connection(request.sender_id)?;
contact_user_ids.insert(host_user_id);
let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
let guest_connection_ids = state
.read_project(request.payload.project_id, request.sender_id)?
.guest_connection_ids();
state.register_worktree(
request.payload.project_id,
request.payload.worktree_id,
request.sender_id,
Worktree {
authorized_user_ids: contact_user_ids.clone(),
root_name: request.payload.root_name.clone(),
visible: request.payload.visible,
},
)?;
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
});
// TODO
// self.update_contacts_for_users(&*state, &contact_user_ids);
response.send(proto::Ack {})?; response.send(proto::Ack {})?;
Ok(()) Ok(())
} }
@ -587,22 +607,25 @@ impl Server {
self: Arc<Server>, self: Arc<Server>,
request: TypedEnvelope<proto::UnregisterWorktree>, request: TypedEnvelope<proto::UnregisterWorktree>,
) -> Result<()> { ) -> Result<()> {
let host_user_id;
let project_id = request.payload.project_id; let project_id = request.payload.project_id;
let worktree_id = request.payload.worktree_id; let worktree_id = request.payload.worktree_id;
let mut state = self.store_mut().await; {
let (worktree, guest_connection_ids) = let mut state = self.store_mut().await;
state.unregister_worktree(project_id, worktree_id, request.sender_id)?; let (_, guest_connection_ids) =
broadcast(request.sender_id, guest_connection_ids, |conn_id| { state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
self.peer.send( host_user_id = state.user_id_for_connection(request.sender_id)?;
conn_id, broadcast(request.sender_id, guest_connection_ids, |conn_id| {
proto::UnregisterWorktree { self.peer.send(
project_id, conn_id,
worktree_id, proto::UnregisterWorktree {
}, project_id,
) worktree_id,
}); },
// TODO )
// self.update_contacts_for_users(&*state, &worktree.authorized_user_ids); });
}
self.update_user_contacts(host_user_id).await?;
Ok(()) Ok(())
} }
@ -950,8 +973,7 @@ impl Server {
response: Response<proto::RequestContact>, response: Response<proto::RequestContact>,
) -> Result<()> { ) -> Result<()> {
let requester_id = self let requester_id = self
.store .store()
.read()
.await .await
.user_id_for_connection(request.sender_id)?; .user_id_for_connection(request.sender_id)?;
let responder_id = UserId::from_proto(request.payload.responder_id); let responder_id = UserId::from_proto(request.payload.responder_id);
@ -989,8 +1011,7 @@ impl Server {
response: Response<proto::RespondToContactRequest>, response: Response<proto::RespondToContactRequest>,
) -> Result<()> { ) -> Result<()> {
let responder_id = self let responder_id = self
.store .store()
.read()
.await .await
.user_id_for_connection(request.sender_id)?; .user_id_for_connection(request.sender_id)?;
let requester_id = UserId::from_proto(request.payload.requester_id); let requester_id = UserId::from_proto(request.payload.requester_id);
@ -1000,45 +1021,28 @@ impl Server {
.respond_to_contact_request(responder_id, requester_id, accept) .respond_to_contact_request(responder_id, requester_id, accept)
.await?; .await?;
let store = self.store().await;
// Update responder with new contact // Update responder with new contact
let mut update = proto::UpdateContacts::default(); let mut update = proto::UpdateContacts::default();
if accept { if accept {
update.contacts.push(proto::Contact { update.contacts.push(store.contact_for_user(requester_id));
user_id: requester_id.to_proto(),
projects: Default::default(), // TODO
online: true, // TODO
});
} }
update update
.remove_incoming_requests .remove_incoming_requests
.push(requester_id.to_proto()); .push(requester_id.to_proto());
for connection_id in self for connection_id in store.connection_ids_for_user(responder_id) {
.store
.read()
.await
.connection_ids_for_user(responder_id)
{
self.peer.send(connection_id, update.clone())?; self.peer.send(connection_id, update.clone())?;
} }
// Update requester with new contact // Update requester with new contact
let mut update = proto::UpdateContacts::default(); let mut update = proto::UpdateContacts::default();
if accept { if accept {
update.contacts.push(proto::Contact { update.contacts.push(store.contact_for_user(responder_id));
user_id: responder_id.to_proto(),
projects: Default::default(), // TODO
online: true, // TODO
});
} }
update update
.remove_outgoing_requests .remove_outgoing_requests
.push(responder_id.to_proto()); .push(responder_id.to_proto());
for connection_id in self for connection_id in store.connection_ids_for_user(requester_id) {
.store
.read()
.await
.connection_ids_for_user(requester_id)
{
self.peer.send(connection_id, update.clone())?; self.peer.send(connection_id, update.clone())?;
} }
@ -1312,9 +1316,11 @@ impl Executor for RealExecutor {
} }
} }
#[instrument(skip(f))] fn broadcast<F>(
fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F) sender_id: ConnectionId,
where receiver_ids: impl IntoIterator<Item = ConnectionId>,
mut f: F,
) where
F: FnMut(ConnectionId) -> anyhow::Result<()>, F: FnMut(ConnectionId) -> anyhow::Result<()>,
{ {
for receiver_id in receiver_ids { for receiver_id in receiver_ids {
@ -1461,7 +1467,7 @@ mod tests {
self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials, self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
EstablishConnectionError, UserStore, RECEIVE_TIMEOUT, EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
}; };
use collections::BTreeMap; use collections::{BTreeMap, HashSet};
use editor::{ use editor::{
self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename, self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
ToOffset, ToggleCodeActions, Undo, ToOffset, ToggleCodeActions, Undo,
@ -4890,6 +4896,7 @@ mod tests {
#[gpui::test(iterations = 10)] #[gpui::test(iterations = 10)]
async fn test_contacts( async fn test_contacts(
deterministic: Arc<Deterministic>,
cx_a: &mut TestAppContext, cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext, cx_b: &mut TestAppContext,
cx_c: &mut TestAppContext, cx_c: &mut TestAppContext,
@ -4903,15 +4910,26 @@ mod tests {
let client_a = server.create_client(cx_a, "user_a").await; let client_a = server.create_client(cx_a, "user_a").await;
let client_b = server.create_client(cx_b, "user_b").await; let client_b = server.create_client(cx_b, "user_b").await;
let client_c = server.create_client(cx_c, "user_c").await; let client_c = server.create_client(cx_c, "user_c").await;
server
.make_contacts(vec![
(&client_a, cx_a),
(&client_b, cx_b),
(&client_c, cx_c),
])
.await;
deterministic.run_until_parked();
client_a.user_store.read_with(cx_a, |store, _| {
assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
});
client_b.user_store.read_with(cx_b, |store, _| {
assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
});
client_c.user_store.read_with(cx_c, |store, _| {
assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
});
// Share a worktree as client A. // Share a worktree as client A.
fs.insert_tree( fs.create_dir(Path::new("/a")).await.unwrap();
"/a",
json!({
".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
}),
)
.await;
let project_a = cx_a.update(|cx| { let project_a = cx_a.update(|cx| {
Project::local( Project::local(
@ -4932,24 +4950,22 @@ mod tests {
.read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await; .await;
client_a deterministic.run_until_parked();
.user_store client_a.user_store.read_with(cx_a, |store, _| {
.condition(&cx_a, |user_store, _| { assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])] });
}) client_b.user_store.read_with(cx_b, |store, _| {
.await; assert_eq!(
client_b contacts(store),
.user_store [("user_a", vec![("a", false, vec![])]), ("user_c", vec![])]
.condition(&cx_b, |user_store, _| { )
contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])] });
}) client_c.user_store.read_with(cx_c, |store, _| {
.await; assert_eq!(
client_c contacts(store),
.user_store [("user_a", vec![("a", false, vec![])]), ("user_b", vec![])]
.condition(&cx_c, |user_store, _| { )
contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])] });
})
.await;
let project_id = project_a let project_id = project_a
.update(cx_a, |project, _| project.next_remote_id()) .update(cx_a, |project, _| project.next_remote_id())
@ -4958,24 +4974,22 @@ mod tests {
.update(cx_a, |project, cx| project.share(cx)) .update(cx_a, |project, cx| project.share(cx))
.await .await
.unwrap(); .unwrap();
client_a deterministic.run_until_parked();
.user_store client_a.user_store.read_with(cx_a, |store, _| {
.condition(&cx_a, |user_store, _| { assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])] });
}) client_b.user_store.read_with(cx_b, |store, _| {
.await; assert_eq!(
client_b contacts(store),
.user_store [("user_a", vec![("a", true, vec![])]), ("user_c", vec![])]
.condition(&cx_b, |user_store, _| { )
contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])] });
}) client_c.user_store.read_with(cx_c, |store, _| {
.await; assert_eq!(
client_c contacts(store),
.user_store [("user_a", vec![("a", true, vec![])]), ("user_b", vec![])]
.condition(&cx_c, |user_store, _| { )
contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])] });
})
.await;
let _project_b = Project::remote( let _project_b = Project::remote(
project_id, project_id,
@ -4987,25 +5001,28 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
deterministic.run_until_parked();
client_a client_a.user_store.read_with(cx_a, |store, _| {
.user_store assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
.condition(&cx_a, |user_store, _| { });
contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])] client_b.user_store.read_with(cx_b, |store, _| {
}) assert_eq!(
.await; contacts(store),
client_b [
.user_store ("user_a", vec![("a", true, vec!["user_b"])]),
.condition(&cx_b, |user_store, _| { ("user_c", vec![])
contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])] ]
}) )
.await; });
client_c client_c.user_store.read_with(cx_c, |store, _| {
.user_store assert_eq!(
.condition(&cx_c, |user_store, _| { contacts(store),
contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])] [
}) ("user_a", vec![("a", true, vec!["user_b"])]),
.await; ("user_b", vec![])
]
)
});
project_a project_a
.condition(&cx_a, |project, _| { .condition(&cx_a, |project, _| {
@ -5014,18 +5031,16 @@ mod tests {
.await; .await;
cx_a.update(move |_| drop(project_a)); cx_a.update(move |_| drop(project_a));
client_a deterministic.run_until_parked();
.user_store client_a.user_store.read_with(cx_a, |store, _| {
.condition(&cx_a, |user_store, _| contacts(user_store) == vec![]) assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
.await; });
client_b client_b.user_store.read_with(cx_b, |store, _| {
.user_store assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
.condition(&cx_b, |user_store, _| contacts(user_store) == vec![]) });
.await; client_c.user_store.read_with(cx_c, |store, _| {
client_c assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
.user_store });
.condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
.await;
fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> { fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
user_store user_store
@ -6370,6 +6385,28 @@ mod tests {
self.forbid_connections.store(false, SeqCst); self.forbid_connections.store(false, SeqCst);
} }
async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
while let Some((client_a, cx_a)) = clients.pop() {
for (client_b, cx_b) in &mut clients {
client_a
.user_store
.update(cx_a, |store, _| {
store.request_contact(client_b.user_id().unwrap())
})
.await
.unwrap();
cx_a.foreground().run_until_parked();
client_b
.user_store
.update(*cx_b, |store, _| {
store.respond_to_contact_request(client_a.user_id().unwrap(), true)
})
.await
.unwrap();
}
}
}
async fn build_app_state(test_db: &TestDb) -> Arc<AppState> { async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
Arc::new(AppState { Arc::new(AppState {
db: test_db.db().clone(), db: test_db.db().clone(),
@ -6457,7 +6494,7 @@ mod tests {
} }
fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary { fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
self.user_store.read_with(cx, |store, cx| ContactsSummary { self.user_store.read_with(cx, |store, _| ContactsSummary {
current: store current: store
.contacts() .contacts()
.iter() .iter()

View File

@ -10,7 +10,6 @@ pub struct Store {
connections: HashMap<ConnectionId, ConnectionState>, connections: HashMap<ConnectionId, ConnectionState>,
connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>, connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>,
projects: HashMap<u64, Project>, projects: HashMap<u64, Project>,
visible_projects_by_user_id: HashMap<UserId, HashSet<u64>>,
channels: HashMap<ChannelId, Channel>, channels: HashMap<ChannelId, Channel>,
next_project_id: u64, next_project_id: u64,
} }
@ -30,7 +29,6 @@ pub struct Project {
} }
pub struct Worktree { pub struct Worktree {
pub authorized_user_ids: Vec<UserId>,
pub root_name: String, pub root_name: String,
pub visible: bool, pub visible: bool,
} }
@ -69,18 +67,16 @@ pub struct JoinedProject<'a> {
pub project: &'a Project, pub project: &'a Project,
} }
pub struct SharedProject { pub struct SharedProject {}
pub authorized_user_ids: Vec<UserId>,
}
pub struct UnsharedProject { pub struct UnsharedProject {
pub connection_ids: Vec<ConnectionId>, pub connection_ids: Vec<ConnectionId>,
pub authorized_user_ids: Vec<UserId>, pub host_user_id: UserId,
} }
pub struct LeftProject { pub struct LeftProject {
pub connection_ids: Vec<ConnectionId>, pub connection_ids: Vec<ConnectionId>,
pub authorized_user_ids: Vec<UserId>, pub host_user_id: UserId,
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -155,13 +151,11 @@ impl Store {
result.user_id = connection.user_id; result.user_id = connection.user_id;
for project_id in connection.projects.clone() { for project_id in connection.projects.clone() {
if let Ok(project) = self.unregister_project(project_id, connection_id) { if let Ok(project) = self.unregister_project(project_id, connection_id) {
result.contact_ids.extend(project.authorized_user_ids());
result.hosted_projects.insert(project_id, project); result.hosted_projects.insert(project_id, project);
} else if let Ok(project) = self.leave_project(connection_id, project_id) { } else if let Ok(project) = self.leave_project(connection_id, project_id) {
result result
.guest_project_ids .guest_project_ids
.insert(project_id, project.connection_ids); .insert(project_id, project.connection_ids);
result.contact_ids.extend(project.authorized_user_ids);
} }
} }
@ -215,6 +209,14 @@ impl Store {
.copied() .copied()
} }
pub fn is_user_online(&self, user_id: UserId) -> bool {
!self
.connections_by_user_id
.get(&user_id)
.unwrap_or(&Default::default())
.is_empty()
}
pub fn build_initial_contacts_update(&self, contacts: db::Contacts) -> proto::UpdateContacts { pub fn build_initial_contacts_update(&self, contacts: db::Contacts) -> proto::UpdateContacts {
let mut update = proto::UpdateContacts::default(); let mut update = proto::UpdateContacts::default();
for user_id in contacts.current { for user_id in contacts.current {
@ -241,7 +243,7 @@ impl Store {
proto::Contact { proto::Contact {
user_id: user_id.to_proto(), user_id: user_id.to_proto(),
projects: self.project_metadata_for_user(user_id), projects: self.project_metadata_for_user(user_id),
online: self.connection_ids_for_user(user_id).next().is_some(), online: self.is_user_online(user_id),
} }
} }
@ -359,13 +361,6 @@ impl Store {
.get_mut(&project_id) .get_mut(&project_id)
.ok_or_else(|| anyhow!("no such project"))?; .ok_or_else(|| anyhow!("no such project"))?;
if project.host_connection_id == connection_id { if project.host_connection_id == connection_id {
for authorized_user_id in &worktree.authorized_user_ids {
self.visible_projects_by_user_id
.entry(*authorized_user_id)
.or_default()
.insert(project_id);
}
project.worktrees.insert(worktree_id, worktree); project.worktrees.insert(worktree_id, worktree);
if let Ok(share) = project.share_mut() { if let Ok(share) = project.share_mut() {
share.worktrees.insert(worktree_id, Default::default()); share.worktrees.insert(worktree_id, Default::default());
@ -385,14 +380,6 @@ impl Store {
match self.projects.entry(project_id) { match self.projects.entry(project_id) {
hash_map::Entry::Occupied(e) => { hash_map::Entry::Occupied(e) => {
if e.get().host_connection_id == connection_id { if e.get().host_connection_id == connection_id {
for user_id in e.get().authorized_user_ids() {
if let hash_map::Entry::Occupied(mut projects) =
self.visible_projects_by_user_id.entry(user_id)
{
projects.get_mut().remove(&project_id);
}
}
let project = e.remove(); let project = e.remove();
if let Some(host_connection) = self.connections.get_mut(&connection_id) { if let Some(host_connection) = self.connections.get_mut(&connection_id) {
@ -441,16 +428,6 @@ impl Store {
share.worktrees.remove(&worktree_id); share.worktrees.remove(&worktree_id);
} }
for authorized_user_id in &worktree.authorized_user_ids {
if let Some(visible_projects) =
self.visible_projects_by_user_id.get_mut(authorized_user_id)
{
if !project.has_authorized_user_id(*authorized_user_id) {
visible_projects.remove(&project_id);
}
}
}
Ok((worktree, guest_connection_ids)) Ok((worktree, guest_connection_ids))
} }
@ -466,9 +443,7 @@ impl Store {
share.worktrees.insert(*worktree_id, Default::default()); share.worktrees.insert(*worktree_id, Default::default());
} }
project.share = Some(share); project.share = Some(share);
return Ok(SharedProject { return Ok(SharedProject {});
authorized_user_ids: project.authorized_user_ids(),
});
} }
} }
Err(anyhow!("no such project"))? Err(anyhow!("no such project"))?
@ -490,7 +465,6 @@ impl Store {
} }
let connection_ids = project.connection_ids(); let connection_ids = project.connection_ids();
let authorized_user_ids = project.authorized_user_ids();
if let Some(share) = project.share.take() { if let Some(share) = project.share.take() {
for connection_id in share.guests.into_keys() { for connection_id in share.guests.into_keys() {
if let Some(connection) = self.connections.get_mut(&connection_id) { if let Some(connection) = self.connections.get_mut(&connection_id) {
@ -500,7 +474,7 @@ impl Store {
Ok(UnsharedProject { Ok(UnsharedProject {
connection_ids, connection_ids,
authorized_user_ids, host_user_id: project.host_user_id,
}) })
} else { } else {
Err(anyhow!("project is not shared"))? Err(anyhow!("project is not shared"))?
@ -564,13 +538,6 @@ impl Store {
let project = self let project = self
.projects .projects
.get_mut(&project_id) .get_mut(&project_id)
.and_then(|project| {
if project.has_authorized_user_id(user_id) {
Some(project)
} else {
None
}
})
.ok_or_else(|| anyhow!("no such project"))?; .ok_or_else(|| anyhow!("no such project"))?;
let share = project.share_mut()?; let share = project.share_mut()?;
@ -612,12 +579,9 @@ impl Store {
connection.projects.remove(&project_id); connection.projects.remove(&project_id);
} }
let connection_ids = project.connection_ids();
let authorized_user_ids = project.authorized_user_ids();
Ok(LeftProject { Ok(LeftProject {
connection_ids, connection_ids: project.connection_ids(),
authorized_user_ids, host_user_id: project.host_user_id,
}) })
} }
@ -767,14 +731,6 @@ impl Store {
let host_connection = self.connections.get(&project.host_connection_id).unwrap(); let host_connection = self.connections.get(&project.host_connection_id).unwrap();
assert!(host_connection.projects.contains(project_id)); assert!(host_connection.projects.contains(project_id));
for authorized_user_ids in project.authorized_user_ids() {
let visible_project_ids = self
.visible_projects_by_user_id
.get(&authorized_user_ids)
.unwrap();
assert!(visible_project_ids.contains(project_id));
}
if let Some(share) = &project.share { if let Some(share) = &project.share {
for guest_connection_id in share.guests.keys() { for guest_connection_id in share.guests.keys() {
let guest_connection = self.connections.get(guest_connection_id).unwrap(); let guest_connection = self.connections.get(guest_connection_id).unwrap();
@ -792,13 +748,6 @@ impl Store {
} }
} }
for (user_id, visible_project_ids) in &self.visible_projects_by_user_id {
for project_id in visible_project_ids {
let project = self.projects.get(project_id).unwrap();
assert!(project.authorized_user_ids().contains(user_id));
}
}
for (channel_id, channel) in &self.channels { for (channel_id, channel) in &self.channels {
for connection_id in &channel.connection_ids { for connection_id in &channel.connection_ids {
let connection = self.connections.get(connection_id).unwrap(); let connection = self.connections.get(connection_id).unwrap();
@ -809,24 +758,6 @@ impl Store {
} }
impl Project { impl Project {
pub fn has_authorized_user_id(&self, user_id: UserId) -> bool {
self.worktrees
.values()
.any(|worktree| worktree.authorized_user_ids.contains(&user_id))
}
pub fn authorized_user_ids(&self) -> Vec<UserId> {
let mut ids = self
.worktrees
.values()
.flat_map(|worktree| worktree.authorized_user_ids.iter())
.copied()
.collect::<Vec<_>>();
ids.sort_unstable();
ids.dedup();
ids
}
pub fn guest_connection_ids(&self) -> Vec<ConnectionId> { pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
if let Some(share) = &self.share { if let Some(share) = &self.share {
share.guests.keys().copied().collect() share.guests.keys().copied().collect()