diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs
index 73b32f2116..62d2c6fb31 100644
--- a/crates/client/src/client.rs
+++ b/crates/client/src/client.rs
@@ -966,8 +966,6 @@ mod tests {
         server.roll_access_token();
         server.allow_connections();
         cx.foreground().advance_clock(Duration::from_secs(10));
-        assert_eq!(server.auth_count(), 1);
-        cx.foreground().advance_clock(Duration::from_secs(10));
         while !matches!(status.next().await, Some(Status::Connected { .. })) {}
         assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
     }
diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs
index ae21ef9537..2089b954fb 100644
--- a/crates/gpui/src/executor.rs
+++ b/crates/gpui/src/executor.rs
@@ -330,14 +330,34 @@ impl Deterministic {
     }
 
     pub fn advance_clock(&self, duration: Duration) {
-        let mut state = self.state.lock();
-        state.now += duration;
-        let now = state.now;
-        let mut pending_timers = mem::take(&mut state.pending_timers);
-        drop(state);
+        let new_now = self.state.lock().now + duration;
+        loop {
+            self.run_until_parked();
+            let mut state = self.state.lock();
 
-        pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
-        self.state.lock().pending_timers.extend(pending_timers);
+            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
+                let wakeup_time = *wakeup_time;
+                if wakeup_time < new_now {
+                    let timer_count = state
+                        .pending_timers
+                        .iter()
+                        .take_while(|(_, t, _)| *t == wakeup_time)
+                        .count();
+                    state.now = wakeup_time;
+                    let timers_to_wake = state
+                        .pending_timers
+                        .drain(0..timer_count)
+                        .collect::<Vec<_>>();
+                    drop(state);
+                    drop(timers_to_wake);
+                    continue;
+                }
+            }
+
+            break;
+        }
+
+        self.state.lock().now = new_now;
     }
 }
 
@@ -640,9 +660,6 @@ impl Background {
                     for _ in 0..yields {
                         yield_now().await;
                     }
-
-                    let delay = Duration::from_millis(executor.state.lock().rng.gen_range(0..100));
-                    executor.advance_clock(delay);
                 }
             }
             _ => panic!("this method can only be called on a deterministic executor"),
diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs
index d5f790833e..8f1d66e47a 100644
--- a/crates/rpc/src/peer.rs
+++ b/crates/rpc/src/peer.rs
@@ -88,13 +88,14 @@ pub struct Peer {
 
 #[derive(Clone)]
 pub struct ConnectionState {
-    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
+    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Message>,
     next_message_id: Arc<AtomicU32>,
     response_channels: Arc<Mutex<Option<HashMap<u32, oneshot::Sender<proto::Envelope>>>>>,
 }
 
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
 
 impl Peer {
     pub fn new() -> Arc<Self> {
@@ -142,19 +143,25 @@ impl Peer {
             this.connections.write().remove(&connection_id);
         });
 
+        // Send messages on this frequency so the connection isn't closed.
+        let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse();
+        futures::pin_mut!(keepalive_timer);
+
         loop {
             let read_message = reader.read().fuse();
             futures::pin_mut!(read_message);
-            let read_timeout = create_timer(2 * KEEPALIVE_INTERVAL).fuse();
-            futures::pin_mut!(read_timeout);
+
+            // Disconnect if we don't receive messages at least this frequently.
+            let receive_timeout = create_timer(3 * KEEPALIVE_INTERVAL).fuse();
+            futures::pin_mut!(receive_timeout);
 
             loop {
                 futures::select_biased! {
                     outgoing = outgoing_rx.next().fuse() => match outgoing {
                         Some(outgoing) => {
-                            let outgoing = proto::Message::Envelope(outgoing);
-                            if let Some(result) = writer.write(outgoing).timeout(2 * KEEPALIVE_INTERVAL).await {
+                            if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
                                 result.context("failed to write RPC message")?;
+                                keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                             } else {
                                 Err(anyhow!("timed out writing message"))?;
                             }
@@ -168,18 +175,18 @@ impl Peer {
                                 return Ok(());
                             }
                         }
-                        break;
                     },
-                    _ = create_timer(KEEPALIVE_INTERVAL).fuse() => {
-                        if let Some(result) = writer.write(proto::Message::Ping).timeout(2 * KEEPALIVE_INTERVAL).await {
-                            result.context("failed to send websocket ping")?;
+                    _ = keepalive_timer => {
+                        if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
+                            result.context("failed to send keepalive")?;
+                            keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                         } else {
-                            Err(anyhow!("timed out sending websocket ping"))?;
+                            Err(anyhow!("timed out sending keepalive"))?;
                         }
                     }
-                    _ = read_timeout => {
-                        Err(anyhow!("timed out reading message"))?
+                    _ = receive_timeout => {
+                        Err(anyhow!("delay between messages too long"))?
                     }
                 }
             }
 
@@ -278,11 +285,11 @@ impl Peer {
                 .insert(message_id, tx);
             connection
                 .outgoing_tx
-                .unbounded_send(request.into_envelope(
+                .unbounded_send(proto::Message::Envelope(request.into_envelope(
                     message_id,
                     None,
                     original_sender_id.map(|id| id.0),
-                ))
+                )))
                 .map_err(|_| anyhow!("connection was closed"))?;
             Ok(())
         });
@@ -305,7 +312,9 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(message.into_envelope(message_id, None, None))?;
+            .unbounded_send(proto::Message::Envelope(
+                message.into_envelope(message_id, None, None),
+            ))?;
         Ok(())
     }
 
@@ -321,7 +330,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(message.into_envelope(message_id, None, Some(sender_id.0)))?;
+            .unbounded_send(proto::Message::Envelope(message.into_envelope(
+                message_id,
+                None,
+                Some(sender_id.0),
+            )))?;
         Ok(())
     }
 
@@ -336,7 +349,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
         Ok(())
     }
 
@@ -351,7 +368,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
         Ok(())
     }
 
diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs
index a1cb3dbc2e..ffb74f4939 100644
--- a/crates/rpc/src/proto.rs
+++ b/crates/rpc/src/proto.rs
@@ -283,6 +283,7 @@ pub struct MessageStream {
     encoding_buffer: Vec<u8>,
 }
 
+#[derive(Debug)]
 pub enum Message {
     Envelope(Envelope),
     Ping,
diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index 46536dab09..241217fe63 100644
--- a/crates/server/src/rpc.rs
+++ b/crates/server/src/rpc.rs
@@ -2732,8 +2732,6 @@ mod tests {
             .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
 
-        eprintln!("sharing");
-
         project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
 
         // Join the worktree as client B.
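As a reading aid only, not part of the patch: a minimal, dependency-free sketch of the ordering the rewritten `Deterministic::advance_clock` enforces. Timers whose wakeup falls before the target time are fired in order, with the clock stepping to each wakeup as it goes, before settling on the final time. The `u64` timestamps, the `fired` return value, and the function itself are hypothetical stand-ins for the real `pending_timers` entries and their wakers; the real version also calls `run_until_parked()` between steps so woken tasks get to run.

// Simplified model of the new advance_clock: step through earlier wakeups in
// order instead of jumping straight to the target time.
fn advance_clock(now: &mut u64, pending: &mut Vec<u64>, duration: u64) -> Vec<u64> {
    let new_now = *now + duration;
    let mut fired = Vec::new();
    // `pending` is kept sorted, mirroring `state.pending_timers` in the diff.
    while let Some(&wakeup) = pending.first() {
        if wakeup >= new_now {
            break;
        }
        // Fire every timer sharing this wakeup instant before moving on.
        let count = pending.iter().take_while(|&&t| t == wakeup).count();
        *now = wakeup;
        fired.extend(pending.drain(0..count));
    }
    *now = new_now;
    fired
}

fn main() {
    let mut now = 0u64;
    let mut pending = vec![3u64, 3, 7, 20];
    // Advancing by 10 fires the timers at t=3 (twice) and t=7, but not t=20.
    let fired = advance_clock(&mut now, &mut pending, 10);
    assert_eq!(fired, vec![3, 3, 7]);
    assert_eq!(now, 10);
    assert_eq!(pending, vec![20]);
}

Compared with the old body, which jumped `now` straight to the target and released every expired timer at once, stepping timer by timer lets tasks woken by an earlier timer run, and schedule further timers, before later wakeups are evaluated.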