2020-05-13 15:33:16 +03:00
|
|
|
|
{-|
|
|
|
|
|
= Scheduled Triggers
|
|
|
|
|
|
|
|
|
|
This module implements the functionality of invoking webhooks during specified
|
|
|
|
|
time events aka scheduled events. The scheduled events are the events generated
|
|
|
|
|
by the graphql-engine using the cron triggers; additionally, a scheduled event can
|
|
|
|
|
be created by the user at a specified time with the payload, webhook, headers
|
|
|
|
|
and the retry configuration. Scheduled events are modeled using rows in Postgres
|
|
|
|
|
with a @timestamp@ column.
|
|
|
|
|
|
|
|
|
|
This module implements scheduling and delivery of scheduled
|
|
|
|
|
events:
|
|
|
|
|
|
|
|
|
|
1. Scheduling a cron event involves creating new cron events. New
|
|
|
|
|
cron events are created based on the cron schedule and the number of
|
|
|
|
|
scheduled events that are already present in the scheduled events buffer.
|
|
|
|
|
The graphql-engine computes the new scheduled events and writes them to
|
|
|
|
|
the database. (Generator)
|
|
|
|
|
|
|
|
|
|
2. Delivering a scheduled event involves reading undelivered scheduled events
|
|
|
|
|
from the database and delivering them to the webhook server. (Processor)
|
|
|
|
|
|
|
|
|
|
The rationale behind separating the event scheduling and event delivery
|
|
|
|
|
mechanism into two different threads is that the scheduling and delivering of
|
|
|
|
|
the scheduled events are not directly dependent on each other. The generator
|
|
|
|
|
will almost always try to create scheduled events which are supposed to be
|
|
|
|
|
delivered in the future (timestamp > current_timestamp) and the processor
|
|
|
|
|
will fetch scheduled events of the past (timestamp < current_timestamp). So,
|
|
|
|
|
the set of the scheduled events generated by the generator and the processor
|
|
|
|
|
will never be the same. The point here is that they're not correlated to each
|
|
|
|
|
other. They can be split into different threads for better performance.
|
|
|
|
|
|
|
|
|
|
== Implementation
|
|
|
|
|
|
|
|
|
|
During the startup, two threads are started:
|
|
|
|
|
|
|
|
|
|
1. Generator: Fetches the list of scheduled triggers from cache and generates
|
|
|
|
|
the scheduled events.
|
|
|
|
|
|
|
|
|
|
- Additional events will be generated only if there are fewer than 100
|
|
|
|
|
scheduled events.
|
|
|
|
|
|
|
|
|
|
- The upcoming events timestamp will be generated using:
|
|
|
|
|
|
|
|
|
|
- cron schedule of the scheduled trigger
|
|
|
|
|
|
|
|
|
|
- max timestamp of the scheduled events that already exist or
|
|
|
|
|
current_timestamp (when no scheduled events exist)
|
|
|
|
|
|
|
|
|
|
- The timestamp of the scheduled events is stored with timezone because
|
|
|
|
|
`SELECT NOW()` returns timestamp with timezone, so it's good to
|
|
|
|
|
compare two things of the same type.
|
|
|
|
|
|
|
|
|
|
This effectively corresponds to doing an INSERT with values containing
|
|
|
|
|
specific timestamp.
|
|
|
|
|
|
|
|
|
|
2. Processor: Fetches the undelivered cron events and the scheduled events
|
|
|
|
|
from the database which have a timestamp earlier than the
|
|
|
|
|
current timestamp and then processes them.
|
|
|
|
|
-}
|
|
|
|
|
module Hasura.Eventing.ScheduledTrigger
|
|
|
|
|
( runCronEventsGenerator
|
|
|
|
|
, processScheduledTriggers
|
|
|
|
|
|
|
|
|
|
, CronEventSeed(..)
|
|
|
|
|
, generateScheduleTimes
|
|
|
|
|
, insertCronEvents
|
|
|
|
|
, StandAloneScheduledEvent(..)
|
2020-07-02 14:57:09 +03:00
|
|
|
|
, initLockedEventsCtx
|
|
|
|
|
, LockedEventsCtx(..)
|
|
|
|
|
, unlockCronEvents
|
|
|
|
|
, unlockStandaloneScheduledEvents
|
|
|
|
|
, unlockAllLockedScheduledEvents
|
2020-05-13 15:33:16 +03:00
|
|
|
|
) where
|
|
|
|
|
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import Control.Arrow.Extended (dup)
|
|
|
|
|
import Control.Concurrent.Extended (sleep)
|
2020-07-02 14:57:09 +03:00
|
|
|
|
import Control.Concurrent.STM.TVar
|
2020-05-13 15:33:16 +03:00
|
|
|
|
import Data.Has
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import Data.Int (Int64)
|
|
|
|
|
import Data.List (unfoldr)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
import Data.Time.Clock
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import Hasura.Eventing.Common
|
2020-05-13 15:33:16 +03:00
|
|
|
|
import Hasura.Eventing.HTTP
|
|
|
|
|
import Hasura.HTTP
|
|
|
|
|
import Hasura.Prelude
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
import Hasura.RQL.DDL.Headers
|
|
|
|
|
import Hasura.RQL.Types
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import Hasura.Server.Version (HasVersion)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
import Hasura.SQL.DML
|
|
|
|
|
import Hasura.SQL.Types
|
|
|
|
|
import System.Cron
|
|
|
|
|
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import qualified Data.Aeson as J
|
|
|
|
|
import qualified Data.Aeson.Casing as J
|
|
|
|
|
import qualified Data.Aeson.TH as J
|
2020-07-28 20:52:44 +03:00
|
|
|
|
import qualified Data.ByteString.Lazy as BL
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import qualified Data.Environment as Env
|
|
|
|
|
import qualified Data.HashMap.Strict as Map
|
|
|
|
|
import qualified Data.Set as Set
|
|
|
|
|
import qualified Data.TByteString as TBS
|
|
|
|
|
import qualified Data.Text as T
|
|
|
|
|
import qualified Database.PG.Query as Q
|
|
|
|
|
import qualified Database.PG.Query.PTI as PTI
|
|
|
|
|
import qualified Hasura.Logging as L
|
2020-07-15 13:40:48 +03:00
|
|
|
|
import qualified Hasura.Tracing as Tracing
|
2020-07-14 22:00:58 +03:00
|
|
|
|
import qualified Network.HTTP.Client as HTTP
|
|
|
|
|
import qualified PostgreSQL.Binary.Decoding as PD
|
|
|
|
|
import qualified PostgreSQL.Binary.Encoding as PE
|
|
|
|
|
import qualified Text.Builder as TB (run)
|
2020-07-02 14:57:09 +03:00
|
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | A 'QErr' raised while generating or processing scheduled events,
-- wrapped so that it can be given its own engine-log representation.
newtype ScheduledTriggerInternalErr = ScheduledTriggerInternalErr QErr
  deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | Internal errors are logged at error level, under the scheduled trigger
