Avoid retaining executor when using Connection::in_memory

This commit is contained in:
Max Brunsfeld 2022-03-01 18:02:12 -08:00
parent 95b2f4fb16
commit acf7ef3d61

View File

@ -59,18 +59,21 @@ impl Connection {
) { ) {
use futures::channel::mpsc; use futures::channel::mpsc;
use io::{Error, ErrorKind}; use io::{Error, ErrorKind};
use std::sync::Arc;
let (tx, rx) = mpsc::unbounded::<WebSocketMessage>(); let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
let tx = tx let tx = tx
.sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
.with({ .with({
let executor = Arc::downgrade(&executor);
let kill_rx = kill_rx.clone(); let kill_rx = kill_rx.clone();
let executor = executor.clone();
move |msg| { move |msg| {
let kill_rx = kill_rx.clone(); let kill_rx = kill_rx.clone();
let executor = executor.clone(); let executor = executor.clone();
Box::pin(async move { Box::pin(async move {
executor.simulate_random_delay().await; if let Some(executor) = executor.upgrade() {
executor.simulate_random_delay().await;
}
if kill_rx.borrow().is_none() { if kill_rx.borrow().is_none() {
Ok(msg) Ok(msg)
} else { } else {
@ -80,9 +83,11 @@ impl Connection {
} }
}); });
let rx = rx.then(move |msg| { let rx = rx.then(move |msg| {
let executor = executor.clone(); let executor = Arc::downgrade(&executor);
Box::pin(async move { Box::pin(async move {
executor.simulate_random_delay().await; if let Some(executor) = executor.upgrade() {
executor.simulate_random_delay().await;
}
msg msg
}) })
}); });