{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}

-- |
-- = Scheduled Triggers
--
-- This module implements the functionality of invoking webhooks at specified
-- times, aka scheduled events. Scheduled events are generated by the
-- graphql-engine from cron triggers, and can also be created by the user for a
-- specified time together with a payload, webhook, headers and retry
-- configuration. Scheduled events are modeled as rows in Postgres with a
-- @timestamp@ column.
--
-- This module implements both the scheduling and the delivery of scheduled
-- events:
--
-- 1. Scheduling a cron event involves creating new cron events. New cron
--    events are created based on the cron schedule and the number of scheduled
--    events already present in the scheduled events buffer. The graphql-engine
--    computes the new scheduled events and writes them to the database.
--    (Generator)
--
-- 2. Delivering a scheduled event involves reading undelivered scheduled
--    events from the database and delivering them to the webhook server.
--    (Processor)
--
-- The rationale behind separating event scheduling and event delivery into two
-- different threads is that they are not directly dependent on each other. The
-- generator almost always creates scheduled events that are to be delivered in
-- the future (timestamp > current_timestamp), while the processor fetches
-- scheduled events of the past (timestamp < current_timestamp). So the set of
-- scheduled events touched by the generator and by the processor will never be
-- the same; they are not correlated with each other, and can be split into
-- different threads for better performance.
--
-- == Implementation
--
-- Scheduled trigger eventing is implemented on top of the metadata storage.
-- All functions that interact with the storage system are abstracted behind
-- the 'MonadMetadataStorage' class.
--
-- During startup, two threads are started:
--
-- 1. Generator: Fetches the list of scheduled triggers from the cache and
--    generates the scheduled events.
--
--     - Additional events will be generated only if there are fewer than 100
--       scheduled events.
--
--     - The upcoming events' timestamps will be generated using:
--
--         - the cron schedule of the scheduled trigger
--
--         - the max timestamp of the scheduled events that already exist, or
--           current_timestamp (when no scheduled events exist)
--
--     - The timestamp of the scheduled events is stored with timezone because
--       `SELECT NOW()` returns a timestamp with timezone, so it's good to
--       compare two things of the same type.
--
--     This effectively corresponds to doing an INSERT with values containing
--     specific timestamps.
--
-- 2. Processor: Fetches the undelivered cron events and one-off scheduled
--    events from the database whose timestamp is less than the current
--    timestamp, and processes them.
--
-- TODO
-- - Consider and document ordering guarantees
--   - do we have any in the presence of multiple hasura instances?
-- - If we have nothing useful to say about ordering, then consider processing -- events asynchronously, so that a slow webhook doesn't cause everything -- subsequent to be delayed module Hasura.Eventing.ScheduledTrigger ( runCronEventsGenerator, processScheduledTriggers, generateScheduleTimes, CronEventSeed (..), LockedEventsCtx (..), -- * Cron trigger stats logger createFetchedCronTriggerStatsLogger, closeFetchedCronTriggersStatsLogger, -- * Scheduled events stats logger createFetchedScheduledEventsStatsLogger, closeFetchedScheduledEventsStatsLogger, -- * Database interactions -- Following function names are similar to those present in -- 'MonadMetadataStorage' type class. To avoid duplication, -- 'Tx' is suffixed to identify as database transactions getDeprivedCronTriggerStatsTx, getScheduledEventsForDeliveryTx, insertInvocationTx, setScheduledEventOpTx, unlockScheduledEventsTx, unlockAllLockedScheduledEventsTx, insertCronEventsTx, insertOneOffScheduledEventTx, dropFutureCronEventsTx, getOneOffScheduledEventsTx, getCronEventsTx, deleteScheduledEventTx, getScheduledEventInvocationsTx, getScheduledEventsInvocationsQuery, getScheduledEventsInvocationsQueryNoPagination, -- * Export utility functions which are useful to build -- SQLs for fetching data from metadata storage mkScheduledEventStatusFilter, scheduledTimeOrderBy, executeWithOptionalTotalCount, mkPaginationSelectExp, withCount, invocationFieldExtractors, mkEventIdBoolExp, EventTables (..), ) where import Control.Concurrent.Async.Lifted (forConcurrently_) import Control.Concurrent.Extended (Forever (..), sleep) import Control.Concurrent.STM import Control.Lens (preview) import Control.Monad.Trans.Control (MonadBaseControl) import Data.Aeson qualified as J import Data.Environment qualified as Env import Data.Has import Data.HashMap.Strict qualified as Map import Data.Int (Int64) import Data.List.NonEmpty qualified as NE import Data.SerializableBlob qualified as SB import Data.Set qualified as Set import Data.Text qualified as T import Data.Text.Extended (ToTxt (..), (<<>)) import Data.These import Data.Time.Clock import Data.URL.Template (printURLTemplate) import Database.PG.Query qualified as PG import Hasura.Backends.Postgres.Execute.Types import Hasura.Backends.Postgres.SQL.DML qualified as S import Hasura.Backends.Postgres.SQL.Types import Hasura.Base.Error import Hasura.Eventing.Common import Hasura.Eventing.HTTP import Hasura.Eventing.ScheduledTrigger.Types import Hasura.HTTP (getHTTPExceptionStatus) import Hasura.Logging qualified as L import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.DDL.EventTrigger (ResolveHeaderError, getHeaderInfosFromConfEither) import Hasura.RQL.DDL.Headers import Hasura.RQL.DDL.Webhook.Transform import Hasura.RQL.Types.Common import Hasura.RQL.Types.EventTrigger import Hasura.RQL.Types.Eventing import Hasura.RQL.Types.ScheduledTrigger import Hasura.RQL.Types.SchemaCache import Hasura.SQL.Types import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..)) import Hasura.Tracing qualified as Tracing import Network.HTTP.Client.Transformable qualified as HTTP import Refined (unrefine) import System.Metrics.Prometheus.Counter as Prometheus.Counter import System.Timeout.Lifted (timeout) import Text.Builder qualified as TB -- | runCronEventsGenerator makes sure that all the cron triggers -- have an adequate buffer of cron events. 
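--
-- For a rough illustration of what an "adequate buffer" means: whenever a cron
-- trigger has fewer than 100 undelivered events (see
-- 'getDeprivedCronTriggerStatsTx'), the generator computes the next 100
-- occurrences of the trigger's cron schedule, starting from the latest
-- already-scheduled time (or from the current time if no events exist), and
-- inserts them. A hedged GHCi sketch, assuming a parsed @* * * * *@ schedule
-- bound to the hypothetical name @everyMinute@ and
-- @startTime = 2024-01-01 00:00:00 UTC@:
--
-- > ghci> take 3 $ generateScheduleTimes startTime 100 everyMinute
-- > [2024-01-01 00:01:00 UTC,2024-01-01 00:02:00 UTC,2024-01-01 00:03:00 UTC]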
runCronEventsGenerator :: ( MonadIO m, MonadMetadataStorage m ) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> IO SchemaCache -> m void runCronEventsGenerator logger cronTriggerStatsLogger getSC = do forever $ do sc <- liftIO getSC -- get cron triggers from cache let cronTriggersCache = scCronTriggers sc unless (Map.null cronTriggersCache) $ do -- Poll the DB only when there's at-least one cron trigger present -- in the schema cache -- get cron trigger stats from db -- When shutdown is initiated, we stop generating new cron events eitherRes <- runExceptT $ do deprivedCronTriggerStats <- liftEitherM $ getDeprivedCronTriggerStats $ Map.keys cronTriggersCache -- Log fetched deprived cron trigger stats logFetchedCronTriggersStats cronTriggerStatsLogger deprivedCronTriggerStats -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@ cronTriggersForHydrationWithStats <- catMaybes <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats insertCronEventsFor cronTriggersForHydrationWithStats onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr -- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001 liftIO $ sleep (minutes 1) where withCronTrigger cronTriggerCache cronTriggerStat = do case Map.lookup (_ctsName cronTriggerStat) cronTriggerCache of Nothing -> do L.unLogger logger $ ScheduledTriggerInternalErr $ err500 Unexpected "could not find scheduled trigger in the schema cache" pure Nothing Just cronTrigger -> pure $ Just (cronTrigger, cronTriggerStat) insertCronEventsFor :: (MonadMetadataStorage m, MonadError QErr m) => [(CronTriggerInfo, CronTriggerStats)] -> m () insertCronEventsFor cronTriggersWithStats = do let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) -> generateCronEventsFrom (_ctsMaxScheduledTime stats) cti case scheduledEvents of [] -> pure () events -> liftEitherM $ insertCronEvents events generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed] generateCronEventsFrom startTime CronTriggerInfo {..} = map (CronEventSeed ctiName) $ -- generate next 100 events; see getDeprivedCronTriggerStatsTx: generateScheduleTimes startTime 100 ctiSchedule -- | `upperBoundScheduledEventTimeout` is the maximum amount of time -- a scheduled event can take to process. This function is intended -- to use with a timeout. 
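--
-- A sketch of how the effective per-event timeout is derived in
-- 'processCronEvents' below (the one-off path applies the same @min@):
--
-- > -- never wait longer than 30 minutes, even when the trigger's own timeout
-- > -- from its retry configuration is larger
-- > eventProcessingTimeout =
-- >   min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds ctiRetryConf)
-- > eventProcessedMaybe <-
-- >   timeout (fromInteger $ diffTimeToMicroSeconds eventProcessingTimeout) processScheduledEventAction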
upperBoundScheduledEventTimeout :: DiffTime upperBoundScheduledEventTimeout = minutes 30 processCronEvents :: ( MonadIO m, MonadMetadataStorage m, Tracing.MonadTrace m, MonadBaseControl IO m ) => L.Logger L.Hasura -> HTTP.Manager -> ScheduledTriggerMetrics -> [CronEvent] -> HashMap TriggerName CronTriggerInfo -> TVar (Set.Set CronEventId) -> m () processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo lockedCronEvents = do -- save the locked cron events that have been fetched from the -- database, the events stored here will be unlocked in case a -- graceful shutdown is initiated in midst of processing these events saveLockedEvents (map _ceId cronEvents) lockedCronEvents -- The `createdAt` of a cron event is the `created_at` of the cron trigger forConcurrently_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do case Map.lookup name cronTriggersInfo of Nothing -> logInternalError $ err500 Unexpected $ "could not find cron trigger " <> name <<> " in the schema cache" Just CronTriggerInfo {..} -> do let payload = ScheduledEventWebhookPayload id' (Just name) st (fromMaybe J.Null ctiPayload) ctiComment Nothing ctiRequestTransform ctiResponseTransform retryCtx = RetryContext tries ctiRetryConf eventProcessingTimeout = min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds $ ctiRetryConf) processScheduledEventAction = runExceptT $ flip runReaderT (logger, httpMgr) $ processScheduledEvent scheduledTriggerMetrics id' ctiHeaders retryCtx payload ctiWebhookInfo Cron eventProcessedMaybe <- timeout (fromInteger (diffTimeToMicroSeconds eventProcessingTimeout)) $ processScheduledEventAction case eventProcessedMaybe of Nothing -> do let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing." eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage logInternalError eventTimeoutError runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics) >>= (`onLeft` logInternalError) Just finally -> onLeft finally logInternalError removeEventFromLockedEvents id' lockedCronEvents where logInternalError err = liftIO . 
L.unLogger logger $ ScheduledTriggerInternalErr err mkErrorObject :: Text -> J.Value mkErrorObject errorMessage = J.object $ ["error" J..= errorMessage] processOneOffScheduledEvents :: ( MonadIO m, Tracing.MonadTrace m, MonadMetadataStorage m, MonadBaseControl IO m ) => Env.Environment -> L.Logger L.Hasura -> HTTP.Manager -> ScheduledTriggerMetrics -> [OneOffScheduledEvent] -> TVar (Set.Set OneOffScheduledEventId) -> m () processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents lockedOneOffScheduledEvents = do -- save the locked one-off events that have been fetched from the -- database, the events stored here will be unlocked in case a -- graceful shutdown is initiated in midst of processing these events saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents forConcurrently_ oneOffEvents $ \OneOffScheduledEvent {..} -> do (either logInternalError pure) =<< runExceptT do let payload = ScheduledEventWebhookPayload _ooseId Nothing _ooseScheduledTime (fromMaybe J.Null _oosePayload) _ooseComment (Just _ooseCreatedAt) _ooseRequestTransform _ooseResponseTransform retryCtx = RetryContext _ooseTries _ooseRetryConf resolvedWebhookInfoEither = resolveWebhookEither env _ooseWebhookConf resolvedHeaderInfoEither = getHeaderInfosFromConfEither env _ooseHeaderConf -- `webhookAndHeaderInfo` returns webhook and header info (and errors) webhookAndHeaderInfo = case (resolvedWebhookInfoEither, resolvedHeaderInfoEither) of (Right resolvedEventWebhookInfo, Right resolvedEventHeaderInfo) -> do let resolvedWebhookEnvRecord = EnvRecord (getTemplateFromUrl _ooseWebhookConf) resolvedEventWebhookInfo Right (resolvedWebhookEnvRecord, resolvedEventHeaderInfo) (Left eventWebhookErrorVars, Right _) -> Left $ This eventWebhookErrorVars (Right _, Left eventHeaderErrorVars) -> Left $ That eventHeaderErrorVars (Left eventWebhookErrors, Left eventHeaderErrorVars) -> Left $ These eventWebhookErrors eventHeaderErrorVars case webhookAndHeaderInfo of Right (webhookEnvRecord, eventHeaderInfo) -> do let processScheduledEventAction = flip runReaderT (logger, httpMgr) $ processScheduledEvent scheduledTriggerMetrics _ooseId eventHeaderInfo retryCtx payload webhookEnvRecord OneOff eventTimeout = unrefine $ strcTimeoutSeconds $ _ooseRetryConf -- Try to process the event with a timeout of min(`uppserBoundScheduledEventTimeout`, event's response timeout), -- so that we're never blocked forever while processing a single event. -- -- If the request times out, then process it as an erroneous invocation and move on. timeout (fromInteger (diffTimeToMicroSeconds (min upperBoundScheduledEventTimeout eventTimeout))) processScheduledEventAction `onNothingM` ( do let eventTimeoutMessage = "One-off Scheduled event " <> _ooseId <<> " timed out while processing." eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage lift $ logInternalError eventTimeoutError processError _ooseId retryCtx [] OneOff (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics ) removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents Left envVarError -> processError _ooseId retryCtx [] OneOff (mkErrorObject $ "Error creating the request. " <> (mkInvalidEnvVarErrMsg $ envVarError)) (HOther $ T.unpack $ qeError (err400 NotFound (mkInvalidEnvVarErrMsg envVarError))) scheduledTriggerMetrics where logInternalError err = liftIO . 
L.unLogger logger $ ScheduledTriggerInternalErr err getTemplateFromUrl url = printURLTemplate $ unInputWebhook url mkInvalidEnvVarErrMsg envVarErrorValues = "The value for environment variables not found: " <> (getInvalidEnvVarText envVarErrorValues) mkErrorObject :: Text -> J.Value mkErrorObject errorMessage = J.object $ ["error" J..= errorMessage] getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text getInvalidEnvVarText (This a) = toTxt a getInvalidEnvVarText (That b) = toTxt b getInvalidEnvVarText (These a b) = toTxt a <> ", " <> toTxt b processScheduledTriggers :: ( MonadIO m, Tracing.MonadTrace m, MonadMetadataStorage m, MonadBaseControl IO m ) => IO Env.Environment -> L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> HTTP.Manager -> ScheduledTriggerMetrics -> IO SchemaCache -> LockedEventsCtx -> m (Forever m) processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} = do return $ Forever () $ const do cronTriggersInfo <- scCronTriggers <$> liftIO getSC env <- liftIO getEnvHook getScheduledEventsForDelivery (Map.keys cronTriggersInfo) >>= \case Left e -> logInternalError e Right (cronEvents, oneOffEvents) -> do logFetchedScheduledEventsStats statsLogger (CronEventsCount $ length cronEvents) (OneOffScheduledEventsCount $ length oneOffEvents) processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo leCronEvents processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents -- NOTE: cron events are scheduled at times with minute resolution (as on -- unix), while one-off events can be set for arbitrary times. The sleep -- time here determines how overdue a scheduled event (cron or one-off) -- might be before we begin processing: liftIO $ sleep (seconds 10) where logInternalError err = liftIO . 
L.unLogger logger $ ScheduledTriggerInternalErr err processScheduledEvent :: ( MonadReader r m, Has HTTP.Manager r, Has (L.Logger L.Hasura) r, MonadIO m, Tracing.MonadTrace m, MonadMetadataStorage m, MonadError QErr m ) => ScheduledTriggerMetrics -> ScheduledEventId -> [EventHeaderInfo] -> RetryContext -> ScheduledEventWebhookPayload -> EnvRecord ResolvedWebhook -> ScheduledEventType -> m () processScheduledEvent scheduledTriggerMetrics eventId eventHeaders retryCtx payload webhookUrl type' = Tracing.newTrace Tracing.sampleAlways traceNote do currentTime <- liftIO getCurrentTime let retryConf = _rctxConf retryCtx scheduledTime = sewpScheduledTime payload if convertDuration (diffUTCTime currentTime scheduledTime) > unrefine (strcToleranceSeconds retryConf) then processDead eventId type' else do let timeoutSeconds = round $ unrefine (strcTimeoutSeconds retryConf) httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000) (headers, decodedHeaders) = prepareHeaders eventHeaders extraLogCtx = ExtraLogContext eventId (sewpName payload) webhookReqBodyJson = J.toJSON payload webhookReqBody = J.encode webhookReqBodyJson requestTransform = sewpRequestTransform payload responseTransform = mkResponseTransform <$> sewpResponseTransform payload eitherReqRes <- runExceptT $ mkRequest headers httpTimeout webhookReqBody requestTransform (_envVarValue webhookUrl) >>= \reqDetails -> do let request = extractRequest reqDetails logger e d = do logHTTPForST e extraLogCtx d (_envVarName webhookUrl) decodedHeaders liftIO $ do case e of Left _err -> pure () Right response -> Prometheus.Counter.add (stmScheduledTriggerBytesReceived scheduledTriggerMetrics) (hrsSize response) let RequestDetails {_rdOriginalSize, _rdTransformedSize} = d in Prometheus.Counter.add (stmScheduledTriggerBytesSent scheduledTriggerMetrics) (fromMaybe _rdOriginalSize _rdTransformedSize) case (type', e) of (Cron, Left _err) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalFailure scheduledTriggerMetrics) (Cron, Right _) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalSuccess scheduledTriggerMetrics) (OneOff, Left _err) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalFailure scheduledTriggerMetrics) (OneOff, Right _) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalSuccess scheduledTriggerMetrics) sessionVars = _rdSessionVars reqDetails resp <- invokeRequest reqDetails responseTransform sessionVars logger pure (request, resp) case eitherReqRes of Right (req, resp) -> let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value in processSuccess eventId decodedHeaders type' reqBody resp scheduledTriggerMetrics Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e scheduledTriggerMetrics Left (TransformationError _ e) -> do -- Log The Transformation Error logger :: L.Logger L.Hasura <- asks getter L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode e) -- Set event state to Error liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type' where traceNote = "Scheduled trigger" <> foldMap ((": " <>) . 
triggerNameToTxt) (sewpName payload) processError :: ( MonadIO m, MonadMetadataStorage m, MonadError QErr m ) => ScheduledEventId -> RetryContext -> [HeaderConf] -> ScheduledEventType -> J.Value -> HTTPErr a -> ScheduledTriggerMetrics -> m () processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do let invocation = case err of HClient httpException -> let statusMaybe = getHTTPExceptionStatus httpException in mkInvocation eventId statusMaybe decodedHeaders (SB.fromLBS $ httpExceptionErrorEncoding httpException) [] reqJson HStatus errResp -> do let respPayload = hrsBody errResp respHeaders = hrsHeaders errResp respStatus = hrsStatus errResp mkInvocation eventId (Just respStatus) decodedHeaders respPayload respHeaders reqJson HOther detail -> do let errMsg = (SB.fromLBS $ J.encode detail) mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson liftEitherM $ insertScheduledEventInvocation invocation type' retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric retryOrMarkError :: (MonadIO m, MonadMetadataStorage m, MonadError QErr m) => ScheduledEventId -> RetryContext -> HTTPErr a -> ScheduledEventType -> ScheduledTriggerMetrics -> m () retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric = do let RetryContext tries retryConf = retryCtx mRetryHeader = getRetryAfterHeaderFromHTTPErr err mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader triesExhausted = tries >= strcNumRetries retryConf noRetryHeader = isNothing mRetryHeaderSeconds if triesExhausted && noRetryHeader then do liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type' case type' of Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalFailure scheduledTriggerMetric) OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalFailure scheduledTriggerMetric) else do currentTime <- liftIO getCurrentTime let delay = fromMaybe ( round $ unrefine (strcRetryIntervalSeconds retryConf) ) mRetryHeaderSeconds diff = fromIntegral delay retryTime = addUTCTime diff currentTime liftEitherM $ setScheduledEventOp eventId (SEOpRetry retryTime) type' {- Note [Scheduled event lifecycle] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Scheduled events move between six different states over the course of their lifetime, as represented by the following flowchart: ┌───────────┐ ┌────────┐ ┌───────────┐ │ scheduled │─(1)─→│ locked │─(2)─→│ delivered │ └───────────┘ └────────┘ └───────────┘ ↑ │ ┌───────┐ └────(3)───────┼─────(4)──→│ error │ │ └───────┘ │ ┌──────┐ └─────(5)──→│ dead │ └──────┘ When a scheduled event is first created, it starts in the 'scheduled' state, and it can transition to other states in the following ways: 1. When graphql-engine fetches a scheduled event from the database to process it, it sets its state to 'locked'. This prevents multiple graphql-engine instances running on the same database from processing the same scheduled event concurrently. 2. When a scheduled event is processed successfully, it is marked 'delivered'. 3. If a scheduled event fails to be processed, but it hasn’t yet reached its maximum retry limit, its retry counter is incremented and it is returned to the 'scheduled' state. 4. If a scheduled event fails to be processed and *has* reached its retry limit, its state is set to 'error'. 5. If for whatever reason the difference between the current time and the scheduled time is greater than the tolerance of the scheduled event, it will not be processed and its state will be set to 'dead'. 
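
A minimal sketch of the retry decision behind transitions (3) and (4), as made in
'retryOrMarkError' above (illustrative pseudocode; the real states are text
values in the events table's "status" column):

    nextStatus tries retryConf mRetryAfterSeconds
      | tries >= strcNumRetries retryConf && isNothing mRetryAfterSeconds = Error     -- transition (4)
      | otherwise                                                         = Scheduled -- transition (3):
          -- rescheduled at now + (Retry-After header value, or the configured retry interval)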
-} processSuccess :: (MonadMetadataStorage m, MonadError QErr m, MonadIO m) => ScheduledEventId -> [HeaderConf] -> ScheduledEventType -> J.Value -> HTTPResp a -> ScheduledTriggerMetrics -> m () processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do let respBody = hrsBody resp respHeaders = hrsHeaders resp respStatus = hrsStatus resp invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson liftEitherM $ insertScheduledEventInvocation invocation type' liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type' case type' of Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalSuccess scheduledTriggerMetric) OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalSuccess scheduledTriggerMetric) processDead :: (MonadMetadataStorage m, MonadError QErr m) => ScheduledEventId -> ScheduledEventType -> m () processDead eventId type' = liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDead) type' mkInvocation :: ScheduledEventId -> Maybe Int -> [HeaderConf] -> SB.SerializableBlob -> [HeaderConf] -> J.Value -> (Invocation 'ScheduledType) mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson = Invocation eventId status (mkWebhookReq reqBodyJson reqHeaders invocationVersionST) (mkInvocationResp status respBody respHeaders) -- metadata database transactions -- | Get cron trigger stats for cron jobs with fewer than 100 future reified -- events in the database -- -- The point here is to maintain a certain number of future events so the user -- can kind of see what's coming up, and obviously to give 'processCronEvents' -- something to do. getDeprivedCronTriggerStatsTx :: [TriggerName] -> PG.TxE QErr [CronTriggerStats] getDeprivedCronTriggerStatsTx cronTriggerNames = map (\(n, count, maxTx) -> CronTriggerStats n count maxTx) <$> PG.withQE defaultTxErrorHandler [PG.sql| SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now()) FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t LEFT JOIN ( SELECT trigger_name, count(1) as upcoming_events_count, max(scheduled_time) as max_scheduled_time FROM hdb_catalog.hdb_cron_events WHERE tries = 0 and status = 'scheduled' GROUP BY trigger_name ) AS q ON t.trigger_name = q.trigger_name WHERE coalesce(q.upcoming_events_count, 0) < 100 |] (Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames) True -- TODO -- - cron events have minute resolution, while one-off events have arbitrary -- resolution, so it doesn't make sense to fetch them at the same rate -- - if we decide to fetch cron events less frequently we should wake up that -- thread at second 0 of every minute, and then pass hasura's now time into -- the query (since the DB may disagree about the time) getScheduledEventsForDeliveryTx :: [TriggerName] -> PG.TxE QErr ([CronEvent], [OneOffScheduledEvent]) getScheduledEventsForDeliveryTx cronTriggerNames = (,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery where getCronEventsForDelivery :: PG.TxE QErr [CronEvent] getCronEventsForDelivery = map (PG.getViaJSON . 
runIdentity) <$> PG.withQE defaultTxErrorHandler [PG.sql| WITH cte AS ( UPDATE hdb_catalog.hdb_cron_events SET status = 'locked' WHERE id IN ( SELECT t.id FROM hdb_catalog.hdb_cron_events t WHERE ( t.status = 'scheduled' and ( (t.next_retry_at is NULL and t.scheduled_time <= now()) or (t.next_retry_at is not NULL and t.next_retry_at <= now()) ) ) AND trigger_name = ANY($1) FOR UPDATE SKIP LOCKED ) RETURNING * ) SELECT row_to_json(t.*) FROM cte AS t |] (Identity $ PGTextArray $ triggerNameToTxt <$> cronTriggerNames) True getOneOffEventsForDelivery :: PG.TxE QErr [OneOffScheduledEvent] getOneOffEventsForDelivery = do map (PG.getViaJSON . runIdentity) <$> PG.withQE defaultTxErrorHandler [PG.sql| WITH cte AS ( UPDATE hdb_catalog.hdb_scheduled_events SET status = 'locked' WHERE id IN ( SELECT t.id FROM hdb_catalog.hdb_scheduled_events t WHERE ( t.status = 'scheduled' and ( (t.next_retry_at is NULL and t.scheduled_time <= now()) or (t.next_retry_at is not NULL and t.next_retry_at <= now()) ) ) FOR UPDATE SKIP LOCKED ) RETURNING * ) SELECT row_to_json(t.*) FROM cte AS t |] () False insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> PG.TxE QErr () insertInvocationTx invo type' = do case type' of Cron -> do PG.unitQE defaultTxErrorHandler [PG.sql| INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs (event_id, status, request, response) VALUES ($1, $2, $3, $4) |] ( iEventId invo, fromIntegral <$> iStatus invo :: Maybe Int64, PG.ViaJSON $ J.toJSON $ iRequest invo, PG.ViaJSON $ J.toJSON $ iResponse invo ) True PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_cron_events SET tries = tries + 1 WHERE id = $1 |] (Identity $ iEventId invo) True OneOff -> do PG.unitQE defaultTxErrorHandler [PG.sql| INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs (event_id, status, request, response) VALUES ($1, $2, $3, $4) |] ( iEventId invo, fromIntegral <$> iStatus invo :: Maybe Int64, PG.ViaJSON $ J.toJSON $ iRequest invo, PG.ViaJSON $ J.toJSON $ iResponse invo ) True PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_scheduled_events SET tries = tries + 1 WHERE id = $1 |] (Identity $ iEventId invo) True setScheduledEventOpTx :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> PG.TxE QErr () setScheduledEventOpTx eventId op type' = case op of SEOpRetry time -> setRetry time SEOpStatus status -> setStatus status where setRetry time = case type' of Cron -> PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_cron_events SET next_retry_at = $1, STATUS = 'scheduled' WHERE id = $2 |] (time, eventId) True OneOff -> PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_scheduled_events SET next_retry_at = $1, STATUS = 'scheduled' WHERE id = $2 |] (time, eventId) True setStatus status = case type' of Cron -> do PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_cron_events SET status = $2 WHERE id = $1 |] (eventId, status) True OneOff -> do PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_scheduled_events SET status = $2 WHERE id = $1 |] (eventId, status) True unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> PG.TxE QErr Int unlockScheduledEventsTx type' eventIds = let eventIdsTextArray = map unEventId eventIds in case type' of Cron -> (runIdentity . 
PG.getRow) <$> PG.withQE defaultTxErrorHandler [PG.sql| WITH "cte" AS (UPDATE hdb_catalog.hdb_cron_events SET status = 'scheduled' WHERE id = ANY($1::text[]) and status = 'locked' RETURNING *) SELECT count(*) FROM "cte" |] (Identity $ PGTextArray eventIdsTextArray) True OneOff -> (runIdentity . PG.getRow) <$> PG.withQE defaultTxErrorHandler [PG.sql| WITH "cte" AS (UPDATE hdb_catalog.hdb_scheduled_events SET status = 'scheduled' WHERE id = ANY($1::text[]) AND status = 'locked' RETURNING *) SELECT count(*) FROM "cte" |] (Identity $ PGTextArray eventIdsTextArray) True unlockAllLockedScheduledEventsTx :: PG.TxE QErr () unlockAllLockedScheduledEventsTx = do PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_cron_events SET status = 'scheduled' WHERE status = 'locked' |] () True PG.unitQE defaultTxErrorHandler [PG.sql| UPDATE hdb_catalog.hdb_scheduled_events SET status = 'scheduled' WHERE status = 'locked' |] () True insertCronEventsTx :: [CronEventSeed] -> PG.TxE QErr () insertCronEventsTx cronSeeds = do let insertCronEventsSql = TB.run $ toSQL S.SQLInsert { siTable = cronEventsTable, siCols = map unsafePGCol ["trigger_name", "scheduled_time"], siValues = S.ValuesExp $ map (toTupleExp . toArr) cronSeeds, siConflict = Just $ S.DoNothing Nothing, siRet = Nothing } PG.unitQE defaultTxErrorHandler (PG.fromText insertCronEventsSql) () False where toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)] toTupleExp = S.TupleExp . map S.SELit insertOneOffScheduledEventTx :: OneOffEvent -> PG.TxE QErr EventId insertOneOffScheduledEventTx CreateScheduledEvent {..} = runIdentity . PG.getRow <$> PG.withQE defaultTxErrorHandler [PG.sql| INSERT INTO hdb_catalog.hdb_scheduled_events (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id |] ( PG.ViaJSON cseWebhook, cseScheduleAt, PG.ViaJSON csePayload, PG.ViaJSON cseRetryConf, PG.ViaJSON cseHeaders, cseComment ) False dropFutureCronEventsTx :: ClearCronEvents -> PG.TxE QErr () dropFutureCronEventsTx = \case SingleCronTrigger triggerName -> PG.unitQE defaultTxErrorHandler [PG.sql| DELETE FROM hdb_catalog.hdb_cron_events WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0 |] (Identity triggerName) True MetadataCronTriggers triggerNames -> PG.unitQE defaultTxErrorHandler [PG.sql| DELETE FROM hdb_catalog.hdb_cron_events WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[]) |] (Identity $ PGTextArray $ map triggerNameToTxt triggerNames) False cronEventsTable :: QualifiedTable cronEventsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events" mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp mkScheduledEventStatusFilter = \case [] -> S.BELit True v -> S.BEIN (S.SEIdentifier $ Identifier "status") $ map (S.SELit . 
scheduledEventStatusToText) v scheduledTimeOrderBy :: S.OrderByExp scheduledTimeOrderBy = let scheduledTimeCol = S.SEIdentifier $ Identifier "scheduled_time" in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem scheduledTimeCol (Just S.OTAsc) Nothing -- | Build a select expression which outputs total count and -- list of json rows with pagination limit and offset applied mkPaginationSelectExp :: S.Select -> ScheduledEventPagination -> RowsCountOption -> S.Select mkPaginationSelectExp allRowsSelect ScheduledEventPagination {..} shouldIncludeRowsCount = S.mkSelect { S.selCTEs = [(countCteAlias, S.ICTESelect allRowsSelect), (limitCteAlias, limitCteSelect)], S.selExtr = case shouldIncludeRowsCount of IncludeRowsCount -> [countExtractor, rowsExtractor] DontIncludeRowsCount -> [rowsExtractor] } where countCteAlias = S.mkTableAlias "count_cte" limitCteAlias = S.mkTableAlias "limit_cte" countExtractor = let selectExp = S.mkSelect { S.selExtr = [S.Extractor S.countStar Nothing], S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias) } in S.Extractor (S.SESelect selectExp) Nothing limitCteSelect = S.ICTESelect S.mkSelect { S.selExtr = [S.selectStar], S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias), S.selLimit = (S.LimitExp . S.intToSQLExp) <$> _sepLimit, S.selOffset = (S.OffsetExp . S.intToSQLExp) <$> _sepOffset } rowsExtractor = let jsonAgg = S.SEUnsafe "json_agg(row_to_json(limit_cte.*))" selectExp = S.mkSelect { S.selExtr = [S.Extractor jsonAgg Nothing], S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier limitCteAlias) } in S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect selectExp)) Nothing withCount :: (Int, PG.ViaJSON a) -> WithOptionalTotalCount a withCount (count, PG.ViaJSON a) = WithOptionalTotalCount (Just count) a withoutCount :: PG.ViaJSON a -> WithOptionalTotalCount a withoutCount (PG.ViaJSON a) = WithOptionalTotalCount Nothing a executeWithOptionalTotalCount :: J.FromJSON a => PG.Query -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount a) executeWithOptionalTotalCount sql getRowsCount = case getRowsCount of IncludeRowsCount -> (withCount . PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False DontIncludeRowsCount -> (withoutCount . runIdentity . 
PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False getOneOffScheduledEventsTx :: ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent]) getOneOffScheduledEventsTx pagination statuses getRowsCount = do let table = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events" statusFilter = mkScheduledEventStatusFilter statuses select = S.mkSelect { S.selExtr = [S.selectStar], S.selFrom = Just $ S.mkSimpleFromExp table, S.selWhere = Just $ S.WhereFrag statusFilter, S.selOrderBy = Just scheduledTimeOrderBy } sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount executeWithOptionalTotalCount sql getRowsCount getCronEventsTx :: TriggerName -> ScheduledEventPagination -> [ScheduledEventStatus] -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount [CronEvent]) getCronEventsTx triggerName pagination status getRowsCount = do let triggerNameFilter = S.BECompare S.SEQ (S.SEIdentifier $ Identifier "trigger_name") (S.SELit $ triggerNameToTxt triggerName) statusFilter = mkScheduledEventStatusFilter status select = S.mkSelect { S.selExtr = [S.selectStar], S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable, S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp triggerNameFilter statusFilter, S.selOrderBy = Just scheduledTimeOrderBy } sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount executeWithOptionalTotalCount sql getRowsCount deleteScheduledEventTx :: ScheduledEventId -> ScheduledEventType -> PG.TxE QErr () deleteScheduledEventTx eventId = \case OneOff -> PG.unitQE defaultTxErrorHandler [PG.sql| DELETE FROM hdb_catalog.hdb_scheduled_events WHERE id = $1 |] (Identity eventId) False Cron -> PG.unitQE defaultTxErrorHandler [PG.sql| DELETE FROM hdb_catalog.hdb_cron_events WHERE id = $1 |] (Identity eventId) False invocationFieldExtractors :: QualifiedTable -> [S.Extractor] invocationFieldExtractors table = [ S.Extractor (seIden "id") Nothing, S.Extractor (seIden "event_id") Nothing, S.Extractor (seIden "status") Nothing, S.Extractor (withJsonTypeAnn $ seIden "request") Nothing, S.Extractor (withJsonTypeAnn $ seIden "response") Nothing, S.Extractor (seIden "created_at") Nothing ] where withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json" seIden = S.SEQIdentifier . S.mkQIdentifierTable table . 
Identifier mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp mkEventIdBoolExp table eventId = S.BECompare S.SEQ (S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id") (S.SELit $ unEventId eventId) getScheduledEventInvocationsTx :: GetScheduledEventInvocations -> PG.TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation]) getScheduledEventInvocationsTx getEventInvocations = do let eventsTables = EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable sql = PG.fromBuilder $ toSQL $ getScheduledEventsInvocationsQuery eventsTables getEventInvocations executeWithOptionalTotalCount sql (_geiGetRowsCount getEventInvocations) where oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs" cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs" data EventTables = EventTables { etOneOffInvocationsTable :: QualifiedTable, etCronInvocationsTable :: QualifiedTable, etCronEventsTable :: QualifiedTable } getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> S.Select getScheduledEventsInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy = allRowsSelect where createdAtOrderBy table = let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at" in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem createdAtCol (Just S.OTDesc) Nothing allRowsSelect = case invocationsBy of GIBEventId eventId eventType -> let table = case eventType of OneOff -> oneOffInvocationsTable Cron -> cronInvocationsTable in S.mkSelect { S.selExtr = invocationFieldExtractors table, S.selFrom = Just $ S.mkSimpleFromExp table, S.selOrderBy = Just $ createdAtOrderBy table, S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId } GIBEvent event -> case event of SEOneOff -> let table = oneOffInvocationsTable in S.mkSelect { S.selExtr = invocationFieldExtractors table, S.selFrom = Just $ S.mkSimpleFromExp table, S.selOrderBy = Just $ createdAtOrderBy table } SECron triggerName -> let invocationTable = cronInvocationsTable eventTable = cronEventsTable' joinCondition = S.JoinOn $ S.BECompare S.SEQ (S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id") (S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id") joinTables = S.JoinExpr (S.FISimple invocationTable Nothing) S.Inner (S.FISimple eventTable Nothing) joinCondition triggerBoolExp = S.BECompare S.SEQ (S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name")) (S.SELit $ triggerNameToTxt triggerName) in S.mkSelect { S.selExtr = invocationFieldExtractors invocationTable, S.selFrom = Just $ S.FromExp [S.FIJoin joinTables], S.selWhere = Just $ S.WhereFrag triggerBoolExp, S.selOrderBy = Just $ createdAtOrderBy invocationTable } getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> S.Select getScheduledEventsInvocationsQuery eventTables (GetScheduledEventInvocations invocationsBy pagination shouldIncludeRowsCount) = let invocationsSelect = getScheduledEventsInvocationsQueryNoPagination eventTables invocationsBy in mkPaginationSelectExp invocationsSelect pagination shouldIncludeRowsCount -- | Logger to accumulate stats of fetched scheduled events over a period of time and log once using @'L.Logger L.Hasura'. -- See @'createStatsLogger' for more details. 
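--
-- A hedged usage sketch (the surrounding setup is hypothetical; in practice the
-- handle is created at startup, threaded into 'processScheduledTriggers' which
-- calls 'logFetchedScheduledEventsStats' on every fetch, and closed at shutdown):
--
-- > statsLogger <- createFetchedScheduledEventsStatsLogger logger
-- > -- ... run 'processScheduledTriggers' with 'statsLogger' ...
-- > closeFetchedScheduledEventsStatsLogger logger statsLogger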
createFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedScheduledEventsStatsLogger
createFetchedScheduledEventsStatsLogger = L.createStatsLogger

-- | Close the fetched scheduled events stats logger.
closeFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> m ()
closeFetchedScheduledEventsStatsLogger = L.closeStatsLogger L.scheduledTriggerProcessLogType

-- | Log statistics of fetched scheduled events. See @'logStats' for more details.
logFetchedScheduledEventsStats ::
  (MonadIO m) =>
  FetchedScheduledEventsStatsLogger ->
  CronEventsCount ->
  OneOffScheduledEventsCount ->
  m ()
logFetchedScheduledEventsStats logger cron oneOff =
  L.logStats logger (FetchedScheduledEventsStats cron oneOff 1)

-- | Logger to accumulate stats of fetched cron triggers, for generating cron events, over a period of time and
-- log once using @'L.Logger L.Hasura'.
-- See @'createStatsLogger' for more details.
createFetchedCronTriggerStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedCronTriggerStatsLogger
createFetchedCronTriggerStatsLogger = L.createStatsLogger

-- | Close the fetched cron trigger stats logger.
closeFetchedCronTriggersStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> m ()
closeFetchedCronTriggersStatsLogger = L.closeStatsLogger L.cronEventGeneratorProcessType

-- | Log statistics of fetched cron triggers. See @'logStats' for more details.
logFetchedCronTriggersStats :: (MonadIO m) => FetchedCronTriggerStatsLogger -> [CronTriggerStats] -> m ()
logFetchedCronTriggersStats logger cronTriggerStats =
  L.logStats logger (FetchedCronTriggerStats cronTriggerStats 1)
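
-- For reference, 'mkPaginationSelectExp' (defined above) wraps a given select in
-- two CTEs; the SQL it produces has roughly this shape when the row count is
-- requested (a hedged sketch with placeholder LIMIT/OFFSET values, not the
-- verbatim output of the SQL printer):
--
-- > WITH count_cte AS (<all rows select>),
-- >      limit_cte AS (SELECT * FROM count_cte LIMIT 10 OFFSET 0)
-- > SELECT
-- >   (SELECT count(*) FROM count_cte),
-- >   coalesce((SELECT json_agg(row_to_json(limit_cte.*)) FROM limit_cte), '[]')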