Introduce IpcSenderWithContext

This commit is contained in:
Kunal Mohan 2021-02-11 23:07:54 +05:30
parent 715e5f9785
commit 685e2eef0c
4 changed files with 190 additions and 170 deletions

View File

@ -122,6 +122,37 @@ thread_local!(
/// stack in the form of an [`ErrorContext`].
static OPENCALLS: RefCell<ErrorContext> = RefCell::default()
);
pub struct IpcSenderWithContext {
err_ctx: ErrorContext,
sender: UnixStream,
}
impl IpcSenderWithContext {
pub fn new() -> Self {
Self {
err_ctx: ErrorContext::new(),
sender: UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap(),
}
}
pub fn update(&mut self, ctx: ErrorContext) {
self.err_ctx = ctx;
}
pub fn send(&mut self, msg: ApiCommand) -> std::io::Result<()> {
let command = bincode::serialize(&(self.err_ctx, msg)).unwrap();
self.sender.write_all(&command)
}
}
impl std::clone::Clone for IpcSenderWithContext {
fn clone(&self) -> Self {
Self {
err_ctx: self.err_ctx,
sender: UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap(),
}
}
}
task_local! {
/// A key to some task local storage that holds a representation of the task's call
@ -512,7 +543,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
}
});
let mut server_stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let mut send_server_instructions = IpcSenderWithContext::new();
#[warn(clippy::never_loop)]
loop {
@ -522,13 +553,13 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
err_ctx.add_call(ContextType::App(AppContext::from(&app_instruction)));
send_screen_instructions.update(err_ctx);
send_server_instructions.update(err_ctx);
match app_instruction {
AppInstruction::Exit => {
break;
}
AppInstruction::Error(backtrace) => {
let api_command = bincode::serialize(&(err_ctx, ApiCommand::Quit)).unwrap();
server_stream.write_all(&api_command).unwrap();
let _ = send_server_instructions.send(ApiCommand::Quit);
let _ = ipc_thread.join();
let _ = send_screen_instructions.send(ScreenInstruction::Quit);
let _ = screen_thread.join();
@ -554,15 +585,12 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
send_plugin_instructions.send(instruction).unwrap();
}
AppInstruction::ToPty(instruction) => {
let api_command =
bincode::serialize(&(err_ctx, ApiCommand::ToPty(instruction))).unwrap();
server_stream.write_all(&api_command).unwrap();
let _ = send_server_instructions.send(ApiCommand::ToPty(instruction));
}
}
}
let api_command = bincode::serialize(&(err_ctx, ApiCommand::Quit)).unwrap();
server_stream.write_all(&api_command).unwrap();
let _ = send_server_instructions.send(ApiCommand::Quit);
let _ = ipc_thread.join().unwrap();
let _ = send_screen_instructions.send(ScreenInstruction::Quit);
screen_thread.join().unwrap();

View File