-- log type, with the wrapped 'QErr' serialised to JSON as the log payload.
instance L.ToEngineLog ScheduledTriggerInternalErr L.Hasura where
  toEngineLog (ScheduledTriggerInternalErr wrappedErr) =
    (L.LevelError, L.scheduledTriggerLogType, logPayload)
    where
      logPayload = J.toJSON wrappedErr
|
|
|
|
|
|
|
|
|
|
-- | The qualified name of the catalog table holding the cron events:
-- table @hdb_cron_events@ inside 'hdbCatalogSchema'.
cronEventsTable :: QualifiedTable
cronEventsTable = QualifiedObject hdbCatalogSchema tableName
  where
    tableName = TableName (T.pack "hdb_cron_events")
|
|
|
|
|
|
|
|
|
|
-- | The states a scheduled event can be in. The textual form stored in
-- the database is given by 'scheduledEventStatusToText'.
data ScheduledEventStatus
  = SESScheduled -- ^ rendered as @scheduled@
  | SESLocked    -- ^ rendered as @locked@
  | SESDelivered -- ^ rendered as @delivered@
  | SESError     -- ^ rendered as @error@
  | SESDead      -- ^ rendered as @dead@
  deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | Renders a 'ScheduledEventStatus' as the lower-case text used when the
-- status is sent to Postgres or serialised to JSON.
scheduledEventStatusToText :: ScheduledEventStatus -> Text
scheduledEventStatusToText = \case
  SESScheduled -> "scheduled"
  SESLocked    -> "locked"
  SESDelivered -> "delivered"
  SESError     -> "error"
  SESDead      -> "dead"
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | A status is passed to prepared statements as its textual rendering.
instance Q.ToPrepArg ScheduledEventStatus where
  toPrepVal status = Q.toPrepVal (scheduledEventStatusToText status)
|
|
|
|
|
|
|
|
|
|
-- | Decodes a status column by mapping its label back to the matching
-- constructor; any label other than the five known ones is rejected.
instance Q.FromCol ScheduledEventStatus where
  fromCol = Q.fromColHelper (PD.enum parseStatus)
    where
      parseStatus :: Text -> Maybe ScheduledEventStatus
      parseStatus "scheduled" = Just SESScheduled
      parseStatus "locked"    = Just SESLocked
      parseStatus "delivered" = Just SESDelivered
      parseStatus "error"     = Just SESError
      parseStatus "dead"      = Just SESDead
      parseStatus _           = Nothing
|
|
|
|
|
|
|
|
|
|
-- | A status is serialised as a JSON string holding its textual rendering.
instance J.ToJSON ScheduledEventStatus where
  toJSON status = J.String (scheduledEventStatusToText status)
|
|
|
|
|
|
2020-07-02 14:57:09 +03:00
|
|
|
|
-- | Identifier of a scheduled event, represented as 'Text'.
type ScheduledEventId = Text
|
|
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
|
-- | One row of @hdb_catalog.hdb_cron_events_stats@, as read by
-- 'runCronEventsGenerator'.
data CronTriggerStats = CronTriggerStats
  { ctsName                :: !TriggerName
    -- ^ the @name@ column
  , ctsUpcomingEventsCount :: !Int
    -- ^ the @upcoming_events_count@ column
  , ctsMaxScheduledTime    :: !UTCTime
    -- ^ the @max_scheduled_time@ column; new cron events are generated
    -- starting from this time
  } deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | The data needed to insert one cron event row: the values of the
-- @trigger_name@ and @scheduled_time@ columns.
data CronEventSeed = CronEventSeed
  { cesName          :: !TriggerName
    -- ^ name of the cron trigger the event belongs to
  , cesScheduledTime :: !UTCTime
    -- ^ time at which the event is scheduled
  } deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | A cron event as fetched for processing. It carries no webhook, header
-- or retry configuration; 'processCronEvents' obtains those from the cron
-- trigger found in the schema cache under 'cepName'.
data CronEventPartial = CronEventPartial
  { cepId            :: !CronEventId
    -- ^ identifier of the cron event
  , cepName          :: !TriggerName
    -- ^ name of the cron trigger that owns the event
  , cepScheduledTime :: !UTCTime
    -- ^ time at which the event is scheduled
  , cepTries         :: !Int
    -- ^ number of tries recorded for the event
  } deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | A scheduled event together with everything 'processScheduledEvent'
