From 770407110cd7a21fd5b584cf8308b3009be90239 Mon Sep 17 00:00:00 2001 From: Naveen Naidu Date: Thu, 18 May 2023 18:25:53 +0530 Subject: [PATCH] server: add total time, db exec time subscription metrics PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8899 Co-authored-by: paritosh-08 <85472423+paritosh-08@users.noreply.github.com> Co-authored-by: Rob Dominguez <24390149+robertjdominguez@users.noreply.github.com> GitOrigin-RevId: c8c4a89576ae95265a8e4f4e6803a12ba7e840d4 --- .../graphql-engine-flags/reference.mdx | 2 +- docs/docs/enterprise/metrics.mdx | 96 +++++++- .../Execute/Subscription/Poll/Common.hs | 11 +- .../Execute/Subscription/Poll/LiveQuery.hs | 27 +- .../Subscription/Poll/StreamingQuery.hs | 25 +- .../GraphQL/Execute/Subscription/State.hs | 232 +++++++++++++----- .../GraphQL/Execute/Subscription/TMap.hs | 4 + .../Hasura/GraphQL/ParameterizedQueryHash.hs | 2 +- .../Hasura/GraphQL/Transport/WSServerApp.hs | 7 +- .../Hasura/GraphQL/Transport/WebSocket.hs | 64 +++-- server/src-lib/Hasura/Server/Prometheus.hs | 94 ++++++- server/src-lib/Hasura/Server/Types.hs | 13 +- .../Test/Hasura/StreamingSubscriptionSuite.hs | 29 ++- 13 files changed, 472 insertions(+), 134 deletions(-) diff --git a/docs/docs/deployment/graphql-engine-flags/reference.mdx b/docs/docs/deployment/graphql-engine-flags/reference.mdx index b41a77383b2..b74c78d4ec5 100644 --- a/docs/docs/deployment/graphql-engine-flags/reference.mdx +++ b/docs/docs/deployment/graphql-engine-flags/reference.mdx @@ -342,7 +342,7 @@ Enable the Hasura Console (served by the server on `/` and `/console`). ### Enable High-cardinality Labels for Metrics Enable high-cardinality labels for [Prometheus Metrics](/enterprise/metrics.mdx). Enabling this setting will add more labels to -some of the metrics (e.g. `operationName` label for Graphql subscription metrics). +some of the metrics (e.g. `operation_name` label for Graphql subscription metrics). 
| | | | ------------------- | ------------------------------------------------- | diff --git a/docs/docs/enterprise/metrics.mdx b/docs/docs/enterprise/metrics.mdx index 9ca9bc07087..3b769da67fe 100644 --- a/docs/docs/enterprise/metrics.mdx +++ b/docs/docs/enterprise/metrics.mdx @@ -44,9 +44,10 @@ The metrics endpoint should be configured with a secret to prevent misuse and sh Starting in `v2.26.0`, Hasura GraphQL Engine exposes metrics with high-cardinality labels by default. -You can disable [the cardinality of labels for -metrics](/deployment/graphql-engine-flags/reference.mdx#enable-high-cardinality-labels-for-metrics) if you are -experiencing high memory usage, which can be due to a large number of labels in the metrics (typically more than 10000). +You can disable +[the cardinality of labels for metrics](/deployment/graphql-engine-flags/reference.mdx#enable-high-cardinality-labels-for-metrics) +if you are experiencing high memory usage, which can be due to a large number of labels in the metrics (typically more +than 10000). ::: @@ -54,6 +55,16 @@ experiencing high memory usage, which can be due to a large number of labels in The following metrics are exported by Hasura GraphQL Engine: +### Hasura active subscriptions + +Current number of active subscriptions, representing the subscription load on the server. + +| | | +| ------ | ------------------------------------------------------------------------------------------ | +| Name | `hasura_active_subscriptions` | +| Type | Gauge | +| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` | + ### Hasura active subscription pollers Current number of active subscription pollers. A subscription poller @@ -84,16 +95,6 @@ runtime errors. | Type | Gauge | | Labels | `subscription_kind`: streaming \| live-query | -### Hasura active subscriptions - -Current number of active subscriptions, representing the subscription load on the server. 
- -| | | -| ------ | ----------------------------- | -| Name | `hasura_active_subscriptions` | -| Type | Gauge | -| Labels | none | - ### Hasura Cache requests count Tracks cache hit and miss requests, which helps in monitoring and optimizing cache utilization. @@ -283,6 +284,75 @@ Current number of active PostgreSQL connections. Compare this to | Type | Gauge | | Labels | `source_name`: name of the database
`conn_info`: connection url string (password omitted) or name of the connection url environment variable
`role`: primary \| replica | +### Hasura subscription database execution time + +The time taken to run the subscription's multiplexed query in the database for a single batch. + +A subscription poller +[multiplexes](https://github.com/hasura/graphql-engine/blob/master/architecture/live-queries.md#idea-3-batch-multiple-live-queries-into-one-sql-query) +similar subscriptions together. During every run (every 1 second by default), the poller splits the different variables +for the multiplexed query into batches (by default 100) and executes the batches. This metric observes the time taken for +each batch to execute on the database. + +If this metric is high, then it may be an indication that the database is not performing as expected; you should +consider investigating the subscription query and see if indexes can help improve performance. + +| | | +| ------ | ------------------------------------------------------------------------------------------ | +| Name | `hasura_subscription_db_execution_time_seconds` | +| Type | Histogram&#8203;

Buckets: 0.000001, 0.0001, 0.01, 0.1, 0.3, 1, 3, 10, 30, 100 | +| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` | + +### Hasura subscription total time + +The time taken to complete running of one subscription poller. + +A subscription poller +[multiplexes](https://github.com/hasura/graphql-engine/blob/master/architecture/live-queries.md#idea-3-batch-multiple-live-queries-into-one-sql-query) +similar subscriptions together. This subscription poller runs every 1 second by default and queries the database with +the multiplexed query to fetch the latest data. In a polling instance, the poller not only queries the database but does +other operations like splitting similar queries into batches (by default 100) before fetching their data from the +database, etc. **This metric is the total time taken to complete all the operations in a single poll.** + +If the value of this metric is high, then it may be an indication that the multiplexed query is taking longer to execute +in the database, verify with +[`hasura_subscription_db_execution_time_seconds`](/enterprise/metrics.mdx/#hasura-subscription-database-execution-time) +metric. + +In a single poll, the subscription poller splits the different variables for the multiplexed query into batches (by +default 100) and executes the batches. We use the `hasura_subscription_db_execution_time_seconds` metric to observe the +time taken for each batch to execute on the database. This means for a single poll there can be multiple values for +`hasura_subscription_db_execution_time_seconds` metric. + +Let's look at an example to understand these metrics better: + +Say we have 650 subscriptions with the same selection set but different input arguments. These 650 subscriptions will be +grouped to form one multiplexed query. A single poller would be created to run this multiplexed query. This poller will +run every 1 second. 
+ +The default batch size in hasura is 100, so the 650 subscriptions will be split into 7 batches for execution during a +single poll. + +During a single poll: + +- Batch 1: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds +- Batch 2: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds +- Batch 3: `hasura_subscription_db_execution_time_seconds` = 0.003 seconds +- Batch 4: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds +- Batch 5: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds +- Batch 6: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds +- Batch 7: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds + +The `hasura_subscription_total_time_seconds` would be sum of all the database execution times shown in the batches, plus +some extra process time for other tasks the poller does during a single poll. In this case, it would be approximately +0.013 seconds. + +| | | +| ------ | ------------------------------------------------------------------------------------------ | +| Name | `hasura_subscription_total_time_seconds` | +| Type | Histogram

Buckets: 0.000001, 0.0001, 0.01, 0.1, 0.3, 1, 3, 10, 30, 100 | +| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` | + ### Hasura WebSocket connections Current number of active WebSocket connections, representing the WebSocket load on the server. diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs index cd32268e98e..937407e47c1 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs @@ -247,7 +247,11 @@ data Poller streamCursor = Poller -- This var is "write once", moving monotonically from empty to full. -- TODO this could probably be tightened up to something like -- 'STM PollerIOState' - _pIOState :: STM.TMVar PollerIOState + _pIOState :: STM.TMVar PollerIOState, + _pParameterizedQueryHash :: ParameterizedQueryHash, + -- The operation names of the subscriptions that are part of this poller. 
This is + -- used while emitting subscription metrics + _pOperationNamesMap :: TMap.TMap (Maybe OperationName) Int } data PollerIOState = PollerIOState @@ -291,7 +295,7 @@ dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value dumpPollerMap extended pollerMap = fmap J.toJSON $ do entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap - forM entries $ \(pollerKey, Poller cohortsMap _responseState ioState) -> + forM entries $ \(pollerKey, Poller cohortsMap _responseState ioState _paramQueryHash _opNames) -> AB.dispatchAnyBackend @Backend (unBackendPollerKey pollerKey) $ \(PollerKey source role query _connectionKey) -> do PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState cohortsJ <- @@ -305,7 +309,8 @@ dumpPollerMap extended pollerMap = "thread_id" J..= show (Immortal.threadId threadId), "poller_id" J..= pollerId, "multiplexed_query" J..= query, - "cohorts" J..= cohortsJ + "cohorts" J..= cohortsJ, + "parameterized_query_hash" J..= _paramQueryHash ] -- | An ID to track unique 'Poller's, so that we can gather metrics about each diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs index 9cf06b7e318..7ce1a5a438c 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs @@ -33,9 +33,11 @@ import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla)) import Hasura.RQL.Types.Common (SourceName) import Hasura.RQL.Types.Roles (RoleName) import Hasura.RQL.Types.Subscription (SubscriptionType (..)) -import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..)) +import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), liveQuerySubscriptionLabel, recordSubcriptionMetric) +import Hasura.Server.Types (GranularPrometheusMetricsState (..)) import Refined (unrefine) import 
System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge +import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector pushResultToCohort :: GQResult BS.ByteString -> @@ -86,9 +88,12 @@ pollLiveQuery :: CohortMap 'LiveQuery -> SubscriptionPostPollHook -> PrometheusMetrics -> + IO GranularPrometheusMetricsState -> + TMap.TMap (Maybe OperationName) Int -> ResolvedConnectionTemplate b -> IO () -pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics resolvedConnectionTemplate = do +pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics granularPrometheusMetricsState operationNamesMap' resolvedConnectionTemplate = do + operationNamesMap <- STM.atomically $ TMap.getMap operationNamesMap' (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do -- snapshot the current cohorts and split them into batches (snapshotTime, cohortBatches) <- withElapsedTime $ do @@ -105,6 +110,15 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do (queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query (over (each . 
_2) C._csVariables cohorts) resolvedConnectionTemplate + let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics + recordSubcriptionMetric + granularPrometheusMetricsState + True + operationNamesMap + parameterizedQueryHash + liveQuerySubscriptionLabel + (flip (HistogramVector.observe dbExecTimeMetric) (realToFrac queryExecutionTime)) + previousPollerResponseState <- STM.readTVarIO pollerResponseState case mxRes of @@ -152,7 +166,6 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol batchId cohortsExecutionDetails batchResponseSize - pure (snapshotTime, batchesDetails) let pollDetails = @@ -169,6 +182,14 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol _pdParameterizedQueryHash = parameterizedQueryHash } postPollHook pollDetails + let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics + recordSubcriptionMetric + granularPrometheusMetricsState + True + operationNamesMap + parameterizedQueryHash + liveQuerySubscriptionLabel + (flip (HistogramVector.observe totalTimeMetric) (realToFrac totalTime)) where SubscriptionsOptions batchSize _ = lqOpts diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs index a2248ad9e9e..da6e83311b4 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs @@ -37,10 +37,12 @@ import Hasura.RQL.Types.Common (SourceName) import Hasura.RQL.Types.Roles (RoleName) import Hasura.RQL.Types.Subscription (SubscriptionType (..)) import Hasura.SQL.Value (TxtEncodedVal (..)) -import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..)) +import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), recordSubcriptionMetric, streamingSubscriptionLabel) +import 
Hasura.Server.Types (GranularPrometheusMetricsState (..)) import Language.GraphQL.Draft.Syntax qualified as G import Refined (unrefine) import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge +import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector import Text.Shakespeare.Text (st) {- Note [Streaming subscriptions rebuilding cohort map] @@ -249,9 +251,12 @@ pollStreamingQuery :: SubscriptionPostPollHook -> Maybe (IO ()) -> -- Optional IO action to make this function (pollStreamingQuery) testable PrometheusMetrics -> + IO GranularPrometheusMetricsState -> + TMap.TMap (Maybe OperationName) Int -> ResolvedConnectionTemplate b -> IO () -pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics resolvedConnectionTemplate = do +pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics granularPrometheusMetricsState operationNames' resolvedConnectionTemplate = do + operationNames <- STM.atomically $ TMap.getMap operationNames' (totalTime, (snapshotTime, batchesDetailsAndProcessedCohorts)) <- withElapsedTime $ do -- snapshot the current cohorts and split them into batches -- This STM transaction is a read only transaction i.e. it doesn't mutate any state @@ -275,6 +280,14 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, query (over (each . 
_2) C._csVariables $ fmap (fmap fst) cohorts) resolvedConnectionTemplate + let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics + recordSubcriptionMetric + granularPrometheusMetricsState + True + operationNames + parameterizedQueryHash + streamingSubscriptionLabel + (flip (HistogramVector.observe dbExecTimeMetric) (realToFrac queryExecutionTime)) previousPollerResponseState <- STM.readTVarIO pollerResponseState @@ -426,6 +439,14 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, currentCohorts TMap.replace cohortMap updatedCohortsMap postPollHook pollDetails + let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics + recordSubcriptionMetric + granularPrometheusMetricsState + True + operationNames + parameterizedQueryHash + streamingSubscriptionLabel + (flip (HistogramVector.observe totalTimeMetric) (realToFrac totalTime)) where SubscriptionsOptions batchSize _ = streamingQueryOpts diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs index 6190ba49cf3..e7d5fec3a77 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs @@ -52,13 +52,22 @@ import Hasura.RQL.Types.Action import Hasura.RQL.Types.Common (SourceName) import Hasura.SQL.AnyBackend qualified as AB import Hasura.Server.Metrics (ServerMetrics (..)) -import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..)) -import Hasura.Server.Types (RequestId) +import Hasura.Server.Prometheus + ( DynamicSubscriptionLabel (..), + PrometheusMetrics (..), + SubscriptionLabel (..), + SubscriptionMetrics (..), + liveQuerySubscriptionLabel, + recordMetricWithLabel, + streamingSubscriptionLabel, + ) +import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId) import Language.GraphQL.Draft.Syntax qualified as G import Refined (unrefine) 
import StmContainers.Map qualified as STMMap import System.Metrics.Gauge qualified as EKG.Gauge import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge +import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector -- | The top-level datatype that holds the state for all active subscriptions. -- @@ -117,8 +126,10 @@ findPollerForSubscriber :: CohortKey -> (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) -> (Subscriber -> Poller streamCursorVars -> STM.STM streamCursorVars) -> + ParameterizedQueryHash -> + Maybe OperationName -> STM.STM ((Maybe (Poller streamCursorVars)), streamCursorVars) -findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller = +findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller parameterizedQueryHash maybeOperationName = -- a handler is returned only when it is newly created STMMap.lookup pollerKey pollerMap >>= \case Just poller -> do @@ -130,11 +141,18 @@ findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort add -- cohort not found. Create a cohort with the subscriber and add -- the cohort to the poller Nothing -> addToPoller subscriber poller + -- Add the operation name of the subcription to the poller, if it doesn't exist + -- else increment the count for the operation name + TMap.lookup maybeOperationName (_pOperationNamesMap poller) >>= \case + Nothing -> TMap.insert 1 maybeOperationName (_pOperationNamesMap poller) + Just _ -> TMap.adjust (+ 1) maybeOperationName (_pOperationNamesMap poller) return (Nothing, cursorVars) Nothing -> do -- no poller found, so create one with the cohort -- and the subscriber within it. 
- !poller <- Poller <$> TMap.new <*> STM.newTVar PRSSuccess <*> STM.newEmptyTMVar + operationNamesMap <- TMap.new + TMap.insert 1 maybeOperationName operationNamesMap + !poller <- Poller <$> TMap.new <*> STM.newTVar PRSSuccess <*> STM.newEmptyTMVar <*> pure parameterizedQueryHash <*> pure operationNamesMap cursorVars <- addToPoller subscriber poller STMMap.insert poller pollerKey pollerMap return $ (Just poller, cursorVars) @@ -155,6 +173,7 @@ addLiveQuery :: Maybe OperationName -> RequestId -> SubscriptionQueryPlan b (MultiplexedQuery b) -> + IO GranularPrometheusMetricsState -> -- | the action to be executed when result changes OnChange -> IO LiveQuerySubscriberDetails @@ -170,6 +189,7 @@ addLiveQuery operationName requestId plan + granularPrometheusMetricsState onResultAction = do -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here! @@ -187,6 +207,8 @@ addLiveQuery cohortKey addToCohort addToPoller + parameterizedQueryHash + operationName -- we can then attach a polling thread if it is new the livequery can only be -- cancelled after putTMVar @@ -207,6 +229,8 @@ addLiveQuery (_pCohorts poller) postPollHook prometheusMetrics + granularPrometheusMetricsState + (_pOperationNamesMap poller) resolvedConnectionTemplate sleep $ unrefine $ unRefetchInterval refetchInterval let !pState = PollerIOState threadRef pollerId @@ -215,7 +239,14 @@ addLiveQuery liftIO $ Prometheus.Gauge.inc $ submActiveLiveQueryPollers $ pmSubscriptionMetrics $ prometheusMetrics liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics - liftIO $ Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics + let promMetricGranularLabel = SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName) + promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing + let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + recordMetricWithLabel + 
granularPrometheusMetricsState + True + (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.inc numSubscriptionMetric promMetricLabel) liftIO $ EKG.Gauge.inc $ smActiveLiveQueries serverMetrics pure $ SubscriberDetails handlerId cohortKey subscriberId @@ -256,6 +287,7 @@ addStreamSubscriptionQuery :: -- | root field name G.Name -> SubscriptionQueryPlan b (MultiplexedQuery b) -> + IO GranularPrometheusMetricsState -> -- | the action to be executed when result changes OnChange -> IO StreamingSubscriberDetails @@ -272,6 +304,7 @@ addStreamSubscriptionQuery requestId rootFieldName plan + granularPrometheusMetricsState onResultAction = do -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here! @@ -289,6 +322,8 @@ addStreamSubscriptionQuery cohortKey addToCohort addToPoller + parameterizedQueryHash + operationName -- we can then attach a polling thread if it is new the subscription can only be -- cancelled after putTMVar @@ -298,7 +333,22 @@ addStreamSubscriptionQuery forever $ do (_, streamQOpts) <- getSubscriptionOptions let SubscriptionsOptions _ refetchInterval = streamQOpts - pollStreamingQuery @b pollerId (_pPollerState handler) streamQOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) rootFieldName postPollHook Nothing prometheusMetrics resolvedConnectionTemplate + pollStreamingQuery @b + pollerId + (_pPollerState handler) + streamQOpts + (source, sourceConfig) + role + parameterizedQueryHash + query + (_pCohorts handler) + rootFieldName + postPollHook + Nothing + prometheusMetrics + granularPrometheusMetricsState + (_pOperationNamesMap handler) + resolvedConnectionTemplate sleep $ unrefine $ unRefetchInterval refetchInterval let !pState = PollerIOState threadRef pollerId $assertNFHere pState -- so we don't write thunks to mutable vars @@ -307,9 +357,17 @@ addStreamSubscriptionQuery liftIO $ do EKG.Gauge.inc $ smActiveSubscriptions serverMetrics - Prometheus.Gauge.inc $ 
pmActiveSubscriptions prometheusMetrics EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics + let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName) + promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing + numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + recordMetricWithLabel + granularPrometheusMetricsState + True + (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.inc numSubscriptionMetric promMetricLabel) + pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId where SubscriptionsState _ streamQueryMap postPollHook _ = subscriptionState @@ -336,16 +394,27 @@ removeLiveQuery :: SubscriptionsState -> -- the query and the associated operation LiveQuerySubscriberDetails -> + IO GranularPrometheusMetricsState -> + Maybe OperationName -> IO () -removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) = mask_ $ do - mbCleanupIO <- STM.atomically $ do - detM <- getQueryDet lqMap - fmap join $ - forM detM $ \(Poller cohorts _ ioState, cohort) -> - cleanHandlerC cohorts ioState cohort - sequence_ mbCleanupIO +removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do + join $ + STM.atomically $ do + detM <- getQueryDet lqMap + case detM of + Nothing -> return (pure ()) + Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, cohort) -> do + TMap.lookup maybeOperationName operationNamesMap >>= \case + -- If only one operation name is present in the map, delete it + Just 1 -> TMap.delete maybeOperationName operationNamesMap + -- If the count of a operation name is more than 1, then it means there + -- are more subscriptions with the same name and we should keep emitting + 
-- the metrics until all the subscriptions with that operation name are + -- removed + Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap + Nothing -> return () + cleanHandlerC cohorts ioState cohort parameterizedQueryHash liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics - liftIO $ Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics liftIO $ EKG.Gauge.dec $ smActiveLiveQueries serverMetrics where lqMap = _ssLiveQueryMap lqState @@ -357,7 +426,7 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD cohortM <- TMap.lookup cohortId (_pCohorts poller) return $ (poller,) <$> cohortM - cleanHandlerC cohortMap ioState handlerC = do + cleanHandlerC cohortMap ioState handlerC parameterizedQueryHash = do let curOps = _cExistingSubscribers handlerC newOps = _cNewSubscribers handlerC TMap.delete sinkId curOps @@ -368,6 +437,8 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD <*> TMap.null newOps when cohortIsEmpty $ TMap.delete cohortId cohortMap handlerIsEmpty <- TMap.null cohortMap + let promMetricGranularLabel = SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName) + promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing -- when there is no need for handler i.e, this happens to be the last -- operation, take the ref for the polling thread to cancel it if handlerIsEmpty @@ -375,21 +446,34 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD STMMap.delete handlerId lqMap threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState return $ - Just $ -- deferred IO: - case threadRefM of - Just threadRef -> do - Immortal.stop threadRef - liftIO $ Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics - - -- This would seem to imply addLiveQuery broke or a bug - -- elsewhere. &#8203;
Be paranoid and log: - Nothing -> - L.unLogger logger $ - L.UnstructuredLog L.LevelError $ - fromString $ - "In removeLiveQuery no worker thread installed. Please report this as a bug: " - <> show lqId - else return Nothing + -- deferred IO: + case threadRefM of + Just threadRef -> do + Immortal.stop threadRef + liftIO $ do + Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics + let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + recordMetricWithLabel + granularPrometheusMetricsState + True + (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.dec numSubscriptionMetric promMetricLabel) + -- This would seem to imply addLiveQuery broke or a bug + -- elsewhere. Be paranoid and log: + Nothing -> + L.unLogger logger $ + L.UnstructuredLog L.LevelError $ + fromString $ + "In removeLiveQuery no worker thread installed. Please report this as a bug: " + <> show lqId + else do + let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + return $ + recordMetricWithLabel + granularPrometheusMetricsState + True + (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.dec numSubscriptionMetric promMetricLabel) removeStreamingQuery :: L.Logger L.Hasura -> @@ -398,17 +482,28 @@ removeStreamingQuery :: SubscriptionsState -> -- the query and the associated operation StreamingSubscriberDetails -> + IO GranularPrometheusMetricsState -> + Maybe OperationName -> IO () -removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) = mask_ $ do - mbCleanupIO <- STM.atomically $ do - detM <- getQueryDet streamQMap - fmap join $ - forM detM $ \(Poller cohorts _ ioState, currentCohortId, cohort) -> - cleanHandlerC cohorts ioState (cohort, currentCohortId) - sequence_ mbCleanupIO +removeStreamingQuery logger serverMetrics prometheusMetrics 
subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do + join $ + STM.atomically $ do + detM <- getQueryDet streamQMap + case detM of + Nothing -> return (pure ()) + Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do + TMap.lookup maybeOperationName operationNamesMap >>= \case + -- If only one operation name is present in the map, delete it + Just 1 -> TMap.delete maybeOperationName operationNamesMap + -- If the count of an operation name is more than 1, then it means there + -- are more subscriptions with the same name and we should keep emitting + -- the metrics until all the subscriptions with that operation name are + -- removed + Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap + Nothing -> return () + cleanHandlerC cohorts ioState (cohort, currentCohortId) parameterizedQueryHash liftIO $ do EKG.Gauge.dec $ smActiveSubscriptions serverMetrics - Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics where streamQMap = _ssStreamQueryMap subscriptionState @@ -422,7 +517,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S cohortM <- TMap.lookup updatedCohortId (_pCohorts poller) return $ (poller,updatedCohortId,) <$> cohortM - cleanHandlerC cohortMap ioState (handlerC, currentCohortId) = do + cleanHandlerC cohortMap ioState (handlerC, currentCohortId) parameterizedQueryHash = do let curOps = _cExistingSubscribers handlerC newOps = _cNewSubscribers handlerC TMap.delete sinkId curOps @@ -433,6 +528,8 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S <*> TMap.null newOps when cohortIsEmpty $ TMap.delete currentCohortId cohortMap handlerIsEmpty <- TMap.null cohortMap + let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ &#8203;
DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName) + promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing -- when there is no need for handler i.e, -- operation, take the ref for the polling thread to cancel it if handlerIsEmpty @@ -440,28 +537,39 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S STMMap.delete handlerId streamQMap threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState return $ - Just $ -- deferred IO: - case threadRefM of - Just threadRef -> do - Immortal.stop threadRef - liftIO $ - Prometheus.Gauge.dec $ - submActiveStreamingPollers $ - pmSubscriptionMetrics prometheusMetrics - -- This would seem to imply addStreamSubscriptionQuery broke or a bug - -- elsewhere. Be paranoid and log: - Nothing -> - L.unLogger logger $ - L.UnstructuredLog L.LevelError $ - fromString $ - "In removeStreamingQuery no worker thread installed. Please report this as a bug: " - <> " poller_id: " - <> show handlerId - <> ", cohort_id: " - <> show cohortId - <> ", subscriber_id:" - <> show sinkId - else return Nothing + -- deferred IO: + case threadRefM of + Just threadRef -> do + Immortal.stop threadRef + liftIO $ do + Prometheus.Gauge.dec $ submActiveStreamingPollers $ pmSubscriptionMetrics prometheusMetrics + let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + recordMetricWithLabel + granularPrometheusMetricsState + True + (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.dec numSubscriptionMetric promMetricLabel) + -- This would seem to imply addStreamSubscriptionQuery broke or a bug + -- elsewhere. Be paranoid and log: + Nothing -> + L.unLogger logger $ + L.UnstructuredLog L.LevelError $ + fromString $ + "In removeStreamingQuery no worker thread installed. 
Please report this as a bug: " + <> " poller_id: " + <> show handlerId + <> ", cohort_id: " + <> show cohortId + <> ", subscriber_id:" + <> show sinkId + else do + let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics + return $ + recordMetricWithLabel + granularPrometheusMetricsState + True + (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel) + (GaugeVector.dec numSubscriptionMetric promMetricLabel) -- | An async action query whose relationships are refered to table in a source. -- We need to generate an SQL statement with the action response and execute it diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/TMap.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/TMap.hs index ae86a557c12..5f542a63908 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/TMap.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/TMap.hs @@ -11,6 +11,7 @@ module Hasura.GraphQL.Execute.Subscription.TMap union, filterWithKey, getMap, + adjust, ) where @@ -58,3 +59,6 @@ union mapA mapB = do getMap :: TMap k v -> STM (HashMap.HashMap k v) getMap = readTVar . unTMap + +adjust :: (Hashable k) => (v -> v) -> k -> TMap k v -> STM () +adjust f k mapTV = modifyTVar' (unTMap mapTV) $ HashMap.adjust f k diff --git a/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs b/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs index d418f672d01..d39fdc5cfbc 100644 --- a/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs +++ b/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs @@ -121,7 +121,7 @@ parameterizedQueryHashListToObject = [("parameterized_query_hash", J.toJSON queryHashes)] newtype ParameterizedQueryHash = ParameterizedQueryHash {unParamQueryHash :: B.ByteString} - deriving (Show, Eq) + deriving (Show, Eq, Ord) instance J.ToJSON ParameterizedQueryHash where toJSON = J.String . bsToTxt . 
unParamQueryHash diff --git a/server/src-lib/Hasura/GraphQL/Transport/WSServerApp.hs b/server/src-lib/Hasura/GraphQL/Transport/WSServerApp.hs index 715d4e3d3b8..95ae6eb10de 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WSServerApp.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WSServerApp.hs @@ -40,6 +40,7 @@ import Hasura.Server.Prometheus decWebsocketConnections, incWebsocketConnections, ) +import Hasura.Server.Types (MonadGetPolicies (..)) import Hasura.Services.Network import Hasura.Tracing qualified as Tracing import Network.WebSockets qualified as WS @@ -59,7 +60,8 @@ createWSServerApp :: MonadQueryTags m, HasResourceLimits m, ProvidesNetwork m, - Tracing.MonadTrace m + Tracing.MonadTrace m, + MonadGetPolicies m ) => HashSet (L.EngineLogType L.Hasura) -> WSServerEnv impl -> @@ -96,9 +98,10 @@ createWSServerApp enabledLogTypes serverEnv connInitTimeout licenseKeyCache = \ onMessage enabledLogTypes getAuthMode serverEnv conn bs (wsActions sp) licenseKeyCache onCloseHandler conn = mask_ do + granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity liftIO $ EKG.Gauge.dec $ smWebsocketConnections serverMetrics liftIO $ decWebsocketConnections $ pmConnections prometheusMetrics - onClose logger serverMetrics prometheusMetrics (_wseSubscriptionState serverEnv) conn + onClose logger serverMetrics prometheusMetrics (_wseSubscriptionState serverEnv) conn granularPrometheusMetricsState stopWSServerApp :: WSServerEnv impl -> IO () stopWSServerApp wsEnv = WS.shutdown (_wseServer wsEnv) diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 3cbbacca858..30df2c13da3 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -93,7 +93,7 @@ import Hasura.Server.Prometheus PrometheusMetrics (..), ) import Hasura.Server.Telemetry.Counters qualified as Telem -import Hasura.Server.Types (RequestId, getRequestId) +import 
Hasura.Server.Types (GranularPrometheusMetricsState (..), MonadGetPolicies (..), RequestId, getRequestId) import Hasura.Services.Network import Hasura.Session import Hasura.Tracing qualified as Tracing @@ -420,7 +420,8 @@ onStart :: MonadMetadataStorage m, MonadQueryTags m, HasResourceLimits m, - ProvidesNetwork m + ProvidesNetwork m, + MonadGetPolicies m ) => HashSet (L.EngineLogType L.Hasura) -> Maybe (CredentialCache AgentLicenseKey) -> @@ -679,9 +680,9 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables E.SEOnSourceDB (E.SSLivequery actionIds liveQueryBuilder) -> do actionLogMapE <- fmap fst <$> runExceptT (EA.fetchActionLogResponses actionIds) actionLogMap <- onLeft actionLogMapE (withComplete . preExecErr requestId (Just gqlOpType)) - opMetadataE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap + granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity + opMetadataE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState lqId <- onLeft opMetadataE (withComplete . 
preExecErr requestId (Just gqlOpType)) - -- Update async action query subscription state case NE.nonEmpty (toList actionIds) of Nothing -> do @@ -694,11 +695,11 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables let asyncActionQueryLive = ES.LAAQOnSourceDB $ ES.LiveAsyncActionQueryOnSource lqId actionLogMap $ - restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder + restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder granularPrometheusMetricsState (_grOperationName reqParsed) onUnexpectedException err = do sendError requestId err - stopOperation serverEnv wsConn opId (pure ()) -- Don't log in case opId don't exist + stopOperation serverEnv wsConn opId granularPrometheusMetricsState (pure ()) -- Don't log in case opId don't exist ES.addAsyncActionLiveQuery (ES._ssAsyncActions subscriptionsState) opId @@ -706,7 +707,8 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables onUnexpectedException asyncActionQueryLive E.SEOnSourceDB (E.SSStreaming rootFieldName streamQueryBuilder) -> do - liftIO $ startStreamingQuery rootFieldName streamQueryBuilder parameterizedQueryHash requestId + granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity + liftIO $ startStreamingQuery rootFieldName streamQueryBuilder parameterizedQueryHash requestId granularPrometheusMetricsState liftIO $ Prometheus.Counter.inc (gqlRequestsSubscriptionSuccess gqlMetrics) liftIO $ logOpEv ODStarted (Just requestId) (Just parameterizedQueryHash) @@ -894,12 +896,13 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables liftIO $ sendCompleted Nothing Nothing throwError () - restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do - ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionsState lqId - either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId 
actionLogMap + restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder granularPrometheusMetricsState maybeOperationName lqId actionLogMap = do + ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionsState lqId granularPrometheusMetricsState maybeOperationName + either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState - startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap = do + startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState = do liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap + for liveQueryE $ \(sourceName, E.SubscriptionQueryPlan exists) -> do let !opName = _grOperationName q subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId @@ -920,14 +923,16 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables opName requestId liveQueryPlan + granularPrometheusMetricsState (onChange opName parameterizedQueryHash $ ES._sqpNamespace liveQueryPlan) + liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars STM.atomically $ -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber: STMMap.insert (LiveQuerySubscriber lqId, opName) opId opMap pure lqId - startStreamingQuery rootFieldName (sourceName, E.SubscriptionQueryPlan exists) parameterizedQueryHash requestId = do + startStreamingQuery rootFieldName (sourceName, E.SubscriptionQueryPlan exists) parameterizedQueryHash requestId granularPrometheusMetricsState = do let !opName = _grOperationName q subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId -- NOTE!: we mask async exceptions higher in the call stack, but it's @@ -948,6 +953,7 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables requestId (_rfaAlias rootFieldName) 
streamQueryPlan + granularPrometheusMetricsState (onChange opName parameterizedQueryHash $ ES._sqpNamespace streamQueryPlan) liftIO $ $assertNFHere (streamSubscriberId, opName) -- so we don't write thunks to mutable vars STM.atomically $ @@ -1017,7 +1023,8 @@ onMessage :: MonadQueryTags m, HasResourceLimits m, ProvidesNetwork m, - Tracing.MonadTrace m + Tracing.MonadTrace m, + MonadGetPolicies m ) => HashSet (L.EngineLogType L.Hasura) -> IO AuthMode -> @@ -1052,7 +1059,9 @@ onMessage enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions agen then CaptureQueryVariables else DoNotCaptureQueryVariables onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables startMsg onMessageActions - CMStop stopMsg -> onStop serverEnv wsConn stopMsg + CMStop stopMsg -> do + granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity + onStop serverEnv wsConn stopMsg granularPrometheusMetricsState -- specfic to graphql-ws CMPing mPayload -> onPing wsConn mPayload CMPong _mPayload -> pure () @@ -1067,15 +1076,15 @@ onPing :: (MonadIO m) => WSConn -> Maybe PingPongPayload -> m () onPing wsConn mPayload = liftIO $ sendMsg wsConn (SMPong mPayload) -onStop :: (MonadIO m) => WSServerEnv impl -> WSConn -> StopMsg -> m () -onStop serverEnv wsConn (StopMsg opId) = liftIO $ do +onStop :: (MonadIO m) => WSServerEnv impl -> WSConn -> StopMsg -> IO GranularPrometheusMetricsState -> m () +onStop serverEnv wsConn (StopMsg opId) granularPrometheusMetricsState = liftIO $ do -- When a stop message is received for an operation, it may not be present in OpMap -- in these cases: -- 1. If the operation is a query/mutation - as we remove the operation from the -- OpMap as soon as it is executed -- 2. A misbehaving client -- 3. 
A bug on our end - stopOperation serverEnv wsConn opId $ + stopOperation serverEnv wsConn opId granularPrometheusMetricsState $ L.unLogger logger $ L.UnstructuredLog L.LevelDebug $ fromString $ @@ -1085,17 +1094,17 @@ onStop serverEnv wsConn (StopMsg opId) = liftIO $ do where logger = _wseLogger serverEnv -stopOperation :: WSServerEnv impl -> WSConn -> OperationId -> IO () -> IO () -stopOperation serverEnv wsConn opId logWhenOpNotExist = do +stopOperation :: WSServerEnv impl -> WSConn -> OperationId -> IO GranularPrometheusMetricsState -> IO () -> IO () +stopOperation serverEnv wsConn opId granularPrometheusMetricsState logWhenOpNotExist = do opM <- liftIO $ STM.atomically $ STMMap.lookup opId opMap case opM of - Just (subscriberDetails, opNameM) -> do - logWSEvent logger wsConn $ EOperation $ opDet opNameM + Just (subscriberDetails, operationName) -> do + logWSEvent logger wsConn $ EOperation $ opDet operationName case subscriberDetails of LiveQuerySubscriber lqId -> - ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState lqId + ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState lqId granularPrometheusMetricsState operationName StreamingQuerySubscriber streamSubscriberId -> - ES.removeStreamingQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState streamSubscriberId + ES.removeStreamingQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState streamSubscriberId granularPrometheusMetricsState operationName Nothing -> logWhenOpNotExist STM.atomically $ STMMap.delete opId opMap where @@ -1182,14 +1191,15 @@ onClose :: PrometheusMetrics -> ES.SubscriptionsState -> WSConn -> + IO GranularPrometheusMetricsState -> m () -onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn = do +onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn 
granularPrometheusMetricsState = do logWSEvent logger wsConn EClosed operations <- liftIO $ STM.atomically $ ListT.toList $ STMMap.listT opMap liftIO $ - for_ operations $ \(_, (subscriber, _)) -> + for_ operations $ \(_, (subscriber, operationName)) -> case subscriber of - LiveQuerySubscriber lqId -> ES.removeLiveQuery logger serverMetrics prometheusMetrics subscriptionsState lqId - StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId + LiveQuerySubscriber lqId -> ES.removeLiveQuery logger serverMetrics prometheusMetrics subscriptionsState lqId granularPrometheusMetricsState operationName + StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId granularPrometheusMetricsState operationName where opMap = _wscOpMap $ WS.getData wsConn diff --git a/server/src-lib/Hasura/Server/Prometheus.hs b/server/src-lib/Hasura/Server/Prometheus.hs index 5a305d82bdf..c4cb3edb08f 100644 --- a/server/src-lib/Hasura/Server/Prometheus.hs +++ b/server/src-lib/Hasura/Server/Prometheus.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DeriveAnyClass #-} + -- | Mutable references for Prometheus metrics. -- -- These metrics are independent from the metrics in "Hasura.Server.Metrics". 
@@ -20,20 +22,32 @@ module Hasura.Server.Prometheus TriggerNameLabel (..), GranularPrometheusMetricsState (..), observeHistogramWithLabel, + SubscriptionKindLabel (..), + SubscriptionLabel (..), + DynamicSubscriptionLabel (..), + streamingSubscriptionLabel, + liveQuerySubscriptionLabel, + recordMetricWithLabel, + recordSubcriptionMetric, ) where import Data.HashMap.Internal.Strict qualified as Map +import Data.HashMap.Strict qualified as HashMap import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) import Data.Int (Int64) +import Hasura.GraphQL.ParameterizedQueryHash +import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName (..)) import Hasura.Prelude import Hasura.RQL.Types.EventTrigger (TriggerName, triggerNameToTxt) import Hasura.Server.Types (GranularPrometheusMetricsState (..)) +import Language.GraphQL.Draft.Syntax qualified as G import System.Metrics.Prometheus (ToLabels (..)) import System.Metrics.Prometheus.Counter (Counter) import System.Metrics.Prometheus.Counter qualified as Counter import System.Metrics.Prometheus.Gauge (Gauge) import System.Metrics.Prometheus.Gauge qualified as Gauge +import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector import System.Metrics.Prometheus.Histogram (Histogram) import System.Metrics.Prometheus.Histogram qualified as Histogram import System.Metrics.Prometheus.HistogramVector (HistogramVector) @@ -44,7 +58,6 @@ import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector -- | Mutable references for Prometheus metrics. 
data PrometheusMetrics = PrometheusMetrics { pmConnections :: ConnectionsGauge, - pmActiveSubscriptions :: Gauge, pmGraphQLRequestMetrics :: GraphQLRequestMetrics, pmEventTriggerMetrics :: EventTriggerMetrics, pmWebSocketBytesReceived :: Counter, @@ -101,7 +114,10 @@ data SubscriptionMetrics = SubscriptionMetrics { submActiveLiveQueryPollers :: Gauge, submActiveStreamingPollers :: Gauge, submActiveLiveQueryPollersInError :: Gauge, - submActiveStreamingPollersInError :: Gauge + submActiveStreamingPollersInError :: Gauge, + submTotalTime :: HistogramVector.HistogramVector SubscriptionLabel, + submDBExecTotalTime :: HistogramVector.HistogramVector SubscriptionLabel, + submActiveSubscriptions :: GaugeVector.GaugeVector SubscriptionLabel } data CacheRequestMetrics = CacheRequestMetrics @@ -114,7 +130,6 @@ data CacheRequestMetrics = CacheRequestMetrics makeDummyPrometheusMetrics :: IO PrometheusMetrics makeDummyPrometheusMetrics = do pmConnections <- newConnectionsGauge - pmActiveSubscriptions <- Gauge.new pmGraphQLRequestMetrics <- makeDummyGraphQLRequestMetrics pmEventTriggerMetrics <- makeDummyEventTriggerMetrics pmWebSocketBytesReceived <- Counter.new @@ -176,6 +191,9 @@ makeDummySubscriptionMetrics = do submActiveStreamingPollers <- Gauge.new submActiveLiveQueryPollersInError <- Gauge.new submActiveStreamingPollersInError <- Gauge.new + submTotalTime <- HistogramVector.new [] + submDBExecTotalTime <- HistogramVector.new [] + submActiveSubscriptions <- GaugeVector.new pure SubscriptionMetrics {..} makeDummyCacheRequestMetrics :: IO CacheRequestMetrics @@ -239,6 +257,40 @@ instance ToLabels (Maybe TriggerNameLabel) where toLabels Nothing = Map.empty toLabels (Just (TriggerNameLabel triggerName)) = Map.singleton "trigger_name" (triggerNameToTxt triggerName) +data SubscriptionKindLabel = SubscriptionKindLabel + { subscription_kind :: Text + } + deriving stock (Generic, Ord, Eq) + deriving anyclass (ToLabels) + +streamingSubscriptionLabel :: SubscriptionKindLabel 
+streamingSubscriptionLabel = SubscriptionKindLabel "streaming" + +liveQuerySubscriptionLabel :: SubscriptionKindLabel +liveQuerySubscriptionLabel = SubscriptionKindLabel "live-query" + +data DynamicSubscriptionLabel = DynamicSubscriptionLabel + { _dslParamQueryHash :: ParameterizedQueryHash, + _dslOperationName :: Maybe OperationName + } + deriving stock (Generic, Ord, Eq) + +instance ToLabels DynamicSubscriptionLabel where + toLabels (DynamicSubscriptionLabel hash opName) = + Map.fromList $ + [("parameterized_query_hash", bsToTxt $ unParamQueryHash hash)] + <> maybe [] (\op -> [("operation_name", G.unName $ _unOperationName op)]) opName + +data SubscriptionLabel = SubscriptionLabel + { _slKind :: SubscriptionKindLabel, + _slDynamicLabels :: Maybe DynamicSubscriptionLabel + } + deriving stock (Generic, Ord, Eq) + +instance ToLabels SubscriptionLabel where + toLabels (SubscriptionLabel kind Nothing) = Map.fromList $ [("subscription_kind", subscription_kind kind)] + toLabels (SubscriptionLabel kind (Just dl)) = (Map.fromList $ [("subscription_kind", subscription_kind kind)]) <> toLabels dl + -- | Record metrics with dynamic label recordMetricWithLabel :: (MonadIO m) => @@ -280,3 +332,39 @@ observeHistogramWithLabel getMetricState alwaysObserve histogramVector label val alwaysObserve (liftIO $ HistogramVector.observe histogramVector (Just label) value) (liftIO $ HistogramVector.observe histogramVector Nothing value) + +-- | Record a subscription metric for all the operation names present in the subscription. +-- Use this when you want to update the same value of the metric for all the operation names. 
+recordSubcriptionMetric :: + (MonadIO m) => + (IO GranularPrometheusMetricsState) -> + -- should the metric be observed without a label when granularMetricsState is OFF + Bool -> + HashMap (Maybe OperationName) Int -> + ParameterizedQueryHash -> + SubscriptionKindLabel -> + -- the mertic action to perform + (SubscriptionLabel -> IO ()) -> + m () +recordSubcriptionMetric getMetricState alwaysObserve operationNamesMap parameterizedQueryHash subscriptionKind metricAction = do + -- if no operation names are present, then emit metric with only param query hash as dynamic label + if (null operationNamesMap) + then do + let promMetricGranularLabel = SubscriptionLabel subscriptionKind (Just $ DynamicSubscriptionLabel parameterizedQueryHash Nothing) + promMetricLabel = SubscriptionLabel subscriptionKind Nothing + recordMetricWithLabel + getMetricState + alwaysObserve + (metricAction promMetricGranularLabel) + (metricAction promMetricLabel) + else -- if operationNames are present, then emit the same metric for all the operation names + do + let operationNames = HashMap.keys operationNamesMap + for_ operationNames $ \opName -> do + let promMetricGranularLabel = SubscriptionLabel subscriptionKind (Just $ DynamicSubscriptionLabel parameterizedQueryHash opName) + promMetricLabel = SubscriptionLabel subscriptionKind Nothing + recordMetricWithLabel + getMetricState + alwaysObserve + (metricAction promMetricGranularLabel) + (metricAction promMetricLabel) diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs index 589bf66f5dd..ea6b245cc1f 100644 --- a/server/src-lib/Hasura/Server/Types.hs +++ b/server/src-lib/Hasura/Server/Types.hs @@ -159,11 +159,11 @@ instance ToJSON ApolloFederationStatus where -- | Whether or not to enable granular metrics for Prometheus. -- --- `GranularMetricsOn` will enable the dynamic labels for the metrics. `GranularMetricsOff` will disable the dynamic --- labels for the metrics. 
+-- `GranularMetricsOn` will enable the dynamic labels for the metrics. +-- `GranularMetricsOff` will disable the dynamic labels for the metrics. -- --- **Warning**: Enabling dynamic labels for Prometheus metrics can cause cardinality issues and can cause memory usage --- to increase. +-- **Warning**: Enabling dynamic labels for Prometheus metrics can cause cardinality +-- issues and can cause memory usage to increase. data GranularPrometheusMetricsState = GranularMetricsOff | GranularMetricsOn @@ -182,6 +182,11 @@ instance ToJSON GranularPrometheusMetricsState where class Monad m => MonadGetPolicies m where runGetApiTimeLimit :: m (Maybe MaxTime) + + -- 'GranularPrometheusMetricsState' is used to decide if dynamic labels needs to be + -- added when emitting the prometheus metric. The state of this can be dynamically + -- changed via policies. Hence we need to fetch the value from the policy everytime + -- before emitting the metric. Thus we create an IO action which fetches the value. runGetPrometheusMetricsGranularity :: m (IO GranularPrometheusMetricsState) diff --git a/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs b/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs index d06f7b68588..1df8fed4b82 100644 --- a/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs +++ b/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs @@ -42,7 +42,7 @@ import Hasura.RQL.Types.Roles (RoleName, mkRoleName) import Hasura.Server.Init (considerEnv, databaseUrlOption, runWithEnv, _envVar) import Hasura.Server.Metrics (createServerMetrics) import Hasura.Server.Prometheus (makeDummyPrometheusMetrics) -import Hasura.Server.Types (RequestId (..)) +import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId (..)) import Language.GraphQL.Draft.Syntax.QQ qualified as G import ListT qualified import StmContainers.Map qualified as STMMap @@ -95,6 +95,9 @@ getStaticCohortSnapshot (Cohort cohortId _respRef existingSubsTV newSubsTV 
_) = streamingSubscriptionPollingSpec :: SourceConfig ('Postgres 'Vanilla) -> Spec streamingSubscriptionPollingSpec srcConfig = do + dummyServerStore <- runIO newStore + dummyServerMetrics <- runIO $ createServerMetrics dummyServerStore + dummyPromMetrics <- runIO makeDummyPrometheusMetrics let setupDDLTx = PG.unitQE defaultTxErrorHandler @@ -133,6 +136,7 @@ streamingSubscriptionPollingSpec srcConfig = do pollerId <- runIO $ PollerId <$> UUID.nextRandom pollerResponseState <- runIO $ STM.newTVarIO PRSSuccess + emptyOperationNamesMap <- runIO $ STM.atomically $ TMap.new let defaultSubscriptionOptions = mkSubscriptionsOptions Nothing Nothing -- use default values paramQueryHash = mkUnsafeParameterizedQueryHash "random" -- hardcoded multiplexed query which is generated for the following GraphQL query: @@ -151,7 +155,6 @@ streamingSubscriptionPollingSpec srcConfig = do ORDER BY "root.pg.id" ASC ) AS "_2_root" ) AS "numbers_stream" ) AS "_fld_resp" ON ('true') |] - dummyPrometheusMetrics <- runIO makeDummyPrometheusMetrics let pollingAction cohortMap testSyncAction = pollStreamingQuery @('Postgres 'Vanilla) @@ -166,7 +169,10 @@ streamingSubscriptionPollingSpec srcConfig = do [G.name|randomRootField|] (const $ pure ()) testSyncAction - dummyPrometheusMetrics + dummyPromMetrics + (pure GranularMetricsOff) + emptyOperationNamesMap + Nothing mkSubscriber sId = let wsId = maybe (error "Invalid UUID") WS.mkUnsafeWSId $ UUID.fromString "ec981f92-8d5a-47ab-a306-80af7cfb1113" @@ -218,7 +224,7 @@ streamingSubscriptionPollingSpec srcConfig = do TMap.reset cohortMap TMap.insert cohort1 cohortKey1 cohortMap - runIO $ pollingAction cohortMap Nothing Nothing + runIO $ pollingAction cohortMap Nothing currentCohortMap <- runIO $ STM.atomically $ TMap.getMap cohortMap it "the key of the cohort1 should have been moved from the cohortKey1 to cohortKey2, so it should not be found anymore at cohortKey1" $ do @@ -241,7 +247,7 @@ streamingSubscriptionPollingSpec srcConfig = do MVar.readMVar 
syncMVar STM.atomically $ TMap.insert cohort2 cohortKey3 cohortMap Async.withAsync - (pollingAction cohortMap (Just syncAction) Nothing) + (pollingAction cohortMap (Just syncAction)) ( \pollAsync -> do MVar.putMVar syncMVar () Async.wait pollAsync @@ -264,7 +270,7 @@ streamingSubscriptionPollingSpec srcConfig = do MVar.readMVar syncMVar STM.atomically $ TMap.delete cohortKey1 cohortMap Async.withAsync - (pollingAction cohortMap (Just syncAction) Nothing) + (pollingAction cohortMap (Just syncAction)) ( \pollAsync -> do MVar.putMVar syncMVar () Async.wait pollAsync @@ -283,7 +289,7 @@ streamingSubscriptionPollingSpec srcConfig = do MVar.readMVar syncMVar STM.atomically $ addSubscriberToCohort newTemporarySubscriber cohort1 Async.withAsync - (pollingAction cohortMap (Just syncAction) Nothing) + (pollingAction cohortMap (Just syncAction)) ( \pollAsync -> do -- concurrently inserting a new cohort to a key (cohortKey2) to which -- cohort1 is expected to be associated after the current poll @@ -315,7 +321,7 @@ streamingSubscriptionPollingSpec srcConfig = do MVar.readMVar syncMVar STM.atomically $ TMap.delete temporarySubscriberId (_cNewSubscribers cohort1) Async.withAsync - (pollingAction cohortMap (Just syncAction) Nothing) + (pollingAction cohortMap (Just syncAction)) ( \pollAsync -> do MVar.putMVar syncMVar () Async.wait pollAsync @@ -339,10 +345,6 @@ streamingSubscriptionPollingSpec srcConfig = do TMap.delete temporarySubscriberId (_cNewSubscribers cohort1) describe "Adding two subscribers concurrently" $ do - dummyServerStore <- runIO newStore - dummyServerMetrics <- runIO $ createServerMetrics dummyServerStore - dummyPromMetrics <- runIO makeDummyPrometheusMetrics - subscriptionState <- do runIO $ initSubscriptionsState (const (pure ())) @@ -389,6 +391,7 @@ streamingSubscriptionPollingSpec srcConfig = do reqId [G.name|numbers_stream|] subscriptionQueryPlan + (pure GranularMetricsOff) (const (pure ())) it "concurrently adding two subscribers should retain both of 
them in the poller map" $ do @@ -403,7 +406,7 @@ streamingSubscriptionPollingSpec srcConfig = do streamQueryMapEntries <- STM.atomically $ ListT.toList $ STMMap.listT streamQueryMap length streamQueryMapEntries `shouldBe` 1 - let (pollerKey, (Poller currentCohortMap _ ioState)) = head streamQueryMapEntries + let (pollerKey, (Poller currentCohortMap _ ioState _ _)) = head streamQueryMapEntries cohorts <- STM.atomically $ TMap.toList currentCohortMap length cohorts `shouldBe` 1 let (_cohortKey, Cohort _ _ curSubsTV newSubsTV _) = head cohorts