mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 04:24:35 +03:00
db710d38b7
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/5746 Co-authored-by: pranshi06 <85474619+pranshi06@users.noreply.github.com> Co-authored-by: Puru Gupta <32328846+purugupta99@users.noreply.github.com> Co-authored-by: Karthikeyan Chinnakonda <15602904+codingkarthik@users.noreply.github.com> GitOrigin-RevId: f4e5e06389ca57bdb5f7771f459c07418787111d
55 lines
1.9 KiB
Haskell
55 lines
1.9 KiB
Haskell
module Hasura.Eventing.Common
|
|
( LockedEventsCtx (..),
|
|
saveLockedEvents,
|
|
removeEventFromLockedEvents,
|
|
generateScheduleTimes,
|
|
)
|
|
where
|
|
|
|
import Control.Arrow.Extended
|
|
import Control.Concurrent.STM.TVar
|
|
import Control.Monad.STM
|
|
import Data.List (unfoldr)
|
|
import Data.Set qualified as Set
|
|
import Data.Time
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.Types.Action (LockedActionEventId)
|
|
import Hasura.RQL.Types.Common
|
|
import Hasura.RQL.Types.Eventing (EventId)
|
|
import Hasura.RQL.Types.ScheduledTrigger (CronEventId, OneOffScheduledEventId)
|
|
import System.Cron
|
|
|
|
-- | Shared, mutable bookkeeping of the events that are currently locked.
--
-- Every kind of event gets its own 'TVar', so each collection can be read and
-- updated inside an STM transaction independently of the others.
data LockedEventsCtx = LockedEventsCtx
  { -- | Ids of locked cron events.
    leCronEvents :: TVar (Set.Set CronEventId),
    -- | Ids of locked one-off scheduled events.
    leOneOffEvents :: TVar (Set.Set OneOffScheduledEventId),
    -- | Ids of locked events, kept in a separate set per 'SourceName'.
    leEvents :: TVar (HashMap SourceName (Set.Set EventId)),
    -- | Ids of locked action events.
    leActionEvents :: TVar (Set.Set LockedActionEventId)
  }
|
|
|
|
-- | Add a batch of event ids to the set of locked events held in the given
-- 'TVar'.
--
-- The ids are merged into the existing ordered 'Set.Set' within a single STM
-- transaction. The merged set is forced before it is written back, so the
-- 'TVar' never accumulates a chain of unevaluated unions.
saveLockedEvents :: (MonadIO m) => [EventId] -> TVar (Set.Set EventId) -> m ()
saveLockedEvents eventIds lockedEvents =
  liftIO . atomically $
    modifyTVar' lockedEvents (`Set.union` fetched)
  where
    -- The incoming batch, converted once so it can be merged set-to-set.
    fetched = Set.fromList eventIds
|
|
|
|
-- | Drop a single event id from the set of locked events held in the given
-- 'TVar'.
--
-- The update runs as one STM transaction and the resulting set is forced
-- before it is stored. Deleting an id that is not in the set leaves the set
-- unchanged.
removeEventFromLockedEvents ::
  MonadIO m => EventId -> TVar (Set.Set EventId) -> m ()
removeEventFromLockedEvents eventId lockedEvents =
  liftIO . atomically $
    modifyTVar' lockedEvents (Set.delete eventId)
|
|
|
|
-- | Compute up to @n@ upcoming times of a 'CronSchedule', searching onwards
-- from @from@.
--
-- Each time found by 'nextMatch' is emitted and then used as the starting
-- point for the next search. The result has fewer than @n@ elements when
-- 'nextMatch' returns 'Nothing' before @n@ times have been produced.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n upcoming
  where
    -- Lazily unfolded, so only the first @n@ matches are ever computed.
    upcoming = unfoldr step from
    -- A match is both the next output and the next seed, hence 'dup'.
    step seed = dup <$> nextMatch cron seed
|