-- needs to deliver it. Both cron events and standalone scheduled events
-- are converted into this shape before being processed.
data ScheduledEventFull = ScheduledEventFull
  { sefId            :: !ScheduledEventId
  , sefName          :: !(Maybe TriggerName)
    -- ^ Name of the cron trigger. Standalone scheduled events have no
    -- associated name, so for them 'sefName' is @Nothing@.
  , sefScheduledTime :: !UTCTime
  , sefTries         :: !Int
  , sefWebhook       :: !Text
    -- ^ the resolved webhook the event is delivered to
  , sefPayload       :: !J.Value
  , sefRetryConf     :: !STRetryConf
  , sefHeaders       :: ![EventHeaderInfo]
  , sefComment       :: !(Maybe Text)
  } deriving (Show, Eq)

$(J.deriveToJSON (J.aesonDrop 3 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventFull)
|
|
|
|
|
|
|
|
|
|
-- | A standalone (one-off) scheduled event as fetched for processing.
-- Unlike a cron event it carries its own webhook, payload, retry and
-- header configuration; the webhook and the headers are still unresolved
-- here and get resolved in 'processStandAloneEvents'.
data StandAloneScheduledEvent = StandAloneScheduledEvent
  { saseId            :: !StandAloneScheduledEventId
  , saseScheduledTime :: !UTCTime
  , saseTries         :: !Int
  , saseWebhook       :: !InputWebhook
  , sasePayload       :: !(Maybe J.Value)
    -- ^ when absent, a JSON @null@ payload is delivered
  , saseRetryConf     :: !STRetryConf
  , saseHeaderConf    :: ![HeaderConf]
  , saseComment       :: !(Maybe Text)
  } deriving (Show, Eq)

$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''StandAloneScheduledEvent)
|
|
|
|
|
|
|
|
|
|
-- | Distinguishes the two flavours of scheduled events. The distinction is
-- required because a 'CronScheduledEvent' and a 'StandAloneEvent' are
-- configured differently and are stored in different tables.
data ScheduledEventType
  = CronScheduledEvent
    -- ^ A cron scheduled event is created from a template which holds the
    -- webhook, the header configuration, the retry configuration and a
    -- payload; every cron event uses the configuration of that template.
    -- The configuration defined with the cron trigger is cached, and is
    -- therefore not fetched along with the cron scheduled events.
  | StandAloneEvent
    -- ^ A standalone scheduled event has no template, so all of its
    -- configuration is fetched along with the scheduled event itself.
  deriving (Eq, Show)
|
|
|
|
|
|
2020-07-03 03:55:07 +03:00
|
|
|
|
-- | The value that is JSON-encoded and sent as the request body when a
-- scheduled event is delivered to its webhook.
data ScheduledEventWebhookPayload = ScheduledEventWebhookPayload
  { sewpId            :: !Text
  , sewpName          :: !(Maybe TriggerName)
    -- ^ name of the cron trigger; @Nothing@ for standalone events
  , sewpScheduledTime :: !UTCTime
  , sewpPayload       :: !J.Value
  , sewpComment       :: !(Maybe Text)
  , sewpCreatedAt     :: !UTCTime
    -- ^ NOTE(review): 'processScheduledEvent' fills this with the time at
    -- which the delivery attempt starts, not a value read from the database
  } deriving (Show, Eq)

$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventWebhookPayload)
|
|
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
|
-- | Makes sure that every cron trigger has an adequate buffer of cron
-- events. This loops forever; each iteration
--
--   1. reads the cron triggers from the schema cache,
--   2. fetches the stats of the cron triggers having fewer than 100
--      upcoming events,
--   3. inserts freshly generated cron events for those triggers, and
--   4. sleeps for one minute.
--
-- Database failures are logged and do not stop the loop.
runCronEventsGenerator
  :: L.Logger L.Hasura
  -> Q.PGPool
  -> IO SchemaCache
  -> IO void
runCronEventsGenerator logger pgpool getSC = forever $ do
  -- cron triggers currently present in the schema cache
  cronTriggersCache <- scCronTriggers <$> getSC

  -- stats of the cron triggers that are running low on upcoming events
  statsResult <- runExceptT $
    Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getDeprivedCronTriggerStats
  case statsResult of
    Left err -> logQErr err
    Right deprivedCronTriggerStats -> do
      -- pair each stat with its cron trigger, dropping (and logging) the
      -- stats whose trigger is missing from the cache
      cronTriggersForHydrationWithStats <-
        catMaybes <$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
      -- top up the buffers of the triggers that need hydration
      insertResult <- runExceptT $
        Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $
          insertCronEventsFor cronTriggersForHydrationWithStats
      either logQErr pure insertResult

  sleep (minutes 1)
  where
    -- log a database error as an internal scheduled trigger error
    logQErr :: QErr -> IO ()
    logQErr = L.unLogger logger . ScheduledTriggerInternalErr . err500 Unexpected . T.pack . show

    getDeprivedCronTriggerStats = liftTx $
      map uncurryStats <$>
        Q.listQE defaultTxErrorHandler
        [Q.sql|
         SELECT name, upcoming_events_count, max_scheduled_time
          FROM hdb_catalog.hdb_cron_events_stats
          WHERE upcoming_events_count < 100
         |] () True

    uncurryStats (triggerName, upcomingCount, maxScheduledTime) =
      CronTriggerStats triggerName upcomingCount maxScheduledTime

    withCronTrigger cronTriggerCache cronTriggerStat =
      case Map.lookup (ctsName cronTriggerStat) cronTriggerCache of
        Just cronTrigger -> pure $ Just (cronTrigger, cronTriggerStat)
        Nothing -> do
          L.unLogger logger $
            ScheduledTriggerInternalErr $
              err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing
