2021-09-29 11:13:30 +03:00
|
|
|
{-# LANGUAGE PatternSynonyms #-}
|
2022-03-16 03:39:21 +03:00
|
|
|
{-# LANGUAGE TemplateHaskell #-}
|
2021-09-29 11:13:30 +03:00
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
-- |
|
|
|
|
-- = Event Triggers
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- Event triggers are like ordinary SQL triggers, except instead of calling a SQL
|
|
|
|
-- procedure, they call a webhook. The event delivery mechanism involves coordination
|
|
|
|
-- between both the database and graphql-engine: only the SQL database knows
|
|
|
|
-- when the events should fire, but only graphql-engine knows how to actually
|
|
|
|
-- deliver them.
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- Therefore, event triggers are implemented in two parts:
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- 1. Every event trigger is backed by a bona fide SQL trigger. When the SQL trigger
|
|
|
|
-- fires, it creates a new record in the hdb_catalog.event_log table.
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- 2. Concurrently, a thread in graphql-engine monitors the hdb_catalog.event_log
|
|
|
|
-- table for new events. When new event(s) are found, it uses the information
|
|
|
|
-- (URL, payload and headers) stored in the event to deliver the event
|
|
|
|
-- to the webhook.
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- The creation and deletion of the SQL trigger itself is managed by the metadata DDL
|
|
|
|
-- APIs (see Hasura.RQL.DDL.EventTrigger), so this module focuses on event delivery.
|
2021-09-24 01:56:37 +03:00
|
|
|
--
|
2020-05-13 15:33:16 +03:00
|
|
|
-- Most of the subtleties involve guaranteeing reliable delivery of events:
|
|
|
|
-- we guarantee that every event will be delivered at least once,
|
|
|
|
-- even if graphql-engine crashes. This means we have to record the state
|
|
|
|
-- of each event in the database, and we have to retry
|
|
|
|
-- failed requests at a regular (user-configurable) interval.
|
|
|
|
module Hasura.Eventing.EventTrigger
|
2018-09-05 14:26:46 +03:00
|
|
|
( initEventEngineCtx,
|
2023-01-30 09:06:45 +03:00
|
|
|
createFetchedEventsStatsLogger,
|
|
|
|
closeFetchedEventsStatsLogger,
|
2020-03-11 09:27:31 +03:00
|
|
|
processEventQueue,
|
2022-08-17 04:07:44 +03:00
|
|
|
defaultMaxEventThreads,
|
|
|
|
defaultFetchInterval,
|
2018-09-07 14:51:01 +03:00
|
|
|
Event (..),
|
2020-04-14 09:01:50 +03:00
|
|
|
EventEngineCtx (..),
|
2021-09-20 10:34:59 +03:00
|
|
|
-- Exported for testing
|
|
|
|
saveLockedEventTriggerEvents,
|
|
|
|
removeEventTriggerEventFromLockedEvents,
|
2022-09-15 14:45:14 +03:00
|
|
|
logQErr,
|
2018-09-05 14:26:46 +03:00
|
|
|
)
|
|
|
|
where
|
2021-09-24 01:56:37 +03:00
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
import Control.Concurrent.Async.Lifted.Safe qualified as LA
|
|
|
|
import Control.Concurrent.Extended (Forever (..), sleep)
|
2018-09-05 14:26:46 +03:00
|
|
|
import Control.Concurrent.STM.TVar
|
2023-01-30 09:06:45 +03:00
|
|
|
import Control.FoldDebounce qualified as FDebounce
|
2021-12-07 01:39:29 +03:00
|
|
|
import Control.Lens
|
2021-09-20 10:34:59 +03:00
|
|
|
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
|
2020-03-05 20:59:26 +03:00
|
|
|
import Control.Monad.STM
|
2021-09-20 10:34:59 +03:00
|
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
2021-09-29 11:13:30 +03:00
|
|
|
import Data.Aeson qualified as J
|
2023-01-30 09:06:45 +03:00
|
|
|
import Data.Aeson.Key qualified as Key
|
|
|
|
import Data.Aeson.KeyMap qualified as KeyMap
|
Rewrite `Tracing` to allow for only one `TraceT` in the entire stack.
This PR is on top of #7789.
### Description
This PR entirely rewrites the API of the Tracing library, to make `interpTraceT` a thing of the past. Before this change, we ran traces by sticking a `TraceT` on top of whatever we were doing. This had several major drawbacks:
- we were carrying a bunch of `TraceT` across the codebase, and the entire codebase had to know about it
- we needed to carry a second class constraint around (`HasReporterM`) to be able to run all of those traces
- we kept having to do stack rewriting with `interpTraceT`, which went from inconvenient to horrible
- we had to declare several behavioral instances on `TraceT m`
This PR rewrite all of `Tracing` using a more conventional model: there is ONE `TraceT` at the bottom of the stack, and there is an associated class constraint `MonadTrace`: any part of the code that happens to satisfy `MonadTrace` is able to create new traces. We NEVER have to do stack rewriting, `interpTraceT` is gone, and `TraceT` and `Reporter` become implementation details that 99% of the code is blissfully unaware of: code that needs to do tracing only needs to declare that the monad in which it operates implements `MonadTrace`.
In doing so, this PR revealed **several bugs in the codebase**: places where we were expecting to trace something, but due to the default instance of `HasReporterM IO` we would actually not do anything. This PR also splits the code of `Tracing` in more byte-sized modules, with the goal of potentially moving to `server/lib` down the line.
### Remaining work
This PR is a draft; what's left to do is:
- [x] make Pro compile; i haven't updated `HasuraPro/Main` yet
- [x] document Tracing by writing a note that explains how to use the library, and the meaning of "reporter", "trace" and "span", as well as the pitfalls
- [x] discuss some of the trade-offs in the implementation, which is why i'm opening this PR already despite it not fully building yet
- [x] it depends on #7789 being merged first
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7791
GitOrigin-RevId: cadd32d039134c93ddbf364599a2f4dd988adea8
2023-03-13 20:37:16 +03:00
|
|
|
import Data.Aeson.Lens qualified as JL
|
2018-09-05 14:26:46 +03:00
|
|
|
import Data.Aeson.TH
|
2021-09-20 10:34:59 +03:00
|
|
|
import Data.Has
|
2018-09-05 14:26:46 +03:00
|
|
|
import Data.HashMap.Strict qualified as M
|
2022-06-17 12:56:38 +03:00
|
|
|
import Data.SerializableBlob qualified as SB
|
2021-09-20 10:34:59 +03:00
|
|
|
import Data.Set qualified as Set
|
2020-03-05 20:59:26 +03:00
|
|
|
import Data.String
|
2021-09-20 10:34:59 +03:00
|
|
|
import Data.Text qualified as T
|
2020-10-21 19:35:06 +03:00
|
|
|
import Data.Text.Extended
|
2020-10-29 03:04:21 +03:00
|
|
|
import Data.Text.NonEmpty
|
2018-09-24 14:50:11 +03:00
|
|
|
import Data.Time.Clock
|
2021-09-20 10:34:59 +03:00
|
|
|
import Data.Time.Clock qualified as Time
|
|
|
|
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
|
2021-05-11 18:18:31 +03:00
|
|
|
import Hasura.Base.Error
|
2020-07-02 14:57:09 +03:00
|
|
|
import Hasura.Eventing.Common
|
2020-07-14 22:00:58 +03:00
|
|
|
import Hasura.Eventing.HTTP
|
2021-09-20 16:14:28 +03:00
|
|
|
import Hasura.HTTP (getHTTPExceptionStatus)
|
2021-09-20 10:34:59 +03:00
|
|
|
import Hasura.Logging qualified as L
|
2020-10-27 16:53:49 +03:00
|
|
|
import Hasura.Prelude
|
2018-11-23 16:02:46 +03:00
|
|
|
import Hasura.RQL.DDL.Headers
|
2022-03-08 03:42:06 +03:00
|
|
|
import Hasura.RQL.DDL.Webhook.Transform
|
2022-04-27 16:57:28 +03:00
|
|
|
import Hasura.RQL.Types.Backend
|
|
|
|
import Hasura.RQL.Types.Common
|
|
|
|
import Hasura.RQL.Types.EventTrigger
|
2021-09-20 10:34:59 +03:00
|
|
|
import Hasura.RQL.Types.Eventing.Backend
|
2022-04-27 16:57:28 +03:00
|
|
|
import Hasura.RQL.Types.SchemaCache
|
|
|
|
import Hasura.RQL.Types.Source
|
2021-09-20 10:34:59 +03:00
|
|
|
import Hasura.SQL.AnyBackend qualified as AB
|
2022-04-27 16:57:28 +03:00
|
|
|
import Hasura.SQL.Backend
|
2021-09-20 10:34:59 +03:00
|
|
|
import Hasura.Server.Metrics (ServerMetrics (..))
|
2022-08-15 08:32:55 +03:00
|
|
|
import Hasura.Server.Prometheus (EventTriggerMetrics (..))
|
2021-04-21 13:55:18 +03:00
|
|
|
import Hasura.Server.Types
|
2021-09-20 10:34:59 +03:00
|
|
|
import Hasura.Tracing qualified as Tracing
|
2021-12-07 01:39:29 +03:00
|
|
|
import Network.HTTP.Client.Transformable qualified as HTTP
|
2022-09-21 21:01:48 +03:00
|
|
|
import Refined (NonNegative, Positive, Refined, refineTH, unrefine)
|
2021-09-20 10:34:59 +03:00
|
|
|
import System.Metrics.Distribution qualified as EKG.Distribution
|
|
|
|
import System.Metrics.Gauge qualified as EKG.Gauge
|
2022-12-28 06:47:42 +03:00
|
|
|
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
|
2022-08-15 08:32:55 +03:00
|
|
|
import System.Metrics.Prometheus.Gauge qualified as Prometheus.Gauge
|
|
|
|
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
|
2021-05-31 16:54:08 +03:00
|
|
|
|
2018-09-05 14:26:46 +03:00
|
|
|
-- | A 'QErr' raised while servicing event triggers, wrapped so that it can be
-- emitted through the engine logger.
newtype EventInternalErr = EventInternalErr QErr
  deriving (Eq)

-- | Logged at error level under the event-trigger log type, with the wrapped
-- 'QErr' serialised as the log payload.
instance L.ToEngineLog EventInternalErr L.Hasura where
  toEngineLog (EventInternalErr queryErr) =
    (L.LevelError, L.eventTriggerLogType, J.toJSON queryErr)
|
2018-09-05 14:26:46 +03:00
|
|
|
|
2021-04-21 13:55:18 +03:00
|
|
|
{- Note [Maintenance mode]
|
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
|
|
Maintenance mode is a mode in which users can upgrade their graphql-engine
|
|
|
|
without any down time. More on maintenance mode can be found here:
|
|
|
|
https://github.com/hasura/graphql-engine-mono/issues/431.
|
|
|
|
|
|
|
|
Basically, there are a few main things that maintenance mode boils down to:
|
|
|
|
|
|
|
|
1. No operation that may change the metadata will be allowed.
|
|
|
|
2. Migrations are not applied when the graphql-engine is started, so the
|
|
|
|
catalog schema will be in the older version.
|
|
|
|
3. Event triggers should continue working in the new code with the older
|
|
|
|
catalog schema i.e. it should work even if there are any schema changes
|
|
|
|
to the `hdb_catalog.event_log` table.
|
|
|
|
|
|
|
|
#1 and #2 are fairly self-explanatory. For #3, we need to support fetching
|
|
|
|
events depending upon the catalog version. So, fetch events works in the
|
|
|
|
following way now:
|
|
|
|
|
|
|
|
1. Check if maintenance mode is enabled
|
|
|
|
2. If maintenance mode is enabled then read the catalog version from the DB
|
|
|
|
and accordingly fire the appropriate query to the events log table.
|
|
|
|
When maintenance mode is disabled, we query the events log table according
|
|
|
|
to the latest catalog, we do not read the catalog version for this.
|
|
|
|
-}
|
|
|
|
|
|
|
|
-- | Configuration and shared state used by the event processing loop.
--
-- See Note [Maintenance mode]
data EventEngineCtx = EventEngineCtx
  { -- | Remaining worker capacity: decremented before an event worker is
    -- forked and incremented again when that worker finishes. Forking blocks
    -- while this is not greater than zero.
    _eeCtxEventThreadsCapacity :: TVar Int,
    -- | How long the loop sleeps when the current batch of events is empty.
    _eeCtxFetchInterval :: DiffTime,
    -- | Batch size handed to the backend when fetching undelivered events.
    _eeCtxFetchSize :: Refined NonNegative Int
  }
|
|
|
|
|
|
|
|
-- | Retry bookkeeping carried inside an 'EventPayload'.
--
-- NOTE(review): the field meanings below are inferred from the field names
-- only; confirm against the code that constructs this value.
data DeliveryInfo = DeliveryInfo
  { -- | Presumably the number of the current delivery retry.
    diCurrentRetry :: Int,
    -- | Presumably the maximum number of retries configured for the trigger.
    diMaxRetries :: Int
  }
  deriving (Show, Eq)

-- JSON instances are generated with the shared 'hasuraJSON' options.
$(deriveJSON hasuraJSON {omitNothingFields = True} ''DeliveryInfo)
|
2020-05-13 15:33:16 +03:00
|
|
|
|
2019-02-07 15:37:28 +03:00
|
|
|
-- | Wrapper around 'QualifiedTable' whose JSON encoding is always an object
-- with explicit @schema@ and @name@ keys.
newtype QualifiedTableStrict = QualifiedTableStrict
  { getQualifiedTable :: QualifiedTable
  }
  deriving (Show, Eq)

instance J.ToJSON QualifiedTableStrict where
  toJSON wrapped =
    case getQualifiedTable wrapped of
      QualifiedObject schemaName tableName ->
        J.object
          [ "schema" J..= schemaName,
            "name" J..= tableName
          ]
|
2021-09-24 01:56:37 +03:00
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
-- | Per-event record, indexed by backend, that is serialised to JSON through
-- its 'Generic' representation.
--
-- NOTE(review): presumably this is the body delivered to the webhook; the
-- code that sends it is not visible here, so confirm.
data EventPayload (b :: BackendType) = EventPayload
  { -- | Identifier of the event.
    epId :: EventId,
    -- | Backend-specific name of the table the event belongs to.
    epTable :: TableName b,
    -- | Metadata of the trigger that produced the event.
    epTrigger :: TriggerMetadata,
    -- | The event data itself, kept as an opaque JSON value.
    epEvent :: J.Value,
    -- | Retry bookkeeping, see 'DeliveryInfo'.
    epDeliveryInfo :: DeliveryInfo,
    -- | Creation timestamp of the event.
    epCreatedAt :: Time.UTCTime
  }
  deriving (Generic)

deriving instance Backend b => Show (EventPayload b)

deriving instance Backend b => Eq (EventPayload b)

-- | Generic encoding using the shared 'hasuraJSON' options, with 'Nothing'
-- fields omitted.
instance Backend b => J.ToJSON (EventPayload b) where
  toJSON payload =
    J.genericToJSON (hasuraJSON {omitNothingFields = True}) payload
|
2018-09-05 14:26:46 +03:00
|
|
|
|
2022-09-21 21:01:48 +03:00
|
|
|
-- | Default value (100) for the maximum number of event threads. The
-- 'Positive' refinement is checked at compile time by 'refineTH'.
defaultMaxEventThreads :: Refined Positive Int
defaultMaxEventThreads = $$(refineTH 100)
|
2022-08-17 04:07:44 +03:00
|
|
|
|
|
|
|
-- | Default event fetch interval: one second.
defaultFetchInterval :: DiffTime
defaultFetchInterval = seconds 1
|
|
|
|
|
2022-09-21 21:01:48 +03:00
|
|
|
-- | Build an 'EventEngineCtx' inside 'STM'.
--
-- The first argument seeds the worker-capacity counter with a fresh 'TVar';
-- the fetch interval and the fetch size are stored unchanged.
initEventEngineCtx :: Int -> DiffTime -> Refined NonNegative Int -> STM EventEngineCtx
initEventEngineCtx maxThreads fetchInterval fetchSize = do
  capacityVar <- newTVar maxThreads
  pure
    EventEngineCtx
      { _eeCtxEventThreadsCapacity = capacityVar,
        _eeCtxFetchInterval = fetchInterval,
        _eeCtxFetchSize = fetchSize
      }
|
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
-- | Record the given event ids as locked for @sourceName@.
--
-- The ids are unioned into the set already stored for that source; if the
-- source has no entry yet, one is created. Entries belonging to other sources
-- are left untouched. (Previously, the first batch for a not-yet-seen source
-- replaced the whole map with a singleton, silently dropping the locked
-- events of every other source.)
--
-- The update happens in a single STM transaction and the new map is forced
-- before it is stored.
saveLockedEventTriggerEvents :: MonadIO m => SourceName -> [EventId] -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
saveLockedEventTriggerEvents sourceName eventIds lockedEvents =
  liftIO . atomically $
    -- 'M.insertWith' handles both the "source already present" and the
    -- "first events for this source" cases without touching other keys.
    modifyTVar' lockedEvents $
      M.insertWith Set.union sourceName (Set.fromList eventIds)
|
2021-09-24 01:56:37 +03:00
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
-- | Drop a single event id from the locked-events set of @sourceName@.
--
-- Implemented with 'M.adjust', so only the entry of the given source is
-- modified. The new map is forced before being written back.
removeEventTriggerEventFromLockedEvents ::
  MonadIO m => SourceName -> EventId -> TVar (HashMap SourceName (Set.Set EventId)) -> m ()
removeEventTriggerEventFromLockedEvents sourceName eventId lockedEvents =
  liftIO . atomically . modifyTVar' lockedEvents $
    M.adjust (Set.delete eventId) sourceName
|
|
|
|
|
|
|
|
-- | An 'EventWithSource' for some backend, existentially wrapped in
-- 'AB.AnyBackend' so that events of different backends can share one list.
type BackendEventWithSource = AB.AnyBackend EventWithSource
|
|
|
|
|
|
|
|
-- | State threaded through the event processing loop: the current batch of
-- events, the number of consecutive full fetches seen so far, and whether the
-- "not keeping up" warning has already been logged.
type FetchEventArguments = ([BackendEventWithSource], Int, Bool)
|
2021-05-14 12:38:37 +03:00
|
|
|
|
2023-01-30 09:06:45 +03:00
|
|
|
-- | A count of events. 'Num' is derived so that counts can be combined with
-- @(+)@ when per-source statistics are merged.
newtype EventsCount = EventsCount {unEventsCount :: Int}
  deriving (Eq, Show, J.ToJSON, J.FromJSON, Num)
|
|
|
|
|
|
|
|
-- | Number of events fetched, keyed by the source they were fetched from.
newtype NumEventsFetchedPerSource = NumEventsFetchedPerSource {unNumEventsFetchedPerSource :: HashMap SourceName EventsCount}
  deriving (Eq, Show)

-- | Encoded as a JSON object whose keys are the source names (as text) and
-- whose values are the corresponding counts.
instance J.ToJSON NumEventsFetchedPerSource where
  toJSON (NumEventsFetchedPerSource countsBySource) =
    J.Object . KeyMap.fromList $
      [ (Key.fromText (sourceNameToText source), J.toJSON count)
        | (source, count) <- M.toList countsBySource
      ]

-- | Merging adds up the counts of sources present on both sides.
instance Semigroup NumEventsFetchedPerSource where
  NumEventsFetchedPerSource leftCounts <> NumEventsFetchedPerSource rightCounts =
    NumEventsFetchedPerSource (M.unionWith (+) leftCounts rightCounts)

-- | The identity is the empty map (no events fetched from any source).
instance Monoid NumEventsFetchedPerSource where
  mempty = NumEventsFetchedPerSource M.empty
|
|
|
|
|
|
|
|
-- | Accumulated statistics about event fetches: how many events were fetched
-- per source, and over how many fetches.
data FetchedEventsStats = FetchedEventsStats
  { _fesNumEventsFetched :: NumEventsFetchedPerSource,
    _fesNumFetches :: Int
  }
  deriving (Eq, Show)

-- 'J.ToJSON' instance generated with the shared 'hasuraJSON' options.
$(deriveToJSON hasuraJSON ''FetchedEventsStats)

-- | Logged at info level under the event-trigger-process log type.
instance L.ToEngineLog FetchedEventsStats L.Hasura where
  toEngineLog fetchedStats =
    ( L.LevelInfo,
      L.eventTriggerProcessLogType,
      J.toJSON fetchedStats
    )

-- | Combining two statistics merges the per-source counts and sums the number
-- of fetches.
instance Semigroup FetchedEventsStats where
  FetchedEventsStats leftCounts leftFetches <> FetchedEventsStats rightCounts rightFetches =
    FetchedEventsStats
      (leftCounts <> rightCounts)
      (leftFetches + rightFetches)

-- | The identity: no events fetched, zero fetches.
instance Monoid FetchedEventsStats where
  mempty = FetchedEventsStats mempty 0
|
|
|
|
|
|
|
|
-- | Debounce trigger that both accepts and emits 'FetchedEventsStats'; used as
-- the handle for the fetched-events statistics logger.
type FetchedEventsStatsLogger = FDebounce.Trigger FetchedEventsStats FetchedEventsStats
|
|
|
|
|
|
|
|
-- | Create the logger that accumulates statistics of fetched events over a
-- period of time and logs them once through the given 'L.Logger'.
-- See 'L.createStatsLogger' for more details.
createFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedEventsStatsLogger
createFetchedEventsStatsLogger hasuraLogger = L.createStatsLogger hasuraLogger
|
2023-01-30 09:06:45 +03:00
|
|
|
|
|
|
|
-- | Close the fetched events stats logger. Delegates to 'L.closeStatsLogger',
-- passing the event-trigger-process log type.
closeFetchedEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> FetchedEventsStatsLogger -> m ()
closeFetchedEventsStatsLogger hasuraLogger statsLogger =
  L.closeStatsLogger L.eventTriggerProcessLogType hasuraLogger statsLogger
|
2023-01-30 09:06:45 +03:00
|
|
|
|
|
|
|
-- | Feed the statistics of one fetch into the stats logger: the number of
-- fetched events per source, counted as a single fetch.
-- See 'L.logStats' for more details.
logFetchedEventsStatistics ::
  (MonadIO m) =>
  FetchedEventsStatsLogger ->
  [BackendEventWithSource] ->
  m ()
logFetchedEventsStatistics logger backendEvents =
  L.logStats logger (FetchedEventsStats perSourceCounts 1)
  where
    -- Source name of an event, whatever backend it belongs to.
    sourceOf backendEvent =
      AB.dispatchAnyBackend @Backend backendEvent _ewsSourceName

    -- Every event contributes one to the count of its source.
    perSourceCounts =
      NumEventsFetchedPerSource $
        M.fromListWith (+) (map (\backendEvent -> (sourceOf backendEvent, 1)) backendEvents)
|
|
|
|
|
2023-02-20 20:41:55 +03:00
|
|
|
{-# ANN processEventQueue ("HLint: ignore Use withAsync" :: String) #-}
|
|
|
|
|
2020-03-11 09:27:31 +03:00
|
|
|
-- | Service events from our in-DB queue.
|
|
|
|
--
|
|
|
|
-- There are a few competing concerns and constraints here; we want to...
|
|
|
|
-- - fetch events in batches for lower DB pressure
|
|
|
|
-- - don't fetch more than N at a time (since that can mean: space leak, less
|
2020-04-14 09:01:50 +03:00
|
|
|
-- effective scale out, possible double sends for events we've checked out
|
2020-03-11 09:27:31 +03:00
|
|
|
-- on exit (TODO clean shutdown procedure))
|
|
|
|
-- - try not to cause webhook workers to stall waiting on DB fetch
|
|
|
|
-- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
|
|
|
|
processEventQueue ::
|
2021-05-14 12:38:37 +03:00
|
|
|
forall m.
|
2021-10-13 19:38:56 +03:00
|
|
|
( MonadIO m,
|
2020-07-14 22:00:58 +03:00
|
|
|
MonadBaseControl IO m,
|
|
|
|
LA.Forall (LA.Pure m),
|
Rewrite `Tracing` to allow for only one `TraceT` in the entire stack.
This PR is on top of #7789.
### Description
This PR entirely rewrites the API of the Tracing library, to make `interpTraceT` a thing of the past. Before this change, we ran traces by sticking a `TraceT` on top of whatever we were doing. This had several major drawbacks:
- we were carrying a bunch of `TraceT` across the codebase, and the entire codebase had to know about it
- we needed to carry a second class constraint around (`HasReporterM`) to be able to run all of those traces
- we kept having to do stack rewriting with `interpTraceT`, which went from inconvenient to horrible
- we had to declare several behavioral instances on `TraceT m`
This PR rewrite all of `Tracing` using a more conventional model: there is ONE `TraceT` at the bottom of the stack, and there is an associated class constraint `MonadTrace`: any part of the code that happens to satisfy `MonadTrace` is able to create new traces. We NEVER have to do stack rewriting, `interpTraceT` is gone, and `TraceT` and `Reporter` become implementation details that 99% of the code is blissfully unaware of: code that needs to do tracing only needs to declare that the monad in which it operates implements `MonadTrace`.
In doing so, this PR revealed **several bugs in the codebase**: places where we were expecting to trace something, but due to the default instance of `HasReporterM IO` we would actually not do anything. This PR also splits the code of `Tracing` in more byte-sized modules, with the goal of potentially moving to `server/lib` down the line.
### Remaining work
This PR is a draft; what's left to do is:
- [x] make Pro compile; i haven't updated `HasuraPro/Main` yet
- [x] document Tracing by writing a note that explains how to use the library, and the meaning of "reporter", "trace" and "span", as well as the pitfalls
- [x] discuss some of the trade-offs in the implementation, which is why i'm opening this PR already despite it not fully building yet
- [x] it depends on #7789 being merged first
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7791
GitOrigin-RevId: cadd32d039134c93ddbf364599a2f4dd988adea8
2023-03-13 20:37:16 +03:00
|
|
|
MonadMask m,
|
|
|
|
Tracing.MonadTrace m
|
2020-07-14 22:00:58 +03:00
|
|
|
) =>
|
|
|
|
L.Logger L.Hasura ->
|
2023-01-30 09:06:45 +03:00
|
|
|
FetchedEventsStatsLogger ->
|
2020-07-14 22:00:58 +03:00
|
|
|
HTTP.Manager ->
|
|
|
|
IO SchemaCache ->
|
|
|
|
EventEngineCtx ->
|
|
|
|
LockedEventsCtx ->
|
Server: Add Event Trigger Metrics
- [x] **Event Triggers Metrics**
- [x] Distribution of size of event trigger fetches / Number of events fetched in the last `event trigger fetch`
- [x] Event Triggers: Number of event trigger HTTP workers in process
- [x] Event Triggers: Avg event trigger lock time (if an event has been fetched but not processed because http worker is not free)
#### Sample response
The metrics can be viewed from the `/dev/ekg` endpoint
```json
{
"num_events_fetched":{
"max":0,
"mean":0,
"count":1,
"min":0,
"variance":null,
"type":"d",
"sum":0
},
"num_event_trigger_http_workers":{
"type":"g",
"val":0
},
"event_lock_time":{
"max":0,
"mean":0,
"count":0,
"min":0,
"variance":0,
"type":"d",
"sum":0
},
```
#### Todo
- [ ] Group similar metrics together (Eg: Group all the metrics related to Event trigger, How do we do it??)
Closes: https://github.com/hasura/graphql-engine-mono/issues/202
GitOrigin-RevId: bada11d871272b04c8a09d006d9d037a8464a472
2021-03-07 08:24:26 +03:00
|
|
|
ServerMetrics ->
|
2022-08-15 08:32:55 +03:00
|
|
|
EventTriggerMetrics ->
|
2022-04-28 23:55:13 +03:00
|
|
|
MaintenanceMode () ->
|
2021-05-14 12:38:37 +03:00
|
|
|
m (Forever m)
|
2023-01-30 09:06:45 +03:00
|
|
|
processEventQueue logger statsLogger httpMgr getSchemaCache EventEngineCtx {..} LockedEventsCtx {leEvents} serverMetrics eventTriggerMetrics maintenanceMode = do
|
2020-03-11 09:27:31 +03:00
|
|
|
events0 <- popEventsBatch
|
2021-05-14 12:38:37 +03:00
|
|
|
return $ Forever (events0, 0, False) go
|
2018-09-05 14:26:46 +03:00
|
|
|
where
|
2022-09-21 21:01:48 +03:00
|
|
|
fetchBatchSize = unrefine _eeCtxFetchSize
|
2021-04-21 13:55:18 +03:00
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
-- Fetch one batch of undelivered events from every source in the current
-- schema cache, concurrently, and concatenate the results.
--
-- Sources without any event trigger are skipped. A fetch error for a source
-- is logged and yields no events for that source. For every non-empty fetch
-- the fetch-time and batch-size metrics are recorded and the fetched event ids
-- are registered as locked. The combined batch is reported to the stats
-- logger before it is returned.
popEventsBatch :: m [BackendEventWithSource]
popEventsBatch = do
  {-
    SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889
    We can avoid this safely by running it in ReadCommitted as Postgres will recheck the
    predicate condition if a row is updated concurrently: https://www.postgresql.org/docs/9.5/transaction-iso.html#XACT-READ-COMMITTED

    Every other action on an event_log row (like post-processing, archival, etc) are single writes (no R-W or W-R)
    so it is safe to perform them in ReadCommitted as well (the writes will then acquire some serial order).
    Any serial order of updates to a row will lead to an eventually consistent state as the row will have
    (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
  -}
  allSources <- scSources <$> liftIO getSchemaCache
  -- fetch pending events across all the sources asynchronously
  fetched <-
    liftIO . fmap concat $
      LA.forConcurrently (M.toList allSources) \(sourceName, sourceCache) ->
        AB.dispatchAnyBackend @BackendEventTrigger sourceCache \(SourceInfo _sourceName tableCache _functionCache _customSQLCache sourceConfig _queryTagsConfig _sourceCustomization :: SourceInfo b) -> do
          let triggerMaps = map _tiEventTriggerInfoMap (M.elems tableCache)
              triggerNames = concatMap M.keys triggerMaps
              hasEventTriggers = any (not . M.null) triggerMaps

          -- only process events for this source if at least one event trigger exists
          if not hasEventTriggers
            then pure []
            else do
              eventPollStartTime <- getCurrentTime
              fetchResult <-
                runExceptT $
                  fetchUndeliveredEvents @b sourceConfig sourceName triggerNames maintenanceMode (FetchBatchSize fetchBatchSize)
              case fetchResult of
                Left err -> do
                  liftIO $ L.unLogger logger $ EventInternalErr err
                  pure []
                Right [] -> return []
                Right pending -> do
                  eventsFetchedTime <- getCurrentTime -- This is also the poll end time
                  let eventPollTime = realToFrac $ diffUTCTime eventsFetchedTime eventPollStartTime
                  _ <- EKG.Distribution.add (smEventFetchTimePerBatch serverMetrics) eventPollTime
                  Prometheus.Histogram.observe (eventsFetchTimePerBatch eventTriggerMetrics) eventPollTime
                  _ <- EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length pending)
                  saveLockedEventTriggerEvents sourceName (eId <$> pending) leEvents
                  return
                    [ AB.mkAnyBackend @b $ EventWithSource event sourceConfig sourceName eventsFetchedTime
                      | event <- pending
                    ]

  -- Log the statistics of events fetched
  logFetchedEventsStatistics statsLogger fetched
  pure fetched
|
|
|
|
|
2021-04-29 07:01:06 +03:00
|
|
|
-- !!! CAREFUL !!!
--     The logic here in particular is subtle and has been fixed, broken,
--     and fixed again in several different ways, several times.
-- !!! CAREFUL !!!
--
-- One iteration of the processing loop: work on this batch of events while
-- prefetching the next one, and return once a worker has been forked for
-- each event in the batch (minding the requested pool size) and the next
-- batch is available. The returned triple is the state for the next iteration.
go :: FetchEventArguments -> m FetchEventArguments
go (events, !fullFetchCount, !alreadyWarned) = do
  -- process events ASAP until we've caught up; only then can we sleep
  when (null events) . liftIO $ sleep _eeCtxFetchInterval

  -- Prefetch next events payload while concurrently working through our current batch.
  -- NOTE: we probably don't need to prefetch so early, but probably not
  -- worth the effort for something more fine-tuned
  eventsNext <- LA.withAsync popEventsBatch $ \eventsNextA -> do
    -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
    forM_ events $ \eventWithSource ->
      -- NOTE: we implement a logical bracket pattern here with the
      -- increment and decrement of _eeCtxEventThreadsCapacity which
      -- depends on not putting anything that can throw in the body here:
      AB.dispatchAnyBackend @BackendEventTrigger eventWithSource \(eventWithSource' :: EventWithSource b) ->
        mask_ $ do
          liftIO $
            atomically $ do
              -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
              capacity <- readTVar _eeCtxEventThreadsCapacity
              check $ capacity > 0
              writeTVar _eeCtxEventThreadsCapacity (capacity - 1)
          -- since there is some capacity in our worker threads, we can launch another:
          let restoreCapacity =
                liftIO $
                  atomically $
                    modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
          t <-
            LA.async $
              flip runReaderT (logger, httpMgr) $
                processEvent eventWithSource'
                  `finally`
                  -- NOTE!: this needs to happen IN THE FORKED THREAD:
                  restoreCapacity
          LA.link t

    -- return when next batch ready; some 'processEvent' threads may be running.
    LA.wait eventsNextA

  let batchWasFull = length events == fetchBatchSize
  if batchWasFull
    then do
      -- If we've seen N fetches in a row from the DB come back full (i.e. only limited
      -- by our LIMIT clause), then we say we're clearly falling behind:
      let clearlyBehind = fullFetchCount >= 3
      -- warn only once per "falling behind" episode
      when (clearlyBehind && not alreadyWarned) $
        L.unLogger logger $
          L.UnstructuredLog L.LevelWarn $
            fromString $
              "Events processor may not be keeping up with events generated in postgres, "
                <> "or we're working on a backlog of events. Consider increasing "
                <> "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
      return (eventsNext, fullFetchCount + 1, alreadyWarned || clearlyBehind)
    else do
      -- the batch was not full, so the streak of full fetches is over
      when alreadyWarned $
        -- emit as warning in case users are only logging warning severity and saw above
        L.unLogger logger $
          L.UnstructuredLog L.LevelWarn $
            fromString $
              "It looks like the events processor is keeping up again."
      return (eventsNext, 0, False)
|
2021-09-24 01:56:37 +03:00
|
|
|
|
Rewrite `Tracing` to allow for only one `TraceT` in the entire stack.
This PR is on top of #7789.
### Description
This PR entirely rewrites the API of the Tracing library, to make `interpTraceT` a thing of the past. Before this change, we ran traces by sticking a `TraceT` on top of whatever we were doing. This had several major drawbacks:
- we were carrying a bunch of `TraceT` across the codebase, and the entire codebase had to know about it
- we needed to carry a second class constraint around (`HasReporterM`) to be able to run all of those traces
- we kept having to do stack rewriting with `interpTraceT`, which went from inconvenient to horrible
- we had to declare several behavioral instances on `TraceT m`
This PR rewrite all of `Tracing` using a more conventional model: there is ONE `TraceT` at the bottom of the stack, and there is an associated class constraint `MonadTrace`: any part of the code that happens to satisfy `MonadTrace` is able to create new traces. We NEVER have to do stack rewriting, `interpTraceT` is gone, and `TraceT` and `Reporter` become implementation details that 99% of the code is blissfully unaware of: code that needs to do tracing only needs to declare that the monad in which it operates implements `MonadTrace`.
In doing so, this PR revealed **several bugs in the codebase**: places where we were expecting to trace something, but due to the default instance of `HasReporterM IO` we would actually not do anything. This PR also splits the code of `Tracing` in more byte-sized modules, with the goal of potentially moving to `server/lib` down the line.
### Remaining work
This PR is a draft; what's left to do is:
- [x] make Pro compile; i haven't updated `HasuraPro/Main` yet
- [x] document Tracing by writing a note that explains how to use the library, and the meaning of "reporter", "trace" and "span", as well as the pitfalls
- [x] discuss some of the trade-offs in the implementation, which is why i'm opening this PR already despite it not fully building yet
- [x] it depends on #7789 being merged first
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7791
GitOrigin-RevId: cadd32d039134c93ddbf364599a2f4dd988adea8
2023-03-13 20:37:16 +03:00
|
|
|
-- \| Extract a trace context from an event trigger payload.
--
-- The result is 'Nothing' unless the string at @trace_context.trace_id@ is
-- present and accepted by 'Tracing.traceIdFromHex'. Otherwise a fresh span id
-- is generated, and the payload's @span_id@ (if present and accepted by
-- 'Tracing.spanIdFromHex') and @sampling_state@ are passed along to build the
-- 'Tracing.TraceContext'.
extractEventContext :: forall io. MonadIO io => J.Value -> io (Maybe Tracing.TraceContext)
extractEventContext e =
  case Tracing.traceIdFromHex . txtToBs =<< contextField "trace_id" of
    Nothing -> pure Nothing
    Just traceId -> do
      freshSpanId <- Tracing.randomSpanId
      let parentSpanId = Tracing.spanIdFromHex . txtToBs =<< contextField "span_id"
          samplingState = Tracing.samplingStateFromHeader (contextField "sampling_state")
      pure . Just $ Tracing.TraceContext traceId freshSpanId parentSpanId samplingState
  where
    -- String-valued field of the payload's "trace_context" object, if any.
    contextField field =
      e ^? JL.key "trace_context" . JL.key field . JL._String
|
|
|
|
|
2020-03-05 20:59:26 +03:00
|
|
|
-- | Process one fetched event: deliver it to the trigger's webhook and record
-- the outcome.
--
-- In order, this:
--
--   1. records the event's queue time in the EKG distribution and the
--      Prometheus histogram;
--   2. reads the schema cache and prepares a tracer, continuing the trace
--      context found by 'extractEventContext' when there is one;
--   3. when maintenance mode is enabled, resolves the maintenance mode version
--      (a failure is logged through 'logQErr' and nothing else is done);
--   4. looks up the trigger with 'getEventTriggerInfoFromEvent'; if that fails
--      the error is logged and a retry is scheduled 60 seconds from now;
--   5. otherwise builds the request, invokes it while the HTTP worker gauges
--      are incremented, and passes the result to 'processSuccess',
--      'processError' or 'recordError''.
--
-- NOTE(review): @serverMetrics@, @eventTriggerMetrics@, @getSchemaCache@,
-- @maintenanceMode@, @logger@ and @leEvents@ are not bound in this block;
-- presumably they come from an enclosing scope -- confirm.
processEvent ::
  forall io r b.
  ( MonadIO io,
    MonadReader r io,
    Has HTTP.Manager r,
    Has (L.Logger L.Hasura) r,
    MonadMask io,
    BackendEventTrigger b,
    Tracing.MonadTrace io
  ) =>
  EventWithSource b ->
  io ()
processEvent (EventWithSource e sourceConfig sourceName eventFetchedTime) = do
  -- Track the queue time of the event (in seconds). See `smEventQueueTime`.
  -- Queue time = time at which processing starts here - time at which the
  -- event was fetched from the DB.
  pickedUpAt <- liftIO getCurrentTime
  let queueSeconds = realToFrac $ diffUTCTime pickedUpAt eventFetchedTime
  liftIO $ do
    _ <- EKG.Distribution.add (smEventQueueTime serverMetrics) queueSeconds
    Prometheus.Histogram.observe (eventQueueTimeSeconds eventTriggerMetrics) queueSeconds

  cache <- liftIO getSchemaCache

  eventCtx <- extractEventContext (eEvent e)
  let runTraced = case eventCtx of
        Nothing -> Tracing.newTrace Tracing.sampleAlways
        Just ctx -> Tracing.newTraceWith ctx Tracing.sampleAlways
      triggerSpanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti))

  resolvedMode :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
    case maintenanceMode of
      MaintenanceModeDisabled -> pure (Right MaintenanceModeDisabled)
      MaintenanceModeEnabled () ->
        fmap MaintenanceModeEnabled <$> runExceptT (getMaintenanceModeVersion @b sourceConfig)

  case resolvedMode of
    Left modeErr -> logQErr modeErr
    Right maintenanceModeVersion ->
      case getEventTriggerInfoFromEvent cache e of
        Left err -> do
          -- This rare error can happen in the following known cases:
          -- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
          -- ii) the event trigger is dropped when this event was just fetched
          logQErr $ err500 Unexpected err
          now <- liftIO getCurrentTime
          -- For such an event, we unlock the event and retry after a minute
          retryResult <- runExceptT (setRetry sourceConfig e (addUTCTime 60 now) maintenanceModeVersion)
          onLeft retryResult logQErr
        Right eti -> runTraced (triggerSpanName eti) do
          startedAt <- liftIO getCurrentTime
          let webhook = wciCachedValue $ etiWebhookInfo eti
              retryConf = etiRetryConf eti
              -- The timeout is configured in seconds; the HTTP client wants microseconds.
              httpTimeout =
                HTTP.responseTimeoutMicro
                  (fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf) * 1000000)
              (headers, logHeaders) = prepareHeaders (etiHeaders eti)
              ep = createEventPayload retryConf e
              payload = J.encode (J.toJSON ep)
              extraLogCtx = ExtraLogContext (epId ep) (Just $ etiName eti)
              requestTransform = etiRequestTransform eti
              responseTransform = mkResponseTransform <$> etiResponseTransform eti
          eitherReqRes <- runExceptT do
            reqDetails <- mkRequest headers httpTimeout payload requestTransform (_envVarValue webhook)
            let -- Logs the invocation, then updates the bytes received/sent counters.
                logger' res details = do
                  logHTTPForET res extraLogCtx details (_envVarName webhook) logHeaders
                  liftIO $ do
                    case res of
                      Left _err -> pure ()
                      Right response ->
                        Prometheus.Counter.add
                          (eventTriggerBytesReceived eventTriggerMetrics)
                          (hrsSize response)
                    -- Count the transformed size when there is one, the original size otherwise.
                    Prometheus.Counter.add
                      (eventTriggerBytesSent eventTriggerMetrics)
                      (fromMaybe (_rdOriginalSize details) (_rdTransformedSize details))
                markWorkerBusy = liftIO $ do
                  EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics
                  Prometheus.Gauge.inc (eventTriggerHTTPWorkers eventTriggerMetrics)
                markWorkerIdle = liftIO $ do
                  EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics
                  Prometheus.Gauge.dec (eventTriggerHTTPWorkers eventTriggerMetrics)
            -- Event Triggers have a configuration parameter called
            -- HASURA_GRAPHQL_EVENTS_HTTP_WORKERS, which is used
            -- to control the concurrency of http delivery.
            -- This bracket is used to increment and decrement the
            -- HTTP worker gauges for the duration of the
            -- request invocation
            resp <-
              bracket_
                markWorkerBusy
                markWorkerIdle
                (invokeRequest reqDetails responseTransform (_rdSessionVars reqDetails) logger')
            pure (extractRequest reqDetails, resp)
          case eitherReqRes of
            Right (req, resp) -> do
              let reqBody = fromMaybe J.Null $ view HTTP.body req >>= J.decode @J.Value
              successResult <- processSuccess sourceConfig e logHeaders reqBody maintenanceModeVersion resp
              onLeft successResult logQErr
              finishedAt <- liftIO getCurrentTime
              let webhookSeconds = realToFrac $ diffUTCTime finishedAt startedAt
                  -- For event_processing_time, the start time is defined as the expected delivery time for an event, i.e.:
                  --  - For event with no retries: created_at time
                  --  - For event with retries: next_retry_at time
                  eventStartTime = fromMaybe (eCreatedAt e) (eRetryAt e)
                  -- The timestamps in the DB are supposed to be UTC time, so both timestamps
                  -- used in this calculation are UTC time.
                  totalSeconds = realToFrac $ diffUTCTime finishedAt eventStartTime
              liftIO $ do
                EKG.Distribution.add (smEventWebhookProcessingTime serverMetrics) webhookSeconds
                Prometheus.Histogram.observe (eventWebhookProcessingTime eventTriggerMetrics) webhookSeconds
                EKG.Distribution.add (smEventProcessingTime serverMetrics) totalSeconds
                Prometheus.Histogram.observe (eventProcessingTime eventTriggerMetrics) totalSeconds
            Left (HTTPError reqBody err) -> do
              errorResult <- processError @b sourceConfig e retryConf logHeaders reqBody maintenanceModeVersion err
              onLeft errorResult logQErr
            Left (TransformationError _ err) -> do
              L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode err)

              -- Record an Event Error
              recordResult <- recordError' @b sourceConfig e Nothing PESetError maintenanceModeVersion
              onLeft recordResult logQErr
          -- removing an event from the _eeCtxLockedEvents after the event has been processed:
          -- NOTE(review): indentation was reconstructed; this call is assumed to
          -- close the traced delivery block -- confirm against the original layout.
          removeEventTriggerEventFromLockedEvents sourceName (eId e) leEvents