@ -8,14 +8,12 @@ use ::std::sync::mpsc::Receiver;
use ::std::time::{Duration, Instant};
use ::vte;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use super::{ScreenInstruction, OPENCALLS};
use super::{IpcSenderWithContext, ScreenInstruction, OPENCALLS};
use crate::layout::Layout;
use crate::os_input_output::OsApi;
use crate::utils::{consts::ZELLIJ_IPC_PIPE, logging::debug_to_file};
use crate::utils::logging::debug_to_file;
use crate::{
common::ApiCommand,
errors::{ContextType, ErrorContext},
@ -83,94 +81,94 @@ pub enum VteEvent {
struct VteEventSender {
id: RawFd,
err_ctx: ErrorContext,
server_stream: UnixStream,
send_server_instructions: IpcSenderWithContext,
}
impl VteEventSender {
pub fn new(id: RawFd, err_ctx: ErrorContext) -> Self {
pub fn new(id: RawFd, send_server_instructions: IpcSenderWithContext) -> Self {
VteEventSender {
id,
err_ctx,
server_stream: UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap(),
send_server_instructions,
}
}
}
impl vte::Perform for VteEventSender {
fn print(&mut self, c: char) {
let api_command = bincode::serialize(&(
self.err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Pty(self.id, VteEvent::Print(c))),
))
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Print(c),
)))
.unwrap();
self.server_stream.write_all(&api_command).unwrap();
}
fn execute(&mut self, byte: u8) {
let api_command = bincode::serialize(&(
self.err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Pty(self.id, VteEvent::Execute(byte))),
))
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Execute(byte),
)))
.unwrap();
self.server_stream.write_all(&api_command).unwrap();
}
fn hook(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) {
let params = params.iter().copied().collect();
let intermediates = intermediates.iter().copied().collect();
let instruction =
ScreenInstruction::Pty(self.id, VteEvent::Hook(params, intermediates, ignore, c));
let api_command =
bincode::serialize(&(self.err_ctx, ApiCommand::ToScreen(instruction))).unwrap();
self.server_stream.write_all(&api_command).unwrap();
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Hook(params, intermediates, ignore, c),
)))
.unwrap();
}
fn put(&mut self, byte: u8) {
let api_command = bincode::serialize(&(
self.err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Pty(self.id, VteEvent::Put(byte))),
))
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Put(byte),
)))
.unwrap();
self.server_stream.write_all(&api_command).unwrap();
}
fn unhook(&mut self) {
let api_command = bincode::serialize(&(
self.err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Pty(self.id, VteEvent::Unhook)),
))
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Unhook,
)))
.unwrap();
self.server_stream.write_all(&api_command).unwrap();
}
fn osc_dispatch(&mut self, params: &[&[u8]], bell_terminated: bool) {
let params = params.iter().map(|p| p.to_vec()).collect();
let instruction =
ScreenInstruction::Pty(self.id, VteEvent::OscDispatch(params, bell_terminated));
let api_command =
bincode::serialize(&(self.err_ctx, ApiCommand::ToScreen(instruction))).unwrap();
self.server_stream.write_all(&api_command).unwrap();
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::OscDispatch(params, bell_terminated),
)))
.unwrap();
}
fn csi_dispatch(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) {
let params = params.iter().copied().collect();
let intermediates = intermediates.iter().copied().collect();
let instruction = ScreenInstruction::Pty(
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::CsiDispatch(params, intermediates, ignore, c),
);
let api_command =
bincode::serialize(&(self.err_ctx, ApiCommand::ToScreen(instruction))).unwrap();
self.server_stream.write_all(&api_command).unwrap();
)))
.unwrap();
}
fn esc_dispatch(&mut self, intermediates: &[u8], ignore: bool, byte: u8) {
let intermediates = intermediates.iter().copied().collect();
let instruction =
ScreenInstruction::Pty(self.id, VteEvent::EscDispatch(intermediates, ignore, byte));
let api_command =
bincode::serialize(&(self.err_ctx, ApiCommand::ToScreen(instruction))).unwrap();
self.server_stream.write_all(&api_command).unwrap();
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::EscDispatch(intermediates, ignore, byte),
)))
.unwrap();
}
}
@ -192,7 +190,7 @@ pub struct PtyBus {
os_input: Box<dyn OsApi>,
debug_to_file: bool,
task_handles: HashMap<RawFd, JoinHandle<()>>,
pub server_stream: UnixStream,
pub send_server_instructions: IpcSenderWithContext,
}
fn stream_terminal_bytes(pid: RawFd, os_input: Box<dyn OsApi>, debug: bool) -> JoinHandle<()> {
@ -200,9 +198,10 @@ fn stream_terminal_bytes(pid: RawFd, os_input: Box<dyn OsApi>, debug: bool) -> J
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
let mut server_stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions.update(err_ctx);
let mut vte_parser = vte::Parser::new();
let mut vte_event_sender = VteEventSender::new(pid, err_ctx);
let mut vte_event_sender = VteEventSender::new(pid, send_server_instructions.clone());
let mut terminal_bytes = ReadFromPid::new(&pid, os_input);
let mut last_byte_receive_time: Option<Instant> = None;
@ -228,12 +227,9 @@ fn stream_terminal_bytes(pid: RawFd, os_input: Box<dyn OsApi>, debug: bool) -> J
Some(receive_time) => {
if receive_time.elapsed() > max_render_pause {
pending_render = false;
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Render),
))
send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Render))
.unwrap();
server_stream.write_all(&api_command).unwrap();
last_byte_receive_time = Some(Instant::now());
} else {
pending_render = true;
@ -247,31 +243,26 @@ fn stream_terminal_bytes(pid: RawFd, os_input: Box<dyn OsApi>, debug: bool) -> J
} else {
if pending_render {
pending_render = false;
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::Render),
))
send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Render))
.unwrap();
server_stream.write_all(&api_command).unwrap();
}
last_byte_receive_time = None;
task::sleep(::std::time::Duration::from_millis(10)).await;
}
}
let api_command =
bincode::serialize(&(err_ctx, ApiCommand::ToScreen(ScreenInstruction::Render)))
send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::Render))
.unwrap();
server_stream.write_all(&api_command).unwrap();
#[cfg(not(test))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data
// a better solution would be to fix the test fakes, but this will do for now
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::ClosePane(PaneId::Terminal(pid))),
))
send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::ClosePane(
PaneId::Terminal(pid),
)))
.unwrap();
server_stream.write_all(&api_command).unwrap();
}
})
}
@ -280,7 +271,7 @@ impl PtyBus {
pub fn new(
receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>,
os_input: Box<dyn OsApi>,
server_stream: UnixStream,
send_server_instructions: IpcSenderWithContext,
debug_to_file: bool,
) -> Self {
PtyBus {
@ -289,7 +280,7 @@ impl PtyBus {
id_to_child_pid: HashMap::new(),
debug_to_file,
task_handles: HashMap::new(),
server_stream,
send_server_instructions,
}
}
pub fn spawn_terminal(&mut self, file_to_open: Option<PathBuf>) -> RawFd {
@ -309,15 +300,12 @@ impl PtyBus {
self.id_to_child_pid.insert(pid_primary, pid_secondary);
new_pane_pids.push(pid_primary);
}
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::ApplyLayout((
self.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::ApplyLayout((
layout,
new_pane_pids.clone(),
))),
))
))))
.unwrap();
self.server_stream.write_all(&api_command).unwrap();
for id in new_pane_pids {
let task_handle = stream_terminal_bytes(id, self.os_input.clone(), self.debug_to_file);
self.task_handles.insert(id, task_handle);
@ -333,11 +321,10 @@ impl PtyBus {
handle.cancel().await;
});
}
PaneId::Plugin(pid) => {
let api_command =
bincode::serialize(&(err_ctx, ApiCommand::ClosePluginPane(pid))).unwrap();
self.server_stream.write_all(&api_command).unwrap();
}
PaneId::Plugin(pid) => self
.send_server_instructions
.send(ApiCommand::ClosePluginPane(pid))
.unwrap(),
}
}
pub fn close_tab(&mut self, ids: Vec<PaneId>, err_ctx: ErrorContext) {

View File

@ -3,12 +3,20 @@ mod client;
mod common;
mod server;
use client::{boundaries, layout, panes, tab};
use common::{
command_is_executing, errors, ipc, os_input_output, pty_bus, screen, start, utils, wasm_vm,
ApiCommand, IpcSenderWithContext,
};
use directories_next::ProjectDirs;
use structopt::StructOpt;
use crate::cli::CliArgs;
use crate::command_is_executing::CommandIsExecuting;
use crate::errors::ErrorContext;
use crate::os_input_output::get_os_input;
use crate::utils::{
consts::{ZELLIJ_IPC_PIPE, ZELLIJ_TMP_DIR, ZELLIJ_TMP_LOG_DIR},
consts::{ZELLIJ_TMP_DIR, ZELLIJ_TMP_LOG_DIR},
logging::*,
};
use client::{boundaries, layout, panes, tab};
@ -22,6 +30,27 @@ use std::os::unix::net::UnixStream;
use structopt::StructOpt;
pub fn main() {
// First run installation of default plugins & layouts
let project_dirs = ProjectDirs::from("org", "Zellij Contributors", "Zellij").unwrap();
let data_dir = project_dirs.data_dir();
let mut assets = asset_map! {
"assets/layouts/default.yaml" => "layouts/default.yaml",
"assets/layouts/strider.yaml" => "layouts/strider.yaml",
};
assets.extend(asset_map! {
"assets/plugins/status-bar.wasm" => "plugins/status-bar.wasm",
"assets/plugins/tab-bar.wasm" => "plugins/tab-bar.wasm",
"assets/plugins/strider.wasm" => "plugins/strider.wasm",
});
for (path, bytes) in assets {
let path = data_dir.join(path);
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
if !path.exists() {
std::fs::write(path, bytes).expect("Failed to install default assets!");
}
}
let opts = CliArgs::from_args();
let config = match Config::try_from(&opts) {
Ok(config) => config,
@ -33,48 +62,29 @@ pub fn main() {
if let Some(split_dir) = opts.split {
match split_dir {
'h' => {
let mut stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let api_command =
bincode::serialize(&(ErrorContext::new(), ApiCommand::SplitHorizontally))
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions
.send(ApiCommand::SplitHorizontally)
.unwrap();
stream.write_all(&api_command).unwrap();
}
'v' => {
let mut stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let api_command =
bincode::serialize(&(ErrorContext::new(), ApiCommand::SplitVertically))
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions
.send(ApiCommand::SplitVertically)
.unwrap();
stream.write_all(&api_command).unwrap();
}
_ => {}
};
} else if opts.move_focus {
let mut stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let api_command =
bincode::serialize(&(ErrorContext::new(), ApiCommand::MoveFocus)).unwrap();
stream.write_all(&api_command).unwrap();
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions
.send(ApiCommand::MoveFocus)
.unwrap();
} else if let Some(file_to_open) = opts.open_file {
let mut stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let api_command =
bincode::serialize(&(ErrorContext::new(), ApiCommand::OpenFile(file_to_open))).unwrap();
stream.write_all(&api_command).unwrap();
} else if let Some(crate::cli::ConfigCli::GenerateCompletion { shell }) = opts.option {
let shell = match shell.as_ref() {
"bash" => structopt::clap::Shell::Bash,
"fish" => structopt::clap::Shell::Fish,
"zsh" => structopt::clap::Shell::Zsh,
"powerShell" => structopt::clap::Shell::PowerShell,
"elvish" => structopt::clap::Shell::Elvish,
other => {
eprintln!("Unsupported shell: {}", other);
std::process::exit(1);
}
};
let mut out = std::io::stdout();
CliArgs::clap().gen_completions_to("zellij", shell, &mut out);
} else if let Some(crate::cli::ConfigCli::Setup { .. }) = opts.option {
setup::dump_default_config().expect("Failed to print to stdout");
std::process::exit(1);
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions
.send(ApiCommand::OpenFile(file_to_open))
.unwrap();
} else {
let os_input = get_os_input();
atomic_create_dir(ZELLIJ_TMP_DIR).unwrap();

View File

@ -1,7 +1,8 @@
use crate::cli::CliArgs;
use crate::command_is_executing::CommandIsExecuting;
use crate::common::{
ApiCommand, AppInstruction, ChannelWithContext, SenderType, SenderWithContext,
ApiCommand, AppInstruction, ChannelWithContext, IpcSenderWithContext, SenderType,
SenderWithContext,
};
use crate::errors::{ContextType, ErrorContext, PtyContext};
use crate::layout::Layout;
@ -11,8 +12,7 @@ use crate::pty_bus::{PtyBus, PtyInstruction};
use crate::screen::ScreenInstruction;
use crate::utils::consts::ZELLIJ_IPC_PIPE;
use crate::wasm_vm::PluginInstruction;
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::io::Read;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::thread;
@ -41,11 +41,12 @@ pub fn start_server(
let default_layout = None;
let maybe_layout = opts.layout.or(default_layout).map(Layout::new);
let server_stream = UnixStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let send_server_instructions = IpcSenderWithContext::new();
let mut pty_bus = PtyBus::new(
receive_pty_instructions,
os_input.clone(),
server_stream,
send_server_instructions,
opts.debug,
);
@ -63,46 +64,40 @@ pub fn start_server(
match event {
PtyInstruction::SpawnTerminal(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::NewPane(PaneId::Terminal(pid))),
))
pty_bus
.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::NewPane(
PaneId::Terminal(pid),
)))
.unwrap();
pty_bus.server_stream.write_all(&api_command).unwrap();
}
PtyInstruction::SpawnTerminalVertically(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::VerticalSplit(
pty_bus
.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::VerticalSplit(
PaneId::Terminal(pid),
)),
))
)))
.unwrap();
pty_bus.server_stream.write_all(&api_command).unwrap();
}
PtyInstruction::SpawnTerminalHorizontally(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::HorizontalSplit(
pty_bus
.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::HorizontalSplit(
PaneId::Terminal(pid),
)),
))
)))
.unwrap();
pty_bus.server_stream.write_all(&api_command).unwrap();
}
PtyInstruction::NewTab => {
if let Some(layout) = maybe_layout.clone() {
pty_bus.spawn_terminals_for_layout(layout, err_ctx);
} else {
let pid = pty_bus.spawn_terminal(None);
let api_command = bincode::serialize(&(
err_ctx,
ApiCommand::ToScreen(ScreenInstruction::NewTab(pid)),
))
pty_bus
.send_server_instructions
.send(ApiCommand::ToScreen(ScreenInstruction::NewTab(pid)))
.unwrap();
pty_bus.server_stream.write_all(&api_command).unwrap();
}
}
PtyInstruction::ClosePane(id) => {