2021-09-29 11:13:30 +03:00
|
|
|
|
{-# LANGUAGE PatternSynonyms #-}
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- |
|
|
|
|
|
-- = Scheduled Triggers
|
|
|
|
|
--
|
|
|
|
|
-- This module implements the functionality of invoking webhooks during specified
|
|
|
|
|
-- time events aka scheduled events. The scheduled events are the events generated
|
|
|
|
|
-- by the graphql-engine using the cron triggers or/and a scheduled event can
|
|
|
|
|
-- be created by the user at a specified time with the payload, webhook, headers
|
|
|
|
|
-- and the retry configuration. Scheduled events are modeled using rows in Postgres
|
|
|
|
|
-- with a @timestamp@ column.
|
|
|
|
|
--
|
|
|
|
|
-- This module implements scheduling and delivery of scheduled
|
|
|
|
|
-- events:
|
|
|
|
|
--
|
|
|
|
|
-- 1. Scheduling a cron event involves creating new cron events. New
|
|
|
|
|
-- cron events are created based on the cron schedule and the number of
|
|
|
|
|
-- scheduled events that are already present in the scheduled events buffer.
|
|
|
|
|
-- The graphql-engine computes the new scheduled events and writes them to
|
|
|
|
|
-- the database. (Generator)
|
|
|
|
|
--
|
|
|
|
|
-- 2. Delivering a scheduled event involves reading undelivered scheduled events
|
|
|
|
|
-- from the database and delivering them to the webhook server. (Processor)
|
|
|
|
|
--
|
|
|
|
|
-- The rationale behind separating the event scheduling and event delivery
|
|
|
|
|
-- mechanism into two different threads is that the scheduling and delivering of
|
|
|
|
|
-- the scheduled events are not directly dependent on each other. The generator
|
|
|
|
|
-- will almost always try to create scheduled events which are supposed to be
|
|
|
|
|
-- delivered in the future (timestamp > current_timestamp) and the processor
|
|
|
|
|
-- will fetch scheduled events of the past (timestamp < current_timestamp). So,
|
|
|
|
|
-- the set of the scheduled events generated by the generator and the processor
|
|
|
|
|
-- will never be the same. The point here is that they're not correlated to each
|
|
|
|
|
-- other. They can be split into different threads for better performance.
|
|
|
|
|
--
|
|
|
|
|
-- == Implementation
|
|
|
|
|
--
|
|
|
|
|
-- The scheduled triggers eventing is being implemented in the metadata storage.
|
|
|
|
|
-- All functions that interact with the storage system are abstracted in
|
|
|
|
|
-- the 'MonadMetadataStorage' class.
|
|
|
|
|
--
|
|
|
|
|
-- During the startup, two threads are started:
|
|
|
|
|
--
|
|
|
|
|
-- 1. Generator: Fetches the list of scheduled triggers from cache and generates
|
|
|
|
|
-- the scheduled events.
|
|
|
|
|
--
|
|
|
|
|
-- - Additional events will be generated only if there are fewer than 100
|
|
|
|
|
-- scheduled events.
|
|
|
|
|
--
|
|
|
|
|
-- - The upcoming events timestamp will be generated using:
|
|
|
|
|
--
|
|
|
|
|
-- - cron schedule of the scheduled trigger
|
|
|
|
|
--
|
|
|
|
|
-- - max timestamp of the scheduled events that already exist or
|
|
|
|
|
-- current_timestamp(when no scheduled events exist)
|
|
|
|
|
--
|
|
|
|
|
-- - The timestamp of the scheduled events is stored with timezone because
|
|
|
|
|
-- `SELECT NOW()` returns timestamp with timezone, so it's good to
|
|
|
|
|
-- compare two things of the same type.
|
|
|
|
|
--
|
|
|
|
|
-- This effectively corresponds to doing an INSERT with values containing
|
|
|
|
|
-- specific timestamp.
|
|
|
|
|
--
|
|
|
|
|
-- 2. Processor: Fetches the undelivered cron events and the scheduled events
|
|
|
|
|
-- from the database which have a timestamp earlier than the
|
|
|
|
|
-- current timestamp, and then processes them.
|
|
|
|
|
--
|
|
|
|
|
-- TODO
|
|
|
|
|
-- - Consider and document ordering guarantees
|
|
|
|
|
-- - do we have any in the presence of multiple hasura instances?
|
|
|
|
|
-- - If we have nothing useful to say about ordering, then consider processing
|
|
|
|
|
-- events asynchronously, so that a slow webhook doesn't cause everything
|
|
|
|
|
-- subsequent to be delayed
|
2020-05-13 15:33:16 +03:00
|
|
|
|
module Hasura.Eventing.ScheduledTrigger
|
2021-09-24 01:56:37 +03:00
|
|
|
|
( runCronEventsGenerator,
|
|
|
|
|
processScheduledTriggers,
|
|
|
|
|
generateScheduleTimes,
|
|
|
|
|
CronEventSeed (..),
|
|
|
|
|
LockedEventsCtx (..),
|
|
|
|
|
|
|
|
|
|
-- * Database interactions
|
|
|
|
|
|
|
|
|
|
-- Following function names are similar to those present in
|
|
|
|
|
-- 'MonadMetadataStorage' type class. To avoid duplication,
|
|
|
|
|
-- 'Tx' is suffixed to identify as database transactions
|
|
|
|
|
getDeprivedCronTriggerStatsTx,
|
|
|
|
|
getScheduledEventsForDeliveryTx,
|
|
|
|
|
insertInvocationTx,
|
|
|
|
|
setScheduledEventOpTx,
|
|
|
|
|
unlockScheduledEventsTx,
|
|
|
|
|
unlockAllLockedScheduledEventsTx,
|
|
|
|
|
insertCronEventsTx,
|
|
|
|
|
insertOneOffScheduledEventTx,
|
|
|
|
|
dropFutureCronEventsTx,
|
|
|
|
|
getOneOffScheduledEventsTx,
|
|
|
|
|
getCronEventsTx,
|
|
|
|
|
deleteScheduledEventTx,
|
|
|
|
|
getInvocationsTx,
|
|
|
|
|
getInvocationsQuery,
|
|
|
|
|
getInvocationsQueryNoPagination,
|
|
|
|
|
|
|
|
|
|
-- * Export utility functions which are useful to build
|
|
|
|
|
|
|
|
|
|
-- SQLs for fetching data from metadata storage
|
|
|
|
|
mkScheduledEventStatusFilter,
|
|
|
|
|
scheduledTimeOrderBy,
|
|
|
|
|
mkPaginationSelectExp,
|
|
|
|
|
withCount,
|
|
|
|
|
invocationFieldExtractors,
|
|
|
|
|
mkEventIdBoolExp,
|
|
|
|
|
EventTables (..),
|
|
|
|
|
)
|
|
|
|
|
where
|
|
|
|
|
|
|
|
|
|
import Control.Arrow.Extended (dup)
|
|
|
|
|
import Control.Concurrent.Extended (Forever (..), sleep)
|
|
|
|
|
import Control.Concurrent.STM
|
2021-12-07 01:39:29 +03:00
|
|
|
|
import Control.Lens (view)
|
2021-09-24 01:56:37 +03:00
|
|
|
|
import Data.Aeson qualified as J
|
|
|
|
|
import Data.Environment qualified as Env
|
|
|
|
|
import Data.Has
|
|
|
|
|
import Data.HashMap.Strict qualified as Map
|
|
|
|
|
import Data.Int (Int64)
|
|
|
|
|
import Data.List (unfoldr)
|
|
|
|
|
import Data.List.NonEmpty qualified as NE
|
|
|
|
|
import Data.Set qualified as Set
|
|
|
|
|
import Data.TByteString qualified as TBS
|
|
|
|
|
import Data.Time.Clock
|
|
|
|
|
import Database.PG.Query qualified as Q
|
|
|
|
|
import Hasura.Backends.Postgres.SQL.DML qualified as S
|
|
|
|
|
import Hasura.Backends.Postgres.SQL.Types
|
|
|
|
|
import Hasura.Base.Error
|
|
|
|
|
import Hasura.Eventing.Common
|
|
|
|
|
import Hasura.Eventing.HTTP
|
|
|
|
|
import Hasura.Eventing.ScheduledTrigger.Types
|
|
|
|
|
import Hasura.HTTP (getHTTPExceptionStatus)
|
|
|
|
|
import Hasura.Logging qualified as L
|
|
|
|
|
import Hasura.Metadata.Class
|
|
|
|
|
import Hasura.Prelude
|
|
|
|
|
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
|
|
|
|
|
import Hasura.RQL.DDL.Headers
|
2022-03-08 03:42:06 +03:00
|
|
|
|
import Hasura.RQL.DDL.Webhook.Transform
|
2021-09-24 01:56:37 +03:00
|
|
|
|
import Hasura.RQL.Types
|
|
|
|
|
import Hasura.SQL.Types
|
|
|
|
|
import Hasura.Tracing qualified as Tracing
|
|
|
|
|
import Network.HTTP.Client.Transformable qualified as HTTP
|
|
|
|
|
import System.Cron
|
|
|
|
|
import Text.Builder qualified as TB
|
2020-07-03 03:55:07 +03:00
|
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
|
-- | Loops forever, keeping every cron trigger supplied with upcoming cron
-- events.
--
-- Each iteration reads the cron triggers from the schema cache and, when
-- there is at least one, asks the metadata storage which of them are short
-- of events, generates new events for those and inserts them. A storage
-- failure is logged rather than propagated. The loop then sleeps for one
-- minute before the next iteration.
runCronEventsGenerator ::
  ( MonadIO m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  L.Logger L.Hasura ->
  IO SchemaCache ->
  m void
runCronEventsGenerator logger getSC = forever $ do
  -- get cron triggers from cache
  cronTriggersCache <- scCronTriggers <$> liftIO getSC

  -- Poll the DB only when there's at least one cron trigger present in the
  -- schema cache.
  unless (Map.null cronTriggersCache) $ do
    generationResult <- runMetadataStorageT $ do
      -- get cron trigger stats from db
      deprivedStats <- getDeprivedCronTriggerStats (Map.keys cronTriggersCache)
      -- join stats with cron triggers and produce
      -- @[(CronTriggerInfo, CronTriggerStats)]@
      triggersToHydrate <-
        catMaybes <$> traverse (withCronTrigger cronTriggersCache) deprivedStats
      insertCronEventsFor triggersToHydrate

    onLeft generationResult $ \storageErr ->
      L.unLogger logger $
        ScheduledTriggerInternalErr $
          err500 Unexpected (tshow storageErr)

  -- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001
  liftIO $ sleep (minutes 1)
  where
    -- Pairs a stats row with the trigger of the same name from the cache.
    -- A stats row without a matching cache entry is logged and dropped.
    withCronTrigger cronTriggerCache cronTriggerStat =
      case Map.lookup (ctsName cronTriggerStat) cronTriggerCache of
        Just cronTrigger -> pure $ Just (cronTrigger, cronTriggerStat)
        Nothing -> do
          L.unLogger logger $
            ScheduledTriggerInternalErr $
              err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Generates the next batch of cron events for every given
-- @(trigger, stats)@ pair, starting from the @ctsMaxScheduledTime@ recorded
-- in the stats, and inserts the combined batch through 'insertCronEvents'.
-- Nothing is inserted when no events were generated.
insertCronEventsFor ::
  (MonadMetadataStorage m) =>
  [(CronTriggerInfo, CronTriggerStats)] ->
  m ()
insertCronEventsFor cronTriggersWithStats =
  unless (null newEvents) $ insertCronEvents newEvents
  where
    newEvents =
      [ eventSeed
        | (triggerInfo, stats) <- cronTriggersWithStats,
          eventSeed <- generateCronEventsFrom (ctsMaxScheduledTime stats) triggerInfo
      ]
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Builds the seeds of the next 100 cron events of a trigger: one
-- 'CronEventSeed' per schedule time that 'generateScheduleTimes' produces
-- from the trigger's cron schedule, starting at @startTime@.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo {..} =
  -- generate next 100 events; see getDeprivedCronTriggerStatsTx:
  [ CronEventSeed ctiName scheduleTime
    | scheduleTime <- generateScheduleTimes startTime 100 ctiSchedule
  ]
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | Generates the next @n@ schedule times, starting from @from@, according
-- to the given 'CronSchedule'.
--
-- The list is shorter than @n@ if 'nextMatch' stops producing matches.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n (unfoldr step from)
  where
    -- Every match is emitted and also becomes the starting point for the
    -- search of the following one.
    step previous = do
      upcoming <- nextMatch cron previous
      pure (upcoming, upcoming)
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Delivers a batch of cron events.
--
-- The ids of all events in the batch are first recorded in the
-- @lockedCronEvents@ set. Each event is then looked up, by trigger name, in
-- the cron triggers of the schema cache:
--
--   * when the trigger is missing, an internal error is logged and the event
--     is not processed;
--
--   * otherwise the webhook payload and retry context are assembled from the
--     trigger definition, the event is handed to 'processScheduledEvent',
--     its id is removed from the locked set, and any storage error is
--     logged.
processCronEvents ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  L.Logger L.Hasura ->
  LogBehavior ->
  HTTP.Manager ->
  [CronEvent] ->
  IO SchemaCache ->
  TVar (Set.Set CronEventId) ->
  m ()
processCronEvents logger logBehavior httpMgr cronEvents getSC lockedCronEvents = do
  cronTriggersInfo <- scCronTriggers <$> liftIO getSC
  -- save the locked cron events that have been fetched from the
  -- database, the events stored here will be unlocked in case a
  -- graceful shutdown is initiated in midst of processing these events
  saveLockedEvents (map _ceId cronEvents) lockedCronEvents
  -- The `createdAt` of a cron event is the `created_at` of the cron trigger
  for_ cronEvents $ \(CronEvent eventId triggerName scheduledTime _ tries _ _) ->
    case Map.lookup triggerName cronTriggersInfo of
      Nothing ->
        logInternalError $
          err500 Unexpected "could not find cron trigger in cache"
      Just CronTriggerInfo {..} -> do
        let retryCtx = RetryContext tries ctiRetryConf
            webhookPayload =
              ScheduledEventWebhookPayload
                eventId
                (Just triggerName)
                scheduledTime
                (fromMaybe J.Null ctiPayload)
                ctiComment
                Nothing
                ctiRequestTransform
                ctiResponseTransform
            deliver =
              processScheduledEvent
                logBehavior
                eventId
                ctiHeaders
                retryCtx
                webhookPayload
                ctiWebhookInfo
                Cron
        outcome <- runMetadataStorageT $ runReaderT deliver (logger, httpMgr)
        removeEventFromLockedEvents eventId lockedCronEvents
        onLeft outcome logInternalError
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Delivers a batch of one-off scheduled events.
--
-- The ids of all events in the batch are first recorded in the
-- @lockedOneOffScheduledEvents@ set. For every event the webhook and the
-- headers are resolved against the environment, the webhook payload and
-- retry context are assembled from the event itself, the event is handed to
-- 'processScheduledEvent' and its id is removed from the locked set. Any
-- error raised inside the metadata storage action is logged.
processOneOffScheduledEvents ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  LogBehavior ->
  HTTP.Manager ->
  [OneOffScheduledEvent] ->
  TVar (Set.Set OneOffScheduledEventId) ->
  m ()
processOneOffScheduledEvents
  env
  logger
  logBehavior
  httpMgr
  oneOffEvents
  lockedOneOffScheduledEvents = do
    -- save the locked one-off events that have been fetched from the
    -- database, the events stored here will be unlocked in case a
    -- graceful shutdown is initiated in midst of processing these events
    saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
    for_ oneOffEvents $ \OneOffScheduledEvent {..} -> do
      deliveryResult <- runMetadataStorageT $ do
        webhookInfo <- resolveWebhook env _ooseWebhookConf
        headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf
        let retryCtx = RetryContext _ooseTries _ooseRetryConf
            webhookPayload =
              ScheduledEventWebhookPayload
                _ooseId
                Nothing
                _ooseScheduledTime
                (fromMaybe J.Null _oosePayload)
                _ooseComment
                (Just _ooseCreatedAt)
                _ooseRequestTransform
                _ooseResponseTransform
            deliver =
              processScheduledEvent logBehavior _ooseId headerInfo retryCtx webhookPayload webhookInfo OneOff
        runReaderT deliver (logger, httpMgr)
        removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
      onLeft deliveryResult logInternalError
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
|
|
|
|
|
|
|
|
|
-- | Builds the 'Forever' loop that delivers scheduled events.
--
-- One iteration of the loop fetches the cron and one-off events that are due
-- for delivery from the metadata storage, processes the cron events and then
-- the one-off events, and finally sleeps for ten seconds. A failure to fetch
-- the events is logged and the iteration still ends with the sleep.
processScheduledTriggers ::
  ( MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  LogBehavior ->
  HTTP.Manager ->
  IO SchemaCache ->
  LockedEventsCtx ->
  m (Forever m)
processScheduledTriggers env logger logBehavior httpMgr getSC LockedEventsCtx {..} =
  pure $ Forever () (const pollAndDeliver)
  where
    pollAndDeliver = do
      fetched <- runMetadataStorageT getScheduledEventsForDelivery
      case fetched of
        Right (cronEvents, oneOffEvents) -> do
          processCronEvents logger logBehavior httpMgr cronEvents getSC leCronEvents
          processOneOffScheduledEvents env logger logBehavior httpMgr oneOffEvents leOneOffEvents
        Left fetchErr -> logInternalError fetchErr
      -- NOTE: cron events are scheduled at times with minute resolution (as on
      -- unix), while one-off events can be set for arbitrary times. The sleep
      -- time here determines how overdue a scheduled event (cron or one-off)
      -- might be before we begin processing:
      liftIO $ sleep (seconds 10)

    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Delivers a single scheduled event to its webhook, inside a trace named
-- after the trigger (when the payload carries a trigger name).
--
-- If the event is overdue by more than the tolerance of its retry
-- configuration it is handed to 'processDead' and no request is made.
-- Otherwise the request is built (applying the request transform) and
-- invoked, and the outcome is dispatched:
--
--   * success: 'processSuccess', with the request body decoded back to JSON
--     ('J.Null' if it is absent or cannot be decoded);
--
--   * HTTP error: 'processError';
--
--   * transformation error: the error is logged and the event status is set
--     to 'SESError'.
processScheduledEvent ::
  ( MonadReader r m,
    Has HTTP.Manager r,
    Has (L.Logger L.Hasura) r,
    MonadIO m,
    Tracing.HasReporter m,
    MonadMetadataStorage m
  ) =>
  LogBehavior ->
  ScheduledEventId ->
  [EventHeaderInfo] ->
  RetryContext ->
  ScheduledEventWebhookPayload ->
  ResolvedWebhook ->
  ScheduledEventType ->
  m ()
processScheduledEvent logBehavior eventId eventHeaders retryCtx payload webhookUrl type' =
  Tracing.runTraceT traceNote do
    now <- liftIO getCurrentTime
    let retryConf = _rctxConf retryCtx
        overdueBy = convertDuration (diffUTCTime now (sewpScheduledTime payload))
        tolerance = unNonNegativeDiffTime (strcToleranceSeconds retryConf)
    if overdueBy > tolerance
      then processDead eventId type'
      else do
        let timeoutSeconds = round (unNonNegativeDiffTime (strcTimeoutSeconds retryConf))
            httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
            (headers, decodedHeaders) = prepareHeaders logBehavior eventHeaders
            extraLogCtx = ExtraLogContext eventId (sewpName payload)
            webhookReqBody = J.encode (J.toJSON payload)
            requestTransform = sewpRequestTransform payload
            responseTransform = mkResponseTransform <$> sewpResponseTransform payload

        invocationResult <- runExceptT $ do
          reqDetails <- mkRequest headers httpTimeout webhookReqBody requestTransform webhookUrl
          let logHttp e d = logHTTPForST e extraLogCtx d logBehavior
              sessionVars = _rdSessionVars reqDetails
          resp <- invokeRequest reqDetails responseTransform sessionVars logHttp
          pure (extractRequest reqDetails, resp)

        case invocationResult of
          Left (TransformationError _ transformErr) -> do
            -- Log The Transformation Error
            logger :: L.Logger L.Hasura <- asks getter
            L.unLogger logger $ L.UnstructuredLog L.LevelError (TBS.fromLBS $ J.encode transformErr)

            -- Set event state to Error
            setScheduledEventOp eventId (SEOpStatus SESError) type'
          Left (HTTPError reqBody httpErr) ->
            processError eventId retryCtx decodedHeaders type' reqBody httpErr
          Right (req, resp) -> do
            let reqBody = fromMaybe J.Null $ view HTTP.body req >>= J.decode @J.Value
            processSuccess eventId decodedHeaders type' reqBody resp
  where
    traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Handles a failed webhook delivery: records an invocation describing the
-- failure and then lets 'retryOrMarkError' decide what happens to the event.
--
-- The recorded invocation depends on the kind of failure:
--
--   * 'HClient': the status taken from the HTTP exception (if any) and the
--     JSON-encoded exception as the response body;
--
--   * 'HStatus': the status, body and headers of the error response;
--
--   * 'HOther': status 500 and the JSON-encoded detail as the response body.
processError ::
  ( MonadIO m,
    MonadMetadataStorage m
  ) =>
  ScheduledEventId ->
  RetryContext ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPErr a ->
  m ()
processError eventId retryCtx decodedHeaders type' reqJson err = do
  insertScheduledEventInvocation invocation type'
  retryOrMarkError eventId retryCtx err type'
  where
    invocation = case err of
      HClient httpException ->
        mkInvocation
          eventId
          (getHTTPExceptionStatus httpException)
          decodedHeaders
          (TBS.fromLBS $ J.encode httpException)
          []
          reqJson
      HStatus errResp ->
        mkInvocation
          eventId
          (Just (hrsStatus errResp))
          decodedHeaders
          (hrsBody errResp)
          (hrsHeaders errResp)
          reqJson
      HOther detail ->
        mkInvocation
          eventId
          (Just 500)
          decodedHeaders
          (TBS.fromLBS $ J.encode detail)
          []
          reqJson
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Decides the fate of an event whose delivery failed.
--
-- The event is marked 'SESError' only when its tries have reached the
-- configured number of retries /and/ the error carries no parseable
-- retry-after header. In every other case the event is scheduled for a retry
-- at the current time plus a delay: the retry-after header value when
-- present, the configured retry interval otherwise.
retryOrMarkError ::
  (MonadIO m, MonadMetadataStorage m) =>
  ScheduledEventId ->
  RetryContext ->
  HTTPErr a ->
  ScheduledEventType ->
  m ()
retryOrMarkError eventId retryCtx err type'
  | triesExhausted && noRetryHeader =
    setScheduledEventOp eventId (SEOpStatus SESError) type'
  | otherwise = do
    now <- liftIO getCurrentTime
    let delaySeconds =
          fromMaybe
            (round (unNonNegativeDiffTime (strcRetryIntervalSeconds retryConf)))
            mRetryHeaderSeconds
        retryTime = addUTCTime (fromIntegral delaySeconds) now
    setScheduledEventOp eventId (SEOpRetry retryTime) type'
  where
    RetryContext tries retryConf = retryCtx
    mRetryHeaderSeconds = parseRetryHeaderValue =<< getRetryAfterHeaderFromHTTPErr err
    triesExhausted = tries >= strcNumRetries retryConf
    noRetryHeader = isNothing mRetryHeaderSeconds
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
{- Note [Scheduled event lifecycle]
|
|
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
Scheduled events move between five different states over the course of their
|
|
|
|
|
lifetime, as represented by the following flowchart:
|
Deploy server documentation to github page in CI
_(This PR is on top of #3352.)_
## Description
This PR overhauls our documentation CI steps to push all generated server documentation to the `gh-pages` branch of the OSS repo. The goal of this PR is to arrive in the situation where `https://hasura.github.io/graphql-engine/server/` is automatically populated to contain the following:
- all the markdown files from `server/documentation`, copied verbatim, no transformation applied
- all the notes, collected from the code by the `extract-notes.sh` script, in `server/notes`
- the generated haddock documentation for each major release or branch in `server/haddock`.
To do so, this PR does the following:
- it includes the script to extract notes from #3352,
- it rewrites the documentation checking CI step, to generate the notes and publish the resulting "server/documentation" folder,
- it includes a new CI step to deploy the documentation to the `gh-pages` branch
Of note:
- we will generate a different haddock folder for each main branch and release; in practice, that means the _main_, _stable_, _alpha_, _beta_ branches, and every build tagged with a version number
- the step that builds the haddock documentation checks that ALL projects in the repo build, including pro, but the deploy only deploys the graphql-engine documentation, as it pushes it to a publicly-accessible place
## Required work
**DO NOT MERGE THIS PR IT IS NOT READY**. Some work needs to go into this PR before it is ready.
First of all: the `gh-pages` branch of the OSS repo does NOT yet contain the documentation scaffolding that this new process assumes. At the bare minimum, it should be a orphan branch that contains a top-level README.md file, and a _server_ folder. An example of the bare minimum required can be previewed [on my fork](https://nicuveo.github.io/graphql-engine/server/).
The content of the `server/documentation` folder needs to be adjusted to reflect this; at the very least, a `README.md` file needs to be added to do the indexing (again, see the placeholder [on my fork](https://nicuveo.github.io/graphql-engine/server/) for an example).
This way of publishing documentation must be validated against [proposed changes to the documentation](https://github.com/hasura/graphql-engine-mono/pull/3294). @marionschleifer what do you think?
~~The buildkite code in this branch is currently untested, and I am not sure how to test it.~~
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3380
GitOrigin-RevId: b24f6759c64ae29886c1f1b481b172febc512032
2022-01-31 16:15:31 +03:00
|
|
|
|
|
|
|
|
|
┌───────────┐ ┌────────┐ ┌───────────┐
|
|
|
|
|
│ scheduled │─(1)─→│ locked │─(2)─→│ delivered │
|
|
|
|
|
└───────────┘ └────────┘ └───────────┘
|
|
|
|
|
↑ │ ┌───────┐
|
|
|
|
|
└────(3)───────┼─────(4)──→│ error │
|
|
|
|
|
│ └───────┘
|
|
|
|
|
│ ┌──────┐
|
|
|
|
|
└─────(5)──→│ dead │
|
|
|
|
|
└──────┘
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
When a scheduled event is first created, it starts in the 'scheduled' state,
|
|
|
|
|
and it can transition to other states in the following ways:
|
Deploy server documentation to github page in CI
_(This PR is on top of #3352.)_
## Description
This PR overhauls our documentation CI steps to push all generated server documentation to the `gh-pages` branch of the OSS repo. The goal of this PR is to arrive in the situation where `https://hasura.github.io/graphql-engine/server/` is automatically populated to contain the following:
- all the markdown files from `server/documentation`, copied verbatim, no transformation applied
- all the notes, collected from the code by the `extract-notes.sh` script, in `server/notes`
- the generated haddock documentation for each major release or branch in `server/haddock`.
To do so, this PR does the following:
- it includes the script to extract notes from #3352,
- it rewrites the documentation checking CI step, to generate the notes and publish the resulting "server/documentation" folder,
- it includes a new CI step to deploy the documentation to the `gh-pages` branch
Of note:
- we will generate a different haddock folder for each main branch and release; in practice, that means the _main_, _stable_, _alpha_, _beta_ branches, and every build tagged with a version number
- the step that builds the haddock documentation checks that ALL projects in the repo build, including pro, but the deploy only deploys the graphql-engine documentation, as it pushes it to a publicly-accessible place
## Required work
**DO NOT MERGE THIS PR IT IS NOT READY**. Some work needs to go into this PR before it is ready.
First of all: the `gh-pages` branch of the OSS repo does NOT yet contain the documentation scaffolding that this new process assumes. At the bare minimum, it should be a orphan branch that contains a top-level README.md file, and a _server_ folder. An example of the bare minimum required can be previewed [on my fork](https://nicuveo.github.io/graphql-engine/server/).
The content of the `server/documentation` folder needs to be adjusted to reflect this; at the very least, a `README.md` file needs to be added to do the indexing (again, see the placeholder [on my fork](https://nicuveo.github.io/graphql-engine/server/) for an example).
This way of publishing documentation must be validated against [proposed changes to the documentation](https://github.com/hasura/graphql-engine-mono/pull/3294). @marionschleifer what do you think?
~~The buildkite code in this branch is currently untested, and I am not sure how to test it.~~
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3380
GitOrigin-RevId: b24f6759c64ae29886c1f1b481b172febc512032
2022-01-31 16:15:31 +03:00
|
|
|
|
1. When graphql-engine fetches a scheduled event from the database to process
|
2020-05-13 15:33:16 +03:00
|
|
|
|
it, it sets its state to 'locked'. This prevents multiple graphql-engine
|
|
|
|
|
instances running on the same database from processing the same
|
|
|
|
|
scheduled event concurrently.
|
Deploy server documentation to github page in CI
_(This PR is on top of #3352.)_
## Description
This PR overhauls our documentation CI steps to push all generated server documentation to the `gh-pages` branch of the OSS repo. The goal of this PR is to arrive in the situation where `https://hasura.github.io/graphql-engine/server/` is automatically populated to contain the following:
- all the markdown files from `server/documentation`, copied verbatim, no transformation applied
- all the notes, collected from the code by the `extract-notes.sh` script, in `server/notes`
- the generated haddock documentation for each major release or branch in `server/haddock`.
To do so, this PR does the following:
- it includes the script to extract notes from #3352,
- it rewrites the documentation checking CI step, to generate the notes and publish the resulting "server/documentation" folder,
- it includes a new CI step to deploy the documentation to the `gh-pages` branch
Of note:
- we will generate a different haddock folder for each main branch and release; in practice, that means the _main_, _stable_, _alpha_, _beta_ branches, and every build tagged with a version number
- the step that builds the haddock documentation checks that ALL projects in the repo build, including pro, but the deploy only deploys the graphql-engine documentation, as it pushes it to a publicly-accessible place
## Required work
**DO NOT MERGE THIS PR IT IS NOT READY**. Some work needs to go into this PR before it is ready.
First of all: the `gh-pages` branch of the OSS repo does NOT yet contain the documentation scaffolding that this new process assumes. At the bare minimum, it should be a orphan branch that contains a top-level README.md file, and a _server_ folder. An example of the bare minimum required can be previewed [on my fork](https://nicuveo.github.io/graphql-engine/server/).
The content of the `server/documentation` folder needs to be adjusted to reflect this; at the very least, a `README.md` file needs to be added to do the indexing (again, see the placeholder [on my fork](https://nicuveo.github.io/graphql-engine/server/) for an example).
This way of publishing documentation must be validated against [proposed changes to the documentation](https://github.com/hasura/graphql-engine-mono/pull/3294). @marionschleifer what do you think?
~~The buildkite code in this branch is currently untested, and I am not sure how to test it.~~
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3380
GitOrigin-RevId: b24f6759c64ae29886c1f1b481b172febc512032
2022-01-31 16:15:31 +03:00
|
|
|
|
2. When a scheduled event is processed successfully, it is marked 'delivered'.
|
|
|
|
|
3. If a scheduled event fails to be processed, but it hasn’t yet reached
|
2020-05-13 15:33:16 +03:00
|
|
|
|
its maximum retry limit, its retry counter is incremented and
|
|
|
|
|
it is returned to the 'scheduled' state.
|
Deploy server documentation to github page in CI
_(This PR is on top of #3352.)_
## Description
This PR overhauls our documentation CI steps to push all generated server documentation to the `gh-pages` branch of the OSS repo. The goal of this PR is to arrive in the situation where `https://hasura.github.io/graphql-engine/server/` is automatically populated to contain the following:
- all the markdown files from `server/documentation`, copied verbatim, no transformation applied
- all the notes, collected from the code by the `extract-notes.sh` script, in `server/notes`
- the generated haddock documentation for each major release or branch in `server/haddock`.
To do so, this PR does the following:
- it includes the script to extract notes from #3352,
- it rewrites the documentation checking CI step, to generate the notes and publish the resulting "server/documentation" folder,
- it includes a new CI step to deploy the documentation to the `gh-pages` branch
Of note:
- we will generate a different haddock folder for each main branch and release; in practice, that means the _main_, _stable_, _alpha_, _beta_ branches, and every build tagged with a version number
- the step that builds the haddock documentation checks that ALL projects in the repo build, including pro, but the deploy only deploys the graphql-engine documentation, as it pushes it to a publicly-accessible place
## Required work
**DO NOT MERGE THIS PR IT IS NOT READY**. Some work needs to go into this PR before it is ready.
First of all: the `gh-pages` branch of the OSS repo does NOT yet contain the documentation scaffolding that this new process assumes. At the bare minimum, it should be an orphan branch that contains a top-level README.md file, and a _server_ folder. An example of the bare minimum required can be previewed [on my fork](https://nicuveo.github.io/graphql-engine/server/).
The content of the `server/documentation` folder needs to be adjusted to reflect this; at the very least, a `README.md` file needs to be added to do the indexing (again, see the placeholder [on my fork](https://nicuveo.github.io/graphql-engine/server/) for an example).
This way of publishing documentation must be validated against [proposed changes to the documentation](https://github.com/hasura/graphql-engine-mono/pull/3294). @marionschleifer what do you think?
~~The buildkite code in this branch is currently untested, and I am not sure how to test it.~~
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3380
GitOrigin-RevId: b24f6759c64ae29886c1f1b481b172febc512032
2022-01-31 16:15:31 +03:00
|
|
|
|
4. If a scheduled event fails to be processed and *has* reached its
|
2020-05-13 15:33:16 +03:00
|
|
|
|
retry limit, its state is set to 'error'.
|
Deploy server documentation to github page in CI
_(This PR is on top of #3352.)_
## Description
This PR overhauls our documentation CI steps to push all generated server documentation to the `gh-pages` branch of the OSS repo. The goal of this PR is to arrive in the situation where `https://hasura.github.io/graphql-engine/server/` is automatically populated to contain the following:
- all the markdown files from `server/documentation`, copied verbatim, no transformation applied
- all the notes, collected from the code by the `extract-notes.sh` script, in `server/notes`
- the generated haddock documentation for each major release or branch in `server/haddock`.
To do so, this PR does the following:
- it includes the script to extract notes from #3352,
- it rewrites the documentation checking CI step, to generate the notes and publish the resulting "server/documentation" folder,
- it includes a new CI step to deploy the documentation to the `gh-pages` branch
Of note:
- we will generate a different haddock folder for each main branch and release; in practice, that means the _main_, _stable_, _alpha_, _beta_ branches, and every build tagged with a version number
- the step that builds the haddock documentation checks that ALL projects in the repo build, including pro, but the deploy only deploys the graphql-engine documentation, as it pushes it to a publicly-accessible place
## Required work
**DO NOT MERGE THIS PR IT IS NOT READY**. Some work needs to go into this PR before it is ready.
First of all: the `gh-pages` branch of the OSS repo does NOT yet contain the documentation scaffolding that this new process assumes. At the bare minimum, it should be an orphan branch that contains a top-level README.md file, and a _server_ folder. An example of the bare minimum required can be previewed [on my fork](https://nicuveo.github.io/graphql-engine/server/).
The content of the `server/documentation` folder needs to be adjusted to reflect this; at the very least, a `README.md` file needs to be added to do the indexing (again, see the placeholder [on my fork](https://nicuveo.github.io/graphql-engine/server/) for an example).
This way of publishing documentation must be validated against [proposed changes to the documentation](https://github.com/hasura/graphql-engine-mono/pull/3294). @marionschleifer what do you think?
~~The buildkite code in this branch is currently untested, and I am not sure how to test it.~~
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3380
GitOrigin-RevId: b24f6759c64ae29886c1f1b481b172febc512032
2022-01-31 16:15:31 +03:00
|
|
|
|
5. If for whatever reason the difference between the current time and the
|
2020-05-13 15:33:16 +03:00
|
|
|
|
scheduled time is greater than the tolerance of the scheduled event, it
|
|
|
|
|
will not be processed and its state will be set to 'dead'.
|
|
|
|
|
-}
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Handle a webhook response that counts as a success: log an invocation
-- built from the response's status, body and headers, then set the event's
-- status to 'SESDelivered'.
processSuccess ::
  (MonadMetadataStorage m) =>
  ScheduledEventId ->
  [HeaderConf] ->
  ScheduledEventType ->
  J.Value ->
  HTTPResp a ->
  m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp =
  insertScheduledEventInvocation successInvocation type'
    >> setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
  where
    -- The invocation record pairs the request we sent with what came back.
    successInvocation =
      mkInvocation
        eventId
        (Just $ hrsStatus resp)
        decodedHeaders
        (hrsBody resp)
        (hrsHeaders resp)
        reqBodyJson
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Set the status of the given scheduled event to 'SESDead'.
processDead ::
  (MonadMetadataStorage m) =>
  ScheduledEventId ->
  ScheduledEventType ->
  m ()
processDead eventId = setScheduledEventOp eventId markDead
  where
    markDead = SEOpStatus SESDead
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Assemble the invocation record for one delivery attempt of a scheduled
-- event. The same (optional) status is stored on the invocation itself and
-- passed to 'mkInvocationResp' when building the response part.
mkInvocation ::
  ScheduledEventId ->
  Maybe Int ->
  [HeaderConf] ->
  TBS.TByteString ->
  [HeaderConf] ->
  J.Value ->
  (Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  Invocation eventId status webhookRequest webhookResponse
  where
    webhookRequest = mkWebhookReq reqBodyJson reqHeaders invocationVersionST
    webhookResponse = mkInvocationResp status respBody respHeaders
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2020-11-25 13:56:44 +03:00
|
|
|
|
-- metadata database transactions
|
|
|
|
|
|
2021-04-27 08:34:14 +03:00
|
|
|
|
-- | Fetch stats for those of the given cron triggers that have fewer than 100
-- upcoming events in @hdb_catalog.hdb_cron_events@.
--
-- An event counts as upcoming when it is still @scheduled@ and has never been
-- tried (@tries = 0@). Triggers with no such events are reported with a count
-- of 0 and @now()@ as their latest scheduled time.
--
-- The point here is to maintain a certain number of future events so the user
-- can kind of see what's coming up, and obviously to give 'processCronEvents'
-- something to do.
getDeprivedCronTriggerStatsTx :: [TriggerName] -> Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx cronTriggerNames =
  fmap (map toStats) $
    Q.listQE
      defaultTxErrorHandler
      [Q.sql|
      SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
      FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
      LEFT JOIN
      ( SELECT
         trigger_name,
         count(1) as upcoming_events_count,
         max(scheduled_time) as max_scheduled_time
         FROM hdb_catalog.hdb_cron_events
         WHERE tries = 0 and status = 'scheduled'
         GROUP BY trigger_name
      ) AS q
      ON t.trigger_name = q.trigger_name
      WHERE coalesce(q.upcoming_events_count, 0) < 100
     |]
      triggerNamesArg
      True
  where
    triggerNamesArg = Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames
    -- One result row: (trigger name, upcoming event count, latest scheduled time).
    toStats (name, upcomingCount, latestScheduledTime) =
      CronTriggerStats name upcomingCount latestScheduledTime
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
2021-04-27 08:34:14 +03:00
|
|
|
|
-- TODO
-- - cron events have minute resolution, while one-off events have arbitrary
--   resolution, so it doesn't make sense to fetch them at the same rate
-- - if we decide to fetch cron events less frequently we should wake up that
--   thread at second 0 of every minute, and then pass hasura's now time into
--   the query (since the DB may disagree about the time)

-- | Claim every due scheduled event, cron events first and one-off events
-- second, and return both batches.
--
-- An event is due when its status is @scheduled@ and either it has no
-- @next_retry_at@ and its @scheduled_time@ has passed, or its @next_retry_at@
-- has passed. Claimed rows are switched to status @locked@; rows already
-- locked by another transaction are skipped (@FOR UPDATE SKIP LOCKED@).
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx = do
  dueCronEvents <- lockDueCronEvents
  dueOneOffEvents <- lockDueOneOffEvents
  pure (dueCronEvents, dueOneOffEvents)
  where
    -- NOTE(review): the trailing Bool passed to 'Q.listQE' is True here but
    -- False in 'lockDueOneOffEvents'; kept exactly as found, confirm whether
    -- the difference is intentional.
    lockDueCronEvents :: Q.TxE QErr [CronEvent]
    lockDueCronEvents =
      fmap (Q.getAltJ . runIdentity)
        <$> Q.listQE
          defaultTxErrorHandler
          [Q.sql|
        WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_cron_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                        )
            RETURNING *
          )
        SELECT row_to_json(t.*) FROM cte AS t
      |]
          ()
          True

    lockDueOneOffEvents :: Q.TxE QErr [OneOffScheduledEvent]
    lockDueOneOffEvents =
      fmap (Q.getAltJ . runIdentity)
        <$> Q.listQE
          defaultTxErrorHandler
          [Q.sql|
        WITH cte AS (
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'locked'
          WHERE id IN ( SELECT t.id
                        FROM hdb_catalog.hdb_scheduled_events t
                        WHERE ( t.status = 'scheduled'
                                and (
                                 (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                 (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                )
                              )
                        FOR UPDATE SKIP LOCKED
                      )
          RETURNING *
        )
        SELECT row_to_json(t.*) FROM cte AS t
      |]
          ()
          False
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
|
|
|
|
-- | Persist one invocation of a scheduled event and bump the event's @tries@
-- counter by one.
--
-- Cron events use @hdb_cron_event_invocation_logs@ / @hdb_cron_events@;
-- one-off events use @hdb_scheduled_event_invocation_logs@ /
-- @hdb_scheduled_events@. In both cases the log row is inserted before the
-- counter is incremented.
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx invo type' =
  case type' of
    Cron ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
        >> Q.unitQE
          defaultTxErrorHandler
          [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
          |]
          eventIdArg
          True
    OneOff ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |]
        invocationRow
        True
        >> Q.unitQE
          defaultTxErrorHandler
          [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
          |]
          eventIdArg
          True
  where
    -- Arguments for the INSERT: the status is widened to Int64 and the
    -- request/response are sent as JSON.
    invocationRow =
      ( iEventId invo,
        fromIntegral <$> iStatus invo :: Maybe Int64,
        Q.AltJ $ J.toJSON $ iRequest invo,
        Q.AltJ $ J.toJSON $ iResponse invo
      )
    eventIdArg = Identity $ iEventId invo
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Apply a 'ScheduledEventOp' to a single scheduled event.
--
-- * 'SEOpRetry' stores the given time in @next_retry_at@ and puts the event
--   back into status @scheduled@.
-- * 'SEOpStatus' overwrites the event's @status@ column.
--
-- The 'ScheduledEventType' selects the table: @hdb_cron_events@ for 'Cron',
-- @hdb_scheduled_events@ for 'OneOff'.
setScheduledEventOpTx ::
  ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx eventId op type' =
  case (op, type') of
    (SEOpRetry retryAt, Cron) ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET next_retry_at = $1,
          STATUS = 'scheduled'
          WHERE id = $2
          |]
        (retryAt, eventId)
        True
    (SEOpRetry retryAt, OneOff) ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET next_retry_at = $1,
          STATUS = 'scheduled'
          WHERE id = $2
          |]
        (retryAt, eventId)
        True
    (SEOpStatus newStatus, Cron) ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = $2
          WHERE id = $1
          |]
        (eventId, newStatus)
        True
    (SEOpStatus newStatus, OneOff) ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = $2
          WHERE id = $1
          |]
        (eventId, newStatus)
        True
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
|
|
|
|
-- | Return the given events to status @scheduled@, touching only those that
-- are currently @locked@, and yield the number of rows that were updated.
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
  unlockedCount <$> case type' of
    Cron ->
      Q.withQE
        defaultTxErrorHandler
        [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_cron_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) and status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
        eventIdsArg
        True
    OneOff ->
      Q.withQE
        defaultTxErrorHandler
        [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_scheduled_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) AND status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
        eventIdsArg
        True
  where
    -- The ids are sent as a single text[] parameter.
    eventIdsArg = Identity $ PGTextArray $ map unEventId eventIds
    -- The query returns exactly one single-column row holding the count.
    unlockedCount = runIdentity . Q.getRow
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
|
|
|
|
-- | Reset every @locked@ event back to @scheduled@, first in
-- @hdb_cron_events@ and then in @hdb_scheduled_events@.
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx = unlockCronEvents >> unlockOneOffEvents
  where
    unlockCronEvents :: Q.TxE QErr ()
    unlockCronEvents =
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
        ()
        True

    unlockOneOffEvents :: Q.TxE QErr ()
    unlockOneOffEvents =
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
        ()
        True
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
2021-09-13 21:00:53 +03:00
|
|
|
|
-- | Insert the given cron event seeds into the cron events table as
-- @(trigger_name, scheduled_time)@ rows. Rows that hit a conflict are skipped
-- (@ON CONFLICT DO NOTHING@).
--
-- An empty seed list is a no-op: rendering the insert with no tuples would
-- produce an @INSERT ... VALUES@ statement without any row, which PostgreSQL
-- rejects, so no query is sent in that case.
insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
insertCronEventsTx cronSeeds
  | null cronSeeds = pure ()
  | otherwise =
    Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
  where
    -- The statement is generated at runtime because the number of value
    -- tuples depends on the input; every value is embedded via 'S.SELit'.
    insertCronEventsSql =
      TB.run $
        toSQL
          S.SQLInsert
            { siTable = cronEventsTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
              siValues = S.ValuesExp $ map (toTupleExp . toArr) cronSeeds,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }
    toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
    toTupleExp = S.TupleExp . map S.SELit
|
|
|
|
|
|
|
|
|
|
-- | Insert a one-off scheduled event into @hdb_catalog.hdb_scheduled_events@
-- and return the @id@ of the new row. The webhook, payload, retry
-- configuration and headers are sent wrapped in 'Q.AltJ'.
insertOneOffScheduledEventTx :: OneOffEvent -> Q.TxE QErr EventId
insertOneOffScheduledEventTx CreateScheduledEvent {..} = do
  insertedRow <-
    Q.withQE
      defaultTxErrorHandler
      [Q.sql|
    INSERT INTO hdb_catalog.hdb_scheduled_events
    (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
    VALUES
    ($1, $2, $3, $4, $5, $6) RETURNING id
    |]
      ( Q.AltJ cseWebhook,
        cseScheduleAt,
        Q.AltJ csePayload,
        Q.AltJ cseRetryConf,
        Q.AltJ cseHeaders,
        cseComment
      )
      False
  -- The RETURNING clause yields a single one-column row: the new id.
  pure $ runIdentity $ Q.getRow insertedRow
|
2020-11-25 13:56:44 +03:00
|
|
|
|
|
2021-05-26 19:19:26 +03:00
|
|
|
|
-- | Delete cron events that are scheduled in the future
-- (@scheduled_time > now()@) and have never been tried (@tries = 0@), either
-- for a single trigger or for a list of triggers.
dropFutureCronEventsTx :: ClearCronEvents -> Q.TxE QErr ()
dropFutureCronEventsTx clearCronEvents =
  case clearCronEvents of
    SingleCronTrigger triggerName ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
     DELETE FROM hdb_catalog.hdb_cron_events
     WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
    |]
        (Identity triggerName)
        True
    MetadataCronTriggers triggerNames ->
      let triggerNamesArg = Identity $ PGTextArray $ map triggerNameToTxt triggerNames
       in Q.unitQE
            defaultTxErrorHandler
            [Q.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
      WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
      |]
            triggerNamesArg
            False
|
2020-12-14 07:30:19 +03:00
|
|
|
|
|
2020-11-25 13:56:44 +03:00
|
|
|
|
-- | The qualified name of the @hdb_catalog.hdb_cron_events@ table.
cronEventsTable :: QualifiedTable
cronEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_cron_events")
|
2021-01-07 12:04:22 +03:00
|
|
|
|
|
|
|
|
|
-- | Build a boolean expression restricting the @status@ column to the given
-- statuses. An empty list means "no restriction" and yields a literal true.
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter statuses
  | null statuses = S.BELit True
  | otherwise = S.BEIN statusColumn (map statusLiteral statuses)
  where
    statusColumn = S.SEIdentifier (Identifier "status")
    statusLiteral = S.SELit . scheduledEventStatusToText
|
2021-01-07 12:04:22 +03:00
|
|
|
|
|
|
|
|
|
-- | An order-by clause with a single item: @scheduled_time@, ascending.
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy = S.OrderByExp (byScheduledTime NE.:| [])
  where
    byScheduledTime =
      S.OrderByItem
        (S.SEIdentifier (Identifier "scheduled_time"))
        (Just S.OTAsc)
        Nothing
|
2021-01-07 12:04:22 +03:00
|
|
|
|
|
|
|
|
|
-- | Wrap a select so that it outputs two columns: the total number of rows it
-- produces, and a JSON array of the rows left after applying the pagination's
-- limit and offset.
--
-- The given select becomes the CTE @count_cte@; a second CTE, @limit_cte@,
-- selects from it with the limit and offset applied. When @limit_cte@ has no
-- rows the aggregated column falls back to the literal @[]@.
mkPaginationSelectExp ::
  S.Select ->
  ScheduledEventPagination ->
  S.Select
mkPaginationSelectExp allRowsSelect ScheduledEventPagination {..} =
  S.mkSelect
    { S.selCTEs =
        [ (S.toAlias allRowsCte, allRowsSelect),
          (S.toAlias pageCte, pageSelect)
        ],
      S.selExtr = [totalCountColumn, pageRowsColumn]
    }
  where
    allRowsCte = Identifier "count_cte"
    pageCte = Identifier "limit_cte"

    -- A scalar sub-select extracting a single expression from one of the CTEs.
    scalarSubquery extracted cte =
      S.SESelect
        ( S.mkSelect
            { S.selExtr = [S.Extractor extracted Nothing],
              S.selFrom = Just $ S.mkIdenFromExp cte
            }
        )

    totalCountColumn = S.Extractor (scalarSubquery S.countStar allRowsCte) Nothing

    pageSelect =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just $ S.mkIdenFromExp allRowsCte,
          S.selLimit = fmap (S.LimitExp . S.intToSQLExp) _sepLimit,
          S.selOffset = fmap (S.OffsetExp . S.intToSQLExp) _sepOffset
        }

    pageRowsColumn =
      S.Extractor
        ( S.handleIfNull
            (S.SELit "[]")
            (scalarSubquery (S.SEUnsafe "json_agg(row_to_json(limit_cte.*))") pageCte)
        )
        Nothing
|
2021-01-07 12:04:22 +03:00
|
|
|
|
|
|
|
|
|
-- | Turn a @(count, JSON-wrapped value)@ pair into a 'WithTotalCount',
-- unwrapping the 'Q.AltJ'.
withCount :: (Int, Q.AltJ a) -> WithTotalCount a
withCount (totalCount, Q.AltJ payload) = WithTotalCount totalCount payload
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Fetch one page of one-off scheduled events from
-- @hdb_catalog.hdb_scheduled_events@, filtered by the given statuses and
-- ordered with 'scheduledTimeOrderBy', together with the total count of
-- matching events.
getOneOffScheduledEventsTx ::
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  Q.TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses =
  fmap (withCount . Q.getRow) $
    Q.withQE defaultTxErrorHandler paginatedQuery () False
  where
    oneOffEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_scheduled_events")
    matchingEvents =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just $ S.mkSimpleFromExp oneOffEventsTable,
          S.selWhere = Just $ S.WhereFrag $ mkScheduledEventStatusFilter statuses,
          S.selOrderBy = Just scheduledTimeOrderBy
        }
    paginatedQuery =
      Q.fromBuilder $ toSQL $ mkPaginationSelectExp matchingEvents pagination
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Fetch one page of the cron events belonging to the given trigger,
-- filtered by the given statuses and ordered with 'scheduledTimeOrderBy',
-- together with the total count of matching events.
getCronEventsTx ::
  TriggerName ->
  ScheduledEventPagination ->
  [ScheduledEventStatus] ->
  Q.TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx triggerName pagination status =
  fmap (withCount . Q.getRow) $
    Q.withQE defaultTxErrorHandler paginatedQuery () False
  where
    belongsToTrigger =
      S.BECompare
        S.SEQ
        (S.SEIdentifier $ Identifier "trigger_name")
        (S.SELit $ triggerNameToTxt triggerName)
    hasWantedStatus = mkScheduledEventStatusFilter status
    matchingEvents =
      S.mkSelect
        { S.selExtr = [S.selectStar],
          S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable,
          S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp belongsToTrigger hasWantedStatus,
          S.selOrderBy = Just scheduledTimeOrderBy
        }
    paginatedQuery =
      Q.fromBuilder $ toSQL $ mkPaginationSelectExp matchingEvents pagination
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | Delete the scheduled event with the given id from
-- @hdb_scheduled_events@ ('OneOff') or @hdb_cron_events@ ('Cron').
deleteScheduledEventTx ::
  ScheduledEventId -> ScheduledEventType -> Q.TxE QErr ()
deleteScheduledEventTx eventId eventType =
  case eventType of
    OneOff ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
      DELETE FROM hdb_catalog.hdb_scheduled_events
       WHERE id = $1
    |]
        eventIdArg
        False
    Cron ->
      Q.unitQE
        defaultTxErrorHandler
        [Q.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
       WHERE id = $1
    |]
        eventIdArg
        False
  where
    eventIdArg = Identity eventId
|
2021-01-07 12:04:22 +03:00
|
|
|
|
|
|
|
|
|
-- | The columns selected from an invocation-log table, each qualified with
-- the given table: @id@, @event_id@, @status@, @request@, @response@ and
-- @created_at@. @request@ and @response@ carry a @json@ type annotation.
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
  [ S.Extractor selected Nothing
    | selected <-
        [ tableColumn "id",
          tableColumn "event_id",
          tableColumn "status",
          asJson (tableColumn "request"),
          asJson (tableColumn "response"),
          tableColumn "created_at"
        ]
  ]
  where
    asJson expr = S.SETyAnn expr (S.TypeAnn "json")
    tableColumn columnName =
      S.SEQIdentifier (S.mkQIdentifierTable table (Identifier columnName))
|
|
|
|
|
|
|
|
|
|
-- | Build the condition that the @event_id@ column of the given table equals
-- the given event id.
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId = S.BECompare S.SEQ eventIdColumn eventIdLiteral
  where
    eventIdColumn = S.SEQIdentifier (S.mkQIdentifierTable table (Identifier "event_id"))
    eventIdLiteral = S.SELit (unEventId eventId)
|
|
|
|
|
|
|
|
|
|
-- | Fetch one page of scheduled event invocations selected by the given
-- 'GetInvocationsBy', together with the total count. The query is built by
-- 'getInvocationsQuery' over the catalog's invocation-log and cron event
-- tables.
getInvocationsTx ::
  GetInvocationsBy ->
  ScheduledEventPagination ->
  Q.TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx invocationsBy pagination =
  -- NOTE(review): the trailing Bool is True here, whereas the other
  -- runtime-generated queries ('getCronEventsTx',
  -- 'getOneOffScheduledEventsTx') pass False; kept as found, confirm whether
  -- the difference is intentional.
  fmap (withCount . Q.getRow) $
    Q.withQE defaultTxErrorHandler invocationsSql () True
  where
    oneOffInvocationsTable = QualifiedObject "hdb_catalog" (TableName "hdb_scheduled_event_invocation_logs")
    cronInvocationsTable = QualifiedObject "hdb_catalog" (TableName "hdb_cron_event_invocation_logs")
    catalogTables = EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable
    invocationsSql =
      Q.fromBuilder $ toSQL $ getInvocationsQuery catalogTables invocationsBy pagination
|
|
|
|
|
|
2021-09-24 01:56:37 +03:00
|
|
|
|
-- | The tables that the invocation queries are built over.
data EventTables = EventTables
  { -- | Invocation logs of one-off scheduled events.
    etOneOffInvocationsTable :: QualifiedTable,
    -- | Invocation logs of cron events.
    etCronInvocationsTable :: QualifiedTable,
    -- | The cron events themselves.
    etCronEventsTable :: QualifiedTable
  }
|
2021-02-11 20:54:25 +03:00
|
|
|
|
|
2021-04-14 04:23:45 +03:00
|
|
|
|
-- | Build the unpaginated select of invocation-log rows, newest first
-- (@created_at@ descending).
--
-- * 'GIBEventId': the invocations of one event, read from the log table that
--   matches the event's type.
-- * 'GIBEvent' 'SEOneOff': all one-off invocations.
-- * 'GIBEvent' ('SECron' name): the cron invocations whose event belongs to
--   the named trigger, found by inner-joining the cron invocation logs with
--   the cron events table on @id = event_id@.
getInvocationsQueryNoPagination :: EventTables -> GetInvocationsBy -> S.Select
getInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy =
  case invocationsBy of
    GIBEventId eventId eventType ->
      let logsTable = case eventType of
            OneOff -> oneOffInvocationsTable
            Cron -> cronInvocationsTable
       in (newestFirstFrom logsTable)
            { S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp logsTable eventId
            }
    GIBEvent SEOneOff ->
      newestFirstFrom oneOffInvocationsTable
    GIBEvent (SECron triggerName) ->
      (newestFirstFrom cronInvocationsTable)
        { S.selFrom = Just $ S.FromExp [S.FIJoin cronLogsJoinedToEvents],
          S.selWhere =
            Just $
              S.WhereFrag $
                equals
                  (columnOf cronEventsTable' "trigger_name")
                  (S.SELit $ triggerNameToTxt triggerName)
        }
  where
    -- A column reference qualified with its table.
    columnOf table columnName =
      S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier columnName

    equals lhs rhs = S.BECompare S.SEQ lhs rhs

    -- All invocation fields of a log table, ordered by created_at descending.
    newestFirstFrom logsTable =
      S.mkSelect
        { S.selExtr = invocationFieldExtractors logsTable,
          S.selFrom = Just $ S.mkSimpleFromExp logsTable,
          S.selOrderBy =
            Just $
              S.OrderByExp $
                S.OrderByItem (columnOf logsTable "created_at") (Just S.OTDesc) Nothing NE.:| []
        }

    cronLogsJoinedToEvents =
      S.JoinExpr
        (S.FISimple cronInvocationsTable Nothing)
        S.Inner
        (S.FISimple cronEventsTable' Nothing)
        ( S.JoinOn $
            equals
              (columnOf cronEventsTable' "id")
              (columnOf cronInvocationsTable "event_id")
        )
|
2021-04-14 04:23:45 +03:00
|
|
|
|
|
|
|
|
|
-- | The invocations select from 'getInvocationsQueryNoPagination', wrapped by
-- 'mkPaginationSelectExp' with the given pagination.
getInvocationsQuery :: EventTables -> GetInvocationsBy -> ScheduledEventPagination -> S.Select
getInvocationsQuery ets invocationsBy =
  mkPaginationSelectExp unpaginated
  where
    unpaginated = getInvocationsQueryNoPagination ets invocationsBy
|