|
2020-05-13 15:33:16 +03:00
|
|
|
|
2021-09-20 10:34:59 +03:00
|
|
|
-- | Build the webhook payload for an event.
--
-- Identifier, table, trigger, event data and creation time are copied from the
-- event; the delivery info pairs the event's current try count with the
-- maximum number of retries taken from the retry configuration.
createEventPayload :: RetryConf -> Event b -> EventPayload b
createEventPayload retryConf event =
  EventPayload
    { epId = eId event,
      epTable = eTable event,
      epTrigger = eTrigger event,
      epEvent = eEvent event,
      epDeliveryInfo = deliveryInfo,
      epCreatedAt = eCreatedAt event
    }
  where
    deliveryInfo =
      DeliveryInfo
        { diCurrentRetry = eTries event,
          diMaxRetries = rcNumRetries retryConf
        }
|
|
|
|
|
|
|
|
-- | Record a successful webhook delivery.
--
-- Builds the invocation from the request headers, the request body and the
-- response's status, body and headers, then stores it via 'recordSuccess'.
processSuccess ::
  forall b m a.
  (MonadIO m, BackendEventTrigger b) =>
  SourceConfig b ->
  Event b ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  HTTPResp a ->
  m (Either QErr ())
processSuccess sourceConfig event reqHeaders reqBody maintenanceModeVersion resp =
  recordSuccess @b sourceConfig event invocation maintenanceModeVersion
  where
    invocation =
      mkInvocation
        (eId event)
        reqBody
        (Just (hrsStatus resp))
        reqHeaders
        (hrsBody resp)
        (hrsHeaders resp)
