mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 08:02:15 +03:00
server: add new metrics for scheduled triggers
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8401 Co-authored-by: Rob Dominguez <24390149+robertjdominguez@users.noreply.github.com> GitOrigin-RevId: 0cdec9e7b5e9251bf7c8b710b7552d065d62e195
This commit is contained in:
parent
e40e89d2d7
commit
e317c1a53f
@ -146,6 +146,46 @@ curl 'http://127.0.0.1:8080/v1/metrics' -H 'Authorization: Bearer <secret>'
|
||||
</td>
|
||||
<td>Compare this to <a href="/latest/api-reference/syntax-defs/#pgpoolsettings">pool settings</a>.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>hasura_cron_events_invocation_total</code></td>
|
||||
<td>Total number of cron events invoked</td>
|
||||
<td>Counter</td>
|
||||
<td>• "status": success|failed<br /></td>
|
||||
<td>Total number of invocations made for cron events.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>hasura_cron_events_processed_total</code></td>
|
||||
<td>Total number of cron events processed</td>
|
||||
<td>Counter</td>
|
||||
<td>• "status": success|failed<br /></td>
|
||||
<td>
|
||||
Compare this to <code>hasura_cron_events_invocation_total</code>. A high difference between the two metrics
|
||||
indicates high failure rate of the cron webhook.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>hasura_oneoff_events_invocation_total</code></td>
|
||||
<td>Total number of one-off events invoked</td>
|
||||
<td>Counter</td>
|
||||
<td>• "status": success|failed<br /></td>
|
||||
<td>Total number of invocations made for one-off events.</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td>
|
||||
<code>hasura_oneoff_events_processed_total</code>
|
||||
</td>
|
||||
<td>Total number of one-off events processed</td>
|
||||
<td>Counter</td>
|
||||
<td>
|
||||
• "status": success|failed
|
||||
<br />
|
||||
</td>
|
||||
<td>
|
||||
Compare this to <code>hasura_oneoff_events_invocation_total</code>. A high difference between the two metrics
|
||||
indicates high failure rate of the one-off webhook.
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
</table>
|
||||
|
||||
|
@ -1323,7 +1323,7 @@ mkHGEServer setupHook appStateRef ekgStore = do
|
||||
logger
|
||||
scheduledEventsStatsLogger
|
||||
appEnvManager
|
||||
appEnvPrometheusMetrics
|
||||
(pmScheduledTriggerMetrics appEnvPrometheusMetrics)
|
||||
(getSchemaCache appStateRef)
|
||||
lockedEventsCtx
|
||||
|
||||
|
@ -160,7 +160,7 @@ import Hasura.RQL.Types.Eventing
|
||||
import Hasura.RQL.Types.ScheduledTrigger
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
import Hasura.SQL.Types
|
||||
import Hasura.Server.Prometheus (PrometheusMetrics (..))
|
||||
import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..))
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
import Refined (unrefine)
|
||||
@ -246,12 +246,12 @@ processCronEvents ::
|
||||
) =>
|
||||
L.Logger L.Hasura ->
|
||||
HTTP.Manager ->
|
||||
PrometheusMetrics ->
|
||||
ScheduledTriggerMetrics ->
|
||||
[CronEvent] ->
|
||||
IO SchemaCache ->
|
||||
TVar (Set.Set CronEventId) ->
|
||||
m ()
|
||||
processCronEvents logger httpMgr prometheusMetrics cronEvents getSC lockedCronEvents = do
|
||||
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents getSC lockedCronEvents = do
|
||||
cronTriggersInfo <- scCronTriggers <$> liftIO getSC
|
||||
-- save the locked cron events that have been fetched from the
|
||||
-- database, the events stored here will be unlocked in case a
|
||||
@ -281,7 +281,7 @@ processCronEvents logger httpMgr prometheusMetrics cronEvents getSC lockedCronEv
|
||||
runExceptT $
|
||||
flip runReaderT (logger, httpMgr) $
|
||||
processScheduledEvent
|
||||
prometheusMetrics
|
||||
scheduledTriggerMetrics
|
||||
id'
|
||||
ctiHeaders
|
||||
retryCtx
|
||||
@ -295,7 +295,7 @@ processCronEvents logger httpMgr prometheusMetrics cronEvents getSC lockedCronEv
|
||||
let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing."
|
||||
eventTimeoutError = err500 TimeOut eventTimeoutMessage
|
||||
logInternalError eventTimeoutError
|
||||
runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage))
|
||||
runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics)
|
||||
>>= (`onLeft` logInternalError)
|
||||
Just finally -> onLeft finally logInternalError
|
||||
removeEventFromLockedEvents id' lockedCronEvents
|
||||
@ -315,7 +315,7 @@ processOneOffScheduledEvents ::
|
||||
Env.Environment ->
|
||||
L.Logger L.Hasura ->
|
||||
HTTP.Manager ->
|
||||
PrometheusMetrics ->
|
||||
ScheduledTriggerMetrics ->
|
||||
[OneOffScheduledEvent] ->
|
||||
TVar (Set.Set OneOffScheduledEventId) ->
|
||||
m ()
|
||||
@ -323,7 +323,7 @@ processOneOffScheduledEvents
|
||||
env
|
||||
logger
|
||||
httpMgr
|
||||
prometheusMetrics
|
||||
scheduledTriggerMetrics
|
||||
oneOffEvents
|
||||
lockedOneOffScheduledEvents = do
|
||||
-- save the locked one-off events that have been fetched from the
|
||||
@ -357,7 +357,7 @@ processOneOffScheduledEvents
|
||||
Right (webhookEnvRecord, eventHeaderInfo) -> do
|
||||
let processScheduledEventAction =
|
||||
flip runReaderT (logger, httpMgr) $
|
||||
processScheduledEvent prometheusMetrics _ooseId eventHeaderInfo retryCtx payload webhookEnvRecord OneOff
|
||||
processScheduledEvent scheduledTriggerMetrics _ooseId eventHeaderInfo retryCtx payload webhookEnvRecord OneOff
|
||||
|
||||
eventTimeout = unrefine $ strcTimeoutSeconds $ _ooseRetryConf
|
||||
|
||||
@ -370,7 +370,7 @@ processOneOffScheduledEvents
|
||||
let eventTimeoutMessage = "One-off Scheduled event " <> _ooseId <<> " timed out while processing."
|
||||
eventTimeoutError = err500 TimeOut eventTimeoutMessage
|
||||
lift $ logInternalError eventTimeoutError
|
||||
processError _ooseId retryCtx [] OneOff (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage)
|
||||
processError _ooseId retryCtx [] OneOff (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics
|
||||
)
|
||||
removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
|
||||
Left envVarError ->
|
||||
@ -381,6 +381,7 @@ processOneOffScheduledEvents
|
||||
OneOff
|
||||
(mkErrorObject $ "Error creating the request. " <> (mkInvalidEnvVarErrMsg $ envVarError))
|
||||
(HOther $ T.unpack $ qeError (err400 NotFound (mkInvalidEnvVarErrMsg envVarError)))
|
||||
scheduledTriggerMetrics
|
||||
where
|
||||
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
||||
getTemplateFromUrl url = printURLTemplate $ unInputWebhook url
|
||||
@ -403,11 +404,11 @@ processScheduledTriggers ::
|
||||
L.Logger L.Hasura ->
|
||||
FetchedScheduledEventsStatsLogger ->
|
||||
HTTP.Manager ->
|
||||
PrometheusMetrics ->
|
||||
ScheduledTriggerMetrics ->
|
||||
IO SchemaCache ->
|
||||
LockedEventsCtx ->
|
||||
m (Forever m)
|
||||
processScheduledTriggers env logger statsLogger httpMgr prometheusMetrics getSC LockedEventsCtx {..} = do
|
||||
processScheduledTriggers env logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} = do
|
||||
return $
|
||||
Forever () $
|
||||
const do
|
||||
@ -415,8 +416,8 @@ processScheduledTriggers env logger statsLogger httpMgr prometheusMetrics getSC
|
||||
Left e -> logInternalError e
|
||||
Right (cronEvents, oneOffEvents) -> do
|
||||
logFetchedScheduledEventsStats statsLogger (CronEventsCount $ length cronEvents) (OneOffScheduledEventsCount $ length oneOffEvents)
|
||||
processCronEvents logger httpMgr prometheusMetrics cronEvents getSC leCronEvents
|
||||
processOneOffScheduledEvents env logger httpMgr prometheusMetrics oneOffEvents leOneOffEvents
|
||||
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents getSC leCronEvents
|
||||
processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents
|
||||
-- NOTE: cron events are scheduled at times with minute resolution (as on
|
||||
-- unix), while one-off events can be set for arbitrary times. The sleep
|
||||
-- time here determines how overdue a scheduled event (cron or one-off)
|
||||
@ -434,7 +435,7 @@ processScheduledEvent ::
|
||||
MonadMetadataStorage m,
|
||||
MonadError QErr m
|
||||
) =>
|
||||
PrometheusMetrics ->
|
||||
ScheduledTriggerMetrics ->
|
||||
ScheduledEventId ->
|
||||
[EventHeaderInfo] ->
|
||||
RetryContext ->
|
||||
@ -442,7 +443,7 @@ processScheduledEvent ::
|
||||
EnvRecord ResolvedWebhook ->
|
||||
ScheduledEventType ->
|
||||
m ()
|
||||
processScheduledEvent prometheusMetrics eventId eventHeaders retryCtx payload webhookUrl type' =
|
||||
processScheduledEvent scheduledTriggerMetrics eventId eventHeaders retryCtx payload webhookUrl type' =
|
||||
Tracing.newTrace Tracing.sampleAlways traceNote do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
let retryConf = _rctxConf retryCtx
|
||||
@ -471,20 +472,25 @@ processScheduledEvent prometheusMetrics eventId eventHeaders retryCtx payload we
|
||||
Left _err -> pure ()
|
||||
Right response ->
|
||||
Prometheus.Counter.add
|
||||
(pmScheduledTriggerBytesReceived prometheusMetrics)
|
||||
(stmScheduledTriggerBytesReceived scheduledTriggerMetrics)
|
||||
(hrsSize response)
|
||||
let RequestDetails {_rdOriginalSize, _rdTransformedSize} = d
|
||||
in Prometheus.Counter.add
|
||||
(pmScheduledTriggerBytesSent prometheusMetrics)
|
||||
(stmScheduledTriggerBytesSent scheduledTriggerMetrics)
|
||||
(fromMaybe _rdOriginalSize _rdTransformedSize)
|
||||
case (type', e) of
|
||||
(Cron, Left _err) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalFailure scheduledTriggerMetrics)
|
||||
(Cron, Right _) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalSuccess scheduledTriggerMetrics)
|
||||
(OneOff, Left _err) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalFailure scheduledTriggerMetrics)
|
||||
(OneOff, Right _) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalSuccess scheduledTriggerMetrics)
|
||||
sessionVars = _rdSessionVars reqDetails
|
||||
resp <- invokeRequest reqDetails responseTransform sessionVars logger
|
||||
pure (request, resp)
|
||||
case eitherReqRes of
|
||||
Right (req, resp) ->
|
||||
let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
|
||||
in processSuccess eventId decodedHeaders type' reqBody resp
|
||||
Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e
|
||||
in processSuccess eventId decodedHeaders type' reqBody resp scheduledTriggerMetrics
|
||||
Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e scheduledTriggerMetrics
|
||||
Left (TransformationError _ e) -> do
|
||||
-- Log The Transformation Error
|
||||
logger :: L.Logger L.Hasura <- asks getter
|
||||
@ -506,8 +512,9 @@ processError ::
|
||||
ScheduledEventType ->
|
||||
J.Value ->
|
||||
HTTPErr a ->
|
||||
ScheduledTriggerMetrics ->
|
||||
m ()
|
||||
processError eventId retryCtx decodedHeaders type' reqJson err = do
|
||||
processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do
|
||||
let invocation = case err of
|
||||
HClient httpException ->
|
||||
let statusMaybe = getHTTPExceptionStatus httpException
|
||||
@ -521,7 +528,7 @@ processError eventId retryCtx decodedHeaders type' reqJson err = do
|
||||
let errMsg = (SB.fromLBS $ J.encode detail)
|
||||
mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson
|
||||
liftEitherM $ insertScheduledEventInvocation invocation type'
|
||||
retryOrMarkError eventId retryCtx err type'
|
||||
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
|
||||
|
||||
retryOrMarkError ::
|
||||
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
|
||||
@ -529,15 +536,20 @@ retryOrMarkError ::
|
||||
RetryContext ->
|
||||
HTTPErr a ->
|
||||
ScheduledEventType ->
|
||||
ScheduledTriggerMetrics ->
|
||||
m ()
|
||||
retryOrMarkError eventId retryCtx err type' = do
|
||||
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric = do
|
||||
let RetryContext tries retryConf = retryCtx
|
||||
mRetryHeader = getRetryAfterHeaderFromHTTPErr err
|
||||
mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
|
||||
triesExhausted = tries >= strcNumRetries retryConf
|
||||
noRetryHeader = isNothing mRetryHeaderSeconds
|
||||
if triesExhausted && noRetryHeader
|
||||
then liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
|
||||
then do
|
||||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
|
||||
case type' of
|
||||
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalFailure scheduledTriggerMetric)
|
||||
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalFailure scheduledTriggerMetric)
|
||||
else do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
let delay =
|
||||
@ -582,20 +594,24 @@ and it can transition to other states in the following ways:
|
||||
-}
|
||||
|
||||
processSuccess ::
|
||||
(MonadMetadataStorage m, MonadError QErr m) =>
|
||||
(MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
|
||||
ScheduledEventId ->
|
||||
[HeaderConf] ->
|
||||
ScheduledEventType ->
|
||||
J.Value ->
|
||||
HTTPResp a ->
|
||||
ScheduledTriggerMetrics ->
|
||||
m ()
|
||||
processSuccess eventId decodedHeaders type' reqBodyJson resp = do
|
||||
processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do
|
||||
let respBody = hrsBody resp
|
||||
respHeaders = hrsHeaders resp
|
||||
respStatus = hrsStatus resp
|
||||
invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson
|
||||
liftEitherM $ insertScheduledEventInvocation invocation type'
|
||||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
|
||||
case type' of
|
||||
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalSuccess scheduledTriggerMetric)
|
||||
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalSuccess scheduledTriggerMetric)
|
||||
|
||||
processDead ::
|
||||
(MonadMetadataStorage m, MonadError QErr m) =>
|
||||
|
@ -14,6 +14,7 @@ module Hasura.Server.Prometheus
|
||||
decWarpThreads,
|
||||
incWebsocketConnections,
|
||||
decWebsocketConnections,
|
||||
ScheduledTriggerMetrics (..),
|
||||
)
|
||||
where
|
||||
|
||||
@ -39,8 +40,7 @@ data PrometheusMetrics = PrometheusMetrics
|
||||
pmWebSocketBytesSent :: Counter,
|
||||
pmActionBytesReceived :: Counter,
|
||||
pmActionBytesSent :: Counter,
|
||||
pmScheduledTriggerBytesReceived :: Counter,
|
||||
pmScheduledTriggerBytesSent :: Counter
|
||||
pmScheduledTriggerMetrics :: ScheduledTriggerMetrics
|
||||
}
|
||||
|
||||
data GraphQLRequestMetrics = GraphQLRequestMetrics
|
||||
@ -67,6 +67,19 @@ data EventTriggerMetrics = EventTriggerMetrics
|
||||
eventInvocationTotalFailure :: Counter
|
||||
}
|
||||
|
||||
data ScheduledTriggerMetrics = ScheduledTriggerMetrics
|
||||
{ stmScheduledTriggerBytesReceived :: Counter,
|
||||
stmScheduledTriggerBytesSent :: Counter,
|
||||
stmCronEventsInvocationTotalSuccess :: Counter,
|
||||
stmCronEventsInvocationTotalFailure :: Counter,
|
||||
stmOneOffEventsInvocationTotalSuccess :: Counter,
|
||||
stmOneOffEventsInvocationTotalFailure :: Counter,
|
||||
stmCronEventsProcessedTotalSuccess :: Counter,
|
||||
stmCronEventsProcessedTotalFailure :: Counter,
|
||||
stmOneOffEventsProcessedTotalSuccess :: Counter,
|
||||
stmOneOffEventsProcessedTotalFailure :: Counter
|
||||
}
|
||||
|
||||
-- | Create dummy mutable references without associating them to a metrics
|
||||
-- store.
|
||||
makeDummyPrometheusMetrics :: IO PrometheusMetrics
|
||||
@ -79,8 +92,7 @@ makeDummyPrometheusMetrics = do
|
||||
pmWebSocketBytesSent <- Counter.new
|
||||
pmActionBytesReceived <- Counter.new
|
||||
pmActionBytesSent <- Counter.new
|
||||
pmScheduledTriggerBytesReceived <- Counter.new
|
||||
pmScheduledTriggerBytesSent <- Counter.new
|
||||
pmScheduledTriggerMetrics <- makeDummyScheduledTriggerMetrics
|
||||
pure PrometheusMetrics {..}
|
||||
|
||||
makeDummyGraphQLRequestMetrics :: IO GraphQLRequestMetrics
|
||||
@ -109,6 +121,20 @@ makeDummyEventTriggerMetrics = do
|
||||
eventInvocationTotalFailure <- Counter.new
|
||||
pure EventTriggerMetrics {..}
|
||||
|
||||
makeDummyScheduledTriggerMetrics :: IO ScheduledTriggerMetrics
|
||||
makeDummyScheduledTriggerMetrics = do
|
||||
stmScheduledTriggerBytesReceived <- Counter.new
|
||||
stmScheduledTriggerBytesSent <- Counter.new
|
||||
stmCronEventsInvocationTotalSuccess <- Counter.new
|
||||
stmCronEventsInvocationTotalFailure <- Counter.new
|
||||
stmOneOffEventsInvocationTotalSuccess <- Counter.new
|
||||
stmOneOffEventsInvocationTotalFailure <- Counter.new
|
||||
stmCronEventsProcessedTotalSuccess <- Counter.new
|
||||
stmCronEventsProcessedTotalFailure <- Counter.new
|
||||
stmOneOffEventsProcessedTotalSuccess <- Counter.new
|
||||
stmOneOffEventsProcessedTotalFailure <- Counter.new
|
||||
pure ScheduledTriggerMetrics {..}
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
-- | A mutable reference for atomically sampling the number of websocket
|
||||
|
Loading…
Reference in New Issue
Block a user