Add a 30 minute timeout for event trigger locks

Phil Freeman 2020-07-28 13:45:20 -07:00
parent 0ae5384115
commit fe70d9fbe8
12 changed files with 30 additions and 23 deletions
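
In short, the hdb_catalog.event_log.locked column changes from a BOOLEAN flag to a nullable TIMESTAMP: fetchEvents now claims an event by writing locked = NOW(), the success/error/retry/unlock paths reset it to NULL, and an event whose lock timestamp is older than 30 minutes is treated as unclaimed again, presumably so that events locked by an instance that crashed without unlocking them are eventually redelivered. The commit also retitles several tracing spans along the way ("process event" becomes "Event trigger", "pg" becomes "Query"/"Mutation", "jwk init" becomes "JWK init", and so on).

A minimal sketch of the new claim predicate, using a throwaway demo table rather than the real catalog (illustration only, not code from this commit):

-- Illustration only: behaviour of the timestamp-based lock. An event is
-- claimable when it has never been locked, or when its lock is older than
-- 30 minutes.
CREATE TEMPORARY TABLE event_log_demo (
  id     TEXT PRIMARY KEY,
  locked TIMESTAMP  -- was BOOLEAN NOT NULL DEFAULT FALSE before this commit
);

INSERT INTO event_log_demo VALUES
  ('never-locked',   NULL),
  ('freshly-locked', NOW()),
  ('stale-lock',     NOW() - interval '2 hour');

-- Mirrors the predicate added to fetchEvents: returns 'never-locked' and
-- 'stale-lock', but not 'freshly-locked'.
SELECT id
FROM event_log_demo
WHERE locked IS NULL OR locked < NOW() - interval '30 minute';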

View File

@@ -209,7 +209,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
 Tracing.runTraceTInContext
 tracingCtx
 t <- processEvent event
-& runTraceT "process event"
+& runTraceT "Event trigger"
 & withEventEngineCtx eeCtx
 & flip runReaderT (logger, httpMgr)
 & LA.async
@@ -408,10 +408,11 @@ fetchEvents :: Int -> Q.TxE QErr [Event]
 fetchEvents limitI =
 map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
 UPDATE hdb_catalog.event_log
-SET locked = 't'
+SET locked = NOW()
 WHERE id IN ( SELECT l.id
 FROM hdb_catalog.event_log l
-WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
+WHERE l.delivered = 'f' and l.error = 'f'
+and (l.locked IS NULL or l.locked < NOW() - interval '30 minute')
 and (l.next_retry_at is NULL or l.next_retry_at <= now())
 and l.archived = 'f'
 ORDER BY created_at
@@ -448,14 +449,14 @@ insertInvocation invo = do
 setSuccess :: Event -> Q.TxE QErr ()
 setSuccess e = Q.unitQE defaultTxErrorHandler [Q.sql|
 UPDATE hdb_catalog.event_log
-SET delivered = 't', next_retry_at = NULL, locked = 'f'
+SET delivered = 't', next_retry_at = NULL, locked = NULL
 WHERE id = $1
 |] (Identity $ eId e) True
 setError :: Event -> Q.TxE QErr ()
 setError e = Q.unitQE defaultTxErrorHandler [Q.sql|
 UPDATE hdb_catalog.event_log
-SET error = 't', next_retry_at = NULL, locked = 'f'
+SET error = 't', next_retry_at = NULL, locked = NULL
 WHERE id = $1
 |] (Identity $ eId e) True
@@ -463,7 +464,7 @@ setRetry :: Event -> UTCTime -> Q.TxE QErr ()
 setRetry e time =
 Q.unitQE defaultTxErrorHandler [Q.sql|
 UPDATE hdb_catalog.event_log
-SET next_retry_at = $1, locked = 'f'
+SET next_retry_at = $1, locked = NULL
 WHERE id = $2
 |] (time, eId e) True
@@ -471,8 +472,8 @@ unlockAllEvents :: Q.TxE QErr ()
 unlockAllEvents =
 Q.unitQE defaultTxErrorHandler [Q.sql|
 UPDATE hdb_catalog.event_log
-SET locked = 'f'
-WHERE locked = 't'
+SET locked = NULL
+WHERE locked IS NOT NULL
 |] () True
 toInt64 :: (Integral a) => a -> Int64
@@ -495,12 +496,12 @@ unlockEvents eventIds =
 [Q.sql|
 WITH "cte" AS
 (UPDATE hdb_catalog.event_log
-SET locked = 'f'
+SET locked = NULL
 WHERE id = ANY($1::text[])
 -- only unlock those events that have been locked, it's possible
 -- that an event has been processed but not yet been removed from
 -- the saved locked events, which will lead to a double send
-AND locked = 't'
+AND locked IS NOT NULL
 RETURNING *)
 SELECT count(*) FROM "cte"
 |] (Identity $ EventIdArray eventIds) True

View File

@@ -375,7 +375,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
 ctiRetryConf
 ctiHeaders
 ctiComment
-finally <- Tracing.runTraceT "scheduled event" . runExceptT $
+finally <- Tracing.runTraceT "Scheduled event" . runExceptT $
 runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr)
 removeEventFromLockedEvents id' lockedCronEvents
 either logInternalError pure finally
@@ -430,7 +430,7 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents
 retryConf
 headerInfo'
 comment
-finally <- Tracing.runTraceT "scheduled event" . runExceptT $
+finally <- Tracing.runTraceT "Scheduled event" . runExceptT $
 runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $
 (logger, httpMgr)
 removeEventFromLockedEvents id' lockedStandAloneEvents

View File

@@ -411,7 +411,7 @@ asyncActionsProcessor env cacheRef pgPool httpManager = forever $ do
 either mempty return res
 callHandler :: ActionCache -> ActionLogItem -> m ()
-callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do
+callHandler actionCache actionLogItem = Tracing.runTraceT "Async actions processor" do
 let ActionLogItem actionId actionName reqHeaders
 sessionVariables inputPayload = actionLogItem
 case Map.lookup actionName actionCache of

View File

@@ -165,12 +165,12 @@ runHasuraGQ
 runHasuraGQ reqId (query, queryParsed) userInfo resolvedOp = do
 (E.ExecutionCtx logger _ pgExecCtx _ _ _ _ _) <- ask
 (telemTimeIO, respE) <- withElapsedTime $ runExceptT $ case resolvedOp of
-E.ExOpQuery tx genSql asts -> trace "pg" $ do
+E.ExOpQuery tx genSql asts -> trace "Query" $ do
 -- log the generated SQL and the graphql query
 logQueryLog logger query genSql reqId
 Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly tx
-E.ExOpMutation respHeaders tx -> trace "pg" $ do
+E.ExOpMutation respHeaders tx -> trace "Mutation" $ do
 logQueryLog logger query Nothing reqId
 ctx <- Tracing.currentContext
 (respHeaders,) <$> Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withTraceContext ctx . withUserInfo userInfo) tx

View File

@@ -356,11 +356,11 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
 -> E.ExecOp (Tracing.TraceT (LazyTx QErr))
 -> ExceptT () m ()
 runHasuraGQ timerTot telemCacheHit reqId query queryParsed userInfo = \case
-E.ExOpQuery opTx genSql asts -> Tracing.trace "pg" $
+E.ExOpQuery opTx genSql asts -> Tracing.trace "Query" $
 execQueryOrMut Telem.Query genSql . fmap snd $
 Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly opTx
 -- Response headers discarded over websockets
-E.ExOpMutation _ opTx -> Tracing.trace "pg" do
+E.ExOpMutation _ opTx -> Tracing.trace "Mutation" do
 execQueryOrMut Telem.Mutation Nothing $
 Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withUserInfo userInfo) opTx
 E.ExOpSubs lqOp -> do
@@ -517,7 +517,7 @@ onMessage
 -> AuthMode
 -> WSServerEnv
 -> WSConn -> BL.ByteString -> m ()
-onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" do
+onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "Websocket" do
 case J.eitherDecode msgRaw of
 Left e -> do
 let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e

View File

@@ -158,7 +158,7 @@ fetchEvent :: EventId -> Q.TxE QErr (EventId, Bool)
 fetchEvent eid = do
 events <- Q.listQE defaultTxErrorHandler
 [Q.sql|
-SELECT l.id, l.locked
+SELECT l.id, l.locked IS NOT NULL AND l.locked >= NOW() - interval '30 minute'
 FROM hdb_catalog.event_log l
 JOIN hdb_catalog.event_triggers e
 ON l.trigger_name = e.name

View File

@@ -164,7 +164,7 @@ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logge
 -- header), do not start a background thread for refreshing the JWK
 getJwkFromUrl url = do
 ref <- liftIO $ newIORef $ JWKSet []
-maybeExpiry <- withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref
+maybeExpiry <- withJwkError $ Tracing.runTraceT "JWK init" $ updateJwkRef logger httpManager url ref
 case maybeExpiry of
 Nothing -> return ref
 Just time -> do

View File

@@ -136,7 +136,7 @@ jwkRefreshCtrl
 -> m void
 jwkRefreshCtrl logger manager url ref time = do
 liftIO $ C.sleep time
-forever $ Tracing.runTraceT "jwk refresh" do
+forever $ Tracing.runTraceT "JWK refresh" do
 res <- runExceptT $ updateJwkRef logger manager url ref
 mTime <- either (const $ logNotice >> return Nothing) return res
 -- if can't parse time from header, defaults to 1 min

View File

@@ -1 +1 @@
-37
+38

View File

@@ -302,7 +302,7 @@ CREATE TABLE hdb_catalog.event_log
 error BOOLEAN NOT NULL DEFAULT FALSE,
 tries INTEGER NOT NULL DEFAULT 0,
 created_at TIMESTAMP DEFAULT NOW(),
-locked BOOLEAN NOT NULL DEFAULT FALSE,
+locked TIMESTAMP,
 next_retry_at TIMESTAMP,
 archived BOOLEAN NOT NULL DEFAULT FALSE
 );

View File

@@ -0,0 +1,3 @@
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP DEFAULT;
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP NOT NULL;
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE TIMESTAMP USING CASE WHEN locked THEN NOW() ELSE NULL END;
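
This new up migration converts the lock column in place, and the ordering matters: the boolean default is dropped before the type change (FALSE has no timestamp equivalent), and NOT NULL is dropped so that previously-unlocked rows can become NULL. A hypothetical sanity check, not part of the commit, for the resulting schema:

-- Hypothetical verification query: confirm that hdb_catalog.event_log.locked
-- is now a nullable TIMESTAMP with no default.
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'hdb_catalog'
  AND table_name   = 'event_log'
  AND column_name  = 'locked';
-- Expected: data_type = 'timestamp without time zone', is_nullable = 'YES',
-- and column_default IS NULL.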

View File

@@ -0,0 +1,3 @@
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE BOOLEAN USING locked IS NOT NULL;
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET NOT NULL;
+ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET DEFAULT false;
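
The matching down migration is lossy by construction: any non-null lock timestamp collapses back to TRUE, so the lock age (and with it the 30-minute timeout) is forgotten, which is all the old boolean schema can express. A hypothetical illustration, not part of the commit, of what the USING clause maps each case to:

-- Hypothetical illustration of the USING conversion above.
SELECT (NULL::timestamp)            IS NOT NULL AS unlocked,    -- false
       NOW()::timestamp             IS NOT NULL AS fresh_lock,  -- true
       (NOW() - interval '2 hour')  IS NOT NULL AS stale_lock;  -- true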