refactor: wrap Opener with Cow to avoid unnecessary memory reallocations when opening files (#805)

This commit is contained in:
三咲雅 · Misaki Masa 2024-03-12 23:35:26 +08:00 committed by GitHub
parent 78b98a98c3
commit cddd0727ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 433 additions and 320 deletions

3
Cargo.lock generated
View File

@ -2810,7 +2810,6 @@ dependencies = [
"anyhow",
"crossterm",
"futures",
"libc",
"md-5",
"mlua",
"parking_lot",
@ -2859,10 +2858,12 @@ dependencies = [
"base64 0.22.0",
"crossterm",
"futures",
"libc",
"parking_lot",
"regex",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"trash",
"yazi-adaptor",

View File

@ -46,7 +46,7 @@ impl Ueberzug {
tx.send(Some((path.to_path_buf(), rect)))?;
Adaptor::shown_store(rect, (0, 0));
} else {
bail!("uninitialized ueberzug");
bail!("uninitialized ueberzugpp");
}
let path = path.to_owned();
@ -66,12 +66,13 @@ impl Ueberzug {
if let Some(tx) = &*DEMON {
Ok(tx.send(None)?)
} else {
bail!("uninitialized ueberzug");
bail!("uninitialized ueberzugpp");
}
}
fn create_demon(adaptor: Adaptor) -> Result<Child> {
let result = Command::new("ueberzug")
// TODO: demon
let result = Command::new("ueberzugpp")
.args(["layer", "-so", &adaptor.to_string()])
.kill_on_drop(true)
.stdin(Stdio::piped())
@ -79,7 +80,7 @@ impl Ueberzug {
.spawn();
if let Err(ref e) = result {
warn!("ueberzug spawning failed: {}", e);
warn!("ueberzugpp spawning failed: {e}");
}
Ok(result?)
}
@ -98,9 +99,9 @@ impl Ueberzug {
async fn send_command(child: &mut Child, cmd: Option<(PathBuf, Rect)>) -> Result<()> {
let stdin = child.stdin.as_mut().unwrap();
if let Some((path, rect)) = cmd {
debug!("ueberzug rect before adjustment: {:?}", rect);
debug!("ueberzugpp rect before adjustment: {:?}", rect);
let rect = Self::adjust_rect(rect);
debug!("ueberzug rect after adjustment: {:?}", rect);
debug!("ueberzugpp rect after adjustment: {:?}", rect);
let s = format!(
r#"{{"action":"add","identifier":"yazi","x":{},"y":{},"max_width":{},"max_height":{},"path":"{}"}}{}"#,
@ -111,10 +112,10 @@ impl Ueberzug {
path.to_string_lossy(),
"\n"
);
debug!("ueberzug command: {}", s);
debug!("ueberzugpp command: {}", s);
stdin.write_all(s.as_bytes()).await?;
} else {
debug!("ueberzug command: remove");
debug!("ueberzugpp command: remove");
stdin
.write_all(format!(r#"{{"action":"remove","identifier":"yazi"}}{}"#, "\n").as_bytes())
.await?;

View File

@ -74,8 +74,11 @@ impl Boot {
println!(" WAYLAND_DISPLAY: {:?}", env::var_os("WAYLAND_DISPLAY"));
println!(" DISPLAY: {:?}", env::var_os("DISPLAY"));
println!("\nUeberzug");
println!(" Version: {:?}", std::process::Command::new("ueberzug").arg("--version").output());
println!("\nUeberzug++");
println!(
" Version: {:?}",
std::process::Command::new("ueberzugpp").arg("--version").output()
);
println!("\nWSL");
println!(

View File

@ -106,18 +106,18 @@ keymap = [
{ on = [ "N" ], run = "find_arrow --previous", desc = "Go to previous found file" },
# Sorting
{ on = [ ",", "m" ], run = "sort modified --dir-first", desc = "Sort by modified time" },
{ on = [ ",", "M" ], run = "sort modified --reverse --dir-first", desc = "Sort by modified time (reverse)" },
{ on = [ ",", "c" ], run = "sort created --dir-first", desc = "Sort by created time" },
{ on = [ ",", "C" ], run = "sort created --reverse --dir-first", desc = "Sort by created time (reverse)" },
{ on = [ ",", "e" ], run = "sort extension --dir-first", desc = "Sort by extension" },
{ on = [ ",", "E" ], run = "sort extension --reverse --dir-first", desc = "Sort by extension (reverse)" },
{ on = [ ",", "a" ], run = "sort alphabetical --dir-first", desc = "Sort alphabetically" },
{ on = [ ",", "A" ], run = "sort alphabetical --reverse --dir-first", desc = "Sort alphabetically (reverse)" },
{ on = [ ",", "n" ], run = "sort natural --dir-first", desc = "Sort naturally" },
{ on = [ ",", "N" ], run = "sort natural --reverse --dir-first", desc = "Sort naturally (reverse)" },
{ on = [ ",", "s" ], run = "sort size --dir-first", desc = "Sort by size" },
{ on = [ ",", "S" ], run = "sort size --reverse --dir-first", desc = "Sort by size (reverse)" },
{ on = [ ",", "m" ], run = "sort modified", desc = "Sort by modified time" },
{ on = [ ",", "M" ], run = "sort modified --reverse", desc = "Sort by modified time (reverse)" },
{ on = [ ",", "c" ], run = "sort created", desc = "Sort by created time" },
{ on = [ ",", "C" ], run = "sort created --reverse", desc = "Sort by created time (reverse)" },
{ on = [ ",", "e" ], run = "sort extension", desc = "Sort by extension" },
{ on = [ ",", "E" ], run = "sort extension --reverse", desc = "Sort by extension (reverse)" },
{ on = [ ",", "a" ], run = "sort alphabetical", desc = "Sort alphabetically" },
{ on = [ ",", "A" ], run = "sort alphabetical --reverse", desc = "Sort alphabetically (reverse)" },
{ on = [ ",", "n" ], run = "sort natural", desc = "Sort naturally" },
{ on = [ ",", "N" ], run = "sort natural --reverse", desc = "Sort naturally (reverse)" },
{ on = [ ",", "s" ], run = "sort size", desc = "Sort by size" },
{ on = [ ",", "S" ], run = "sort size --reverse", desc = "Sort by size (reverse)" },
# Tabs
{ on = [ "t" ], run = "tab_create --current", desc = "Create a new tab using the current path" },

View File

@ -1,10 +1,9 @@
use std::{collections::HashMap, ffi::{OsStr, OsString}, io::{stdout, BufWriter, Write}, path::PathBuf};
use std::{borrow::Cow, collections::HashMap, ffi::{OsStr, OsString}, io::{stdout, BufWriter, Write}, path::PathBuf};
use anyhow::{anyhow, Result};
use tokio::{fs::{self, OpenOptions}, io::{stdin, AsyncReadExt, AsyncWriteExt}};
use yazi_config::{OPEN, PREVIEW};
use yazi_plugin::external::{self, ShellOpt};
use yazi_proxy::{AppProxy, HIDER, WATCHER};
use yazi_proxy::{AppProxy, TasksProxy, HIDER, WATCHER};
use yazi_shared::{fs::{accessible, max_common_root, File, FilesOp, Url}, term::Term, Defer};
use crate::manager::Manager;
@ -32,18 +31,13 @@ impl Manager {
.write_all(s.as_encoded_bytes())
.await?;
let _permit = HIDER.acquire().await.unwrap();
let _defer1 = Defer::new(AppProxy::resume);
let _defer2 = Defer::new(|| tokio::spawn(fs::remove_file(tmp.clone())));
AppProxy::stop().await;
let _defer1 = Defer::new(|| tokio::spawn(fs::remove_file(tmp.clone())));
TasksProxy::process_exec(vec![OsString::new(), tmp.to_owned().into()], Cow::Borrowed(opener))
.await;
let mut child = external::shell(ShellOpt {
cmd: (*opener.run).into(),
args: vec![OsString::new(), tmp.to_owned().into()],
piped: false,
orphan: false,
})?;
child.wait().await?;
let _permit = HIDER.acquire().await.unwrap();
let _defer2 = Defer::new(AppProxy::resume);
AppProxy::stop().await;
let new: Vec<_> = fs::read_to_string(&tmp).await?.lines().map(PathBuf::from).collect();
Self::bulk_rename_do(cwd, root, old, new).await

View File

@ -1,4 +1,4 @@
use std::ffi::OsString;
use std::{borrow::Cow, ffi::OsString};
use tracing::error;
use yazi_boot::ARGS;
@ -84,10 +84,10 @@ impl Manager {
if targets.is_empty() {
return;
} else if !opt.interactive {
return tasks.file_open(&opt.hovered, &targets);
return tasks.process_from_files(opt.hovered, targets);
}
let openers: Vec<_> = OPEN.common_openers(&targets).into_iter().cloned().collect();
let openers: Vec<_> = OPEN.common_openers(&targets);
if openers.is_empty() {
return;
}
@ -98,7 +98,7 @@ impl Manager {
openers.iter().map(|o| o.desc.clone()).collect(),
));
if let Ok(choice) = result.await {
TasksProxy::open_with(urls, openers[choice].clone());
TasksProxy::open_with(urls, Cow::Borrowed(openers[choice]));
}
});
}

View File

@ -1,3 +1,5 @@
use std::borrow::Cow;
use yazi_config::{open::Opener, popup::InputCfg};
use yazi_proxy::{InputProxy, TasksProxy};
use yazi_shared::event::Cmd;
@ -38,14 +40,17 @@ impl Tab {
}
}
TasksProxy::open_with(selected, Opener {
run: opt.run,
block: opt.block,
orphan: false,
desc: Default::default(),
for_: None,
spread: true,
});
TasksProxy::open_with(
selected,
Cow::Owned(Opener {
run: opt.run,
block: opt.block,
orphan: false,
desc: Default::default(),
for_: None,
spread: true,
}),
);
});
}
}

View File

@ -2,4 +2,5 @@ mod arrow;
mod cancel;
mod inspect;
mod open_with;
mod process_exec;
mod toggle;

View File

@ -5,7 +5,10 @@ use crate::tasks::Tasks;
impl Tasks {
pub fn open_with(&mut self, opt: impl TryInto<OpenWithOpt>) {
if let Ok(opt) = opt.try_into() {
self.file_open_with(&opt.opener, &opt.targets);
self.process_from_opener(
opt.opener,
opt.targets.into_iter().map(|u| u.into_os_string()).collect(),
);
}
}
}

View File

@ -0,0 +1,11 @@
use yazi_proxy::options::ProcessExecOpt;
use crate::tasks::Tasks;
impl Tasks {
// Handle a `process_exec` command: hand the opener and its arguments to
// the scheduler, passing `opt.done` so the caller can await completion.
pub fn process_exec(&mut self, opt: impl TryInto<ProcessExecOpt>) {
// Commands whose payload cannot be converted into a ProcessExecOpt
// (e.g. a Cmd with no attached data) are silently ignored.
if let Ok(opt) = opt.try_into() {
self.scheduler.process_open(opt.opener, opt.args, Some(opt.done));
}
}
}

View File

@ -0,0 +1,51 @@
use std::collections::HashSet;
use tracing::debug;
use yazi_shared::fs::Url;
use super::Tasks;
impl Tasks {
// Schedule a cut (move) of each `src` URL into `dest`.
// With `force`, a source that would land on itself is skipped instead of
// being overwritten; the skip is only logged at debug level.
pub fn file_cut(&self, src: &[&Url], dest: &Url, force: bool) {
for &u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_cut: same file, skipping {:?}", to);
} else {
self.scheduler.file_cut(u.clone(), to, force);
}
}
}
// Schedule a copy of each `src` URL into `dest`.
// `follow` controls whether symlinks are followed; same self-target skip
// rule as file_cut when `force` is set.
pub fn file_copy(&self, src: &[&Url], dest: &Url, force: bool, follow: bool) {
for &u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_copy: same file, skipping {:?}", to);
} else {
self.scheduler.file_copy(u.clone(), to, force, follow);
}
}
}
// Schedule creation of a link in `dest` for each source URL.
// `relative` selects relative (vs absolute) link targets — TODO confirm
// against the scheduler's file_link implementation.
pub fn file_link(&self, src: &HashSet<Url>, dest: &Url, relative: bool, force: bool) {
for u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_link: same file, skipping {:?}", to);
} else {
self.scheduler.file_link(u.clone(), to, relative, force);
}
}
}
// Schedule removal of `targets`: permanent deletion when `permanently`
// is set, otherwise move to trash.
pub fn file_remove(&self, targets: Vec<Url>, permanently: bool) {
for u in targets {
if permanently {
self.scheduler.file_delete(u);
} else {
self.scheduler.file_trash(u);
}
}
}
}

