mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-05 14:27:59 +03:00
75f0629c5d
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9873 GitOrigin-RevId: 703a16da479d35908a9a8c2862884d11a3135731
1232 lines
50 KiB
Haskell
1232 lines
50 KiB
Haskell
{-# LANGUAGE PatternSynonyms #-}
|
||
{-# LANGUAGE QuasiQuotes #-}
|
||
|
||
-- |
|
||
-- = Scheduled Triggers
|
||
--
|
||
-- This module implements the functionality of invoking webhooks during specified
|
||
-- time events aka scheduled events. The scheduled events are the events generated
|
||
-- by the graphql-engine using the cron triggers or/and a scheduled event can
|
||
-- be created by the user at a specified time with the payload, webhook, headers
|
||
-- and the retry configuration. Scheduled events are modeled using rows in Postgres
|
||
-- with a @timestamp@ column.
|
||
--
|
||
-- This module implements scheduling and delivery of scheduled
|
||
-- events:
|
||
--
|
||
-- 1. Scheduling a cron event involves creating new cron events. New
|
||
-- cron events are created based on the cron schedule and the number of
|
||
-- scheduled events that are already present in the scheduled events buffer.
|
||
-- The graphql-engine computes the new scheduled events and writes them to
|
||
-- the database.(Generator)
|
||
--
|
||
-- 2. Delivering a scheduled event involves reading undelivered scheduled events
|
||
-- from the database and delivering them to the webhook server. (Processor)
|
||
--
|
||
-- The rationale behind separating the event scheduling and event delivery
|
||
-- mechanism into two different threads is that the scheduling and delivering of
|
||
-- the scheduled events are not directly dependent on each other. The generator
|
||
-- will almost always try to create scheduled events which are supposed to be
|
||
-- delivered in the future (timestamp > current_timestamp) and the processor
|
||
-- will fetch scheduled events of the past (timestamp < current_timestamp). So,
|
||
-- the set of the scheduled events generated by the generator and the processor
|
||
-- will never be the same. The point here is that they're not correlated to each
|
||
-- other. They can be split into different threads for a better performance.
|
||
--
|
||
-- == Implementation
|
||
--
|
||
-- The scheduled triggers eventing is being implemented in the metadata storage.
|
||
-- All functions that make interaction to storage system are abstracted in
|
||
-- the @'MonadMetadataStorage' class.
|
||
--
|
||
-- During the startup, two threads are started:
|
||
--
|
||
-- 1. Generator: Fetches the list of scheduled triggers from cache and generates
|
||
-- the scheduled events.
|
||
--
|
||
-- - Additional events will be generated only if there are fewer than 100
|
||
-- scheduled events.
|
||
--
|
||
-- - The upcoming events timestamp will be generated using:
|
||
--
|
||
-- - cron schedule of the scheduled trigger
|
||
--
|
||
-- - max timestamp of the scheduled events that already exist or
|
||
-- current_timestamp(when no scheduled events exist)
|
||
--
|
||
-- - The timestamp of the scheduled events is stored with timezone because
|
||
-- `SELECT NOW()` returns timestamp with timezone, so it's good to
|
||
-- compare two things of the same type.
|
||
--
|
||
-- This effectively corresponds to doing an INSERT with values containing
|
||
-- specific timestamp.
|
||
--
|
||
-- 2. Processor: Fetches the undelivered cron events and the scheduled events
|
||
-- from the database and which have timestamp lesser than the
|
||
-- current timestamp and then process them.
|
||
--
|
||
-- TODO
|
||
-- - Consider and document ordering guarantees
|
||
-- - do we have any in the presence of multiple hasura instances?
|
||
-- - If we have nothing useful to say about ordering, then consider processing
|
||
-- events asynchronously, so that a slow webhook doesn't cause everything
|
||
-- subsequent to be delayed
|
||
module Hasura.Eventing.ScheduledTrigger
|
||
( runCronEventsGenerator,
|
||
processScheduledTriggers,
|
||
generateScheduleTimes,
|
||
CronEventSeed (..),
|
||
LockedEventsCtx (..),
|
||
|
||
-- * Cron trigger stats logger
|
||
createFetchedCronTriggerStatsLogger,
|
||
closeFetchedCronTriggersStatsLogger,
|
||
|
||
-- * Scheduled events stats logger
|
||
createFetchedScheduledEventsStatsLogger,
|
||
closeFetchedScheduledEventsStatsLogger,
|
||
|
||
-- * Database interactions
|
||
|
||
-- Following function names are similar to those present in
|
||
-- 'MonadMetadataStorage' type class. To avoid duplication,
|
||
-- 'Tx' is suffixed to identify as database transactions
|
||
getDeprivedCronTriggerStatsTx,
|
||
getScheduledEventsForDeliveryTx,
|
||
insertInvocationTx,
|
||
setScheduledEventOpTx,
|
||
unlockScheduledEventsTx,
|
||
unlockAllLockedScheduledEventsTx,
|
||
insertCronEventsTx,
|
||
insertOneOffScheduledEventTx,
|
||
dropFutureCronEventsTx,
|
||
getOneOffScheduledEventsTx,
|
||
getCronEventsTx,
|
||
deleteScheduledEventTx,
|
||
getScheduledEventInvocationsTx,
|
||
getScheduledEventsInvocationsQuery,
|
||
getScheduledEventsInvocationsQueryNoPagination,
|
||
|
||
-- * Export utility functions which are useful to build
|
||
|
||
-- SQLs for fetching data from metadata storage
|
||
mkScheduledEventStatusFilter,
|
||
scheduledTimeOrderBy,
|
||
executeWithOptionalTotalCount,
|
||
mkPaginationSelectExp,
|
||
withCount,
|
||
invocationFieldExtractors,
|
||
mkEventIdBoolExp,
|
||
EventTables (..),
|
||
)
|
||
where
|
||
|
||
import Control.Concurrent.Async.Lifted (forConcurrently_)
|
||
import Control.Concurrent.Extended (Forever (..), sleep)
|
||
import Control.Concurrent.STM
|
||
import Control.Lens (preview)
|
||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||
import Data.Aeson qualified as J
|
||
import Data.Environment qualified as Env
|
||
import Data.Has
|
||
import Data.HashMap.Strict qualified as HashMap
|
||
import Data.Int (Int64)
|
||
import Data.List.NonEmpty qualified as NE
|
||
import Data.SerializableBlob qualified as SB
|
||
import Data.Set qualified as Set
|
||
import Data.Text qualified as T
|
||
import Data.Text.Extended (ToTxt (..), (<<>))
|
||
import Data.These
|
||
import Data.Time.Clock
|
||
import Data.URL.Template (printTemplate)
|
||
import Database.PG.Query qualified as PG
|
||
import Hasura.Backends.Postgres.Execute.Types
|
||
import Hasura.Backends.Postgres.SQL.DML qualified as S
|
||
import Hasura.Backends.Postgres.SQL.Types
|
||
import Hasura.Base.Error
|
||
import Hasura.Eventing.Common
|
||
import Hasura.Eventing.HTTP
|
||
import Hasura.Eventing.ScheduledTrigger.Types
|
||
import Hasura.HTTP (getHTTPExceptionStatus)
|
||
import Hasura.Logging qualified as L
|
||
import Hasura.Metadata.Class
|
||
import Hasura.Prelude
|
||
import Hasura.RQL.DDL.EventTrigger (ResolveHeaderError, getHeaderInfosFromConfEither)
|
||
import Hasura.RQL.DDL.Webhook.Transform
|
||
import Hasura.RQL.Types.Common
|
||
import Hasura.RQL.Types.EventTrigger
|
||
import Hasura.RQL.Types.Eventing
|
||
import Hasura.RQL.Types.ScheduledTrigger
|
||
import Hasura.RQL.Types.SchemaCache
|
||
import Hasura.SQL.Types
|
||
import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..))
|
||
import Hasura.Server.Types (TriggersErrorLogLevelStatus (..))
|
||
import Hasura.Tracing qualified as Tracing
|
||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||
import Refined (unrefine)
|
||
import System.Metrics.Prometheus.Counter as Prometheus.Counter
|
||
import System.Timeout.Lifted (timeout)
|
||
import Text.Builder qualified as TB
|
||
|
||
-- | runCronEventsGenerator makes sure that all the cron triggers
|
||
-- have an adequate buffer of cron events.
|
||
runCronEventsGenerator ::
|
||
( MonadIO m,
|
||
MonadMetadataStorage m
|
||
) =>
|
||
L.Logger L.Hasura ->
|
||
FetchedCronTriggerStatsLogger ->
|
||
IO SchemaCache ->
|
||
m void
|
||
runCronEventsGenerator logger cronTriggerStatsLogger getSC = do
|
||
forever $ do
|
||
sc <- liftIO getSC
|
||
-- get cron triggers from cache
|
||
let cronTriggersCache = scCronTriggers sc
|
||
|
||
unless (HashMap.null cronTriggersCache) $ do
|
||
-- Poll the DB only when there's at-least one cron trigger present
|
||
-- in the schema cache
|
||
-- get cron trigger stats from db
|
||
-- When shutdown is initiated, we stop generating new cron events
|
||
eitherRes <- runExceptT $ do
|
||
deprivedCronTriggerStats <- liftEitherM $ getDeprivedCronTriggerStats $ HashMap.keys cronTriggersCache
|
||
-- Log fetched deprived cron trigger stats
|
||
logFetchedCronTriggersStats cronTriggerStatsLogger deprivedCronTriggerStats
|
||
-- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
|
||
cronTriggersForHydrationWithStats <-
|
||
catMaybes
|
||
<$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
|
||
insertCronEventsFor cronTriggersForHydrationWithStats
|
||
|
||
onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr
|
||
|
||
-- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001
|
||
liftIO $ sleep (minutes 1)
|
||
where
|
||
withCronTrigger cronTriggerCache cronTriggerStat = do
|
||
case HashMap.lookup (_ctsName cronTriggerStat) cronTriggerCache of
|
||
Nothing -> do
|
||
L.unLogger logger
|
||
$ ScheduledTriggerInternalErr
|
||
$ err500 Unexpected "could not find scheduled trigger in the schema cache"
|
||
pure Nothing
|
||
Just cronTrigger ->
|
||
pure
|
||
$ Just (cronTrigger, cronTriggerStat)
|
||
|
||
insertCronEventsFor ::
|
||
(MonadMetadataStorage m, MonadError QErr m) =>
|
||
[(CronTriggerInfo, CronTriggerStats)] ->
|
||
m ()
|
||
insertCronEventsFor cronTriggersWithStats = do
|
||
let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
|
||
generateCronEventsFrom (_ctsMaxScheduledTime stats) cti
|
||
case scheduledEvents of
|
||
[] -> pure ()
|
||
events -> liftEitherM $ insertCronEvents events
|
||
|
||
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
|
||
generateCronEventsFrom startTime CronTriggerInfo {..} =
|
||
map (CronEventSeed ctiName)
|
||
$
|
||
-- generate next 100 events; see getDeprivedCronTriggerStatsTx:
|
||
generateScheduleTimes startTime 100 ctiSchedule
|
||
|
||
-- | `upperBoundScheduledEventTimeout` is the maximum amount of time
|
||
-- a scheduled event can take to process. This function is intended
|
||
-- to use with a timeout.
|
||
upperBoundScheduledEventTimeout :: DiffTime
|
||
upperBoundScheduledEventTimeout = minutes 30
|
||
|
||
processCronEvents ::
|
||
( MonadIO m,
|
||
MonadMetadataStorage m,
|
||
Tracing.MonadTrace m,
|
||
MonadBaseControl IO m
|
||
) =>
|
||
L.Logger L.Hasura ->
|
||
HTTP.Manager ->
|
||
ScheduledTriggerMetrics ->
|
||
[CronEvent] ->
|
||
HashMap TriggerName CronTriggerInfo ->
|
||
TVar (Set.Set CronEventId) ->
|
||
TriggersErrorLogLevelStatus ->
|
||
m ()
|
||
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo lockedCronEvents triggersErrorLogLevelStatus = do
|
||
-- save the locked cron events that have been fetched from the
|
||
-- database, the events stored here will be unlocked in case a
|
||
-- graceful shutdown is initiated in midst of processing these events
|
||
saveLockedEvents (map _ceId cronEvents) lockedCronEvents
|
||
-- The `createdAt` of a cron event is the `created_at` of the cron trigger
|
||
forConcurrently_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
|
||
case HashMap.lookup name cronTriggersInfo of
|
||
Nothing ->
|
||
logInternalError
|
||
$ err500 Unexpected
|
||
$ "could not find cron trigger "
|
||
<> name
|
||
<<> " in the schema cache"
|
||
Just CronTriggerInfo {..} -> do
|
||
let payload =
|
||
ScheduledEventWebhookPayload
|
||
id'
|
||
(Just name)
|
||
st
|
||
(fromMaybe J.Null ctiPayload)
|
||
ctiComment
|
||
Nothing
|
||
ctiRequestTransform
|
||
ctiResponseTransform
|
||
retryCtx = RetryContext tries ctiRetryConf
|
||
eventProcessingTimeout = min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds $ ctiRetryConf)
|
||
processScheduledEventAction =
|
||
runExceptT
|
||
$ flip runReaderT (logger, httpMgr)
|
||
$ processScheduledEvent
|
||
scheduledTriggerMetrics
|
||
id'
|
||
ctiHeaders
|
||
retryCtx
|
||
payload
|
||
ctiWebhookInfo
|
||
Cron
|
||
triggersErrorLogLevelStatus
|
||
eventProcessedMaybe <-
|
||
timeout (fromInteger (diffTimeToMicroSeconds eventProcessingTimeout)) $ processScheduledEventAction
|
||
case eventProcessedMaybe of
|
||
Nothing -> do
|
||
let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing."
|
||
eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
|
||
logInternalError eventTimeoutError
|
||
runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics)
|
||
>>= (`onLeft` logInternalError)
|
||
Just finally -> onLeft finally logInternalError
|
||
removeEventFromLockedEvents id' lockedCronEvents
|
||
where
|
||
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
||
|
||
mkErrorObject :: Text -> J.Value
|
||
mkErrorObject errorMessage =
|
||
J.object $ ["error" J..= errorMessage]
|
||
|
||
processOneOffScheduledEvents ::
|
||
( MonadIO m,
|
||
Tracing.MonadTrace m,
|
||
MonadMetadataStorage m,
|
||
MonadBaseControl IO m
|
||
) =>
|
||
Env.Environment ->
|
||
L.Logger L.Hasura ->
|
||
HTTP.Manager ->
|
||
ScheduledTriggerMetrics ->
|
||
[OneOffScheduledEvent] ->
|
||
TVar (Set.Set OneOffScheduledEventId) ->
|
||
TriggersErrorLogLevelStatus ->
|
||
m ()
|
||
processOneOffScheduledEvents
|
||
env
|
||
logger
|
||
httpMgr
|
||
scheduledTriggerMetrics
|
||
oneOffEvents
|
||
lockedOneOffScheduledEvents
|
||
triggersErrorLogLevelStatus = do
|
||
-- save the locked one-off events that have been fetched from the
|
||
-- database, the events stored here will be unlocked in case a
|
||
-- graceful shutdown is initiated in midst of processing these events
|
||
saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
|
||
forConcurrently_ oneOffEvents $ \OneOffScheduledEvent {..} -> do
|
||
(either logInternalError pure) =<< runExceptT do
|
||
let payload =
|
||
ScheduledEventWebhookPayload
|
||
_ooseId
|
||
Nothing
|
||
_ooseScheduledTime
|
||
(fromMaybe J.Null _oosePayload)
|
||
_ooseComment
|
||
(Just _ooseCreatedAt)
|
||
_ooseRequestTransform
|
||
_ooseResponseTransform
|
||
retryCtx = RetryContext _ooseTries _ooseRetryConf
|
||
resolvedWebhookInfoEither = resolveWebhookEither env _ooseWebhookConf
|
||
resolvedHeaderInfoEither = getHeaderInfosFromConfEither env _ooseHeaderConf
|
||
-- `webhookAndHeaderInfo` returns webhook and header info (and errors)
|
||
webhookAndHeaderInfo = case (resolvedWebhookInfoEither, resolvedHeaderInfoEither) of
|
||
(Right resolvedEventWebhookInfo, Right resolvedEventHeaderInfo) -> do
|
||
let resolvedWebhookEnvRecord = EnvRecord (getTemplateFromUrl _ooseWebhookConf) resolvedEventWebhookInfo
|
||
Right (resolvedWebhookEnvRecord, resolvedEventHeaderInfo)
|
||
(Left eventWebhookErrorVars, Right _) -> Left $ This eventWebhookErrorVars
|
||
(Right _, Left eventHeaderErrorVars) -> Left $ That eventHeaderErrorVars
|
||
(Left eventWebhookErrors, Left eventHeaderErrorVars) -> Left $ These eventWebhookErrors eventHeaderErrorVars
|
||
case webhookAndHeaderInfo of
|
||
Right (webhookEnvRecord, eventHeaderInfo) -> do
|
||
let processScheduledEventAction =
|
||
flip runReaderT (logger, httpMgr)
|
||
$ processScheduledEvent scheduledTriggerMetrics _ooseId eventHeaderInfo retryCtx payload webhookEnvRecord OneOff triggersErrorLogLevelStatus
|
||
|
||
eventTimeout = unrefine $ strcTimeoutSeconds $ _ooseRetryConf
|
||
|
||
-- Try to process the event with a timeout of min(`uppserBoundScheduledEventTimeout`, event's response timeout),
|
||
-- so that we're never blocked forever while processing a single event.
|
||
--
|
||
-- If the request times out, then process it as an erroneous invocation and move on.
|
||
timeout (fromInteger (diffTimeToMicroSeconds (min upperBoundScheduledEventTimeout eventTimeout))) processScheduledEventAction
|
||
`onNothingM` ( do
|
||
let eventTimeoutMessage = "One-off Scheduled event " <> _ooseId <<> " timed out while processing."
|
||
eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
|
||
lift $ logInternalError eventTimeoutError
|
||
processError _ooseId retryCtx [] OneOff (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics
|
||
)
|
||
removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
|
||
Left envVarError ->
|
||
processError
|
||
_ooseId
|
||
retryCtx
|
||
[]
|
||
OneOff
|
||
(mkErrorObject $ "Error creating the request. " <> (mkInvalidEnvVarErrMsg $ envVarError))
|
||
(HOther $ T.unpack $ qeError (err400 NotFound (mkInvalidEnvVarErrMsg envVarError)))
|
||
scheduledTriggerMetrics
|
||
where
|
||
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
||
getTemplateFromUrl url = printTemplate $ unInputWebhook url
|
||
mkInvalidEnvVarErrMsg envVarErrorValues = "The value for environment variables not found: " <> (getInvalidEnvVarText envVarErrorValues)
|
||
mkErrorObject :: Text -> J.Value
|
||
mkErrorObject errorMessage =
|
||
J.object $ ["error" J..= errorMessage]
|
||
getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
|
||
getInvalidEnvVarText (This a) = toTxt a
|
||
getInvalidEnvVarText (That b) = toTxt b
|
||
getInvalidEnvVarText (These a b) = toTxt a <> ", " <> toTxt b
|
||
|
||
processScheduledTriggers ::
|
||
( MonadIO m,
|
||
Tracing.MonadTrace m,
|
||
MonadMetadataStorage m,
|
||
MonadBaseControl IO m
|
||
) =>
|
||
IO Env.Environment ->
|
||
L.Logger L.Hasura ->
|
||
FetchedScheduledEventsStatsLogger ->
|
||
HTTP.Manager ->
|
||
ScheduledTriggerMetrics ->
|
||
IO SchemaCache ->
|
||
LockedEventsCtx ->
|
||
TriggersErrorLogLevelStatus ->
|
||
m (Forever m)
|
||
processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} triggersErrorLogLevelStatus = do
|
||
return
|
||
$ Forever ()
|
||
$ const do
|
||
cronTriggersInfo <- scCronTriggers <$> liftIO getSC
|
||
env <- liftIO getEnvHook
|
||
getScheduledEventsForDelivery (HashMap.keys cronTriggersInfo) >>= \case
|
||
Left e -> logInternalError e
|
||
Right (cronEvents, oneOffEvents) -> do
|
||
logFetchedScheduledEventsStats statsLogger (CronEventsCount $ length cronEvents) (OneOffScheduledEventsCount $ length oneOffEvents)
|
||
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo leCronEvents triggersErrorLogLevelStatus
|
||
processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents triggersErrorLogLevelStatus
|
||
-- NOTE: cron events are scheduled at times with minute resolution (as on
|
||
-- unix), while one-off events can be set for arbitrary times. The sleep
|
||
-- time here determines how overdue a scheduled event (cron or one-off)
|
||
-- might be before we begin processing:
|
||
liftIO $ sleep (seconds 10)
|
||
where
|
||
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
||
|
||
processScheduledEvent ::
|
||
( MonadReader r m,
|
||
Has HTTP.Manager r,
|
||
Has (L.Logger L.Hasura) r,
|
||
MonadIO m,
|
||
Tracing.MonadTrace m,
|
||
MonadMetadataStorage m,
|
||
MonadError QErr m
|
||
) =>
|
||
ScheduledTriggerMetrics ->
|
||
ScheduledEventId ->
|
||
[EventHeaderInfo] ->
|
||
RetryContext ->
|
||
ScheduledEventWebhookPayload ->
|
||
EnvRecord ResolvedWebhook ->
|
||
ScheduledEventType ->
|
||
TriggersErrorLogLevelStatus ->
|
||
m ()
|
||
processScheduledEvent scheduledTriggerMetrics eventId eventHeaders retryCtx payload webhookUrl type' triggersErrorLogLevelStatus =
|
||
Tracing.newTrace Tracing.sampleAlways traceNote do
|
||
currentTime <- liftIO getCurrentTime
|
||
let retryConf = _rctxConf retryCtx
|
||
scheduledTime = sewpScheduledTime payload
|
||
if convertDuration (diffUTCTime currentTime scheduledTime)
|
||
> unrefine (strcToleranceSeconds retryConf)
|
||
then processDead eventId type'
|
||
else do
|
||
let timeoutSeconds = round $ unrefine (strcTimeoutSeconds retryConf)
|
||
httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
|
||
(headers, decodedHeaders) = prepareHeaders eventHeaders
|
||
extraLogCtx = ExtraLogContext eventId (sewpName payload)
|
||
webhookReqBodyJson = J.toJSON payload
|
||
webhookReqBody = J.encode webhookReqBodyJson
|
||
requestTransform = sewpRequestTransform payload
|
||
responseTransform = mkResponseTransform <$> sewpResponseTransform payload
|
||
|
||
eitherReqRes <-
|
||
runExceptT
|
||
$ mkRequest headers httpTimeout webhookReqBody requestTransform (_envVarValue webhookUrl)
|
||
>>= \reqDetails -> do
|
||
let request = extractRequest reqDetails
|
||
logger e d = do
|
||
logHTTPForST e extraLogCtx d (_envVarName webhookUrl) decodedHeaders triggersErrorLogLevelStatus
|
||
liftIO $ do
|
||
case e of
|
||
Left _err -> pure ()
|
||
Right response ->
|
||
Prometheus.Counter.add
|
||
(stmScheduledTriggerBytesReceived scheduledTriggerMetrics)
|
||
(hrsSize response)
|
||
let RequestDetails {_rdOriginalSize, _rdTransformedSize} = d
|
||
in Prometheus.Counter.add
|
||
(stmScheduledTriggerBytesSent scheduledTriggerMetrics)
|
||
(fromMaybe _rdOriginalSize _rdTransformedSize)
|
||
case (type', e) of
|
||
(Cron, Left _err) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalFailure scheduledTriggerMetrics)
|
||
(Cron, Right _) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalSuccess scheduledTriggerMetrics)
|
||
(OneOff, Left _err) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalFailure scheduledTriggerMetrics)
|
||
(OneOff, Right _) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalSuccess scheduledTriggerMetrics)
|
||
sessionVars = _rdSessionVars reqDetails
|
||
resp <- invokeRequest reqDetails responseTransform sessionVars logger
|
||
pure (request, resp)
|
||
case eitherReqRes of
|
||
Right (req, resp) ->
|
||
let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
|
||
in processSuccess eventId decodedHeaders type' reqBody resp scheduledTriggerMetrics
|
||
Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e scheduledTriggerMetrics
|
||
Left (TransformationError _ e) -> do
|
||
-- Log The Transformation Error
|
||
logger :: L.Logger L.Hasura <- asks getter
|
||
L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode e)
|
||
|
||
-- Set event state to Error
|
||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
|
||
where
|
||
traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)
|
||
|
||
processError ::
|
||
( MonadIO m,
|
||
MonadMetadataStorage m,
|
||
MonadError QErr m
|
||
) =>
|
||
ScheduledEventId ->
|
||
RetryContext ->
|
||
[HeaderConf] ->
|
||
ScheduledEventType ->
|
||
J.Value ->
|
||
HTTPErr a ->
|
||
ScheduledTriggerMetrics ->
|
||
m ()
|
||
processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do
|
||
let invocation = case err of
|
||
HClient httpException ->
|
||
let statusMaybe = getHTTPExceptionStatus httpException
|
||
in mkInvocation eventId statusMaybe decodedHeaders (SB.fromLBS $ httpExceptionErrorEncoding httpException) [] reqJson
|
||
HStatus errResp -> do
|
||
let respPayload = hrsBody errResp
|
||
respHeaders = hrsHeaders errResp
|
||
respStatus = hrsStatus errResp
|
||
mkInvocation eventId (Just respStatus) decodedHeaders respPayload respHeaders reqJson
|
||
HOther detail -> do
|
||
let errMsg = (SB.fromLBS $ J.encode detail)
|
||
mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson
|
||
liftEitherM $ insertScheduledEventInvocation invocation type'
|
||
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
|
||
|
||
retryOrMarkError ::
|
||
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
|
||
ScheduledEventId ->
|
||
RetryContext ->
|
||
HTTPErr a ->
|
||
ScheduledEventType ->
|
||
ScheduledTriggerMetrics ->
|
||
m ()
|
||
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric = do
|
||
let RetryContext tries retryConf = retryCtx
|
||
mRetryHeader = getRetryAfterHeaderFromHTTPErr err
|
||
mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
|
||
triesExhausted = tries >= strcNumRetries retryConf
|
||
noRetryHeader = isNothing mRetryHeaderSeconds
|
||
if triesExhausted && noRetryHeader
|
||
then do
|
||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
|
||
case type' of
|
||
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalFailure scheduledTriggerMetric)
|
||
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalFailure scheduledTriggerMetric)
|
||
else do
|
||
currentTime <- liftIO getCurrentTime
|
||
let delay =
|
||
fromMaybe
|
||
( round $ unrefine (strcRetryIntervalSeconds retryConf)
|
||
)
|
||
mRetryHeaderSeconds
|
||
diff = fromIntegral delay
|
||
retryTime = addUTCTime diff currentTime
|
||
liftEitherM $ setScheduledEventOp eventId (SEOpRetry retryTime) type'
|
||
|
||
{- Note [Scheduled event lifecycle]
|
||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||
Scheduled events move between six different states over the course of their
|
||
lifetime, as represented by the following flowchart:
|
||
|
||
┌───────────┐ ┌────────┐ ┌───────────┐
|
||
│ scheduled │─(1)─→│ locked │─(2)─→│ delivered │
|
||
└───────────┘ └────────┘ └───────────┘
|
||
↑ │ ┌───────┐
|
||
└────(3)───────┼─────(4)──→│ error │
|
||
│ └───────┘
|
||
│ ┌──────┐
|
||
└─────(5)──→│ dead │
|
||
└──────┘
|
||
|
||
When a scheduled event is first created, it starts in the 'scheduled' state,
|
||
and it can transition to other states in the following ways:
|
||
1. When graphql-engine fetches a scheduled event from the database to process
|
||
it, it sets its state to 'locked'. This prevents multiple graphql-engine
|
||
instances running on the same database from processing the same
|
||
scheduled event concurrently.
|
||
2. When a scheduled event is processed successfully, it is marked 'delivered'.
|
||
3. If a scheduled event fails to be processed, but it hasn’t yet reached
|
||
its maximum retry limit, its retry counter is incremented and
|
||
it is returned to the 'scheduled' state.
|
||
4. If a scheduled event fails to be processed and *has* reached its
|
||
retry limit, its state is set to 'error'.
|
||
5. If for whatever reason the difference between the current time and the
|
||
scheduled time is greater than the tolerance of the scheduled event, it
|
||
will not be processed and its state will be set to 'dead'.
|
||
-}
|
||
|
||
processSuccess ::
|
||
(MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
|
||
ScheduledEventId ->
|
||
[HeaderConf] ->
|
||
ScheduledEventType ->
|
||
J.Value ->
|
||
HTTPResp a ->
|
||
ScheduledTriggerMetrics ->
|
||
m ()
|
||
processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do
|
||
let respBody = hrsBody resp
|
||
respHeaders = hrsHeaders resp
|
||
respStatus = hrsStatus resp
|
||
invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson
|
||
liftEitherM $ insertScheduledEventInvocation invocation type'
|
||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
|
||
case type' of
|
||
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalSuccess scheduledTriggerMetric)
|
||
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalSuccess scheduledTriggerMetric)
|
||
|
||
processDead ::
|
||
(MonadMetadataStorage m, MonadError QErr m) =>
|
||
ScheduledEventId ->
|
||
ScheduledEventType ->
|
||
m ()
|
||
processDead eventId type' =
|
||
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDead) type'
|
||
|
||
mkInvocation ::
|
||
ScheduledEventId ->
|
||
Maybe Int ->
|
||
[HeaderConf] ->
|
||
SB.SerializableBlob ->
|
||
[HeaderConf] ->
|
||
J.Value ->
|
||
(Invocation 'ScheduledType)
|
||
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
|
||
Invocation
|
||
eventId
|
||
status
|
||
(mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
|
||
(mkInvocationResp status respBody respHeaders)
|
||
|
||
-- metadata database transactions
|
||
|
||
-- | Get cron trigger stats for cron jobs with fewer than 100 future reified
|
||
-- events in the database
|
||
--
|
||
-- The point here is to maintain a certain number of future events so the user
|
||
-- can kind of see what's coming up, and obviously to give 'processCronEvents'
|
||
-- something to do.
|
||
getDeprivedCronTriggerStatsTx :: [TriggerName] -> PG.TxE QErr [CronTriggerStats]
|
||
getDeprivedCronTriggerStatsTx cronTriggerNames =
|
||
map (\(n, count, maxTx) -> CronTriggerStats n count maxTx)
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
|
||
FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
|
||
LEFT JOIN
|
||
( SELECT
|
||
trigger_name,
|
||
count(1) as upcoming_events_count,
|
||
max(scheduled_time) as max_scheduled_time
|
||
FROM hdb_catalog.hdb_cron_events
|
||
WHERE tries = 0 and status = 'scheduled'
|
||
GROUP BY trigger_name
|
||
) AS q
|
||
ON t.trigger_name = q.trigger_name
|
||
WHERE coalesce(q.upcoming_events_count, 0) < 100
|
||
|]
|
||
(Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames)
|
||
True
|
||
|
||
-- TODO
|
||
-- - cron events have minute resolution, while one-off events have arbitrary
|
||
-- resolution, so it doesn't make sense to fetch them at the same rate
|
||
-- - if we decide to fetch cron events less frequently we should wake up that
|
||
-- thread at second 0 of every minute, and then pass hasura's now time into
|
||
-- the query (since the DB may disagree about the time)
|
||
getScheduledEventsForDeliveryTx :: [TriggerName] -> PG.TxE QErr ([CronEvent], [OneOffScheduledEvent])
|
||
getScheduledEventsForDeliveryTx cronTriggerNames =
|
||
(,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
|
||
where
|
||
getCronEventsForDelivery :: PG.TxE QErr [CronEvent]
|
||
getCronEventsForDelivery =
|
||
map (PG.getViaJSON . runIdentity)
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
WITH cte AS
|
||
( UPDATE hdb_catalog.hdb_cron_events
|
||
SET status = 'locked'
|
||
WHERE id IN ( SELECT t.id
|
||
FROM hdb_catalog.hdb_cron_events t
|
||
WHERE ( t.status = 'scheduled'
|
||
and (
|
||
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
|
||
(t.next_retry_at is not NULL and t.next_retry_at <= now())
|
||
)
|
||
) AND trigger_name = ANY($1)
|
||
FOR UPDATE SKIP LOCKED
|
||
)
|
||
RETURNING *
|
||
)
|
||
SELECT row_to_json(t.*) FROM cte AS t
|
||
|]
|
||
(Identity $ PGTextArray $ triggerNameToTxt <$> cronTriggerNames)
|
||
True
|
||
|
||
getOneOffEventsForDelivery :: PG.TxE QErr [OneOffScheduledEvent]
|
||
getOneOffEventsForDelivery = do
|
||
map (PG.getViaJSON . runIdentity)
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
WITH cte AS (
|
||
UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET status = 'locked'
|
||
WHERE id IN ( SELECT t.id
|
||
FROM hdb_catalog.hdb_scheduled_events t
|
||
WHERE ( t.status = 'scheduled'
|
||
and (
|
||
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
|
||
(t.next_retry_at is not NULL and t.next_retry_at <= now())
|
||
)
|
||
)
|
||
FOR UPDATE SKIP LOCKED
|
||
)
|
||
RETURNING *
|
||
)
|
||
SELECT row_to_json(t.*) FROM cte AS t
|
||
|]
|
||
()
|
||
False
|
||
|
||
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> PG.TxE QErr ()
|
||
insertInvocationTx invo type' = do
|
||
case type' of
|
||
Cron -> do
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
|
||
(event_id, status, request, response)
|
||
VALUES ($1, $2, $3, $4)
|
||
|]
|
||
( iEventId invo,
|
||
fromIntegral <$> iStatus invo :: Maybe Int64,
|
||
PG.ViaJSON $ J.toJSON $ iRequest invo,
|
||
PG.ViaJSON $ J.toJSON $ iResponse invo
|
||
)
|
||
True
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_cron_events
|
||
SET tries = tries + 1
|
||
WHERE id = $1
|
||
|]
|
||
(Identity $ iEventId invo)
|
||
True
|
||
OneOff -> do
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
|
||
(event_id, status, request, response)
|
||
VALUES ($1, $2, $3, $4)
|
||
|]
|
||
( iEventId invo,
|
||
fromIntegral <$> iStatus invo :: Maybe Int64,
|
||
PG.ViaJSON $ J.toJSON $ iRequest invo,
|
||
PG.ViaJSON $ J.toJSON $ iResponse invo
|
||
)
|
||
True
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET tries = tries + 1
|
||
WHERE id = $1
|
||
|]
|
||
(Identity $ iEventId invo)
|
||
True
|
||
|
||
setScheduledEventOpTx ::
|
||
ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> PG.TxE QErr ()
|
||
setScheduledEventOpTx eventId op type' = case op of
|
||
SEOpRetry time -> setRetry time
|
||
SEOpStatus status -> setStatus status
|
||
where
|
||
setRetry time =
|
||
case type' of
|
||
Cron ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_cron_events
|
||
SET next_retry_at = $1,
|
||
STATUS = 'scheduled'
|
||
WHERE id = $2
|
||
|]
|
||
(time, eventId)
|
||
True
|
||
OneOff ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET next_retry_at = $1,
|
||
STATUS = 'scheduled'
|
||
WHERE id = $2
|
||
|]
|
||
(time, eventId)
|
||
True
|
||
setStatus status =
|
||
case type' of
|
||
Cron -> do
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_cron_events
|
||
SET status = $2
|
||
WHERE id = $1
|
||
|]
|
||
(eventId, status)
|
||
True
|
||
OneOff -> do
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET status = $2
|
||
WHERE id = $1
|
||
|]
|
||
(eventId, status)
|
||
True
|
||
|
||
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> PG.TxE QErr Int
|
||
unlockScheduledEventsTx type' eventIds =
|
||
let eventIdsTextArray = map unEventId eventIds
|
||
in case type' of
|
||
Cron ->
|
||
(runIdentity . PG.getRow)
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
WITH "cte" AS
|
||
(UPDATE hdb_catalog.hdb_cron_events
|
||
SET status = 'scheduled'
|
||
WHERE id = ANY($1::text[]) and status = 'locked'
|
||
RETURNING *)
|
||
SELECT count(*) FROM "cte"
|
||
|]
|
||
(Identity $ PGTextArray eventIdsTextArray)
|
||
True
|
||
OneOff ->
|
||
(runIdentity . PG.getRow)
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
WITH "cte" AS
|
||
(UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET status = 'scheduled'
|
||
WHERE id = ANY($1::text[]) AND status = 'locked'
|
||
RETURNING *)
|
||
SELECT count(*) FROM "cte"
|
||
|]
|
||
(Identity $ PGTextArray eventIdsTextArray)
|
||
True
|
||
|
||
unlockAllLockedScheduledEventsTx :: PG.TxE QErr ()
|
||
unlockAllLockedScheduledEventsTx = do
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_cron_events
|
||
SET status = 'scheduled'
|
||
WHERE status = 'locked'
|
||
|]
|
||
()
|
||
True
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
UPDATE hdb_catalog.hdb_scheduled_events
|
||
SET status = 'scheduled'
|
||
WHERE status = 'locked'
|
||
|]
|
||
()
|
||
True
|
||
|
||
insertCronEventsTx :: [CronEventSeed] -> PG.TxE QErr ()
|
||
insertCronEventsTx cronSeeds = do
|
||
let insertCronEventsSql =
|
||
TB.run
|
||
$ toSQL
|
||
S.SQLInsert
|
||
{ siTable = cronEventsTable,
|
||
siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
|
||
siValues = S.ValuesExp $ map (toTupleExp . toArr) cronSeeds,
|
||
siConflict = Just $ S.DoNothing Nothing,
|
||
siRet = Nothing
|
||
}
|
||
PG.unitQE defaultTxErrorHandler (PG.fromText insertCronEventsSql) () False
|
||
where
|
||
toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
|
||
toTupleExp = S.TupleExp . map S.SELit
|
||
|
||
insertOneOffScheduledEventTx :: OneOffEvent -> PG.TxE QErr EventId
|
||
insertOneOffScheduledEventTx CreateScheduledEvent {..} =
|
||
runIdentity
|
||
. PG.getRow
|
||
<$> PG.withQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
INSERT INTO hdb_catalog.hdb_scheduled_events
|
||
(webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
|
||
VALUES
|
||
($1, $2, $3, $4, $5, $6) RETURNING id
|
||
|]
|
||
( PG.ViaJSON cseWebhook,
|
||
cseScheduleAt,
|
||
PG.ViaJSON csePayload,
|
||
PG.ViaJSON cseRetryConf,
|
||
PG.ViaJSON cseHeaders,
|
||
cseComment
|
||
)
|
||
False
|
||
|
||
dropFutureCronEventsTx :: ClearCronEvents -> PG.TxE QErr ()
|
||
dropFutureCronEventsTx = \case
|
||
SingleCronTrigger triggerName ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
DELETE FROM hdb_catalog.hdb_cron_events
|
||
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|
||
|]
|
||
(Identity triggerName)
|
||
True
|
||
MetadataCronTriggers triggerNames ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
DELETE FROM hdb_catalog.hdb_cron_events
|
||
WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
|
||
|]
|
||
(Identity $ PGTextArray $ map triggerNameToTxt triggerNames)
|
||
False
|
||
|
||
cronEventsTable :: QualifiedTable
|
||
cronEventsTable =
|
||
QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"
|
||
|
||
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
|
||
mkScheduledEventStatusFilter = \case
|
||
[] -> S.BELit True
|
||
v ->
|
||
S.BEIN (S.SEIdentifier $ Identifier "status")
|
||
$ map (S.SELit . scheduledEventStatusToText) v
|
||
|
||
scheduledTimeOrderBy :: S.OrderByExp
|
||
scheduledTimeOrderBy =
|
||
let scheduledTimeCol = S.SEIdentifier $ Identifier "scheduled_time"
|
||
in S.OrderByExp
|
||
$ flip (NE.:|) []
|
||
$ S.OrderByItem
|
||
scheduledTimeCol
|
||
(Just S.OTAsc)
|
||
Nothing
|
||
|
||
-- | Build a select expression which outputs total count and
|
||
-- list of json rows with pagination limit and offset applied
|
||
mkPaginationSelectExp ::
|
||
S.Select ->
|
||
ScheduledEventPagination ->
|
||
RowsCountOption ->
|
||
S.Select
|
||
mkPaginationSelectExp allRowsSelect ScheduledEventPagination {..} shouldIncludeRowsCount =
|
||
S.mkSelect
|
||
{ S.selCTEs = [(countCteAlias, S.ICTESelect allRowsSelect), (limitCteAlias, limitCteSelect)],
|
||
S.selExtr =
|
||
case shouldIncludeRowsCount of
|
||
IncludeRowsCount -> [countExtractor, rowsExtractor]
|
||
DontIncludeRowsCount -> [rowsExtractor]
|
||
}
|
||
where
|
||
countCteAlias = S.mkTableAlias "count_cte"
|
||
limitCteAlias = S.mkTableAlias "limit_cte"
|
||
|
||
countExtractor =
|
||
let selectExp =
|
||
S.mkSelect
|
||
{ S.selExtr = [S.Extractor S.countStar Nothing],
|
||
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias)
|
||
}
|
||
in S.Extractor (S.SESelect selectExp) Nothing
|
||
|
||
limitCteSelect =
|
||
S.ICTESelect
|
||
S.mkSelect
|
||
{ S.selExtr = [S.selectStar],
|
||
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias),
|
||
S.selLimit = (S.LimitExp . S.intToSQLExp) <$> _sepLimit,
|
||
S.selOffset = (S.OffsetExp . S.intToSQLExp) <$> _sepOffset
|
||
}
|
||
|
||
rowsExtractor =
|
||
let jsonAgg = S.SEUnsafe "json_agg(row_to_json(limit_cte.*))"
|
||
selectExp =
|
||
S.mkSelect
|
||
{ S.selExtr = [S.Extractor jsonAgg Nothing],
|
||
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier limitCteAlias)
|
||
}
|
||
in S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect selectExp)) Nothing
|
||
|
||
withCount :: (Int, PG.ViaJSON a) -> WithOptionalTotalCount a
|
||
withCount (count, PG.ViaJSON a) = WithOptionalTotalCount (Just count) a
|
||
|
||
withoutCount :: PG.ViaJSON a -> WithOptionalTotalCount a
|
||
withoutCount (PG.ViaJSON a) = WithOptionalTotalCount Nothing a
|
||
|
||
executeWithOptionalTotalCount :: (J.FromJSON a) => PG.Query -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount a)
|
||
executeWithOptionalTotalCount sql getRowsCount =
|
||
case getRowsCount of
|
||
IncludeRowsCount -> (withCount . PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False
|
||
DontIncludeRowsCount -> (withoutCount . runIdentity . PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False
|
||
|
||
getOneOffScheduledEventsTx ::
|
||
ScheduledEventPagination ->
|
||
[ScheduledEventStatus] ->
|
||
RowsCountOption ->
|
||
PG.TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent])
|
||
getOneOffScheduledEventsTx pagination statuses getRowsCount = do
|
||
let table = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events"
|
||
statusFilter = mkScheduledEventStatusFilter statuses
|
||
select =
|
||
S.mkSelect
|
||
{ S.selExtr = [S.selectStar],
|
||
S.selFrom = Just $ S.mkSimpleFromExp table,
|
||
S.selWhere = Just $ S.WhereFrag statusFilter,
|
||
S.selOrderBy = Just scheduledTimeOrderBy
|
||
}
|
||
sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount
|
||
executeWithOptionalTotalCount sql getRowsCount
|
||
|
||
getCronEventsTx ::
|
||
TriggerName ->
|
||
ScheduledEventPagination ->
|
||
[ScheduledEventStatus] ->
|
||
RowsCountOption ->
|
||
PG.TxE QErr (WithOptionalTotalCount [CronEvent])
|
||
getCronEventsTx triggerName pagination status getRowsCount = do
|
||
let triggerNameFilter =
|
||
S.BECompare S.SEQ (S.SEIdentifier $ Identifier "trigger_name") (S.SELit $ triggerNameToTxt triggerName)
|
||
statusFilter = mkScheduledEventStatusFilter status
|
||
select =
|
||
S.mkSelect
|
||
{ S.selExtr = [S.selectStar],
|
||
S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable,
|
||
S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp triggerNameFilter statusFilter,
|
||
S.selOrderBy = Just scheduledTimeOrderBy
|
||
}
|
||
sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount
|
||
executeWithOptionalTotalCount sql getRowsCount
|
||
|
||
deleteScheduledEventTx ::
|
||
ScheduledEventId -> ScheduledEventType -> PG.TxE QErr ()
|
||
deleteScheduledEventTx eventId = \case
|
||
OneOff ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
DELETE FROM hdb_catalog.hdb_scheduled_events
|
||
WHERE id = $1
|
||
|]
|
||
(Identity eventId)
|
||
False
|
||
Cron ->
|
||
PG.unitQE
|
||
defaultTxErrorHandler
|
||
[PG.sql|
|
||
DELETE FROM hdb_catalog.hdb_cron_events
|
||
WHERE id = $1
|
||
|]
|
||
(Identity eventId)
|
||
False
|
||
|
||
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
|
||
invocationFieldExtractors table =
|
||
[ S.Extractor (seIden "id") Nothing,
|
||
S.Extractor (seIden "event_id") Nothing,
|
||
S.Extractor (seIden "status") Nothing,
|
||
S.Extractor (withJsonTypeAnn $ seIden "request") Nothing,
|
||
S.Extractor (withJsonTypeAnn $ seIden "response") Nothing,
|
||
S.Extractor (seIden "created_at") Nothing
|
||
]
|
||
where
|
||
withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json"
|
||
seIden = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier
|
||
|
||
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
|
||
mkEventIdBoolExp table eventId =
|
||
S.BECompare
|
||
S.SEQ
|
||
(S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id")
|
||
(S.SELit $ unEventId eventId)
|
||
|
||
getScheduledEventInvocationsTx ::
|
||
GetScheduledEventInvocations ->
|
||
PG.TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation])
|
||
getScheduledEventInvocationsTx getEventInvocations = do
|
||
let eventsTables = EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable
|
||
sql = PG.fromBuilder $ toSQL $ getScheduledEventsInvocationsQuery eventsTables getEventInvocations
|
||
executeWithOptionalTotalCount sql (_geiGetRowsCount getEventInvocations)
|
||
where
|
||
oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
|
||
cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"
|
||
|
||
data EventTables = EventTables
|
||
{ etOneOffInvocationsTable :: QualifiedTable,
|
||
etCronInvocationsTable :: QualifiedTable,
|
||
etCronEventsTable :: QualifiedTable
|
||
}
|
||
|
||
getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> S.Select
|
||
getScheduledEventsInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy =
|
||
allRowsSelect
|
||
where
|
||
createdAtOrderBy table =
|
||
let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at"
|
||
in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem createdAtCol (Just S.OTDesc) Nothing
|
||
|
||
allRowsSelect = case invocationsBy of
|
||
GIBEventId eventId eventType ->
|
||
let table = case eventType of
|
||
OneOff -> oneOffInvocationsTable
|
||
Cron -> cronInvocationsTable
|
||
in S.mkSelect
|
||
{ S.selExtr = invocationFieldExtractors table,
|
||
S.selFrom = Just $ S.mkSimpleFromExp table,
|
||
S.selOrderBy = Just $ createdAtOrderBy table,
|
||
S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
|
||
}
|
||
GIBEvent event -> case event of
|
||
SEOneOff ->
|
||
let table = oneOffInvocationsTable
|
||
in S.mkSelect
|
||
{ S.selExtr = invocationFieldExtractors table,
|
||
S.selFrom = Just $ S.mkSimpleFromExp table,
|
||
S.selOrderBy = Just $ createdAtOrderBy table
|
||
}
|
||
SECron triggerName ->
|
||
let invocationTable = cronInvocationsTable
|
||
eventTable = cronEventsTable'
|
||
joinCondition =
|
||
S.JoinOn
|
||
$ S.BECompare
|
||
S.SEQ
|
||
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id")
|
||
(S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id")
|
||
joinTables =
|
||
S.JoinExpr
|
||
(S.FISimple invocationTable Nothing)
|
||
S.Inner
|
||
(S.FISimple eventTable Nothing)
|
||
joinCondition
|
||
triggerBoolExp =
|
||
S.BECompare
|
||
S.SEQ
|
||
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name"))
|
||
(S.SELit $ triggerNameToTxt triggerName)
|
||
in S.mkSelect
|
||
{ S.selExtr = invocationFieldExtractors invocationTable,
|
||
S.selFrom = Just $ S.FromExp [S.FIJoin joinTables],
|
||
S.selWhere = Just $ S.WhereFrag triggerBoolExp,
|
||
S.selOrderBy = Just $ createdAtOrderBy invocationTable
|
||
}
|
||
|
||
getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> S.Select
|
||
getScheduledEventsInvocationsQuery eventTables (GetScheduledEventInvocations invocationsBy pagination shouldIncludeRowsCount) =
|
||
let invocationsSelect = getScheduledEventsInvocationsQueryNoPagination eventTables invocationsBy
|
||
in mkPaginationSelectExp invocationsSelect pagination shouldIncludeRowsCount
|
||
|
||
-- | Logger to accumulate stats of fetched scheduled events over a period of time and log once using @'L.Logger L.Hasura'.
|
||
-- See @'createStatsLogger' for more details.
|
||
createFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedScheduledEventsStatsLogger
|
||
createFetchedScheduledEventsStatsLogger = L.createStatsLogger
|
||
|
||
-- | Close the fetched scheduled events stats logger.
|
||
closeFetchedScheduledEventsStatsLogger ::
|
||
(MonadIO m) => L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> m ()
|
||
closeFetchedScheduledEventsStatsLogger = L.closeStatsLogger L.scheduledTriggerProcessLogType
|
||
|
||
-- | Log statistics of fetched scheduled events. See @'logStats' for more details.
|
||
logFetchedScheduledEventsStats ::
|
||
(MonadIO m) =>
|
||
FetchedScheduledEventsStatsLogger ->
|
||
CronEventsCount ->
|
||
OneOffScheduledEventsCount ->
|
||
m ()
|
||
logFetchedScheduledEventsStats logger cron oneOff =
|
||
L.logStats logger (FetchedScheduledEventsStats cron oneOff 1)
|
||
|
||
-- | Logger to accumulate stats of fetched cron triggers, for generating cron events, over a period of time and
|
||
-- log once using @'L.Logger L.Hasura'.
|
||
-- See @'createStatsLogger' for more details.
|
||
createFetchedCronTriggerStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedCronTriggerStatsLogger
|
||
createFetchedCronTriggerStatsLogger = L.createStatsLogger
|
||
|
||
-- | Close the fetched cron trigger stats logger.
|
||
closeFetchedCronTriggersStatsLogger ::
|
||
(MonadIO m) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> m ()
|
||
closeFetchedCronTriggersStatsLogger = L.closeStatsLogger L.cronEventGeneratorProcessType
|
||
|
||
-- | Log statistics of fetched cron triggers. See @'logStats' for more details.
|
||
logFetchedCronTriggersStats ::
|
||
(MonadIO m) =>
|
||
FetchedCronTriggerStatsLogger ->
|
||
[CronTriggerStats] ->
|
||
m ()
|
||
logFetchedCronTriggersStats logger cronTriggerStats =
|
||
L.logStats logger (FetchedCronTriggerStats cronTriggerStats 1)
|