mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 01:12:56 +03:00
Merge pull request #5700 from paf31/512
https://github.com/hasura/graphql-engine/pull/5700
This commit is contained in:
commit
a7f3f95103
@ -204,13 +204,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
|
||||
eventsNext <- LA.withAsync popEventsBatch $ \eventsNextA -> do
|
||||
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
|
||||
forM_ events $ \event -> do
|
||||
tracingCtx <- liftIO (Tracing.extractEventContext (eEvent event))
|
||||
let runTraceT = maybe
|
||||
Tracing.runTraceT
|
||||
Tracing.runTraceTInContext
|
||||
tracingCtx
|
||||
t <- processEvent event
|
||||
& runTraceT "process event"
|
||||
& withEventEngineCtx eeCtx
|
||||
& flip runReaderT (logger, httpMgr)
|
||||
& LA.async
|
||||
@ -247,13 +241,20 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
|
||||
, MonadReader r io
|
||||
, Has HTTP.Manager r
|
||||
, Has (L.Logger L.Hasura) r
|
||||
, Tracing.MonadTrace io
|
||||
, Tracing.HasReporter io
|
||||
)
|
||||
=> Event -> io ()
|
||||
processEvent e = do
|
||||
cache <- liftIO getSchemaCache
|
||||
let meti = getEventTriggerInfoFromEvent cache e
|
||||
case meti of
|
||||
|
||||
tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e))
|
||||
let spanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti))
|
||||
runTraceT = maybe
|
||||
Tracing.runTraceT
|
||||
Tracing.runTraceTInContext
|
||||
tracingCtx
|
||||
|
||||
case getEventTriggerInfoFromEvent cache e of
|
||||
Left err -> do
|
||||
-- This rare error can happen in the following known cases:
|
||||
-- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
|
||||
@ -264,7 +265,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
|
||||
-- For such an event, we unlock the event and retry after a minute
|
||||
setRetry e (addUTCTime 60 currentTime)
|
||||
>>= flip onLeft logQErr
|
||||
Right eti -> do
|
||||
Right eti -> runTraceT (spanName eti) do
|
||||
let webhook = T.unpack $ wciCachedValue $ etiWebhookInfo eti
|
||||
retryConf = etiRetryConf eti
|
||||
timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf)
|
||||
|
@ -376,7 +376,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
|
||||
ctiRetryConf
|
||||
ctiHeaders
|
||||
ctiComment
|
||||
finally <- Tracing.runTraceT "scheduled event" . runExceptT $
|
||||
finally <- runExceptT $
|
||||
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr)
|
||||
removeEventFromLockedEvents id' lockedCronEvents
|
||||
either logInternalError pure finally
|
||||
@ -431,9 +431,9 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents
|
||||
retryConf
|
||||
headerInfo'
|
||||
comment
|
||||
finally <- Tracing.runTraceT "scheduled event" . runExceptT $
|
||||
finally <- runExceptT $
|
||||
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $
|
||||
(logger, httpMgr)
|
||||
(logger, httpMgr)
|
||||
removeEventFromLockedEvents id' lockedStandAloneEvents
|
||||
either logInternalError pure finally
|
||||
|
||||
@ -468,15 +468,14 @@ processScheduledEvent ::
|
||||
, HasVersion
|
||||
, MonadIO m
|
||||
, MonadError QErr m
|
||||
, Tracing.MonadTrace m
|
||||
, Tracing.HasReporter m
|
||||
)
|
||||
=> LogEnvHeaders
|
||||
-> Q.PGPool
|
||||
-> ScheduledEventFull
|
||||
-> ScheduledEventType
|
||||
-> m ()
|
||||
processScheduledEvent
|
||||
logEnv pgpool se@ScheduledEventFull {..} type' = do
|
||||
processScheduledEvent logEnv pgpool se@ScheduledEventFull {..} type' = Tracing.runTraceT traceNote do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
if convertDuration (diffUTCTime currentTime sefScheduledTime)
|
||||
> unNonNegativeDiffTime (strcToleranceSeconds sefRetryConf)
|
||||
@ -499,6 +498,8 @@ processScheduledEvent
|
||||
(processError pgpool se decodedHeaders type' webhookReqBodyJson)
|
||||
(processSuccess pgpool se decodedHeaders type' webhookReqBodyJson)
|
||||
res
|
||||
where
|
||||
traceNote = "Scheduled trigger" <> foldMap ((": " <>) . unNonEmptyText . unTriggerName) sefName
|
||||
|
||||
processError
|
||||
:: (MonadIO m, MonadError QErr m)
|
||||
|
Loading…
Reference in New Issue
Block a user