mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-10-04 22:07:40 +03:00
server: fix poller error metric not decrementing when subscriptions are closed
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9589 GitOrigin-RevId: 8e3bd019f632d7dd5010cdc147bd561856b21ad2
This commit is contained in:
parent
ab735a3a2c
commit
eb02f33c0e
@ -40,7 +40,7 @@ import Hasura.GraphQL.Execute.Backend
|
||||
import Hasura.GraphQL.Execute.Subscription.Options
|
||||
import Hasura.GraphQL.Execute.Subscription.Plan
|
||||
import Hasura.GraphQL.Execute.Subscription.Poll
|
||||
import Hasura.GraphQL.Execute.Subscription.Poll.Common (PollerResponseState (PRSSuccess))
|
||||
import Hasura.GraphQL.Execute.Subscription.Poll.Common (PollerResponseState (PRSError, PRSSuccess))
|
||||
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
|
||||
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
|
||||
import Hasura.GraphQL.Transport.Backend
|
||||
@ -411,7 +411,7 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
|
||||
detM <- getQueryDet lqMap
|
||||
case detM of
|
||||
Nothing -> return (pure ())
|
||||
Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, cohort) -> do
|
||||
Just (Poller cohorts pollerState ioState parameterizedQueryHash operationNamesMap, cohort) -> do
|
||||
TMap.lookup maybeOperationName operationNamesMap >>= \case
|
||||
-- If only one operation name is present in the map, delete it
|
||||
Just 1 -> TMap.delete maybeOperationName operationNamesMap
|
||||
@ -421,7 +421,7 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
|
||||
-- removed
|
||||
Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
|
||||
Nothing -> return ()
|
||||
cleanHandlerC cohorts ioState cohort parameterizedQueryHash
|
||||
cleanHandlerC cohorts pollerState ioState cohort parameterizedQueryHash
|
||||
liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
|
||||
liftIO $ EKG.Gauge.dec $ smActiveLiveQueries serverMetrics
|
||||
where
|
||||
@ -435,7 +435,7 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
|
||||
cohortM <- TMap.lookup cohortId (_pCohorts poller)
|
||||
return $ (poller,) <$> cohortM
|
||||
|
||||
cleanHandlerC cohortMap ioState handlerC parameterizedQueryHash = do
|
||||
cleanHandlerC cohortMap pollerState ioState handlerC parameterizedQueryHash = do
|
||||
let curOps = _cExistingSubscribers handlerC
|
||||
newOps = _cNewSubscribers handlerC
|
||||
TMap.delete sinkId curOps
|
||||
@ -461,6 +461,11 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
|
||||
Just threadRef -> do
|
||||
Immortal.stop threadRef
|
||||
liftIO $ do
|
||||
pollerLastState <- STM.readTVarIO pollerState
|
||||
when (pollerLastState == PRSError)
|
||||
$ Prometheus.Gauge.dec
|
||||
$ submActiveLiveQueryPollersInError
|
||||
$ pmSubscriptionMetrics prometheusMetrics
|
||||
Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics
|
||||
let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
|
||||
recordMetricWithLabel
|
||||
@ -502,7 +507,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
|
||||
detM <- getQueryDet streamQMap
|
||||
case detM of
|
||||
Nothing -> return (pure ())
|
||||
Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do
|
||||
Just (Poller cohorts pollerState ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do
|
||||
TMap.lookup maybeOperationName operationNamesMap >>= \case
|
||||
-- If only one operation name is present in the map, delete it
|
||||
Just 1 -> TMap.delete maybeOperationName operationNamesMap
|
||||
@ -512,7 +517,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
|
||||
-- removed
|
||||
Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
|
||||
Nothing -> return ()
|
||||
cleanHandlerC cohorts ioState (cohort, currentCohortId) parameterizedQueryHash
|
||||
cleanHandlerC cohorts pollerState ioState (cohort, currentCohortId) parameterizedQueryHash
|
||||
liftIO $ do
|
||||
EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
|
||||
EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
|
||||
@ -529,7 +534,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
|
||||
cohortM <- TMap.lookup updatedCohortId (_pCohorts poller)
|
||||
return $ (poller,updatedCohortId,) <$> cohortM
|
||||
|
||||
cleanHandlerC cohortMap ioState (handlerC, currentCohortId) parameterizedQueryHash = do
|
||||
cleanHandlerC cohortMap pollerState ioState (handlerC, currentCohortId) parameterizedQueryHash = do
|
||||
let curOps = _cExistingSubscribers handlerC
|
||||
newOps = _cNewSubscribers handlerC
|
||||
TMap.delete sinkId curOps
|
||||
@ -555,6 +560,11 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
|
||||
Just threadRef -> do
|
||||
Immortal.stop threadRef
|
||||
liftIO $ do
|
||||
pollerLastState <- STM.readTVarIO pollerState
|
||||
when (pollerLastState == PRSError)
|
||||
$ Prometheus.Gauge.dec
|
||||
$ submActiveStreamingPollersInError
|
||||
$ pmSubscriptionMetrics prometheusMetrics
|
||||
Prometheus.Gauge.dec $ submActiveStreamingPollers $ pmSubscriptionMetrics prometheusMetrics
|
||||
let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
|
||||
recordMetricWithLabel
|
||||
|
Loading…
Reference in New Issue
Block a user