fix: a race condition in DDS static messages sent as internal events (#868)

This commit is contained in:
三咲雅 · Misaki Masa 2024-04-03 00:31:11 +08:00 committed by GitHub
parent ec83c5dc5c
commit 1b030e0f52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 87 additions and 81 deletions

View File

@ -1,11 +1,11 @@
use anyhow::Result;
use mlua::{ExternalResult, IntoLua, Lua, Value};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use super::{BodyBulk, BodyCd, BodyCustom, BodyHey, BodyHi, BodyHover, BodyRename, BodyYank};
use crate::Payload;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum Body<'a> {
Hi(BodyHi<'a>),

View File

@ -1,23 +1,19 @@
use mlua::{IntoLua, Lua, Value};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use super::Body;
use crate::ValueSendable;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub struct BodyCustom {
#[serde(skip)]
pub kind: String,
#[serde(flatten)]
pub value: ValueSendable,
}
impl BodyCustom {
#[inline]
pub fn from_str(kind: &str, value: &str) -> anyhow::Result<Body<'static>> {
let mut me = serde_json::from_str::<Self>(value)?;
kind.clone_into(&mut me.kind);
Ok(me.into())
Ok(Self { kind: kind.to_owned(), value: serde_json::from_str(value)? }.into())
}
#[inline]
@ -33,3 +29,9 @@ impl From<BodyCustom> for Body<'_> {
impl IntoLua<'_> for BodyCustom {
fn into_lua(self, lua: &Lua) -> mlua::Result<Value> { self.value.into_lua(lua) }
}
impl Serialize for BodyCustom {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serde::Serialize::serialize(&self.value, serializer)
}
}

View File

@ -23,8 +23,8 @@ impl<'a> BodyYank<'a> {
impl BodyYank<'static> {
#[inline]
pub fn dummy(cut: bool) -> Body<'static> {
Self { cut, urls: Default::default(), dummy: true }.into()
pub fn dummy() -> Body<'static> {
Self { cut: false, urls: Default::default(), dummy: true }.into()
}
}
@ -37,7 +37,7 @@ impl IntoLua<'_> for BodyYank<'static> {
if let Some(Cow::Owned(urls)) = Some(self.urls).filter(|_| !self.dummy) {
BodyYankIter { cut: self.cut, urls: urls.into_iter().collect() }.into_lua(lua)
} else {
lua.create_table_from([("cut", self.cut)])?.into_lua(lua)
lua.create_table()?.into_lua(lua)
}
}
}

View File

