From f2d134917e7bb06942b4722788da1d071c14776b Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 22 Jun 2022 18:39:12 +0200 Subject: [PATCH] Remove non-determinism from `Peer` caused by smol's `timeout` helper --- crates/collab/src/integration_tests.rs | 2 +- crates/rpc/src/peer.rs | 59 ++++++++++++++------------ 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index e50c1c4628..02c3a2ea8a 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -4722,7 +4722,7 @@ async fn test_random_collaboration( op_start_signals.remove(guest_ix); server.forbid_connections(); server.disconnect_client(removed_guest_id); - deterministic.advance_clock(5 * RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.start_waiting(); let (guest, guest_project, mut guest_cx, guest_err) = guest.await; deterministic.finish_waiting(); diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 59abf3c8e3..2361812409 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -11,7 +11,6 @@ use futures::{ }; use parking_lot::{Mutex, RwLock}; use serde::{ser::SerializeStruct, Serialize}; -use smol_timeout::TimeoutExt; use std::sync::atomic::Ordering::SeqCst; use std::{ fmt, @@ -177,14 +176,17 @@ impl Peer { outgoing = outgoing_rx.next().fuse() => match outgoing { Some(outgoing) => { tracing::debug!(%connection_id, "outgoing rpc message: writing"); - if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await { - tracing::debug!(%connection_id, "outgoing rpc message: done writing"); - result.context("failed to write RPC message")?; - tracing::debug!(%connection_id, "keepalive interval: resetting after sending message"); - keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); - } else { - tracing::debug!(%connection_id, "outgoing rpc message: writing timed out"); - Err(anyhow!("timed out writing message"))?; + futures::select_biased! { + result = writer.write(outgoing).fuse() => { + tracing::debug!(%connection_id, "outgoing rpc message: done writing"); + result.context("failed to write RPC message")?; + tracing::debug!(%connection_id, "keepalive interval: resetting after sending message"); + keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); + } + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::debug!(%connection_id, "outgoing rpc message: writing timed out"); + Err(anyhow!("timed out writing message"))?; + } } } None => { @@ -199,32 +201,37 @@ impl Peer { receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { tracing::debug!(%connection_id, "incoming rpc message: processing"); - match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { - Some(Ok(_)) => { - tracing::debug!(%connection_id, "incoming rpc message: processed"); + futures::select_biased! { + result = incoming_tx.send(incoming).fuse() => match result { + Ok(_) => { + tracing::debug!(%connection_id, "incoming rpc message: processed"); + } + Err(_) => { + tracing::debug!(%connection_id, "incoming rpc message: channel closed"); + return Ok(()) + } }, - Some(Err(_)) => { - tracing::debug!(%connection_id, "incoming rpc message: channel closed"); - return Ok(()) - }, - None => { + _ = create_timer(WRITE_TIMEOUT).fuse() => { tracing::debug!(%connection_id, "incoming rpc message: processing timed out"); Err(anyhow!("timed out processing incoming message"))? - }, + } } } break; }, _ = keepalive_timer => { tracing::debug!(%connection_id, "keepalive interval: pinging"); - if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await { - tracing::debug!(%connection_id, "keepalive interval: done pinging"); - result.context("failed to send keepalive")?; - tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); - keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); - } else { - tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); - Err(anyhow!("timed out sending keepalive"))?; + futures::select_biased! { + result = writer.write(proto::Message::Ping).fuse() => { + tracing::debug!(%connection_id, "keepalive interval: done pinging"); + result.context("failed to send keepalive")?; + tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); + keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); + } + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); + Err(anyhow!("timed out sending keepalive"))?; + } } } _ = receive_timeout => {