server: extend livequery-poller-log to log errors

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9708
GitOrigin-RevId: ba2075f5f62bd56805fde0a8f02803b05105d775
This commit is contained in:
Krushan Bauva 2023-07-07 12:46:05 +05:30 committed by hasura-bot
parent 1d8d934157
commit c68f6c7ba1
3 changed files with 98 additions and 40 deletions

View File

@ -8,6 +8,7 @@ module Hasura.GraphQL.Execute.Subscription.Poll.Common
BackendPollerKey (..),
PollerMap,
dumpPollerMap,
PollDetailsError (..),
PollDetails (..),
BatchExecutionDetails (..),
CohortExecutionDetails (..),
@ -53,9 +54,11 @@ import Control.Immortal qualified as Immortal
import Crypto.Hash qualified as CH
import Data.Aeson qualified as J
import Data.ByteString qualified as BS
import Data.Text (unpack)
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import Hasura.Base.Error (QErr, showQErr)
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
@ -384,6 +387,22 @@ batchExecutionDetailMinimal BatchExecutionDetails {..} =
<> batchRespSize
)
data PollDetailsError = PollDetailsError
{ _pdeBatchId :: BatchId,
_pdeErrorDetails :: QErr
}
deriving (Eq)
instance Show PollDetailsError where
show pde = "batch_id = " ++ show (_pdeBatchId pde) ++ ", detail = " ++ unpack (showQErr $ _pdeErrorDetails pde)
instance J.ToJSON PollDetailsError where
toJSON PollDetailsError {..} =
J.object
$ [ "batch_id" J..= _pdeBatchId,
"detail" J..= _pdeErrorDetails
]
-- TODO consider refactoring into two types: one that is returned from pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId, sourceName, and so on, which is assembled at the callsites of those two functions. Move postPollHook out of those functions to callsites
data PollDetails = PollDetails
{ -- | the unique ID (basically a thread that run as a 'Poller') for the
@ -405,7 +424,9 @@ data PollDetails = PollDetails
_pdLiveQueryOptions :: SubscriptionsOptions,
_pdSource :: SourceName,
_pdRole :: RoleName,
_pdParameterizedQueryHash :: ParameterizedQueryHash
_pdParameterizedQueryHash :: ParameterizedQueryHash,
_pdLogLevel :: L.LogLevel,
_pdErrors :: Maybe [PollDetailsError]
}
deriving (Show, Eq)
@ -423,7 +444,7 @@ they need to.
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal PollDetails {..} =
J.object
[ "poller_id" J..= _pdPollerId,
$ [ "poller_id" J..= _pdPollerId,
"kind" J..= _pdKind,
"snapshot_time" J..= _pdSnapshotTime,
"batches" J..= batches, -- TODO: deprecate this field
@ -435,11 +456,12 @@ pollDetailMinimal PollDetails {..} =
"role" J..= _pdRole,
"subscription_options" J..= _pdLiveQueryOptions
]
<> maybe [] (\err -> ["errors" J..= err]) _pdErrors
where
batches = map batchExecutionDetailMinimal _pdBatches
instance L.ToEngineLog PollDetails L.Hasura where
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
toEngineLog pl = (_pdLogLevel pl, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
type SubscriptionPostPollHook = PollDetails -> IO ()

View File

@ -28,6 +28,7 @@ import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
@ -97,7 +98,7 @@ pollLiveQuery ::
IO ()
pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook prometheusMetrics granularPrometheusMetricsState operationNamesMap' resolvedConnectionTemplate modifier = do
operationNamesMap <- STM.atomically $ TMap.getMap operationNamesMap'
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
(totalTime, (snapshotTime, (batchesDetails, maybeErrors))) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
(snapshotTime, cohortBatches) <- withElapsedTime $ do
-- get a snapshot of all the cohorts
@ -110,7 +111,7 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
pure $ zip (BatchId <$> [1 ..]) cohortBatches
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
batchesDetailsWithMaybeError <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query (over (each . _2) C._csVariables cohorts) resolvedConnectionTemplate
let dbExecTimeMetric = submDBExecTotalTime $ pmSubscriptionMetrics $ prometheusMetrics
@ -124,15 +125,22 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
previousPollerResponseState <- STM.readTVarIO pollerResponseState
case mxRes of
Left _ -> do
maybeError <- case mxRes of
Left err -> do
when (previousPollerResponseState == PRSSuccess) $ do
Prometheus.Gauge.inc $ submActiveLiveQueryPollersInError $ pmSubscriptionMetrics prometheusMetrics
STM.atomically $ STM.writeTVar pollerResponseState PRSError
let pollDetailsError =
PollDetailsError
{ _pdeBatchId = batchId,
_pdeErrorDetails = err
}
return $ Just pollDetailsError
Right _ -> do
when (previousPollerResponseState == PRSError) $ do
Prometheus.Gauge.dec $ submActiveLiveQueryPollersInError $ pmSubscriptionMetrics prometheusMetrics
STM.atomically $ STM.writeTVar pollerResponseState PRSSuccess
return Nothing
let lqMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
@ -162,17 +170,17 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
let pgExecutionTime = case reify (backendTag @b) of
Postgres Vanilla -> Just queryExecutionTime
_ -> Nothing
pure
$ BatchExecutionDetails
batchExecDetails =
BatchExecutionDetails
pgExecutionTime
queryExecutionTime
pushTime
batchId
cohortsExecutionDetails
batchResponseSize
pure (snapshotTime, batchesDetails)
let pollDetails =
pure $ (batchExecDetails, maybeError)
pure (snapshotTime, unzip batchesDetailsWithMaybeError)
let initPollDetails =
PollDetails
{ _pdPollerId = pollerId,
_pdKind = LiveQuery,
@ -183,7 +191,17 @@ pollLiveQuery pollerId pollerResponseState lqOpts (sourceName, sourceConfig) rol
_pdTotalTime = totalTime,
_pdSource = sourceName,
_pdRole = roleName,
_pdParameterizedQueryHash = parameterizedQueryHash
_pdParameterizedQueryHash = parameterizedQueryHash,
_pdLogLevel = LevelInfo,
_pdErrors = Nothing
}
maybePollDetailsErrors = sequenceA maybeErrors
pollDetails = case maybePollDetailsErrors of
Nothing -> initPollDetails
Just pollDetailsErrors ->
initPollDetails
{ _pdLogLevel = LevelError,
_pdErrors = Just pollDetailsErrors
}
postPollHook pollDetails
let totalTimeMetric = submTotalTime $ pmSubscriptionMetrics $ prometheusMetrics

View File

@ -30,6 +30,7 @@ import Hasura.GraphQL.Execute.Subscription.Types
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Logging (LogLevel (..))
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.BackendTag (backendTag, reify)
@ -259,7 +260,7 @@ pollStreamingQuery ::
IO ()
pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe prometheusMetrics granularPrometheusMetricsState operationNames' resolvedConnectionTemplate modifier = do
operationNames <- STM.atomically $ TMap.getMap operationNames'
(totalTime, (snapshotTime, batchesDetailsAndProcessedCohorts)) <- withElapsedTime $ do
(totalTime, (snapshotTime, (batchesDetails, processedCohorts, maybeErrors))) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
-- This STM transaction is a read only transaction i.e. it doesn't mutate any state
(snapshotTime, cohortBatches) <- withElapsedTime $ do
@ -275,7 +276,7 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
for_ testActionMaybe id -- IO action intended to run after the cohorts have been snapshotted
-- concurrently process each batch and also get the processed cohort with the new updated cohort key
batchesDetailsAndProcessedCohorts <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
batchesDetailsAndProcessedCohortsWithMaybeError <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <-
runDBStreamingSubscription @b
sourceConfig
@ -293,15 +294,22 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
previousPollerResponseState <- STM.readTVarIO pollerResponseState
case mxRes of
Left _ -> do
maybeError <- case mxRes of
Left err -> do
when (previousPollerResponseState == PRSSuccess) $ do
Prometheus.Gauge.inc $ submActiveStreamingPollersInError $ pmSubscriptionMetrics prometheusMetrics
STM.atomically $ STM.writeTVar pollerResponseState PRSError
let pollDetailsError =
PollDetailsError
{ _pdeBatchId = batchId,
_pdeErrorDetails = err
}
return $ Just pollDetailsError
Right _ -> do
when (previousPollerResponseState == PRSError) $ do
Prometheus.Gauge.dec $ submActiveStreamingPollersInError $ pmSubscriptionMetrics prometheusMetrics
STM.atomically $ STM.writeTVar pollerResponseState PRSSuccess
return Nothing
let subscriptionMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
@ -351,22 +359,32 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
batchId
(fst <$> cohortsExecutionDetails)
batchResponseSize
pure $ (batchExecDetails, processedCohortBatch)
pure $ (batchExecDetails, processedCohortBatch, maybeError)
pure (snapshotTime, batchesDetailsAndProcessedCohorts)
pure (snapshotTime, unzip3 batchesDetailsAndProcessedCohortsWithMaybeError)
let pollDetails =
let initPollDetails =
PollDetails
{ _pdPollerId = pollerId,
_pdKind = Streaming,
_pdGeneratedSql = toTxt query,
_pdSnapshotTime = snapshotTime,
_pdBatches = fst <$> batchesDetailsAndProcessedCohorts,
_pdBatches = batchesDetails,
_pdLiveQueryOptions = streamingQueryOpts,
_pdTotalTime = totalTime,
_pdSource = sourceName,
_pdRole = roleName,
_pdParameterizedQueryHash = parameterizedQueryHash
_pdParameterizedQueryHash = parameterizedQueryHash,
_pdLogLevel = LevelInfo,
_pdErrors = Nothing
}
maybePollDetailsErrors = sequenceA maybeErrors
pollDetails = case maybePollDetailsErrors of
Nothing -> initPollDetails
Just pollDetailsError ->
initPollDetails
{ _pdLogLevel = LevelError,
_pdErrors = Just pollDetailsError
}
STM.atomically $ do
@ -375,7 +393,7 @@ pollStreamingQuery pollerId pollerResponseState streamingQueryOpts (sourceName,
-- processed cohorts is an array of tuples of the current poll cohort variables and a tuple
-- of the cohort and the new cohort key
let processedCohortsMap = HashMap.fromList $ snd =<< batchesDetailsAndProcessedCohorts
let processedCohortsMap = HashMap.fromList $ concat processedCohorts
-- rebuilding the cohorts and the cohort map, see [Streaming subscription polling]
-- and [Streaming subscriptions rebuilding cohort map]