View File

@ -1,4 +1,8 @@
mod commands;
mod file;
mod plugin;
mod preload;
mod process;
mod progress;
mod tasks;

View File

@ -0,0 +1,15 @@
use yazi_plugin::ValueSendable;
use super::Tasks;
impl Tasks {
// Thin wrapper: forward a "micro" plugin invocation to the scheduler.
#[inline]
pub fn plugin_micro(&self, name: String, args: Vec<ValueSendable>) {
self.scheduler.plugin_micro(name, args);
}
// Thin wrapper: forward a "macro" plugin invocation to the scheduler.
#[inline]
pub fn plugin_macro(&self, name: String, args: Vec<ValueSendable>) {
self.scheduler.plugin_macro(name, args);
}
}

View File

@ -0,0 +1,94 @@
use std::{collections::HashMap, mem};
use yazi_config::{manager::SortBy, plugin::{PluginRule, MAX_PRELOADERS}, PLUGIN};
use yazi_shared::{fs::{File, Url}, MIME_DIR};
use super::Tasks;
use crate::folder::Files;
impl Tasks {
// Run preloader plugins over the files currently paged into view,
// skipping any (rule, file) pair that has already been loaded.
pub fn preload_paged(&self, paged: &[File], mimetype: &HashMap<Url, String>) {
let mut single_tasks = Vec::with_capacity(paged.len());
// One bucket per rule id, for rules that accept many files at once.
let mut multi_tasks: [Vec<_>; MAX_PRELOADERS as usize] = Default::default();
let loaded = self.scheduler.preload.rule_loaded.read();
for f in paged {
// Directories use the fixed MIME_DIR; files use the resolved mimetype map.
let mime = if f.is_dir() { Some(MIME_DIR) } else { mimetype.get(&f.url).map(|s| &**s) };
let factors = |s: &str| match s {
"mime" => mime.is_some(),
_ => false,
};
for rule in PLUGIN.preloaders(&f.url, mime, factors) {
// rule_loaded keeps a per-URL bitset of rule ids already applied.
if loaded.get(&f.url).is_some_and(|x| x & (1 << rule.id) != 0) {
continue;
}
if rule.multi {
multi_tasks[rule.id as usize].push(f);
} else {
single_tasks.push((rule, f));
}
}
}
// Release the read lock before taking the write lock below.
drop(loaded);
let mut loaded = self.scheduler.preload.rule_loaded.write();
// Mark every target as loaded for `rule`, then schedule the actual work.
let mut go = |rule: &PluginRule, targets: Vec<&File>| {
for &f in &targets {
if let Some(n) = loaded.get_mut(&f.url) {
*n |= 1 << rule.id;
} else {
loaded.insert(f.url.clone(), 1 << rule.id);
}
}
self.scheduler.preload_paged(rule, targets);
};
// Multi-file rules are dispatched once per rule with all their targets.
#[allow(clippy::needless_range_loop)]
for i in 0..PLUGIN.preloaders.len() {
if !multi_tasks[i].is_empty() {
go(&PLUGIN.preloaders[i], mem::take(&mut multi_tasks[i]));
}
}
// Single-file rules are dispatched one target at a time.
for (rule, target) in single_tasks {
go(rule, vec![target]);
}
}
// Re-run preloaders for changed files: clear their loaded-rule bitsets
// first, then preload them as if they were freshly paged in.
pub fn preload_affected(&self, affected: &[File], mimetype: &HashMap<Url, String>) {
{
let mut loaded = self.scheduler.preload.rule_loaded.write();
for f in affected {
loaded.remove(&f.url);
}
}
self.preload_paged(affected, mimetype);
}
// Preload directory sizes — only meaningful when sorting by size.
pub fn preload_sorted(&self, targets: &Files) {
if targets.sorter().by != SortBy::Size {
return;
}
let targets: Vec<_> = {
let loading = self.scheduler.preload.size_loading.read();
// Keep only directories whose size is neither known nor in flight.
targets
.iter()
.filter(|f| f.is_dir() && !targets.sizes.contains_key(&f.url) && !loading.contains(&f.url))
.map(|f| &f.url)
.collect()
};
if targets.is_empty() {
return;
}
// Record the in-flight set before scheduling, to dedupe repeat calls.
let mut loading = self.scheduler.preload.size_loading.write();
for &target in &targets {
loading.insert(target.clone());
}
self.scheduler.preload_size(targets);
}
}