|
|
|
|
|
|
|
|
|
|
-- | Generates the upcoming cron events of the given cron triggers and
-- inserts them into the cron events table.
--
-- For every trigger, events are generated starting from the maximum
-- scheduled time found in its stats. When no event at all is generated
-- the database is not touched.
--
-- The insertion itself is delegated to 'insertCronEvents' so that the
-- @INSERT@ statement is built in exactly one place.
insertCronEventsFor :: [(CronTriggerInfo, CronTriggerStats)] -> Q.TxE QErr ()
insertCronEventsFor cronTriggersWithStats =
  case concatMap seedsFor cronTriggersWithStats of
    []    -> pure ()
    seeds -> insertCronEvents seeds
  where
    -- the cron events of one trigger, continuing after its last known event
    seedsFor (cti, stats) = generateCronEventsFrom (ctsMaxScheduledTime stats) cti
|
|
|
|
|
|
|
|
|
|
-- | Inserts the given cron events into the cron events table. Rows that
-- conflict with existing ones are skipped (@ON CONFLICT DO NOTHING@).
--
-- An empty list is a no-op: an @INSERT ... VALUES@ statement without any
-- row is not valid SQL, so no statement is issued in that case. An empty
-- list can legitimately occur when a schedule yields no further matches.
insertCronEvents :: [CronEventSeed] -> Q.TxE QErr ()
insertCronEvents [] = pure ()
insertCronEvents events = do
  let insertCronEventsSql = TB.run $ toSQL
        SQLInsert
          { siTable    = cronEventsTable
          , siCols     = map unsafePGCol ["trigger_name", "scheduled_time"]
          , siValues   = ValuesExp $ map (toTupleExp . toArr) events
          , siConflict = Just $ DoNothing Nothing
          , siRet      = Nothing
          }
  Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
  where
    -- the column values of one row, in the order of 'siCols'
    toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
    toTupleExp = TupleExp . map SELit
|
|
|
|
|
|
|
|
|
|
-- | Produces the seeds of the next 100 cron events of a cron trigger,
-- computed from the trigger's schedule starting at the given time.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo-> [CronEventSeed]
generateCronEventsFrom startTime cronTrigger =
  CronEventSeed (ctiName cronTrigger) <$> upcomingTimes
  where
    -- generate next 100 events
    upcomingTimes = generateScheduleTimes startTime 100 (ctiSchedule cronTrigger)
|
|
|
|
|
|
|
|
|
|
-- | Generates at most @n@ times matching the 'CronSchedule', starting
-- from @from@. Each time is obtained by feeding the previous one (initially
-- @from@) to 'nextMatch'; the list ends early if 'nextMatch' finds no
-- further match.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n (unfoldr nextOccurrence from)
  where
    -- the next match is both emitted and used as the seed of the next step
    nextOccurrence previous = dup <$> nextMatch cron previous
|
|
|
|
|
|
|
|
|
|
-- | Fetches the cron events that are due and delivers each of them.
--
-- The configuration of an event (webhook, payload, retry configuration,
-- headers, comment) is taken from its cron trigger in the schema cache.
-- An event whose trigger is missing from the cache is only logged.
processCronEvents
  :: (HasVersion, MonadIO m, Tracing.HasReporter m)
  => L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> Q.PGPool
  -> IO SchemaCache
  -> TVar (Set.Set CronEventId)
  -> m ()
processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
  cronTriggersInfo <- scCronTriggers <$> liftIO getSC
  fetchResult <-
    liftIO . runExceptT $
      Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getPartialCronEvents
  case fetchResult of
    Left err -> logInternalError err
    Right partialEvents -> do
      -- remember the locked cron events that have been fetched from the
      -- database; the events stored here will be unlocked in case a
      -- graceful shutdown is initiated in the midst of processing them
      saveLockedEvents (map cepId partialEvents) lockedCronEvents
      for_ partialEvents $ \CronEventPartial{..} ->
        case Map.lookup cepName cronTriggersInfo of
          Nothing ->
            logInternalError $
              err500 Unexpected "could not find cron trigger in cache"
          Just CronTriggerInfo{..} -> do
            let scheduledEvent =
                  ScheduledEventFull
                    { sefId            = cepId
                    , sefName          = Just cepName
                    , sefScheduledTime = cepScheduledTime
                    , sefTries         = cepTries
                    , sefWebhook       = unResolvedWebhook ctiWebhookInfo
                    , sefPayload       = fromMaybe J.Null ctiPayload
                    , sefRetryConf     = ctiRetryConf
                    , sefHeaders       = ctiHeaders
                    , sefComment       = ctiComment
                    }
            outcome <- Tracing.runTraceT "scheduled event" . runExceptT $
              runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr)
            -- the event has been dealt with, so it needs no unlocking on shutdown
            removeEventFromLockedEvents cepId lockedCronEvents
            either logInternalError pure outcome
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | Fetches the standalone scheduled events that are due and delivers
-- each of them.
--
-- The webhook and the headers of every event are resolved against the
-- environment first. If either resolution fails the error is logged (the
-- webhook error taking precedence) and the event is not delivered.
processStandAloneEvents
  :: (HasVersion, MonadIO m, Tracing.HasReporter m)
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> Q.PGPool
  -> TVar (Set.Set StandAloneScheduledEventId)
  -> m ()
processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents = do
  fetchResult <-
    liftIO . runExceptT $
      Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getOneOffScheduledEvents
  case fetchResult of
    Left fetchErr -> logInternalError fetchErr
    Right fetchedEvents -> do
      -- remember the locked standalone events that have been fetched from
      -- the database; the events stored here will be unlocked in case a
      -- graceful shutdown is initiated in the midst of processing them
      saveLockedEvents (map saseId fetchedEvents) lockedStandAloneEvents
      for_ fetchedEvents $ \StandAloneScheduledEvent{..} -> do
        webhookInfo <- liftIO . runExceptT $ resolveWebhook env saseWebhook
        headerInfo <- liftIO . runExceptT $ getHeaderInfosFromConf env saseHeaderConf

        -- combining with the 'Either' applicative reports the webhook
        -- error first when both resolutions have failed
        case (,) <$> webhookInfo <*> headerInfo of
          Left resolutionErr -> logInternalError resolutionErr
          Right (resolvedWebhook, resolvedHeaders) -> do
            let scheduledEvent =
                  ScheduledEventFull
                    { sefId            = saseId
                    , sefName          = Nothing
                    , sefScheduledTime = saseScheduledTime
                    , sefTries         = saseTries
                    , sefWebhook       = unResolvedWebhook resolvedWebhook
                    , sefPayload       = fromMaybe J.Null sasePayload
                    , sefRetryConf     = saseRetryConf
                    , sefHeaders       = resolvedHeaders
                    , sefComment       = saseComment
                    }
            outcome <- Tracing.runTraceT "scheduled event" . runExceptT $
              runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $
                (logger, httpMgr)
            -- the event has been dealt with, so it needs no unlocking on shutdown
            removeEventFromLockedEvents saseId lockedStandAloneEvents
            either logInternalError pure outcome
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | The processor loop. It runs forever; each iteration processes the cron
-- events, then the standalone scheduled events, and then sleeps for one
-- minute.
processScheduledTriggers
  :: (HasVersion, MonadIO m, Tracing.HasReporter m)
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> Q.PGPool
  -> IO SchemaCache
  -> LockedEventsCtx
  -> m void
processScheduledTriggers env logger logEnv httpMgr pgpool getSC lockedEvents =
  forever $ do
    processCronEvents logger logEnv httpMgr pgpool getSC (leCronEvents lockedEvents)
    processStandAloneEvents env logger logEnv httpMgr pgpool (leStandAloneEvents lockedEvents)
    liftIO $ sleep (minutes 1)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
|
|
|
|
|
-- | Processes a single scheduled event.
--
-- If more time has passed since the scheduled time than the tolerance of
-- the event's retry configuration allows, the event is marked dead without
-- being delivered. Otherwise the webhook is invoked with the JSON-encoded
-- 'ScheduledEventWebhookPayload', the HTTP exchange is logged, and the
-- outcome is handed to 'processError' or 'processSuccess'.
processScheduledEvent ::
  ( MonadReader r m
  , Has HTTP.Manager r
  , Has (L.Logger L.Hasura) r
  , HasVersion
  , MonadIO m
  , MonadError QErr m
  , Tracing.MonadTrace m
  )
  => LogEnvHeaders
  -> Q.PGPool
  -> ScheduledEventFull
  -> ScheduledEventType
  -> m ()
processScheduledEvent logEnv pgpool se@ScheduledEventFull {..} type' = do
  currentTime <- liftIO getCurrentTime
  let elapsed   = convertDuration (diffUTCTime currentTime sefScheduledTime)
      tolerance = unNonNegativeDiffTime (strcToleranceSeconds sefRetryConf)
  if elapsed > tolerance
    then processDead pgpool se type'
    else do
      let -- the retry configuration holds seconds, the HTTP client wants microseconds
          timeoutMicros      = round (unNonNegativeDiffTime (strcTimeoutSeconds sefRetryConf)) * 1000000
          httpTimeout        = HTTP.responseTimeoutMicro timeoutMicros
          headers            = addDefaultHeaders (map encodeHeader sefHeaders)
          decodedHeaders     = map (decodeHeader logEnv sefHeaders) headers
          extraLogCtx        = ExtraLogContext (Just currentTime) sefId
          webhookReqBodyJson = J.toJSON $
            ScheduledEventWebhookPayload sefId sefName sefScheduledTime sefPayload sefComment currentTime
          webhookReqBody     = J.encode webhookReqBodyJson
          requestDetails     = RequestDetails (BL.length webhookReqBody)
      res <- runExceptT $ tryWebhook headers httpTimeout webhookReqBody (T.unpack sefWebhook)
      logHTTPForST res extraLogCtx requestDetails
      case res of
        Left httpErr ->
          processError pgpool se decodedHeaders type' webhookReqBodyJson httpErr
        Right httpResp ->
          processSuccess pgpool se decodedHeaders type' webhookReqBodyJson httpResp
|
|
|
|
|
|
|
|
|
|
-- | Handles a failed webhook invocation: in a single transaction the
-- invocation is recorded and the event is either scheduled for a retry or
-- marked as errored (see 'retryOrMarkError').
--
-- The status recorded with the invocation depends on the kind of failure:
-- 1000 for a client exception, 1001 for a parse failure, the status of the
-- response for an error response, and 500 for anything else.
processError
  :: (MonadIO m, MonadError QErr m)
  => Q.PGPool
  -> ScheduledEventFull
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPErr a
  -> m ()
processError pgpool se decodedHeaders type' reqJson err =
  liftExceptTIO $
    Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
      insertInvocation invocation type'
      retryOrMarkError se err type'
  where
    -- the invocation record for this failure, given what varies per failure kind
    failedWith status body respHeaders =
      mkInvocation se status decodedHeaders body respHeaders reqJson

    -- JSON-encode an error detail so that it can be stored as the response body
    encodeDetail :: J.ToJSON b => b -> TBS.TByteString
    encodeDetail = TBS.fromLBS . J.encode

    invocation = case err of
      HClient excp    -> failedWith 1000 (encodeDetail (show excp)) []
      HParse _ detail -> failedWith 1001 (encodeDetail detail) []
      HStatus errResp -> failedWith (hrsStatus errResp) (hrsBody errResp) (hrsHeaders errResp)
      HOther detail   -> failedWith 500 (encodeDetail detail) []
