{-| = Scheduled Triggers

This module implements the functionality of invoking webhooks at specified
times, also known as scheduled events. A scheduled event is either generated by
the graphql-engine from a cron trigger, or created by the user for a specific
time together with the payload, webhook, headers and retry configuration.

Scheduled events are modeled using rows in Postgres with a @timestamp@ column.

This module implements scheduling and delivery of scheduled events:

1. Scheduling a cron event involves creating new cron events. New cron events
   are created based on the cron schedule and the number of scheduled events
   that are already present in the scheduled events buffer. The graphql-engine
   computes the new scheduled events and writes them to the database. (Generator)

2. Delivering a scheduled event involves reading undelivered scheduled events
   from the database and delivering them to the webhook server. (Processor)

The rationale behind splitting event scheduling and event delivery into two
different threads is that the two are not directly dependent on each other. The
generator almost always creates scheduled events that are to be delivered in
the future (timestamp > current_timestamp), whereas the processor fetches
scheduled events of the past (timestamp < current_timestamp). So the sets of
scheduled events touched by the generator and the processor will never be the
same; since they are not correlated, they can be run in different threads for
better performance.

== Implementation

Scheduled trigger eventing is implemented on top of the metadata storage. All
functions that interact with the storage system are abstracted behind the
'MonadMetadataStorage' class.

During startup, two threads are started:

1. Generator: Fetches the list of scheduled triggers from cache and generates
   the scheduled events.

    - Additional events will be generated only if there are fewer than 100
      scheduled events.

    - The upcoming events' timestamps will be generated using:

        - the cron schedule of the scheduled trigger

        - the max timestamp of the scheduled events that already exist, or
          current_timestamp when no scheduled events exist

    - The timestamp of the scheduled events is stored with timezone because
      `SELECT NOW()` returns a timestamp with timezone, and it is best to
      compare values of the same type. This effectively corresponds to doing
      an INSERT with values containing a specific timestamp.

2. Processor: Fetches the undelivered cron events and one-off scheduled events
   from the database which have a timestamp earlier than the current timestamp,
   and processes them.
-}
module Hasura.Eventing.ScheduledTrigger
  ( runCronEventsGenerator
  , processScheduledTriggers
  , generateScheduleTimes

  , CronEventSeed(..)
  , initLockedEventsCtx
  , LockedEventsCtx(..)

  -- * Database interactions
  -- Following function names are similar to those present in
  -- 'MonadMetadataStorage' type class. To avoid duplication,
  -- 'Tx' is suffixed to identify them as database transactions
  , getDeprivedCronTriggerStatsTx
  , getScheduledEventsForDeliveryTx
  , insertInvocationTx
  , setScheduledEventOpTx
  , unlockScheduledEventsTx
  , unlockAllLockedScheduledEventsTx
  , insertScheduledEventTx
  ) where
import Hasura.Prelude

import qualified Data.Aeson           as J
import qualified Data.ByteString.Lazy as BL
import qualified Data.Environment     as Env
import qualified Data.HashMap.Strict  as Map
import qualified Data.Set             as Set
import qualified Data.TByteString     as TBS
import qualified Data.Text            as T
import qualified Database.PG.Query    as Q
import qualified Network.HTTP.Client  as HTTP
import qualified Text.Builder         as TB

import Control.Arrow.Extended      (dup)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
import Data.Has
import Data.Int                    (Int64)
import Data.List                   (unfoldr)
import Data.Time.Clock
import System.Cron

import qualified Hasura.Logging as L
import qualified Hasura.Tracing as Tracing

import Hasura.Backends.Postgres.SQL.DML
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.RQL.DDL.Headers
import Hasura.RQL.Types
import Hasura.Server.Version       (HasVersion)
import Hasura.SQL.Types

-- | runCronEventsGenerator makes sure that all the cron triggers
-- have an adequate buffer of cron events.
runCronEventsGenerator
  :: ( MonadIO m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => L.Logger L.Hasura
  -> IO SchemaCache
  -> m void
runCronEventsGenerator logger getSC = do
  forever $ do
    sc <- liftIO getSC
    -- get cron triggers from cache
    let cronTriggersCache = scCronTriggers sc

    -- get cron trigger stats from db
    eitherRes <- runMetadataStorageT $ do
      deprivedCronTriggerStats <- getDeprivedCronTriggerStats
      -- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
      cronTriggersForHydrationWithStats <-
        catMaybes <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
      insertCronEventsFor cronTriggersForHydrationWithStats

    onLeft eitherRes $ L.unLogger logger .
      ScheduledTriggerInternalErr . err500 Unexpected . T.pack . show

    liftIO $ sleep (minutes 1)
  where
    withCronTrigger cronTriggerCache cronTriggerStat = do
      case Map.lookup (ctsName cronTriggerStat) cronTriggerCache of
        Nothing -> do
          L.unLogger logger $ ScheduledTriggerInternalErr $
            err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
        Just cronTrigger -> pure $ Just (cronTrigger, cronTriggerStat)

insertCronEventsFor
  :: (MonadMetadataStorage m)
  => [(CronTriggerInfo, CronTriggerStats)]
  -> m ()
insertCronEventsFor cronTriggersWithStats = do
  let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
        generateCronEventsFrom (ctsMaxScheduledTime stats) cti
  case scheduledEvents of
    []     -> pure ()
    events -> insertScheduledEvent $ SESCron events

generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo{..} =
  map (CronEventSeed ctiName) $
    generateScheduleTimes startTime 100 ctiSchedule -- generate next 100 events

-- | Generates next @n@ events starting @from@ according to 'CronSchedule'
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n $ go from
  where
    go = unfoldr (fmap dup . nextMatch cron)
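-- An illustrative GHCi session for 'generateScheduleTimes' (not part of the
-- original module; it assumes 'parseCronSchedule' from the cron package is in
-- scope, the 'Read' instance of 'UTCTime', and that 'nextMatch' yields the
-- next matching instant strictly after its argument):
--
-- >>> let Right everyMinute = parseCronSchedule "* * * * *"
-- >>> generateScheduleTimes (read "2021-01-01 00:00:00 UTC") 3 everyMinute
-- [2021-01-01 00:01:00 UTC,2021-01-01 00:02:00 UTC,2021-01-01 00:03:00 UTC]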
processCronEvents
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> [CronEvent]
  -> IO SchemaCache
  -> TVar (Set.Set CronEventId)
  -> m ()
processCronEvents logger logEnv httpMgr cronEvents getSC lockedCronEvents = do
  cronTriggersInfo <- scCronTriggers <$> liftIO getSC
  -- save the locked cron events that have been fetched from the
  -- database, the events stored here will be unlocked in case a
  -- graceful shutdown is initiated in midst of processing these events
  saveLockedEvents (map _ceId cronEvents) lockedCronEvents
  -- The `createdAt` of a cron event is the `created_at` of the cron trigger
  for_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
    case Map.lookup name cronTriggersInfo of
      Nothing -> logInternalError $
        err500 Unexpected "could not find cron trigger in cache"
      Just CronTriggerInfo{..} -> do
        let webhookUrl = unResolvedWebhook ctiWebhookInfo
            payload = ScheduledEventWebhookPayload id' (Just name) st
                        (fromMaybe J.Null ctiPayload) ctiComment Nothing
            retryCtx = RetryContext tries ctiRetryConf
        finally <- runMetadataStorageT $ flip runReaderT (logger, httpMgr) $
          processScheduledEvent logEnv id' ctiHeaders retryCtx payload webhookUrl Cron
        removeEventFromLockedEvents id' lockedCronEvents
        onLeft finally logInternalError
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

processOneOffScheduledEvents
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> [OneOffScheduledEvent]
  -> TVar (Set.Set OneOffScheduledEventId)
  -> m ()
processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents lockedOneOffScheduledEvents = do
  -- save the locked one-off events that have been fetched from the
  -- database, the events stored here will be unlocked in case a
  -- graceful shutdown is initiated in midst of processing these events
  saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
  for_ oneOffEvents $ \OneOffScheduledEvent{..} -> do
    (either logInternalError pure) =<< runMetadataStorageT do
      webhookInfo <- resolveWebhook env _ooseWebhookConf
      headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf

      let webhookUrl = unResolvedWebhook webhookInfo
          payload = ScheduledEventWebhookPayload _ooseId Nothing _ooseScheduledTime
                      (fromMaybe J.Null _oosePayload) _ooseComment (Just _ooseCreatedAt)
          retryCtx = RetryContext _ooseTries _ooseRetryConf

      flip runReaderT (logger, httpMgr) $
        processScheduledEvent logEnv _ooseId headerInfo retryCtx payload webhookUrl OneOff
      removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

processScheduledTriggers
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> IO SchemaCache
  -> LockedEventsCtx
  -> m void
processScheduledTriggers env logger logEnv httpMgr getSC LockedEventsCtx {..} =
  forever $ do
    result <- runMetadataStorageT getScheduledEventsForDelivery
    case result of
      Left e -> logInternalError e
      Right (cronEvents, oneOffEvents) -> do
        processCronEvents logger logEnv httpMgr cronEvents getSC leCronEvents
        processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents leOneOffEvents
    liftIO $ sleep (minutes 1)
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
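-- How the two loops are expected to be started (a rough sketch with
-- hypothetical wiring; the real call sites live in the server startup code and
-- use its own thread-forking helpers, which is also why both loops have an
-- 'm void' result type: they never return):
--
--   _generatorThread <- fork $ runCronEventsGenerator logger getSchemaCache
--   _processorThread <- fork $
--     processScheduledTriggers env logger logEnv httpMgr getSchemaCache lockedEventsCtx
--
-- where 'fork', 'getSchemaCache' and 'lockedEventsCtx' stand in for whatever
-- the server actually provides.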
processScheduledEvent
  :: ( MonadReader r m
     , Has HTTP.Manager r
     , Has (L.Logger L.Hasura) r
     , HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage m
     )
  => LogEnvHeaders
  -> ScheduledEventId
  -> [EventHeaderInfo]
  -> RetryContext
  -> ScheduledEventWebhookPayload
  -> Text
  -> ScheduledEventType
  -> m ()
processScheduledEvent logEnv eventId eventHeaders retryCtx payload webhookUrl type' =
  Tracing.runTraceT traceNote do
    currentTime <- liftIO getCurrentTime
    let retryConf = _rctxConf retryCtx
        scheduledTime = sewpScheduledTime payload
    if convertDuration (diffUTCTime currentTime scheduledTime)
       > unNonNegativeDiffTime (strcToleranceSeconds retryConf)
      then processDead eventId type'
      else do
        let timeoutSeconds = round $ unNonNegativeDiffTime $ strcTimeoutSeconds retryConf
            httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
            headers = addDefaultHeaders $ map encodeHeader eventHeaders
            extraLogCtx = ExtraLogContext (Just currentTime) eventId
            webhookReqBodyJson = J.toJSON payload
            webhookReqBody = J.encode webhookReqBodyJson
            requestDetails = RequestDetails $ BL.length webhookReqBody
        eitherRes <- runExceptT $
          tryWebhook headers httpTimeout webhookReqBody (T.unpack webhookUrl)
        logHTTPForST eitherRes extraLogCtx requestDetails
        let decodedHeaders = map (decodeHeader logEnv eventHeaders) headers
        case eitherRes of
          Left e  -> processError eventId retryCtx decodedHeaders type' webhookReqBodyJson e
          Right r -> processSuccess eventId decodedHeaders type' webhookReqBodyJson r
  where
    traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)

processError
  :: ( MonadIO m
     , MonadMetadataStorage m
     )
  => ScheduledEventId
  -> RetryContext
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPErr a
  -> m ()
processError eventId retryCtx decodedHeaders type' reqJson err = do
  let invocation = case err of
        HClient excp -> do
          let errMsg = TBS.fromLBS $ J.encode $ show excp
          mkInvocation eventId 1000 decodedHeaders errMsg [] reqJson
        HParse _ detail -> do
          let errMsg = TBS.fromLBS $ J.encode detail
          mkInvocation eventId 1001 decodedHeaders errMsg [] reqJson
        HStatus errResp -> do
          let respPayload = hrsBody errResp
              respHeaders = hrsHeaders errResp
              respStatus = hrsStatus errResp
          mkInvocation eventId respStatus decodedHeaders respPayload respHeaders reqJson
        HOther detail -> do
          let errMsg = (TBS.fromLBS $ J.encode detail)
          mkInvocation eventId 500 decodedHeaders errMsg [] reqJson
  insertScheduledEventInvocation invocation type'
  retryOrMarkError eventId retryCtx err type'

retryOrMarkError
  :: (MonadIO m, MonadMetadataStorage m)
  => ScheduledEventId
  -> RetryContext
  -> HTTPErr a
  -> ScheduledEventType
  -> m ()
retryOrMarkError eventId retryCtx err type' = do
  let RetryContext tries retryConf = retryCtx
      mRetryHeader = getRetryAfterHeaderFromHTTPErr err
      mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
      triesExhausted = tries >= strcNumRetries retryConf
      noRetryHeader = isNothing mRetryHeaderSeconds
  if triesExhausted && noRetryHeader
    then setScheduledEventOp eventId (SEOpStatus SESError) type'
    else do
      currentTime <- liftIO getCurrentTime
      let delay = fromMaybe (round $ unNonNegativeDiffTime $ strcRetryIntervalSeconds retryConf)
                    mRetryHeaderSeconds
          diff = fromIntegral delay
          retryTime = addUTCTime diff currentTime
      setScheduledEventOp eventId (SEOpRetry retryTime) type'
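-- Worked example for 'retryOrMarkError' (illustrative numbers, not from the
-- source): with num_retries = 3, retry_interval_seconds = 10 and no
-- Retry-After header, a failing event whose tries count is still below 3 is
-- re-scheduled at now() + 10s; once tries >= 3 the next failure marks it
-- 'error'. A parseable Retry-After header always wins: e.g. "Retry-After: 120"
-- re-schedules the event at now() + 120s even when the tries are exhausted,
-- because the error branch is taken only when triesExhausted && noRetryHeader.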
{- Note [Scheduled event lifecycle]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Scheduled events move between five different states over the course of their
lifetime, as represented by the following flowchart:

   ┌───────────┐      ┌────────┐      ┌───────────┐
   │ scheduled │─(a)─→│ locked │─(b)─→│ delivered │
   └───────────┘      └────────┘      └───────────┘
         ↑                 │              ┌───────┐
         └──────(c)────────┼─────(d)─────→│ error │
                           │              └───────┘
                           │              ┌──────┐
                           └─────(e)─────→│ dead │
                                          └──────┘

When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
  a. When graphql-engine fetches a scheduled event from the database to process
     it, it sets its state to 'locked'. This prevents multiple graphql-engine
     instances running on the same database from processing the same scheduled
     event concurrently.
  b. When a scheduled event is processed successfully, it is marked 'delivered'.
  c. If a scheduled event fails to be processed, but it hasn’t yet reached its
     maximum retry limit, its retry counter is incremented and it is returned
     to the 'scheduled' state.
  d. If a scheduled event fails to be processed and *has* reached its retry
     limit, its state is set to 'error'.
  e. If for whatever reason the difference between the current time and the
     scheduled time is greater than the tolerance of the scheduled event, it
     will not be processed and its state will be set to 'dead'.
-}

processSuccess
  :: (MonadMetadataStorage m)
  => ScheduledEventId
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPResp a
  -> m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp = do
  let respBody = hrsBody resp
      respHeaders = hrsHeaders resp
      respStatus = hrsStatus resp
      invocation = mkInvocation eventId respStatus decodedHeaders respBody respHeaders reqBodyJson
  insertScheduledEventInvocation invocation type'
  setScheduledEventOp eventId (SEOpStatus SESDelivered) type'

processDead
  :: (MonadMetadataStorage m)
  => ScheduledEventId -> ScheduledEventType -> m ()
processDead eventId type' =
  setScheduledEventOp eventId (SEOpStatus SESDead) type'

mkInvocation
  :: ScheduledEventId
  -> Int
  -> [HeaderConf]
  -> TBS.TByteString
  -> [HeaderConf]
  -> J.Value
  -> (Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  let resp = if isClientError status
             then mkClientErr respBody
             else mkResp status respBody respHeaders
  in
  Invocation
    eventId
    status
    (mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
    resp

-- metadata database transactions

getDeprivedCronTriggerStatsTx :: Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx =
  map (\(n, count, maxTx) -> CronTriggerStats n count maxTx) <$>
    Q.listQE defaultTxErrorHandler
    [Q.sql|
     SELECT * FROM
      ( SELECT
         trigger_name,
         count(*) as upcoming_events_count,
         max(scheduled_time) as max_scheduled_time
        FROM hdb_catalog.hdb_cron_events
        WHERE tries = 0 and status = 'scheduled'
        GROUP BY trigger_name
      ) AS q
     WHERE q.upcoming_events_count < 100
    |] () True
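-- Illustrative reading of the query above (hypothetical data): a result row
-- like
--
--   trigger_name     | upcoming_events_count | max_scheduled_time
--   -----------------+-----------------------+------------------------
--   'daily_reminder' |                    37 | 2021-01-05 09:00:00+00
--
-- marks 'daily_reminder' as "deprived"; the generator then seeds
-- 'generateCronEventsFrom' with that max_scheduled_time, producing the next
-- 100 occurrences strictly after it.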
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx =
  (,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
  where
    getCronEventsForDelivery :: Q.TxE QErr [CronEvent]
    getCronEventsForDelivery =
      map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql|
        WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_cron_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                        )
            RETURNING *
          )
        SELECT row_to_json(t.*) FROM cte AS t
      |] () True

    getOneOffEventsForDelivery :: Q.TxE QErr [OneOffScheduledEvent]
    getOneOffEventsForDelivery = do
      map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql|
        WITH cte AS (
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'locked'
          WHERE id IN ( SELECT t.id
                        FROM hdb_catalog.hdb_scheduled_events t
                        WHERE ( t.status = 'scheduled'
                                and (
                                 (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                 (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                )
                              )
                        FOR UPDATE SKIP LOCKED
                      )
          RETURNING *
        )
        SELECT row_to_json(t.*) FROM cte AS t
      |] () False

insertInvocationTx
  :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx invo type' = do
  case type' of
    Cron -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |] ( iEventId invo
           , fromIntegral $ iStatus invo :: Int64
           , Q.AltJ $ J.toJSON $ iRequest invo
           , Q.AltJ $ J.toJSON $ iResponse invo) True
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
        |] (Identity $ iEventId invo) True
    OneOff -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |] ( iEventId invo
           , fromIntegral $ iStatus invo :: Int64
           , Q.AltJ $ J.toJSON $ iRequest invo
           , Q.AltJ $ J.toJSON $ iResponse invo) True
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
        |] (Identity $ iEventId invo) True

setScheduledEventOpTx
  :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx eventId op type' = case op of
  SEOpRetry time    -> setRetry time
  SEOpStatus status -> setStatus status
  where
    setRetry time =
      case type' of
        Cron ->
          Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
          |] (time, eventId) True
        OneOff ->
          Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
          |] (time, eventId) True
    setStatus status =
      case type' of
        Cron -> do
          Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET status = $2
            WHERE id = $1
          |] (eventId, status) True
        OneOff -> do
          Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = $2
            WHERE id = $1
          |] (eventId, status) True

unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
  case type' of
    Cron ->
      (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
      [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_cron_events
         SET status = 'scheduled'
         WHERE id = ANY($1::text[]) and status = 'locked'
         RETURNING *)
        SELECT count(*) FROM "cte"
      |] (Identity $ ScheduledEventIdArray eventIds) True

    OneOff ->
      (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
      [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_scheduled_events
         SET status = 'scheduled'
         WHERE id = ANY($1::text[]) AND status = 'locked'
         RETURNING *)
        SELECT count(*) FROM "cte"
      |] (Identity $ ScheduledEventIdArray eventIds) True
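-- Shutdown sketch (hypothetical call site; the real one lives in the server's
-- shutdown handler): the ids saved in 'LockedEventsCtx' while processing are
-- drained and handed back so another instance can pick the events up, e.g.
--
--   lockedCronIds <- liftIO $ readTVarIO leCronEvents
--   -- run inside a metadata-db transaction (transaction runner elided here)
--   _unlockedCount <- unlockScheduledEventsTx Cron (Set.toList lockedCronIds)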
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx = do
  Q.unitQE defaultTxErrorHandler [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |] () True
  Q.unitQE defaultTxErrorHandler [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |] () True

insertScheduledEventTx :: ScheduledEventSeed -> Q.TxE QErr ()
insertScheduledEventTx = \case
  SESOneOff CreateScheduledEvent{..} ->
    Q.unitQE defaultTxErrorHandler
      [Q.sql|
        INSERT INTO hdb_catalog.hdb_scheduled_events
        (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
        VALUES ($1, $2, $3, $4, $5, $6)
      |] ( Q.AltJ cseWebhook
         , cseScheduleAt
         , Q.AltJ csePayload
         , Q.AltJ cseRetryConf
         , Q.AltJ cseHeaders
         , cseComment
         ) False

  SESCron cronSeeds -> insertCronEventsTx cronSeeds
  where
    insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
    insertCronEventsTx events = do
      let insertCronEventsSql = TB.run $ toSQL
            SQLInsert
              { siTable    = cronEventsTable
              , siCols     = map unsafePGCol ["trigger_name", "scheduled_time"]
              , siValues   = ValuesExp $ map (toTupleExp . toArr) events
              , siConflict = Just $ DoNothing Nothing
              , siRet      = Nothing
              }
      Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
      where
        toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
        toTupleExp = TupleExp . map SELit

cronEventsTable :: QualifiedTable
cronEventsTable =
  QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"
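-- For reference, the SQL assembled by 'insertCronEventsTx' for two seeds of a
-- trigger named "daily_reminder" would look roughly like the following
-- (illustrative only; exact quoting, timestamp format and whitespace come from
-- 'toSQL', 'formatTime'' and 'TB.run'):
--
--   INSERT INTO "hdb_catalog"."hdb_cron_events" ("trigger_name", "scheduled_time")
--   VALUES ('daily_reminder', '2021-01-05 10:00:00'), ('daily_reminder', '2021-01-05 11:00:00')
--   ON CONFLICT DO NOTHING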