Mirror of https://github.com/hasura/graphql-engine.git (synced 2024-12-14 08:02:15 +03:00)
server: add total time, db exec time subscription metrics
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8899
Co-authored-by: paritosh-08 <85472423+paritosh-08@users.noreply.github.com>
Co-authored-by: Rob Dominguez <24390149+robertjdominguez@users.noreply.github.com>
GitOrigin-RevId: c8c4a89576ae95265a8e4f4e6803a12ba7e840d4
This commit is contained in:
parent db5370bb62
commit 770407110c
@@ -342,7 +342,7 @@ Enable the Hasura Console (served by the server on `/` and `/console`).
 ### Enable High-cardinality Labels for Metrics
 
 Enable high-cardinality labels for [Prometheus Metrics](/enterprise/metrics.mdx). Enabling this setting will add more labels to
-some of the metrics (e.g. `operationName` label for Graphql subscription metrics).
+some of the metrics (e.g. `operation_name` label for GraphQL subscription metrics).
 
 | | |
 | ------------------- | ------------------------------------------------- |
@@ -44,9 +44,10 @@ The metrics endpoint should be configured with a secret to prevent misuse and sh
 
 Starting in `v2.26.0`, Hasura GraphQL Engine exposes metrics with high-cardinality labels by default.
 
-You can disable [the cardinality of labels for
-metrics](/deployment/graphql-engine-flags/reference.mdx#enable-high-cardinality-labels-for-metrics) if you are
-experiencing high memory usage, which can be due to a large number of labels in the metrics (typically more than 10000).
+You can disable
+[the cardinality of labels for metrics](/deployment/graphql-engine-flags/reference.mdx#enable-high-cardinality-labels-for-metrics)
+if you are experiencing high memory usage, which can be due to a large number of labels in the metrics (typically more
+than 10000).
 
 :::
 
@@ -54,6 +55,16 @@ experiencing high memory usage, which can be due to a large number of labels in
 
 The following metrics are exported by Hasura GraphQL Engine:
 
+### Hasura active subscriptions
+
+Current number of active subscriptions, representing the subscription load on the server.
+
+| | |
+| ------ | ------------------------------------------------------------------------------------------ |
+| Name | `hasura_active_subscriptions` |
+| Type | Gauge |
+| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` |
+
 ### Hasura active subscription pollers
 
 Current number of active subscription pollers. A subscription poller
@@ -84,16 +95,6 @@ runtime errors.
 | Type | Gauge |
 | Labels | `subscription_kind`: streaming \| live-query |
 
-### Hasura active subscriptions
-
-Current number of active subscriptions, representing the subscription load on the server.
-
-| | |
-| ------ | ----------------------------- |
-| Name | `hasura_active_subscriptions` |
-| Type | Gauge |
-| Labels | none |
-
 ### Hasura Cache requests count
 
 Tracks cache hit and miss requests, which helps in monitoring and optimizing cache utilization.
 
@@ -283,6 +284,75 @@ Current number of active PostgreSQL connections. Compare this to
 | Type | Gauge |
 | Labels | `source_name`: name of the database<br />`conn_info`: connection url string (password omitted) or name of the connection url environment variable<br />`role`: primary \| replica |
 
+### Hasura subscription database execution time
+
+The time taken to run the subscription's multiplexed query in the database for a single batch.
+
+A subscription poller
+[multiplexes](https://github.com/hasura/graphql-engine/blob/master/architecture/live-queries.md#idea-3-batch-multiple-live-queries-into-one-sql-query)
+similar subscriptions together. During every run (every 1 second by default), the poller splits the different variables
+for the multiplexed query into batches (by default 100) and executes the batches. This metric observes the time taken for
+each batch to execute on the database.
+
+If this metric is high, it may be an indication that the database is not performing as expected; consider
+investigating the subscription query and whether indexes can help improve performance.
+
+| | |
+| ------ | ------------------------------------------------------------------------------------------ |
+| Name | `hasura_subscription_db_execution_time_seconds` |
+| Type | Histogram<br /><br />Buckets: 0.000001, 0.0001, 0.01, 0.1, 0.3, 1, 3, 10, 30, 100 |
+| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` |
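To make the batching behaviour above concrete, here is a minimal, self-contained Haskell sketch — not Hasura's actual code; `chunksOf` comes from the `split` package, and plain integers stand in for subscription cohorts:

```haskell
import Data.List.Split (chunksOf)

-- 650 cohorts with Hasura's default batch size of 100 yield 7 batches;
-- each batch's database round trip is one observation of
-- hasura_subscription_db_execution_time_seconds.
main :: IO ()
main = do
  let cohorts   = [1 .. 650 :: Int] -- stand-ins for multiplexed cohorts
      batchSize = 100               -- the default batch size
      batches   = chunksOf batchSize cohorts
  print (length batches)     -- 7
  print (map length batches) -- [100,100,100,100,100,100,50]
```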
+
+### Hasura subscription total time
+
+The time taken to complete one full run of a subscription poller.
+
+A subscription poller
+[multiplexes](https://github.com/hasura/graphql-engine/blob/master/architecture/live-queries.md#idea-3-batch-multiple-live-queries-into-one-sql-query)
+similar subscriptions together. This subscription poller runs every 1 second by default and queries the database with
+the multiplexed query to fetch the latest data. In a polling instance, the poller not only queries the database but also
+performs other operations, like splitting similar queries into batches (by default 100) before fetching their data from the
+database. **This metric is the total time taken to complete all the operations in a single poll.**
+
+If the value of this metric is high, it may be an indication that the multiplexed query is taking longer to execute
+in the database; verify this with the
+[`hasura_subscription_db_execution_time_seconds`](/enterprise/metrics.mdx/#hasura-subscription-database-execution-time)
+metric.
+
+In a single poll, the subscription poller splits the different variables for the multiplexed query into batches (by
+default 100) and executes the batches. We use the `hasura_subscription_db_execution_time_seconds` metric to observe the
+time taken for each batch to execute on the database. This means that for a single poll there can be multiple values for
+the `hasura_subscription_db_execution_time_seconds` metric.
+
+Let's look at an example to understand these metrics better:
+
+Say we have 650 subscriptions with the same selection set but different input arguments. These 650 subscriptions will be
+grouped to form one multiplexed query. A single poller would be created to run this multiplexed query, and this poller
+will run every 1 second.
+
+The default batch size in Hasura is 100, so the 650 subscriptions will be split into 7 batches for execution during a
+single poll.
+
+During a single poll:
+
+- Batch 1: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds
+- Batch 2: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds
+- Batch 3: `hasura_subscription_db_execution_time_seconds` = 0.003 seconds
+- Batch 4: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds
+- Batch 5: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds
+- Batch 6: `hasura_subscription_db_execution_time_seconds` = 0.001 seconds
+- Batch 7: `hasura_subscription_db_execution_time_seconds` = 0.002 seconds
+
+The `hasura_subscription_total_time_seconds` would be the sum of all the database execution times shown in the batches,
+plus some extra processing time for other tasks the poller does during a single poll. In this case, it would be
+approximately 0.013 seconds.
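As a quick sanity check on the arithmetic above, a tiny Haskell sketch — the 0.001s overhead figure is a hypothetical stand-in for the poller's non-database work:

```haskell
main :: IO ()
main = do
  let batchTimes = [0.002, 0.001, 0.003, 0.001, 0.002, 0.001, 0.002] :: [Double]
      overhead   = 0.001 -- hypothetical snapshot/batch-splitting time
  print (sum batchTimes)            -- ~0.012: total across the db-exec observations
  print (sum batchTimes + overhead) -- ~0.013: what the total-time histogram sees
```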
+
+| | |
+| ------ | ------------------------------------------------------------------------------------------ |
+| Name | `hasura_subscription_total_time_seconds` |
+| Type | Histogram<br /><br />Buckets: 0.000001, 0.0001, 0.01, 0.1, 0.3, 1, 3, 10, 30, 100 |
+| Labels | `subscription_kind`: streaming \| live-query, `operation_name`, `parameterized_query_hash` |
+
 ### Hasura WebSocket connections
 
 Current number of active WebSocket connections, representing the WebSocket load on the server.
 
@@ -247,7 +247,11 @@ data Poller streamCursor = Poller
     -- This var is "write once", moving monotonically from empty to full.
     -- TODO this could probably be tightened up to something like
     -- 'STM PollerIOState'
-    _pIOState :: STM.TMVar PollerIOState
+    _pIOState :: STM.TMVar PollerIOState,
+    _pParameterizedQueryHash :: ParameterizedQueryHash,
+    -- The operation names of the subscriptions that are part of this poller. This is
+    -- used while emitting subscription metrics
+    _pOperationNamesMap :: TMap.TMap (Maybe OperationName) Int
   }
 
 data PollerIOState = PollerIOState
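The new `_pOperationNamesMap` field is essentially a reference-count map: each operation name maps to the number of live subscribers using it. A minimal sketch of that scheme with a plain `HashMap` instead of Hasura's STM-backed `TMap` (types simplified):

```haskell
import qualified Data.HashMap.Strict as HashMap

type OpCounts = HashMap.HashMap (Maybe String) Int

-- A subscriber joins the poller: insert the name with count 1, or bump it.
addOp :: Maybe String -> OpCounts -> OpCounts
addOp name = HashMap.insertWith (+) name 1

-- A subscriber leaves: drop the name at count 1, otherwise decrement, so
-- metrics keep being emitted until the last subscriber with it is gone.
removeOp :: Maybe String -> OpCounts -> OpCounts
removeOp = HashMap.update (\n -> if n <= 1 then Nothing else Just (n - 1))

main :: IO ()
main = do
  let m = addOp (Just "MyOp") (addOp (Just "MyOp") HashMap.empty)
  print (HashMap.lookup (Just "MyOp") m)                          -- Just 2
  print (HashMap.lookup (Just "MyOp") (removeOp (Just "MyOp") m)) -- Just 1
```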
@@ -291,7 +295,7 @@ dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value
 dumpPollerMap extended pollerMap =
   fmap J.toJSON $ do
     entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap
-    forM entries $ \(pollerKey, Poller cohortsMap _responseState ioState) ->
+    forM entries $ \(pollerKey, Poller cohortsMap _responseState ioState _paramQueryHash _opNames) ->
       AB.dispatchAnyBackend @Backend (unBackendPollerKey pollerKey) $ \(PollerKey source role query _connectionKey) -> do
         PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState
         cohortsJ <-
@@ -305,7 +309,8 @@ dumpPollerMap extended pollerMap =
             "thread_id" J..= show (Immortal.threadId threadId),
             "poller_id" J..= pollerId,
             "multiplexed_query" J..= query,
-            "cohorts" J..= cohortsJ
+            "cohorts" J..= cohortsJ,
+            "parameterized_query_hash" J..= _paramQueryHash
           ]
 
 -- | An ID to track unique 'Poller's, so that we can gather metrics about each
@@ -33,9 +33,11 @@ import Hasura.RQL.Types.BackendType (BackendType (..), PostgresKind (Vanilla))
 import Hasura.RQL.Types.Common (SourceName)
 import Hasura.RQL.Types.Roles (RoleName)
 import Hasura.RQL.Types.Subscription (SubscriptionType (..))
-import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..))
+import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), liveQuerySubscriptionLabel, recordSubcriptionMetric)
+import Hasura.Server.Types (GranularPrometheusMetricsState (..))
 import Refined (unrefine)
 import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
+import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
 
 pushResultToCohort ::
   GQResult BS.ByteString ->
@@ -86,9 +88,12 @@ pollLiveQuery ::
   CohortMap 'LiveQuery ->
   SubscriptionPostPollHook ->
   PrometheusMetrics ->
+  IO GranularPrometheusMetricsState ->
+  TMap.TMap (Maybe OperationName) Int ->
   ResolvedConnectionTemplate b ->
   IO ()
-pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics resolvedConnectionTemplate = do
+pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics granularPrometheusMetricsState operationNamesMap' resolvedConnectionTemplate = do
+  operationNamesMap <- STM.atomically $ TMap.getMap operationNamesMap'
   (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
     -- snapshot the current cohorts and split them into batches
    (snapshotTime, cohortBatches) <- withElapsedTime $ do
@@ -105,6 +110,15 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
     batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
       (queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query (over (each . _2) C._csVariables cohorts) resolvedConnectionTemplate
+
+      let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
+      recordSubcriptionMetric
+        granularPrometheusMetricsState
+        True
+        operationNamesMap
+        parameterizedQueryHash
+        liveQuerySubscriptionLabel
+        (flip (HistogramVector.observe dbExecTimeMetric) (realToFrac queryExecutionTime))
 
       previousPollerResponseState <- STM.readTVarIO pollerResponseState
 
       case mxRes of
@@ -152,7 +166,6 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
           batchId
           cohortsExecutionDetails
           batchResponseSize
-
     pure (snapshotTime, batchesDetails)
 
   let pollDetails =
@@ -169,6 +182,14 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
           _pdParameterizedQueryHash = parameterizedQueryHash
         }
   postPollHook pollDetails
+  let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
+  recordSubcriptionMetric
+    granularPrometheusMetricsState
+    True
+    operationNamesMap
+    parameterizedQueryHash
+    liveQuerySubscriptionLabel
+    (flip (HistogramVector.observe totalTimeMetric) (realToFrac totalTime))
   where
     SubscriptionsOptions batchSize _ = lqOpts
 
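To see how the two new histograms relate, here is a self-contained sketch of the nesting — this `withElapsedTime` is a hypothetical stand-in for Hasura's helper of the same name, and `runBatch` stands in for one multiplexed database query:

```haskell
import Data.Time.Clock (diffUTCTime, getCurrentTime)

-- Hypothetical stand-in for Hasura's withElapsedTime helper.
withElapsedTime :: IO a -> IO (Double, a)
withElapsedTime action = do
  start <- getCurrentTime
  result <- action
  end <- getCurrentTime
  pure (realToFrac (diffUTCTime end start), result)

main :: IO ()
main = do
  let runBatch _ = pure () :: IO () -- stand-in for one multiplexed DB query
  (totalTime, batchTimes) <- withElapsedTime $
    mapM (\b -> fst <$> withElapsedTime (runBatch b)) [1 .. 7 :: Int]
  -- each element of batchTimes is one db-exec-time observation; totalTime
  -- is one total-time observation and includes all the poller overhead
  print (totalTime >= sum batchTimes) -- True
```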
@@ -37,10 +37,12 @@ import Hasura.RQL.Types.Common (SourceName)
 import Hasura.RQL.Types.Roles (RoleName)
 import Hasura.RQL.Types.Subscription (SubscriptionType (..))
 import Hasura.SQL.Value (TxtEncodedVal (..))
-import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..))
+import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..), recordSubcriptionMetric, streamingSubscriptionLabel)
+import Hasura.Server.Types (GranularPrometheusMetricsState (..))
 import Language.GraphQL.Draft.Syntax qualified as G
 import Refined (unrefine)
 import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
+import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
 import Text.Shakespeare.Text (st)
 
 {- Note [Streaming subscriptions rebuilding cohort map]
@@ -249,9 +251,12 @@ pollStreamingQuery ::
   SubscriptionPostPollHook ->
   Maybe (IO ()) -> -- Optional IO action to make this function (pollStreamingQuery) testable
   PrometheusMetrics ->
+  IO GranularPrometheusMetricsState ->
+  TMap.TMap (Maybe OperationName) Int ->
   ResolvedConnectionTemplate b ->
   IO ()
-pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics resolvedConnectionTemplate = do
+pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics granularPrometheusMetricsState operationNames' resolvedConnectionTemplate = do
+  operationNames <- STM.atomically $ TMap.getMap operationNames'
   (totalTime, (snapshotTime, batchesDetailsAndProcessedCohorts)) <- withElapsedTime $ do
     -- snapshot the current cohorts and split them into batches
     -- This STM transaction is a read only transaction i.e. it doesn't mutate any state
@@ -275,6 +280,14 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
           query
           (over (each . _2) C._csVariables $ fmap (fmap fst) cohorts)
           resolvedConnectionTemplate
+      let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
+      recordSubcriptionMetric
+        granularPrometheusMetricsState
+        True
+        operationNames
+        parameterizedQueryHash
+        streamingSubscriptionLabel
+        (flip (HistogramVector.observe dbExecTimeMetric) (realToFrac queryExecutionTime))
 
       previousPollerResponseState <- STM.readTVarIO pollerResponseState
 
@@ -426,6 +439,14 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
             currentCohorts
     TMap.replace cohortMap updatedCohortsMap
   postPollHook pollDetails
+  let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
+  recordSubcriptionMetric
+    granularPrometheusMetricsState
+    True
+    operationNames
+    parameterizedQueryHash
+    streamingSubscriptionLabel
+    (flip (HistogramVector.observe totalTimeMetric) (realToFrac totalTime))
   where
     SubscriptionsOptions batchSize _ = streamingQueryOpts
 
@@ -52,13 +52,22 @@ import Hasura.RQL.Types.Action
 import Hasura.RQL.Types.Common (SourceName)
 import Hasura.SQL.AnyBackend qualified as AB
 import Hasura.Server.Metrics (ServerMetrics (..))
-import Hasura.Server.Prometheus (PrometheusMetrics (..), SubscriptionMetrics (..))
-import Hasura.Server.Types (RequestId)
+import Hasura.Server.Prometheus
+  ( DynamicSubscriptionLabel (..),
+    PrometheusMetrics (..),
+    SubscriptionLabel (..),
+    SubscriptionMetrics (..),
+    liveQuerySubscriptionLabel,
+    recordMetricWithLabel,
+    streamingSubscriptionLabel,
+  )
+import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId)
 import Language.GraphQL.Draft.Syntax qualified as G
 import Refined (unrefine)
 import StmContainers.Map qualified as STMMap
 import System.Metrics.Gauge qualified as EKG.Gauge
 import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
+import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector
 
 -- | The top-level datatype that holds the state for all active subscriptions.
 --
@@ -117,8 +126,10 @@ findPollerForSubscriber ::
   CohortKey ->
   (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) ->
   (Subscriber -> Poller streamCursorVars -> STM.STM streamCursorVars) ->
+  ParameterizedQueryHash ->
+  Maybe OperationName ->
   STM.STM ((Maybe (Poller streamCursorVars)), streamCursorVars)
-findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller =
+findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort addToPoller parameterizedQueryHash maybeOperationName =
   -- a handler is returned only when it is newly created
   STMMap.lookup pollerKey pollerMap >>= \case
     Just poller -> do
@@ -130,11 +141,18 @@ findPollerForSubscriber subscriber pollerMap pollerKey cohortKey addToCohort add
         -- cohort not found. Create a cohort with the subscriber and add
         -- the cohort to the poller
         Nothing -> addToPoller subscriber poller
+      -- Add the operation name of the subscription to the poller if it doesn't exist,
+      -- else increment the count for the operation name
+      TMap.lookup maybeOperationName (_pOperationNamesMap poller) >>= \case
+        Nothing -> TMap.insert 1 maybeOperationName (_pOperationNamesMap poller)
+        Just _ -> TMap.adjust (+ 1) maybeOperationName (_pOperationNamesMap poller)
       return (Nothing, cursorVars)
     Nothing -> do
       -- no poller found, so create one with the cohort
       -- and the subscriber within it.
-      !poller <- Poller <$> TMap.new <*> STM.newTVar PRSSuccess <*> STM.newEmptyTMVar
+      operationNamesMap <- TMap.new
+      TMap.insert 1 maybeOperationName operationNamesMap
+      !poller <- Poller <$> TMap.new <*> STM.newTVar PRSSuccess <*> STM.newEmptyTMVar <*> pure parameterizedQueryHash <*> pure operationNamesMap
       cursorVars <- addToPoller subscriber poller
       STMMap.insert poller pollerKey pollerMap
       return $ (Just poller, cursorVars)
@@ -155,6 +173,7 @@ addLiveQuery ::
   Maybe OperationName ->
   RequestId ->
   SubscriptionQueryPlan b (MultiplexedQuery b) ->
+  IO GranularPrometheusMetricsState ->
   -- | the action to be executed when result changes
   OnChange ->
   IO LiveQuerySubscriberDetails
@@ -170,6 +189,7 @@ addLiveQuery
   operationName
   requestId
   plan
+  granularPrometheusMetricsState
   onResultAction = do
     -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!
 
@@ -187,6 +207,8 @@ addLiveQuery
         cohortKey
         addToCohort
         addToPoller
+        parameterizedQueryHash
+        operationName
 
     -- we can then attach a polling thread if it is new the livequery can only be
     -- cancelled after putTMVar
@@ -207,6 +229,8 @@ addLiveQuery
             (_pCohorts poller)
             postPollHook
             prometheusMetrics
+            granularPrometheusMetricsState
+            (_pOperationNamesMap poller)
             resolvedConnectionTemplate
         sleep $ unrefine $ unRefetchInterval refetchInterval
     let !pState = PollerIOState threadRef pollerId
@@ -215,7 +239,14 @@ addLiveQuery
     liftIO $ Prometheus.Gauge.inc $ submActiveLiveQueryPollers $ pmSubscriptionMetrics $ prometheusMetrics
 
     liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
-    liftIO $ Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
+    let promMetricGranularLabel = SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName)
+        promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing
+    let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+    recordMetricWithLabel
+      granularPrometheusMetricsState
+      True
+      (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel)
+      (GaugeVector.inc numSubscriptionMetric promMetricLabel)
     liftIO $ EKG.Gauge.inc $ smActiveLiveQueries serverMetrics
 
     pure $ SubscriberDetails handlerId cohortKey subscriberId
@@ -256,6 +287,7 @@ addStreamSubscriptionQuery ::
   -- | root field name
   G.Name ->
   SubscriptionQueryPlan b (MultiplexedQuery b) ->
+  IO GranularPrometheusMetricsState ->
   -- | the action to be executed when result changes
   OnChange ->
   IO StreamingSubscriberDetails
@@ -272,6 +304,7 @@ addStreamSubscriptionQuery
   requestId
   rootFieldName
   plan
+  granularPrometheusMetricsState
   onResultAction = do
     -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!
 
@@ -289,6 +322,8 @@ addStreamSubscriptionQuery
         cohortKey
         addToCohort
         addToPoller
+        parameterizedQueryHash
+        operationName
 
     -- we can then attach a polling thread if it is new the subscription can only be
     -- cancelled after putTMVar
@@ -298,7 +333,22 @@ addStreamSubscriptionQuery
           forever $ do
             (_, streamQOpts) <- getSubscriptionOptions
             let SubscriptionsOptions _ refetchInterval = streamQOpts
-            pollStreamingQuery @b pollerId (_pPollerState handler) streamQOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) rootFieldName postPollHook Nothing prometheusMetrics resolvedConnectionTemplate
+            pollStreamingQuery @b
+              pollerId
+              (_pPollerState handler)
+              streamQOpts
+              (source, sourceConfig)
+              role
+              parameterizedQueryHash
+              query
+              (_pCohorts handler)
+              rootFieldName
+              postPollHook
+              Nothing
+              prometheusMetrics
+              granularPrometheusMetricsState
+              (_pOperationNamesMap handler)
+              resolvedConnectionTemplate
             sleep $ unrefine $ unRefetchInterval refetchInterval
     let !pState = PollerIOState threadRef pollerId
     $assertNFHere pState -- so we don't write thunks to mutable vars
@@ -307,9 +357,17 @@ addStreamSubscriptionQuery
 
     liftIO $ do
       EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
-      Prometheus.Gauge.inc $ pmActiveSubscriptions prometheusMetrics
      EKG.Gauge.inc $ smActiveStreamingSubscriptions serverMetrics
 
+    let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash operationName)
+        promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
+        numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+    recordMetricWithLabel
+      granularPrometheusMetricsState
+      True
+      (GaugeVector.inc numSubscriptionMetric promMetricGranularLabel)
+      (GaugeVector.inc numSubscriptionMetric promMetricLabel)
+
     pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId
     where
       SubscriptionsState _ streamQueryMap postPollHook _ = subscriptionState
@@ -336,16 +394,27 @@ removeLiveQuery ::
   SubscriptionsState ->
   -- the query and the associated operation
   LiveQuerySubscriberDetails ->
+  IO GranularPrometheusMetricsState ->
+  Maybe OperationName ->
   IO ()
-removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) = mask_ $ do
-  mbCleanupIO <- STM.atomically $ do
-    detM <- getQueryDet lqMap
-    fmap join $
-      forM detM $ \(Poller cohorts _ ioState, cohort) ->
-        cleanHandlerC cohorts ioState cohort
-  sequence_ mbCleanupIO
+removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do
+  join $
+    STM.atomically $ do
+      detM <- getQueryDet lqMap
+      case detM of
+        Nothing -> return (pure ())
+        Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, cohort) -> do
+          TMap.lookup maybeOperationName operationNamesMap >>= \case
+            -- If only one operation name is present in the map, delete it
+            Just 1 -> TMap.delete maybeOperationName operationNamesMap
+            -- If the count of an operation name is more than 1, it means there
+            -- are more subscriptions with the same name and we should keep emitting
+            -- the metrics until all the subscriptions with that operation name are
+            -- removed
+            Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
+            Nothing -> return ()
+          cleanHandlerC cohorts ioState cohort parameterizedQueryHash
   liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
-  liftIO $ Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics
   liftIO $ EKG.Gauge.dec $ smActiveLiveQueries serverMetrics
   where
     lqMap = _ssLiveQueryMap lqState
@@ -357,7 +426,7 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
       cohortM <- TMap.lookup cohortId (_pCohorts poller)
       return $ (poller,) <$> cohortM
 
-    cleanHandlerC cohortMap ioState handlerC = do
+    cleanHandlerC cohortMap ioState handlerC parameterizedQueryHash = do
       let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
       TMap.delete sinkId curOps
@@ -368,6 +437,8 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
           <*> TMap.null newOps
       when cohortIsEmpty $ TMap.delete cohortId cohortMap
       handlerIsEmpty <- TMap.null cohortMap
+      let promMetricGranularLabel = SubscriptionLabel liveQuerySubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName)
+          promMetricLabel = SubscriptionLabel liveQuerySubscriptionLabel Nothing
       -- when there is no need for handler i.e, this happens to be the last
       -- operation, take the ref for the polling thread to cancel it
       if handlerIsEmpty
@@ -375,21 +446,34 @@ removeLiveQuery logger serverMetrics prometheusMetrics lqState lqId@(SubscriberD
           STMMap.delete handlerId lqMap
           threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
           return $
-            Just $ -- deferred IO:
-              case threadRefM of
-                Just threadRef -> do
-                  Immortal.stop threadRef
-                  liftIO $ Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics
-
-                -- This would seem to imply addLiveQuery broke or a bug
-                -- elsewhere. Be paranoid and log:
-                Nothing ->
-                  L.unLogger logger $
-                    L.UnstructuredLog L.LevelError $
-                      fromString $
-                        "In removeLiveQuery no worker thread installed. Please report this as a bug: "
-                          <> show lqId
-        else return Nothing
+            -- deferred IO:
+            case threadRefM of
+              Just threadRef -> do
+                Immortal.stop threadRef
+                liftIO $ do
+                  Prometheus.Gauge.dec $ submActiveLiveQueryPollers $ pmSubscriptionMetrics prometheusMetrics
+                  let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+                  recordMetricWithLabel
+                    granularPrometheusMetricsState
+                    True
+                    (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
+                    (GaugeVector.dec numSubscriptionMetric promMetricLabel)
+              -- This would seem to imply addLiveQuery broke or a bug
+              -- elsewhere. Be paranoid and log:
+              Nothing ->
+                L.unLogger logger $
+                  L.UnstructuredLog L.LevelError $
+                    fromString $
+                      "In removeLiveQuery no worker thread installed. Please report this as a bug: "
+                        <> show lqId
+        else do
+          let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+          return $
+            recordMetricWithLabel
+              granularPrometheusMetricsState
+              True
+              (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
+              (GaugeVector.dec numSubscriptionMetric promMetricLabel)
 
 removeStreamingQuery ::
   L.Logger L.Hasura ->
@@ -398,17 +482,28 @@ removeStreamingQuery ::
   SubscriptionsState ->
   -- the query and the associated operation
   StreamingSubscriberDetails ->
+  IO GranularPrometheusMetricsState ->
+  Maybe OperationName ->
   IO ()
-removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) = mask_ $ do
-  mbCleanupIO <- STM.atomically $ do
-    detM <- getQueryDet streamQMap
-    fmap join $
-      forM detM $ \(Poller cohorts _ ioState, currentCohortId, cohort) ->
-        cleanHandlerC cohorts ioState (cohort, currentCohortId)
-  sequence_ mbCleanupIO
+removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (SubscriberDetails handlerId (cohortId, cursorVariableTV) sinkId) granularPrometheusMetricsState maybeOperationName = mask_ $ do
+  join $
+    STM.atomically $ do
+      detM <- getQueryDet streamQMap
+      case detM of
+        Nothing -> return (pure ())
+        Just (Poller cohorts _ ioState parameterizedQueryHash operationNamesMap, currentCohortId, cohort) -> do
+          TMap.lookup maybeOperationName operationNamesMap >>= \case
+            -- If only one operation name is present in the map, delete it
+            Just 1 -> TMap.delete maybeOperationName operationNamesMap
+            -- If the count of an operation name is more than 1, it means there
+            -- are more subscriptions with the same name and we should keep emitting
+            -- the metrics until all the subscriptions with that operation name are
+            -- removed
+            Just _ -> TMap.adjust (\v -> v - 1) maybeOperationName operationNamesMap
+            Nothing -> return ()
+          cleanHandlerC cohorts ioState (cohort, currentCohortId) parameterizedQueryHash
   liftIO $ do
     EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
-    Prometheus.Gauge.dec $ pmActiveSubscriptions prometheusMetrics
     EKG.Gauge.dec $ smActiveStreamingSubscriptions serverMetrics
   where
     streamQMap = _ssStreamQueryMap subscriptionState
@@ -422,7 +517,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
       cohortM <- TMap.lookup updatedCohortId (_pCohorts poller)
       return $ (poller,updatedCohortId,) <$> cohortM
 
-    cleanHandlerC cohortMap ioState (handlerC, currentCohortId) = do
+    cleanHandlerC cohortMap ioState (handlerC, currentCohortId) parameterizedQueryHash = do
       let curOps = _cExistingSubscribers handlerC
          newOps = _cNewSubscribers handlerC
       TMap.delete sinkId curOps
@@ -433,6 +528,8 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
           <*> TMap.null newOps
       when cohortIsEmpty $ TMap.delete currentCohortId cohortMap
       handlerIsEmpty <- TMap.null cohortMap
+      let promMetricGranularLabel = SubscriptionLabel streamingSubscriptionLabel (Just $ DynamicSubscriptionLabel parameterizedQueryHash maybeOperationName)
+          promMetricLabel = SubscriptionLabel streamingSubscriptionLabel Nothing
       -- when there is no need for handler i.e, this happens to be the last
       -- operation, take the ref for the polling thread to cancel it
       if handlerIsEmpty
@@ -440,28 +537,39 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
           STMMap.delete handlerId streamQMap
           threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
           return $
-            Just $ -- deferred IO:
-              case threadRefM of
-                Just threadRef -> do
-                  Immortal.stop threadRef
-                  liftIO $
-                    Prometheus.Gauge.dec $
-                      submActiveStreamingPollers $
-                        pmSubscriptionMetrics prometheusMetrics
-                -- This would seem to imply addStreamSubscriptionQuery broke or a bug
-                -- elsewhere. Be paranoid and log:
-                Nothing ->
-                  L.unLogger logger $
-                    L.UnstructuredLog L.LevelError $
-                      fromString $
-                        "In removeStreamingQuery no worker thread installed. Please report this as a bug: "
-                          <> " poller_id: "
-                          <> show handlerId
-                          <> ", cohort_id: "
-                          <> show cohortId
-                          <> ", subscriber_id:"
-                          <> show sinkId
-        else return Nothing
+            -- deferred IO:
+            case threadRefM of
+              Just threadRef -> do
+                Immortal.stop threadRef
+                liftIO $ do
+                  Prometheus.Gauge.dec $ submActiveStreamingPollers $ pmSubscriptionMetrics prometheusMetrics
+                  let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+                  recordMetricWithLabel
+                    granularPrometheusMetricsState
+                    True
+                    (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
+                    (GaugeVector.dec numSubscriptionMetric promMetricLabel)
+              -- This would seem to imply addStreamSubscriptionQuery broke or a bug
+              -- elsewhere. Be paranoid and log:
+              Nothing ->
+                L.unLogger logger $
+                  L.UnstructuredLog L.LevelError $
+                    fromString $
+                      "In removeStreamingQuery no worker thread installed. Please report this as a bug: "
+                        <> " poller_id: "
+                        <> show handlerId
+                        <> ", cohort_id: "
+                        <> show cohortId
+                        <> ", subscriber_id:"
+                        <> show sinkId
+        else do
+          let numSubscriptionMetric = submActiveSubscriptions $ pmSubscriptionMetrics $ prometheusMetrics
+          return $
+            recordMetricWithLabel
+              granularPrometheusMetricsState
+              True
+              (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
+              (GaugeVector.dec numSubscriptionMetric promMetricLabel)
 
 -- | An async action query whose relationships refer to tables in a source.
 -- We need to generate an SQL statement with the action response and execute it
@@ -11,6 +11,7 @@ module Hasura.GraphQL.Execute.Subscription.TMap
     union,
     filterWithKey,
     getMap,
+    adjust,
   )
 where
 
@@ -58,3 +59,6 @@ union mapA mapB = do
 
 getMap :: TMap k v -> STM (HashMap.HashMap k v)
 getMap = readTVar . unTMap
+
+adjust :: (Hashable k) => (v -> v) -> k -> TMap k v -> STM ()
+adjust f k mapTV = modifyTVar' (unTMap mapTV) $ HashMap.adjust f k
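The new `adjust` helper is a thin wrapper over STM. A standalone sketch of the same idea, using `TVar` and `Data.HashMap.Strict` directly (assuming the `stm`, `hashable`, and `unordered-containers` packages):

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HM

-- Apply f to the value at key k, if present -- the essence of TMap.adjust.
adjustTVarMap :: (Eq k, Hashable k) => (v -> v) -> k -> TVar (HM.HashMap k v) -> IO ()
adjustTVarMap f k tv = atomically $ modifyTVar' tv (HM.adjust f k)

main :: IO ()
main = do
  tv <- newTVarIO (HM.fromList [("ops" :: String, 1 :: Int)])
  adjustTVarMap (+ 1) "ops" tv -- bump the count, as the pollers do
  readTVarIO tv >>= print      -- fromList [("ops",2)]
```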
@@ -121,7 +121,7 @@ parameterizedQueryHashListToObject =
       [("parameterized_query_hash", J.toJSON queryHashes)]
 
 newtype ParameterizedQueryHash = ParameterizedQueryHash {unParamQueryHash :: B.ByteString}
-  deriving (Show, Eq)
+  deriving (Show, Eq, Ord)
 
 instance J.ToJSON ParameterizedQueryHash where
   toJSON = J.String . bsToTxt . unParamQueryHash
@@ -40,6 +40,7 @@ import Hasura.Server.Prometheus
     decWebsocketConnections,
     incWebsocketConnections,
   )
+import Hasura.Server.Types (MonadGetPolicies (..))
 import Hasura.Services.Network
 import Hasura.Tracing qualified as Tracing
 import Network.WebSockets qualified as WS
@@ -59,7 +60,8 @@ createWSServerApp ::
     MonadQueryTags m,
     HasResourceLimits m,
     ProvidesNetwork m,
-    Tracing.MonadTrace m
+    Tracing.MonadTrace m,
+    MonadGetPolicies m
   ) =>
   HashSet (L.EngineLogType L.Hasura) ->
   WSServerEnv impl ->
@@ -96,9 +98,10 @@ createWSServerApp enabledLogTypes serverEnv connInitTimeout licenseKeyCache = \
       onMessage enabledLogTypes getAuthMode serverEnv conn bs (wsActions sp) licenseKeyCache
 
     onCloseHandler conn = mask_ do
+      granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity
      liftIO $ EKG.Gauge.dec $ smWebsocketConnections serverMetrics
      liftIO $ decWebsocketConnections $ pmConnections prometheusMetrics
-      onClose logger serverMetrics prometheusMetrics (_wseSubscriptionState serverEnv) conn
+      onClose logger serverMetrics prometheusMetrics (_wseSubscriptionState serverEnv) conn granularPrometheusMetricsState
 
 stopWSServerApp :: WSServerEnv impl -> IO ()
 stopWSServerApp wsEnv = WS.shutdown (_wseServer wsEnv)
@@ -93,7 +93,7 @@ import Hasura.Server.Prometheus
     PrometheusMetrics (..),
   )
 import Hasura.Server.Telemetry.Counters qualified as Telem
-import Hasura.Server.Types (RequestId, getRequestId)
+import Hasura.Server.Types (GranularPrometheusMetricsState (..), MonadGetPolicies (..), RequestId, getRequestId)
 import Hasura.Services.Network
 import Hasura.Session
 import Hasura.Tracing qualified as Tracing
@@ -420,7 +420,8 @@ onStart ::
     MonadMetadataStorage m,
     MonadQueryTags m,
     HasResourceLimits m,
-    ProvidesNetwork m
+    ProvidesNetwork m,
+    MonadGetPolicies m
   ) =>
   HashSet (L.EngineLogType L.Hasura) ->
   Maybe (CredentialCache AgentLicenseKey) ->
@@ -679,9 +680,9 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
     E.SEOnSourceDB (E.SSLivequery actionIds liveQueryBuilder) -> do
       actionLogMapE <- fmap fst <$> runExceptT (EA.fetchActionLogResponses actionIds)
       actionLogMap <- onLeft actionLogMapE (withComplete . preExecErr requestId (Just gqlOpType))
-      opMetadataE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
+      granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity
+      opMetadataE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState
       lqId <- onLeft opMetadataE (withComplete . preExecErr requestId (Just gqlOpType))
 
       -- Update async action query subscription state
       case NE.nonEmpty (toList actionIds) of
         Nothing -> do
@@ -694,11 +695,11 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
           let asyncActionQueryLive =
                 ES.LAAQOnSourceDB $
                   ES.LiveAsyncActionQueryOnSource lqId actionLogMap $
-                    restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder
+                    restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder granularPrometheusMetricsState (_grOperationName reqParsed)
 
               onUnexpectedException err = do
                 sendError requestId err
-                stopOperation serverEnv wsConn opId (pure ()) -- Don't log in case opId doesn't exist
+                stopOperation serverEnv wsConn opId granularPrometheusMetricsState (pure ()) -- Don't log in case opId doesn't exist
           ES.addAsyncActionLiveQuery
             (ES._ssAsyncActions subscriptionsState)
             opId
@@ -706,7 +707,8 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
             onUnexpectedException
             asyncActionQueryLive
     E.SEOnSourceDB (E.SSStreaming rootFieldName streamQueryBuilder) -> do
-      liftIO $ startStreamingQuery rootFieldName streamQueryBuilder parameterizedQueryHash requestId
+      granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity
+      liftIO $ startStreamingQuery rootFieldName streamQueryBuilder parameterizedQueryHash requestId granularPrometheusMetricsState
 
   liftIO $ Prometheus.Counter.inc (gqlRequestsSubscriptionSuccess gqlMetrics)
   liftIO $ logOpEv ODStarted (Just requestId) (Just parameterizedQueryHash)
@@ -894,12 +896,13 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
       liftIO $ sendCompleted Nothing Nothing
       throwError ()
 
-    restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do
-      ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionsState lqId
-      either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
+    restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder granularPrometheusMetricsState maybeOperationName lqId actionLogMap = do
+      ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionsState lqId granularPrometheusMetricsState maybeOperationName
+      either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState
 
-    startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap = do
+    startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap granularPrometheusMetricsState = do
       liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap
 
       for liveQueryE $ \(sourceName, E.SubscriptionQueryPlan exists) -> do
         let !opName = _grOperationName q
             subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
@@ -920,14 +923,16 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
               opName
              requestId
              liveQueryPlan
+              granularPrometheusMetricsState
              (onChange opName parameterizedQueryHash $ ES._sqpNamespace liveQueryPlan)
 
         liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars
         STM.atomically $
           -- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber:
           STMMap.insert (LiveQuerySubscriber lqId, opName) opId opMap
         pure lqId
 
-    startStreamingQuery rootFieldName (sourceName, E.SubscriptionQueryPlan exists) parameterizedQueryHash requestId = do
+    startStreamingQuery rootFieldName (sourceName, E.SubscriptionQueryPlan exists) parameterizedQueryHash requestId granularPrometheusMetricsState = do
       let !opName = _grOperationName q
           subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
       -- NOTE!: we mask async exceptions higher in the call stack, but it's
@@ -948,6 +953,7 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
              requestId
              (_rfaAlias rootFieldName)
              streamQueryPlan
+              granularPrometheusMetricsState
              (onChange opName parameterizedQueryHash $ ES._sqpNamespace streamQueryPlan)
         liftIO $ $assertNFHere (streamSubscriberId, opName) -- so we don't write thunks to mutable vars
         STM.atomically $
@@ -1017,7 +1023,8 @@ onMessage ::
     MonadQueryTags m,
     HasResourceLimits m,
     ProvidesNetwork m,
-    Tracing.MonadTrace m
+    Tracing.MonadTrace m,
+    MonadGetPolicies m
   ) =>
   HashSet (L.EngineLogType L.Hasura) ->
   IO AuthMode ->
@@ -1052,7 +1059,9 @@ onMessage enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions agen
             then CaptureQueryVariables
             else DoNotCaptureQueryVariables
       onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables startMsg onMessageActions
-    CMStop stopMsg -> onStop serverEnv wsConn stopMsg
+    CMStop stopMsg -> do
+      granularPrometheusMetricsState <- runGetPrometheusMetricsGranularity
+      onStop serverEnv wsConn stopMsg granularPrometheusMetricsState
     -- specific to graphql-ws
     CMPing mPayload -> onPing wsConn mPayload
     CMPong _mPayload -> pure ()
@@ -1067,15 +1076,15 @@ onPing :: (MonadIO m) => WSConn -> Maybe PingPongPayload -> m ()
 onPing wsConn mPayload =
   liftIO $ sendMsg wsConn (SMPong mPayload)
 
-onStop :: (MonadIO m) => WSServerEnv impl -> WSConn -> StopMsg -> m ()
-onStop serverEnv wsConn (StopMsg opId) = liftIO $ do
+onStop :: (MonadIO m) => WSServerEnv impl -> WSConn -> StopMsg -> IO GranularPrometheusMetricsState -> m ()
+onStop serverEnv wsConn (StopMsg opId) granularPrometheusMetricsState = liftIO $ do
   -- When a stop message is received for an operation, it may not be present in OpMap
   -- in these cases:
   -- 1. If the operation is a query/mutation - as we remove the operation from the
   --    OpMap as soon as it is executed
   -- 2. A misbehaving client
   -- 3. A bug on our end
-  stopOperation serverEnv wsConn opId $
+  stopOperation serverEnv wsConn opId granularPrometheusMetricsState $
     L.unLogger logger $
       L.UnstructuredLog L.LevelDebug $
         fromString $
@@ -1085,17 +1094,17 @@ onStop serverEnv wsConn (StopMsg opId) = liftIO $ do
   where
     logger = _wseLogger serverEnv
 
-stopOperation :: WSServerEnv impl -> WSConn -> OperationId -> IO () -> IO ()
-stopOperation serverEnv wsConn opId logWhenOpNotExist = do
+stopOperation :: WSServerEnv impl -> WSConn -> OperationId -> IO GranularPrometheusMetricsState -> IO () -> IO ()
+stopOperation serverEnv wsConn opId granularPrometheusMetricsState logWhenOpNotExist = do
   opM <- liftIO $ STM.atomically $ STMMap.lookup opId opMap
   case opM of
-    Just (subscriberDetails, opNameM) -> do
-      logWSEvent logger wsConn $ EOperation $ opDet opNameM
+    Just (subscriberDetails, operationName) -> do
+      logWSEvent logger wsConn $ EOperation $ opDet operationName
      case subscriberDetails of
        LiveQuerySubscriber lqId ->
-          ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState lqId
+          ES.removeLiveQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState lqId granularPrometheusMetricsState operationName
        StreamingQuerySubscriber streamSubscriberId ->
-          ES.removeStreamingQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState streamSubscriberId
+          ES.removeStreamingQuery logger (_wseServerMetrics serverEnv) (_wsePrometheusMetrics serverEnv) subscriptionState streamSubscriberId granularPrometheusMetricsState operationName
     Nothing -> logWhenOpNotExist
   STM.atomically $ STMMap.delete opId opMap
   where
@@ -1182,14 +1191,15 @@ onClose ::
   PrometheusMetrics ->
   ES.SubscriptionsState ->
   WSConn ->
+  IO GranularPrometheusMetricsState ->
   m ()
-onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn = do
+onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn granularPrometheusMetricsState = do
   logWSEvent logger wsConn EClosed
   operations <- liftIO $ STM.atomically $ ListT.toList $ STMMap.listT opMap
   liftIO $
-    for_ operations $ \(_, (subscriber, _)) ->
+    for_ operations $ \(_, (subscriber, operationName)) ->
      case subscriber of
-        LiveQuerySubscriber lqId -> ES.removeLiveQuery logger serverMetrics prometheusMetrics subscriptionsState lqId
-        StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId
+        LiveQuerySubscriber lqId -> ES.removeLiveQuery logger serverMetrics prometheusMetrics subscriptionsState lqId granularPrometheusMetricsState operationName
+        StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId granularPrometheusMetricsState operationName
   where
     opMap = _wscOpMap $ WS.getData wsConn
@@ -1,3 +1,5 @@
+{-# LANGUAGE DeriveAnyClass #-}
+
 -- | Mutable references for Prometheus metrics.
 --
 -- These metrics are independent from the metrics in "Hasura.Server.Metrics".
@@ -20,20 +22,32 @@ module Hasura.Server.Prometheus
     TriggerNameLabel (..),
     GranularPrometheusMetricsState (..),
     observeHistogramWithLabel,
+    SubscriptionKindLabel (..),
+    SubscriptionLabel (..),
+    DynamicSubscriptionLabel (..),
+    streamingSubscriptionLabel,
+    liveQuerySubscriptionLabel,
+    recordMetricWithLabel,
+    recordSubcriptionMetric,
   )
 where
 
 import Data.HashMap.Internal.Strict qualified as Map
+import Data.HashMap.Strict qualified as HashMap
 import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
 import Data.Int (Int64)
+import Hasura.GraphQL.ParameterizedQueryHash
+import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName (..))
 import Hasura.Prelude
 import Hasura.RQL.Types.EventTrigger (TriggerName, triggerNameToTxt)
 import Hasura.Server.Types (GranularPrometheusMetricsState (..))
+import Language.GraphQL.Draft.Syntax qualified as G
 import System.Metrics.Prometheus (ToLabels (..))
 import System.Metrics.Prometheus.Counter (Counter)
 import System.Metrics.Prometheus.Counter qualified as Counter
 import System.Metrics.Prometheus.Gauge (Gauge)
 import System.Metrics.Prometheus.Gauge qualified as Gauge
+import System.Metrics.Prometheus.GaugeVector qualified as GaugeVector
 import System.Metrics.Prometheus.Histogram (Histogram)
 import System.Metrics.Prometheus.Histogram qualified as Histogram
 import System.Metrics.Prometheus.HistogramVector (HistogramVector)
@@ -44,7 +58,6 @@ import System.Metrics.Prometheus.HistogramVector qualified as HistogramVector
 -- | Mutable references for Prometheus metrics.
 data PrometheusMetrics = PrometheusMetrics
   { pmConnections :: ConnectionsGauge,
-    pmActiveSubscriptions :: Gauge,
     pmGraphQLRequestMetrics :: GraphQLRequestMetrics,
     pmEventTriggerMetrics :: EventTriggerMetrics,
     pmWebSocketBytesReceived :: Counter,
@@ -101,7 +114,10 @@ data SubscriptionMetrics = SubscriptionMetrics
   { submActiveLiveQueryPollers :: Gauge,
     submActiveStreamingPollers :: Gauge,
     submActiveLiveQueryPollersInError :: Gauge,
-    submActiveStreamingPollersInError :: Gauge
+    submActiveStreamingPollersInError :: Gauge,
+    submTotalTime :: HistogramVector.HistogramVector SubscriptionLabel,
+    submDBExecTotalTime :: HistogramVector.HistogramVector SubscriptionLabel,
+    submActiveSubscriptions :: GaugeVector.GaugeVector SubscriptionLabel
   }
 
 data CacheRequestMetrics = CacheRequestMetrics
@@ -114,7 +130,6 @@ data CacheRequestMetrics = CacheRequestMetrics
 makeDummyPrometheusMetrics :: IO PrometheusMetrics
 makeDummyPrometheusMetrics = do
   pmConnections <- newConnectionsGauge
-  pmActiveSubscriptions <- Gauge.new
   pmGraphQLRequestMetrics <- makeDummyGraphQLRequestMetrics
   pmEventTriggerMetrics <- makeDummyEventTriggerMetrics
   pmWebSocketBytesReceived <- Counter.new
@@ -176,6 +191,9 @@ makeDummySubscriptionMetrics = do
   submActiveStreamingPollers <- Gauge.new
   submActiveLiveQueryPollersInError <- Gauge.new
   submActiveStreamingPollersInError <- Gauge.new
+  submTotalTime <- HistogramVector.new []
+  submDBExecTotalTime <- HistogramVector.new []
+  submActiveSubscriptions <- GaugeVector.new
   pure SubscriptionMetrics {..}
 
 makeDummyCacheRequestMetrics :: IO CacheRequestMetrics
@@ -239,6 +257,40 @@ instance ToLabels (Maybe TriggerNameLabel) where
   toLabels Nothing = Map.empty
   toLabels (Just (TriggerNameLabel triggerName)) = Map.singleton "trigger_name" (triggerNameToTxt triggerName)
 
+data SubscriptionKindLabel = SubscriptionKindLabel
+  { subscription_kind :: Text
+  }
+  deriving stock (Generic, Ord, Eq)
+  deriving anyclass (ToLabels)
+
+streamingSubscriptionLabel :: SubscriptionKindLabel
+streamingSubscriptionLabel = SubscriptionKindLabel "streaming"
+
+liveQuerySubscriptionLabel :: SubscriptionKindLabel
+liveQuerySubscriptionLabel = SubscriptionKindLabel "live-query"
+
+data DynamicSubscriptionLabel = DynamicSubscriptionLabel
+  { _dslParamQueryHash :: ParameterizedQueryHash,
+    _dslOperationName :: Maybe OperationName
+  }
+  deriving stock (Generic, Ord, Eq)
+
+instance ToLabels DynamicSubscriptionLabel where
+  toLabels (DynamicSubscriptionLabel hash opName) =
+    Map.fromList $
+      [("parameterized_query_hash", bsToTxt $ unParamQueryHash hash)]
+        <> maybe [] (\op -> [("operation_name", G.unName $ _unOperationName op)]) opName
+
+data SubscriptionLabel = SubscriptionLabel
+  { _slKind :: SubscriptionKindLabel,
+    _slDynamicLabels :: Maybe DynamicSubscriptionLabel
+  }
+  deriving stock (Generic, Ord, Eq)
+
+instance ToLabels SubscriptionLabel where
+  toLabels (SubscriptionLabel kind Nothing) = Map.fromList $ [("subscription_kind", subscription_kind kind)]
+  toLabels (SubscriptionLabel kind (Just dl)) = (Map.fromList $ [("subscription_kind", subscription_kind kind)]) <> toLabels dl
+
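Putting the two instances together, the label sets they flatten to look like this — a simplified, runnable sketch with plain strings standing in for `ParameterizedQueryHash` and `OperationName`, and hypothetical values `"abc123"`/`"MyOp"`:

```haskell
import qualified Data.Map as Map

-- Simplified stand-in for SubscriptionLabel's toLabels: a static
-- subscription_kind plus optional dynamic hash/operation-name labels.
renderLabels :: String -> Maybe (String, Maybe String) -> Map.Map String String
renderLabels kind dynamic =
  Map.fromList $
    [("subscription_kind", kind)]
      <> case dynamic of
        Nothing -> []
        Just (hash, opName) ->
          [("parameterized_query_hash", hash)]
            <> maybe [] (\op -> [("operation_name", op)]) opName

main :: IO ()
main = do
  print (renderLabels "live-query" Nothing)
  print (renderLabels "live-query" (Just ("abc123", Just "MyOp")))
```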
 -- | Record metrics with dynamic label
 recordMetricWithLabel ::
   (MonadIO m) =>
@ -280,3 +332,39 @@ observeHistogramWithLabel getMetricState alwaysObserve histogramVector label val
    alwaysObserve
    (liftIO $ HistogramVector.observe histogramVector (Just label) value)
    (liftIO $ HistogramVector.observe histogramVector Nothing value)

-- | Record a subscription metric for all the operation names present in the subscription.
-- Use this when you want to update the same value of the metric for all the operation names.
recordSubcriptionMetric ::
  (MonadIO m) =>
  (IO GranularPrometheusMetricsState) ->
  -- should the metric be observed without a label when granularMetricsState is OFF
  Bool ->
  HashMap (Maybe OperationName) Int ->
  ParameterizedQueryHash ->
  SubscriptionKindLabel ->
  -- the metric action to perform
  (SubscriptionLabel -> IO ()) ->
  m ()
recordSubcriptionMetric getMetricState alwaysObserve operationNamesMap parameterizedQueryHash subscriptionKind metricAction = do
  -- if no operation names are present, then emit the metric with only the param query hash as dynamic label
  if (null operationNamesMap)
    then do
      let promMetricGranularLabel = SubscriptionLabel subscriptionKind (Just $ DynamicSubscriptionLabel parameterizedQueryHash Nothing)
          promMetricLabel = SubscriptionLabel subscriptionKind Nothing
      recordMetricWithLabel
        getMetricState
        alwaysObserve
        (metricAction promMetricGranularLabel)
        (metricAction promMetricLabel)
    else -- if operation names are present, then emit the same metric for all the operation names
    do
      let operationNames = HashMap.keys operationNamesMap
      for_ operationNames $ \opName -> do
        let promMetricGranularLabel = SubscriptionLabel subscriptionKind (Just $ DynamicSubscriptionLabel parameterizedQueryHash opName)
            promMetricLabel = SubscriptionLabel subscriptionKind Nothing
        recordMetricWithLabel
          getMetricState
          alwaysObserve
          (metricAction promMetricGranularLabel)
          (metricAction promMetricLabel)
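To see the fan-out in isolation: the sketch below replays the same metric action once per operation name, or once with no operation name when the map is empty, which is what keeps a value consistent across every name of the same subscription. Types are simplified stand-ins and `fanOut` is a hypothetical name, not the engine's API:

  import Data.Foldable (for_)
  import Data.HashMap.Strict (HashMap)
  import qualified Data.HashMap.Strict as HashMap

  -- Standalone sketch of the fan-out above: replay the same metric action
  -- once per operation name, or once anonymously when there are none.
  fanOut ::
    HashMap (Maybe String) Int -> -- operation name -> subscriber count
    (Maybe String -> IO ()) -> -- hypothetical metric action
    IO ()
  fanOut operationNamesMap metricAction
    | HashMap.null operationNamesMap = metricAction Nothing
    | otherwise = for_ (HashMap.keys operationNamesMap) metricAction

  main :: IO ()
  main =
    -- emits once for "LiveScores" and once for an anonymous operation
    fanOut
      (HashMap.fromList [(Just "LiveScores", 2), (Nothing, 1)])
      (\opName -> putStrLn ("recording for " <> show opName))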
@ -159,11 +159,11 @@ instance ToJSON ApolloFederationStatus where

-- | Whether or not to enable granular metrics for Prometheus.
--
-- `GranularMetricsOn` will enable the dynamic labels for the metrics. `GranularMetricsOff` will disable the dynamic
-- labels for the metrics.
-- `GranularMetricsOn` will enable the dynamic labels for the metrics.
-- `GranularMetricsOff` will disable the dynamic labels for the metrics.
--
-- **Warning**: Enabling dynamic labels for Prometheus metrics can cause cardinality issues and can cause memory usage
-- to increase.
-- **Warning**: Enabling dynamic labels for Prometheus metrics can cause cardinality
-- issues and can cause memory usage to increase.
data GranularPrometheusMetricsState
  = GranularMetricsOff
  | GranularMetricsOn
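The flag only controls whether the dynamic labels are attached. A minimal sketch of the gating, assuming the `alwaysObserve` convention from the `recordSubcriptionMetric` hunk above (the real `recordMetricWithLabel` is only partially visible in this diff, so treat this as an approximation):

  import Control.Monad (when)

  data Granularity = GranularOff | GranularOn

  -- Sketch: re-read the granularity on every emission; with granular metrics
  -- off, fall back to the unlabelled variant only if the caller asked for it.
  emitGated :: IO Granularity -> Bool -> IO () -> IO () -> IO ()
  emitGated getGranularity alwaysObserve labelled unlabelled = do
    granularity <- getGranularity
    case granularity of
      GranularOn -> labelled
      GranularOff -> when alwaysObserve unlabelled

  main :: IO ()
  main =
    -- prints "unlabelled": granular metrics are off but alwaysObserve is set
    emitGated (pure GranularOff) True (putStrLn "labelled") (putStrLn "unlabelled")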
@ -182,6 +182,11 @@ instance ToJSON GranularPrometheusMetricsState where
class Monad m => MonadGetPolicies m where
  runGetApiTimeLimit ::
    m (Maybe MaxTime)

  -- 'GranularPrometheusMetricsState' is used to decide if dynamic labels need to be
  -- added when emitting the prometheus metric. The state of this can be changed
  -- dynamically via policies. Hence we need to fetch the value from the policy every
  -- time before emitting the metric. Thus we create an IO action which fetches the value.
  runGetPrometheusMetricsGranularity ::
    m (IO GranularPrometheusMetricsState)

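Why `m (IO GranularPrometheusMetricsState)` rather than `m GranularPrometheusMetricsState`: the inner `IO` action can be stashed alongside a poller and re-run before every emission, so a live policy change takes effect without rebuilding anything. A minimal sketch of that shape, using an `IORef`-backed reader as a hypothetical stand-in for the policy machinery:

  import Data.IORef (newIORef, readIORef, writeIORef)

  main :: IO ()
  main = do
    ref <- newIORef "GranularMetricsOff"
    -- the reusable 'IO state' handed to the metric-emitting code
    let getGranularity = readIORef ref
    getGranularity >>= putStrLn -- "GranularMetricsOff"
    writeIORef ref "GranularMetricsOn" -- a policy change lands...
    getGranularity >>= putStrLn -- ...and the very next emission observes it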
@ -42,7 +42,7 @@ import Hasura.RQL.Types.Roles (RoleName, mkRoleName)
import Hasura.Server.Init (considerEnv, databaseUrlOption, runWithEnv, _envVar)
import Hasura.Server.Metrics (createServerMetrics)
import Hasura.Server.Prometheus (makeDummyPrometheusMetrics)
import Hasura.Server.Types (RequestId (..))
import Hasura.Server.Types (GranularPrometheusMetricsState (..), RequestId (..))
import Language.GraphQL.Draft.Syntax.QQ qualified as G
import ListT qualified
import StmContainers.Map qualified as STMMap
@ -95,6 +95,9 @@ getStaticCohortSnapshot (Cohort cohortId _respRef existingSubsTV newSubsTV _) =

streamingSubscriptionPollingSpec :: SourceConfig ('Postgres 'Vanilla) -> Spec
streamingSubscriptionPollingSpec srcConfig = do
  dummyServerStore <- runIO newStore
  dummyServerMetrics <- runIO $ createServerMetrics dummyServerStore
  dummyPromMetrics <- runIO makeDummyPrometheusMetrics
  let setupDDLTx =
        PG.unitQE
          defaultTxErrorHandler
@ -133,6 +136,7 @@ streamingSubscriptionPollingSpec srcConfig = do

  pollerId <- runIO $ PollerId <$> UUID.nextRandom
  pollerResponseState <- runIO $ STM.newTVarIO PRSSuccess
  emptyOperationNamesMap <- runIO $ STM.atomically $ TMap.new
  let defaultSubscriptionOptions = mkSubscriptionsOptions Nothing Nothing -- use default values
      paramQueryHash = mkUnsafeParameterizedQueryHash "random"
  -- hardcoded multiplexed query which is generated for the following GraphQL query:
@ -151,7 +155,6 @@ streamingSubscriptionPollingSpec srcConfig = do
              ORDER BY "root.pg.id" ASC ) AS "_2_root" ) AS "numbers_stream" )
              AS "_fld_resp" ON ('true')
        |]
  dummyPrometheusMetrics <- runIO makeDummyPrometheusMetrics
  let pollingAction cohortMap testSyncAction =
        pollStreamingQuery
          @('Postgres 'Vanilla)
@ -166,7 +169,10 @@ streamingSubscriptionPollingSpec srcConfig = do
          [G.name|randomRootField|]
          (const $ pure ())
          testSyncAction
          dummyPrometheusMetrics
          dummyPromMetrics
          (pure GranularMetricsOff)
          emptyOperationNamesMap
          Nothing

      mkSubscriber sId =
        let wsId = maybe (error "Invalid UUID") WS.mkUnsafeWSId $ UUID.fromString "ec981f92-8d5a-47ab-a306-80af7cfb1113"
@ -218,7 +224,7 @@ streamingSubscriptionPollingSpec srcConfig = do
          TMap.reset cohortMap
          TMap.insert cohort1 cohortKey1 cohortMap

      runIO $ pollingAction cohortMap Nothing Nothing
      runIO $ pollingAction cohortMap Nothing
      currentCohortMap <- runIO $ STM.atomically $ TMap.getMap cohortMap

      it "the key of the cohort1 should have been moved from the cohortKey1 to cohortKey2, so it should not be found anymore at cohortKey1" $ do
@ -241,7 +247,7 @@ streamingSubscriptionPollingSpec srcConfig = do
            MVar.readMVar syncMVar
            STM.atomically $ TMap.insert cohort2 cohortKey3 cohortMap
      Async.withAsync
        (pollingAction cohortMap (Just syncAction) Nothing)
        (pollingAction cohortMap (Just syncAction))
        ( \pollAsync -> do
            MVar.putMVar syncMVar ()
            Async.wait pollAsync
@ -264,7 +270,7 @@ streamingSubscriptionPollingSpec srcConfig = do
            MVar.readMVar syncMVar
            STM.atomically $ TMap.delete cohortKey1 cohortMap
      Async.withAsync
        (pollingAction cohortMap (Just syncAction) Nothing)
        (pollingAction cohortMap (Just syncAction))
        ( \pollAsync -> do
            MVar.putMVar syncMVar ()
            Async.wait pollAsync
@ -283,7 +289,7 @@ streamingSubscriptionPollingSpec srcConfig = do
            MVar.readMVar syncMVar
            STM.atomically $ addSubscriberToCohort newTemporarySubscriber cohort1
      Async.withAsync
        (pollingAction cohortMap (Just syncAction) Nothing)
        (pollingAction cohortMap (Just syncAction))
        ( \pollAsync -> do
            -- concurrently inserting a new cohort to a key (cohortKey2) to which
            -- cohort1 is expected to be associated after the current poll
|
||||
MVar.readMVar syncMVar
|
||||
STM.atomically $ TMap.delete temporarySubscriberId (_cNewSubscribers cohort1)
|
||||
Async.withAsync
|
||||
(pollingAction cohortMap (Just syncAction) Nothing)
|
||||
(pollingAction cohortMap (Just syncAction))
|
||||
( \pollAsync -> do
|
||||
MVar.putMVar syncMVar ()
|
||||
Async.wait pollAsync
|
||||
@ -339,10 +345,6 @@ streamingSubscriptionPollingSpec srcConfig = do
            TMap.delete temporarySubscriberId (_cNewSubscribers cohort1)

  describe "Adding two subscribers concurrently" $ do
    dummyServerStore <- runIO newStore
    dummyServerMetrics <- runIO $ createServerMetrics dummyServerStore
    dummyPromMetrics <- runIO makeDummyPrometheusMetrics

    subscriptionState <- do
      runIO $ initSubscriptionsState (const (pure ()))

@ -389,6 +391,7 @@ streamingSubscriptionPollingSpec srcConfig = do
          reqId
          [G.name|numbers_stream|]
          subscriptionQueryPlan
          (pure GranularMetricsOff)
          (const (pure ()))

      it "concurrently adding two subscribers should retain both of them in the poller map" $ do
@ -403,7 +406,7 @@ streamingSubscriptionPollingSpec srcConfig = do

      streamQueryMapEntries <- STM.atomically $ ListT.toList $ STMMap.listT streamQueryMap
      length streamQueryMapEntries `shouldBe` 1
      let (pollerKey, (Poller currentCohortMap _ ioState)) = head streamQueryMapEntries
      let (pollerKey, (Poller currentCohortMap _ ioState _ _)) = head streamQueryMapEntries
      cohorts <- STM.atomically $ TMap.toList currentCohortMap
      length cohorts `shouldBe` 1
      let (_cohortKey, Cohort _ _ curSubsTV newSubsTV _) = head cohorts