From c68f6c7ba18239c9b46549bd33f6eb2a3a08332e Mon Sep 17 00:00:00 2001
From: Krushan Bauva
Date: Fri, 7 Jul 2023 12:46:05 +0530
Subject: [PATCH] server: extend `livequery-poller-log` to log errors

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9708
GitOrigin-RevId: ba2075f5f62bd56805fde0a8f02803b05105d775
---
 .../Execute/Subscription/Poll/Common.hs         | 50 +++++++++++++------
 .../Execute/Subscription/Poll/LiveQuery.hs      | 50 +++++++++++++------
 .../Subscription/Poll/StreamingQuery.hs         | 38 ++++++++++----
 3 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs
index 59d00b69e97..84eb041b969 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/Common.hs
@@ -8,6 +8,7 @@ module Hasura.GraphQL.Execute.Subscription.Poll.Common
     BackendPollerKey (..),
     PollerMap,
     dumpPollerMap,
+    PollDetailsError (..),
     PollDetails (..),
     BatchExecutionDetails (..),
     CohortExecutionDetails (..),
@@ -53,9 +54,11 @@ import Control.Immortal qualified as Immortal
 import Crypto.Hash qualified as CH
 import Data.Aeson qualified as J
 import Data.ByteString qualified as BS
+import Data.Text (unpack)
 import Data.Time.Clock qualified as Clock
 import Data.UUID qualified as UUID
 import Data.UUID.V4 qualified as UUID
+import Hasura.Base.Error (QErr, showQErr)
 import Hasura.GraphQL.Execute.Subscription.Options
 import Hasura.GraphQL.Execute.Subscription.Plan
 import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
@@ -384,6 +387,22 @@ batchExecutionDetailMinimal BatchExecutionDetails {..} =
         <> batchRespSize
     )

+data PollDetailsError = PollDetailsError
+  { _pdeBatchId :: BatchId,
+    _pdeErrorDetails :: QErr
+  }
+  deriving (Eq)
+
+instance Show PollDetailsError where
+  show pde = "batch_id = " ++ show (_pdeBatchId pde) ++ ", detail = " ++ unpack (showQErr $ _pdeErrorDetails pde)
+
+instance J.ToJSON PollDetailsError where
+  toJSON PollDetailsError {..} =
+    J.object
+      $ [ "batch_id" J..= _pdeBatchId,
+          "detail" J..= _pdeErrorDetails
+        ]
+
 -- TODO consider refactoring into two types: one that is returned from pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId, sourceName, and so on, which is assembled at the callsites of those two functions. Move postPollHook out of those functions to callsites
 data PollDetails = PollDetails
   { -- | the unique ID (basically a thread that run as a 'Poller') for the
@@ -405,7 +424,9 @@ data PollDetails = PollDetails
     _pdLiveQueryOptions :: SubscriptionsOptions,
     _pdSource :: SourceName,
     _pdRole :: RoleName,
-    _pdParameterizedQueryHash :: ParameterizedQueryHash
+    _pdParameterizedQueryHash :: ParameterizedQueryHash,
+    _pdLogLevel :: L.LogLevel,
+    _pdErrors :: Maybe [PollDetailsError]
   }
   deriving (Show, Eq)

@@ -423,23 +444,24 @@ they need to.
 pollDetailMinimal :: PollDetails -> J.Value
 pollDetailMinimal PollDetails {..} =
   J.object
-    [ "poller_id" J..= _pdPollerId,
-      "kind" J..= _pdKind,
-      "snapshot_time" J..= _pdSnapshotTime,
-      "batches" J..= batches, -- TODO: deprecate this field
-      "execution_batches" J..= batches,
-      "subscriber_count" J..= sum (map (length . _bedCohorts) _pdBatches),
-      "total_time" J..= _pdTotalTime,
-      "source" J..= _pdSource,
-      "generated_sql" J..= _pdGeneratedSql,
-      "role" J..= _pdRole,
-      "subscription_options" J..= _pdLiveQueryOptions
-    ]
+    $ [ "poller_id" J..= _pdPollerId,
+        "kind" J..= _pdKind,
+        "snapshot_time" J..= _pdSnapshotTime,
+        "batches" J..= batches, -- TODO: deprecate this field
+        "execution_batches" J..= batches,
+        "subscriber_count" J..= sum (map (length . _bedCohorts) _pdBatches),
+        "total_time" J..= _pdTotalTime,
+        "source" J..= _pdSource,
+        "generated_sql" J..= _pdGeneratedSql,
+        "role" J..= _pdRole,
+        "subscription_options" J..= _pdLiveQueryOptions
+      ]
+    <> maybe [] (\err -> ["errors" J..= err]) _pdErrors
   where
     batches = map batchExecutionDetailMinimal _pdBatches

 instance L.ToEngineLog PollDetails L.Hasura where
-  toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
+  toEngineLog pl = (_pdLogLevel pl, L.ELTLivequeryPollerLog, pollDetailMinimal pl)

 type SubscriptionPostPollHook = PollDetails -> IO ()
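The optional "errors" key above is appended with `maybe [] (\err -> ["errors" J..= err]) _pdErrors`, so a successful poll's log keeps its old shape. A minimal standalone sketch of that aeson pattern (not part of the patch; `mkLog` and the field values are illustrative, not from the codebase):

    {-# LANGUAGE OverloadedStrings #-}

    import qualified Data.Aeson as J

    -- Append an "errors" key only when errors are present, mirroring how
    -- pollDetailMinimal extends its base key-value list.
    mkLog :: Maybe [String] -> J.Value
    mkLog errs =
      J.object $
        [ "poller_id" J..= ("fd5d6cc8" :: String), -- placeholder id
          "kind" J..= ("live-query" :: String)
        ]
          <> maybe [] (\e -> ["errors" J..= e]) errs

    main :: IO ()
    main = do
      print $ J.encode (mkLog Nothing) -- no "errors" key in the output
      print $ J.encode (mkLog (Just ["poller error"])) -- ...,"errors":["poller error"]}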
diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs
index da53b9fe08c..1b31f48ea07 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/LiveQuery.hs
@@ -28,6 +28,7 @@ import Hasura.GraphQL.Execute.Subscription.Types
 import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
 import Hasura.GraphQL.Transport.Backend
 import Hasura.GraphQL.Transport.HTTP.Protocol
+import Hasura.Logging (LogLevel (..))
 import Hasura.Prelude
 import Hasura.RQL.Types.Backend
 import Hasura.RQL.Types.BackendTag (backendTag, reify)
@@ -97,7 +98,7 @@ pollLiveQuery ::
   IO ()
 pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics granularPrometheusMetricsState operationNamesMap' resolvedConnectionTemplate modifier = do
   operationNamesMap <- STM.atomically $ TMap.getMap operationNamesMap'
-  (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
+  (totalTime, (snapshotTime, (batchesDetails, maybeErrors))) <- withElapsedTime $ do
     -- snapshot the current cohorts and split them into batches
     (snapshotTime, cohortBatches) <- withElapsedTime $ do
       -- get a snapshot of all the cohorts
@@ -110,7 +111,7 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
       pure $ zip (BatchId <$> [1 ..]) cohortBatches

     -- concurrently process each batch
-    batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
+    batchesDetailsWithMaybeError <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
       (queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query (over (each . _2) C._csVariables cohorts) resolvedConnectionTemplate

       let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
@@ -124,15 +125,22 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol

       previousPollerResponseState <- STM.readTVarIO pollerResponseState

-      case mxRes of
-        Left _ -> do
+      maybeError <- case mxRes of
+        Left err -> do
           when (previousPollerResponseState == PRSSuccess) $ do
             Prometheus.Gauge.inc $ submActiveLiveQueryPollersInError $ pmSubscriptionMetrics prometheusMetrics
             STM.atomically $ STM.writeTVar pollerResponseState PRSError
+          let pollDetailsError =
+                PollDetailsError
+                  { _pdeBatchId = batchId,
+                    _pdeErrorDetails = err
+                  }
+          return $ Just pollDetailsError
         Right _ -> do
           when (previousPollerResponseState == PRSError) $ do
             Prometheus.Gauge.dec $ submActiveLiveQueryPollersInError $ pmSubscriptionMetrics prometheusMetrics
             STM.atomically $ STM.writeTVar pollerResponseState PRSSuccess
+          return Nothing

       let lqMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
           operations = getCohortOperations cohorts mxRes
@@ -162,17 +170,17 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
       let pgExecutionTime = case reify (backendTag @b) of
             Postgres Vanilla -> Just queryExecutionTime
             _ -> Nothing
-      pure
-        $ BatchExecutionDetails
-          pgExecutionTime
-          queryExecutionTime
-          pushTime
-          batchId
-          cohortsExecutionDetails
-          batchResponseSize
-    pure (snapshotTime, batchesDetails)
-
-  let pollDetails =
+          batchExecDetails =
+            BatchExecutionDetails
+              pgExecutionTime
+              queryExecutionTime
+              pushTime
+              batchId
+              cohortsExecutionDetails
+              batchResponseSize
+      pure $ (batchExecDetails, maybeError)
+    pure (snapshotTime, unzip batchesDetailsWithMaybeError)
+  let initPollDetails =
         PollDetails
           { _pdPollerId = pollerId,
             _pdKind = LiveQuery,
@@ -183,8 +191,18 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
             _pdTotalTime = totalTime,
             _pdSource = sourceName,
             _pdRole = roleName,
-            _pdParameterizedQueryHash = parameterizedQueryHash
+            _pdParameterizedQueryHash = parameterizedQueryHash,
+            _pdLogLevel = LevelInfo,
+            _pdErrors = Nothing
           }
+      maybePollDetailsErrors = sequenceA maybeErrors
+      pollDetails = case maybePollDetailsErrors of
+        Nothing -> initPollDetails
+        Just pollDetailsErrors ->
+          initPollDetails
+            { _pdLogLevel = LevelError,
+              _pdErrors = Just pollDetailsErrors
+            }
   postPollHook pollDetails
   let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
   recordSubcriptionMetric
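Each batch now returns its execution details paired with an optional error, so a single `unzip` splits the collected results and `sequenceA` collapses the error column into `Maybe [PollDetailsError]`; note that `sequenceA` over a list of `Maybe` yields `Just` only when every element is `Just`. A self-contained sketch of that fold (my addition, with `String` standing in for the real detail and error types):

    -- Split per-batch (details, maybe-error) pairs the way pollLiveQuery does.
    splitBatches :: [(String, Maybe String)] -> ([String], Maybe [String])
    splitBatches results =
      let (details, maybeErrs) = unzip results
       in (details, sequenceA maybeErrs)

    main :: IO ()
    main = do
      -- every batch failed: Just ["timeout","timeout"], and the poll is logged at error level
      print $ splitBatches [("batch1", Just "timeout"), ("batch2", Just "timeout")]
      -- mixed outcome: sequenceA yields Nothing, so no errors are attached
      print $ splitBatches [("batch1", Just "timeout"), ("batch2", Nothing)]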
diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs
index bb771598896..229e469450d 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/Poll/StreamingQuery.hs
@@ -30,6 +30,7 @@ import Hasura.GraphQL.Execute.Subscription.Types
 import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
 import Hasura.GraphQL.Transport.Backend
 import Hasura.GraphQL.Transport.HTTP.Protocol
+import Hasura.Logging (LogLevel (..))
 import Hasura.Prelude
 import Hasura.RQL.Types.Backend
 import Hasura.RQL.Types.BackendTag (backendTag, reify)
@@ -259,7 +260,7 @@ pollStreamingQuery ::
   IO ()
 pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics granularPrometheusMetricsState operationNames' resolvedConnectionTemplate modifier = do
   operationNames <- STM.atomically $ TMap.getMap operationNames'
-  (totalTime, (snapshotTime, batchesDetailsAndProcessedCohorts)) <- withElapsedTime $ do
+  (totalTime, (snapshotTime, (batchesDetails, processedCohorts, maybeErrors))) <- withElapsedTime $ do
     -- snapshot the current cohorts and split them into batches
     -- This STM transaction is a read only transaction i.e. it doesn't mutate any state
     (snapshotTime, cohortBatches) <- withElapsedTime $ do
@@ -275,7 +276,7 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
     for_ testActionMaybe id -- IO action intended to run after the cohorts have been snapshotted

     -- concurrently process each batch and also get the processed cohort with the new updated cohort key
-    batchesDetailsAndProcessedCohorts <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
+    batchesDetailsAndProcessedCohortsWithMaybeError <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
       (queryExecutionTime, mxRes) <-
         runDBStreamingSubscription @b
           sourceConfig
@@ -293,15 +294,22 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,

       previousPollerResponseState <- STM.readTVarIO pollerResponseState

-      case mxRes of
-        Left _ -> do
+      maybeError <- case mxRes of
+        Left err -> do
           when (previousPollerResponseState == PRSSuccess) $ do
             Prometheus.Gauge.inc $ submActiveStreamingPollersInError $ pmSubscriptionMetrics prometheusMetrics
             STM.atomically $ STM.writeTVar pollerResponseState PRSError
+          let pollDetailsError =
+                PollDetailsError
+                  { _pdeBatchId = batchId,
+                    _pdeErrorDetails = err
+                  }
+          return $ Just pollDetailsError
         Right _ -> do
           when (previousPollerResponseState == PRSError) $ do
             Prometheus.Gauge.dec $ submActiveStreamingPollersInError $ pmSubscriptionMetrics prometheusMetrics
             STM.atomically $ STM.writeTVar pollerResponseState PRSSuccess
+          return Nothing

       let subscriptionMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
           operations = getCohortOperations cohorts mxRes
@@ -351,23 +359,33 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
             batchId
             (fst <$> cohortsExecutionDetails)
             batchResponseSize
-      pure $ (batchExecDetails, processedCohortBatch)
+      pure $ (batchExecDetails, processedCohortBatch, maybeError)

-    pure (snapshotTime, batchesDetailsAndProcessedCohorts)
+    pure (snapshotTime, unzip3 batchesDetailsAndProcessedCohortsWithMaybeError)

-  let pollDetails =
+  let initPollDetails =
         PollDetails
           { _pdPollerId = pollerId,
             _pdKind = Streaming,
             _pdGeneratedSql = toTxt query,
             _pdSnapshotTime = snapshotTime,
-            _pdBatches = fst <$> batchesDetailsAndProcessedCohorts,
+            _pdBatches = batchesDetails,
             _pdLiveQueryOptions = streamingQueryOpts,
             _pdTotalTime = totalTime,
             _pdSource = sourceName,
             _pdRole = roleName,
-            _pdParameterizedQueryHash = parameterizedQueryHash
+            _pdParameterizedQueryHash = parameterizedQueryHash,
+            _pdLogLevel = LevelInfo,
+            _pdErrors = Nothing
           }
+      maybePollDetailsErrors = sequenceA maybeErrors
+      pollDetails = case maybePollDetailsErrors of
+        Nothing -> initPollDetails
+        Just pollDetailsError ->
+          initPollDetails
+            { _pdLogLevel = LevelError,
+              _pdErrors = Just pollDetailsError
+            }

   STM.atomically $ do
     -- constructing a cohort map for all the cohorts that have been
@@ -375,7 +393,7 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,

     -- processed cohorts is an array of tuples of the current poll cohort variables and a tuple
     -- of the cohort and the new cohort key
-    let processedCohortsMap = HashMap.fromList $ snd =<< batchesDetailsAndProcessedCohorts
+    let processedCohortsMap = HashMap.fromList $ concat processedCohorts

     -- rebuilding the cohorts and the cohort map, see [Streaming subscription polling]
     -- and [Streaming subscriptions rebuilding cohort map]
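On the streaming side each batch yields a triple, hence the `unzip3`, and the per-batch lists of processed cohorts are flattened with `concat processedCohorts` where the old tuple encoding used `snd =<<`. A throwaway sketch of that equivalence (my addition; all values are placeholders):

    main :: IO ()
    main = do
      let results :: [(String, [String], Maybe String)]
          results =
            [ ("details1", ["cohort1", "cohort2"], Nothing),
              ("details2", ["cohort3"], Just "err")
            ]
          (details, cohorts, errs) = unzip3 results
      print details          -- ["details1","details2"]
      print (concat cohorts) -- ["cohort1","cohort2","cohort3"]; snd =<< gave the same list under the old pair encoding
      print errs             -- [Nothing,Just "err"]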