EventTrigger: Fix num_event_fetched EKG metric

GitOrigin-RevId: 6ca047edb235f1b30986c08698410ce71f587849
This commit is contained in:
Naveen Naidu 2021-05-17 15:19:16 +05:30 committed by hasura-bot
parent 4fce62d125
commit 14f80c55ff
2 changed files with 15 additions and 15 deletions

View File

@ -243,8 +243,6 @@ processEventQueue
-> m (Forever m)
processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics maintenanceMode responseLogBehaviour = do
events0 <- popEventsBatch
-- Track number of events fetched in EKG
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
return $ Forever (events0, 0, False) go
where
fetchBatchSize = getNonNegativeInt _eeCtxFetchSize
@ -283,6 +281,8 @@ processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} Locked
liftIO $ L.unLogger logger $ EventInternalErr err
return []
Right events -> do
-- Track number of events fetched in EKG
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length events)
-- The time when the events were fetched. This is used to calculate the average lock time of an event.
eventsFetchedTime <- liftIO getCurrentTime
saveLockedEvents (map eId events) leEvents
@ -358,11 +358,11 @@ processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} Locked
=> EventWithSource ('Postgres 'Vanilla)
-> io ()
processEvent (e, sourceConfig, eventFetchedTime) = do
-- Track Lock Time of Event
-- Lock Time = Time when the event was fetched from DB - Time when the event is being processed
-- Track Queue Time of Event (in seconds). See `smEventQueueTime`
-- Queue Time = Time when the event was fetched from DB - Time when the event is being processed
eventProcessTime <- liftIO getCurrentTime
let eventLockTime = realToFrac $ diffUTCTime eventProcessTime eventFetchedTime
_ <- liftIO $ EKG.Distribution.add (smEventLockTime serverMetrics) eventLockTime
let eventQueueTime = realToFrac $ diffUTCTime eventProcessTime eventFetchedTime
_ <- liftIO $ EKG.Distribution.add (smEventQueueTime serverMetrics) eventQueueTime
cache <- liftIO getSchemaCache

View File

@ -387,18 +387,18 @@ runWithEnv env m = runIdentity $ runExceptT $ runReaderT m env
-- | Collection of various server metrics
data ServerMetrics
= ServerMetrics
{ smWarpThreads :: !EKG.Gauge.Gauge
{ smWarpThreads :: !EKG.Gauge.Gauge
-- ^ Current Number of active Warp threads
, smWebsocketConnections :: !EKG.Gauge.Gauge
, smWebsocketConnections :: !EKG.Gauge.Gauge
-- ^ Current number of active websocket connections
, smActiveSubscriptions :: !EKG.Gauge.Gauge
, smActiveSubscriptions :: !EKG.Gauge.Gauge
-- ^ Current number of active subscriptions
, smNumEventsFetched :: !EKG.Distribution.Distribution
, smNumEventsFetchedPerBatch :: !EKG.Distribution.Distribution
-- ^ Total Number of events fetched from last 'Event Trigger Fetch'
, smNumEventHTTPWorkers :: !EKG.Gauge.Gauge
, smNumEventHTTPWorkers :: !EKG.Gauge.Gauge
-- ^ Current number of Event trigger's HTTP workers in process
, smEventLockTime :: !EKG.Distribution.Distribution
-- ^ Time between the 'Event Trigger Fetch' from DB and the processing of the event
, smEventQueueTime :: !EKG.Distribution.Distribution
-- ^ Time (in seconds) between the 'Event Trigger Fetch' from DB and the processing of the event
}
createServerMetrics :: EKG.Store -> IO ServerMetrics
@ -406,7 +406,7 @@ createServerMetrics store = do
smWarpThreads <- EKG.createGauge "warp_threads" store
smWebsocketConnections <- EKG.createGauge "websocket_connections" store
smActiveSubscriptions <- EKG.createGauge "active_subscriptions" store
smNumEventsFetched <- EKG.createDistribution "num_events_fetched" store
smNumEventsFetchedPerBatch <- EKG.createDistribution "events_fetched_per_batch" store
smNumEventHTTPWorkers <- EKG.createGauge "num_event_trigger_http_workers" store
smEventLockTime <- EKG.createDistribution "event_lock_time" store
smEventQueueTime <- EKG.createDistribution "event_queue_time" store
pure ServerMetrics { .. }