2021-07-02 20:24:49 +03:00
module Hasura.Eventing.Common
2021-09-20 10:34:59 +03:00
( LockedEventsCtx (..),
2022-09-13 11:33:44 +03:00
2022-09-15 14:45:14 +03:00
2022-10-11 22:26:38 +03:00
2021-09-20 10:34:59 +03:00
2022-09-13 11:33:44 +03:00
import Control.Arrow.Extended
2020-07-02 14:57:09 +03:00
import Control.Concurrent.STM.TVar
import Control.Monad.STM
2022-09-13 11:33:44 +03:00
import Data.List (unfoldr)
2020-07-14 22:00:58 +03:00
import Data.Set qualified as Set
2022-09-13 11:33:44 +03:00
import Data.Time
2022-10-11 22:26:38 +03:00
import Hasura.Base.Error (QErr)
2021-09-20 10:34:59 +03:00
import Hasura.Prelude
2021-05-14 12:38:37 +03:00
import Hasura.RQL.Types.Action (LockedActionEventId)
2021-09-20 10:34:59 +03:00
import Hasura.RQL.Types.Common
2022-10-11 22:26:38 +03:00
import Hasura.RQL.Types.EventTrigger
2021-09-20 10:34:59 +03:00
import Hasura.RQL.Types.Eventing (EventId)
2020-09-07 09:15:15 +03:00
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, OneOffScheduledEventId)
2022-09-13 11:33:44 +03:00
import System.Cron
2020-07-02 14:57:09 +03:00
-- | Shared mutable bookkeeping for the ids of events that are currently
-- locked, with one 'TVar' per kind of event:
--
-- * 'leCronEvents': a set of 'CronEventId's
-- * 'leOneOffEvents': a set of 'OneOffScheduledEventId's
-- * 'leEvents': a set of 'EventId's per 'SourceName'
-- * 'leActionEvents': a set of 'LockedActionEventId's
data LockedEventsCtx = LockedEventsCtx
2020-09-07 09:15:15 +03:00
{ leCronEvents :: TVar (Set.Set CronEventId),
leOneOffEvents :: TVar (Set.Set OneOffScheduledEventId),
2021-09-20 10:34:59 +03:00
leEvents :: TVar (HashMap SourceName (Set.Set EventId)),
2021-05-14 12:38:37 +03:00
leActionEvents :: TVar (Set.Set LockedActionEventId)
2020-07-02 14:57:09 +03:00
-- | Record a batch of fetched events as locked.
--
-- The given ids are merged into the set stored in the 'TVar' within a single
-- STM transaction. Ids that are already present stay as they are. The
-- container is a "Data.Set", so insertion order is not retained.
saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m ()
saveLockedEvents eventIds lockedEvents =
  liftIO . atomically $
    -- 'modifyTVar'' evaluates the merged set before storing it, so pending
    -- unions do not pile up inside the 'TVar'.
    modifyTVar' lockedEvents mergeFetched
  where
    -- Existing entries go on the left, matching the bias of 'Set.union'.
    mergeFetched alreadyLocked = Set.union alreadyLocked (Set.fromList eventIds)
-- | Drop a single event id from the given set of locked events, typically
-- once the event has been processed.
--
-- The update runs as one STM transaction; deleting an id that is not in the
-- set leaves the set unchanged.
removeEventFromLockedEvents ::
  MonadIO m => EventId -> TVar (Set.Set EventId) -> m ()
removeEventFromLockedEvents eventId lockedEvents =
  liftIO . atomically $
    -- Strict update: the shrunken set is evaluated before it is written back.
    modifyTVar' lockedEvents (Set.delete eventId)
2022-09-13 11:33:44 +03:00
-- | Compute up to @n@ upcoming times for a 'CronSchedule'.
--
-- Starting at @from@, each time is obtained by asking 'nextMatch' for the
-- match that follows the previous one. The list is shorter than @n@ when
-- 'nextMatch' stops producing matches, and empty when @n@ is not positive.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron =
  -- Every match is both emitted and reused as the seed for the next lookup.
  take n (unfoldr (fmap dup . nextMatch cron) from)
2022-09-15 14:45:14 +03:00
-- | Number of cleanup schedules to be generated in one iteration.
cleanupSchedulesToBeGenerated :: Int
cleanupSchedulesToBeGenerated = 50
2022-10-11 22:26:38 +03:00
-- | Delete event trigger logs one batch at a time and report the totals.
--
-- Before every batch the cleanup configuration is re-read through
-- @getLatestCleanupConfig@:
--
-- * if it reports 'ETCSPaused', deletion stops and the totals gathered so far
--   are returned;
-- * if it yields a configuration, that one is used for the batch;
-- * if it yields 'Nothing', the configuration of the previous batch
--   (initially @oldCleanupConfig@) is reused.
--
-- Deletion also stops as soon as a batch removes nothing. A 'Left' returned
-- by @dbLogDeleteAction@ is rethrown in @m@, so the call fails without
-- returning any totals.
deleteEventTriggerLogsInBatchesWith ::
  (MonadIO m, MonadError QErr m) =>
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
  TriggerLogCleanupConfig ->
  (TriggerLogCleanupConfig -> IO (Either QErr DeletedEventLogStats)) ->
  m DeletedEventLogStats
deleteEventTriggerLogsInBatchesWith getLatestCleanupConfig oldCleanupConfig dbLogDeleteAction =
  deleteNextBatch nothingDeleted oldCleanupConfig
  where
    nothingDeleted = DeletedEventLogStats 0 0

    -- Carries the running totals and the configuration to fall back on when
    -- no newer one can be fetched.
    deleteNextBatch totalSoFar@(DeletedEventLogStats eventLogsSoFar invocationLogsSoFar) fallbackConfig = do
      fetchedConfig <- liftIO getLatestCleanupConfig
      case fetchedConfig of
        -- a paused cleanup must not delete anything further
        Just (_, ETCSPaused) -> pure totalSoFar
        _ -> do
          let activeConfig = maybe fallbackConfig fst fetchedConfig
          batchStats@(DeletedEventLogStats eventLogsInBatch invocationLogsInBatch) <-
            liftEitherM $ liftIO $ dbLogDeleteAction activeConfig
          if batchStats == nothingDeleted
            then -- an empty batch means there is nothing left to delete
              pure totalSoFar
            else
              deleteNextBatch
                ( DeletedEventLogStats
                    (eventLogsSoFar + eventLogsInBatch)
                    (invocationLogsSoFar + invocationLogsInBatch)
                )
                activeConfig