2020-05-13 15:33:16 +03:00
= Scheduled Triggers
This module implements the functionality of invoking webhooks during specified
time events aka scheduled events. The scheduled events are the events generated
by the graphql-engine using the cron triggers or/and a scheduled event can
be created by the user at a specified time with the payload, webhook, headers
and the retry configuration. Scheduled events are modeled using rows in Postgres
with a @timestamp@ column.
This module implements scheduling and delivery of scheduled
1. Scheduling a cron event involves creating new cron events. New
cron events are created based on the cron schedule and the number of
scheduled events that are already present in the scheduled events buffer.
The graphql-engine computes the new scheduled events and writes them to
the database.(Generator)
2. Delivering a scheduled event involves reading undelivered scheduled events
from the database and delivering them to the webhook server. (Processor)
The rationale behind separating the event scheduling and event delivery
mechanism into two different threads is that the scheduling and delivering of
the scheduled events are not directly dependent on each other. The generator
will almost always try to create scheduled events which are supposed to be
delivered in the future (timestamp > current_timestamp) and the processor
will fetch scheduled events of the past (timestamp < current_timestamp). So,
the set of the scheduled events generated by the generator and the processor
will never be the same. The point here is that they're not correlated to each
other. They can be split into different threads for a better performance.
== Implementation
2020-11-25 13:56:44 +03:00
The scheduled triggers eventing is being implemented in the metadata storage.
All functions that make interaction to storage system are abstracted in
the @'MonadMetadataStorage' class.
2020-05-13 15:33:16 +03:00
During the startup, two threads are started:
1. Generator: Fetches the list of scheduled triggers from cache and generates
the scheduled events.
- Additional events will be generated only if there are fewer than 100
scheduled events.
- The upcoming events timestamp will be generated using:
- cron schedule of the scheduled trigger
- max timestamp of the scheduled events that already exist or
current_timestamp(when no scheduled events exist)
- The timestamp of the scheduled events is stored with timezone because
`SELECT NOW()` returns timestamp with timezone, so it's good to
compare two things of the same type.
This effectively corresponds to doing an INSERT with values containing
specific timestamp.
2. Processor: Fetches the undelivered cron events and the scheduled events
from the database and which have timestamp lesser than the
current timestamp and then process them.
module Hasura.Eventing.ScheduledTrigger
( runCronEventsGenerator
, processScheduledTriggers
2020-11-25 13:56:44 +03:00
, generateScheduleTimes
2020-05-13 15:33:16 +03:00
, CronEventSeed(..)
2020-07-02 14:57:09 +03:00
, initLockedEventsCtx
, LockedEventsCtx(..)
2020-11-25 13:56:44 +03:00
-- * Database interactions
-- Following function names are similar to those present in
-- 'MonadMetadataStorage' type class. To avoid duplication,
-- 'Tx' is suffixed to identify as database transactions
, getDeprivedCronTriggerStatsTx
, getScheduledEventsForDeliveryTx
, insertInvocationTx
, setScheduledEventOpTx
, unlockScheduledEventsTx
, unlockAllLockedScheduledEventsTx
, insertScheduledEventTx
2020-12-14 07:30:19 +03:00
, dropFutureCronEventsTx
2021-01-07 12:04:22 +03:00
, getOneOffScheduledEventsTx
, getCronEventsTx
, deleteScheduledEventTx
, getInvocationsTx
-- * Export utility functions which are useful to build
-- SQLs for fetching data from metadata storage
, mkScheduledEventStatusFilter
, scheduledTimeOrderBy
, mkPaginationSelectExp
, withCount
, invocationFieldExtractors
, mkEventIdBoolExp
2020-05-13 15:33:16 +03:00
) where
2020-08-27 19:36:39 +03:00
import Hasura.Prelude
2020-11-25 13:56:44 +03:00
import qualified Data.Aeson as J
import qualified Data.ByteString.Lazy as BL
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as Map
2021-01-07 12:04:22 +03:00
import qualified Data.List.NonEmpty as NE
2020-11-25 13:56:44 +03:00
import qualified Data.Set as Set
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import qualified Text.Builder as TB
import Control.Arrow.Extended (dup)
import Control.Concurrent.Extended (sleep)
2020-07-02 14:57:09 +03:00
import Control.Concurrent.STM.TVar
2020-05-13 15:33:16 +03:00
import Data.Has
2020-11-25 13:56:44 +03:00
import Data.Int (Int64)
import Data.List (unfoldr)
2020-05-13 15:33:16 +03:00
import Data.Time.Clock
2020-10-27 16:53:49 +03:00
import System.Cron
2021-01-07 12:04:22 +03:00
import qualified Hasura.Backends.Postgres.SQL.DML as S
2020-11-25 13:56:44 +03:00
import qualified Hasura.Logging as L
import qualified Hasura.Tracing as Tracing
2020-10-27 16:53:49 +03:00
import Hasura.Backends.Postgres.SQL.Types
2020-07-14 22:00:58 +03:00
import Hasura.Eventing.Common
2020-05-13 15:33:16 +03:00
import Hasura.Eventing.HTTP
2020-11-25 13:56:44 +03:00
import Hasura.Eventing.ScheduledTrigger.Types
2020-05-13 15:33:16 +03:00
import Hasura.HTTP
2020-11-25 13:56:44 +03:00
import Hasura.Metadata.Class
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
2020-05-13 15:33:16 +03:00
import Hasura.RQL.DDL.Headers
import Hasura.RQL.Types
2020-11-25 13:56:44 +03:00
import Hasura.Server.Version (HasVersion)
2020-05-13 15:33:16 +03:00
import Hasura.SQL.Types
2020-07-03 03:55:07 +03:00
2020-05-13 15:33:16 +03:00
-- | runCronEventsGenerator makes sure that all the cron triggers
-- have an adequate buffer of cron events.
2020-11-25 13:56:44 +03:00
:: ( MonadIO m
, MonadMetadataStorage (MetadataStorageT m)
=> L.Logger L.Hasura
2020-05-13 15:33:16 +03:00
-> IO SchemaCache
2020-11-25 13:56:44 +03:00
-> m void
runCronEventsGenerator logger getSC = do
2020-05-13 15:33:16 +03:00
forever $ do
2020-11-25 13:56:44 +03:00
sc <- liftIO getSC
2020-05-13 15:33:16 +03:00
-- get cron triggers from cache
let cronTriggersCache = scCronTriggers sc
-- get cron trigger stats from db
2020-11-25 13:56:44 +03:00
eitherRes <- runMetadataStorageT $ do
deprivedCronTriggerStats <- getDeprivedCronTriggerStats
-- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
cronTriggersForHydrationWithStats <-
catMaybes <$>
mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
insertCronEventsFor cronTriggersForHydrationWithStats
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
onLeft eitherRes $ L.unLogger logger .
2020-12-14 07:30:19 +03:00
ScheduledTriggerInternalErr . err500 Unexpected . tshow
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
liftIO $ sleep (minutes 1)
2020-05-13 15:33:16 +03:00
withCronTrigger cronTriggerCache cronTriggerStat = do
case Map.lookup (ctsName cronTriggerStat) cronTriggerCache of
Nothing -> do
L.unLogger logger $
ScheduledTriggerInternalErr $
2020-11-25 13:56:44 +03:00
err500 Unexpected "could not find scheduled trigger in the schema cache"
2020-05-13 15:33:16 +03:00
pure Nothing
Just cronTrigger -> pure $
Just (cronTrigger, cronTriggerStat)
2020-11-25 13:56:44 +03:00
:: (MonadMetadataStorage m)
=> [(CronTriggerInfo, CronTriggerStats)]
-> m ()
2020-05-13 15:33:16 +03:00
insertCronEventsFor cronTriggersWithStats = do
let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
generateCronEventsFrom (ctsMaxScheduledTime stats) cti
case scheduledEvents of
[] -> pure ()
2020-11-25 13:56:44 +03:00
events -> insertScheduledEvent $ SESCron events
2020-05-13 15:33:16 +03:00
generateCronEventsFrom :: UTCTime -> CronTriggerInfo-> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo{..} =
map (CronEventSeed ctiName) $
generateScheduleTimes startTime 100 ctiSchedule -- generate next 100 events
-- | Generates next @n events starting @from according to 'CronSchedule'
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n $ go from
go = unfoldr (fmap dup . nextMatch cron)
2020-11-25 13:56:44 +03:00
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage (MetadataStorageT m)
2020-05-13 15:33:16 +03:00
=> L.Logger L.Hasura
-> LogEnvHeaders
-> HTTP.Manager
2020-11-25 13:56:44 +03:00
-> [CronEvent]
2020-05-13 15:33:16 +03:00
-> IO SchemaCache
2020-07-02 14:57:09 +03:00
-> TVar (Set.Set CronEventId)
2020-07-14 22:00:58 +03:00
-> m ()
2020-11-25 13:56:44 +03:00
processCronEvents logger logEnv httpMgr cronEvents getSC lockedCronEvents = do
2020-07-14 22:00:58 +03:00
cronTriggersInfo <- scCronTriggers <$> liftIO getSC
2020-11-25 13:56:44 +03:00
-- save the locked cron events that have been fetched from the
-- database, the events stored here will be unlocked in case a
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map _ceId cronEvents) lockedCronEvents
-- The `createdAt` of a cron event is the `created_at` of the cron trigger
for_ cronEvents $ \(CronEvent id' name st _ tries _ _)-> do
case Map.lookup name cronTriggersInfo of
Nothing -> logInternalError $
err500 Unexpected "could not find cron trigger in cache"
Just CronTriggerInfo{..} -> do
let webhookUrl = unResolvedWebhook ctiWebhookInfo
payload = ScheduledEventWebhookPayload id' (Just name) st
(fromMaybe J.Null ctiPayload) ctiComment
retryCtx = RetryContext tries ctiRetryConf
finally <- runMetadataStorageT $ flip runReaderT (logger, httpMgr) $
processScheduledEvent logEnv id' ctiHeaders retryCtx
payload webhookUrl Cron
removeEventFromLockedEvents id' lockedCronEvents
onLeft finally logInternalError
2020-05-13 15:33:16 +03:00
2020-07-14 22:00:58 +03:00
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
2020-05-13 15:33:16 +03:00
2020-09-09 09:47:34 +03:00
2020-11-25 13:56:44 +03:00
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage (MetadataStorageT m)
2020-07-14 22:00:58 +03:00
=> Env.Environment
-> L.Logger L.Hasura
2020-05-13 15:33:16 +03:00
-> LogEnvHeaders
-> HTTP.Manager
2020-11-25 13:56:44 +03:00
-> [OneOffScheduledEvent]
2020-09-07 09:15:15 +03:00
-> TVar (Set.Set OneOffScheduledEventId)
2020-07-14 22:00:58 +03:00
-> m ()
2020-11-25 13:56:44 +03:00
processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents lockedOneOffScheduledEvents = do
-- save the locked one-off events that have been fetched from the
-- database, the events stored here will be unlocked in case a
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
for_ oneOffEvents $ \OneOffScheduledEvent{..} -> do
(either logInternalError pure) =<< runMetadataStorageT do
webhookInfo <- resolveWebhook env _ooseWebhookConf
headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf
let webhookUrl = unResolvedWebhook webhookInfo
payload = ScheduledEventWebhookPayload _ooseId Nothing
_ooseScheduledTime (fromMaybe J.Null _oosePayload)
_ooseComment (Just _ooseCreatedAt)
retryCtx = RetryContext _ooseTries _ooseRetryConf
flip runReaderT (logger, httpMgr) $
processScheduledEvent logEnv _ooseId headerInfo retryCtx payload webhookUrl OneOff
removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
2020-05-13 15:33:16 +03:00
2020-07-14 22:00:58 +03:00
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage (MetadataStorageT m)
2020-07-14 22:00:58 +03:00
=> Env.Environment
-> L.Logger L.Hasura
2020-05-13 15:33:16 +03:00
-> LogEnvHeaders
-> HTTP.Manager
-> IO SchemaCache
2020-07-02 14:57:09 +03:00
-> LockedEventsCtx
2020-07-14 22:00:58 +03:00
-> m void
2020-11-25 13:56:44 +03:00
processScheduledTriggers env logger logEnv httpMgr getSC LockedEventsCtx {..} =
2020-05-13 15:33:16 +03:00
forever $ do
2020-11-25 13:56:44 +03:00
result <- runMetadataStorageT getScheduledEventsForDelivery
case result of
Left e -> logInternalError e
Right (cronEvents, oneOffEvents) -> do
processCronEvents logger logEnv httpMgr cronEvents getSC leCronEvents
processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents leOneOffEvents
liftIO $ sleep (minutes 1)
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
:: ( MonadReader r m
, Has HTTP.Manager r
, Has (L.Logger L.Hasura) r
, HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage m
2020-05-13 15:33:16 +03:00
=> LogEnvHeaders
2020-11-25 13:56:44 +03:00
-> ScheduledEventId
-> [EventHeaderInfo]
-> RetryContext
-> ScheduledEventWebhookPayload
-> Text
2020-05-13 15:33:16 +03:00
-> ScheduledEventType
-> m ()
2020-11-25 13:56:44 +03:00
processScheduledEvent logEnv eventId eventHeaders retryCtx payload webhookUrl type'
= Tracing.runTraceT traceNote do
2020-05-13 15:33:16 +03:00
currentTime <- liftIO getCurrentTime
2020-11-25 13:56:44 +03:00
let retryConf = _rctxConf retryCtx
scheduledTime = sewpScheduledTime payload
if convertDuration (diffUTCTime currentTime scheduledTime)
> unNonNegativeDiffTime (strcToleranceSeconds retryConf)
then processDead eventId type'
2020-05-13 15:33:16 +03:00
else do
let timeoutSeconds = round $ unNonNegativeDiffTime
2020-11-25 13:56:44 +03:00
$ strcTimeoutSeconds retryConf
2020-05-13 15:33:16 +03:00
httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
2020-11-25 13:56:44 +03:00
headers = addDefaultHeaders $ map encodeHeader eventHeaders
extraLogCtx = ExtraLogContext (Just currentTime) eventId
webhookReqBodyJson = J.toJSON payload
2020-07-28 20:52:44 +03:00
webhookReqBody = J.encode webhookReqBodyJson
requestDetails = RequestDetails $ BL.length webhookReqBody
2020-11-25 13:56:44 +03:00
eitherRes <- runExceptT $ tryWebhook headers httpTimeout webhookReqBody (T.unpack webhookUrl)
logHTTPForST eitherRes extraLogCtx requestDetails
let decodedHeaders = map (decodeHeader logEnv eventHeaders) headers
case eitherRes of
Left e -> processError eventId retryCtx decodedHeaders type' webhookReqBodyJson e
Right r -> processSuccess eventId decodedHeaders type' webhookReqBodyJson r
2020-09-16 01:03:41 +03:00
2020-11-25 13:56:44 +03:00
traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
:: ( MonadIO m
, MonadMetadataStorage m
=> ScheduledEventId
-> RetryContext
2020-07-03 03:55:07 +03:00
-> [HeaderConf]
-> ScheduledEventType
-> J.Value
-> HTTPErr a
-> m ()
2020-11-25 13:56:44 +03:00
processError eventId retryCtx decodedHeaders type' reqJson err = do
2020-05-13 15:33:16 +03:00
let invocation = case err of
HClient excp -> do
let errMsg = TBS.fromLBS $ J.encode $ show excp
2020-11-25 13:56:44 +03:00
mkInvocation eventId 1000 decodedHeaders errMsg [] reqJson
2020-05-13 15:33:16 +03:00
HParse _ detail -> do
let errMsg = TBS.fromLBS $ J.encode detail
2020-11-25 13:56:44 +03:00
mkInvocation eventId 1001 decodedHeaders errMsg [] reqJson
2020-05-13 15:33:16 +03:00
HStatus errResp -> do
let respPayload = hrsBody errResp
respHeaders = hrsHeaders errResp
respStatus = hrsStatus errResp
2020-11-25 13:56:44 +03:00
mkInvocation eventId respStatus decodedHeaders respPayload respHeaders reqJson
2020-05-13 15:33:16 +03:00
HOther detail -> do
2020-11-25 13:56:44 +03:00
let errMsg = (TBS.fromLBS $ J.encode detail)
mkInvocation eventId 500 decodedHeaders errMsg [] reqJson
insertScheduledEventInvocation invocation type'
retryOrMarkError eventId retryCtx err type'
:: (MonadIO m, MonadMetadataStorage m)
=> ScheduledEventId
-> RetryContext
-> HTTPErr a
-> ScheduledEventType
-> m ()
retryOrMarkError eventId retryCtx err type' = do
let RetryContext tries retryConf = retryCtx
mRetryHeader = getRetryAfterHeaderFromHTTPErr err
2020-05-13 15:33:16 +03:00
mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
2020-11-25 13:56:44 +03:00
triesExhausted = tries >= strcNumRetries retryConf
2020-05-13 15:33:16 +03:00
noRetryHeader = isNothing mRetryHeaderSeconds
if triesExhausted && noRetryHeader
2020-11-25 13:56:44 +03:00
setScheduledEventOp eventId (SEOpStatus SESError) type'
2020-05-13 15:33:16 +03:00
else do
currentTime <- liftIO getCurrentTime
let delay = fromMaybe (round $ unNonNegativeDiffTime
2020-11-25 13:56:44 +03:00
$ strcRetryIntervalSeconds retryConf)
2020-05-13 15:33:16 +03:00
diff = fromIntegral delay
retryTime = addUTCTime diff currentTime
2020-11-25 13:56:44 +03:00
setScheduledEventOp eventId (SEOpRetry retryTime) type'
2020-05-13 15:33:16 +03:00
{- Note [Scheduled event lifecycle]
Scheduled events move between six different states over the course of their
lifetime, as represented by the following flowchart:
┌───────────┐ ┌────────┐ ┌───────────┐
│ scheduled │─(a)─→│ locked │─(b)─→│ delivered │
└───────────┘ └────────┘ └───────────┘
↑ │ ┌───────┐
└────(c)───────┼─────(d)──→│ error │
│ └───────┘
│ ┌──────┐
└─────(e)──→│ dead │
When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
a. When graphql-engine fetches a scheduled event from the database to process
it, it sets its state to 'locked'. This prevents multiple graphql-engine
instances running on the same database from processing the same
scheduled event concurrently.
b. When a scheduled event is processed successfully, it is marked 'delivered'.
c. If a scheduled event fails to be processed, but it hasn’t yet reached
its maximum retry limit, its retry counter is incremented and
it is returned to the 'scheduled' state.
d. If a scheduled event fails to be processed and *has* reached its
retry limit, its state is set to 'error'.
e. If for whatever reason the difference between the current time and the
scheduled time is greater than the tolerance of the scheduled event, it
will not be processed and its state will be set to 'dead'.
2020-11-25 13:56:44 +03:00
:: (MonadMetadataStorage m)
=> ScheduledEventId
2020-07-03 03:55:07 +03:00
-> [HeaderConf]
-> ScheduledEventType
-> J.Value
-> HTTPResp a
-> m ()
2020-11-25 13:56:44 +03:00
processSuccess eventId decodedHeaders type' reqBodyJson resp = do
2020-05-13 15:33:16 +03:00
let respBody = hrsBody resp
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
2020-11-25 13:56:44 +03:00
invocation = mkInvocation eventId respStatus decodedHeaders respBody respHeaders reqBodyJson
insertScheduledEventInvocation invocation type'
setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
:: (MonadMetadataStorage m)
=> ScheduledEventId -> ScheduledEventType -> m ()
processDead eventId type' =
setScheduledEventOp eventId (SEOpStatus SESDead) type'
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
:: ScheduledEventId
2020-07-03 03:55:07 +03:00
-> Int
-> [HeaderConf]
-> TBS.TByteString
-> [HeaderConf]
-> J.Value
2020-11-25 13:56:44 +03:00
-> (Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson
2020-05-13 15:33:16 +03:00
= let resp = if isClientError status
then mkClientErr respBody
else mkResp status respBody respHeaders
2020-11-25 13:56:44 +03:00
2020-05-13 15:33:16 +03:00
2020-07-03 03:55:07 +03:00
(mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
-- metadata database transactions
getDeprivedCronTriggerStatsTx :: Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx =
map (\(n, count, maxTx) -> CronTriggerStats n count maxTx) <$>
Q.listQE defaultTxErrorHandler
count(*) as upcoming_events_count,
max(scheduled_time) as max_scheduled_time
FROM hdb_catalog.hdb_cron_events
WHERE tries = 0 and status = 'scheduled'
GROUP BY trigger_name
) AS q
WHERE q.upcoming_events_count < 100
|] () True
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx =
(,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
getCronEventsForDelivery :: Q.TxE QErr [CronEvent]
getCronEventsForDelivery =
map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql|
( UPDATE hdb_catalog.hdb_cron_events
SET status = 'locked'
FROM hdb_catalog.hdb_cron_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
SELECT row_to_json(t.*) FROM cte AS t
|] () True
getOneOffEventsForDelivery :: Q.TxE QErr [OneOffScheduledEvent]
getOneOffEventsForDelivery = do
map (Q.getAltJ . runIdentity) <$> Q.listQE defaultTxErrorHandler [Q.sql|
WITH cte AS (
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'locked'
FROM hdb_catalog.hdb_scheduled_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
SELECT row_to_json(t.*) FROM cte AS t
|] () False
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx invo type' = do
2020-05-13 15:33:16 +03:00
case type' of
2020-09-09 09:47:34 +03:00
Cron -> do
2020-05-13 15:33:16 +03:00
Q.unitQE defaultTxErrorHandler
INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|] ( iEventId invo
, fromIntegral $ iStatus invo :: Int64
, Q.AltJ $ J.toJSON $ iRequest invo
, Q.AltJ $ J.toJSON $ iResponse invo) True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET tries = tries + 1
WHERE id = $1
|] (Identity $ iEventId invo) True
2020-09-09 09:47:34 +03:00
OneOff -> do
2020-05-13 15:33:16 +03:00
Q.unitQE defaultTxErrorHandler
INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|] ( iEventId invo
, fromIntegral $ iStatus invo :: Int64
, Q.AltJ $ J.toJSON $ iRequest invo
, Q.AltJ $ J.toJSON $ iResponse invo) True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET tries = tries + 1
WHERE id = $1
|] (Identity $ iEventId invo) True
2020-11-25 13:56:44 +03:00
:: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx eventId op type' = case op of
SEOpRetry time -> setRetry time
SEOpStatus status -> setStatus status
2020-05-13 15:33:16 +03:00
2020-11-25 13:56:44 +03:00
setRetry time =
case type' of
Cron ->
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|] (time, eventId) True
OneOff ->
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|] (time, eventId) True
setStatus status =
case type' of
Cron -> do
Q.unitQE defaultTxErrorHandler
UPDATE hdb_catalog.hdb_cron_events
SET status = $2
WHERE id = $1
|] (eventId, status) True
OneOff -> do
Q.unitQE defaultTxErrorHandler
UPDATE hdb_catalog.hdb_scheduled_events
SET status = $2
WHERE id = $1
|] (eventId, status) True
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
case type' of
Cron ->
(runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
WITH "cte" AS
(UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) and status = 'locked'
SELECT count(*) FROM "cte"
|] (Identity $ ScheduledEventIdArray eventIds) True
OneOff ->
(runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
WITH "cte" AS
(UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) AND status = 'locked'
SELECT count(*) FROM "cte"
|] (Identity $ ScheduledEventIdArray eventIds) True
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx = do
2020-07-02 14:57:09 +03:00
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE status = 'locked'
|] () True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE status = 'locked'
|] () True
2020-11-25 13:56:44 +03:00
insertScheduledEventTx :: ScheduledEventSeed -> Q.TxE QErr ()
insertScheduledEventTx = \case
SESOneOff CreateScheduledEvent{..} ->
Q.unitQE defaultTxErrorHandler
INSERT INTO hdb_catalog.hdb_scheduled_events
($1, $2, $3, $4, $5, $6)
|] ( Q.AltJ cseWebhook
, cseScheduleAt
, Q.AltJ csePayload
, Q.AltJ cseRetryConf
, Q.AltJ cseHeaders
, cseComment)
SESCron cronSeeds -> insertCronEventsTx cronSeeds
insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
insertCronEventsTx events = do
let insertCronEventsSql = TB.run $ toSQL
2021-01-07 12:04:22 +03:00
2020-11-25 13:56:44 +03:00
{ siTable = cronEventsTable
, siCols = map unsafePGCol ["trigger_name", "scheduled_time"]
2021-01-07 12:04:22 +03:00
, siValues = S.ValuesExp $ map (toTupleExp . toArr) events
, siConflict = Just $ S.DoNothing Nothing
2020-11-25 13:56:44 +03:00
, siRet = Nothing
Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
2021-01-07 12:04:22 +03:00
toTupleExp = S.TupleExp . map S.SELit
2020-11-25 13:56:44 +03:00
2020-12-14 07:30:19 +03:00
dropFutureCronEventsTx :: TriggerName -> Q.TxE QErr ()
dropFutureCronEventsTx name =
Q.unitQE defaultTxErrorHandler
DELETE FROM hdb_catalog.hdb_cron_events
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|] (Identity name) False
2020-11-25 13:56:44 +03:00
cronEventsTable :: QualifiedTable
cronEventsTable =
QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"
2021-01-07 12:04:22 +03:00
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter = \case
[] -> S.BELit True
v -> S.BEIN (S.SEIdentifier $ Identifier "status")
$ map (S.SELit . scheduledEventStatusToText) v
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy =
let scheduledTimeCol = S.SEIdentifier $ Identifier "scheduled_time"
in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem scheduledTimeCol
(Just S.OTAsc) Nothing
-- | Build a select expression which outputs total count and
-- list of json rows with pagination limit and offset applied
:: S.Select
-> ScheduledEventPagination
-> S.Select
mkPaginationSelectExp allRowsSelect ScheduledEventPagination{..} =
{ S.selCTEs = [(S.toAlias countCteAlias, allRowsSelect), (S.toAlias limitCteAlias, limitCteSelect)]
, S.selExtr = [countExtractor, rowsExtractor]
countCteAlias = Identifier "count_cte"
limitCteAlias = Identifier "limit_cte"
countExtractor =
let selectExp = S.mkSelect
{ S.selExtr = [S.Extractor S.countStar Nothing]
, S.selFrom = Just $ S.mkIdenFromExp countCteAlias
in S.Extractor (S.SESelect selectExp) Nothing
limitCteSelect = S.mkSelect
{ S.selExtr = [S.selectStar]
, S.selFrom = Just $ S.mkIdenFromExp countCteAlias
, S.selLimit = (S.LimitExp . S.intToSQLExp) <$> _sepLimit
, S.selOffset = (S.OffsetExp . S.intToSQLExp) <$> _sepOffset
rowsExtractor =
let jsonAgg = S.SEUnsafe "json_agg(row_to_json(limit_cte.*))"
selectExp = S.mkSelect
{ S.selExtr = [S.Extractor jsonAgg Nothing]
, S.selFrom = Just $ S.mkIdenFromExp limitCteAlias
in S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect selectExp)) Nothing
withCount :: (Int, Q.AltJ a) -> WithTotalCount a
withCount (count, Q.AltJ a) = WithTotalCount count a
:: ScheduledEventPagination
-> [ScheduledEventStatus]
-> Q.TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses = do
let table = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events"
statusFilter = mkScheduledEventStatusFilter statuses
select = S.mkSelect
{ S.selExtr = [S.selectStar]
, S.selFrom = Just $ S.mkSimpleFromExp table
, S.selWhere = Just $ S.WhereFrag statusFilter
, S.selOrderBy = Just scheduledTimeOrderBy
sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination
(withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () False
:: TriggerName
-> ScheduledEventPagination
-> [ScheduledEventStatus]
-> Q.TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx triggerName pagination status = do
let triggerNameFilter =
S.BECompare S.SEQ (S.SEIdentifier $ Identifier "trigger_name") (S.SELit $ triggerNameToTxt triggerName)
statusFilter = mkScheduledEventStatusFilter status
select = S.mkSelect
{ S.selExtr = [S.selectStar]
, S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable
, S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp triggerNameFilter statusFilter
, S.selOrderBy = Just scheduledTimeOrderBy
sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination
(withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () False
:: ScheduledEventId -> ScheduledEventType -> Q.TxE QErr ()
deleteScheduledEventTx eventId = \case
OneOff ->
Q.unitQE defaultTxErrorHandler [Q.sql|
DELETE FROM hdb_catalog.hdb_scheduled_events
WHERE id = $1
|] (Identity eventId) False
Cron ->
Q.unitQE defaultTxErrorHandler [Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE id = $1
|] (Identity eventId) False
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
[ S.Extractor (seIden "id") Nothing
, S.Extractor (seIden "event_id") Nothing
, S.Extractor (seIden "status") Nothing
, S.Extractor (withJsonTypeAnn $ seIden "request") Nothing
, S.Extractor (withJsonTypeAnn $ seIden "response") Nothing
, S.Extractor (seIden "created_at") Nothing
withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json"
seIden = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId =
S.BECompare S.SEQ (S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id")
(S.SELit $ unEventId eventId)
:: GetInvocationsBy
-> ScheduledEventPagination
-> Q.TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx invocationsBy pagination = do
let sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp allRowsSelect pagination
(withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () True
createdAtOrderBy table =
let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at"
in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem createdAtCol (Just S.OTDesc) Nothing
oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"
allRowsSelect = case invocationsBy of
GIBEventId eventId eventType ->
let table = case eventType of
OneOff -> oneOffInvocationsTable
Cron -> cronInvocationsTable
in S.mkSelect
{ S.selExtr = invocationFieldExtractors table
, S.selFrom = Just $ S.mkSimpleFromExp table
, S.selOrderBy = Just $ createdAtOrderBy table
, S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
GIBEvent event -> case event of
SEOneOff ->
let table = oneOffInvocationsTable
in S.mkSelect
{ S.selExtr = invocationFieldExtractors table
, S.selFrom = Just $ S.mkSimpleFromExp table
, S.selOrderBy = Just $ createdAtOrderBy table
SECron triggerName ->
let invocationTable = cronInvocationsTable
eventTable = cronEventsTable
joinCondition = S.JoinOn $ S.BECompare S.SEQ
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id")
(S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id")
joinTables =
S.JoinExpr (S.FISimple invocationTable Nothing) S.Inner
(S.FISimple eventTable Nothing) joinCondition
triggerBoolExp = S.BECompare S.SEQ
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name"))
(S.SELit $ triggerNameToTxt triggerName)
in S.mkSelect
{ S.selExtr = invocationFieldExtractors invocationTable
, S.selFrom = Just $ S.FromExp [S.FIJoin joinTables]
, S.selWhere = Just $ S.WhereFrag triggerBoolExp
, S.selOrderBy = Just $ createdAtOrderBy invocationTable