|
|
|
|
|
|
|
|
|
|
-- | Decides what happens to an event after a failed delivery.
--
-- The event is marked as errored only when its tries are exhausted /and/
-- the error carries no parseable retry-after header. In every other case a
-- retry is scheduled, delayed by the value of the retry-after header when
-- there is one, and by the retry interval of the retry configuration
-- otherwise.
--
-- NOTE(review): as a consequence, a failure that carries a retry-after
-- header is retried even when the tries are already exhausted.
retryOrMarkError :: ScheduledEventFull -> HTTPErr a -> ScheduledEventType -> Q.TxE QErr ()
retryOrMarkError se@ScheduledEventFull {..} err type'
  | triesExhausted && noRetryHeader =
      setScheduledEventStatus sefId SESError type'
  | otherwise = do
      currentTime <- liftIO getCurrentTime
      let delaySeconds = fromMaybe defaultDelaySeconds mRetryHeaderSeconds
          retryTime    = addUTCTime (fromIntegral delaySeconds) currentTime
      setRetry se retryTime type'
  where
    mRetryHeaderSeconds = parseRetryHeaderValue =<< getRetryAfterHeaderFromHTTPErr err
    noRetryHeader       = isNothing mRetryHeaderSeconds
    triesExhausted      = sefTries >= strcNumRetries sefRetryConf
    -- the configured retry interval, rounded to whole seconds
    defaultDelaySeconds = round $ unNonNegativeDiffTime $ strcRetryIntervalSeconds sefRetryConf
|
|
|
|
|
|
|
|
|
|
{- Note [Scheduled event lifecycle]
|
|
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
Scheduled events move between five different states over the course of their
|
|
|
|
|
lifetime, as represented by the following flowchart:
|
|
|
|
|
┌───────────┐ ┌────────┐ ┌───────────┐
|
|
|
|
|
│ scheduled │─(a)─→│ locked │─(b)─→│ delivered │
|
|
|
|
|
└───────────┘ └────────┘ └───────────┘
|
|
|
|
|
↑ │ ┌───────┐
|
|
|
|
|
└────(c)───────┼─────(d)──→│ error │
|
|
|
|
|
│ └───────┘
|
|
|
|
|
│ ┌──────┐
|
|
|
|
|
└─────(e)──→│ dead │
|
|
|
|
|
└──────┘
|
|
|
|
|
|
|
|
|
|
When a scheduled event is first created, it starts in the 'scheduled' state,
|
|
|
|
|
and it can transition to other states in the following ways:
|
|
|
|
|
a. When graphql-engine fetches a scheduled event from the database to process
|
|
|
|
|
it, it sets its state to 'locked'. This prevents multiple graphql-engine
|
|
|
|
|
instances running on the same database from processing the same
|
|
|
|
|
scheduled event concurrently.
|
|
|
|
|
b. When a scheduled event is processed successfully, it is marked 'delivered'.
|
|
|
|
|
c. If a scheduled event fails to be processed, but it hasn’t yet reached
|
|
|
|
|
its maximum retry limit, its retry counter is incremented and
|
|
|
|
|
it is returned to the 'scheduled' state.
|
|
|
|
|
d. If a scheduled event fails to be processed and *has* reached its
|
|
|
|
|
retry limit, its state is set to 'error'.
|
|
|
|
|
e. If for whatever reason the difference between the current time and the
|
|
|
|
|
scheduled time is greater than the tolerance of the scheduled event, it
|
|
|
|
|
will not be processed and its state will be set to 'dead'.
|
|
|
|
|
-}
|
|
|
|
|
|
|
|
|
|
-- | Handles a successful webhook invocation: in a single transaction the
-- invocation (response status, body and headers) is recorded and the event
-- is marked as delivered.
processSuccess
  :: (MonadIO m, MonadError QErr m)
  => Q.PGPool
  -> ScheduledEventFull
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPResp a
  -> m ()
processSuccess pgpool se decodedHeaders type' reqBodyJson resp =
  liftExceptTIO $
    Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
      insertInvocation invocation type'
      setScheduledEventStatus (sefId se) SESDelivered type'
  where
    invocation =
      mkInvocation se (hrsStatus resp) decodedHeaders (hrsBody resp) (hrsHeaders resp) reqBodyJson
|
|
|
|
|
|
|
|
|
|
-- | Set the status of the given scheduled event to 'SESDead'.
--
-- The update runs in its own read-write transaction at 'Q.RepeatableRead'
-- isolation; no invocation is logged.
processDead
  :: (MonadIO m, MonadError QErr m)
  => Q.PGPool
  -> ScheduledEventFull
  -> ScheduledEventType
  -> m ()
processDead pgpool se type' =
  liftExceptTIO $
    Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) markDead
  where
    markDead = setScheduledEventStatus (sefId se) SESDead type'
|
|
|
|
|
|
|
|
|
|
-- | Schedule another delivery attempt for an event.
--
-- Writes the supplied time into @next_retry_at@ and puts the event back into
-- the @'scheduled'@ status. The table that is updated depends on the event
-- type: @hdb_cron_events@ for cron events, @hdb_scheduled_events@ for
-- standalone events.
setRetry :: ScheduledEventFull -> UTCTime -> ScheduledEventType -> Q.TxE QErr ()
setRetry se time CronScheduledEvent =
  Q.unitQE defaultTxErrorHandler [Q.sql|
    UPDATE hdb_catalog.hdb_cron_events
    SET next_retry_at = $1,
    STATUS = 'scheduled'
    WHERE id = $2
    |] (time, sefId se) True
setRetry se time StandAloneEvent =
  Q.unitQE defaultTxErrorHandler [Q.sql|
    UPDATE hdb_catalog.hdb_scheduled_events
    SET next_retry_at = $1,
    STATUS = 'scheduled'
    WHERE id = $2
    |] (time, sefId se) True
