Merge pull request #5473 from paf31/event-trigger-lock-timeout

https://github.com/hasura/graphql-engine/pull/5473
This commit is contained in:
kodiakhq[bot] 2020-09-09 18:18:02 +00:00 committed by GitHub
commit e680f2c88f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 21 deletions

View File

@ -379,7 +379,6 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx
-- prepare event triggers data
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
@ -441,8 +440,10 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
liftIO $ Warp.runSettings warpSettings app
where
-- | prepareEvents is a function to unlock all the events that are
-- locked and unprocessed, which is called while hasura is started.
-- | prepareScheduledEvents is a function to unlock all the scheduled trigger
-- events that are locked and unprocessed, which is called while hasura is
-- started.
--
-- Locked and unprocessed events can occur in 2 ways
-- 1.
-- Hasura's shutdown was not graceful in which all the fetched
@ -452,12 +453,6 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
-- There is another hasura instance which is processing events and
-- it will lock events to process them.
-- So, unlocking all the locked events might re-deliver an event(due to #2).
prepareEvents pool (Logger logger) = do
liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "preparing data"
res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllEvents
either (printErrJExit EventSubSystemError) return res
-- | prepareScheduledEvents is like prepareEvents, but for scheduled triggers
prepareScheduledEvents pool (Logger logger) = do
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data"
res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents

View File

@ -417,10 +417,11 @@ fetchEvents :: Int -> Q.TxE QErr [Event]
fetchEvents limitI =
map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
SET locked = NOW()
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
WHERE l.delivered = 'f' and l.error = 'f'
and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
ORDER BY created_at
@ -457,14 +458,14 @@ insertInvocation invo = do
setSuccess :: Event -> Q.TxE QErr ()
setSuccess e = Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = 'f'
SET delivered = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|] (Identity $ eId e) True
setError :: Event -> Q.TxE QErr ()
setError e = Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = 'f'
SET error = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|] (Identity $ eId e) True
@ -472,7 +473,7 @@ setRetry :: Event -> UTCTime -> Q.TxE QErr ()
setRetry e time =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1, locked = 'f'
SET next_retry_at = $1, locked = NULL
WHERE id = $2
|] (time, eId e) True
@ -480,8 +481,8 @@ unlockAllEvents :: Q.TxE QErr ()
unlockAllEvents =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 'f'
WHERE locked = 't'
SET locked = NULL
WHERE locked IS NOT NULL
|] () True
toInt64 :: (Integral a) => a -> Int64
@ -504,12 +505,12 @@ unlockEvents eventIds =
[Q.sql|
WITH "cte" AS
(UPDATE hdb_catalog.event_log
SET locked = 'f'
SET locked = NULL
WHERE id = ANY($1::text[])
-- only unlock those events that have been locked, it's possible
-- that an event has been processed but not yet been removed from
-- the saved locked events, which will lead to a double send
AND locked = 't'
AND locked IS NOT NULL
RETURNING *)
SELECT count(*) FROM "cte"
|] (Identity $ EventIdArray eventIds) True

View File

@ -158,7 +158,7 @@ fetchEvent :: EventId -> Q.TxE QErr (EventId, Bool)
fetchEvent eid = do
events <- Q.listQE defaultTxErrorHandler
[Q.sql|
SELECT l.id, l.locked
SELECT l.id, l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
FROM hdb_catalog.event_log l
JOIN hdb_catalog.event_triggers e
ON l.trigger_name = e.name

View File

@ -1 +1 @@
37
38

View File

@ -302,7 +302,8 @@ CREATE TABLE hdb_catalog.event_log
error BOOLEAN NOT NULL DEFAULT FALSE,
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
locked BOOLEAN NOT NULL DEFAULT FALSE,
/* when locked IS NULL the event is unlocked and can be processed */
locked TIMESTAMPTZ,
next_retry_at TIMESTAMP,
archived BOOLEAN NOT NULL DEFAULT FALSE
);

View File

@ -0,0 +1,3 @@
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP DEFAULT;
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP NOT NULL;
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE TIMESTAMPTZ USING CASE WHEN locked THEN NOW() ELSE NULL END;

View File

@ -0,0 +1,3 @@
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE BOOLEAN USING locked IS NOT NULL;
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET NOT NULL;
ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET DEFAULT false;