diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 706f58b7285..49c0d5a93c8 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -246,21 +246,21 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx => Event -> io () processEvent e = do cache <- liftIO getSchemaCache - + tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e)) let spanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti)) runTraceT = maybe Tracing.runTraceT Tracing.runTraceTInContext tracingCtx - + case getEventTriggerInfoFromEvent cache e of Left err -> do -- This rare error can happen in the following known cases: -- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances) -- ii) the event trigger is dropped when this event was just fetched logQErr $ err500 Unexpected err - liftIO . runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do + liftIO . runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do currentTime <- liftIO getCurrentTime -- For such an event, we unlock the event and retry after a minute setRetry e (addUTCTime 60 currentTime) @@ -324,7 +324,7 @@ processSuccess pool e decodedHeaders ep resp = do respHeaders = hrsHeaders resp respStatus = hrsStatus resp invocation = mkInvocation ep respStatus decodedHeaders respBody respHeaders - liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do + liftIO $ runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do insertInvocation invocation setSuccess e @@ -348,7 +348,7 @@ processError pool e retryConf decodedHeaders ep err = do HOther detail -> do let errMsg = (TBS.fromLBS $ encode detail) mkInvocation ep 500 decodedHeaders errMsg [] - liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do + liftIO $ runExceptT $ Q.runTx pool (Q.ReadCommitted, Just Q.ReadWrite) $ do insertInvocation invocation retryOrSetError e retryConf err @@ -421,7 +421,7 @@ fetchEvents limitI = SET locked = NOW() WHERE id IN ( SELECT l.id FROM hdb_catalog.event_log l - WHERE l.delivered = 'f' and l.error = 'f' + WHERE l.delivered = 'f' and l.error = 'f' and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute')) and (l.next_retry_at is NULL or l.next_retry_at <= now()) and l.archived = 'f' diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index 8fcfd54e43f..0136fa03cbc 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -542,7 +542,7 @@ processError pgpool se decodedHeaders type' reqJson err = do let errMsg = (TBS.fromLBS $ J.encode detail) mkInvocation se 500 decodedHeaders errMsg [] reqJson liftExceptTIO $ - Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do + Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do insertInvocation invocation type' retryOrMarkError se err type' @@ -610,14 +610,14 @@ processSuccess pgpool se decodedHeaders type' reqBodyJson resp = do respStatus = hrsStatus resp invocation = mkInvocation se respStatus decodedHeaders respBody respHeaders reqBodyJson liftExceptTIO $ - Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do + Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ do insertInvocation invocation type' setScheduledEventStatus (sefId se) SESDelivered type' processDead :: (MonadIO m, MonadError QErr m) => Q.PGPool -> ScheduledEventFull -> ScheduledEventType -> m () processDead pgpool se type' = liftExceptTIO $ - Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ + Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ setScheduledEventStatus (sefId se) SESDead type' setRetry :: ScheduledEventFull -> UTCTime -> ScheduledEventType -> Q.TxE QErr ()