|
2019-02-22 15:25:36 +03:00
|
|
|
|
|
|
|
-- | Record a failed webhook delivery.
--
-- The invocation that gets stored depends on the kind of failure:
--
--   * 'HClient': status taken from the HTTP exception (if any), the JSON
--     encoded exception as body, no response headers;
--   * 'HStatus': status, body and headers of the error response;
--   * 'HOther': status 500, the JSON encoded detail as body, no response
--     headers.
--
-- 'retryOrSetError' then decides between a retry and a final error, and the
-- result is stored via 'recordError'.
processError ::
  forall b m a.
  ( MonadIO m,
    BackendEventTrigger b
  ) =>
  SourceConfig b ->
  Event b ->
  RetryConf ->
  [HeaderConf] ->
  J.Value ->
  MaintenanceMode MaintenanceModeVersion ->
  HTTPErr a ->
  m (Either QErr ())
processError sourceConfig event retryConf reqHeaders reqBody maintenanceModeVersion err = do
  retryOrError <- retryOrSetError event retryConf err
  recordError @b sourceConfig event invocation retryOrError maintenanceModeVersion
  where
    -- Invocation for this event and request; only the response part varies.
    invocationWith status body respHeaders =
      mkInvocation (eId event) reqBody status reqHeaders body respHeaders

    invocation = case err of
      HClient httpException ->
        invocationWith
          (getHTTPExceptionStatus httpException)
          (SB.fromLBS (J.encode httpException))
          []
      HStatus errResp ->
        invocationWith (Just (hrsStatus errResp)) (hrsBody errResp) (hrsHeaders errResp)
      HOther detail ->
        invocationWith (Just 500) (SB.fromLBS $ J.encode detail) []
