server: add active multiplexed subscriptions metrics

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8644
Co-authored-by: Rob Dominguez <24390149+robertjdominguez@users.noreply.github.com>
GitOrigin-RevId: ff1f646cd6a4b8dd5106db4693b2c6ad465ae052
This commit is contained in:
Karthikeyan Chinnakonda 2023-04-18 21:01:33 +05:30 committed by hasura-bot
parent 170c95e887
commit 41ed0d369b
4 changed files with 71 additions and 27 deletions

View File

@ -188,6 +188,24 @@ curl 'http://127.0.0.1:8080/v1/metrics' -H 'Authorization: Bearer <secret>'
indicates high failure rate of the one-off webhook.
</td>
</tr>
<tr>
<td>
<code>hasura_active_subscription_pollers</code>
</td>
<td>Current number of active subscription pollers. A subscription poller <a href="https://github.com/hasura/graphql-engine/blob/master/architecture/live-queries.md#idea-3-batch-multiple-live-queries-into-one-sql-query">multiplexes </a> similar subscriptions together.
</td>
<td>Gauge</td>
<td>
&#8226; "subscription_kind": streaming|live-query
<br />
</td>
<td>
The value of this metric is supposed to be proportional to the number of uniquely parameterised subscriptions i.e. subscriptions with the same selection set
but with different input arguments and session variables are multiplexed on the same poller.
If this metric is high then it may be an indication that there are too many uniquely parameterised subscriptions
which could be optimized for better performance.
</td>
</tr>
</table>

View File

@ -51,7 +51,7 @@ import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName)
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..))
import Hasura.Server.Types (RequestId)
import Language.GraphQL.Draft.Syntax qualified as G
import Refined (unrefine)
@ -200,6 +200,7 @@ addLiveQuery
let !pState = PollerIOState threadRef pollerId
$assertNFHere pState -- so we don't write thunks to mutable vars
STM.atomically $ STM.putTMVar (_pIOState poller) pState
liftIO $ Prometheus.Gauge.inc $ submActiveLiveQueryPollers $ pmSubscriptionMetrics $ prometheusMetrics
liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
liftIO $ Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
@ -290,10 +291,12 @@ addStreamSubscriptionQuery
let !pState = PollerIOState threadRef pollerId
$assertNFHere pState -- so we don't write thunks to mutable vars
STM.atomically $ STM.putTMVar (_pIOState handler) pState
liftIO $ Prometheus.Gauge.inc $ submActiveStreamingPollers $ pmSubscriptionMetrics $ prometheusMetrics
liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
liftIO $ Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
liftIO $ EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics
liftIO $ do
EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics
pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId
where
@ -362,7 +365,10 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
return $
Just $ -- deferred IO:
case threadRefM of
Just threadRef -> Immortal.stop threadRef
Just threadRef -> do
Immortal.stop threadRef
liftIO $ Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics
-- This would seem to imply addLiveQuery broke or a bug
-- elsewhere. Be paranoid and log:
Nothing ->
@ -388,9 +394,10 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
forM detM $ \(Poller cohorts ioState, currentCohortId, cohort) ->
cleanHandlerC cohorts ioState (cohort, currentCohortId)
sequence_ mbCleanupIO
liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
liftIO $ Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics
liftIO $ EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
liftIO $ do
EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics
EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
where
streamQMap = _ssStreamQueryMap subscriptionState
@ -423,14 +430,19 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
return $
Just $ -- deferred IO:
case threadRefM of
Just threadRef -> Immortal.stop threadRef
Just threadRef -> do
Immortal.stop threadRef
liftIO $
Prometheus.Gauge.dec $
submActiveStreamingPollers $
pmSubscriptionMetrics prometheusMetrics
-- This would seem to imply addStreamSubscriptionQuery broke or a bug
-- elsewhere. Be paranoid and log:
Nothing ->
L.unLogger logger $
L.UnstructuredLog L.LevelError $
fromString $
"In removeLiveQuery no worker thread installed. Please report this as a bug: "
"In removeStreamingQuery no worker thread installed. Please report this as a bug: "
<> " poller_id: "
<> show handlerId
<> ", cohort_id: "

View File