View File

@ -0,0 +1,41 @@
use std::{borrow::Cow, collections::HashMap, ffi::OsString, mem};
use yazi_config::{open::Opener, OPEN};
use yazi_shared::fs::Url;
use super::Tasks;
impl Tasks {
// Open each target with the first opener matching its URL and mime type,
// grouping targets so that each distinct opener is invoked once.
pub fn process_from_files(&self, hovered: Url, targets: Vec<(Url, String)>) {
let mut openers = HashMap::new();
for (url, mime) in targets {
if let Some(opener) = OPEN.openers(&url, mime).and_then(|o| o.first().copied()) {
// Each opener's arg list is seeded with the hovered URL at index 0;
// actual targets follow from index 1 onward.
openers.entry(opener).or_insert_with(|| vec![hovered.clone()]).push(url);
}
}
for (opener, args) in openers {
self.process_from_opener(
Cow::Borrowed(opener),
args.into_iter().map(|u| u.into_os_string()).collect(),
);
}
}
// Spawn process task(s) for `args` (layout: [hovered, target, ...]).
// A "spread" opener gets all targets in one invocation; otherwise one
// process is spawned per target.
pub fn process_from_opener(&self, opener: Cow<'static, Opener>, mut args: Vec<OsString>) {
if opener.spread {
self.scheduler.process_open(opener, args, None);
return;
}
if args.is_empty() {
return;
}
// Exactly one target: pass the opener by value, no per-target clone needed.
if args.len() == 2 {
self.scheduler.process_open(opener, args, None);
return;
}
let hovered = mem::take(&mut args[0]);
for target in args.into_iter().skip(1) {
self.scheduler.process_open(opener.clone(), vec![hovered.clone(), target], None);
}
}
}

