Add menu command to join worktree using URL on clipboard

Introduce rpc::Client struct that wraps zed_rpc::Peer
This commit is contained in:
Max Brunsfeld 2021-06-17 17:40:38 -07:00
parent 3a78f053f6
commit 7a88e44264
8 changed files with 294 additions and 167 deletions

View File

@ -1799,6 +1799,10 @@ impl<'a, T: View> ViewContext<'a, T> {
&self.app.cx.background
}
pub fn platform(&self) -> Arc<dyn Platform> {
self.app.platform()
}
pub fn prompt<F>(&self, level: PromptLevel, msg: &str, answers: &[&str], done_fn: F)
where
F: 'static + FnOnce(usize, &mut MutableAppContext),

View File

@ -1,12 +1,10 @@
use std::sync::Arc;
pub mod assets;
pub mod editor;
pub mod file_finder;
pub mod language;
pub mod menus;
mod operation_queue;
mod rpc;
pub mod rpc;
pub mod settings;
mod sum_tree;
#[cfg(test)]
@ -20,7 +18,7 @@ mod worktree;
pub struct AppState {
pub settings: postage::watch::Receiver<settings::Settings>,
pub language_registry: std::sync::Arc<language::LanguageRegistry>,
pub rpc: Arc<zed_rpc::Peer>,
pub rpc: rpc::Client,
}
pub fn init(cx: &mut gpui::MutableAppContext) {

View File

@ -6,7 +6,7 @@ use log::LevelFilter;
use simplelog::SimpleLogger;
use std::{fs, path::PathBuf, sync::Arc};
use zed::{
self, assets, editor, file_finder, language, menus, settings,
self, assets, editor, file_finder, language, menus, rpc, settings,
workspace::{self, OpenParams},
AppState,
};
@ -23,7 +23,7 @@ fn main() {
let app_state = AppState {
language_registry,
settings,
rpc: zed_rpc::Peer::new(),
rpc: rpc::Client::new(),
};
app.run(move |cx| {

View File

@ -20,6 +20,12 @@ pub fn menus(state: AppState) -> Vec<Menu<'static>> {
action: "workspace:share_worktree",
arg: None,
},
MenuItem::Action {
name: "Join",
keystroke: None,
action: "workspace:join_worktree",
arg: None,
},
MenuItem::Action {
name: "Quit",
keystroke: Some("cmd-q"),

View File

@ -1,6 +1,199 @@
use super::util::SurfResultExt as _;
use anyhow::{anyhow, Context, Result};
use gpui::executor::Background;
use gpui::{AsyncAppContext, Task};
use lazy_static::lazy_static;
use postage::prelude::Stream;
use std::{future::Future, sync::Arc};
use zed_rpc::{proto, Peer, TypedEnvelope};
use smol::lock::Mutex;
use std::time::Duration;
use std::{convert::TryFrom, future::Future, sync::Arc};
use surf::Url;
use zed_rpc::{proto::RequestMessage, rest, Peer, TypedEnvelope};
pub use zed_rpc::{proto, ConnectionId};
lazy_static! {
static ref ZED_SERVER_URL: String =
std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string());
}
#[derive(Clone)]
pub struct Client {
peer: Arc<Peer>,
state: Arc<Mutex<ClientState>>,
}
#[derive(Default)]
struct ClientState {
// TODO - allow multiple connections
connection_id: Option<ConnectionId>,
}
impl Client {
pub fn new() -> Self {
Self {
peer: Peer::new(),
state: Default::default(),
}
}
pub fn on_message<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
where
H: 'static + for<'a> MessageHandler<'a, M>,
M: proto::EnvelopedMessage,
{
let peer = self.peer.clone();
let mut messages = smol::block_on(peer.add_message_handler::<M>());
cx.spawn(|mut cx| async move {
while let Some(message) = messages.recv().await {
if let Err(err) = handler.handle(message, &peer, &mut cx).await {
log::error!("error handling message: {:?}", err);
}
}
})
.detach();
}
pub async fn connect_to_server(
&self,
cx: &AsyncAppContext,
executor: &Arc<Background>,
) -> surf::Result<ConnectionId> {
if let Some(connection_id) = self.state.lock().await.connection_id {
return Ok(connection_id);
}
let (user_id, access_token) = Self::login(cx.platform(), executor).await?;
let mut response = surf::get(format!(
"{}{}",
*ZED_SERVER_URL,
&rest::GET_RPC_ADDRESS_PATH
))
.header(
"Authorization",
http_auth_basic::Credentials::new(&user_id, &access_token).as_http_header(),
)
.await
.context("rpc address request failed")?;
let rest::GetRpcAddressResponse { address } = response
.body_json()
.await
.context("failed to parse rpc address response")?;
// TODO - If the `ZED_SERVER_URL` uses https, then wrap this stream in
// a TLS stream using `native-tls`.
let stream = smol::net::TcpStream::connect(&address).await?;
log::info!("connected to rpc address {}", address);
let connection_id = self.peer.add_connection(stream).await;
executor
.spawn(self.peer.handle_messages(connection_id))
.detach();
let auth_response = self
.peer
.request(
connection_id,
proto::Auth {
user_id: user_id.parse()?,
access_token,
},
)
.await
.context("rpc auth request failed")?;
if !auth_response.credentials_valid {
Err(anyhow!("failed to authenticate with RPC server"))?;
}
Ok(connection_id)
}
pub fn login(
platform: Arc<dyn gpui::Platform>,
executor: &Arc<gpui::executor::Background>,
) -> Task<Result<(String, String)>> {
let executor = executor.clone();
executor.clone().spawn(async move {
if let Some((user_id, access_token)) = platform.read_credentials(&ZED_SERVER_URL) {
log::info!("already signed in. user_id: {}", user_id);
return Ok((user_id, String::from_utf8(access_token).unwrap()));
}
// Generate a pair of asymmetric encryption keys. The public key will be used by the
// zed server to encrypt the user's access token, so that it can'be intercepted by
// any other app running on the user's device.
let (public_key, private_key) =
zed_rpc::auth::keypair().expect("failed to generate keypair for auth");
let public_key_string =
String::try_from(public_key).expect("failed to serialize public key for auth");
// Start an HTTP server to receive the redirect from Zed's sign-in page.
let server = tiny_http::Server::http("127.0.0.1:0").expect("failed to find open port");
let port = server.server_addr().port();
// Open the Zed sign-in page in the user's browser, with query parameters that indicate
// that the user is signing in from a Zed app running on the same device.
platform.open_url(&format!(
"{}/sign_in?native_app_port={}&native_app_public_key={}",
*ZED_SERVER_URL, port, public_key_string
));
// Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
// access token from the query params.
//
// TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a
// custom URL scheme instead of this local HTTP server.
let (user_id, access_token) = executor
.spawn(async move {
if let Some(req) = server.recv_timeout(Duration::from_secs(10 * 60))? {
let path = req.url();
let mut user_id = None;
let mut access_token = None;
let url = Url::parse(&format!("http://example.com{}", path))
.context("failed to parse login notification url")?;
for (key, value) in url.query_pairs() {
if key == "access_token" {
access_token = Some(value.to_string());
} else if key == "user_id" {
user_id = Some(value.to_string());
}
}
req.respond(
tiny_http::Response::from_string(LOGIN_RESPONSE).with_header(
tiny_http::Header::from_bytes("Content-Type", "text/html").unwrap(),
),
)
.context("failed to respond to login http request")?;
Ok((
user_id.ok_or_else(|| anyhow!("missing user_id parameter"))?,
access_token
.ok_or_else(|| anyhow!("missing access_token parameter"))?,
))
} else {
Err(anyhow!("didn't receive login redirect"))
}
})
.await?;
let access_token = private_key
.decrypt_string(&access_token)
.context("failed to decrypt access token")?;
platform.activate(true);
platform.write_credentials(&ZED_SERVER_URL, &user_id, access_token.as_bytes());
Ok((user_id.to_string(), access_token))
})
}
pub fn request<T: RequestMessage>(
&self,
connection_id: ConnectionId,
req: T,
) -> impl Future<Output = Result<T::Response>> {
self.peer.request(connection_id, req)
}
}
pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
type Output: 'a + Future<Output = anyhow::Result<()>>;
@ -31,28 +224,33 @@ where
}
}
pub trait PeerExt {
fn on_message<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
where
H: 'static + for<'a> MessageHandler<'a, M>,
M: proto::EnvelopedMessage;
const WORKTREE_URL_PREFIX: &'static str = "zed://worktrees/";
pub fn encode_worktree_url(id: u64, access_token: &str) -> String {
format!("{}{}/{}", WORKTREE_URL_PREFIX, id, access_token)
}
impl PeerExt for Arc<Peer> {
fn on_message<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
where
H: 'static + for<'a> MessageHandler<'a, M>,
M: proto::EnvelopedMessage,
{
let rpc = self.clone();
let mut messages = smol::block_on(self.add_message_handler::<M>());
cx.spawn(|mut cx| async move {
while let Some(message) = messages.recv().await {
if let Err(err) = handler.handle(message, &rpc, &mut cx).await {
log::error!("error handling message: {:?}", err);
}
}
})
.detach();
pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> {
let path = url.strip_prefix(WORKTREE_URL_PREFIX)?;
let mut parts = path.split('/');
let id = parts.next()?.parse::<u64>().ok()?;
let access_token = parts.next()?;
if access_token.is_empty() {
return None;
}
Some((id, access_token.to_string()))
}
const LOGIN_RESPONSE: &'static str = "
<!DOCTYPE html>
<html>
<script>window.close();</script>
</html>
";
#[test]
fn test_encode_and_decode_worktree_url() {
let url = encode_worktree_url(5, "deadbeef");
assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
assert_eq!(decode_worktree_url("not://the-right-format"), None);
}

View File

@ -1,4 +1,4 @@
use crate::{language::LanguageRegistry, settings, time::ReplicaId, AppState};
use crate::{AppState, language::LanguageRegistry, rpc, settings, time::ReplicaId};
use ctor::ctor;
use gpui::AppContext;
use rand::Rng;
@ -150,6 +150,6 @@ pub fn build_app_state(cx: &AppContext) -> AppState {
AppState {
settings,
language_registry,
rpc: zed_rpc::Peer::new(),
rpc: rpc::Client::new(),
}
}

View File

@ -4,14 +4,13 @@ pub mod pane_group;
use crate::{
editor::{Buffer, Editor},
language::LanguageRegistry,
rpc::PeerExt as _,
rpc,
settings::Settings,
time::ReplicaId,
util::SurfResultExt as _,
worktree::{FileHandle, Worktree, WorktreeHandle},
AppState,
};
use anyhow::{anyhow, Context as _};
use anyhow::anyhow;
use gpui::{
color::rgbu, elements::*, json::to_string_pretty, keymap::Binding, AnyViewHandle, AppContext,
AsyncAppContext, ClipboardItem, Entity, ModelHandle, MutableAppContext, PathPromptOptions,
@ -24,22 +23,20 @@ use postage::watch;
use smol::prelude::*;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
convert::TryFrom,
future::Future,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use surf::Url;
use zed_rpc::{proto, rest, Peer, TypedEnvelope};
use zed_rpc::{proto, Peer, TypedEnvelope};
pub fn init(cx: &mut MutableAppContext, rpc: Arc<Peer>) {
pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
cx.add_global_action("workspace:open", open);
cx.add_global_action("workspace:open_paths", open_paths);
cx.add_action("workspace:save", Workspace::save_active_item);
cx.add_action("workspace:debug_elements", Workspace::debug_elements);
cx.add_action("workspace:new_file", Workspace::open_new_file);
cx.add_action("workspace:share_worktree", Workspace::share_worktree);
cx.add_action("workspace:join_worktree", Workspace::join_worktree);
cx.add_bindings(vec![
Binding::new("cmd-s", "workspace:save", None),
Binding::new("cmd-alt-i", "workspace:debug_elements", None),
@ -312,7 +309,7 @@ pub struct State {
pub struct Workspace {
pub settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>,
rpc: Arc<Peer>,
rpc: rpc::Client,
modal: Option<AnyViewHandle>,
center: PaneGroup,
panes: Vec<ViewHandle<Pane>>,
@ -331,7 +328,7 @@ impl Workspace {
replica_id: ReplicaId,
settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>,
rpc: Arc<Peer>,
rpc: rpc::Client,
cx: &mut ViewContext<Self>,
) -> Self {
let pane = cx.add_view(|_| Pane::new(settings.clone()));
@ -665,47 +662,11 @@ impl Workspace {
fn share_worktree(&mut self, _: &(), cx: &mut ViewContext<Self>) {
let rpc = self.rpc.clone();
let zed_url = std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string());
let executor = cx.background_executor().clone();
let platform = cx.platform();
let task = cx.spawn::<_, _, surf::Result<()>>(|this, mut cx| async move {
let (user_id, access_token) =
login(zed_url.clone(), cx.platform(), cx.background_executor()).await?;
let mut response = surf::get(format!("{}{}", &zed_url, &rest::GET_RPC_ADDRESS_PATH))
.header(
"Authorization",
http_auth_basic::Credentials::new(&user_id, &access_token).as_http_header(),
)
.await
.context("rpc address request failed")?;
let rest::GetRpcAddressResponse { address } = response
.body_json()
.await
.context("failed to parse rpc address response")?;
// TODO - If the `ZED_SERVER_URL` uses https, then wrap this stream in
// a TLS stream using `native-tls`.
let stream = smol::net::TcpStream::connect(&address).await?;
log::info!("connected to rpc address {}", address);
let connection_id = rpc.add_connection(stream).await;
executor.spawn(rpc.handle_messages(connection_id)).detach();
let auth_response = rpc
.request(
connection_id,
proto::Auth {
user_id: user_id.parse()?,
access_token,
},
)
.await
.context("rpc auth request failed")?;
if !auth_response.credentials_valid {
Err(anyhow!("failed to authenticate with RPC server"))?;
}
let task = cx.spawn(|this, mut cx| async move {
let connection_id = rpc.connect_to_server(&cx, &executor).await?;
let share_task = this.update(&mut cx, |this, cx| {
let worktree = this.worktrees.iter().next()?;
@ -713,9 +674,12 @@ impl Workspace {
});
if let Some(share_task) = share_task {
share_task.await?;
let (worktree_id, access_token) = share_task.await?;
let worktree_url = rpc::encode_worktree_url(worktree_id, &access_token);
log::info!("wrote worktree url to clipboard: {}", worktree_url);
platform.write_to_clipboard(ClipboardItem::new(worktree_url));
}
Ok(())
surf::Result::Ok(())
});
cx.spawn(|_, _| async move {
@ -726,6 +690,43 @@ impl Workspace {
.detach();
}
fn join_worktree(&mut self, _: &(), cx: &mut ViewContext<Self>) {
let rpc = self.rpc.clone();
let executor = cx.background_executor().clone();
let task = cx.spawn(|_, cx| async move {
let connection_id = rpc.connect_to_server(&cx, &executor).await?;
let worktree_url = cx
.platform()
.read_from_clipboard()
.ok_or_else(|| anyhow!("failed to read url from clipboard"))?;
let (worktree_id, access_token) = rpc::decode_worktree_url(worktree_url.text())
.ok_or_else(|| anyhow!("failed to decode worktree url"))?;
log::info!("read worktree url from clipboard: {}", worktree_url.text());
let open_worktree_response = rpc
.request(
connection_id,
proto::OpenWorktree {
worktree_id,
access_token,
},
)
.await?;
log::info!("joined worktree: {:?}", open_worktree_response);
surf::Result::Ok(())
});
cx.spawn(|_, _| async move {
if let Err(e) = task.await {
log::error!("joing failed: {}", e);
}
})
.detach();
}
fn add_pane(&mut self, cx: &mut ViewContext<Self>) -> ViewHandle<Pane> {
let pane = cx.add_view(|_| Pane::new(self.settings.clone()));
let pane_id = pane.id();
@ -858,86 +859,6 @@ impl WorkspaceHandle for ViewHandle<Workspace> {
}
}
fn login(
zed_url: String,
platform: Arc<dyn gpui::Platform>,
executor: Arc<gpui::executor::Background>,
) -> Task<anyhow::Result<(String, String)>> {
executor.clone().spawn(async move {
if let Some((user_id, access_token)) = platform.read_credentials(&zed_url) {
log::info!("already signed in. user_id: {}", user_id);
return Ok((user_id, String::from_utf8(access_token).unwrap()));
}
// Generate a pair of asymmetric encryption keys. The public key will be used by the
// zed server to encrypt the user's access token, so that it can'be intercepted by
// any other app running on the user's device.
let (public_key, private_key) =
zed_rpc::auth::keypair().expect("failed to generate keypair for auth");
let public_key_string =
String::try_from(public_key).expect("failed to serialize public key for auth");
// Start an HTTP server to receive the redirect from Zed's sign-in page.
let server = tiny_http::Server::http("127.0.0.1:0").expect("failed to find open port");
let port = server.server_addr().port();
// Open the Zed sign-in page in the user's browser, with query parameters that indicate
// that the user is signing in from a Zed app running on the same device.
platform.open_url(&format!(
"{}/sign_in?native_app_port={}&native_app_public_key={}",
zed_url, port, public_key_string
));
// Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
// access token from the query params.
//
// TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a
// custom URL scheme instead of this local HTTP server.
let (user_id, access_token) = executor
.spawn::<anyhow::Result<_>, _>(async move {
if let Some(req) = server.recv_timeout(Duration::from_secs(10 * 60))? {
let path = req.url();
let mut user_id = None;
let mut access_token = None;
let url = Url::parse(&format!("http://example.com{}", path))
.context("failed to parse login notification url")?;
for (key, value) in url.query_pairs() {
if key == "access_token" {
access_token = Some(value.to_string());
} else if key == "user_id" {
user_id = Some(value.to_string());
}
}
req.respond(
tiny_http::Response::from_string(LOGIN_RESPONSE).with_header(
tiny_http::Header::from_bytes("Content-Type", "text/html").unwrap(),
),
)
.context("failed to respond to login http request")?;
Ok(user_id.zip(access_token))
} else {
Ok(None)
}
})
.await?
.ok_or_else(|| anyhow!(""))?;
let access_token = private_key
.decrypt_string(&access_token)
.context("failed to decrypt access token")?;
platform.activate(true);
platform.write_credentials(&zed_url, &user_id, access_token.as_bytes());
Ok((user_id.to_string(), access_token))
})
}
const LOGIN_RESPONSE: &'static str = "
<!DOCTYPE html>
<html>
<script>window.close();</script>
</html>
";
#[cfg(test)]
mod tests {
use super::*;

View File

@ -4,6 +4,7 @@ mod ignore;
use crate::{
editor::{History, Rope},
rpc::{self, proto, ConnectionId},
sum_tree::{self, Cursor, Edit, SumTree},
util::Bias,
};
@ -31,7 +32,6 @@ use std::{
sync::{Arc, Weak},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use zed_rpc::{proto, ConnectionId, Peer};
use self::{char_bag::CharBag, ignore::IgnoreStack};
@ -53,7 +53,7 @@ pub struct Worktree {
scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
_event_stream_handle: fsevent::Handle,
poll_scheduled: bool,
rpc: Option<Arc<Peer>>,
rpc: Option<rpc::Client>,
}
#[derive(Clone, Debug)]
@ -227,10 +227,10 @@ impl Worktree {
pub fn share(
&mut self,
client: Arc<Peer>,
client: rpc::Client,
connection_id: ConnectionId,
cx: &mut ModelContext<Self>,
) -> Task<anyhow::Result<()>> {
) -> Task<anyhow::Result<(u64, String)>> {
self.rpc = Some(client.clone());
let snapshot = self.snapshot();
cx.spawn(|_this, cx| async move {
@ -254,7 +254,7 @@ impl Worktree {
.await?;
log::info!("sharing worktree {:?}", share_response);
Ok(())
Ok((share_response.worktree_id, share_response.access_token))
})
}
}