Replace callback-based requests/messages with streams

This commit is contained in:
Antonio Scandurra 2021-06-16 14:26:54 +02:00
parent 8b66e0aa7e
commit 8112efd522
8 changed files with 306 additions and 137 deletions

View File

@ -1348,10 +1348,6 @@ impl MutableAppContext {
AsyncAppContext(self.weak_self.as_ref().unwrap().upgrade().unwrap())
}
pub fn to_background(&self) -> BackgroundAppContext {
//
}
pub fn write_to_clipboard(&self, item: ClipboardItem) {
self.platform.write_to_clipboard(item);
}

View File

@ -479,8 +479,13 @@ mod tests {
let app_state = cx.read(build_app_state);
let (window_id, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(tmp_dir.path(), cx);
workspace
});
@ -551,6 +556,7 @@ mod tests {
0,
app_state.settings.clone(),
app_state.language_registry.clone(),
app_state.rpc_client.clone(),
cx,
);
workspace.add_worktree(tmp_dir.path(), cx);
@ -614,6 +620,7 @@ mod tests {
0,
app_state.settings.clone(),
app_state.language_registry.clone(),
app_state.rpc_client.clone(),
cx,
);
workspace.add_worktree(&file_path, cx);
@ -665,6 +672,7 @@ mod tests {
0,
app_state.settings.clone(),
app_state.language_registry.clone(),
app_state.rpc_client.clone(),
cx,
)
});

View File