|
2021-04-21 13:55:18 +03:00
|
|
|
|
|
|
|
-- | Decide whether a failed delivery is retried or marked as an error.
--
-- The result is 'PESetError' only when the event's try count has reached the
-- configured number of retries /and/ the failure carries no usable
-- retry-after value. In every other case the result is 'PESetRetry' with the
-- current time plus a delay in seconds: the retry-after value when present,
-- the configured retry interval otherwise.
--
-- A retry-after value is only looked up for 'HStatus' failures, and it is
-- ignored unless it parses as a number greater than zero.
retryOrSetError ::
  MonadIO m =>
  Event b ->
  RetryConf ->
  HTTPErr a ->
  m ProcessEventError
retryOrSetError e retryConf err
  -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
  | triesExhausted && noRetryHeader = pure PESetError
  | otherwise = do
      currentTime <- liftIO getCurrentTime
      pure . PESetRetry $ addUTCTime (fromIntegral delaySeconds) currentTime
  where
    retryAfterHeader = case err of
      HStatus resp -> getRetryAfterHeaderFromResp resp
      _ -> Nothing

    retryAfterSeconds = retryAfterHeader >>= mfilter (> 0) . readMaybe . T.unpack

    triesExhausted = eTries e >= rcNumRetries retryConf
    noRetryHeader = isNothing retryAfterSeconds
    delaySeconds = fromMaybe (rcIntervalSec retryConf) retryAfterSeconds