|
|
|
|
|
|
|
|
|
|
-- | Assemble the invocation record for one delivery attempt of a scheduled
-- event.
--
-- Arguments, in order: the event (only its id is used), the status code, the
-- request headers, the response body, the response headers and the JSON
-- request body.
--
-- The response part is built with 'mkClientErr' from the body alone when
-- 'isClientError' holds for the status, and with 'mkResp' from status, body
-- and headers otherwise.
mkInvocation
  :: ScheduledEventFull
  -> Int
  -> [HeaderConf]
  -> TBS.TByteString
  -> [HeaderConf]
  -> J.Value
  -> Invocation 'ScheduledType
mkInvocation ScheduledEventFull {sefId} status reqHeaders respBody respHeaders reqBodyJson =
  Invocation sefId status webhookRequest webhookResponse
  where
    webhookRequest = mkWebhookReq reqBodyJson reqHeaders invocationVersionST

    webhookResponse
      | isClientError status = mkClientErr respBody
      | otherwise            = mkResp status respBody respHeaders
|
|
|
|
|
|
|
|
|
|
-- | Persist an invocation and bump the event's attempt counter.
--
-- Two statements are issued in the enclosing transaction:
--
--   1. an @INSERT@ of (event id, status, request, response) into the
--      invocation-log table for the event type, with request and response
--      serialised as JSON;
--   2. an @UPDATE@ that increments @tries@ on the event row itself.
--
-- Cron events use @hdb_cron_event_invocation_logs@ / @hdb_cron_events@;
-- standalone events use @hdb_scheduled_event_invocation_logs@ /
-- @hdb_scheduled_events@.
insertInvocation :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocation invo type' =
  case type' of
    CronScheduledEvent -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
          (event_id, status, request, response)
          VALUES ($1, $2, $3, $4)
        |] logRow True
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
        |] eventKey True
    StandAloneEvent -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
          (event_id, status, request, response)
          VALUES ($1, $2, $3, $4)
        |] logRow True
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
        |] eventKey True
  where
    eventId = iEventId invo

    -- Arguments for the INSERT; the status is widened to Int64 before being
    -- sent to the database.
    logRow =
      ( eventId
      , fromIntegral (iStatus invo) :: Int64
      , Q.AltJ (J.toJSON (iRequest invo))
      , Q.AltJ (J.toJSON (iResponse invo))
      )

    -- Single-argument row for the UPDATE.
    eventKey = Identity eventId
|
|
|
|
|
|
|
|
|
|
-- | Overwrite the @status@ column of the event with the given id.
--
-- The event type selects the table: @hdb_cron_events@ for cron events and
-- @hdb_scheduled_events@ for standalone events.
setScheduledEventStatus :: Text -> ScheduledEventStatus -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventStatus scheduledEventId status CronScheduledEvent =
  Q.unitQE defaultTxErrorHandler
    [Q.sql|
      UPDATE hdb_catalog.hdb_cron_events
      SET status = $2
      WHERE id = $1
    |] (scheduledEventId, status) True
setScheduledEventStatus scheduledEventId status StandAloneEvent =
  Q.unitQE defaultTxErrorHandler
    [Q.sql|
      UPDATE hdb_catalog.hdb_scheduled_events
      SET status = $2
      WHERE id = $1
    |] (scheduledEventId, status) True