@ -1,8 +1,5 @@
use futures::Future;
use gpui::MutableAppContext;
use rpc_client::RpcClient;
use std::sync::Arc;
use zed_rpc::proto::RequestMessage;
pub mod assets;
pub mod editor;
@ -27,27 +24,6 @@ pub struct AppState {
pub rpc_client: Arc<RpcClient>,
}
impl AppState {
pub async fn on_rpc_request<Req, F, Fut>(
&self,
cx: &mut MutableAppContext,
handler: F,
) where
Req: RequestMessage,
F: 'static + Send + Sync + Fn(Req, &AppState, &mut MutableAppContext) -> Fut,
Fut: 'static + Send + Sync + Future<Output = Req::Response>,
{
let app_state = self.clone();
let cx = cx.to_background();
app_state
.rpc_client
.on_request(move |req| cx.update(|cx| async move {
handler(req, &app_state, cx)
})
.await
}
}
pub fn init(cx: &mut gpui::MutableAppContext) {
cx.add_global_action("app:quit", quit);
}

View File

@ -25,13 +25,13 @@ fn main() {
let app_state = AppState {
language_registry,
settings,
rpc_client: Arc::new(RpcClient::new()),
rpc_client: RpcClient::new(),
};
app.run(move |cx| {
cx.set_menus(menus::menus(app_state.clone()));
zed::init(cx);
workspace::init(cx, &app_state);
workspace::init(cx, app_state.rpc_client.clone());
editor::init(cx);
file_finder::init(cx);

View File

@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use futures::future::{BoxFuture, Either, FutureExt};
use futures::future::Either;
use postage::{
barrier, oneshot,
barrier, mpsc, oneshot,
prelude::{Sink, Stream},
};
use smol::{
@ -30,87 +30,128 @@ struct RpcConnection {
_close_barrier: barrier::Sender,
}
type RequestHandler = Box<
dyn Send
+ Sync
+ Fn(&mut Option<proto::Envelope>, &AtomicU32) -> Option<BoxFuture<'static, proto::Envelope>>,
>;
type MessageHandler =
Box<dyn Send + Sync + Fn(&mut Option<proto::Envelope>) -> Option<BoxFuture<'static, ()>>>;
Box<dyn Send + Sync + Fn(&mut Option<proto::Envelope>, ConnectionId) -> Option<ErasedMessage>>;
struct ErasedMessage {
id: u32,
connection_id: ConnectionId,
body: proto::Envelope,
}
pub struct Message<T: EnvelopedMessage> {
connection_id: ConnectionId,
body: Option<T>,
}
impl<T: EnvelopedMessage> From<ErasedMessage> for Message<T> {
fn from(message: ErasedMessage) -> Self {
Self {
connection_id: message.connection_id,
body: T::from_envelope(message.body),
}
}
}
impl<T: EnvelopedMessage> Message<T> {
pub fn connection_id(&self) -> ConnectionId {
self.connection_id
}
pub fn body(&mut self) -> T {
self.body.take().expect("body already taken")
}
}
pub struct Request<T: RequestMessage> {
id: u32,
connection_id: ConnectionId,
body: Option<T>,
}
impl<T: RequestMessage> From<ErasedMessage> for Request<T> {
fn from(message: ErasedMessage) -> Self {
Self {
id: message.id,
connection_id: message.connection_id,
body: T::from_envelope(message.body),
}
}
}
impl<T: RequestMessage> Request<T> {
pub fn connection_id(&self) -> ConnectionId {
self.connection_id
}
pub fn body(&mut self) -> T {
self.body.take().expect("body already taken")
}
}
pub struct RpcClient {
connections: RwLock<HashMap<ConnectionId, Arc<RpcConnection>>>,
request_handlers: RwLock<Vec<RequestHandler>>,
message_handlers: RwLock<Vec<MessageHandler>>,
handler_types: RwLock<HashSet<TypeId>>,
message_handlers: RwLock<Vec<(mpsc::Sender<ErasedMessage>, MessageHandler)>>,
handler_types: Mutex<HashSet<TypeId>>,
next_connection_id: AtomicU32,
}
impl RpcClient {
pub fn new() -> Self {
Self {
request_handlers: Default::default(),
pub fn new() -> Arc<Self> {
Arc::new(Self {
connections: Default::default(),
message_handlers: Default::default(),
handler_types: Default::default(),
connections: Default::default(),
next_connection_id: Default::default(),
}
})
}
pub async fn on_request<Req, F, Fut>(&self, handler: F)
where
Req: RequestMessage,
F: 'static + Send + Sync + Fn(Req) -> Fut,
Fut: 'static + Send + Sync + Future<Output = Req::Response>,
{
if !self.handler_types.write().await.insert(TypeId::of::<Req>()) {
panic!("duplicate request handler type");
pub async fn add_request_handler<T: RequestMessage>(&self) -> impl Stream<Item = Request<T>> {
if !self.handler_types.lock().await.insert(TypeId::of::<T>()) {
panic!("duplicate handler type");
}
self.request_handlers
.write()
.await
.push(Box::new(move |envelope, next_message_id| {
if envelope.as_ref().map_or(false, Req::matches_envelope) {
let (tx, rx) = mpsc::channel(256);
self.message_handlers.write().await.push((
tx,
Box::new(move |envelope, connection_id| {
if envelope.as_ref().map_or(false, T::matches_envelope) {
let envelope = Option::take(envelope).unwrap();
let message_id = next_message_id.fetch_add(1, atomic::Ordering::SeqCst);
let responding_to = envelope.id;
let request = Req::from_envelope(envelope).unwrap();
Some(
handler(request)
.map(move |response| {
response.into_envelope(message_id, Some(responding_to))
})
.boxed(),
)
Some(ErasedMessage {
id: envelope.id,
connection_id,
body: envelope,
})
} else {
None
}
}));
}),
));
rx.map(Request::from)
}
pub async fn on_message<M, F, Fut>(&self, handler: F)
where
M: EnvelopedMessage,
F: 'static + Send + Sync + Fn(M) -> Fut,
Fut: 'static + Send + Sync + Future<Output = ()>,
{
if !self.handler_types.write().await.insert(TypeId::of::<M>()) {
panic!("duplicate request handler type");
pub async fn add_message_handler<T: EnvelopedMessage>(&self) -> impl Stream<Item = Message<T>> {
if !self.handler_types.lock().await.insert(TypeId::of::<T>()) {
panic!("duplicate handler type");
}
self.message_handlers
.write()
.await
.push(Box::new(move |envelope| {
if envelope.as_ref().map_or(false, M::matches_envelope) {
let (tx, rx) = mpsc::channel(256);
self.message_handlers.write().await.push((
tx,
Box::new(move |envelope, connection_id| {
if envelope.as_ref().map_or(false, T::matches_envelope) {
let envelope = Option::take(envelope).unwrap();
let request = M::from_envelope(envelope).unwrap();
Some(handler(request).boxed())
Some(ErasedMessage {
id: envelope.id,
connection_id,
body: envelope,
})
} else {
None
}
}));
}),
));
rx.map(Message::from)
}
pub async fn add_connection<Conn>(
@ -167,36 +208,14 @@ impl RpcClient {
} else {
let mut handled = false;
let mut envelope = Some(incoming);
for handler in this.request_handlers.iter() {
if let Some(future) =
handler(&mut envelope, &connection.next_message_id)
{
let response = future.await;
if let Err(error) = connection
.writer
.lock()
.await
.write_message(&response)
.await
{
log::warn!("failed to write response: {}", error);
return;
}
for (tx, handler) in this.message_handlers.read().await.iter() {
if let Some(message) = handler(&mut envelope, connection_id) {
let _ = tx.clone().send(message).await;
handled = true;
break;
}
}
if !handled {
for handler in this.message_handlers.iter() {
if let Some(future) = handler(&mut envelope) {
future.await;
handled = true;
break;
}
}
}
if !handled {
log::warn!("unhandled message: {:?}", envelope.unwrap().payload);
}
@ -281,6 +300,33 @@ impl RpcClient {
Ok(())
}
}
pub fn respond<T: RequestMessage>(
self: &Arc<Self>,
request: Request<T>,
response: T::Response,
) -> impl Future<Output = Result<()>> {
let this = self.clone();
async move {
let connection = this
.connections
.read()
.await
.get(&request.connection_id)
.ok_or_else(|| anyhow!("unknown connection: {}", request.connection_id.0))?
.clone();
let message_id = connection
.next_message_id
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.writer
.lock()
.await
.write_message(&response.into_envelope(message_id, Some(request.id)))
.await?;
Ok(())
}
}
}
#[cfg(test)]
@ -304,7 +350,7 @@ mod tests {
let (server_conn, _) = listener.accept().await.unwrap();
let mut server_stream = MessageStream::new(server_conn);
let client = Arc::new(RpcClient::new());
let client = RpcClient::new();
let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach();
@ -363,7 +409,7 @@ mod tests {
let client_conn = UnixStream::connect(&socket_path).await.unwrap();
let (mut server_conn, _) = listener.accept().await.unwrap();
let client = Arc::new(RpcClient::new());
let client = RpcClient::new();
let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach();
client.disconnect(connection_id).await;
@ -390,7 +436,7 @@ mod tests {
let mut client_conn = UnixStream::connect(&socket_path).await.unwrap();
client_conn.close().await.unwrap();
let client = Arc::new(RpcClient::new());
let client = RpcClient::new();
let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach();
let err = client

View File

@ -1,4 +1,6 @@
use crate::{language::LanguageRegistry, settings, time::ReplicaId, AppState};
use crate::{
language::LanguageRegistry, rpc_client::RpcClient, settings, time::ReplicaId, AppState,
};
use ctor::ctor;
use gpui::AppContext;
use rand::Rng;
@ -150,5 +152,6 @@ pub fn build_app_state(cx: &AppContext) -> AppState {
AppState {
settings,
language_registry,
rpc_client: RpcClient::new(),
}
}

View File

@ -1,5 +1,8 @@
use crate::rpc_client::{Message, Request, RpcClient};
use postage::prelude::Stream;
use rand::prelude::*;
use std::cmp::Ordering;
use std::{cmp::Ordering, future::Future, sync::Arc};
use zed_rpc::proto;
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub enum Bias {
@ -53,6 +56,104 @@ where
}
}
pub trait RequestHandler<'a, R: proto::RequestMessage> {
type Output: 'a + Future<Output = anyhow::Result<()>>;
fn handle(
&self,
request: Request<R>,
client: Arc<RpcClient>,
cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output;
}
impl<'a, R, F, Fut> RequestHandler<'a, R> for F
where
R: proto::RequestMessage,
F: Fn(Request<R>, Arc<RpcClient>, &'a mut gpui::AsyncAppContext) -> Fut,
Fut: 'a + Future<Output = anyhow::Result<()>>,
{
type Output = Fut;
fn handle(
&self,
request: Request<R>,
client: Arc<RpcClient>,
cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output {
(self)(request, client, cx)
}
}
pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
type Output: 'a + Future<Output = anyhow::Result<()>>;
fn handle(
&self,
message: Message<M>,
client: Arc<RpcClient>,
cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output;
}
impl<'a, M, F, Fut> MessageHandler<'a, M> for F
where
M: proto::EnvelopedMessage,
F: Fn(Message<M>, Arc<RpcClient>, &'a mut gpui::AsyncAppContext) -> Fut,
Fut: 'a + Future<Output = anyhow::Result<()>>,
{
type Output = Fut;
fn handle(
&self,
message: Message<M>,
client: Arc<RpcClient>,
cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output {
(self)(message, client, cx)
}
}
pub fn spawn_request_handler<H, R>(
handler: H,
client: &Arc<RpcClient>,
cx: &mut gpui::MutableAppContext,
) where
H: 'static + for<'a> RequestHandler<'a, R>,
R: proto::RequestMessage,
{
let client = client.clone();
let mut requests = smol::block_on(client.add_request_handler::<R>());
cx.spawn(|mut cx| async move {
while let Some(request) = requests.recv().await {
if let Err(err) = handler.handle(request, client.clone(), &mut cx).await {
log::error!("error handling request: {:?}", err);
}
}
})
.detach();
}
pub fn spawn_message_handler<H, M>(
handler: H,
client: &Arc<RpcClient>,
cx: &mut gpui::MutableAppContext,
) where
H: 'static + for<'a> MessageHandler<'a, M>,
M: proto::EnvelopedMessage,
{
let client = client.clone();
let mut messages = smol::block_on(client.add_message_handler::<M>());
cx.spawn(|mut cx| async move {
while let Some(message) = messages.recv().await {
if let Err(err) = handler.handle(message, client.clone(), &mut cx).await {
log::error!("error handling message: {:?}", err);
}
}
})
.detach();
}
pub struct RandomCharIter<T: Rng>(T);
impl<T: Rng> RandomCharIter<T> {

View File

@ -4,10 +4,10 @@ pub mod pane_group;
use crate::{
editor::{Buffer, Editor},
language::LanguageRegistry,
rpc_client::RpcClient,
rpc_client::{Request, RpcClient},
settings::Settings,
time::ReplicaId,
util::SurfResultExt as _,
util::{self, SurfResultExt as _},
worktree::{FileHandle, Worktree, WorktreeHandle},
AppState,
};
@ -33,7 +33,7 @@ use std::{
use surf::Url;
use zed_rpc::{proto, rest::CreateWorktreeResponse};
pub fn init(cx: &mut MutableAppContext, rpc_client: &mut RpcClient) {
pub fn init(cx: &mut MutableAppContext, rpc_client: Arc<RpcClient>) {
cx.add_global_action("workspace:open", open);
cx.add_global_action("workspace:open_paths", open_paths);
cx.add_action("workspace:save", Workspace::save_active_item);
@ -46,8 +46,7 @@ pub fn init(cx: &mut MutableAppContext, rpc_client: &mut RpcClient) {
]);
pane::init(cx);
let cx = cx.to_async();
rpc_client.on_request(move |req| handle_open_buffer(req, cx));
util::spawn_request_handler(handle_open_buffer, &rpc_client, cx);
}
pub struct OpenParams {
@ -100,6 +99,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
0,
params.app_state.settings.clone(),
params.app_state.language_registry.clone(),
params.app_state.rpc_client.clone(),
cx,
);
let open_paths = view.open_paths(&params.paths, cx);
@ -108,8 +108,19 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
});
}
fn handle_open_buffer(request: zed_rpc::proto::OpenBuffer, cx: AsyncAppContext) {
//
async fn handle_open_buffer(
mut request: Request<proto::OpenBuffer>,
rpc_client: Arc<RpcClient>,
cx: &mut AsyncAppContext,
) -> anyhow::Result<()> {
let body = request.body();
dbg!(body.path);
rpc_client
.respond(request, proto::OpenBufferResponse { buffer: None })
.await?;
dbg!(cx.read(|app| app.root_view_id(1)));
Ok(())
}
pub trait Item: Entity + Sized {
@ -302,6 +313,7 @@ pub struct State {
pub struct Workspace {
pub settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>,
rpc_client: Arc<RpcClient>,
modal: Option<AnyViewHandle>,
center: PaneGroup,
panes: Vec<ViewHandle<Pane>>,
@ -320,6 +332,7 @@ impl Workspace {
replica_id: ReplicaId,
settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>,
rpc_client: Arc<RpcClient>,
cx: &mut ViewContext<Self>,
) -> Self {
let pane = cx.add_view(|_| Pane::new(settings.clone()));
@ -336,6 +349,7 @@ impl Workspace {
active_pane: pane.clone(),
settings,
language_registry,
rpc_client,
replica_id,
worktrees: Default::default(),
items: Default::default(),
@ -651,6 +665,7 @@ impl Workspace {
}
fn share_worktree(&mut self, _: &(), cx: &mut ViewContext<Self>) {
let rpc_client = self.rpc_client.clone();
let zed_url = std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string());
let executor = cx.background_executor().clone();
@ -677,7 +692,6 @@ impl Workspace {
// a TLS stream using `native-tls`.
let stream = smol::net::TcpStream::connect(rpc_address).await?;
let rpc_client = Arc::new(RpcClient::new());
let (connection_id, handler) = rpc_client.add_connection(stream).await;
executor.spawn(handler).detach();
@ -942,7 +956,7 @@ mod tests {
fn test_open_paths_action(cx: &mut gpui::MutableAppContext) {
let app_state = build_app_state(cx.as_ref());
init(cx);
init(cx, app_state.rpc_client.clone());
let dir = temp_tree(json!({
"a": {
@ -1010,8 +1024,13 @@ mod tests {
let app_state = cx.read(build_app_state);
let (_, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(dir.path(), cx);
workspace
});
@ -1114,8 +1133,13 @@ mod tests {
let app_state = cx.read(build_app_state);
let (_, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(dir1.path(), cx);
workspace
});
@ -1183,8 +1207,13 @@ mod tests {
let app_state = cx.read(build_app_state);
let (window_id, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(dir.path(), cx);
workspace
});
@ -1227,8 +1256,13 @@ mod tests {
let dir = TempDir::new("test-new-file").unwrap();
let app_state = cx.read(build_app_state);
let (_, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(dir.path(), cx);
workspace
});
@ -1328,8 +1362,13 @@ mod tests {
let app_state = cx.read(build_app_state);
let (window_id, workspace) = cx.add_window(|cx| {
let mut workspace =
Workspace::new(0, app_state.settings, app_state.language_registry, cx);
let mut workspace = Workspace::new(
0,
app_state.settings,
app_state.language_registry,
app_state.rpc_client,
cx,
);
workspace.add_worktree(dir.path(), cx);
workspace
});