mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 17:02:49 +03:00
Rewrite Tracing
to allow for only one TraceT
in the entire stack.
This PR is on top of #7789. ### Description This PR entirely rewrites the API of the Tracing library, to make `interpTraceT` a thing of the past. Before this change, we ran traces by sticking a `TraceT` on top of whatever we were doing. This had several major drawbacks: - we were carrying a bunch of `TraceT` across the codebase, and the entire codebase had to know about it - we needed to carry a second class constraint around (`HasReporterM`) to be able to run all of those traces - we kept having to do stack rewriting with `interpTraceT`, which went from inconvenient to horrible - we had to declare several behavioral instances on `TraceT m` This PR rewrite all of `Tracing` using a more conventional model: there is ONE `TraceT` at the bottom of the stack, and there is an associated class constraint `MonadTrace`: any part of the code that happens to satisfy `MonadTrace` is able to create new traces. We NEVER have to do stack rewriting, `interpTraceT` is gone, and `TraceT` and `Reporter` become implementation details that 99% of the code is blissfully unaware of: code that needs to do tracing only needs to declare that the monad in which it operates implements `MonadTrace`. In doing so, this PR revealed **several bugs in the codebase**: places where we were expecting to trace something, but due to the default instance of `HasReporterM IO` we would actually not do anything. This PR also splits the code of `Tracing` in more byte-sized modules, with the goal of potentially moving to `server/lib` down the line. ### Remaining work This PR is a draft; what's left to do is: - [x] make Pro compile; i haven't updated `HasuraPro/Main` yet - [x] document Tracing by writing a note that explains how to use the library, and the meaning of "reporter", "trace" and "span", as well as the pitfalls - [x] discuss some of the trade-offs in the implementation, which is why i'm opening this PR already despite it not fully building yet - [x] it depends on #7789 being merged first PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7791 GitOrigin-RevId: cadd32d039134c93ddbf364599a2f4dd988adea8
This commit is contained in:
parent
99a7a89fa3
commit
cf531b05cb
@ -748,6 +748,15 @@ library
|
||||
, Hasura.LogicalModel.Schema
|
||||
, Hasura.LogicalModel.Types
|
||||
|
||||
, Hasura.Tracing
|
||||
, Hasura.Tracing.Class
|
||||
, Hasura.Tracing.Context
|
||||
, Hasura.Tracing.Monad
|
||||
, Hasura.Tracing.Reporter
|
||||
, Hasura.Tracing.Sampling
|
||||
, Hasura.Tracing.TraceId
|
||||
, Hasura.Tracing.Utils
|
||||
|
||||
, Hasura.Server.Auth.WebHook
|
||||
, Hasura.Server.Middleware
|
||||
, Hasura.Server.Cors
|
||||
@ -988,8 +997,6 @@ library
|
||||
, Hasura.SQL.Types
|
||||
, Hasura.SQL.Value
|
||||
, Hasura.SQL.WKT
|
||||
, Hasura.Tracing
|
||||
, Hasura.Tracing.TraceId
|
||||
, Hasura.QueryTags
|
||||
, Network.HTTP.Client.Transformable
|
||||
, Network.HTTP.Client.DynamicTlsPermissions
|
||||
|
@ -374,10 +374,9 @@ runApp serveOptions = do
|
||||
pure (EKG.subset EKG.emptyOf store, serverMetrics)
|
||||
prometheusMetrics <- makeDummyPrometheusMetrics
|
||||
let managedServerCtx = App.initialiseContext env globalCtx serveOptions Nothing serverMetrics prometheusMetrics sampleAlways
|
||||
runManagedT managedServerCtx \(appCtx, appEnv) -> do
|
||||
flip App.runPGMetadataStorageAppT (appCtx, appEnv)
|
||||
. lowerManagedT
|
||||
$ do
|
||||
runManagedT managedServerCtx \(appCtx, appEnv) ->
|
||||
App.runPGMetadataStorageAppT (appCtx, appEnv) $
|
||||
lowerManagedT $
|
||||
App.runHGEServer
|
||||
(const $ pure ())
|
||||
appCtx
|
||||
|
@ -30,7 +30,7 @@ import Hasura.Server.Migrate (downgradeCatalog)
|
||||
import Hasura.Server.Prometheus (makeDummyPrometheusMetrics)
|
||||
import Hasura.Server.Version
|
||||
import Hasura.ShutdownLatch
|
||||
import Hasura.Tracing (sampleAlways)
|
||||
import Hasura.Tracing (ignoreTraceT, sampleAlways)
|
||||
import System.Environment (getEnvironment, lookupEnv, unsetEnv)
|
||||
import System.Exit qualified as Sys
|
||||
import System.Metrics qualified as EKG
|
||||
@ -55,7 +55,7 @@ main = maybeWithGhcDebug $ do
|
||||
clearEnvironment = getEnvironment >>= traverse_ \(v, _) -> unsetEnv v
|
||||
|
||||
runApp :: Env.Environment -> HGEOptions (ServeOptions Hasura) -> IO ()
|
||||
runApp env (HGEOptions rci metadataDbUrl hgeCmd) = do
|
||||
runApp env (HGEOptions rci metadataDbUrl hgeCmd) = ignoreTraceT do
|
||||
initTime <- liftIO getCurrentTime
|
||||
|
||||
case hgeCmd of
|
||||
@ -74,7 +74,7 @@ runApp env (HGEOptions rci metadataDbUrl hgeCmd) = do
|
||||
|
||||
pure (EKG.subset EKG.emptyOf store, serverMetrics)
|
||||
|
||||
prometheusMetrics <- makeDummyPrometheusMetrics
|
||||
prometheusMetrics <- lift makeDummyPrometheusMetrics
|
||||
|
||||
-- It'd be nice if we didn't have to call runManagedT twice here, but
|
||||
-- there is a data dependency problem since the call to runPGMetadataStorageApp
|
||||
@ -93,10 +93,12 @@ runApp env (HGEOptions rci metadataDbUrl hgeCmd) = do
|
||||
let Loggers _ logger _ = appEnvLoggers appEnv
|
||||
|
||||
_idleGCThread <-
|
||||
lift $
|
||||
C.forkImmortal "ourIdleGC" logger $
|
||||
GC.ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
|
||||
|
||||
flip runPGMetadataStorageAppT (appCtx, appEnv) . lowerManagedT $ do
|
||||
runPGMetadataStorageAppT (appCtx, appEnv) $
|
||||
lowerManagedT $
|
||||
runHGEServer (const $ pure ()) appCtx appEnv initTime Nothing ekgStore
|
||||
HCExport -> do
|
||||
GlobalCtx {..} <- initGlobalCtx env metadataDbUrl rci
|
||||
|
@ -13,7 +13,8 @@ module Hasura.App
|
||||
ExitException (ExitException),
|
||||
GlobalCtx (..),
|
||||
AppContext (..),
|
||||
PGMetadataStorageAppT (runPGMetadataStorageAppT),
|
||||
PGMetadataStorageAppT,
|
||||
runPGMetadataStorageAppT,
|
||||
accessDeniedErrMsg,
|
||||
flushLogger,
|
||||
getCatalogStateTx,
|
||||
@ -154,7 +155,7 @@ import Hasura.Server.Version
|
||||
import Hasura.Services
|
||||
import Hasura.Session
|
||||
import Hasura.ShutdownLatch
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Hasura.Tracing
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Client.Blocklisting (Blocklist)
|
||||
import Network.HTTP.Client.CreateManager (mkHttpManager)
|
||||
@ -279,8 +280,8 @@ initGlobalCtx env metadataDbUrl defaultPgConnInfo = do
|
||||
mkGlobalCtx mdConnInfo (Just (dbUrl, srcConnInfo))
|
||||
|
||||
-- | An application with Postgres database as a metadata storage
|
||||
newtype PGMetadataStorageAppT m a = PGMetadataStorageAppT {runPGMetadataStorageAppT :: (AppContext, AppEnv) -> m a}
|
||||
deriving
|
||||
newtype PGMetadataStorageAppT m a = PGMetadataStorageAppT (ReaderT (AppContext, AppEnv) (TraceT m) a)
|
||||
deriving newtype
|
||||
( Functor,
|
||||
Applicative,
|
||||
Monad,
|
||||
@ -289,20 +290,29 @@ newtype PGMetadataStorageAppT m a = PGMetadataStorageAppT {runPGMetadataStorageA
|
||||
MonadCatch,
|
||||
MonadThrow,
|
||||
MonadMask,
|
||||
HasServerConfigCtx,
|
||||
MonadReader (AppContext, AppEnv),
|
||||
MonadBase b,
|
||||
MonadBaseControl b
|
||||
)
|
||||
via (ReaderT (AppContext, AppEnv) m)
|
||||
deriving
|
||||
( MonadTrans
|
||||
)
|
||||
via (ReaderT (AppContext, AppEnv))
|
||||
|
||||
instance (MonadIO m, MonadBaseControl IO m) => MonadTrace (PGMetadataStorageAppT m) where
|
||||
newTraceWith c p n (PGMetadataStorageAppT a) = PGMetadataStorageAppT $ newTraceWith c p n a
|
||||
newSpanWith i n (PGMetadataStorageAppT a) = PGMetadataStorageAppT $ newSpanWith i n a
|
||||
currentContext = PGMetadataStorageAppT currentContext
|
||||
attachMetadata = PGMetadataStorageAppT . attachMetadata
|
||||
|
||||
instance MonadTrans PGMetadataStorageAppT where
|
||||
lift = PGMetadataStorageAppT . lift . lift
|
||||
|
||||
instance Monad m => ProvidesNetwork (PGMetadataStorageAppT m) where
|
||||
askHTTPManager = appEnvManager <$> asks snd
|
||||
|
||||
instance HasServerConfigCtx m => HasServerConfigCtx (PGMetadataStorageAppT m) where
|
||||
askServerConfigCtx = lift askServerConfigCtx
|
||||
|
||||
runPGMetadataStorageAppT :: (AppContext, AppEnv) -> PGMetadataStorageAppT m a -> m a
|
||||
runPGMetadataStorageAppT c (PGMetadataStorageAppT a) = ignoreTraceT $ runReaderT a c
|
||||
|
||||
resolvePostgresConnInfo ::
|
||||
(MonadIO m) => Env.Environment -> UrlConf -> Maybe Int -> m PG.ConnInfo
|
||||
resolvePostgresConnInfo env dbUrlConf maybeRetries = do
|
||||
@ -314,7 +324,7 @@ resolvePostgresConnInfo env dbUrlConf maybeRetries = do
|
||||
retries = fromMaybe 1 maybeRetries
|
||||
|
||||
initAuthMode ::
|
||||
(C.ForkableMonadIO m, Tracing.HasReporter m) =>
|
||||
(C.ForkableMonadIO m) =>
|
||||
HashSet AdminSecretHash ->
|
||||
Maybe AuthHook ->
|
||||
[JWTConfig] ->
|
||||
@ -337,7 +347,7 @@ initAuthMode adminSecret authHook jwtSecret unAuthRole httpManager logger = do
|
||||
-- forking a dedicated polling thread to dynamically get the latest JWK settings
|
||||
-- set by the user and update the JWK accordingly. This will help in applying the
|
||||
-- updates without restarting HGE.
|
||||
_ <- C.forkImmortal "update JWK" logger $ updateJwkCtx authMode httpManager logger
|
||||
void $ C.forkImmortal "update JWK" logger $ updateJwkCtx authMode httpManager logger
|
||||
return authMode
|
||||
|
||||
initSubscriptionsState ::
|
||||
@ -414,7 +424,7 @@ initialiseContext ::
|
||||
Maybe ES.SubscriptionPostPollHook ->
|
||||
ServerMetrics ->
|
||||
PrometheusMetrics ->
|
||||
Tracing.SamplingPolicy ->
|
||||
SamplingPolicy ->
|
||||
ManagedT m (AppContext, AppEnv)
|
||||
initialiseContext env GlobalCtx {..} serveOptions@ServeOptions {..} liveQueryHook serverMetrics prometheusMetrics traceSamplingPolicy = do
|
||||
instanceId <- liftIO generateInstanceId
|
||||
@ -647,7 +657,7 @@ runHGEServer ::
|
||||
MonadMask m,
|
||||
MonadStateless IO m,
|
||||
LA.Forall (LA.Pure m),
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
HttpLog m,
|
||||
ConsoleRenderer m,
|
||||
MonadVersionAPIWithExtraData m,
|
||||
@ -657,13 +667,13 @@ runHGEServer ::
|
||||
MonadQueryLog m,
|
||||
WS.MonadWSLog m,
|
||||
MonadExecuteQuery m,
|
||||
Tracing.HasReporter m,
|
||||
HasResourceLimits m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
MonadResolveSource m,
|
||||
EB.MonadQueryTags m,
|
||||
MonadEventLogCleanup m,
|
||||
ProvidesHasuraServices m,
|
||||
MonadTrace m,
|
||||
MonadGetApiTimeLimit m
|
||||
) =>
|
||||
(AppContext -> Spock.SpockT m ()) ->
|
||||
@ -738,7 +748,7 @@ mkHGEServer ::
|
||||
MonadMask m,
|
||||
MonadStateless IO m,
|
||||
LA.Forall (LA.Pure m),
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
HttpLog m,
|
||||
ConsoleRenderer m,
|
||||
MonadVersionAPIWithExtraData m,
|
||||
@ -748,13 +758,13 @@ mkHGEServer ::
|
||||
MonadQueryLog m,
|
||||
WS.MonadWSLog m,
|
||||
MonadExecuteQuery m,
|
||||
Tracing.HasReporter m,
|
||||
HasResourceLimits m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
MonadResolveSource m,
|
||||
EB.MonadQueryTags m,
|
||||
MonadEventLogCleanup m,
|
||||
ProvidesHasuraServices m,
|
||||
MonadTrace m,
|
||||
MonadGetApiTimeLimit m
|
||||
) =>
|
||||
(AppContext -> Spock.SpockT m ()) ->
|
||||
@ -1089,8 +1099,6 @@ mkHGEServer setupHook appCtx@AppContext {..} appEnv@AppEnv {..} ekgStore = do
|
||||
(getSchemaCache cacheRef)
|
||||
lockedEventsCtx
|
||||
|
||||
instance (Monad m) => Tracing.HasReporter (PGMetadataStorageAppT m)
|
||||
|
||||
instance (Monad m) => HasResourceLimits (PGMetadataStorageAppT m) where
|
||||
askHTTPHandlerLimit = pure $ ResourceLimits id
|
||||
askGraphqlOperationLimit _ _ _ = pure $ ResourceLimits id
|
||||
@ -1113,10 +1121,10 @@ instance (MonadIO m) => HttpLog (PGMetadataStorageAppT m) where
|
||||
mkHttpAccessLogContext userInfoM loggingSettings reqId waiReq reqBody (BL.length response) compressedResponse qTime cType headers rb batchQueryOpLogs
|
||||
|
||||
instance (Monad m) => MonadExecuteQuery (PGMetadataStorageAppT m) where
|
||||
cacheLookup _ _ _ _ = pure ([], Nothing)
|
||||
cacheStore _ _ _ = pure (Right CacheStoreSkipped)
|
||||
cacheLookup _ _ _ _ = pure $ Right ([], Nothing)
|
||||
cacheStore _ _ _ = pure $ Right (Right CacheStoreSkipped)
|
||||
|
||||
instance (MonadIO m, MonadBaseControl IO m) => UserAuthentication (Tracing.TraceT (PGMetadataStorageAppT m)) where
|
||||
instance (MonadIO m, MonadBaseControl IO m) => UserAuthentication (PGMetadataStorageAppT m) where
|
||||
resolveUserInfo logger manager headers authMode reqs =
|
||||
runExceptT $ do
|
||||
(a, b, c) <- getUserInfoWithExpTime logger manager headers authMode reqs
|
||||
|
@ -4,7 +4,7 @@
|
||||
module Hasura.Backends.DataConnector.Adapter.Metadata () where
|
||||
|
||||
import Control.Arrow.Extended
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Control
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Key qualified as K
|
||||
import Data.Aeson.KeyMap qualified as KM
|
||||
@ -76,6 +76,7 @@ instance BackendMetadata 'DataConnector where
|
||||
supportsBeingRemoteRelationshipTarget = supportsBeingRemoteRelationshipTarget'
|
||||
|
||||
resolveBackendInfo' ::
|
||||
forall arr m.
|
||||
( ArrowChoice arr,
|
||||
Inc.ArrowCache m arr,
|
||||
Inc.ArrowDistribute arr,
|
||||
@ -97,14 +98,6 @@ resolveBackendInfo' logger = proc (invalidationKeys, optionsMap) -> do
|
||||
returnA -< HashMap.catMaybes maybeDataConnectorCapabilities
|
||||
where
|
||||
getDataConnectorCapabilitiesIfNeeded ::
|
||||
forall arr m.
|
||||
( ArrowChoice arr,
|
||||
Inc.ArrowCache m arr,
|
||||
ArrowWriter (Seq (Either InconsistentMetadata MetadataDependency)) arr,
|
||||
MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
ProvidesNetwork m
|
||||
) =>
|
||||
(Inc.Dependency (Maybe (HashMap DC.DataConnectorName Inc.InvalidationKey)), DC.DataConnectorName, DC.DataConnectorOptions) `arr` Maybe DC.DataConnectorInfo
|
||||
getDataConnectorCapabilitiesIfNeeded = Inc.cache proc (invalidationKeys, dataConnectorName, dataConnectorOptions) -> do
|
||||
let metadataObj = MetadataObject (MODataConnectorAgent dataConnectorName) $ J.toJSON dataConnectorName
|
||||
@ -117,7 +110,6 @@ resolveBackendInfo' logger = proc (invalidationKeys, optionsMap) -> do
|
||||
|) metadataObj
|
||||
|
||||
getDataConnectorCapabilities ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
DC.DataConnectorOptions ->
|
||||
HTTP.Manager ->
|
||||
m (Either QErr DC.DataConnectorInfo)
|
||||
@ -133,7 +125,9 @@ resolveBackendInfo' logger = proc (invalidationKeys, optionsMap) -> do
|
||||
capabilitiesCase defaultAction capabilitiesAction errorAction capabilitiesU
|
||||
|
||||
resolveSourceConfig' ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m
|
||||
) =>
|
||||
Logger Hasura ->
|
||||
SourceName ->
|
||||
DC.ConnSourceConfig ->
|
||||
|
@ -57,7 +57,7 @@ runDBQuery' ::
|
||||
runDBQuery' requestId query fieldName _userInfo logger SourceConfig {..} action queryRequest _ = do
|
||||
void $ HGL.logQueryLog logger $ mkQueryLog query fieldName queryRequest requestId
|
||||
withElapsedTime
|
||||
. Tracing.trace ("Data Connector backend query for root field " <>> fieldName)
|
||||
. Tracing.newSpan ("Data Connector backend query for root field " <>> fieldName)
|
||||
. flip runAgentClientT (AgentClientContext logger _scEndpoint _scManager _scTimeoutMicroseconds)
|
||||
. runOnBaseMonad
|
||||
$ action
|
||||
@ -108,7 +108,7 @@ runDBMutation' ::
|
||||
runDBMutation' requestId query fieldName _userInfo logger SourceConfig {..} action queryRequest _ = do
|
||||
void $ HGL.logQueryLog logger $ mkQueryLog query fieldName queryRequest requestId
|
||||
withElapsedTime
|
||||
. Tracing.trace ("Data Connector backend mutation for root field " <>> fieldName)
|
||||
. Tracing.newSpan ("Data Connector backend mutation for root field " <>> fieldName)
|
||||
. flip runAgentClientT (AgentClientContext logger _scEndpoint _scManager _scTimeoutMicroseconds)
|
||||
. runOnBaseMonad
|
||||
$ action
|
||||
|
@ -14,7 +14,7 @@ import Hasura.Base.Error
|
||||
import Hasura.HTTP qualified
|
||||
import Hasura.Logging (Hasura, Logger)
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing (MonadTrace, tracedHttpRequest)
|
||||
import Hasura.Tracing (MonadTrace, traceHTTPRequest)
|
||||
import Network.HTTP.Client (Manager)
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Client.Transformable qualified as TransformableHTTP
|
||||
@ -57,7 +57,8 @@ runRequestAcceptStatus' acceptStatus req = do
|
||||
transformableReq &~ do
|
||||
for _accResponseTimeout \x -> TransformableHTTP.timeout .= HTTP.responseTimeoutMicro x
|
||||
|
||||
(tracedReq, responseOrException) <- tracedHttpRequest transformableReq' (\tracedReq -> fmap (tracedReq,) . liftIO . try @HTTP.HttpException $ TransformableHTTP.performRequest tracedReq _accHttpManager)
|
||||
(tracedReq, responseOrException) <- traceHTTPRequest transformableReq' \tracedReq ->
|
||||
fmap (tracedReq,) . liftIO . try @HTTP.HttpException $ TransformableHTTP.performRequest tracedReq _accHttpManager
|
||||
logAgentRequest _accLogger tracedReq responseOrException
|
||||
case responseOrException of
|
||||
-- throwConnectionError is used here in order to avoid a metadata inconsistency error
|
||||
|
@ -67,8 +67,8 @@ logAgentRequest (Logger writeLog) req responseOrError = do
|
||||
Right response -> Just . statusCode $ responseStatus response
|
||||
Left httpExn -> Hasura.HTTP.getHTTPExceptionStatus $ Hasura.HTTP.HttpException httpExn
|
||||
_aclError = either (Just . Hasura.HTTP.serializeHTTPExceptionMessageForDebugging) (const Nothing) responseOrError
|
||||
_aclTraceId = bsToTxt $ traceIdToHex $ Tracing.tcCurrentTrace traceCtx
|
||||
_aclSpanId = bsToTxt $ spanIdToHex $ Tracing.tcCurrentSpan traceCtx
|
||||
_aclTraceId = maybe "" (bsToTxt . traceIdToHex . Tracing.tcCurrentTrace) traceCtx
|
||||
_aclSpanId = maybe "" (bsToTxt . spanIdToHex . Tracing.tcCurrentSpan) traceCtx
|
||||
writeLog AgentCommunicationLog {..}
|
||||
|
||||
extractRequestLogInfoFromClientRequest :: Request -> RequestLogInfo
|
||||
@ -88,8 +88,8 @@ logClientError (Logger writeLog) clientError = do
|
||||
_ -> Nothing
|
||||
_aclRequest = extractRequestLogInfoFromClientInfo clientError
|
||||
_aclError = Just $ Hasura.HTTP.serializeServantClientErrorMessageForDebugging clientError
|
||||
_aclTraceId = bsToTxt $ traceIdToHex $ Tracing.tcCurrentTrace traceCtx
|
||||
_aclSpanId = bsToTxt $ spanIdToHex $ Tracing.tcCurrentSpan traceCtx
|
||||
_aclTraceId = maybe "" (bsToTxt . traceIdToHex . Tracing.tcCurrentTrace) traceCtx
|
||||
_aclSpanId = maybe "" (bsToTxt . spanIdToHex . Tracing.tcCurrentSpan) traceCtx
|
||||
writeLog AgentCommunicationLog {..}
|
||||
|
||||
extractRequestLogInfoFromClientInfo :: ClientError -> Maybe RequestLogInfo
|
||||
|
@ -112,7 +112,7 @@ insertManualEvent ::
|
||||
TriggerName ->
|
||||
J.Value ->
|
||||
UserInfo ->
|
||||
Tracing.TraceContext ->
|
||||
Maybe Tracing.TraceContext ->
|
||||
m EventId
|
||||
insertManualEvent sourceConfig tableName triggerName payload _userInfo _traceCtx =
|
||||
liftEitherM $
|
||||
|
@ -74,7 +74,7 @@ runQuery ::
|
||||
runQuery reqId query fieldName _userInfo logger _sourceConfig tx genSql _ = do
|
||||
logQueryLog logger $ mkQueryLog query fieldName genSql reqId
|
||||
withElapsedTime $
|
||||
trace ("MSSQL Query for root field " <>> fieldName) $
|
||||
newSpan ("MSSQL Query for root field " <>> fieldName) $
|
||||
run tx
|
||||
|
||||
runQueryExplain ::
|
||||
@ -109,7 +109,7 @@ runMutation ::
|
||||
runMutation reqId query fieldName _userInfo logger _sourceConfig tx _genSql _ = do
|
||||
logQueryLog logger $ mkQueryLog query fieldName Nothing reqId
|
||||
withElapsedTime $
|
||||
trace ("MSSQL Mutation for root field " <>> fieldName) $
|
||||
newSpan ("MSSQL Mutation for root field " <>> fieldName) $
|
||||
run tx
|
||||
|
||||
runSubscription ::
|
||||
|
@ -49,7 +49,7 @@ runQuery ::
|
||||
runQuery reqId query fieldName _userInfo logger _sourceConfig tx genSql _ = do
|
||||
logQueryLog logger $ mkQueryLog query fieldName genSql reqId
|
||||
withElapsedTime $
|
||||
trace ("MySQL Query for root field " <>> fieldName) $
|
||||
newSpan ("MySQL Query for root field " <>> fieldName) $
|
||||
run tx
|
||||
|
||||
runQueryExplain ::
|
||||
|
@ -137,19 +137,18 @@ sessionInfoJsonExp = S.SELit . encodeToStrictText
|
||||
withUserInfo :: (MonadIO m) => UserInfo -> PG.TxET QErr m a -> PG.TxET QErr m a
|
||||
withUserInfo uInfo tx = setHeadersTx (_uiSession uInfo) >> tx
|
||||
|
||||
setTraceContextInTx :: (MonadIO m) => Tracing.TraceContext -> PG.TxET QErr m ()
|
||||
setTraceContextInTx traceCtx = PG.unitQE defaultTxErrorHandler sql () False
|
||||
where
|
||||
sql =
|
||||
PG.fromText $
|
||||
"SET LOCAL \"hasura.tracecontext\" = "
|
||||
<> toSQLTxt (S.SELit . encodeToStrictText . Tracing.injectEventContext $ traceCtx)
|
||||
setTraceContextInTx :: (MonadIO m) => Maybe Tracing.TraceContext -> PG.TxET QErr m ()
|
||||
setTraceContextInTx = \case
|
||||
Nothing -> pure ()
|
||||
Just ctx -> do
|
||||
let sql = PG.fromText $ "SET LOCAL \"hasura.tracecontext\" = " <> toSQLTxt (S.SELit . encodeToStrictText . toJSON $ ctx)
|
||||
PG.unitQE defaultTxErrorHandler sql () False
|
||||
|
||||
-- | Inject the trace context as a transaction-local variable,
|
||||
-- so that it can be picked up by any triggers (including event triggers).
|
||||
withTraceContext ::
|
||||
(MonadIO m) =>
|
||||
Tracing.TraceContext ->
|
||||
Maybe (Tracing.TraceContext) ->
|
||||
PG.TxET QErr m a ->
|
||||
PG.TxET QErr m a
|
||||
withTraceContext ctx tx = setTraceContextInTx ctx >> tx
|
||||
|
@ -112,7 +112,7 @@ insertManualEvent ::
|
||||
TriggerName ->
|
||||
Value ->
|
||||
UserInfo ->
|
||||
Tracing.TraceContext ->
|
||||
Maybe Tracing.TraceContext ->
|
||||
m EventId
|
||||
insertManualEvent sourceConfig tableName triggerName payload userInfo traceCtx =
|
||||
-- NOTE: The methods `setTraceContextInTx` and `setHeadersTx` are being used
|
||||
|
@ -109,7 +109,7 @@ insertMultipleObjects multiObjIns additionalColumns userInfo mutationOutput plan
|
||||
mutationOutput
|
||||
columnInfos
|
||||
rowCount = tshow . length $ IR._aiInsertObject multiObjIns
|
||||
Tracing.trace ("Insert (" <> rowCount <> ") " <> qualifiedObjectToText table) do
|
||||
Tracing.newSpan ("Insert (" <> rowCount <> ") " <> qualifiedObjectToText table) do
|
||||
Tracing.attachMetadata [("count", rowCount)]
|
||||
PGE.execInsertQuery stringifyNum tCase userInfo (insertQuery, planVars)
|
||||
|
||||
@ -146,7 +146,8 @@ insertObject ::
|
||||
Options.StringifyNumbers ->
|
||||
Maybe NamingCase ->
|
||||
m (Int, Maybe (ColumnValues ('Postgres pgKind) TxtEncodedVal))
|
||||
insertObject singleObjIns additionalColumns userInfo planVars stringifyNum tCase = Tracing.trace ("Insert " <> qualifiedObjectToText table) do
|
||||
insertObject singleObjIns additionalColumns userInfo planVars stringifyNum tCase =
|
||||
Tracing.newSpan ("Insert " <> qualifiedObjectToText table) do
|
||||
validateInsert (Map.keys columns) (map IR._riRelationInfo objectRels) (Map.keys additionalColumns)
|
||||
|
||||
-- insert all object relations and fetch this insert dependent column values
|
||||
|
@ -491,7 +491,7 @@ mkCurPlanTx userInfo ps@(PreparedSql q prepMap) =
|
||||
-- WARNING: this quietly assumes the intmap keys are contiguous
|
||||
prepArgs = fst <$> IntMap.elems args
|
||||
in (,Just ps) $ OnBaseMonad do
|
||||
Tracing.trace "Postgres" $
|
||||
Tracing.newSpan "Postgres" $
|
||||
runIdentity . PG.getRow
|
||||
<$> PG.rawQE dmlTxErrorHandler q prepArgs True
|
||||
|
||||
|
@ -79,7 +79,7 @@ runPGQuery reqId query fieldName _userInfo logger sourceConfig tx genSql resolve
|
||||
-- log the generated SQL and the graphql query
|
||||
logQueryLog logger $ mkQueryLog query fieldName genSql reqId (resolvedConnectionTemplate <$ resolvedConnectionTemplate)
|
||||
withElapsedTime $
|
||||
trace ("Postgres Query for root field " <>> fieldName) $
|
||||
newSpan ("Postgres Query for root field " <>> fieldName) $
|
||||
runQueryTx (_pscExecCtx sourceConfig) (GraphQLQuery resolvedConnectionTemplate) $
|
||||
runOnBaseMonad tx
|
||||
|
||||
@ -104,7 +104,7 @@ runPGMutation reqId query fieldName userInfo logger sourceConfig tx _genSql reso
|
||||
-- log the graphql query
|
||||
logQueryLog logger $ mkQueryLog query fieldName Nothing reqId (resolvedConnectionTemplate <$ resolvedConnectionTemplate)
|
||||
withElapsedTime $
|
||||
trace ("Postgres Mutation for root field " <>> fieldName) $
|
||||
newSpan ("Postgres Mutation for root field " <>> fieldName) $
|
||||
runTxWithCtxAndUserInfo userInfo (_pscExecCtx sourceConfig) (Tx PG.ReadWrite Nothing) (GraphQLQuery resolvedConnectionTemplate) $
|
||||
runOnBaseMonad tx
|
||||
|
||||
@ -189,6 +189,6 @@ runPGMutationTransaction reqId query userInfo logger sourceConfig resolvedConnec
|
||||
withElapsedTime $
|
||||
runTxWithCtxAndUserInfo userInfo (_pscExecCtx sourceConfig) (Tx PG.ReadWrite Nothing) (GraphQLQuery resolvedConnectionTemplate) $
|
||||
flip OMap.traverseWithKey mutations \fieldName dbsi ->
|
||||
trace ("Postgres Mutation for root field " <>> fieldName) $
|
||||
newSpan ("Postgres Mutation for root field " <>> fieldName) $
|
||||
runOnBaseMonad $
|
||||
dbsiAction dbsi
|
||||
|
@ -55,6 +55,7 @@ import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Key qualified as Key
|
||||
import Data.Aeson.KeyMap qualified as KeyMap
|
||||
import Data.Aeson.Lens qualified as JL
|
||||
import Data.Aeson.TH
|
||||
import Data.Has
|
||||
import Data.HashMap.Strict qualified as M
|
||||
@ -280,10 +281,10 @@ logFetchedEventsStatistics logger backendEvents =
|
||||
processEventQueue ::
|
||||
forall m.
|
||||
( MonadIO m,
|
||||
Tracing.HasReporter m,
|
||||
MonadBaseControl IO m,
|
||||
LA.Forall (LA.Pure m),
|
||||
MonadMask m
|
||||
MonadMask m,
|
||||
Tracing.MonadTrace m
|
||||
) =>
|
||||
L.Logger L.Hasura ->
|
||||
FetchedEventsStatsLogger ->
|
||||
@ -418,16 +419,31 @@ processEventQueue logger statsLogger httpMgr getSchemaCache EventEngineCtx {..}
|
||||
"It looks like the events processor is keeping up again."
|
||||
return (eventsNext, 0, False)
|
||||
|
||||
-- \| Extract a trace context from an event trigger payload.
|
||||
extractEventContext :: forall io. MonadIO io => J.Value -> io (Maybe Tracing.TraceContext)
|
||||
extractEventContext e = do
|
||||
let traceIdMaybe =
|
||||
Tracing.traceIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key "trace_id" . JL._String
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- Tracing.randomSpanId
|
||||
let parentSpanId =
|
||||
Tracing.spanIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key "span_id" . JL._String
|
||||
samplingState =
|
||||
Tracing.samplingStateFromHeader $
|
||||
e ^? JL.key "trace_context" . JL.key "sampling_state" . JL._String
|
||||
pure $ Tracing.TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
processEvent ::
|
||||
forall io r b.
|
||||
( MonadIO io,
|
||||
MonadReader r io,
|
||||
Has HTTP.Manager r,
|
||||
Has (L.Logger L.Hasura) r,
|
||||
Tracing.HasReporter io,
|
||||
MonadMask io,
|
||||
MonadBaseControl IO io,
|
||||
BackendEventTrigger b
|
||||
BackendEventTrigger b,
|
||||
Tracing.MonadTrace io
|
||||
) =>
|
||||
EventWithSource b ->
|
||||
io ()
|
||||
@ -441,11 +457,11 @@ processEventQueue logger statsLogger httpMgr getSchemaCache EventEngineCtx {..}
|
||||
|
||||
cache <- liftIO getSchemaCache
|
||||
|
||||
tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e))
|
||||
trace <-
|
||||
extractEventContext (eEvent e) <&> \case
|
||||
Nothing -> Tracing.newTrace Tracing.sampleAlways
|
||||
Just ctx -> Tracing.newTraceWith ctx Tracing.sampleAlways
|
||||
let spanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti))
|
||||
runTraceT =
|
||||
(maybe Tracing.runTraceT Tracing.runTraceTInContext tracingCtx)
|
||||
Tracing.sampleAlways
|
||||
|
||||
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
|
||||
case maintenanceMode of
|
||||
@ -468,7 +484,7 @@ processEventQueue logger statsLogger httpMgr getSchemaCache EventEngineCtx {..}
|
||||
-- For such an event, we unlock the event and retry after a minute
|
||||
runExceptT (setRetry sourceConfig e (addUTCTime 60 currentTime) maintenanceModeVersion)
|
||||
>>= flip onLeft logQErr
|
||||
Right eti -> runTraceT (spanName eti) do
|
||||
Right eti -> trace (spanName eti) do
|
||||
eventExecutionStartTime <- liftIO getCurrentTime
|
||||
let webhook = wciCachedValue $ etiWebhookInfo eti
|
||||
retryConf = etiRetryConf eti
|
||||
|
@ -336,7 +336,7 @@ invokeRequest reqDetails@RequestDetails {..} respTransform' sessionVars logger =
|
||||
reqBody = fromMaybe J.Null $ view HTTP.body finalReq >>= J.decode @J.Value
|
||||
manager <- asks getter
|
||||
-- Perform the HTTP Request
|
||||
eitherResp <- tracedHttpRequest finalReq $ runHTTP manager
|
||||
eitherResp <- traceHTTPRequest finalReq $ runHTTP manager
|
||||
-- Log the result along with the pre/post transformation Request data
|
||||
logger eitherResp reqDetails
|
||||
resp <- eitherResp `onLeft` (throwError . HTTPError reqBody)
|
||||
|
@ -124,7 +124,6 @@ where
|
||||
import Control.Concurrent.Extended (Forever (..), sleep)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Lens (view)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Environment qualified as Env
|
||||
import Data.Has
|
||||
@ -230,9 +229,8 @@ generateCronEventsFrom startTime CronTriggerInfo {..} =
|
||||
|
||||
processCronEvents ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
Tracing.HasReporter m,
|
||||
MonadMetadataStorage m
|
||||
MonadMetadataStorage m,
|
||||
Tracing.MonadTrace m
|
||||
) =>
|
||||
L.Logger L.Hasura ->
|
||||
HTTP.Manager ->
|
||||
@ -284,8 +282,7 @@ processCronEvents logger httpMgr prometheusMetrics cronEvents getSC lockedCronEv
|
||||
|
||||
processOneOffScheduledEvents ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
Tracing.HasReporter m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadMetadataStorage m
|
||||
) =>
|
||||
Env.Environment ->
|
||||
@ -332,8 +329,7 @@ processOneOffScheduledEvents
|
||||
|
||||
processScheduledTriggers ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
Tracing.HasReporter m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadMetadataStorage m
|
||||
) =>
|
||||
Env.Environment ->
|
||||
@ -367,8 +363,7 @@ processScheduledEvent ::
|
||||
Has HTTP.Manager r,
|
||||
Has (L.Logger L.Hasura) r,
|
||||
MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
Tracing.HasReporter m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadMetadataStorage m,
|
||||
MonadError QErr m
|
||||
) =>
|
||||
@ -381,7 +376,7 @@ processScheduledEvent ::
|
||||
ScheduledEventType ->
|
||||
m ()
|
||||
processScheduledEvent prometheusMetrics eventId eventHeaders retryCtx payload webhookUrl type' =
|
||||
Tracing.runTraceT Tracing.sampleAlways traceNote do
|
||||
Tracing.newTrace Tracing.sampleAlways traceNote do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
let retryConf = _rctxConf retryCtx
|
||||
scheduledTime = sewpScheduledTime payload
|
||||
|
@ -432,9 +432,9 @@ asyncActionsProcessor ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
LA.Forall (LA.Pure m),
|
||||
Tracing.HasReporter m,
|
||||
MonadMetadataStorage m,
|
||||
ProvidesNetwork m
|
||||
ProvidesNetwork m,
|
||||
Tracing.MonadTrace m
|
||||
) =>
|
||||
Env.Environment ->
|
||||
L.Logger L.Hasura ->
|
||||
@ -469,7 +469,8 @@ asyncActionsProcessor env logger getSCFromRef' lockedActionEvents prometheusMetr
|
||||
liftIO $ sleep $ milliseconds sleepTime
|
||||
where
|
||||
callHandler :: ActionCache -> ActionLogItem -> m ()
|
||||
callHandler actionCache actionLogItem = Tracing.runTraceT Tracing.sampleAlways "async actions processor" do
|
||||
callHandler actionCache actionLogItem =
|
||||
Tracing.newTrace Tracing.sampleAlways "async actions processor" do
|
||||
httpManager <- askHTTPManager
|
||||
let ActionLogItem
|
||||
actionId
|
||||
@ -492,7 +493,6 @@ asyncActionsProcessor env logger getSCFromRef' lockedActionEvents prometheusMetr
|
||||
metadataResponseTransform = _adResponseTransform definition
|
||||
eitherRes <-
|
||||
runExceptT $
|
||||
-- TODO: do we need to add the logger as a reader? can't we just give it as an argument?
|
||||
flip runReaderT logger $
|
||||
callWebhook
|
||||
env
|
||||
@ -593,7 +593,7 @@ callWebhook
|
||||
actualSize = fromMaybe requestBodySize transformedReqSize
|
||||
|
||||
httpResponse <-
|
||||
Tracing.tracedHttpRequest actualReq $ \request ->
|
||||
Tracing.traceHTTPRequest actualReq $ \request ->
|
||||
liftIO . try $ HTTP.performRequest request manager
|
||||
|
||||
let requestInfo = ActionRequestInfo webhookEnvName postPayload (confHeaders <> toHeadersConf clientHeaders) transformedReq
|
||||
|
@ -164,7 +164,7 @@ execRemoteGQ env userInfo reqHdrs rsdef gqlReq@GQLReq {..} = do
|
||||
& set HTTP.timeout (HTTP.responseTimeoutMicro (timeout * 1000000))
|
||||
|
||||
manager <- askHTTPManager
|
||||
Tracing.tracedHttpRequest req \req' -> do
|
||||
Tracing.traceHTTPRequest req \req' -> do
|
||||
(time, res) <- withElapsedTime $ liftIO $ try $ HTTP.performRequest req' manager
|
||||
resp <- onLeft res (throwRemoteSchemaHttp webhookEnvRecord)
|
||||
pure (time, mkSetCookieHeaders resp, resp ^. Wreq.responseBody)
|
||||
|
@ -21,6 +21,7 @@ module Hasura.GraphQL.Transport.HTTP
|
||||
AnnotatedResponsePart (..),
|
||||
CacheStoreSuccess (..),
|
||||
CacheStoreFailure (..),
|
||||
CacheStoreResponse,
|
||||
SessVarPred,
|
||||
filterVariablesFromQuery,
|
||||
runSessVarPred,
|
||||
@ -28,7 +29,6 @@ module Hasura.GraphQL.Transport.HTTP
|
||||
where
|
||||
|
||||
import Control.Lens (Traversal', foldOf, to)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Ordered qualified as JO
|
||||
@ -84,8 +84,7 @@ import Hasura.Server.Telemetry.Counters qualified as Telem
|
||||
import Hasura.Server.Types (RequestId)
|
||||
import Hasura.Services.Network
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing (MonadTrace, TraceT, trace)
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Hasura.Tracing (MonadTrace, TraceT, newSpan)
|
||||
import Language.GraphQL.Draft.Syntax qualified as G
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
import Network.Wai.Extended qualified as Wai
|
||||
@ -136,7 +135,7 @@ class Monad m => MonadExecuteQuery m where
|
||||
-- the client should store it locally. The value ([], Just json) represents
|
||||
-- that the client should not store the response locally, but we do have a
|
||||
-- server-side cache value that can be used to avoid query execution.
|
||||
TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)
|
||||
m (Either QErr (HTTP.ResponseHeaders, Maybe EncJSON))
|
||||
|
||||
-- | Store a json response for a query that we've executed in the cache. Note
|
||||
-- that, as part of this, 'cacheStore' has to decide whether the response is
|
||||
@ -152,7 +151,7 @@ class Monad m => MonadExecuteQuery m where
|
||||
-- | Result of a query execution
|
||||
EncJSON ->
|
||||
-- | Always succeeds
|
||||
TraceT (ExceptT QErr m) CacheStoreResponse
|
||||
m (Either QErr CacheStoreResponse)
|
||||
|
||||
default cacheLookup ::
|
||||
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
||||
@ -160,22 +159,22 @@ class Monad m => MonadExecuteQuery m where
|
||||
[ActionsInfo] ->
|
||||
QueryCacheKey ->
|
||||
Maybe CachedDirective ->
|
||||
TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)
|
||||
cacheLookup a b c d = hoist (hoist lift) $ cacheLookup a b c d
|
||||
m (Either QErr (HTTP.ResponseHeaders, Maybe EncJSON))
|
||||
cacheLookup a b c d = lift $ cacheLookup a b c d
|
||||
|
||||
default cacheStore ::
|
||||
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
||||
QueryCacheKey ->
|
||||
Maybe CachedDirective ->
|
||||
EncJSON ->
|
||||
TraceT (ExceptT QErr m) CacheStoreResponse
|
||||
cacheStore a b c = hoist (hoist lift) $ cacheStore a b c
|
||||
m (Either QErr CacheStoreResponse)
|
||||
cacheStore a b c = lift $ cacheStore a b c
|
||||
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (ReaderT r m)
|
||||
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (ExceptT r m)
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (ExceptT e m)
|
||||
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (TraceT m)
|
||||
instance (MonadExecuteQuery m, MonadIO m) => MonadExecuteQuery (TraceT m)
|
||||
|
||||
-- | A partial response, e.g. from a remote schema call or postgres
|
||||
-- postgres query, which we'll assemble into the final response for
|
||||
@ -387,7 +386,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
|
||||
E.ResolvedExecutionPlan ->
|
||||
m AnnotatedResponse
|
||||
executePlan reqParsed runLimits execPlan = case execPlan of
|
||||
E.QueryExecutionPlan queryPlans asts dirMap -> trace "Query" $ do
|
||||
E.QueryExecutionPlan queryPlans asts dirMap -> newSpan "Query" $ do
|
||||
-- Attempt to lookup a cached response in the query cache.
|
||||
-- 'keyedLookup' is a monadic action possibly returning a cache hit.
|
||||
-- 'keyedStore' is a function to write a new response to the cache.
|
||||
@ -574,10 +573,8 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
|
||||
queryPlans
|
||||
cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
|
||||
cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
||||
in ( Tracing.interpTraceT (liftEitherM . runExceptT) $
|
||||
cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective,
|
||||
Tracing.interpTraceT (liftEitherM . runExceptT)
|
||||
. cacheStore cacheKey cachedDirective
|
||||
in ( liftEitherM $ cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective,
|
||||
liftEitherM . cacheStore cacheKey cachedDirective
|
||||
)
|
||||
|
||||
recordTimings :: DiffTime -> AnnotatedResponse -> m ()
|
||||
|
@ -54,16 +54,16 @@ createWSServerApp ::
|
||||
( MonadIO m,
|
||||
MC.MonadBaseControl IO m,
|
||||
LA.Forall (LA.Pure m),
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
WS.MonadWSLog m,
|
||||
MonadQueryLog m,
|
||||
Tracing.HasReporter m,
|
||||
MonadExecuteQuery m,
|
||||
MonadMetadataStorage m,
|
||||
EB.MonadQueryTags m,
|
||||
HasResourceLimits m,
|
||||
ProvidesNetwork m
|
||||
ProvidesNetwork m,
|
||||
Tracing.MonadTrace m
|
||||
) =>
|
||||
Env.Environment ->
|
||||
HashSet (L.EngineLogType L.Hasura) ->
|
||||
|
@ -478,7 +478,7 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
(parameterizedQueryHash, execPlan) <- onLeft execPlanE (withComplete . preExecErr requestId (Just gqlOpType))
|
||||
|
||||
case execPlan of
|
||||
E.QueryExecutionPlan queryPlan asts dirMap -> Tracing.trace "Query" $ do
|
||||
E.QueryExecutionPlan queryPlan asts dirMap -> Tracing.newSpan "Query" $ do
|
||||
let filteredSessionVars = runSessVarPred (filterVariablesFromQuery asts) (_uiSession userInfo)
|
||||
cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
|
||||
remoteSchemas =
|
||||
@ -499,7 +499,10 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
|
||||
-- We ignore the response headers (containing TTL information) because
|
||||
-- WebSockets don't support them.
|
||||
(_responseHeaders, cachedValue) <- Tracing.interpTraceT (withExceptT mempty) $ cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective
|
||||
cachedValue <-
|
||||
cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective >>= \case
|
||||
Right (_responseHeaders, cachedValue) -> pure cachedValue
|
||||
Left _err -> throwError ()
|
||||
case cachedValue of
|
||||
Just cachedResponseData -> do
|
||||
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindCached
|
||||
@ -554,7 +557,6 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
-- Note: The result of cacheStore is ignored here since we can't ensure that
|
||||
-- the WS client will respond correctly to multiple messages.
|
||||
void $
|
||||
Tracing.interpTraceT (withExceptT mempty) $
|
||||
cacheStore cacheKey cachedDirective $
|
||||
encodeAnnotatedResponseParts results
|
||||
|
||||
@ -1000,16 +1002,16 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
|
||||
onMessage ::
|
||||
( MonadIO m,
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadQueryLog m,
|
||||
Tracing.HasReporter m,
|
||||
MonadExecuteQuery m,
|
||||
MC.MonadBaseControl IO m,
|
||||
MonadMetadataStorage m,
|
||||
EB.MonadQueryTags m,
|
||||
HasResourceLimits m,
|
||||
ProvidesNetwork m
|
||||
ProvidesNetwork m,
|
||||
Tracing.MonadTrace m
|
||||
) =>
|
||||
Env.Environment ->
|
||||
HashSet (L.EngineLogType L.Hasura) ->
|
||||
@ -1019,7 +1021,8 @@ onMessage ::
|
||||
LBS.ByteString ->
|
||||
WS.WSActions WSConnData ->
|
||||
m ()
|
||||
onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions = Tracing.runTraceT (_wseTraceSamplingPolicy serverEnv) "websocket" do
|
||||
onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions =
|
||||
Tracing.newTrace (_wseTraceSamplingPolicy serverEnv) "websocket" do
|
||||
case J.eitherDecode msgRaw of
|
||||
Left e -> do
|
||||
let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e
|
||||
@ -1096,7 +1099,7 @@ stopOperation serverEnv wsConn opId logWhenOpNotExist = do
|
||||
opDet n = OperationDetails opId Nothing n ODStopped Nothing Nothing
|
||||
|
||||
onConnInit ::
|
||||
(MonadIO m, UserAuthentication (Tracing.TraceT m)) =>
|
||||
(MonadIO m, UserAuthentication m) =>
|
||||
L.Logger L.Hasura ->
|
||||
HTTP.Manager ->
|
||||
WSConn ->
|
||||
@ -1106,7 +1109,7 @@ onConnInit ::
|
||||
WS.WSOnErrorMessageAction WSConnData ->
|
||||
-- | this is the message handler for handling "keep-alive" messages to the client
|
||||
WS.WSKeepAliveMessageAction WSConnData ->
|
||||
Tracing.TraceT m ()
|
||||
m ()
|
||||
onConnInit logger manager wsConn authMode connParamsM onConnInitErrAction keepAliveMessageAction = do
|
||||
-- TODO(from master): what should be the behaviour of connection_init message when a
|
||||
-- connection is already iniatilized? Currently, we seem to be doing
|
||||
|
@ -14,7 +14,7 @@ where
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Control
|
||||
import Data.Aeson (FromJSON, ToJSON, (.!=), (.:), (.:?), (.=))
|
||||
import Data.Aeson qualified as Aeson
|
||||
import Data.Has
|
||||
@ -85,9 +85,9 @@ runAddDataConnectorAgent ::
|
||||
SC.Build.CacheRWM m,
|
||||
Has (L.Logger L.Hasura) r,
|
||||
MonadReader r m,
|
||||
MonadBaseControl IO m,
|
||||
MonadError Error.QErr m,
|
||||
MonadIO m
|
||||
MonadIO m,
|
||||
MonadBaseControl IO m
|
||||
) =>
|
||||
DCAddAgent ->
|
||||
m EncJSON
|
||||
|
@ -458,7 +458,10 @@ lookupDataConnectorOptions dcName bmap =
|
||||
`onNothing` (Error.throw400 Error.DataConnectorError ("Data connector named " <> Text.E.toTxt dcName <> " was not found in the data connector backend config"))
|
||||
|
||||
querySourceSchema ::
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
MonadError QErr m
|
||||
) =>
|
||||
L.Logger L.Hasura ->
|
||||
HTTP.Manager ->
|
||||
Maybe DC.Types.SourceTimeout ->
|
||||
|
@ -38,7 +38,7 @@ class Backend b => BackendEventTrigger (b :: BackendType) where
|
||||
TriggerName ->
|
||||
Value ->
|
||||
UserInfo ->
|
||||
Tracing.TraceContext ->
|
||||
Maybe Tracing.TraceContext ->
|
||||
m EventId
|
||||
|
||||
-- | @fetchUndeliveredEvents@ fetches the undelivered events from the source
|
||||
|
@ -8,7 +8,7 @@ where
|
||||
|
||||
import Control.Arrow.Extended
|
||||
import Control.Arrow.Interpret
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Control
|
||||
import Data.Aeson
|
||||
import Data.ByteString.Lazy qualified as BL
|
||||
import Data.Environment qualified as Env
|
||||
@ -93,7 +93,8 @@ buildRemoteSchemas env =
|
||||
|
||||
-- TODO continue propagating MonadTrace up calls so that we can get tracing
|
||||
-- for remote schema introspection. This will require modifying CacheBuild.
|
||||
noopTrace = Tracing.runTraceTWithReporter Tracing.noReporter Tracing.sampleNever "buildSchemaCacheRule"
|
||||
-- TODO(Antoine): do this when changing CacheBuild to be on top of the app's m.
|
||||
noopTrace = Tracing.ignoreTraceT
|
||||
|
||||
mkRemoteSchemaMetadataObject remoteSchema =
|
||||
MetadataObject (MORemoteSchema (_rsmName remoteSchema)) (toJSON remoteSchema)
|
||||
|
@ -399,7 +399,7 @@ runMetadataQuery ::
|
||||
m (EncJSON, RebuildableSchemaCache)
|
||||
runMetadataQuery env logger instanceId userInfo serverConfigCtx schemaCacheRef RQLMetadata {..} = do
|
||||
schemaCache <- liftIO $ fst <$> readSchemaCacheRef schemaCacheRef
|
||||
(metadata, currentResourceVersion) <- Tracing.trace "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
(metadata, currentResourceVersion) <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
let exportsMetadata = \case
|
||||
RMV1 (RMExportMetadata _) -> True
|
||||
RMV2 (RMV2ExportMetadata _) -> True
|
||||
@ -439,7 +439,7 @@ runMetadataQuery env logger instanceId userInfo serverConfigCtx schemaCacheRef R
|
||||
String $
|
||||
"Attempting to put new metadata in storage"
|
||||
newResourceVersion <-
|
||||
Tracing.trace "setMetadata" $
|
||||
Tracing.newSpan "setMetadata" $
|
||||
liftEitherM $
|
||||
setMetadata (fromMaybe currentResourceVersion _rqlMetadataResourceVersion) modMetadata
|
||||
L.unLogger logger $
|
||||
@ -448,7 +448,7 @@ runMetadataQuery env logger instanceId userInfo serverConfigCtx schemaCacheRef R
|
||||
"Put new metadata in storage, received new resource version " <> tshow newResourceVersion
|
||||
|
||||
-- notify schema cache sync
|
||||
Tracing.trace "notifySchemaCacheSync" $
|
||||
Tracing.newSpan "notifySchemaCacheSync" $
|
||||
liftEitherM $
|
||||
notifySchemaCacheSync newResourceVersion instanceId cacheInvalidations
|
||||
L.unLogger logger $
|
||||
@ -457,7 +457,7 @@ runMetadataQuery env logger instanceId userInfo serverConfigCtx schemaCacheRef R
|
||||
"Sent schema cache sync notification at resource version " <> tshow newResourceVersion
|
||||
|
||||
(_, modSchemaCache', _) <-
|
||||
Tracing.trace "setMetadataResourceVersionInSchemaCache" $
|
||||
Tracing.newSpan "setMetadataResourceVersionInSchemaCache" $
|
||||
setMetadataResourceVersionInSchemaCache newResourceVersion
|
||||
& runCacheRWT modSchemaCache
|
||||
& peelRun (RunCtx userInfo serverConfigCtx)
|
||||
@ -617,10 +617,10 @@ runMetadataQueryM env currentResourceVersion =
|
||||
-- NOTE: This is a good place to install tracing, since it's involved in
|
||||
-- the recursive case via "bulk":
|
||||
RMV1 q ->
|
||||
Tracing.trace ("v1 " <> T.pack (constrName q)) $
|
||||
Tracing.newSpan ("v1 " <> T.pack (constrName q)) $
|
||||
runMetadataQueryV1M env currentResourceVersion q
|
||||
RMV2 q ->
|
||||
Tracing.trace ("v2 " <> T.pack (constrName q)) $
|
||||
Tracing.newSpan ("v2 " <> T.pack (constrName q)) $
|
||||
runMetadataQueryV2M currentResourceVersion q
|
||||
|
||||
runMetadataQueryV1M ::
|
||||
|
@ -122,7 +122,7 @@ runQuery env instanceId userInfo schemaCache serverConfigCtx rqlQuery = do
|
||||
when ((_sccReadOnlyMode serverConfigCtx == ReadOnlyModeEnabled) && queryModifiesUserDB rqlQuery) $
|
||||
throw400 NotSupported "Cannot run write queries when read-only mode is enabled"
|
||||
|
||||
(metadata, currentResourceVersion) <- Tracing.trace "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
(metadata, currentResourceVersion) <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
result <-
|
||||
runQueryM env rqlQuery & \x -> do
|
||||
((js, meta), rsc, ci) <-
|
||||
@ -142,11 +142,11 @@ runQuery env instanceId userInfo schemaCache serverConfigCtx rqlQuery = do
|
||||
MaintenanceModeDisabled -> do
|
||||
-- set modified metadata in storage
|
||||
newResourceVersion <-
|
||||
Tracing.trace "setMetadata" $
|
||||
Tracing.newSpan "setMetadata" $
|
||||
liftEitherM $
|
||||
setMetadata currentResourceVersion updatedMetadata
|
||||
-- notify schema cache sync
|
||||
Tracing.trace "notifySchemaCacheSync" $
|
||||
Tracing.newSpan "notifySchemaCacheSync" $
|
||||
liftEitherM $
|
||||
notifySchemaCacheSync newResourceVersion instanceId invalidations
|
||||
MaintenanceModeEnabled () ->
|
||||
@ -185,7 +185,7 @@ runQueryM ::
|
||||
Env.Environment ->
|
||||
RQLQuery ->
|
||||
m EncJSON
|
||||
runQueryM env rq = Tracing.trace (T.pack $ constrName rq) $ case rq of
|
||||
runQueryM env rq = Tracing.newSpan (T.pack $ constrName rq) $ case rq of
|
||||
RQInsert q -> runInsert q
|
||||
RQSelect q -> runSelect q
|
||||
RQUpdate q -> runUpdate q
|
||||
|
@ -35,6 +35,7 @@ import Data.Aeson.KeyMap qualified as KM
|
||||
import Data.Aeson.Types qualified as J
|
||||
import Data.ByteString.Builder qualified as BB
|
||||
import Data.ByteString.Char8 qualified as B8
|
||||
import Data.ByteString.Char8 qualified as Char8
|
||||
import Data.ByteString.Lazy qualified as BL
|
||||
import Data.CaseInsensitive qualified as CI
|
||||
import Data.HashMap.Strict qualified as M
|
||||
@ -96,6 +97,7 @@ import Hasura.Server.Utils
|
||||
import Hasura.Server.Version
|
||||
import Hasura.Services
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing (MonadTrace)
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
import Network.Mime (defaultMimeLookup)
|
||||
@ -129,8 +131,7 @@ newtype Handler m a = Handler (ReaderT HandlerCtx (ExceptT QErr m) a)
|
||||
MonadBaseControl b,
|
||||
MonadReader HandlerCtx,
|
||||
MonadError QErr,
|
||||
-- Tracing.HasReporter,
|
||||
Tracing.MonadTrace,
|
||||
MonadTrace,
|
||||
HasResourceLimits,
|
||||
MonadResolveSource,
|
||||
HasServerConfigCtx,
|
||||
@ -271,10 +272,10 @@ mkSpockAction ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
FromJSON a,
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
HttpLog m,
|
||||
Tracing.HasReporter m,
|
||||
HasResourceLimits m
|
||||
HasResourceLimits m,
|
||||
MonadTrace m
|
||||
) =>
|
||||
AppContext ->
|
||||
AppEnv ->
|
||||
@ -282,7 +283,7 @@ mkSpockAction ::
|
||||
(Bool -> QErr -> Value) ->
|
||||
-- | `QErr` modifier
|
||||
(QErr -> QErr) ->
|
||||
APIHandler (Tracing.TraceT m) a ->
|
||||
APIHandler m a ->
|
||||
Spock.ActionT m ()
|
||||
mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier apiHandler = do
|
||||
req <- Spock.request
|
||||
@ -294,19 +295,35 @@ mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier
|
||||
(ioWaitTime, reqBody) <- withElapsedTime $ liftIO $ Wai.strictRequestBody req
|
||||
|
||||
(requestId, headers) <- getRequestId origHeaders
|
||||
tracingCtx <- liftIO $ Tracing.extractB3HttpContext headers
|
||||
tracingCtx <- liftIO do
|
||||
-- B3 TraceIds can have a length of either 64 bits (16 hex chars) or 128 bits
|
||||
-- (32 hex chars). For 64-bit TraceIds, we pad them with zeros on the left to
|
||||
-- make them 128 bits long.
|
||||
let traceIdMaybe =
|
||||
lookup "X-B3-TraceId" headers >>= \rawTraceId ->
|
||||
if
|
||||
| Char8.length rawTraceId == 32 ->
|
||||
Tracing.traceIdFromHex rawTraceId
|
||||
| Char8.length rawTraceId == 16 ->
|
||||
Tracing.traceIdFromHex $ Char8.replicate 16 '0' <> rawTraceId
|
||||
| otherwise ->
|
||||
Nothing
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- Tracing.randomSpanId
|
||||
let parentSpanId = Tracing.spanIdFromHex =<< lookup "X-B3-SpanId" headers
|
||||
samplingState = Tracing.samplingStateFromHeader $ lookup "X-B3-Sampled" headers
|
||||
pure $ Tracing.TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
let runTraceT ::
|
||||
let runTrace ::
|
||||
forall m1 a1.
|
||||
(MonadIO m1, MonadBaseControl IO m1, Tracing.HasReporter m1) =>
|
||||
Tracing.TraceT m1 a1 ->
|
||||
(MonadIO m1, MonadTrace m1) =>
|
||||
m1 a1 ->
|
||||
m1 a1
|
||||
runTraceT = do
|
||||
(maybe Tracing.runTraceT Tracing.runTraceTInContext tracingCtx)
|
||||
appEnvTraceSamplingPolicy
|
||||
(fromString (B8.unpack pathInfo))
|
||||
runTrace = case tracingCtx of
|
||||
Nothing -> Tracing.newTrace appEnvTraceSamplingPolicy (fromString (B8.unpack pathInfo))
|
||||
Just ctx -> Tracing.newTraceWith ctx appEnvTraceSamplingPolicy (fromString (B8.unpack pathInfo))
|
||||
|
||||
getInfo parsedRequest = do
|
||||
let getInfo parsedRequest = do
|
||||
authenticationResp <- lift (resolveUserInfo (_lsLogger appEnvLoggers) appEnvManager headers acAuthMode parsedRequest)
|
||||
authInfo <- onLeft authenticationResp (logErrorAndResp Nothing requestId req (reqBody, Nothing) False origHeaders (ExtraUserInfo Nothing) . qErrModifier)
|
||||
let (userInfo, _, authHeaders, extraUserInfo) = authInfo
|
||||
@ -318,7 +335,7 @@ mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier
|
||||
extraUserInfo
|
||||
)
|
||||
|
||||
mapActionT runTraceT $ do
|
||||
mapActionT runTrace do
|
||||
-- Add the request ID to the tracing metadata so that we
|
||||
-- can correlate requests and traces
|
||||
lift $ Tracing.attachMetadata [("request_id", unRequestId requestId)]
|
||||
@ -400,7 +417,7 @@ v1QueryHandler ::
|
||||
MonadError QErr m,
|
||||
MonadBaseControl IO m,
|
||||
MonadMetadataApiAuthorization m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
MonadReader HandlerCtx m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
MonadResolveSource m,
|
||||
@ -453,7 +470,7 @@ v1MetadataHandler ::
|
||||
MonadError QErr m,
|
||||
MonadBaseControl IO m,
|
||||
MonadReader HandlerCtx m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
MonadResolveSource m,
|
||||
MonadMetadataApiAuthorization m,
|
||||
@ -463,7 +480,7 @@ v1MetadataHandler ::
|
||||
) =>
|
||||
RQLMetadata ->
|
||||
m (HttpResponse EncJSON)
|
||||
v1MetadataHandler query = Tracing.trace "Metadata" $ do
|
||||
v1MetadataHandler query = Tracing.newSpan "Metadata" $ do
|
||||
(liftEitherM . authorizeV1MetadataApi query) =<< ask
|
||||
userInfo <- asks hcUser
|
||||
AppContext {..} <- asks hcAppContext
|
||||
@ -505,7 +522,7 @@ v2QueryHandler ::
|
||||
MonadError QErr m,
|
||||
MonadBaseControl IO m,
|
||||
MonadMetadataApiAuthorization m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
MonadReader HandlerCtx m,
|
||||
MonadMetadataStorage m,
|
||||
MonadResolveSource m,
|
||||
@ -514,7 +531,7 @@ v2QueryHandler ::
|
||||
) =>
|
||||
V2Q.RQLQuery ->
|
||||
m (HttpResponse EncJSON)
|
||||
v2QueryHandler query = Tracing.trace "v2 Query" $ do
|
||||
v2QueryHandler query = Tracing.newSpan "v2 Query" $ do
|
||||
(liftEitherM . authorizeV2QueryApi query) =<< ask
|
||||
scRef <- asks (acCacheRef . hcAppContext)
|
||||
logger <- asks (_lsLogger . appEnvLoggers . hcAppEnv)
|
||||
@ -553,7 +570,7 @@ v1Alpha1GQHandler ::
|
||||
MonadBaseControl IO m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadQueryLog m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
GH.MonadExecuteQuery m,
|
||||
MonadError QErr m,
|
||||
MonadReader HandlerCtx m,
|
||||
@ -595,7 +612,7 @@ v1GQHandler ::
|
||||
MonadBaseControl IO m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadQueryLog m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
GH.MonadExecuteQuery m,
|
||||
MonadError QErr m,
|
||||
MonadReader HandlerCtx m,
|
||||
@ -613,7 +630,7 @@ v1GQRelayHandler ::
|
||||
MonadBaseControl IO m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadQueryLog m,
|
||||
Tracing.MonadTrace m,
|
||||
MonadTrace m,
|
||||
GH.MonadExecuteQuery m,
|
||||
MonadError QErr m,
|
||||
MonadReader HandlerCtx m,
|
||||
@ -634,7 +651,7 @@ gqlExplainHandler ::
|
||||
MonadReader HandlerCtx m,
|
||||
MonadMetadataStorage m,
|
||||
EB.MonadQueryTags m,
|
||||
Tracing.MonadTrace m
|
||||
MonadTrace m
|
||||
) =>
|
||||
GE.GQLExplain ->
|
||||
m (HttpResponse EncJSON)
|
||||
@ -712,7 +729,13 @@ renderHtmlTemplate template jVal =
|
||||
-- | Default implementation of the 'MonadConfigApiHandler'
|
||||
configApiGetHandler ::
|
||||
forall m.
|
||||
(MonadIO m, MonadBaseControl IO m, UserAuthentication (Tracing.TraceT m), HttpLog m, Tracing.HasReporter m, HasResourceLimits m) =>
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
UserAuthentication m,
|
||||
HttpLog m,
|
||||
HasResourceLimits m,
|
||||
MonadTrace m
|
||||
) =>
|
||||
AppContext ->
|
||||
AppEnv ->
|
||||
Spock.SpockCtxT () m ()
|
||||
@ -751,13 +774,13 @@ mkWaiApp ::
|
||||
ConsoleRenderer m,
|
||||
MonadVersionAPIWithExtraData m,
|
||||
HttpLog m,
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
MonadMetadataApiAuthorization m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadConfigApiHandler m,
|
||||
MonadQueryLog m,
|
||||
WS.MonadWSLog m,
|
||||
Tracing.HasReporter m,
|
||||
MonadTrace m,
|
||||
GH.MonadExecuteQuery m,
|
||||
HasResourceLimits m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
@ -818,12 +841,12 @@ httpApp ::
|
||||
ConsoleRenderer m,
|
||||
MonadVersionAPIWithExtraData m,
|
||||
HttpLog m,
|
||||
UserAuthentication (Tracing.TraceT m),
|
||||
UserAuthentication m,
|
||||
MonadMetadataApiAuthorization m,
|
||||
E.MonadGQLExecutionCheck m,
|
||||
MonadConfigApiHandler m,
|
||||
MonadQueryLog m,
|
||||
Tracing.HasReporter m,
|
||||
MonadTrace m,
|
||||
GH.MonadExecuteQuery m,
|
||||
MonadMetadataStorageQueryAPI m,
|
||||
HasResourceLimits m,
|
||||
@ -908,10 +931,11 @@ httpApp setupHook appCtx@AppContext {..} appEnv@AppEnv {..} ekgStore = do
|
||||
MonadMetadataStorage n,
|
||||
EB.MonadQueryTags n,
|
||||
HasResourceLimits n,
|
||||
ProvidesNetwork n
|
||||
ProvidesNetwork n,
|
||||
MonadTrace n
|
||||
) =>
|
||||
RestRequest Spock.SpockMethod ->
|
||||
Handler (Tracing.TraceT n) (HttpLogGraphQLInfo, APIResp)
|
||||
Handler n (HttpLogGraphQLInfo, APIResp)
|
||||
customEndpointHandler restReq = do
|
||||
endpoints <- liftIO $ scEndpoints <$> getSchemaCache acCacheRef
|
||||
execCtx <- mkExecutionContext
|
||||
@ -1077,14 +1101,14 @@ httpApp setupHook appCtx@AppContext {..} appEnv@AppEnv {..} ekgStore = do
|
||||
( FromJSON a,
|
||||
MonadIO n,
|
||||
MonadBaseControl IO n,
|
||||
UserAuthentication (Tracing.TraceT n),
|
||||
UserAuthentication n,
|
||||
HttpLog n,
|
||||
Tracing.HasReporter n,
|
||||
MonadTrace n,
|
||||
HasResourceLimits n
|
||||
) =>
|
||||
(Bool -> QErr -> Value) ->
|
||||
(QErr -> QErr) ->
|
||||
APIHandler (Tracing.TraceT n) a ->
|
||||
APIHandler n a ->
|
||||
Spock.ActionT n ()
|
||||
spockAction qErrEncoder qErrModifier apiHandler = mkSpockAction appCtx appEnv qErrEncoder qErrModifier apiHandler
|
||||
|
||||
|
@ -45,7 +45,6 @@ import Hasura.Server.Auth.JWT hiding (processJwt_)
|
||||
import Hasura.Server.Auth.WebHook
|
||||
import Hasura.Server.Utils
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
|
||||
@ -106,9 +105,9 @@ data AuthMode
|
||||
--
|
||||
-- This must only be run once, on launch.
|
||||
setupAuthMode ::
|
||||
( Tracing.HasReporter m,
|
||||
MonadError Text m,
|
||||
MonadIO m
|
||||
( MonadError Text m,
|
||||
MonadIO m,
|
||||
MonadBaseControl IO m
|
||||
) =>
|
||||
Set.HashSet AdminSecretHash ->
|
||||
Maybe AuthHook ->
|
||||
@ -147,7 +146,7 @@ setupAuthMode adminSecretHashSet mWebHook mJwtSecrets mUnAuthRole logger httpMan
|
||||
" requires --admin-secret (HASURA_GRAPHQL_ADMIN_SECRET) or "
|
||||
<> " --access-key (HASURA_GRAPHQL_ACCESS_KEY) to be set"
|
||||
|
||||
mkJwtCtx :: (MonadIO m, MonadError Text m) => JWTConfig -> m JWTCtx
|
||||
mkJwtCtx :: (MonadIO m, MonadBaseControl IO m, MonadError Text m) => JWTConfig -> m JWTCtx
|
||||
mkJwtCtx JWTConfig {..} = do
|
||||
(jwkUri, jwkKeyConfig) <- case jcKeyOrUrl of
|
||||
Left jwk -> do
|
||||
@ -157,16 +156,15 @@ setupAuthMode adminSecretHashSet mWebHook mJwtSecrets mUnAuthRole logger httpMan
|
||||
-- which will be populated by the 'updateJWKCtx' poller thread
|
||||
Right uri -> do
|
||||
-- fetch JWK initially and throw error if it fails
|
||||
void $ liftEitherM $ liftIO $ runExceptT $ withJwkError $ Tracing.runTraceT Tracing.sampleAlways "jwk init" $ fetchJwk logger httpManager uri
|
||||
void $ withJwkError $ fetchJwk logger httpManager uri
|
||||
jwkRef <- liftIO $ newIORef (JWKSet [], Nothing)
|
||||
return (Just uri, jwkRef)
|
||||
let jwtHeader = fromMaybe JHAuthorization jcHeader
|
||||
return $ JWTCtx jwkUri jwkKeyConfig jcAudience jcIssuer jcClaims jcAllowedSkew jwtHeader
|
||||
|
||||
withJwkError :: ExceptT JwkFetchError IO (JWKSet, HTTP.ResponseHeaders) -> ExceptT Text IO (JWKSet, HTTP.ResponseHeaders)
|
||||
withJwkError act = do
|
||||
res <- lift $ runExceptT act
|
||||
onLeft res $ \case
|
||||
withJwkError a = do
|
||||
res <- runExceptT a
|
||||
onLeft res \case
|
||||
-- when fetching JWK initially, except expiry parsing error, all errors are critical
|
||||
JFEHttpException _ msg -> throwError msg
|
||||
JFEHttpError _ _ _ e -> throwError e
|
||||
@ -176,7 +174,8 @@ setupAuthMode adminSecretHashSet mWebHook mJwtSecrets mUnAuthRole logger httpMan
|
||||
-- | Core logic to fork a poller thread to update the JWK based on the
|
||||
-- expiry time specified in @Expires@ header or @Cache-Control@ header
|
||||
updateJwkCtx ::
|
||||
(MonadIO m, Tracing.HasReporter m) =>
|
||||
forall m.
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
AuthMode ->
|
||||
HTTP.Manager ->
|
||||
Logger Hasura ->
|
||||
@ -187,10 +186,7 @@ updateJwkCtx authMode httpManager logger = forever $ do
|
||||
_ -> pure ()
|
||||
liftIO $ sleep $ seconds 1
|
||||
where
|
||||
updateJwkFromUrl ::
|
||||
(Tracing.HasReporter m, MonadIO m) =>
|
||||
JWTCtx ->
|
||||
m ()
|
||||
updateJwkFromUrl :: JWTCtx -> m ()
|
||||
updateJwkFromUrl (JWTCtx url ref _ _ _ _ _) =
|
||||
for_ url \uri -> do
|
||||
(jwkSet, jwkExpiry) <- liftIO $ readIORef ref
|
||||
@ -208,7 +204,7 @@ updateJwkCtx authMode httpManager logger = forever $ do
|
||||
-- | Authenticate the request using the headers and the configured 'AuthMode'.
|
||||
getUserInfoWithExpTime ::
|
||||
forall m.
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m, Tracing.MonadTrace m) =>
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
|
||||
Logger Hasura ->
|
||||
HTTP.Manager ->
|
||||
[HTTP.Header] ->
|
||||
|
@ -99,7 +99,6 @@ import Hasura.Server.Utils
|
||||
userRoleHeader,
|
||||
)
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
import Network.HTTP.Types as N
|
||||
import Network.URI (URI)
|
||||
@ -310,18 +309,14 @@ $(J.deriveJSON hasuraJSON ''HasuraClaims)
|
||||
-- | An action that fetches the JWKs and updates the expiry time and JWKs in the
|
||||
-- IORef
|
||||
fetchAndUpdateJWKs ::
|
||||
(MonadIO m) =>
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
Logger Hasura ->
|
||||
HTTP.Manager ->
|
||||
URI ->
|
||||
IORef (Jose.JWKSet, Maybe UTCTime) ->
|
||||
m ()
|
||||
fetchAndUpdateJWKs logger httpManager url jwkRef = do
|
||||
res <-
|
||||
liftIO $
|
||||
runExceptT $
|
||||
Tracing.runTraceT Tracing.sampleAlways "jwk fetch" $
|
||||
fetchJwk logger httpManager url
|
||||
res <- runExceptT $ fetchJwk logger httpManager url
|
||||
case res of
|
||||
-- As this 'fetchJwk' is going to happen always in background thread, we are
|
||||
-- not going to throw fatal error(s). If there is any error fetching JWK -
|
||||
@ -352,8 +347,7 @@ fetchAndUpdateJWKs logger httpManager url jwkRef = do
|
||||
fetchJwk ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
MonadError JwkFetchError m,
|
||||
Tracing.MonadTrace m
|
||||
MonadError JwkFetchError m
|
||||
) =>
|
||||
Logger Hasura ->
|
||||
HTTP.Manager ->
|
||||
@ -366,9 +360,7 @@ fetchJwk (Logger logger) manager url = do
|
||||
res <- try $ do
|
||||
req <- liftIO $ HTTP.mkRequestThrow $ tshow url
|
||||
let req' = req & over HTTP.headers addDefaultHeaders
|
||||
|
||||
Tracing.tracedHttpRequest req' \req'' -> do
|
||||
liftIO $ HTTP.performRequest req'' manager
|
||||
liftIO $ HTTP.performRequest req' manager
|
||||
resp <- onLeft res logAndThrowHttp
|
||||
let status = resp ^. Wreq.responseStatus
|
||||
respBody = resp ^. Wreq.responseBody
|
||||
|
@ -24,7 +24,6 @@ import Hasura.Prelude
|
||||
import Hasura.Server.Logging
|
||||
import Hasura.Server.Utils
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
import Network.Wreq qualified as Wreq
|
||||
|
||||
@ -54,7 +53,7 @@ hookMethod authHook = case ahType authHook of
|
||||
-- for finer-grained auth. (#2666)
|
||||
userInfoFromAuthHook ::
|
||||
forall m.
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m, Tracing.MonadTrace m) =>
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
|
||||
Logger Hasura ->
|
||||
HTTP.Manager ->
|
||||
AuthHook ->
|
||||
@ -73,22 +72,22 @@ userInfoFromAuthHook logger manager hook reqHeaders reqs = do
|
||||
performHTTPRequest = do
|
||||
let url = T.unpack $ ahUrl hook
|
||||
req <- liftIO $ HTTP.mkRequestThrow $ T.pack url
|
||||
Tracing.tracedHttpRequest req \req' -> liftIO do
|
||||
liftIO do
|
||||
case ahType hook of
|
||||
AHTGet -> do
|
||||
let isCommonHeader = (`elem` commonClientHeadersIgnored)
|
||||
filteredHeaders = filter (not . isCommonHeader . fst) reqHeaders
|
||||
req'' = req' & set HTTP.headers (addDefaultHeaders filteredHeaders)
|
||||
HTTP.performRequest req'' manager
|
||||
req' = req & set HTTP.headers (addDefaultHeaders filteredHeaders)
|
||||
HTTP.performRequest req' manager
|
||||
AHTPost -> do
|
||||
let contentType = ("Content-Type", "application/json")
|
||||
headersPayload = J.toJSON $ Map.fromList $ hdrsToText reqHeaders
|
||||
req'' =
|
||||
req' =
|
||||
req
|
||||
& set HTTP.method "POST"
|
||||
& set HTTP.headers (addDefaultHeaders [contentType])
|
||||
& set HTTP.body (Just $ J.encode $ object ["headers" J..= headersPayload, "request" J..= reqs])
|
||||
HTTP.performRequest req'' manager
|
||||
HTTP.performRequest req' manager
|
||||
|
||||
logAndThrow :: HTTP.HttpException -> m a
|
||||
logAndThrow err = do
|
||||
|
@ -1,498 +1,83 @@
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
module Hasura.Tracing (module Tracing) where
|
||||
|
||||
module Hasura.Tracing
|
||||
( MonadTrace (..),
|
||||
TraceT,
|
||||
runTraceT,
|
||||
runTraceTWith,
|
||||
runTraceTWithReporter,
|
||||
runTraceTInContext,
|
||||
ignoreTraceT,
|
||||
interpTraceT,
|
||||
TraceContext (..),
|
||||
Reporter (..),
|
||||
noReporter,
|
||||
HasReporter (..),
|
||||
SamplingPolicy,
|
||||
sampleNever,
|
||||
sampleAlways,
|
||||
sampleRandomly,
|
||||
sampleOneInN,
|
||||
TracingMetadata,
|
||||
extractB3HttpContext,
|
||||
tracedHttpRequest,
|
||||
injectEventContext,
|
||||
extractEventContext,
|
||||
)
|
||||
where
|
||||
import Hasura.Tracing.Class as Tracing
|
||||
import Hasura.Tracing.Context as Tracing
|
||||
import Hasura.Tracing.Monad as Tracing
|
||||
import Hasura.Tracing.Reporter as Tracing
|
||||
import Hasura.Tracing.Sampling as Tracing
|
||||
import Hasura.Tracing.TraceId as Tracing
|
||||
import Hasura.Tracing.Utils as Tracing
|
||||
|
||||
import Control.Lens (over, view, (^?))
|
||||
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow)
|
||||
import Control.Monad.Morph
|
||||
import Control.Monad.Trans.Control
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Lens qualified as JL
|
||||
import Data.ByteString.Char8 qualified as Char8
|
||||
import Data.IORef
|
||||
import Data.String (fromString)
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.TraceId
|
||||
( SpanId,
|
||||
TraceId,
|
||||
randomSpanId,
|
||||
randomTraceId,
|
||||
spanIdFromHex,
|
||||
spanIdToHex,
|
||||
traceIdFromHex,
|
||||
traceIdToHex,
|
||||
)
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
import Refined (Positive, Refined, unrefine)
|
||||
import System.Random.Stateful qualified as Random
|
||||
{- Note [Tracing]
|
||||
|
||||
-- | Any additional human-readable key-value pairs relevant
|
||||
-- to the execution of a block of code.
|
||||
type TracingMetadata = [(Text, Text)]
|
||||
## Usage
|
||||
|
||||
newtype Reporter = Reporter
|
||||
{ runReporter ::
|
||||
forall io a.
|
||||
(MonadIO io, MonadBaseControl IO io) =>
|
||||
TraceContext ->
|
||||
-- the current trace context
|
||||
Text ->
|
||||
-- human-readable name for this block of code
|
||||
IO TracingMetadata ->
|
||||
-- an IO action that gets all of the metadata logged so far by the action
|
||||
-- being traced
|
||||
io a ->
|
||||
-- the action we want to trace
|
||||
io a
|
||||
}
|
||||
The Tracing library allows us to trace arbitrary pieces of our code, providing
|
||||
that the current monad implements 'MonadTrace'.
|
||||
|
||||
noReporter :: Reporter
|
||||
noReporter = Reporter \_ _ _ -> id
|
||||
newTrace "request" do
|
||||
userInfo <- newSpan "authentication" retrieveUserInfo
|
||||
parsedQuery <- newSpan "parsing" $ parseQuery q
|
||||
result <- newSpan "execution" $ runQuery parsedQuery userInfo
|
||||
pure result
|
||||
|
||||
-- | A type class for monads which support some way to report execution traces.
|
||||
--
|
||||
-- See @instance Tracing.HasReporter (AppM impl)@ in @HasuraPro.App@.
|
||||
class Monad m => HasReporter m where
|
||||
-- | Get the current tracer
|
||||
askReporter :: m Reporter
|
||||
default askReporter :: m Reporter
|
||||
askReporter = pure noReporter
|
||||
## Trace and span
|
||||
|
||||
instance HasReporter m => HasReporter (ReaderT r m) where
|
||||
askReporter = lift askReporter
|
||||
Each _trace_ is distinct, and is composed of one or more _spans_. Spans are
|
||||
organized as a tree: the root span covers the entire trace, and each sub span
|
||||
keeps track of its parent.
|
||||
|
||||
instance HasReporter m => HasReporter (ExceptT e m) where
|
||||
askReporter = lift askReporter
|
||||
We report each span individually, and to each of them we associate a
|
||||
'TraceContext', that contains:
|
||||
- a trace id, common to all the spans of that trace
|
||||
- a unique span id, generated randomly
|
||||
- the span id of the parent span, if any
|
||||
- whether that trace was sampled (see "Sampling").
|
||||
|
||||
instance HasReporter IO
|
||||
All of this can be retrieved for the current span with 'currentContext'.
|
||||
|
||||
-- | A trace context records the current active trace,
|
||||
-- the active span within that trace, and the span's parent,
|
||||
-- unless the current span is the root.
|
||||
data TraceContext = TraceContext
|
||||
{ -- | TODO what is this exactly? The topmost span id?
|
||||
tcCurrentTrace :: !TraceId,
|
||||
tcCurrentSpan :: !SpanId,
|
||||
tcCurrentParent :: !(Maybe SpanId),
|
||||
tcSamplingState :: !SamplingState
|
||||
}
|
||||
Starting a new trace masks the previous one; in the following example, "span2"
|
||||
is associated to "trace2" and "span1" is associated to "trace1"; the two trees
|
||||
are distinct:
|
||||
|
||||
-- | B3 propagation sampling state.
|
||||
--
|
||||
-- Debug sampling state not represented.
|
||||
data SamplingState = SamplingDefer | SamplingDeny | SamplingAccept
|
||||
newTrace "trace1" $
|
||||
newSpan "span1" $
|
||||
newTrace "trace2" $
|
||||
newSpan "span2"
|
||||
|
||||
-- | Convert a sampling state to a value for the X-B3-Sampled header. A return
|
||||
-- value of Nothing indicates that the header should not be set.
|
||||
samplingStateToHeader :: IsString s => SamplingState -> Maybe s
|
||||
samplingStateToHeader = \case
|
||||
SamplingDefer -> Nothing
|
||||
SamplingDeny -> Just "0"
|
||||
SamplingAccept -> Just "1"
|
||||
Lastly, a span that is started outside of a root trace is, for now, silently
|
||||
ignored, as it has no trace id to attach to. This is a design decision we may
|
||||
revisit.
|
||||
|
||||
-- | Convert a X-B3-Sampled header value to a sampling state. An input of
|
||||
-- Nothing indicates that the header was not set.
|
||||
samplingStateFromHeader :: (IsString s, Eq s) => Maybe s -> SamplingState
|
||||
samplingStateFromHeader = \case
|
||||
Nothing -> SamplingDefer
|
||||
Just "0" -> SamplingDeny
|
||||
Just "1" -> SamplingAccept
|
||||
Just _ -> SamplingDefer
|
||||
## Metadata
|
||||
|
||||
data TraceTEnv = TraceTEnv
|
||||
{ tteTraceContext :: TraceContext,
|
||||
tteReporter :: Reporter,
|
||||
tteMetadataRef :: IORef TracingMetadata,
|
||||
tteSamplingDecision :: SamplingDecision
|
||||
}
|
||||
Metadata can be attached to the current trace with 'attachMetadata', as a list
|
||||
of pair of text key and text values.
|
||||
|
||||
-- | A local decision about whether or not to sample spans.
|
||||
data SamplingDecision = SampleNever | SampleAlways
|
||||
## Reporters
|
||||
|
||||
-- | An IO action for deciding whether or not to sample a trace.
|
||||
--
|
||||
-- Currently restricted to deny access to the B3 sampling state, but we may
|
||||
-- want to be more flexible in the future.
|
||||
type SamplingPolicy = IO SamplingDecision
|
||||
'TraceT' is the de-facto implementation of 'MonadTrace'; but, in practice, it
|
||||
only does half the job: once a span finishes, 'TraceT' delegates the job of
|
||||
actually reporting / exporting all relevant information to a 'Reporter'. Said
|
||||
reporter must be provided to 'runTraceT', and is a wrapper around a function in
|
||||
IO that processes the span.
|
||||
|
||||
-- Helper for consistently deciding whether or not to sample a trace based on
|
||||
-- trace context and sampling policy.
|
||||
decideSampling :: SamplingState -> SamplingPolicy -> IO SamplingDecision
|
||||
decideSampling samplingState samplingPolicy =
|
||||
case samplingState of
|
||||
SamplingDefer -> samplingPolicy
|
||||
SamplingDeny -> pure SampleNever
|
||||
SamplingAccept -> pure SampleAlways
|
||||
In practice, 'TraceT' is only a reader that keeps track of the reporter, the
|
||||
default sampling policy, and the current trace.
|
||||
|
||||
-- Helper for consistently updating the sampling state when a sampling decision
|
||||
-- is made.
|
||||
updateSamplingState :: SamplingDecision -> SamplingState -> SamplingState
|
||||
updateSamplingState samplingDecision = \case
|
||||
SamplingDefer ->
|
||||
case samplingDecision of
|
||||
SampleNever -> SamplingDefer
|
||||
SampleAlways -> SamplingAccept
|
||||
SamplingDeny -> SamplingDeny
|
||||
SamplingAccept -> SamplingAccept
|
||||
## Sampling
|
||||
|
||||
sampleNever :: SamplingPolicy
|
||||
sampleNever = pure SampleNever
|
||||
To run 'TraceT', you must also provide a 'SamplingPolicy': an IO action that,
|
||||
when evaluated, will decide whether an arbitrary trace should be reporter or
|
||||
not. This decision is only made once per trace: every span within a trace will
|
||||
use the same result: they're either all reporter, or none of them are.
|
||||
|
||||
sampleAlways :: SamplingPolicy
|
||||
sampleAlways = pure SampleAlways
|
||||
When starting a trace, the default sampling policy can be overriden. You can for
|
||||
instance run 'TraceT' with an action that, by default, only reports one out of
|
||||
every ten traces, but use 'newTraceWithPolicy sampleAlways' when sending
|
||||
critical requests to your authentication service.
|
||||
|
||||
-- @sampleRandomly p@ returns `SampleAlways` with probability @p@ and
|
||||
-- `SampleNever` with probability @1 - p@.
|
||||
sampleRandomly :: Double -> SamplingPolicy
|
||||
sampleRandomly samplingProbability
|
||||
| samplingProbability <= 0 = pure SampleNever
|
||||
| samplingProbability >= 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, 1) Random.globalStdGen
|
||||
pure $ if x < samplingProbability then SampleAlways else SampleNever
|
||||
Note that sampling and reporting are distinct: using 'sampleAlways' simply
|
||||
guarantees that the 'Reporter' you provided will be called.
|
||||
|
||||
-- Like @sampleRandomly@, but with the probability expressed as the denominator
|
||||
-- N of the fraction 1/N.
|
||||
sampleOneInN :: Refined Positive Int -> SamplingPolicy
|
||||
sampleOneInN denominator
|
||||
| n == 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, n - 1) Random.globalStdGen
|
||||
pure $ if x == 0 then SampleAlways else SampleNever
|
||||
where
|
||||
n = unrefine denominator
|
||||
|
||||
-- | The 'TraceT' monad transformer adds the ability to keep track of
|
||||
-- the current trace context.
|
||||
newtype TraceT m a = TraceT {unTraceT :: ReaderT TraceTEnv m a}
|
||||
deriving
|
||||
( Functor,
|
||||
Applicative,
|
||||
Monad,
|
||||
MonadIO,
|
||||
MonadFix,
|
||||
MonadMask,
|
||||
MonadCatch,
|
||||
MonadThrow,
|
||||
MonadBase b,
|
||||
MonadBaseControl b
|
||||
)
|
||||
|
||||
instance MonadTrans TraceT where
|
||||
lift = TraceT . lift
|
||||
|
||||
instance MFunctor TraceT where
|
||||
hoist f (TraceT rwma) = TraceT (hoist f rwma)
|
||||
|
||||
instance MonadError e m => MonadError e (TraceT m) where
|
||||
throwError = lift . throwError
|
||||
catchError (TraceT m) f = TraceT (catchError m (unTraceT . f))
|
||||
|
||||
instance MonadReader r m => MonadReader r (TraceT m) where
|
||||
ask = TraceT $ lift ask
|
||||
local f m = TraceT $ mapReaderT (local f) (unTraceT m)
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer.
|
||||
-- 'runTraceT' delimits a new trace with its root span, and the arguments
|
||||
-- specify a name and metadata for that span.
|
||||
runTraceT ::
|
||||
(HasReporter m, MonadIO m, MonadBaseControl IO m) =>
|
||||
SamplingPolicy ->
|
||||
Text ->
|
||||
TraceT m a ->
|
||||
m a
|
||||
runTraceT policy name tma = do
|
||||
rep <- askReporter
|
||||
runTraceTWithReporter rep policy name tma
|
||||
|
||||
runTraceTWith ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
TraceContext ->
|
||||
Reporter ->
|
||||
SamplingPolicy ->
|
||||
Text ->
|
||||
TraceT m a ->
|
||||
m a
|
||||
runTraceTWith ctx rep policy name tma = do
|
||||
samplingDecision <- liftIO $ decideSampling (tcSamplingState ctx) policy
|
||||
metadataRef <- liftIO $ newIORef []
|
||||
let subCtx =
|
||||
ctx
|
||||
{ tcSamplingState =
|
||||
updateSamplingState samplingDecision (tcSamplingState ctx)
|
||||
}
|
||||
report =
|
||||
case samplingDecision of
|
||||
SampleNever -> id
|
||||
SampleAlways -> do
|
||||
runReporter rep ctx name (readIORef metadataRef)
|
||||
report $
|
||||
runReaderT (unTraceT tma) (TraceTEnv subCtx rep metadataRef samplingDecision)
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer in an
|
||||
-- existing context.
|
||||
runTraceTInContext ::
|
||||
(MonadIO m, MonadBaseControl IO m, HasReporter m) =>
|
||||
TraceContext ->
|
||||
SamplingPolicy ->
|
||||
Text ->
|
||||
TraceT m a ->
|
||||
m a
|
||||
runTraceTInContext ctx policy name tma = do
|
||||
rep <- askReporter
|
||||
runTraceTWith ctx rep policy name tma
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer in an
|
||||
-- existing context.
|
||||
runTraceTWithReporter ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
Reporter ->
|
||||
SamplingPolicy ->
|
||||
Text ->
|
||||
TraceT m a ->
|
||||
m a
|
||||
runTraceTWithReporter rep policy name tma = do
|
||||
ctx <-
|
||||
TraceContext
|
||||
<$> liftIO randomTraceId
|
||||
<*> liftIO randomSpanId
|
||||
<*> pure Nothing
|
||||
<*> pure SamplingDefer
|
||||
runTraceTWith ctx rep policy name tma
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer while suppressing all
|
||||
-- tracing-related side-effects.
|
||||
ignoreTraceT :: (MonadIO m, MonadBaseControl IO m) => TraceT m a -> m a
|
||||
ignoreTraceT = runTraceTWithReporter noReporter sampleNever ""
|
||||
|
||||
-- | Monads which support tracing. 'TraceT' is the standard example.
|
||||
class Monad m => MonadTrace m where
|
||||
-- | Trace the execution of a block of code, attaching a human-readable name.
|
||||
trace :: Text -> m a -> m a
|
||||
|
||||
-- | Ask for the current tracing context, so that we can provide it to any
|
||||
-- downstream services, e.g. in HTTP headers.
|
||||
currentContext :: m TraceContext
|
||||
|
||||
-- | Ask for the current tracing reporter
|
||||
currentReporter :: m Reporter
|
||||
|
||||
-- | Ask for the current handle on the tracing metadata
|
||||
currentMetadataRef :: m (IORef TracingMetadata)
|
||||
|
||||
-- | Ask for the current sampling decision
|
||||
currentSamplingDecision :: m SamplingDecision
|
||||
|
||||
-- | Log some metadata to be attached to the current span
|
||||
attachMetadata :: TracingMetadata -> m ()
|
||||
|
||||
-- | Reinterpret a 'TraceT' action in another 'MonadTrace'.
|
||||
-- This can be useful when you need to reorganize a monad transformer stack, for
|
||||
-- example, to embed an action in some monadic computation, while preserving tracing
|
||||
-- metadata and context.
|
||||
--
|
||||
-- For example, we use this function in various places in 'BackendExecute',
|
||||
-- where we receive an action to execute in some concrete monad transformer stack.
|
||||
-- See the various implementations of 'runQuery' for examples.
|
||||
-- Ideally, the input computation's type would be sufficiently polymorphic that
|
||||
-- we would not need to reorder monads inthe transformer stack. However, the monad
|
||||
-- transformer stacks must be concrete, because their types are defined by
|
||||
-- an associated type family 'ExecutionMonad'. Hence, we need to use this function
|
||||
-- to peel off the outermost 'TraceT' constructor, and embed the computation in some
|
||||
-- other 'MonadTrace'.
|
||||
--
|
||||
-- A second example is related to caching. The 'cacheLookup' function returns an
|
||||
-- action in a concrete transformer stack, again because we are constrained by the
|
||||
-- usage of a type class. We need to reinterpret the 'TraceT' component of this
|
||||
-- concrete stack in some other abstract monad transformer stack, using this function.
|
||||
--
|
||||
-- Laws:
|
||||
--
|
||||
-- > interpTraceT id (hoist f (TraceT x)) = interpTraceT f (TraceT x)
|
||||
interpTraceT :: MonadTrace n => (m a -> n b) -> TraceT m a -> n b
|
||||
interpTraceT f (TraceT rma) = do
|
||||
ctx <- currentContext
|
||||
rep <- currentReporter
|
||||
metadataRef <- currentMetadataRef
|
||||
samplingDecision <- currentSamplingDecision
|
||||
f (runReaderT rma (TraceTEnv ctx rep metadataRef samplingDecision))
|
||||
|
||||
-- | If the underlying monad can report trace data, then 'TraceT' will
|
||||
-- collect it and hand it off to that reporter.
|
||||
instance (MonadIO m, MonadBaseControl IO m) => MonadTrace (TraceT m) where
|
||||
-- Note: this implementation is so awkward because we don't want to give the
|
||||
-- derived MonadReader/Writer instances to TraceT
|
||||
trace name ma =
|
||||
TraceT $
|
||||
ReaderT $ \env@(TraceTEnv ctx rep _ samplingDecision) -> do
|
||||
case samplingDecision of
|
||||
SampleNever -> runReaderT (unTraceT ma) env
|
||||
SampleAlways -> do
|
||||
spanId <- liftIO randomSpanId
|
||||
let subCtx =
|
||||
ctx
|
||||
{ tcCurrentSpan = spanId,
|
||||
tcCurrentParent = Just (tcCurrentSpan ctx)
|
||||
}
|
||||
metadataRef <- liftIO $ newIORef []
|
||||
runReporter rep subCtx name (readIORef metadataRef) $
|
||||
runReaderT
|
||||
(unTraceT ma)
|
||||
(TraceTEnv subCtx rep metadataRef samplingDecision)
|
||||
|
||||
currentContext = TraceT (asks tteTraceContext)
|
||||
|
||||
currentReporter = TraceT (asks tteReporter)
|
||||
|
||||
currentMetadataRef = TraceT (asks tteMetadataRef)
|
||||
|
||||
currentSamplingDecision = TraceT (asks tteSamplingDecision)
|
||||
|
||||
attachMetadata metadata =
|
||||
TraceT $
|
||||
ReaderT $ \env ->
|
||||
liftIO $ modifyIORef' (tteMetadataRef env) (metadata ++)
|
||||
|
||||
instance MonadTrace m => MonadTrace (ReaderT r m) where
|
||||
trace = mapReaderT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentMetadataRef = lift currentMetadataRef
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (StateT e m) where
|
||||
trace = mapStateT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentMetadataRef = lift currentMetadataRef
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (ExceptT e m) where
|
||||
trace = mapExceptT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentMetadataRef = lift currentMetadataRef
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
-- | Inject the trace context as a set of HTTP headers.
|
||||
injectB3HttpContext :: TraceContext -> [HTTP.Header]
|
||||
injectB3HttpContext TraceContext {..} =
|
||||
let traceId = (b3HeaderTraceId, traceIdToHex tcCurrentTrace)
|
||||
spanId = (b3HeaderSpanId, spanIdToHex tcCurrentSpan)
|
||||
parentSpanIdMaybe =
|
||||
(,) b3HeaderParentSpanId . spanIdToHex <$> tcCurrentParent
|
||||
samplingStateMaybe =
|
||||
(,) b3HeaderSampled <$> samplingStateToHeader tcSamplingState
|
||||
in traceId : spanId : catMaybes [parentSpanIdMaybe, samplingStateMaybe]
|
||||
|
||||
-- | Extract the trace and parent span headers from a HTTP request
|
||||
-- and create a new 'TraceContext'. The new context will contain
|
||||
-- a fresh span ID, and the provided span ID will be assigned as
|
||||
-- the immediate parent span.
|
||||
extractB3HttpContext :: [HTTP.Header] -> IO (Maybe TraceContext)
|
||||
extractB3HttpContext hdrs = do
|
||||
-- B3 TraceIds can have a length of either 64 bits (16 hex chars) or 128 bits
|
||||
-- (32 hex chars). For 64-bit TraceIds, we pad them with zeros on the left to
|
||||
-- make them 128 bits long.
|
||||
let traceIdMaybe =
|
||||
lookup b3HeaderTraceId hdrs >>= \rawTraceId ->
|
||||
if
|
||||
| Char8.length rawTraceId == 32 ->
|
||||
traceIdFromHex rawTraceId
|
||||
| Char8.length rawTraceId == 16 ->
|
||||
traceIdFromHex $ Char8.replicate 16 '0' <> rawTraceId
|
||||
| otherwise ->
|
||||
Nothing
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- liftIO randomSpanId
|
||||
let parentSpanId = spanIdFromHex =<< lookup b3HeaderSpanId hdrs
|
||||
samplingState = samplingStateFromHeader $ lookup b3HeaderSampled hdrs
|
||||
pure $ TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
b3HeaderTraceId, b3HeaderSpanId, b3HeaderParentSpanId, b3HeaderSampled :: IsString s => s
|
||||
b3HeaderTraceId = "X-B3-TraceId"
|
||||
b3HeaderSpanId = "X-B3-SpanId"
|
||||
b3HeaderParentSpanId = "X-B3-ParentSpanId"
|
||||
b3HeaderSampled = "X-B3-Sampled"
|
||||
|
||||
-- | Inject the trace context as a JSON value, appropriate for
|
||||
-- storing in (e.g.) an event trigger payload.
|
||||
injectEventContext :: TraceContext -> J.Value
|
||||
injectEventContext TraceContext {..} =
|
||||
let idFields =
|
||||
[ eventKeyTraceId J..= bsToTxt (traceIdToHex tcCurrentTrace),
|
||||
eventKeySpanId J..= bsToTxt (spanIdToHex tcCurrentSpan)
|
||||
]
|
||||
samplingFieldMaybe =
|
||||
(J..=) eventKeySamplingState <$> samplingStateToHeader @Text tcSamplingState
|
||||
in J.object $ idFields ++ maybeToList samplingFieldMaybe
|
||||
|
||||
-- | Extract a trace context from an event trigger payload.
|
||||
extractEventContext :: J.Value -> IO (Maybe TraceContext)
|
||||
extractEventContext e = do
|
||||
let traceIdMaybe =
|
||||
traceIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key eventKeyTraceId . JL._String
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- randomSpanId
|
||||
let parentSpanId =
|
||||
spanIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key eventKeySpanId . JL._String
|
||||
samplingState =
|
||||
samplingStateFromHeader $
|
||||
e ^? JL.key "trace_context" . JL.key eventKeySamplingState . JL._String
|
||||
pure $ TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
eventKeyTraceId, eventKeySpanId, eventKeySamplingState :: J.Key
|
||||
eventKeyTraceId = "trace_id"
|
||||
eventKeySpanId = "span_id"
|
||||
eventKeySamplingState = "sampling_state"
|
||||
|
||||
-- | Perform HTTP request which supports Trace headers using a
|
||||
-- HTTP.Request value
|
||||
--
|
||||
-- TODO REFACTOR:
|
||||
-- - inline 'HTTP.performRequest' so that we can be sure a trace is always logged
|
||||
-- - Inline 'try' here since we always use that at call sites
|
||||
tracedHttpRequest ::
|
||||
MonadTrace m =>
|
||||
-- | http request that needs to be made
|
||||
HTTP.Request ->
|
||||
-- | a function that takes the traced request and executes it
|
||||
(HTTP.Request -> m a) ->
|
||||
m a
|
||||
tracedHttpRequest req f = do
|
||||
let method = bsToTxt (view HTTP.method req)
|
||||
uri = view HTTP.url req
|
||||
trace (method <> " " <> uri) do
|
||||
let reqBytes = HTTP.getReqSize req
|
||||
attachMetadata [("request_body_bytes", fromString (show reqBytes))]
|
||||
ctx <- currentContext
|
||||
f $ over HTTP.headers (injectB3HttpContext ctx <>) req
|
||||
-}
|
||||
|
91
server/src-lib/Hasura/Tracing/Class.hs
Normal file
91
server/src-lib/Hasura/Tracing/Class.hs
Normal file
@ -0,0 +1,91 @@
|
||||
-- | Defines the Tracing API.
|
||||
--
|
||||
-- The 'MonadTrace' class defines the "public API" of this component.
|
||||
module Hasura.Tracing.Class
|
||||
( MonadTrace (..),
|
||||
newTrace,
|
||||
newSpan,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Morph
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.Context
|
||||
import Hasura.Tracing.Sampling
|
||||
import Hasura.Tracing.TraceId
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- MonadTrace
|
||||
|
||||
class Monad m => MonadTrace m where
|
||||
-- | Trace the execution of a block of code, attaching a human-readable
|
||||
-- name. This starts a new trace and its corresponding root span, to which
|
||||
-- subsequent spans will be attached.
|
||||
newTraceWith ::
|
||||
TraceContext ->
|
||||
SamplingPolicy ->
|
||||
Text ->
|
||||
m a ->
|
||||
m a
|
||||
|
||||
-- | Starts a new span within the current trace. No-op if there's no current
|
||||
-- trace.
|
||||
--
|
||||
-- TODO: we could rewrite this to start a new trace if there isn't one, using
|
||||
-- the default reporter and policy? This would guarantee that no span is ever
|
||||
-- lost, but would also risk reporting undesired spans.
|
||||
newSpanWith ::
|
||||
SpanId ->
|
||||
Text ->
|
||||
m a ->
|
||||
m a
|
||||
|
||||
-- | Ask for the current tracing context, so that we can provide it to any
|
||||
-- downstream services, e.g. in HTTP headers. Returns 'Nothing' if we're not
|
||||
-- currently tracing anything.
|
||||
currentContext :: m (Maybe TraceContext)
|
||||
|
||||
-- | Log some arbitrary metadata to be attached to the current span, if any.
|
||||
attachMetadata :: TraceMetadata -> m ()
|
||||
|
||||
instance MonadTrace m => MonadTrace (ReaderT r m) where
|
||||
newTraceWith c p n = mapReaderT (newTraceWith c p n)
|
||||
newSpanWith i n = mapReaderT (newSpanWith i n)
|
||||
currentContext = lift currentContext
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (StateT e m) where
|
||||
newTraceWith c p n = mapStateT (newTraceWith c p n)
|
||||
newSpanWith i n = mapStateT (newSpanWith i n)
|
||||
currentContext = lift currentContext
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (ExceptT e m) where
|
||||
newTraceWith c p n = mapExceptT (newTraceWith c p n)
|
||||
newSpanWith i n = mapExceptT (newSpanWith i n)
|
||||
currentContext = lift currentContext
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (MaybeT m) where
|
||||
newTraceWith c p n = mapMaybeT (newTraceWith c p n)
|
||||
newSpanWith i n = mapMaybeT (newSpanWith i n)
|
||||
currentContext = lift currentContext
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- Trace helpers
|
||||
|
||||
-- | Create a new trace using a randomly-generated context.
|
||||
newTrace :: (MonadIO m, MonadTrace m) => SamplingPolicy -> Text -> m a -> m a
|
||||
newTrace policy name body = do
|
||||
traceId <- randomTraceId
|
||||
spanId <- randomSpanId
|
||||
let context = TraceContext traceId spanId Nothing SamplingDefer
|
||||
newTraceWith context policy name body
|
||||
|
||||
-- | Create a new span with a randomly-generated id.
|
||||
newSpan :: (MonadIO m, MonadTrace m) => Text -> m a -> m a
|
||||
newSpan name body = do
|
||||
spanId <- randomSpanId
|
||||
newSpanWith spanId name body
|
37
server/src-lib/Hasura/Tracing/Context.hs
Normal file
37
server/src-lib/Hasura/Tracing/Context.hs
Normal file
@ -0,0 +1,37 @@
|
||||
module Hasura.Tracing.Context
|
||||
( TraceContext (..),
|
||||
TraceMetadata,
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Aeson ((.=))
|
||||
import Data.Aeson qualified as J
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.Sampling
|
||||
import Hasura.Tracing.TraceId
|
||||
|
||||
-- | Any additional human-readable key-value pairs relevant
|
||||
-- to the execution of a block of code.
|
||||
type TraceMetadata = [(Text, Text)]
|
||||
|
||||
-- | A trace context records the current active trace, the active span
|
||||
-- within that trace, and the span's parent, unless the current span
|
||||
-- is the root.
|
||||
data TraceContext = TraceContext
|
||||
{ tcCurrentTrace :: TraceId,
|
||||
tcCurrentSpan :: SpanId,
|
||||
tcCurrentParent :: Maybe SpanId,
|
||||
tcSamplingState :: SamplingState
|
||||
}
|
||||
|
||||
-- Should this be here? This implicitly ties Tracing to the name of fields in HTTP headers.
|
||||
instance J.ToJSON TraceContext where
|
||||
toJSON TraceContext {..} =
|
||||
let idFields =
|
||||
[ "trace_id" .= bsToTxt (traceIdToHex tcCurrentTrace),
|
||||
"span_id" .= bsToTxt (spanIdToHex tcCurrentSpan)
|
||||
]
|
||||
samplingFieldMaybe =
|
||||
samplingStateToHeader @Text tcSamplingState <&> \t ->
|
||||
"sampling_state" .= t
|
||||
in J.object $ idFields ++ maybeToList samplingFieldMaybe
|
141
server/src-lib/Hasura/Tracing/Monad.hs
Normal file
141
server/src-lib/Hasura/Tracing/Monad.hs
Normal file
@ -0,0 +1,141 @@
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
|
||||
module Hasura.Tracing.Monad
|
||||
( TraceT (..),
|
||||
runTraceT,
|
||||
ignoreTraceT,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Lens
|
||||
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow)
|
||||
import Control.Monad.Morph
|
||||
import Control.Monad.Trans.Control
|
||||
import Data.IORef
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.Class
|
||||
import Hasura.Tracing.Context
|
||||
import Hasura.Tracing.Reporter
|
||||
import Hasura.Tracing.Sampling
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- TraceT
|
||||
|
||||
-- | TraceT is the standard implementation of 'MonadTrace'. Via a 'Reader', it
|
||||
-- keeps track of the default policy and reporter to use thoughout the stack, as
|
||||
-- well as the current trace.
|
||||
newtype TraceT m a = TraceT (ReaderT (Reporter, Maybe TraceEnv) m a)
|
||||
deriving
|
||||
( Functor,
|
||||
Applicative,
|
||||
Monad,
|
||||
MonadIO,
|
||||
MonadFix,
|
||||
MonadMask,
|
||||
MonadCatch,
|
||||
MonadThrow,
|
||||
MonadState s,
|
||||
MonadError e,
|
||||
MonadBase b,
|
||||
MonadBaseControl b
|
||||
)
|
||||
|
||||
-- | Runs the 'TraceT' monad, by providing the default reporter. This does NOT
|
||||
-- start a trace.
|
||||
--
|
||||
-- TODO: we could change this to always start a trace with a default name? This
|
||||
-- would allow us to guarantee that there is always a current trace, but this
|
||||
-- might not always be the correct behaviour: in practice, we would end up
|
||||
-- generating one that spans the entire lifetime of the engine if 'runTraceT'
|
||||
-- were to be used from 'main'.
|
||||
runTraceT :: Reporter -> TraceT m a -> m a
|
||||
runTraceT reporter (TraceT m) = runReaderT m (reporter, Nothing)
|
||||
|
||||
-- | Run the 'TraceT' monad, but without actually tracing anything: no report
|
||||
-- will be emitted, even if calls to 'newTraceWith' force the trace to be
|
||||
-- sampled.
|
||||
ignoreTraceT :: TraceT m a -> m a
|
||||
ignoreTraceT = runTraceT noReporter
|
||||
|
||||
instance MonadTrans TraceT where
|
||||
lift = TraceT . lift
|
||||
|
||||
-- | Hides the fact that TraceT is a reader to the rest of the stack.
|
||||
instance MonadReader r m => MonadReader r (TraceT m) where
|
||||
ask = lift ask
|
||||
local f (TraceT m) = TraceT $ mapReaderT (local f) m
|
||||
|
||||
instance (MonadIO m, MonadBaseControl IO m) => MonadTrace (TraceT m) where
|
||||
newTraceWith context policy name (TraceT body) = TraceT do
|
||||
reporter <- asks fst
|
||||
samplingDecision <- decideSampling (tcSamplingState context) policy
|
||||
metadataRef <- liftIO $ newIORef []
|
||||
let report = case samplingDecision of
|
||||
SampleNever -> id
|
||||
SampleAlways -> runReporter reporter context name (readIORef metadataRef)
|
||||
updatedContext =
|
||||
context
|
||||
{ tcSamplingState = updateSamplingState samplingDecision (tcSamplingState context)
|
||||
}
|
||||
traceEnv = TraceEnv updatedContext metadataRef samplingDecision
|
||||
report $ local (_2 .~ Just traceEnv) body
|
||||
|
||||
newSpanWith spanId name (TraceT body) = TraceT do
|
||||
(reporter, traceEnv) <- ask
|
||||
case traceEnv of
|
||||
-- we are not currently in a trace: ignore this span
|
||||
Nothing -> body
|
||||
Just env -> case teSamplingDecision env of
|
||||
-- this trace is not sampled: ignore this span
|
||||
SampleNever -> body
|
||||
SampleAlways -> do
|
||||
metadataRef <- liftIO $ newIORef []
|
||||
let subContext =
|
||||
(teTraceContext env)
|
||||
{ tcCurrentSpan = spanId,
|
||||
tcCurrentParent = Just (tcCurrentSpan $ teTraceContext env)
|
||||
}
|
||||
subTraceEnv =
|
||||
env
|
||||
{ teTraceContext = subContext,
|
||||
teMetadataRef = metadataRef
|
||||
}
|
||||
runReporter reporter subContext name (readIORef metadataRef) $
|
||||
local (_2 .~ Just subTraceEnv) body
|
||||
|
||||
currentContext = TraceT $ asks $ fmap teTraceContext . snd
|
||||
|
||||
attachMetadata metadata = TraceT do
|
||||
asks (fmap teMetadataRef . snd) >>= \case
|
||||
Nothing -> pure ()
|
||||
Just ref -> liftIO $ modifyIORef' ref (metadata ++)
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- Internal
|
||||
|
||||
-- | Information about the current trace and span.
|
||||
data TraceEnv = TraceEnv
|
||||
{ teTraceContext :: TraceContext,
|
||||
teMetadataRef :: IORef TraceMetadata,
|
||||
teSamplingDecision :: SamplingDecision
|
||||
}
|
||||
|
||||
-- Helper for consistently deciding whether or not to sample a trace based on
|
||||
-- trace context and sampling policy.
|
||||
decideSampling :: MonadIO m => SamplingState -> SamplingPolicy -> m SamplingDecision
|
||||
decideSampling samplingState samplingPolicy =
|
||||
case samplingState of
|
||||
SamplingDefer -> liftIO samplingPolicy
|
||||
SamplingDeny -> pure SampleNever
|
||||
SamplingAccept -> pure SampleAlways
|
||||
|
||||
-- Helper for consistently updating the sampling state when a sampling decision
|
||||
-- is made.
|
||||
updateSamplingState :: SamplingDecision -> SamplingState -> SamplingState
|
||||
updateSamplingState samplingDecision = \case
|
||||
SamplingDefer ->
|
||||
case samplingDecision of
|
||||
SampleNever -> SamplingDefer
|
||||
SampleAlways -> SamplingAccept
|
||||
SamplingDeny -> SamplingDeny
|
||||
SamplingAccept -> SamplingAccept
|
28
server/src-lib/Hasura/Tracing/Reporter.hs
Normal file
28
server/src-lib/Hasura/Tracing/Reporter.hs
Normal file
@ -0,0 +1,28 @@
|
||||
module Hasura.Tracing.Reporter
|
||||
( Reporter (..),
|
||||
noReporter,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Trans.Control
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.Context
|
||||
|
||||
newtype Reporter = Reporter
|
||||
{ runReporter ::
|
||||
forall m a.
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
-- \| Current trace context, providing the trace id and span info.
|
||||
TraceContext ->
|
||||
-- \| Human readable name of this span.
|
||||
Text ->
|
||||
-- \| IO action that retrieves the metadata associated with the
|
||||
-- current span.
|
||||
IO TraceMetadata ->
|
||||
-- \| The monadic action to report
|
||||
m a ->
|
||||
m a
|
||||
}
|
||||
|
||||
noReporter :: Reporter
|
||||
noReporter = Reporter \_ _ _ -> id
|
88
server/src-lib/Hasura/Tracing/Sampling.hs
Normal file
88
server/src-lib/Hasura/Tracing/Sampling.hs
Normal file
@ -0,0 +1,88 @@
|
||||
module Hasura.Tracing.Sampling
|
||||
( -- * SamplingState
|
||||
SamplingState (..),
|
||||
samplingStateToHeader,
|
||||
samplingStateFromHeader,
|
||||
|
||||
-- * SamplingDecision
|
||||
SamplingDecision (..),
|
||||
|
||||
-- * SamplingPolicy
|
||||
SamplingPolicy,
|
||||
sampleNever,
|
||||
sampleAlways,
|
||||
sampleRandomly,
|
||||
sampleOneInN,
|
||||
)
|
||||
where
|
||||
|
||||
import Hasura.Prelude
|
||||
import Refined (Positive, Refined, unrefine)
|
||||
import System.Random.Stateful qualified as Random
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- SamplingState
|
||||
|
||||
-- | B3 propagation sampling state.
|
||||
--
|
||||
-- Debug sampling state not represented.
|
||||
data SamplingState = SamplingDefer | SamplingDeny | SamplingAccept
|
||||
|
||||
-- | Convert a sampling state to a value for the X-B3-Sampled header. A return
|
||||
-- value of Nothing indicates that the header should not be set.
|
||||
samplingStateToHeader :: IsString s => SamplingState -> Maybe s
|
||||
samplingStateToHeader = \case
|
||||
SamplingDefer -> Nothing
|
||||
SamplingDeny -> Just "0"
|
||||
SamplingAccept -> Just "1"
|
||||
|
||||
-- | Convert a X-B3-Sampled header value to a sampling state. An input of
|
||||
-- Nothing indicates that the header was not set.
|
||||
samplingStateFromHeader :: (IsString s, Eq s) => Maybe s -> SamplingState
|
||||
samplingStateFromHeader = \case
|
||||
Nothing -> SamplingDefer
|
||||
Just "0" -> SamplingDeny
|
||||
Just "1" -> SamplingAccept
|
||||
Just _ -> SamplingDefer
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- SamplingDecision
|
||||
|
||||
-- | A local decision about whether or not to sample spans.
|
||||
data SamplingDecision = SampleNever | SampleAlways
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- SamplingPolicy
|
||||
|
||||
-- | An IO action for deciding whether or not to sample a trace.
|
||||
--
|
||||
-- Currently restricted to deny access to the B3 sampling state, but we may
|
||||
-- want to be more flexible in the future.
|
||||
type SamplingPolicy = IO SamplingDecision
|
||||
|
||||
sampleNever :: SamplingPolicy
|
||||
sampleNever = pure SampleNever
|
||||
|
||||
sampleAlways :: SamplingPolicy
|
||||
sampleAlways = pure SampleAlways
|
||||
|
||||
-- @sampleRandomly p@ returns `SampleAlways` with probability @p@ and
|
||||
-- `SampleNever` with probability @1 - p@.
|
||||
sampleRandomly :: Double -> SamplingPolicy
|
||||
sampleRandomly samplingProbability
|
||||
| samplingProbability <= 0 = pure SampleNever
|
||||
| samplingProbability >= 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, 1) Random.globalStdGen
|
||||
pure $ if x < samplingProbability then SampleAlways else SampleNever
|
||||
|
||||
-- Like @sampleRandomly@, but with the probability expressed as the denominator
|
||||
-- N of the fraction 1/N.
|
||||
sampleOneInN :: Refined Positive Int -> SamplingPolicy
|
||||
sampleOneInN denominator
|
||||
| n == 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, n - 1) Random.globalStdGen
|
||||
pure $ if x == 0 then SampleAlways else SampleNever
|
||||
where
|
||||
n = unrefine denominator
|
@ -26,8 +26,7 @@ import Hasura.Prelude
|
||||
import System.Random.Stateful qualified as Random
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
-- * TraceId
|
||||
-- TraceId
|
||||
|
||||
-- | 128-bit trace identifiers.
|
||||
--
|
||||
@ -42,8 +41,8 @@ data TraceId
|
||||
traceIdBytes :: Int
|
||||
traceIdBytes = 16
|
||||
|
||||
randomTraceId :: IO TraceId
|
||||
randomTraceId = do
|
||||
randomTraceId :: MonadIO m => m TraceId
|
||||
randomTraceId = liftIO do
|
||||
(w1, w2) <-
|
||||
flip Random.applyAtomicGen Random.globalStdGen $ \gen0 ->
|
||||
let (!w1, !gen1) = Random.random gen0
|
||||
@ -84,8 +83,7 @@ traceIdToHex :: TraceId -> ByteString
|
||||
traceIdToHex = Base16.encode . traceIdToBytes
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
---- * SpanId
|
||||
-- SpanId
|
||||
|
||||
-- | 64-bit span identifiers
|
||||
--
|
||||
@ -97,8 +95,8 @@ newtype SpanId = SpanId Word64
|
||||
spanIdBytes :: Int
|
||||
spanIdBytes = 8
|
||||
|
||||
randomSpanId :: IO SpanId
|
||||
randomSpanId = do
|
||||
randomSpanId :: MonadIO m => m SpanId
|
||||
randomSpanId = liftIO do
|
||||
w <- Random.uniformM Random.globalStdGen
|
||||
if w == 0
|
||||
then randomSpanId
|
||||
|
49
server/src-lib/Hasura/Tracing/Utils.hs
Normal file
49
server/src-lib/Hasura/Tracing/Utils.hs
Normal file
@ -0,0 +1,49 @@
|
||||
-- | This module contains a collection of utility functions we use with tracing
|
||||
-- throughout the codebase, but that are not a core part of the library. If we
|
||||
-- were to move tracing to a separate library, those functions should be kept
|
||||
-- here in the core engine code.
|
||||
module Hasura.Tracing.Utils
|
||||
( traceHTTPRequest,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Lens
|
||||
import Data.String
|
||||
import Hasura.Prelude
|
||||
import Hasura.Tracing.Class
|
||||
import Hasura.Tracing.Context
|
||||
import Hasura.Tracing.Sampling
|
||||
import Hasura.Tracing.TraceId
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
|
||||
-- | Wrap the execution of an HTTP request in a span in the current
|
||||
-- trace. Despite its name, this function does not start a new trace, and the
|
||||
-- span will therefore not be recorded if the surrounding context isn't traced
|
||||
-- (see 'spanWith').
|
||||
--
|
||||
-- Additionally, this function adds metadata regarding the request to the
|
||||
-- created span, and injects the trace context into the HTTP header.
|
||||
traceHTTPRequest ::
|
||||
(MonadIO m, MonadTrace m) =>
|
||||
-- | http request that needs to be made
|
||||
HTTP.Request ->
|
||||
-- | a function that takes the traced request and executes it
|
||||
(HTTP.Request -> m a) ->
|
||||
m a
|
||||
traceHTTPRequest req f = do
|
||||
let method = bsToTxt (view HTTP.method req)
|
||||
uri = view HTTP.url req
|
||||
newSpan (method <> " " <> uri) do
|
||||
let reqBytes = HTTP.getReqSize req
|
||||
attachMetadata [("request_body_bytes", fromString (show reqBytes))]
|
||||
headers <- fmap (maybe [] toHeaders) currentContext
|
||||
f $ over HTTP.headers (headers <>) req
|
||||
where
|
||||
toHeaders :: TraceContext -> [HTTP.Header]
|
||||
toHeaders TraceContext {..} =
|
||||
catMaybes
|
||||
[ Just ("X-B3-TraceId", traceIdToHex tcCurrentTrace),
|
||||
Just ("X-B3-SpanId", spanIdToHex tcCurrentSpan),
|
||||
("X-B3-ParentSpanId",) . spanIdToHex <$> tcCurrentParent,
|
||||
("X-B3-Sampled",) <$> samplingStateToHeader tcSamplingState
|
||||
]
|
@ -4,8 +4,6 @@ module Hasura.Server.AuthSpec (spec) where
|
||||
|
||||
import Control.Concurrent.Extended (ForkableMonadIO)
|
||||
import Control.Lens hiding ((.=))
|
||||
import Control.Monad.Trans.Control
|
||||
import Control.Monad.Trans.Managed
|
||||
import Crypto.JOSE.JWK qualified as Jose
|
||||
import Crypto.JWT qualified as JWT
|
||||
import Data.Aeson ((.=))
|
||||
@ -24,7 +22,6 @@ import Hasura.Server.Auth hiding (getUserInfoWithExpTime, processJwt)
|
||||
import Hasura.Server.Auth.JWT hiding (processJwt)
|
||||
import Hasura.Server.Utils
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
import Test.Hspec
|
||||
@ -626,16 +623,8 @@ mkRoleNameE = fromMaybe (error "fixme") . mkRoleName
|
||||
mkJSONPathE :: Text -> J.JSONPath
|
||||
mkJSONPathE = either (error . T.unpack) id . parseJSONPath
|
||||
|
||||
newtype NoReporter a = NoReporter {runNoReporter :: IO a}
|
||||
deriving newtype (Functor, Applicative, Monad, MonadIO, MonadBase IO, MonadBaseControl IO)
|
||||
|
||||
instance Tracing.HasReporter NoReporter
|
||||
|
||||
instance Tracing.HasReporter (ManagedT NoReporter)
|
||||
|
||||
setupAuthMode' ::
|
||||
( Tracing.HasReporter m,
|
||||
ForkableMonadIO m
|
||||
( ForkableMonadIO m
|
||||
) =>
|
||||
Maybe (HashSet AdminSecretHash) ->
|
||||
Maybe AuthHook ->
|
||||
@ -644,11 +633,7 @@ setupAuthMode' ::
|
||||
m (Either () AuthMode)
|
||||
setupAuthMode' mAdminSecretHash mWebHook jwtSecrets mUnAuthRole = do
|
||||
httpManager <- liftIO $ HTTP.newManager HTTP.defaultManagerSettings
|
||||
-- just throw away the error message for ease of testing:
|
||||
fmap (either (const $ Left ()) Right) $
|
||||
liftIO $
|
||||
runNoReporter $
|
||||
lowerManagedT $
|
||||
fmap (mapLeft $ const ()) $
|
||||
runExceptT $
|
||||
setupAuthMode
|
||||
(fromMaybe Set.empty mAdminSecretHash)
|
||||
|
@ -15,11 +15,12 @@ import Data.Time.Clock (getCurrentTime)
|
||||
import Data.URL.Template
|
||||
import Database.PG.Query qualified as PG
|
||||
import Hasura.App
|
||||
( PGMetadataStorageAppT (..),
|
||||
( PGMetadataStorageAppT,
|
||||
initGlobalCtx,
|
||||
initialiseContext,
|
||||
mkMSSQLSourceResolver,
|
||||
mkPgSourceResolver,
|
||||
runPGMetadataStorageAppT,
|
||||
)
|
||||
import Hasura.Backends.Postgres.Connection.Settings
|
||||
import Hasura.Backends.Postgres.Execute.Types
|
||||
@ -132,7 +133,7 @@ main = do
|
||||
let run :: ExceptT QErr (PGMetadataStorageAppT CacheBuild) a -> IO a
|
||||
run =
|
||||
runExceptT
|
||||
>>> flip runPGMetadataStorageAppT (appCtx, appEnv)
|
||||
>>> runPGMetadataStorageAppT (appCtx, appEnv)
|
||||
>>> runCacheBuild cacheBuildParams
|
||||
>>> runExceptT
|
||||
>=> flip onLeft printErrJExit
|
||||
|
Loading…
Reference in New Issue
Block a user