@ -84,7 +84,7 @@ impl Client {
server.take().map(|h| h.abort());
*server = Server::make().await.ok();
if server.is_some() {
super::STATE.load().await.ok();
super::STATE.load_or_create().await;
}
if mem::replace(&mut first, false) && server.is_some() {

View File

@ -133,7 +133,7 @@ impl Pubsub {
pub fn pub_from_yank(cut: bool, urls: &HashSet<Url>) {
if LOCAL.read().contains_key("yank") {
Self::pub_(BodyYank::dummy(cut));
Self::pub_(BodyYank::dummy());
}
if PEERS.read().values().any(|p| p.able("yank")) {
Client::push(BodyYank::borrowed(cut, urls).with_severity(30));

View File

@ -11,7 +11,7 @@ pub enum ValueSendable {
Boolean(bool),
Integer(i64),
Number(f64),
String(Vec<u8>),
String(String),
Table(HashMap<ValueSendableKey, ValueSendable>),
}
@ -31,10 +31,7 @@ impl ValueSendable {
let mut map = HashMap::with_capacity(table.len());
for pair in table {
let (ValueSendableKey::String(k), Self::String(v)) = pair else {
continue;
};
if let (Ok(k), Ok(v)) = (String::from_utf8(k), String::from_utf8(v)) {
if let (ValueSendableKey::String(k), Self::String(v)) = pair {
map.insert(k, v);
}
}
@ -52,7 +49,7 @@ impl<'a> TryFrom<Value<'a>> for ValueSendable {
Value::LightUserData(_) => Err("light userdata is not supported".into_lua_err())?,
Value::Integer(n) => Self::Integer(n),
Value::Number(n) => Self::Number(n),
Value::String(s) => Self::String(s.as_bytes().to_vec()),
Value::String(s) => Self::String(s.to_str()?.to_owned()),
Value::Table(t) => {
let mut map = HashMap::with_capacity(t.len().map(|l| l as usize)?);
for result in t.pairs::<Value, Value>() {
@ -96,7 +93,7 @@ pub enum ValueSendableKey {
Boolean(bool),
Integer(i64),
Number(OrderedFloat),
String(Vec<u8>),
String(String),
}
impl ValueSendableKey {

View File

@ -95,18 +95,20 @@ impl Server {
let Ok(payload) = Payload::from_str(&s) else { return };
let Body::Hi(hi) = payload.body else { return };
let mut clients = CLIENTS.write();
id.replace(payload.sender).and_then(|id| clients.remove(&id));
if let Some(ref state) = *STATE.read() {
state.values().for_each(|s| _ = tx.send(format!("{s}\n")));
if id.is_none() {
if let Some(ref state) = *STATE.read() {
state.values().for_each(|s| _ = tx.send(s.clone()));
}
}
let mut clients = CLIENTS.write();
id.replace(payload.sender).and_then(|id| clients.remove(&id));
clients.insert(payload.sender, Client {
id: payload.sender,
tx,
abilities: hi.abilities.into_iter().map(|s| s.into_owned()).collect(),
});
Self::handle_hey(&clients);
}

View File

@ -30,29 +30,11 @@ impl State {
}
}
pub async fn load(&self) -> Result<()> {
let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?);
let mut line = String::new();
let mut inner = HashMap::new();
while buf.read_line(&mut line).await? > 0 {
let mut parts = line.splitn(5, ',');
let Some(kind) = parts.next() else { continue };
let Some(_) = parts.next() else { continue };
let Some(severity) = parts.next().and_then(|s| s.parse::<u8>().ok()) else { continue };
let Some(_) = parts.next() else { continue };
let Some(body) = parts.next() else { continue };
inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line));
pub async fn load_or_create(&self) {
if self.load().await.is_err() {
self.inner.write().replace(Default::default());
self.last.store(timestamp_us(), Ordering::Relaxed);
}
let clients = CLIENTS.read();
for payload in inner.values() {
clients.values().for_each(|c| _ = c.tx.send(format!("{payload}\n")));
}
self.inner.write().replace(inner);
self.last.store(timestamp_us(), Ordering::Relaxed);
Ok(())
}
pub async fn drain(&self) -> Result<()> {
@ -77,6 +59,32 @@ impl State {
buf.write_u8(b'\n').await?;
}
buf.flush().await?;
Ok(())
}
async fn load(&self) -> Result<()> {
let mut buf = BufReader::new(File::open(BOOT.state_dir.join(".dds")).await?);
let mut line = String::new();
let mut inner = HashMap::new();
while buf.read_line(&mut line).await? > 0 {
let mut parts = line.splitn(5, ',');
let Some(kind) = parts.next() else { continue };
let Some(_) = parts.next() else { continue };
let Some(severity) = parts.next().and_then(|s| s.parse::<u8>().ok()) else { continue };
let Some(_) = parts.next() else { continue };
let Some(body) = parts.next() else { continue };
inner.insert(format!("{}_{severity}_{kind}", Body::tab(kind, body)), mem::take(&mut line));
}
let clients = CLIENTS.read();
for payload in inner.values() {
clients.values().for_each(|c| _ = c.tx.send(payload.clone()));
}
self.inner.write().replace(inner);
self.last.store(timestamp_us(), Ordering::Relaxed);
Ok(())
}

View File

@ -17,7 +17,7 @@ pub(crate) struct App {
impl App {
pub(crate) async fn serve() -> Result<()> {
let term = Term::start()?;
let signals = Signals::start()?;
let (mut rx, signals) = (Event::take(), Signals::start()?);
Lives::register()?;
let mut app = Self { cx: Ctx::make(), term: Some(term), signals };
@ -25,7 +25,7 @@ impl App {
let mut times = 0;
let mut events = Vec::with_capacity(200);
while app.signals.rx.recv_many(&mut events, 50).await > 0 {
while rx.recv_many(&mut events, 50).await > 0 {
for event in events.drain(..) {
times += 1;
app.dispatch(event)?;
@ -38,7 +38,7 @@ impl App {
if times >= 50 {
times = 0;
app.render();
} else if let Ok(event) = app.signals.rx.try_recv() {
} else if let Ok(event) = rx.try_recv() {
events.push(event);
emit!(Render);
} else {

View File

@ -39,6 +39,8 @@ async fn main() -> anyhow::Result<()> {
_ = fdlimit::raise_fd_limit();
yazi_shared::init();
yazi_config::init()?;
yazi_adaptor::init();

View File

@ -1,23 +1,18 @@
use anyhow::Result;
use crossterm::event::{Event as CrosstermEvent, EventStream, KeyEvent, KeyEventKind};
use futures::StreamExt;
use tokio::{select, sync::mpsc, task::JoinHandle};
use tokio::{select, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use yazi_shared::event::Event;
pub(super) struct Signals {
tx: mpsc::UnboundedSender<Event>,
pub rx: mpsc::UnboundedReceiver<Event>,
ct: CancellationToken,
ct: CancellationToken,
}
impl Signals {
pub(super) fn start() -> Result<Self> {
let (tx, rx) = mpsc::unbounded_channel();
let ct = CancellationToken::new();
let mut signals = Self { tx: tx.clone(), rx, ct };
let mut signals = Self { ct: CancellationToken::new() };
Event::init(tx);
signals.spawn_system_task()?;
signals.spawn_crossterm_task();
@ -52,12 +47,11 @@ impl Signals {
SIGCONT,
])?;
let tx = self.tx.clone();
Ok(tokio::spawn(async move {
while let Some(signal) = signals.next().await {
match signal {
SIGHUP | SIGTERM | SIGQUIT | SIGINT => {
tx.send(Event::Quit(Default::default())).ok();
Event::Quit(Default::default()).emit();
}
SIGCONT if HIDER.try_acquire().is_ok() => AppProxy::resume(),
_ => {}
@ -68,23 +62,18 @@ impl Signals {
fn spawn_crossterm_task(&mut self) -> JoinHandle<()> {
let mut reader = EventStream::new();
let (tx, ct) = (self.tx.clone(), self.ct.clone());
let ct = self.ct.clone();
tokio::spawn(async move {
loop {
select! {
_ = ct.cancelled() => break,
Some(Ok(event)) = reader.next() => {
let event = match event {
// We need to check key event kind;
// otherwise event will be dispatched twice.
CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key),
CrosstermEvent::Paste(str) => Event::Paste(str),
CrosstermEvent::Resize(..) => Event::Resize,
_ => continue,
};
if tx.send(event).is_err() {
break;
match event {
CrosstermEvent::Key(key @ KeyEvent { kind: KeyEventKind::Press, .. }) => Event::Key(key).emit(),
CrosstermEvent::Paste(str) => Event::Paste(str).emit(),
CrosstermEvent::Resize(..) => Event::Resize.emit(),
_ => {},
}
}
}

View File

@ -26,10 +26,7 @@ impl TryFrom<Cmd> for Opt {
let mut data: OptData = c.take_data().unwrap_or_default();
if let Some(args) = c.named.get("args") {
data.args = shell_words::split(args)?
.into_iter()
.map(|s| ValueSendable::String(s.into_bytes()))
.collect();
data.args = shell_words::split(args)?.into_iter().map(ValueSendable::String).collect();
}
Ok(Self { name, sync: c.named.contains_key("sync"), data })

View File

@ -7,6 +7,7 @@ use super::Cmd;
use crate::{Layer, RoCell};
static TX: RoCell<mpsc::UnboundedSender<Event>> = RoCell::new();
static RX: RoCell<mpsc::UnboundedReceiver<Event>> = RoCell::new();
#[derive(Debug)]
pub enum Event {
@ -27,7 +28,14 @@ pub struct EventQuit {
impl Event {
#[inline]
pub fn init(tx: mpsc::UnboundedSender<Event>) { TX.init(tx); }
pub fn init() {
let (tx, rx) = mpsc::unbounded_channel();
TX.init(tx);
RX.init(rx);
}
#[inline]
pub fn take() -> mpsc::UnboundedReceiver<Event> { RX.drop() }
#[inline]
pub fn emit(self) { TX.send(self).ok(); }

View File

@ -32,3 +32,5 @@ pub use ro_cell::*;
pub use throttle::*;
pub use time::*;
pub use xdg::*;
pub fn init() { event::Event::init(); }

View File

@ -34,10 +34,9 @@ impl<T> RoCell<T> {
}
#[inline]
pub fn drop(&self) {
unsafe {
*self.0.get() = None;
}
pub fn drop(&self) -> T {
debug_assert!(self.is_initialized());
unsafe { mem::take(&mut *self.0.get()).unwrap_unchecked() }
}
#[inline]