mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 01:12:56 +03:00
server: lower the isolation level to ReadCommited while inserting/updating invocation logs and events (#5824)
https://github.com/hasura/graphql-engine/pull/5824
This commit is contained in:
parent
cbe9704a53
commit
43a6290124
@ -246,21 +246,21 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
|
||||
=> Event -> io ()
|
||||
processEvent e = do
|
||||
cache <- liftIO getSchemaCache
|
||||
|
||||
|
||||
tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e))
|
||||
let spanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti))
|
||||
runTraceT = maybe
|
||||
Tracing.runTraceT
|
||||
Tracing.runTraceTInContext
|
||||
tracingCtx
|
||||
|
||||
|
||||
case getEventTriggerInfoFromEvent cache e of
|
||||
Left err -> do
|
||||
-- This rare error can happen in the following known cases:
|
||||
-- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
|
||||
-- ii) the event trigger is dropped when this event was just fetched
|
||||
logQErr $ err500 Unexpected err
|
||||
liftIO . runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
|
||||
liftIO . runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
-- For such an event, we unlock the event and retry after a minute
|
||||
setRetry e (addUTCTime 60 currentTime)
|
||||
@ -324,7 +324,7 @@ processSuccess pool e decodedHeaders ep resp = do
|
||||
respHeaders = hrsHeaders resp
|
||||
respStatus = hrsStatus resp
|
||||
invocation = mkInvocation ep respStatus decodedHeaders respBody respHeaders
|
||||
liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
|
||||
liftIO $ runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do
|
||||
insertInvocation invocation
|
||||
setSuccess e
|
||||
|
||||
@ -348,7 +348,7 @@ processError pool e retryConf decodedHeaders ep err = do
|
||||
HOther detail -> do
|
||||
let errMsg = (TBS.fromLBS $ encode detail)
|
||||
mkInvocation ep 500 decodedHeaders errMsg []
|
||||
liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
|
||||
liftIO $ runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do
|
||||
insertInvocation invocation
|
||||
retryOrSetError e retryConf err
|
||||
|
||||
@ -421,7 +421,7 @@ fetchEvents limitI =
|
||||
SET locked = NOW()
|
||||
WHERE id IN ( SELECT l.id
|
||||
FROM hdb_catalog.event_log l
|
||||
WHERE l.delivered = 'f' and l.error = 'f'
|
||||
WHERE l.delivered = 'f' and l.error = 'f'
|
||||
and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
|
||||
and (l.next_retry_at is NULL or l.next_retry_at <= now())
|
||||
and l.archived = 'f'
|
||||
|
@ -542,7 +542,7 @@ processError pgpool se decodedHeaders type' reqJson err = do
|
||||
let errMsg = (TBS.fromLBS $ J.encode detail)
|
||||
mkInvocation se 500 decodedHeaders errMsg [] reqJson
|
||||
liftExceptTIO $
|
||||
Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
|
||||
Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do
|
||||
insertInvocation invocation type'
|
||||
retryOrMarkError se err type'
|
||||
|
||||
@ -610,14 +610,14 @@ processSuccess pgpool se decodedHeaders type' reqBodyJson resp = do
|
||||
respStatus = hrsStatus resp
|
||||
invocation = mkInvocation se respStatus decodedHeaders respBody respHeaders reqBodyJson
|
||||
liftExceptTIO $
|
||||
Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
|
||||
Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do
|
||||
insertInvocation invocation type'
|
||||
setScheduledEventStatus (sefId se) SESDelivered type'
|
||||
|
||||
processDead :: (MonadIO m, MonadError QErr m) => Q.PGPool -> ScheduledEventFull -> ScheduledEventType -> m ()
|
||||
processDead pgpool se type' =
|
||||
liftExceptTIO $
|
||||
Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $
|
||||
Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $
|
||||
setScheduledEventStatus (sefId se) SESDead type'
|
||||
|
||||
setRetry :: ScheduledEventFull -> UTCTime -> ScheduledEventType -> Q.TxE QErr ()
|
||||
|
Loading…
Reference in New Issue
Block a user