graphql-ws: fix stop all pollers when a connection is dropped (#1315)

<!-- The PR description should answer 2 important questions: -->

### What

<!-- What is this PR trying to accomplish (and why, if it's not
obvious)? -->

<!-- Consider: do we need to add a changelog entry? -->

<!-- Does this PR introduce new validation that might break old builds?
-->

<!-- Consider: do we need to put new checks behind a flag? -->
Fix a bug with pollers are not being stopped when the connection is
dropped.

### How
Fetch all poller operation ids at once from a mutex map and stop each
poller associated with them in a for-loop.
<!-- How is it trying to accomplish it (what are the implementation
steps)? -->

V3_GIT_ORIGIN_REV_ID: f17511d611b92ace3de2c83e606d26214649878b
This commit is contained in:
Rakesh Emmadi 2024-11-05 09:08:17 +05:30 committed by hasura-bot
parent c8d43adb1c
commit d4465ce035

View File

@ -88,10 +88,8 @@ impl<M> Connections<M> {
if let Some(connection) = self.remove_connection(id).await {
// Record the connection drop in metrics
connection.context.metrics.record_connection_drop();
// Clean up pollers when the connection ends
for operation_id in connection.pollers.read().await.keys() {
connection.stop_poller(operation_id).await;
}
// Stop all pollers associated with the connection
connection.stop_all_pollers().await;
}
}
@ -158,6 +156,25 @@ impl<M> Connection<M> {
}
}
/// Stops all pollers associated with the connection.
pub(crate) async fn stop_all_pollers(&self)
where
M: WebSocketMetrics,
{
// Fetch all poller keys from the mutex-protected map by cloning them into a vector.
// This is necessary to avoid holding the mutex guard lock while dropping the pollers.
let pollers = self
.pollers
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>();
for operation_id in pollers {
self.stop_poller(&operation_id).await;
}
}
/// Internal method to remove a poller associated with the given operation ID.
async fn remove_poller(&self, key: &protocol::OperationId) -> Option<poller::Poller> {
let mut map = self.pollers.write().await;