server: process event triggers with a timeout

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8823
GitOrigin-RevId: 4a38ef993fffe018ae16e232f2abb5d30c604855
This commit is contained in:
Puru Gupta 2023-04-24 14:57:22 +05:30 committed by hasura-bot
parent 1698f9dd91
commit 44d9987e92

View File

@ -95,6 +95,7 @@ import System.Metrics.Gauge qualified as EKG.Gauge
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
import System.Timeout.Lifted (timeout)
newtype EventInternalErr
= EventInternalErr QErr
@ -270,6 +271,12 @@ logFetchedEventsStatistics logger backendEvents =
{-# ANN processEventQueue ("HLint: ignore Use withAsync" :: String) #-}
-- | `upperBoundEventTriggerTimeout` is the maximum amount of time that
-- processing a single event trigger is allowed to take (30 minutes).
-- This constant is intended to be used as the upper limit when running
-- event trigger processing under a timeout.
upperBoundEventTriggerTimeout :: DiffTime
upperBoundEventTriggerTimeout = minutes 30
-- | Service events from our in-DB queue.
--
-- There are a few competing concerns and constraints here; we want to...
@ -442,7 +449,8 @@ processEventQueue logger statsLogger httpMgr getSchemaCache getEventEngineCtx ac
processEvent ::
forall io r b.
( MonadIO io,
( MonadBaseControl IO io,
MonadIO io,
MonadReader r io,
Has HTTP.Manager r,
Has (L.Logger L.Hasura) r,
@ -490,7 +498,6 @@ processEventQueue logger statsLogger httpMgr getSchemaCache getEventEngineCtx ac
runExceptT (setRetry sourceConfig e (addUTCTime 60 currentTime) maintenanceModeVersion)
>>= flip onLeft logQErr
Right eti -> trace (spanName eti) do
eventExecutionStartTime <- liftIO getCurrentTime
let webhook = wciCachedValue $ etiWebhookInfo eti
retryConf = etiRetryConf eti
timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf)
@ -501,72 +508,85 @@ processEventQueue logger statsLogger httpMgr getSchemaCache getEventEngineCtx ac
extraLogCtx = ExtraLogContext (epId ep) (Just $ etiName eti)
requestTransform = etiRequestTransform eti
responseTransform = mkResponseTransform <$> etiResponseTransform eti
eitherReqRes <-
runExceptT $
mkRequest headers httpTimeout payload requestTransform (_envVarValue webhook) >>= \reqDetails -> do
let request = extractRequest reqDetails
logger' res details = do
logHTTPForET res extraLogCtx details (_envVarName webhook) logHeaders
liftIO $ do
case res of
Left _err -> pure ()
Right response ->
Prometheus.Counter.add
(eventTriggerBytesReceived eventTriggerMetrics)
(hrsSize response)
let RequestDetails {_rdOriginalSize, _rdTransformedSize} = details
in Prometheus.Counter.add
(eventTriggerBytesSent eventTriggerMetrics)
(fromMaybe _rdOriginalSize _rdTransformedSize)
-- Event Triggers have a configuration parameter called
-- HASURA_GRAPHQL_EVENTS_HTTP_WORKERS, which is used
-- to control the concurrency of http delivery.
-- This bracket is used to increment and decrement an
-- HTTP Worker EKG Gauge for the duration of the
-- request invocation
resp <-
bracket_
( do
liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics
liftIO $ Prometheus.Gauge.inc (eventTriggerHTTPWorkers eventTriggerMetrics)
)
( do
liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics
liftIO $ Prometheus.Gauge.dec (eventTriggerHTTPWorkers eventTriggerMetrics)
)
(invokeRequest reqDetails responseTransform (_rdSessionVars reqDetails) logger')
pure (request, resp)
case eitherReqRes of
Right (req, resp) -> do
let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
processSuccess sourceConfig e logHeaders reqBody maintenanceModeVersion resp >>= flip onLeft logQErr
eventExecutionFinishTime <- liftIO getCurrentTime
let eventWebhookProcessingTime' = realToFrac $ diffUTCTime eventExecutionFinishTime eventExecutionStartTime
-- For event_processing_time, the start time is defined as the expected delivery time for an event, i.e.:
-- - For event with no retries: created_at time
-- - For event with retries: next_retry_at time
eventStartTime = fromMaybe (eCreatedAt e) (eRetryAt e)
-- The timestamps in the DB are supposed to be UTC time, so the timestamps (`eventExecutionFinishTime` and
-- `eventStartTime`) used here in calculation are all UTC time.
eventProcessingTime' = realToFrac $ diffUTCTime eventExecutionFinishTime eventStartTime
liftIO $ do
EKG.Distribution.add (smEventWebhookProcessingTime serverMetrics) eventWebhookProcessingTime'
Prometheus.Histogram.observe (eventWebhookProcessingTime eventTriggerMetrics) eventWebhookProcessingTime'
EKG.Distribution.add (smEventProcessingTime serverMetrics) eventProcessingTime'
Prometheus.Histogram.observe (eventProcessingTime eventTriggerMetrics) eventProcessingTime'
Prometheus.Counter.inc (eventProcessedTotalSuccess eventTriggerMetrics)
Prometheus.Counter.inc (eventInvocationTotalSuccess eventTriggerMetrics)
Left eventError -> do
-- TODO (paritosh): We can also add a label to the metric to indicate the type of error
liftIO $ Prometheus.Counter.inc (eventInvocationTotalFailure eventTriggerMetrics)
case eventError of
(HTTPError reqBody err) ->
processError @b sourceConfig e retryConf logHeaders reqBody maintenanceModeVersion eventTriggerMetrics err >>= flip onLeft logQErr
(TransformationError _ err) -> do
L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode err)
eventTriggerProcessingTimeout = maybe upperBoundEventTriggerTimeout (min upperBoundEventTriggerTimeout . fromIntegral) (rcTimeoutSec retryConf)
eventTriggerProcessAction = do
eventExecutionStartTime <- liftIO getCurrentTime
eitherReqRes <-
runExceptT $
mkRequest headers httpTimeout payload requestTransform (_envVarValue webhook) >>= \reqDetails -> do
let request = extractRequest reqDetails
logger' res details = do
logHTTPForET res extraLogCtx details (_envVarName webhook) logHeaders
liftIO $ do
case res of
Left _err -> pure ()
Right response ->
Prometheus.Counter.add
(eventTriggerBytesReceived eventTriggerMetrics)
(hrsSize response)
let RequestDetails {_rdOriginalSize, _rdTransformedSize} = details
in Prometheus.Counter.add
(eventTriggerBytesSent eventTriggerMetrics)
(fromMaybe _rdOriginalSize _rdTransformedSize)
-- Event Triggers have a configuration parameter called
-- HASURA_GRAPHQL_EVENTS_HTTP_WORKERS, which is used
-- to control the concurrency of http delivery.
-- This bracket is used to increment and decrement an
-- HTTP Worker EKG Gauge for the duration of the
-- request invocation
resp <-
bracket_
( do
liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics
liftIO $ Prometheus.Gauge.inc (eventTriggerHTTPWorkers eventTriggerMetrics)
)
( do
liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics
liftIO $ Prometheus.Gauge.dec (eventTriggerHTTPWorkers eventTriggerMetrics)
)
(invokeRequest reqDetails responseTransform (_rdSessionVars reqDetails) logger')
pure (request, resp)
case eitherReqRes of
Right (req, resp) -> do
let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
processSuccess sourceConfig e logHeaders reqBody maintenanceModeVersion resp >>= flip onLeft logQErr
eventExecutionFinishTime <- liftIO getCurrentTime
let eventWebhookProcessingTime' = realToFrac $ diffUTCTime eventExecutionFinishTime eventExecutionStartTime
-- For event_processing_time, the start time is defined as the expected delivery time for an event, i.e.:
-- - For event with no retries: created_at time
-- - For event with retries: next_retry_at time
eventStartTime = fromMaybe (eCreatedAt e) (eRetryAt e)
-- The timestamps in the DB are supposed to be UTC time, so the timestamps (`eventExecutionFinishTime` and
-- `eventStartTime`) used here in calculation are all UTC time.
eventProcessingTime' = realToFrac $ diffUTCTime eventExecutionFinishTime eventStartTime
liftIO $ do
EKG.Distribution.add (smEventWebhookProcessingTime serverMetrics) eventWebhookProcessingTime'
Prometheus.Histogram.observe (eventWebhookProcessingTime eventTriggerMetrics) eventWebhookProcessingTime'
EKG.Distribution.add (smEventProcessingTime serverMetrics) eventProcessingTime'
Prometheus.Histogram.observe (eventProcessingTime eventTriggerMetrics) eventProcessingTime'
Prometheus.Counter.inc (eventProcessedTotalSuccess eventTriggerMetrics)
Prometheus.Counter.inc (eventInvocationTotalSuccess eventTriggerMetrics)
Left eventError -> do
-- TODO (paritosh): We can also add a label to the metric to indicate the type of error
liftIO $ Prometheus.Counter.inc (eventInvocationTotalFailure eventTriggerMetrics)
case eventError of
(HTTPError reqBody err) ->
processError @b sourceConfig e retryConf logHeaders reqBody maintenanceModeVersion eventTriggerMetrics err >>= flip onLeft logQErr
(TransformationError _ err) -> do
L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode err)
-- Record an Event Error
recordError' @b sourceConfig e Nothing PESetError maintenanceModeVersion >>= flip onLeft logQErr
-- Try to process the event trigger with a timeout of min(`upperBoundEventTriggerTimeout`, event's response timeout),
-- so that we're never blocked forever while processing a single event trigger.
--
-- If the request times out, then process it as an erroneous invocation and move on.
timeout (fromInteger (diffTimeToMicroSeconds eventTriggerProcessingTimeout)) eventTriggerProcessAction
`onNothingM` do
let eventTriggerTimeoutMessage = "Event Trigger " <> etiName eti <<> " timed out while processing."
processError @b sourceConfig e retryConf logHeaders J.Null maintenanceModeVersion eventTriggerMetrics (HOther $ T.unpack eventTriggerTimeoutMessage)
>>= flip onLeft logQErr
-- Record an Event Error
recordError' @b sourceConfig e Nothing PESetError maintenanceModeVersion >>= flip onLeft logQErr
-- removing an event from the _eeCtxLockedEvents after the event has been processed:
removeEventTriggerEventFromLockedEvents sourceName (eId e) leEvents