View File

@ -1,14 +1,10 @@
use std::{collections::{HashMap, HashSet}, ffi::OsStr, mem, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::debug;
use yazi_config::{manager::SortBy, open::Opener, plugin::{PluginRule, MAX_PRELOADERS}, OPEN, PLUGIN};
use yazi_plugin::ValueSendable;
use yazi_scheduler::{Scheduler, TaskSummary};
use yazi_shared::{emit, event::Cmd, fs::{File, Url}, term::Term, Layer, MIME_DIR};
use yazi_shared::{emit, event::Cmd, term::Term, Layer};
use super::{TasksProgress, TASKS_BORDER, TASKS_PADDING, TASKS_PERCENT};
use crate::folder::Files;
pub struct Tasks {
pub(super) scheduler: Arc<Scheduler>,
@ -56,168 +52,6 @@ impl Tasks {
ongoing.values().take(Self::limit()).map(Into::into).collect()
}
pub fn file_open(&self, hovered: &Url, targets: &[(Url, String)]) {
let mut openers = HashMap::new();
for (url, mime) in targets {
if let Some(opener) = OPEN.openers(url, mime).and_then(|o| o.first().copied()) {
openers.entry(opener).or_insert_with(|| vec![hovered]).push(url);
}
}
for (opener, args) in openers {
self.file_open_with(opener, &args);
}
}
pub fn file_open_with(&self, opener: &Opener, args: &[impl AsRef<OsStr>]) {
if opener.spread {
self.scheduler.process_open(opener, args);
return;
}
for target in args.iter().skip(1) {
self.scheduler.process_open(opener, &[&args[0], target]);
}
}
pub fn file_cut(&self, src: &[&Url], dest: &Url, force: bool) {
for &u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_cut: same file, skipping {:?}", to);
} else {
self.scheduler.file_cut(u.clone(), to, force);
}
}
}
pub fn file_copy(&self, src: &[&Url], dest: &Url, force: bool, follow: bool) {
for &u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_copy: same file, skipping {:?}", to);
} else {
self.scheduler.file_copy(u.clone(), to, force, follow);
}
}
}
pub fn file_link(&self, src: &HashSet<Url>, dest: &Url, relative: bool, force: bool) {
for u in src {
let to = dest.join(u.file_name().unwrap());
if force && *u == to {
debug!("file_link: same file, skipping {:?}", to);
} else {
self.scheduler.file_link(u.clone(), to, relative, force);
}
}
}
pub fn file_remove(&self, targets: Vec<Url>, permanently: bool) {
for u in targets {
if permanently {
self.scheduler.file_delete(u);
} else {
self.scheduler.file_trash(u);
}
}
}
#[inline]
pub fn plugin_micro(&self, name: String, args: Vec<ValueSendable>) {
self.scheduler.plugin_micro(name, args);
}
#[inline]
pub fn plugin_macro(&self, name: String, args: Vec<ValueSendable>) {
self.scheduler.plugin_macro(name, args);
}
pub fn preload_paged(&self, paged: &[File], mimetype: &HashMap<Url, String>) {
let mut single_tasks = Vec::with_capacity(paged.len());
let mut multi_tasks: [Vec<_>; MAX_PRELOADERS as usize] = Default::default();
let loaded = self.scheduler.preload.rule_loaded.read();
for f in paged {
let mime = if f.is_dir() { Some(MIME_DIR) } else { mimetype.get(&f.url).map(|s| &**s) };
let factors = |s: &str| match s {
"mime" => mime.is_some(),
_ => false,
};
for rule in PLUGIN.preloaders(&f.url, mime, factors) {
if loaded.get(&f.url).is_some_and(|x| x & (1 << rule.id) != 0) {
continue;
}
if rule.multi {
multi_tasks[rule.id as usize].push(f);
} else {
single_tasks.push((rule, f));
}
}
}
drop(loaded);
let mut loaded = self.scheduler.preload.rule_loaded.write();
let mut go = |rule: &PluginRule, targets: Vec<&File>| {
for &f in &targets {
if let Some(n) = loaded.get_mut(&f.url) {
*n |= 1 << rule.id;
} else {
loaded.insert(f.url.clone(), 1 << rule.id);
}
}
self.scheduler.preload_paged(rule, targets);
};
#[allow(clippy::needless_range_loop)]
for i in 0..PLUGIN.preloaders.len() {
if !multi_tasks[i].is_empty() {
go(&PLUGIN.preloaders[i], mem::take(&mut multi_tasks[i]));
}
}
for (rule, target) in single_tasks {
go(rule, vec![target]);
}
}
pub fn preload_affected(&self, affected: &[File], mimetype: &HashMap<Url, String>) {
{
let mut loaded = self.scheduler.preload.rule_loaded.write();
for f in affected {
loaded.remove(&f.url);
}
}
self.preload_paged(affected, mimetype);
}
pub fn preload_sorted(&self, targets: &Files) {
if targets.sorter().by != SortBy::Size {
return;
}
let targets: Vec<_> = {
let loading = self.scheduler.preload.size_loading.read();
targets
.iter()
.filter(|f| f.is_dir() && !targets.sizes.contains_key(&f.url) && !loading.contains(&f.url))
.map(|f| &f.url)
.collect()
};
if targets.is_empty() {
return;
}
let mut loading = self.scheduler.preload.size_loading.write();
for &target in &targets {
loading.insert(target.clone());
}
self.scheduler.preload_size(targets);
}
}
impl Tasks {
#[inline]
pub fn len(&self) -> usize { self.scheduler.ongoing.lock().len() }
}