|
|
|
|
|
|
|
|
|
|
-- | Claim the cron events that are due and return them.
--
-- In one statement, every row of @hdb_cron_events@ that is @'scheduled'@ and
-- due is switched to @'locked'@, and its id, trigger name, scheduled time and
-- try count are returned. A row is due when either
--
--   * it has no @next_retry_at@ and its @scheduled_time@ is not in the
--     future, or
--   * it has a @next_retry_at@ that is not in the future.
--
-- The inner @SELECT@ uses @FOR UPDATE SKIP LOCKED@, so rows currently
-- row-locked by another transaction are passed over rather than waited on.
getPartialCronEvents :: Q.TxE QErr [CronEventPartial]
getPartialCronEvents = do
  claimedRows <- Q.listQE defaultTxErrorHandler [Q.sql|
      UPDATE hdb_catalog.hdb_cron_events
      SET status = 'locked'
      WHERE id IN ( SELECT t.id
                    FROM hdb_catalog.hdb_cron_events t
                    WHERE ( t.status = 'scheduled'
                            and (
                             (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                             (t.next_retry_at is not NULL and t.next_retry_at <= now())
                            )
                          )
                    FOR UPDATE SKIP LOCKED
                  )
      RETURNING id, trigger_name, scheduled_time, tries
    |] () True
  pure $ map toPartialEvent claimedRows
  where
    -- Column order matches the RETURNING clause.
    toPartialEvent (eventId, triggerName, scheduledTime, tries) =
      CronEventPartial eventId triggerName scheduledTime tries
|
|
|
|
|
|
|
|
|
|
-- | Claim the standalone (one-off) scheduled events that are due and return
-- them.
--
-- In one statement, every row of @hdb_scheduled_events@ that is @'scheduled'@
-- and due is switched to @'locked'@ and returned. A row is due when either
--
--   * it has no @next_retry_at@ and its @scheduled_time@ is not in the
--     future, or
--   * it has a @next_retry_at@ that is not in the future.
--
-- The inner @SELECT@ uses @FOR UPDATE SKIP LOCKED@, so rows currently
-- row-locked by another transaction are passed over rather than waited on.
--
-- NOTE(review): the trailing flag passed to 'Q.listQE' is 'False' here while
-- 'getPartialCronEvents' passes 'True' (presumably the prepared-statement
-- flag) -- confirm whether the difference is intentional.
getOneOffScheduledEvents :: Q.TxE QErr [StandAloneScheduledEvent]
getOneOffScheduledEvents = do
  claimedRows <- Q.listQE defaultTxErrorHandler [Q.sql|
      UPDATE hdb_catalog.hdb_scheduled_events
      SET status = 'locked'
      WHERE id IN ( SELECT t.id
                    FROM hdb_catalog.hdb_scheduled_events t
                    WHERE ( t.status = 'scheduled'
                            and (
                             (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                             (t.next_retry_at is not NULL and t.next_retry_at <= now())
                            )
                          )
                    FOR UPDATE SKIP LOCKED
                  )
      RETURNING id, webhook_conf, scheduled_time, retry_conf, payload, header_conf, tries, comment
    |] () False
  pure $ map toStandAloneEvent claimedRows
  where
    -- The tuple follows the RETURNING column order; the constructor takes its
    -- fields in a different order, and the JSON columns are unwrapped from
    -- their 'Q.AltJ' wrapper.
    toStandAloneEvent
      ( eventId
      , webhookConf
      , scheduledTime
      , retryConf
      , payload
      , headerConf
      , tries
      , comment
      ) =
        StandAloneScheduledEvent
          eventId
          scheduledTime
          tries
          (Q.getAltJ webhookConf)
          (Q.getAltJ payload)
          (Q.getAltJ retryConf)
          (Q.getAltJ headerConf)
          comment
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | Run an @ExceptT e IO@ action inside any monad that supports both 'IO' and
-- errors of type @e@: the action is executed via 'liftIO', a 'Left' result is
-- re-thrown in the target monad and a 'Right' result is returned.
liftExceptTIO :: (MonadError e m, MonadIO m) => ExceptT e IO a -> m a
liftExceptTIO action = liftIO (runExceptT action) >>= liftEither
|
2020-07-02 14:57:09 +03:00
|
|
|
|
|
|
|
|
|
-- | A list of scheduled event ids, wrapped so that it can be given its own
-- 'Q.ToPrepArg' instance and passed to a query as a single array argument.
newtype ScheduledEventIdArray = ScheduledEventIdArray
  { unScheduledEventIdArray :: [ScheduledEventId]
  } deriving (Show, Eq)
|
|
|
|
|
|
|
|
|
|
-- | Encodes the wrapped ids as a one-dimensional array of text elements. The
-- parameter itself is declared with the 'PTI.unknown' type, so queries using
-- it cast the placeholder explicitly (e.g. @$1::text[]@).
instance Q.ToPrepArg ScheduledEventIdArray where
  toPrepVal (ScheduledEventIdArray eventIds) =
    Q.toPrepValHelper PTI.unknown textArrayEncoder eventIds
    where
      -- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html
      textArrayEncoder =
        PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict)
|
|
|
|
|
|
|
|
|
|
-- | Return the given cron events to the @'scheduled'@ status.
--
-- Only rows of @hdb_cron_events@ whose id is in the supplied list and whose
-- status is currently @'locked'@ are updated. The result is the number of
-- rows that were updated.
unlockCronEvents :: [ScheduledEventId] -> Q.TxE QErr Int
unlockCronEvents scheduledEventIds = do
  countRow <- Q.withQE defaultTxErrorHandler
    [Q.sql|
      WITH "cte" AS
      (UPDATE hdb_catalog.hdb_cron_events
      SET status = 'scheduled'
      WHERE id = ANY($1::text[]) and status = 'locked'
      RETURNING *)
      SELECT count(*) FROM "cte"
    |] idArrayArg True
  -- The query yields exactly one row holding the count.
  pure $ runIdentity (Q.getRow countRow)
  where
    idArrayArg = Identity (ScheduledEventIdArray scheduledEventIds)
|
|
|
|
|
|
|
|
|
|
-- | Return the given standalone scheduled events to the @'scheduled'@ status.
--
-- Only rows of @hdb_scheduled_events@ whose id is in the supplied list and
-- whose status is currently @'locked'@ are updated. The result is the number
-- of rows that were updated.
unlockStandaloneScheduledEvents :: [ScheduledEventId] -> Q.TxE QErr Int
unlockStandaloneScheduledEvents scheduledEventIds = do
  countRow <- Q.withQE defaultTxErrorHandler
    [Q.sql|
      WITH "cte" AS
      (UPDATE hdb_catalog.hdb_scheduled_events
      SET status = 'scheduled'
      WHERE id = ANY($1::text[]) AND status = 'locked'
      RETURNING *)
      SELECT count(*) FROM "cte"
    |] idArrayArg True
  -- The query yields exactly one row holding the count.
  pure $ runIdentity (Q.getRow countRow)
  where
    idArrayArg = Identity (ScheduledEventIdArray scheduledEventIds)
|
|
|
|
|
|
|
|
|
|
-- | Put every @'locked'@ event back into the @'scheduled'@ status, first in
-- @hdb_cron_events@ and then in @hdb_scheduled_events@.
unlockAllLockedScheduledEvents :: Q.TxE QErr ()
unlockAllLockedScheduledEvents = unlockCron >> unlockStandalone
  where
    unlockCron :: Q.TxE QErr ()
    unlockCron =
      Q.unitQE defaultTxErrorHandler [Q.sql|
        UPDATE hdb_catalog.hdb_cron_events
        SET status = 'scheduled'
        WHERE status = 'locked'
        |] () True

    unlockStandalone :: Q.TxE QErr ()
    unlockStandalone =
      Q.unitQE defaultTxErrorHandler [Q.sql|
        UPDATE hdb_catalog.hdb_scheduled_events
        SET status = 'scheduled'
        WHERE status = 'locked'
        |] () True
|