@ -67,13 +67,13 @@ data
"schema_cache_metadata_resource_version"
'GaugeType
()
-- | Current number active live queries
-- | Current number of subscribers connected to live queries
ActiveLiveQueries ::
ServerMetricsSpec
"active_livequeries"
'GaugeType
()
-- | Current number of streaming subscriptions
-- | Current number of subscribers connected to streaming subscriptions
ActiveStreaming ::
ServerMetricsSpec
"active_streaming_subscriptions"
@ -103,18 +103,18 @@ data
-- | Mutable references for the server metrics. See `ServerMetricsSpec` for a
-- description of each metric.
data ServerMetrics = ServerMetrics
{ smWarpThreads :: !Gauge,
smWebsocketConnections :: !Gauge,
smActiveSubscriptions :: !Gauge,
smNumEventsFetchedPerBatch :: !Distribution,
smNumEventHTTPWorkers :: !Gauge,
smEventQueueTime :: !Distribution,
smSchemaCacheMetadataResourceVersion :: !Gauge,
smActiveLiveQueries :: !Gauge,
smActiveStreamingSubscriptions :: !Gauge,
smEventFetchTimePerBatch :: !Distribution,
smEventWebhookProcessingTime :: !Distribution,
smEventProcessingTime :: !Distribution
{ smWarpThreads :: Gauge,
smWebsocketConnections :: Gauge,
smActiveSubscriptions :: Gauge,
smNumEventsFetchedPerBatch :: Distribution,
smNumEventHTTPWorkers :: Gauge,
smEventQueueTime :: Distribution,
smSchemaCacheMetadataResourceVersion :: Gauge,
smActiveLiveQueries :: Gauge,
smActiveStreamingSubscriptions :: Gauge,
smEventFetchTimePerBatch :: Distribution,
smEventWebhookProcessingTime :: Distribution,
smEventProcessingTime :: Distribution
}
createServerMetrics :: Store ServerMetricsSpec -> IO ServerMetrics

View File

@ -15,6 +15,7 @@ module Hasura.Server.Prometheus
incWebsocketConnections,
decWebsocketConnections,
ScheduledTriggerMetrics (..),
SubscriptionMetrics (..),
)
where
@ -40,7 +41,8 @@ data PrometheusMetrics = PrometheusMetrics
pmWebSocketBytesSent :: Counter,
pmActionBytesReceived :: Counter,
pmActionBytesSent :: Counter,
pmScheduledTriggerMetrics :: ScheduledTriggerMetrics
pmScheduledTriggerMetrics :: ScheduledTriggerMetrics,
pmSubscriptionMetrics :: SubscriptionMetrics
}
data GraphQLRequestMetrics = GraphQLRequestMetrics
@ -80,6 +82,11 @@ data ScheduledTriggerMetrics = ScheduledTriggerMetrics
stmOneOffEventsProcessedTotalFailure :: Counter
}
data SubscriptionMetrics = SubscriptionMetrics
{ submActiveLiveQueryPollers :: Gauge,
submActiveStreamingPollers :: Gauge
}
-- | Create dummy mutable references without associating them to a metrics
-- store.
makeDummyPrometheusMetrics :: IO PrometheusMetrics
@ -93,6 +100,7 @@ makeDummyPrometheusMetrics = do
pmActionBytesReceived <- Counter.new
pmActionBytesSent <- Counter.new
pmScheduledTriggerMetrics <- makeDummyScheduledTriggerMetrics
pmSubscriptionMetrics <- makeDummySubscriptionMetrics
pure PrometheusMetrics {..}
makeDummyGraphQLRequestMetrics :: IO GraphQLRequestMetrics
@ -135,6 +143,12 @@ makeDummyScheduledTriggerMetrics = do
stmOneOffEventsProcessedTotalFailure <- Counter.new
pure ScheduledTriggerMetrics {..}
makeDummySubscriptionMetrics :: IO SubscriptionMetrics
makeDummySubscriptionMetrics = do
submActiveLiveQueryPollers <- Gauge.new
submActiveStreamingPollers <- Gauge.new
pure SubscriptionMetrics {..}
--------------------------------------------------------------------------------
-- | A mutable reference for atomically sampling the number of websocket