View File

@ -27,7 +27,7 @@ impl Which {
};
if opt.tx.try_send(opt.idx).is_err() {
error!("callback: send error");
error!("which callback: send error");
}
}
}

View File

@ -156,6 +156,7 @@ impl<'a> Executor<'a> {
on!(inspect);
on!(cancel);
on!(open_with);
on!(process_exec);
#[allow(clippy::single_match)]
match cmd.name.as_str() {

View File

@ -36,5 +36,4 @@ unicode-width = "^0"
yazi-prebuild = "0.1.2"
[target."cfg(unix)".dependencies]
libc = "^0"
uzers = "^0"

View File

@ -3,7 +3,6 @@ mod fzf;
mod highlighter;
mod lsar;
mod rg;
mod shell;
mod zoxide;
pub use fd::*;
@ -11,5 +10,4 @@ pub use fzf::*;
pub use highlighter::*;
pub use lsar::*;
pub use rg::*;
pub use shell::*;
pub use zoxide::*;

View File

@ -1,9 +1,11 @@
mod input;
mod notify;
mod open;
mod process;
mod select;
pub use input::*;
pub use notify::*;
pub use open::*;
pub use process::*;
pub use select::*;

View File

@ -1,3 +1,5 @@
use std::borrow::Cow;
use yazi_config::open::Opener;
use yazi_shared::{event::Cmd, fs::Url};
@ -16,7 +18,7 @@ impl From<Cmd> for OpenDoOpt {
// --- Open with
pub struct OpenWithOpt {
pub targets: Vec<Url>,
pub opener: Opener,
pub opener: Cow<'static, Opener>,
}
impl TryFrom<Cmd> for OpenWithOpt {

View File

@ -0,0 +1,18 @@
use std::{borrow::Cow, ffi::OsString};
use tokio::sync::oneshot;
use yazi_config::open::Opener;
use yazi_shared::event::Cmd;
// --- Exec
// Payload of a `process_exec` call: run `opener` with `args`, signalling
// `done` once the scheduler has finished handling the task.
pub struct ProcessExecOpt {
// Arguments for the opener; args[0] is typically the hovered entry
// (callers may pass an empty placeholder) — see Tasks::process_from_opener.
pub args: Vec<OsString>,
// Cow allows borrowing a statically-configured Opener without cloning,
// while still permitting ad-hoc owned openers.
pub opener: Cow<'static, Opener>,
// Completion signal awaited by TasksProxy::process_exec.
pub done: oneshot::Sender<()>,
}
impl TryFrom<Cmd> for ProcessExecOpt {
type Error = ();
// The opt travels as opaque data attached to the Cmd; absence of data
// yields Err(()), which callers treat as "ignore the command".
fn try_from(mut c: Cmd) -> Result<Self, Self::Error> { c.take_data().ok_or(()) }
}

View File

@ -1,13 +1,26 @@
use std::{borrow::Cow, ffi::OsString};
use tokio::sync::oneshot;
use yazi_config::open::Opener;
use yazi_shared::{emit, event::Cmd, fs::Url, Layer};
use crate::options::OpenWithOpt;
use crate::options::{OpenWithOpt, ProcessExecOpt};
pub struct TasksProxy;
impl TasksProxy {
#[inline]
pub fn open_with(targets: Vec<Url>, opener: Opener) {
pub fn open_with(targets: Vec<Url>, opener: Cow<'static, Opener>) {
emit!(Call(Cmd::new("open_with").with_data(OpenWithOpt { targets, opener }), Layer::Tasks));
}
#[inline]
pub async fn process_exec(args: Vec<OsString>, opener: Cow<'static, Opener>) {
let (tx, rx) = oneshot::channel();
emit!(Call(
Cmd::new("process_exec").with_data(ProcessExecOpt { args, opener, done: tx }),
Layer::Tasks
));
rx.await.ok();
}
}

View File

@ -25,9 +25,13 @@ parking_lot = "^0"
regex = "^1"
tokio = { version = "^1", features = [ "parking_lot", "rt-multi-thread" ] }
tokio-stream = "^0"
tokio-util = "^0"
# Logging
tracing = { version = "^0", features = [ "max_level_debug", "release_max_level_warn" ] }
[target."cfg(unix)".dependencies]
libc = "^0"
[target.'cfg(not(target_os = "android"))'.dependencies]
trash = "^3"

View File

@ -2,6 +2,8 @@
mod op;
mod process;
mod shell;
pub use op::*;
pub use process::*;
pub use shell::*;

View File

@ -1,20 +1,45 @@
use std::ffi::OsString;
use tokio::sync::oneshot;
use yazi_plugin::external::ShellOpt;
use tokio_util::sync::CancellationToken;
use super::ShellOpt;
#[derive(Debug)]
pub struct ProcessOpOpen {
pub id: usize,
pub cmd: OsString,
pub args: Vec<OsString>,
pub block: bool,
pub orphan: bool,
pub cancel: oneshot::Sender<()>,
pub struct ProcessOpBlock {
pub id: usize,
pub cmd: OsString,
pub args: Vec<OsString>,
}
impl From<ProcessOpOpen> for ShellOpt {
fn from(op: ProcessOpOpen) -> Self {
Self { cmd: op.cmd, args: op.args, piped: false, orphan: op.orphan }
impl From<ProcessOpBlock> for ShellOpt {
fn from(op: ProcessOpBlock) -> Self {
Self { cmd: op.cmd, args: op.args, piped: false, orphan: false }
}
}
// A detached ("orphan") process task: spawned without piped stdio and
// left to run independently (`orphan: true` in the resulting ShellOpt).
#[derive(Debug)]
pub struct ProcessOpOrphan {
// Task id in the ongoing-task registry.
pub id: usize,
pub cmd: OsString,
pub args: Vec<OsString>,
}
impl From<ProcessOpOrphan> for ShellOpt {
fn from(op: ProcessOpOrphan) -> Self {
Self { cmd: op.cmd, args: op.args, piped: false, orphan: true }
}
}
// A background process task: stdout/stderr are piped back into the task
// log, and the child can be killed by cancelling `ct`.
#[derive(Debug)]
pub struct ProcessOpBg {
// Task id in the ongoing-task registry.
pub id: usize,
pub cmd: OsString,
pub args: Vec<OsString>,
// Cancelling this token terminates the running child process.
pub ct: CancellationToken,
}
impl From<ProcessOpBg> for ShellOpt {
fn from(op: ProcessOpBg) -> Self {
Self { cmd: op.cmd, args: op.args, piped: true, orphan: false }
}
}

View File

@ -1,10 +1,9 @@
use anyhow::Result;
use tokio::{io::{AsyncBufReadExt, BufReader}, select, sync::mpsc};
use yazi_plugin::external::{self, ShellOpt};
use yazi_proxy::{AppProxy, HIDER};
use yazi_shared::Defer;
use super::ProcessOpOpen;
use super::{ProcessOpBg, ProcessOpBlock, ProcessOpOrphan, ShellOpt};
use crate::TaskProg;
pub struct Process {
@ -14,28 +13,53 @@ pub struct Process {
impl Process {
pub fn new(prog: mpsc::UnboundedSender<TaskProg>) -> Self { Self { prog } }
pub async fn open(&self, mut task: ProcessOpOpen) -> Result<()> {
if task.block {
return self.open_block(task).await;
pub async fn block(&self, task: ProcessOpBlock) -> Result<()> {
let _permit = HIDER.acquire().await.unwrap();
let _defer = Defer::new(AppProxy::resume);
AppProxy::stop().await;
let (id, cmd) = (task.id, task.cmd.clone());
let result = super::shell(task.into());
if let Err(e) = result {
AppProxy::notify_warn(&cmd.to_string_lossy(), &format!("Failed to spawn process: {e}"));
return self.succ(id);
}
if task.orphan {
return self.open_orphan(task).await;
let status = result.unwrap().wait().await?;
if !status.success() {
let content = match status.code() {
Some(code) => format!("Process exited with status code: {code}"),
None => "Process terminated by signal".to_string(),
};
AppProxy::notify_warn(&cmd.to_string_lossy(), &content);
}
self.succ(id)
}
pub async fn orphan(&self, task: ProcessOpOrphan) -> Result<()> {
let id = task.id;
match super::shell(task.into()) {
Ok(_) => self.succ(id)?,
Err(e) => {
self.prog.send(TaskProg::New(id, 0))?;
self.fail(id, format!("Failed to spawn process: {e}"))?;
}
}
Ok(())
}
pub async fn bg(&self, task: ProcessOpBg) -> Result<()> {
self.prog.send(TaskProg::New(task.id, 0))?;
let mut child = external::shell(ShellOpt {
cmd: task.cmd,
args: task.args,
piped: true,
..Default::default()
})?;
let mut child =
super::shell(ShellOpt { cmd: task.cmd, args: task.args, piped: true, ..Default::default() })?;
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
loop {
select! {
_ = task.cancel.closed() => {
_ = task.ct.cancelled() => {
child.start_kill().ok();
break;
}
@ -61,43 +85,6 @@ impl Process {
self.prog.send(TaskProg::Adv(task.id, 1, 0))?;
self.succ(task.id)
}
async fn open_block(&self, task: ProcessOpOpen) -> Result<()> {
let _permit = HIDER.acquire().await.unwrap();
let _defer = Defer::new(AppProxy::resume);
AppProxy::stop().await;
let (id, cmd) = (task.id, task.cmd.clone());
let result = external::shell(task.into());
if let Err(e) = result {
AppProxy::notify_warn(&cmd.to_string_lossy(), &format!("Failed to spawn process: {e}"));
return self.succ(id);
}
let status = result.unwrap().wait().await?;
if !status.success() {
let content = match status.code() {
Some(code) => format!("Process exited with status code: {code}"),
None => "Process terminated by signal".to_string(),
};
AppProxy::notify_warn(&cmd.to_string_lossy(), &content);
}
self.succ(id)
}
async fn open_orphan(&self, task: ProcessOpOpen) -> Result<()> {
let id = task.id;
match external::shell(task.into()) {
Ok(_) => self.succ(id)?,
Err(e) => {
self.prog.send(TaskProg::New(id, 0))?;
self.fail(id, format!("Failed to spawn process: {e}"))?;
}
}
Ok(())
}
}
impl Process {

View File

@ -12,11 +12,6 @@ pub struct ShellOpt {
}
impl ShellOpt {
pub fn with_piped(mut self) -> Self {
self.piped = true;
self
}
#[inline]
fn stdio(&self) -> Stdio {
if self.orphan {

View File

@ -1,14 +1,15 @@
use std::{ffi::OsStr, sync::Arc, time::Duration};
use std::{borrow::Cow, ffi::OsString, sync::Arc, time::Duration};
use futures::{future::BoxFuture, FutureExt};
use parking_lot::Mutex;
use tokio::{fs, select, sync::{mpsc::{self, UnboundedReceiver}, oneshot}};
use tokio_util::sync::CancellationToken;
use yazi_config::{open::Opener, plugin::PluginRule, TASKS};
use yazi_plugin::ValueSendable;
use yazi_shared::{fs::{unique_path, Url}, Throttle};
use super::{Ongoing, TaskProg, TaskStage};
use crate::{file::{File, FileOpDelete, FileOpLink, FileOpPaste, FileOpTrash}, plugin::{Plugin, PluginOpEntry}, preload::{Preload, PreloadOpRule, PreloadOpSize}, process::{Process, ProcessOpOpen}, TaskKind, TaskOp, HIGH, LOW, NORMAL};
use crate::{file::{File, FileOpDelete, FileOpLink, FileOpPaste, FileOpTrash}, plugin::{Plugin, PluginOpEntry}, preload::{Preload, PreloadOpRule, PreloadOpSize}, process::{Process, ProcessOpBg, ProcessOpBlock, ProcessOpOrphan}, TaskKind, TaskOp, HIGH, LOW, NORMAL};
pub struct Scheduler {
pub file: Arc<File>,
@ -332,47 +333,55 @@ impl Scheduler {
}
}
pub fn process_open(&self, opener: &Opener, args: &[impl AsRef<OsStr>]) {
pub fn process_open(
&self,
opener: Cow<'static, Opener>,
args: Vec<OsString>,
done: Option<oneshot::Sender<()>>,
) {
let name = {
let s = format!("Run `{}`", opener.run);
let args = args.iter().map(|a| a.as_ref().to_string_lossy()).collect::<Vec<_>>().join(" ");
if args.is_empty() { s } else { format!("{s} with `{args}`") }
let args = args.iter().map(|a| a.to_string_lossy()).collect::<Vec<_>>().join(" ");
if args.is_empty() {
format!("Run {:?}", opener.run)
} else {
format!("Run {:?} with `{args}`", opener.run)
}
};
let ct = CancellationToken::new();
let mut ongoing = self.ongoing.lock();
let id = ongoing.add(TaskKind::User, name);
let (cancel_tx, mut cancel_rx) = oneshot::channel();
let id = ongoing.add(TaskKind::User, name);
ongoing.hooks.insert(id, {
let ct = ct.clone();
let ongoing = self.ongoing.clone();
Box::new(move |canceled: bool| {
async move {
if canceled {
cancel_rx.close();
}
ongoing.lock().try_remove(id, TaskStage::Hooked);
if canceled {
ct.cancel();
}
if let Some(tx) = done {
tx.send(()).ok();
}
}
.boxed()
})
});
let args = args.iter().map(|a| a.as_ref().to_os_string()).collect::<Vec<_>>();
tokio::spawn({
let process = self.process.clone();
let opener = opener.clone();
let process = self.process.clone();
_ = self.micro.try_send(
async move {
process
.open(ProcessOpOpen {
id,
cmd: opener.run.into(),
args,
block: opener.block,
orphan: opener.orphan,
cancel: cancel_tx,
})
.await
.ok();
if opener.block {
process.block(ProcessOpBlock { id, cmd: OsString::from(&opener.run), args }).await.ok();
} else if opener.orphan {
process.orphan(ProcessOpOrphan { id, cmd: OsString::from(&opener.run), args }).await.ok();
} else {
process.bg(ProcessOpBg { id, cmd: OsString::from(&opener.run), args, ct }).await.ok();
}
}
});
.boxed(),
HIGH,
);
}
}