|
2018-10-26 19:28:03 +03:00
|
|
|
|
2020-05-13 15:33:16 +03:00
|
|
|
-- | Assemble the invocation record for one delivery attempt.
--
-- The response part is built with 'mkResp' only when a status is given and it
-- lies in the range 200-299; a missing status or any other status yields
-- 'mkClientErr' built from the response body alone.
mkInvocation ::
  EventId ->
  J.Value ->
  Maybe Int ->
  [HeaderConf] ->
  SB.SerializableBlob ->
  [HeaderConf] ->
  Invocation 'EventType
mkInvocation eid ep statusMaybe reqHeaders respBody respHeaders =
  Invocation
    eid
    statusMaybe
    (mkWebhookReq ep reqHeaders invocationVersionET)
    response
  where
    response = case statusMaybe of
      Just status
        | status >= 200 && status < 300 -> mkResp status respBody respHeaders
      _ -> mkClientErr respBody
|
2021-09-24 01:56:37 +03:00
|
|
|
|
2019-11-26 15:14:21 +03:00
|
|
|
-- | Log a 'QErr' as an 'EventInternalErr', using the Hasura logger found in
-- the reader environment.
logQErr :: (MonadReader r m, Has (L.Logger L.Hasura) r, MonadIO m) => QErr -> m ()
logQErr err =
  asks getter >>= \(logger :: L.Logger L.Hasura) ->
    L.unLogger logger (EventInternalErr err)
|
2019-02-22 15:25:36 +03:00
|
|
|
|
2021-02-14 09:07:52 +03:00
|
|
|
-- | Find the trigger definition an event belongs to.
--
-- The event's table is looked up among the schema cache's sources, then the
-- event's trigger name is looked up in that table's event trigger map. A
-- failure of either lookup is reported as a 'Left' with a descriptive message.
getEventTriggerInfoFromEvent ::
  forall b. Backend b => SchemaCache -> Event b -> Either Text (EventTriggerInfo b)
getEventTriggerInfoFromEvent sc e = do
  tableInfo <- onNothing tableLookup (Left tableMissing)
  onNothing
    (M.lookup triggerName (_tiEventTriggerInfoMap tableInfo))
    (Left triggerMissing)
  where
    table = eTable e
    triggerName = tmName (eTrigger e)
    tableLookup = unsafeTableInfo @b (eSource e) table $ scSources sc
    tableMissing = "table '" <> table <<> "' not found"
    triggerMissing =
      "event trigger '"
        <> triggerNameToTxt triggerName
        <> "' on table '"
        <> table <<> "' not found"
|