mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 09:22:43 +03:00
server: implement trace sampling
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7300 GitOrigin-RevId: d96d7fa5aaf0c1e71d1c4c0fa8f0162abce39e18
This commit is contained in:
parent
e1bf220b37
commit
ee78e32c6e
@ -65,6 +65,7 @@ import Hasura.Prelude
|
||||
import Hasura.Server.Init (PostgresConnInfo (..), ServeOptions (..), unsafePort)
|
||||
import Hasura.Server.Metrics (ServerMetricsSpec, createServerMetrics)
|
||||
import Hasura.Server.Prometheus (makeDummyPrometheusMetrics)
|
||||
import Hasura.Tracing (sampleAlways)
|
||||
import Network.Socket qualified as Socket
|
||||
import Network.Wai.Handler.Warp qualified as Warp
|
||||
import System.Metrics qualified as EKG
|
||||
@ -336,6 +337,7 @@ runApp serveOptions = do
|
||||
ekgStore
|
||||
Nothing
|
||||
prometheusMetrics
|
||||
sampleAlways
|
||||
|
||||
-- | Used only for 'runApp' above.
|
||||
data TestMetricsSpec name metricType tags
|
||||
|
@ -28,6 +28,7 @@ import Hasura.Server.Migrate (downgradeCatalog)
|
||||
import Hasura.Server.Prometheus (makeDummyPrometheusMetrics)
|
||||
import Hasura.Server.Version
|
||||
import Hasura.ShutdownLatch
|
||||
import Hasura.Tracing (sampleAlways)
|
||||
import System.Exit qualified as Sys
|
||||
import System.Metrics qualified as EKG
|
||||
import System.Posix.Signals qualified as Signals
|
||||
@ -90,7 +91,7 @@ runApp env (HGEOptions rci metadataDbUrl hgeCmd) = do
|
||||
GC.ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
|
||||
|
||||
flip runPGMetadataStorageAppT (_scMetadataDbPool serveCtx, pgLogger) . lowerManagedT $ do
|
||||
runHGEServer (const $ pure ()) env serveOptions serveCtx initTime Nothing serverMetrics ekgStore Nothing prometheusMetrics
|
||||
runHGEServer (const $ pure ()) env serveOptions serveCtx initTime Nothing serverMetrics ekgStore Nothing prometheusMetrics sampleAlways
|
||||
HCExport -> do
|
||||
GlobalCtx {..} <- initGlobalCtx env metadataDbUrl rci
|
||||
res <- runTxWithMinimalPool _gcMetadataDbConnInfo fetchMetadataFromCatalog
|
||||
|
@ -569,10 +569,11 @@ runHGEServer ::
|
||||
-- | A hook which can be called to indicate when the server is started succesfully
|
||||
Maybe (IO ()) ->
|
||||
PrometheusMetrics ->
|
||||
Tracing.SamplingPolicy ->
|
||||
ManagedT m ()
|
||||
runHGEServer setupHook env serveOptions serveCtx initTime postPollHook serverMetrics ekgStore startupStatusHook prometheusMetrics = do
|
||||
runHGEServer setupHook env serveOptions serveCtx initTime postPollHook serverMetrics ekgStore startupStatusHook prometheusMetrics traceSamplingPolicy = do
|
||||
waiApplication <-
|
||||
mkHGEServer setupHook env serveOptions serveCtx postPollHook serverMetrics ekgStore prometheusMetrics
|
||||
mkHGEServer setupHook env serveOptions serveCtx postPollHook serverMetrics ekgStore prometheusMetrics traceSamplingPolicy
|
||||
|
||||
let logger = _lsLogger $ _scLoggers serveCtx
|
||||
-- `startupStatusHook`: add `Service started successfully` message to config_status
|
||||
@ -660,8 +661,9 @@ mkHGEServer ::
|
||||
ServerMetrics ->
|
||||
EKG.Store EKG.EmptyMetrics ->
|
||||
PrometheusMetrics ->
|
||||
Tracing.SamplingPolicy ->
|
||||
ManagedT m Application
|
||||
mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} postPollHook serverMetrics ekgStore prometheusMetrics = do
|
||||
mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} postPollHook serverMetrics ekgStore prometheusMetrics traceSamplingPolicy = do
|
||||
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
|
||||
-- will log lines to STDOUT containing "not in normal form". In the future we
|
||||
-- could try to integrate this into our tests. For now this is a development
|
||||
@ -732,6 +734,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} postPollHook serverMet
|
||||
soEnableMetadataQueryLogging
|
||||
soDefaultNamingConvention
|
||||
soMetadataDefaults
|
||||
traceSamplingPolicy
|
||||
|
||||
let serverConfigCtx =
|
||||
ServerConfigCtx
|
||||
|
@ -60,7 +60,7 @@ runQueryExplain ::
|
||||
) =>
|
||||
DBStepInfo 'BigQuery ->
|
||||
m EncJSON
|
||||
runQueryExplain (DBStepInfo _ _ _ action) = run $ runTraceTWithReporter noReporter "explain" action
|
||||
runQueryExplain (DBStepInfo _ _ _ action) = run $ ignoreTraceT action
|
||||
|
||||
runMutation ::
|
||||
( MonadError QErr m
|
||||
|
@ -47,7 +47,7 @@ import Hasura.SQL.Types (CollectableType (..))
|
||||
import Hasura.Server.Migrate.Version (SourceCatalogMigrationState (..))
|
||||
import Hasura.Server.Utils qualified as HSU
|
||||
import Hasura.Session (SessionVariable, mkSessionVariable)
|
||||
import Hasura.Tracing (noReporter, runTraceTWithReporter)
|
||||
import Hasura.Tracing (ignoreTraceT)
|
||||
import Language.GraphQL.Draft.Syntax qualified as GQL
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Client.Manager
|
||||
@ -117,7 +117,7 @@ resolveBackendInfo' logger = proc (invalidationKeys, optionsMap) -> do
|
||||
m (Either QErr DC.DataConnectorInfo)
|
||||
getDataConnectorCapabilities options@DC.DataConnectorOptions {..} manager = runExceptT do
|
||||
capabilitiesU <-
|
||||
runTraceTWithReporter noReporter "capabilities"
|
||||
ignoreTraceT
|
||||
. flip runAgentClientT (AgentClientContext logger _dcoUri manager Nothing)
|
||||
$ genericClient // API._capabilities
|
||||
|
||||
@ -151,7 +151,7 @@ resolveSourceConfig'
|
||||
validateConfiguration sourceName dataConnectorName _dciConfigSchemaResponse transformedConfig
|
||||
|
||||
schemaResponseU <-
|
||||
runTraceTWithReporter noReporter "resolve source"
|
||||
ignoreTraceT
|
||||
. flip runAgentClientT (AgentClientContext logger _dcoUri manager (DC.sourceTimeoutMicroseconds <$> timeout))
|
||||
$ (genericClient // API._schema) (toTxt sourceName) transformedConfig
|
||||
|
||||
|
@ -82,6 +82,6 @@ runDBQueryExplain' (DBStepInfo _ SourceConfig {..} _ action) =
|
||||
liftEitherM
|
||||
. liftIO
|
||||
. runExceptT
|
||||
. Tracing.runTraceTWithReporter Tracing.noReporter "explain"
|
||||
. Tracing.ignoreTraceT
|
||||
. flip runAgentClientT (AgentClientContext nullLogger _scEndpoint _scManager _scTimeoutMicroseconds)
|
||||
$ action
|
||||
|
@ -61,7 +61,7 @@ runQueryExplain ::
|
||||
) =>
|
||||
DBStepInfo 'MySQL ->
|
||||
m EncJSON
|
||||
runQueryExplain (DBStepInfo _ _ _ action) = run $ runTraceTWithReporter noReporter "explain" action
|
||||
runQueryExplain (DBStepInfo _ _ _ action) = run $ ignoreTraceT action
|
||||
|
||||
mkQueryLog ::
|
||||
GQLReqUnparsed ->
|
||||
|
@ -145,7 +145,7 @@ runPGQueryExplain (DBStepInfo _ sourceConfig _ action) =
|
||||
-- matching instance of BackendExecute. However, Explain doesn't need tracing! Rather than
|
||||
-- introducing a separate "ExplainMonad", we simply use @runTraceTWithReporter@ to remove the
|
||||
-- TraceT.
|
||||
runQueryTx (_pscExecCtx sourceConfig) $ runTraceTWithReporter noReporter "explain" $ action
|
||||
runQueryTx (_pscExecCtx sourceConfig) $ ignoreTraceT action
|
||||
|
||||
mkQueryLog ::
|
||||
GQLReqUnparsed ->
|
||||
|
@ -369,10 +369,8 @@ processEventQueue logger httpMgr getSchemaCache EventEngineCtx {..} LockedEvents
|
||||
tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e))
|
||||
let spanName eti = "Event trigger: " <> unNonEmptyText (unTriggerName (etiName eti))
|
||||
runTraceT =
|
||||
maybe
|
||||
Tracing.runTraceT
|
||||
Tracing.runTraceTInContext
|
||||
tracingCtx
|
||||
(maybe Tracing.runTraceT Tracing.runTraceTInContext tracingCtx)
|
||||
Tracing.sampleAlways
|
||||
|
||||
maintenanceModeVersionEither :: Either QErr (MaintenanceMode MaintenanceModeVersion) <-
|
||||
case maintenanceMode of
|
||||
|
@ -353,7 +353,7 @@ processScheduledEvent ::
|
||||
ScheduledEventType ->
|
||||
m ()
|
||||
processScheduledEvent eventId eventHeaders retryCtx payload webhookUrl type' =
|
||||
Tracing.runTraceT traceNote do
|
||||
Tracing.runTraceT Tracing.sampleAlways traceNote do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
let retryConf = _rctxConf retryCtx
|
||||
scheduledTime = sewpScheduledTime payload
|
||||
|
@ -461,7 +461,7 @@ asyncActionsProcessor env logger getSCFromRef' lockedActionEvents httpManager sl
|
||||
liftIO $ sleep $ milliseconds sleepTime
|
||||
where
|
||||
callHandler :: ActionCache -> ActionLogItem -> m ()
|
||||
callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do
|
||||
callHandler actionCache actionLogItem = Tracing.runTraceT Tracing.sampleAlways "async actions processor" do
|
||||
let ActionLogItem
|
||||
actionId
|
||||
actionName
|
||||
|
@ -116,6 +116,7 @@ createWSServerEnv ::
|
||||
KeepAliveDelay ->
|
||||
ServerMetrics ->
|
||||
PrometheusMetrics ->
|
||||
Tracing.SamplingPolicy ->
|
||||
m WSServerEnv
|
||||
createWSServerEnv
|
||||
logger
|
||||
@ -128,7 +129,8 @@ createWSServerEnv
|
||||
enableAL
|
||||
keepAliveDelay
|
||||
serverMetrics
|
||||
prometheusMetrics = do
|
||||
prometheusMetrics
|
||||
traceSamplingPolicy = do
|
||||
wsServer <- liftIO $ STM.atomically $ WS.createWSServer logger
|
||||
pure $
|
||||
WSServerEnv
|
||||
@ -144,6 +146,7 @@ createWSServerEnv
|
||||
keepAliveDelay
|
||||
serverMetrics
|
||||
prometheusMetrics
|
||||
traceSamplingPolicy
|
||||
|
||||
mkWSActions :: L.Logger L.Hasura -> WSSubProtocol -> WS.WSActions WSConnData
|
||||
mkWSActions logger subProtocol =
|
||||
|
@ -794,7 +794,8 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
enableAL
|
||||
_keepAliveDelay
|
||||
_serverMetrics
|
||||
prometheusMetrics = serverEnv
|
||||
prometheusMetrics
|
||||
_ = serverEnv
|
||||
|
||||
gqlMetrics = pmGraphQLRequestMetrics prometheusMetrics
|
||||
|
||||
@ -1010,7 +1011,7 @@ onMessage ::
|
||||
LBS.ByteString ->
|
||||
WS.WSActions WSConnData ->
|
||||
m ()
|
||||
onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions = Tracing.runTraceT "websocket" do
|
||||
onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions = Tracing.runTraceT (_wseTraceSamplingPolicy serverEnv) "websocket" do
|
||||
case J.eitherDecode msgRaw of
|
||||
Left e -> do
|
||||
let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e
|
||||
|
@ -3,7 +3,7 @@ module Hasura.GraphQL.Transport.WebSocket.Types
|
||||
WSConn,
|
||||
WSConnData (WSConnData, _wscOpMap, _wscUser),
|
||||
WSConnState (CSInitError, CSInitialised, CSNotInitialised),
|
||||
WSServerEnv (WSServerEnv, _wseCorsPolicy, _wseGCtxMap, _wseHManager, _wseKeepAliveDelay, _wseSubscriptionState, _wseLogger, _wseServer, _wseServerMetrics, _wsePrometheusMetrics),
|
||||
WSServerEnv (..),
|
||||
WsClientState (WsClientState, wscsIpAddress, wscsReqHeaders, wscsTokenExpTime, wscsUserInfo),
|
||||
WsHeaders (WsHeaders, unWsHeaders),
|
||||
SubscriberType (..),
|
||||
@ -28,6 +28,7 @@ import Hasura.Server.Metrics (ServerMetrics (..))
|
||||
import Hasura.Server.Prometheus (PrometheusMetrics (..))
|
||||
import Hasura.Server.Types (ReadOnlyMode (..))
|
||||
import Hasura.Session
|
||||
import Hasura.Tracing qualified as Tracing
|
||||
import Network.HTTP.Client qualified as HTTP
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
import Network.Wai.Extended qualified as Wai
|
||||
@ -84,7 +85,8 @@ data WSServerEnv = WSServerEnv
|
||||
_wseEnableAllowlist :: !Bool,
|
||||
_wseKeepAliveDelay :: !KeepAliveDelay,
|
||||
_wseServerMetrics :: !ServerMetrics,
|
||||
_wsePrometheusMetrics :: !PrometheusMetrics
|
||||
_wsePrometheusMetrics :: !PrometheusMetrics,
|
||||
_wseTraceSamplingPolicy :: !Tracing.SamplingPolicy
|
||||
}
|
||||
|
||||
data SubscriberType
|
||||
|
@ -380,7 +380,7 @@ runGetSourceTables env GetSourceTables {..} = do
|
||||
validateConfiguration _gstSourceName dcName configSchemaResponse transformedConfig
|
||||
|
||||
schemaResponse <-
|
||||
Tracing.runTraceTWithReporter Tracing.noReporter "resolve source"
|
||||
Tracing.ignoreTraceT
|
||||
. flip Agent.Client.runAgentClientT (Agent.Client.AgentClientContext logger _dcoUri manager (DC.Types.sourceTimeoutMicroseconds <$> timeout))
|
||||
$ schemaGuard =<< (Servant.Client.genericClient // API._schema) (Text.E.toTxt _gstSourceName) transformedConfig
|
||||
|
||||
@ -443,7 +443,7 @@ runGetTableInfo env GetTableInfo {..} = do
|
||||
validateConfiguration _gtiSourceName dcName configSchemaResponse transformedConfig
|
||||
|
||||
schemaResponse <-
|
||||
Tracing.runTraceTWithReporter Tracing.noReporter "resolve source"
|
||||
Tracing.ignoreTraceT
|
||||
. flip Agent.Client.runAgentClientT (Agent.Client.AgentClientContext logger _dcoUri manager (DC.Types.sourceTimeoutMicroseconds <$> timeout))
|
||||
$ schemaGuard =<< (Servant.Client.genericClient // API._schema) (Text.E.toTxt _gtiSourceName) transformedConfig
|
||||
|
||||
|
@ -86,7 +86,7 @@ buildRemoteSchemas env =
|
||||
|
||||
-- TODO continue propagating MonadTrace up calls so that we can get tracing
|
||||
-- for remote schema introspection. This will require modifying CacheBuild.
|
||||
noopTrace = Tracing.runTraceTWithReporter Tracing.noReporter "buildSchemaCacheRule"
|
||||
noopTrace = Tracing.runTraceTWithReporter Tracing.noReporter Tracing.sampleNever "buildSchemaCacheRule"
|
||||
|
||||
mkRemoteSchemaMetadataObject remoteSchema =
|
||||
MetadataObject (MORemoteSchema (_rsmName remoteSchema)) (toJSON remoteSchema)
|
||||
|
@ -136,7 +136,8 @@ data ServerCtx = ServerCtx
|
||||
scEnableReadOnlyMode :: !ReadOnlyMode,
|
||||
scDefaultNamingConvention :: !(Maybe NamingCase),
|
||||
scPrometheusMetrics :: !PrometheusMetrics,
|
||||
scMetadataDefaults :: !MetadataDefaults
|
||||
scMetadataDefaults :: !MetadataDefaults,
|
||||
scTraceSamplingPolicy :: !Tracing.SamplingPolicy
|
||||
}
|
||||
|
||||
data HandlerCtx = HandlerCtx
|
||||
@ -300,11 +301,9 @@ mkSpockAction serverCtx@ServerCtx {..} qErrEncoder qErrModifier apiHandler = do
|
||||
(MonadIO m1, Tracing.HasReporter m1) =>
|
||||
Tracing.TraceT m1 a1 ->
|
||||
m1 a1
|
||||
runTraceT =
|
||||
maybe
|
||||
Tracing.runTraceT
|
||||
Tracing.runTraceTInContext
|
||||
tracingCtx
|
||||
runTraceT = do
|
||||
(maybe Tracing.runTraceT Tracing.runTraceTInContext tracingCtx)
|
||||
scTraceSamplingPolicy
|
||||
(fromString (B8.unpack pathInfo))
|
||||
|
||||
runHandler ::
|
||||
@ -825,6 +824,7 @@ mkWaiApp ::
|
||||
Maybe NamingCase ->
|
||||
-- | default metadata entries
|
||||
MetadataDefaults ->
|
||||
Tracing.SamplingPolicy ->
|
||||
m HasuraApp
|
||||
mkWaiApp
|
||||
setupHook
|
||||
@ -861,7 +861,8 @@ mkWaiApp
|
||||
wsConnInitTimeout
|
||||
enableMetadataQueryLogging
|
||||
defaultNC
|
||||
metadataDefaults = do
|
||||
metadataDefaults
|
||||
traceSamplingPolicy = do
|
||||
let getSchemaCache' = first lastBuiltSchemaCache <$> readSchemaCacheRef schemaCacheRef
|
||||
|
||||
let corsPolicy = mkDefaultCorsPolicy corsCfg
|
||||
@ -882,6 +883,7 @@ mkWaiApp
|
||||
keepAliveDelay
|
||||
serverMetrics
|
||||
prometheusMetrics
|
||||
traceSamplingPolicy
|
||||
|
||||
let serverCtx =
|
||||
ServerCtx
|
||||
@ -906,7 +908,8 @@ mkWaiApp
|
||||
scEnableReadOnlyMode = readOnlyMode,
|
||||
scDefaultNamingConvention = defaultNC,
|
||||
scPrometheusMetrics = prometheusMetrics,
|
||||
scMetadataDefaults = metadataDefaults
|
||||
scMetadataDefaults = metadataDefaults,
|
||||
scTraceSamplingPolicy = traceSamplingPolicy
|
||||
}
|
||||
|
||||
spockApp <- liftWithStateless $ \lowerIO ->
|
||||
|
@ -165,7 +165,10 @@ setupAuthMode adminSecretHashSet mWebHook mJwtSecrets mUnAuthRole httpManager lo
|
||||
-- header), do not start a background thread for refreshing the JWK
|
||||
getJwkFromUrl url = do
|
||||
ref <- liftIO $ newIORef $ JWKSet []
|
||||
maybeExpiry <- hoist lift $ withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref
|
||||
maybeExpiry <-
|
||||
hoist lift . withJwkError $
|
||||
Tracing.runTraceT Tracing.sampleAlways "jwk init" $
|
||||
updateJwkRef logger httpManager url ref
|
||||
case maybeExpiry of
|
||||
Nothing -> return ref
|
||||
Just time -> do
|
||||
|
@ -319,7 +319,7 @@ jwkRefreshCtrl ::
|
||||
m void
|
||||
jwkRefreshCtrl logger manager url ref time = do
|
||||
liftIO $ C.sleep time
|
||||
forever $ Tracing.runTraceT "jwk refresh" do
|
||||
forever $ Tracing.runTraceT Tracing.sampleAlways "jwk refresh" do
|
||||
res <- runExceptT $ updateJwkRef logger manager url ref
|
||||
mTime <- onLeft res (const $ logNotice >> return Nothing)
|
||||
-- if can't parse time from header, defaults to 1 min
|
||||
|
@ -7,11 +7,17 @@ module Hasura.Tracing
|
||||
runTraceTWith,
|
||||
runTraceTWithReporter,
|
||||
runTraceTInContext,
|
||||
ignoreTraceT,
|
||||
interpTraceT,
|
||||
TraceContext (..),
|
||||
Reporter (..),
|
||||
noReporter,
|
||||
HasReporter (..),
|
||||
SamplingPolicy,
|
||||
sampleNever,
|
||||
sampleAlways,
|
||||
sampleRandomly,
|
||||
sampleOneInN,
|
||||
TracingMetadata,
|
||||
extractB3HttpContext,
|
||||
tracedHttpRequest,
|
||||
@ -41,6 +47,8 @@ import Hasura.Tracing.TraceId
|
||||
)
|
||||
import Network.HTTP.Client.Manager (HasHttpManagerM (..))
|
||||
import Network.HTTP.Client.Transformable qualified as HTTP
|
||||
import Refined (Positive, Refined, unrefine)
|
||||
import System.Random.Stateful qualified as Random
|
||||
|
||||
-- | Any additional human-readable key-value pairs relevant
|
||||
-- to the execution of a block of code.
|
||||
@ -85,12 +93,97 @@ data TraceContext = TraceContext
|
||||
{ -- | TODO what is this exactly? The topmost span id?
|
||||
tcCurrentTrace :: !TraceId,
|
||||
tcCurrentSpan :: !SpanId,
|
||||
tcCurrentParent :: !(Maybe SpanId)
|
||||
tcCurrentParent :: !(Maybe SpanId),
|
||||
tcSamplingState :: !SamplingState
|
||||
}
|
||||
|
||||
-- | B3 propagation sampling state.
|
||||
--
|
||||
-- Debug sampling state not represented.
|
||||
data SamplingState = SamplingDefer | SamplingDeny | SamplingAccept
|
||||
|
||||
-- | Convert a sampling state to a value for the X-B3-Sampled header. A return
|
||||
-- value of Nothing indicates that the header should not be set.
|
||||
samplingStateToHeader :: IsString s => SamplingState -> Maybe s
|
||||
samplingStateToHeader = \case
|
||||
SamplingDefer -> Nothing
|
||||
SamplingDeny -> Just "0"
|
||||
SamplingAccept -> Just "1"
|
||||
|
||||
-- | Convert a X-B3-Sampled header value to a sampling state. An input of
|
||||
-- Nothing indicates that the header was not set.
|
||||
samplingStateFromHeader :: (IsString s, Eq s) => Maybe s -> SamplingState
|
||||
samplingStateFromHeader = \case
|
||||
Nothing -> SamplingDefer
|
||||
Just "0" -> SamplingDeny
|
||||
Just "1" -> SamplingAccept
|
||||
Just _ -> SamplingDefer
|
||||
|
||||
data TraceTEnv = TraceTEnv
|
||||
{ tteTraceContext :: TraceContext,
|
||||
tteReporter :: Reporter,
|
||||
tteSamplingDecision :: SamplingDecision
|
||||
}
|
||||
|
||||
-- | A local decision about whether or not to sample spans.
|
||||
data SamplingDecision = SampleNever | SampleAlways
|
||||
|
||||
-- | An IO action for deciding whether or not to sample a trace.
|
||||
--
|
||||
-- Currently restricted to deny access to the B3 sampling state, but we may
|
||||
-- want to be more flexible in the future.
|
||||
type SamplingPolicy = IO SamplingDecision
|
||||
|
||||
-- Helper for consistently deciding whether or not to sample a trace based on
|
||||
-- trace context and sampling policy.
|
||||
decideSampling :: SamplingState -> SamplingPolicy -> IO SamplingDecision
|
||||
decideSampling samplingState samplingPolicy =
|
||||
case samplingState of
|
||||
SamplingDefer -> samplingPolicy
|
||||
SamplingDeny -> pure SampleNever
|
||||
SamplingAccept -> pure SampleAlways
|
||||
|
||||
-- Helper for consistently updating the sampling state when a sampling decision
|
||||
-- is made.
|
||||
updateSamplingState :: SamplingDecision -> SamplingState -> SamplingState
|
||||
updateSamplingState samplingDecision = \case
|
||||
SamplingDefer ->
|
||||
case samplingDecision of
|
||||
SampleNever -> SamplingDefer
|
||||
SampleAlways -> SamplingAccept
|
||||
SamplingDeny -> SamplingDeny
|
||||
SamplingAccept -> SamplingAccept
|
||||
|
||||
sampleNever :: SamplingPolicy
|
||||
sampleNever = pure SampleNever
|
||||
|
||||
sampleAlways :: SamplingPolicy
|
||||
sampleAlways = pure SampleAlways
|
||||
|
||||
-- @sampleRandomly p@ returns `SampleAlways` with probability @p@ and
|
||||
-- `SampleNever` with probability @1 - p@.
|
||||
sampleRandomly :: Double -> SamplingPolicy
|
||||
sampleRandomly samplingProbability
|
||||
| samplingProbability <= 0 = pure SampleNever
|
||||
| samplingProbability >= 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, 1) Random.globalStdGen
|
||||
pure $ if x < samplingProbability then SampleAlways else SampleNever
|
||||
|
||||
-- Like @sampleRandomly@, but with the probability expressed as the denominator
|
||||
-- N of the fraction 1/N.
|
||||
sampleOneInN :: Refined Positive Int -> SamplingPolicy
|
||||
sampleOneInN denominator
|
||||
| n == 1 = pure SampleAlways
|
||||
| otherwise = do
|
||||
x <- Random.uniformRM (0, n - 1) Random.globalStdGen
|
||||
pure $ if x == 0 then SampleAlways else SampleNever
|
||||
where
|
||||
n = unrefine denominator
|
||||
|
||||
-- | The 'TraceT' monad transformer adds the ability to keep track of
|
||||
-- the current trace context.
|
||||
newtype TraceT m a = TraceT {unTraceT :: ReaderT (TraceContext, Reporter) (WriterT TracingMetadata m) a}
|
||||
newtype TraceT m a = TraceT {unTraceT :: ReaderT TraceTEnv (WriterT TracingMetadata m) a}
|
||||
deriving (Functor, Applicative, Monad, MonadIO, MonadFix, MonadMask, MonadCatch, MonadThrow, MonadBase b, MonadBaseControl b)
|
||||
|
||||
instance MonadTrans TraceT where
|
||||
@ -113,34 +206,52 @@ instance (HasHttpManagerM m) => HasHttpManagerM (TraceT m) where
|
||||
-- | Run an action in the 'TraceT' monad transformer.
|
||||
-- 'runTraceT' delimits a new trace with its root span, and the arguments
|
||||
-- specify a name and metadata for that span.
|
||||
runTraceT :: (HasReporter m, MonadIO m) => Text -> TraceT m a -> m a
|
||||
runTraceT name tma = do
|
||||
runTraceT :: (HasReporter m, MonadIO m) => SamplingPolicy -> Text -> TraceT m a -> m a
|
||||
runTraceT policy name tma = do
|
||||
rep <- askReporter
|
||||
runTraceTWithReporter rep name tma
|
||||
runTraceTWithReporter rep policy name tma
|
||||
|
||||
runTraceTWith :: MonadIO m => TraceContext -> Reporter -> Text -> TraceT m a -> m a
|
||||
runTraceTWith ctx rep name tma =
|
||||
runReporter rep ctx name $
|
||||
runWriterT $
|
||||
runReaderT (unTraceT tma) (ctx, rep)
|
||||
runTraceTWith ::
|
||||
MonadIO m => TraceContext -> Reporter -> SamplingPolicy -> Text -> TraceT m a -> m a
|
||||
runTraceTWith ctx rep policy name tma = do
|
||||
samplingDecision <- liftIO $ decideSampling (tcSamplingState ctx) policy
|
||||
let subCtx =
|
||||
ctx
|
||||
{ tcSamplingState =
|
||||
updateSamplingState samplingDecision (tcSamplingState ctx)
|
||||
}
|
||||
report =
|
||||
case samplingDecision of
|
||||
SampleNever -> fmap fst
|
||||
SampleAlways -> runReporter rep ctx name
|
||||
report . runWriterT $
|
||||
runReaderT (unTraceT tma) (TraceTEnv subCtx rep samplingDecision)
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer in an
|
||||
-- existing context.
|
||||
runTraceTInContext :: (MonadIO m, HasReporter m) => TraceContext -> Text -> TraceT m a -> m a
|
||||
runTraceTInContext ctx name tma = do
|
||||
runTraceTInContext ::
|
||||
(MonadIO m, HasReporter m) => TraceContext -> SamplingPolicy -> Text -> TraceT m a -> m a
|
||||
runTraceTInContext ctx policy name tma = do
|
||||
rep <- askReporter
|
||||
runTraceTWith ctx rep name tma
|
||||
runTraceTWith ctx rep policy name tma
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer in an
|
||||
-- existing context.
|
||||
runTraceTWithReporter :: MonadIO m => Reporter -> Text -> TraceT m a -> m a
|
||||
runTraceTWithReporter rep name tma = do
|
||||
runTraceTWithReporter ::
|
||||
MonadIO m => Reporter -> SamplingPolicy -> Text -> TraceT m a -> m a
|
||||
runTraceTWithReporter rep policy name tma = do
|
||||
ctx <-
|
||||
TraceContext
|
||||
<$> liftIO randomTraceId
|
||||
<*> liftIO randomSpanId
|
||||
<*> pure Nothing
|
||||
runTraceTWith ctx rep name tma
|
||||
<*> pure SamplingDefer
|
||||
runTraceTWith ctx rep policy name tma
|
||||
|
||||
-- | Run an action in the 'TraceT' monad transformer while suppressing all
|
||||
-- tracing-related side-effects.
|
||||
ignoreTraceT :: MonadIO m => TraceT m a -> m a
|
||||
ignoreTraceT = runTraceTWithReporter noReporter sampleNever ""
|
||||
|
||||
-- | Monads which support tracing. 'TraceT' is the standard example.
|
||||
class Monad m => MonadTrace m where
|
||||
@ -154,6 +265,9 @@ class Monad m => MonadTrace m where
|
||||
-- | Ask for the current tracing reporter
|
||||
currentReporter :: m Reporter
|
||||
|
||||
-- | Ask for the current sampling decision
|
||||
currentSamplingDecision :: m SamplingDecision
|
||||
|
||||
-- | Log some metadata to be attached to the current span
|
||||
attachMetadata :: TracingMetadata -> m ()
|
||||
|
||||
@ -188,7 +302,8 @@ interpTraceT ::
|
||||
interpTraceT f (TraceT rwma) = do
|
||||
ctx <- currentContext
|
||||
rep <- currentReporter
|
||||
(b, meta) <- f (runWriterT (runReaderT rwma (ctx, rep)))
|
||||
samplingDecision <- currentSamplingDecision
|
||||
(b, meta) <- f (runWriterT (runReaderT rwma (TraceTEnv ctx rep samplingDecision)))
|
||||
attachMetadata meta
|
||||
pure b
|
||||
|
||||
@ -197,18 +312,24 @@ interpTraceT f (TraceT rwma) = do
|
||||
instance MonadIO m => MonadTrace (TraceT m) where
|
||||
-- Note: this implementation is so awkward because we don't want to give the
|
||||
-- derived MonadReader/Writer instances to TraceT
|
||||
trace name ma = TraceT . ReaderT $ \(ctx, rep) -> do
|
||||
spanId <- liftIO randomSpanId
|
||||
let subCtx =
|
||||
ctx
|
||||
{ tcCurrentSpan = spanId,
|
||||
tcCurrentParent = Just (tcCurrentSpan ctx)
|
||||
}
|
||||
lift . runReporter rep subCtx name . runWriterT $ runReaderT (unTraceT ma) (subCtx, rep)
|
||||
trace name ma = TraceT . ReaderT $ \env@(TraceTEnv ctx rep samplingDecision) -> do
|
||||
case samplingDecision of
|
||||
SampleNever -> runReaderT (unTraceT ma) env
|
||||
SampleAlways -> do
|
||||
spanId <- liftIO randomSpanId
|
||||
let subCtx =
|
||||
ctx
|
||||
{ tcCurrentSpan = spanId,
|
||||
tcCurrentParent = Just (tcCurrentSpan ctx)
|
||||
}
|
||||
lift . runReporter rep subCtx name . runWriterT $
|
||||
runReaderT (unTraceT ma) (TraceTEnv subCtx rep samplingDecision)
|
||||
|
||||
currentContext = TraceT (asks fst)
|
||||
currentContext = TraceT (asks tteTraceContext)
|
||||
|
||||
currentReporter = TraceT (asks snd)
|
||||
currentReporter = TraceT (asks tteReporter)
|
||||
|
||||
currentSamplingDecision = TraceT (asks tteSamplingDecision)
|
||||
|
||||
attachMetadata = TraceT . tell
|
||||
|
||||
@ -216,28 +337,33 @@ instance MonadTrace m => MonadTrace (ReaderT r m) where
|
||||
trace = mapReaderT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (StateT e m) where
|
||||
trace = mapStateT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
instance MonadTrace m => MonadTrace (ExceptT e m) where
|
||||
trace = mapExceptT . trace
|
||||
currentContext = lift currentContext
|
||||
currentReporter = lift currentReporter
|
||||
currentSamplingDecision = lift currentSamplingDecision
|
||||
attachMetadata = lift . attachMetadata
|
||||
|
||||
-- | Inject the trace context as a set of HTTP headers.
|
||||
injectB3HttpContext :: TraceContext -> [HTTP.Header]
|
||||
injectB3HttpContext TraceContext {..} =
|
||||
("X-B3-TraceId", traceIdToHex tcCurrentTrace)
|
||||
: ("X-B3-SpanId", spanIdToHex tcCurrentSpan)
|
||||
: [ ("X-B3-ParentSpanId", spanIdToHex parentID)
|
||||
| parentID <- maybeToList tcCurrentParent
|
||||
]
|
||||
let traceId = (b3HeaderTraceId, traceIdToHex tcCurrentTrace)
|
||||
spanId = (b3HeaderSpanId, spanIdToHex tcCurrentSpan)
|
||||
parentSpanIdMaybe =
|
||||
(,) b3HeaderParentSpanId . spanIdToHex <$> tcCurrentParent
|
||||
samplingStateMaybe =
|
||||
(,) b3HeaderSampled <$> samplingStateToHeader tcSamplingState
|
||||
in traceId : spanId : catMaybes [parentSpanIdMaybe, samplingStateMaybe]
|
||||
|
||||
-- | Extract the trace and parent span headers from a HTTP request
|
||||
-- and create a new 'TraceContext'. The new context will contain
|
||||
@ -245,12 +371,11 @@ injectB3HttpContext TraceContext {..} =
|
||||
-- the immediate parent span.
|
||||
extractB3HttpContext :: [HTTP.Header] -> IO (Maybe TraceContext)
|
||||
extractB3HttpContext hdrs = do
|
||||
freshSpanId <- liftIO randomSpanId
|
||||
-- B3 TraceIds can have a length of either 64 bits (16 hex chars) or 128 bits
|
||||
-- (32 hex chars). For 64-bit TraceIds, we pad them with zeros on the left to
|
||||
-- make them 128 bits long.
|
||||
let traceId =
|
||||
lookup "X-B3-TraceId" hdrs >>= \rawTraceId ->
|
||||
let traceIdMaybe =
|
||||
lookup b3HeaderTraceId hdrs >>= \rawTraceId ->
|
||||
if
|
||||
| Char8.length rawTraceId == 32 ->
|
||||
traceIdFromHex rawTraceId
|
||||
@ -258,30 +383,50 @@ extractB3HttpContext hdrs = do
|
||||
traceIdFromHex $ Char8.replicate 16 '0' <> rawTraceId
|
||||
| otherwise ->
|
||||
Nothing
|
||||
pure $
|
||||
TraceContext
|
||||
<$> traceId
|
||||
<*> pure freshSpanId
|
||||
<*> pure (spanIdFromHex =<< lookup "X-B3-SpanId" hdrs)
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- liftIO randomSpanId
|
||||
let parentSpanId = spanIdFromHex =<< lookup b3HeaderSpanId hdrs
|
||||
samplingState = samplingStateFromHeader $ lookup b3HeaderSampled hdrs
|
||||
pure $ TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
b3HeaderTraceId, b3HeaderSpanId, b3HeaderParentSpanId, b3HeaderSampled :: IsString s => s
|
||||
b3HeaderTraceId = "X-B3-TraceId"
|
||||
b3HeaderSpanId = "X-B3-SpanId"
|
||||
b3HeaderParentSpanId = "X-B3-ParentSpanId"
|
||||
b3HeaderSampled = "X-B3-Sampled"
|
||||
|
||||
-- | Inject the trace context as a JSON value, appropriate for
|
||||
-- storing in (e.g.) an event trigger payload.
|
||||
injectEventContext :: TraceContext -> J.Value
|
||||
injectEventContext TraceContext {..} =
|
||||
J.object
|
||||
[ "trace_id" J..= bsToTxt (traceIdToHex tcCurrentTrace),
|
||||
"span_id" J..= bsToTxt (spanIdToHex tcCurrentSpan)
|
||||
]
|
||||
let idFields =
|
||||
[ eventKeyTraceId J..= bsToTxt (traceIdToHex tcCurrentTrace),
|
||||
eventKeySpanId J..= bsToTxt (spanIdToHex tcCurrentSpan)
|
||||
]
|
||||
samplingFieldMaybe =
|
||||
(J..=) eventKeySamplingState <$> samplingStateToHeader @Text tcSamplingState
|
||||
in J.object $ idFields ++ maybeToList samplingFieldMaybe
|
||||
|
||||
-- | Extract a trace context from an event trigger payload.
|
||||
extractEventContext :: J.Value -> IO (Maybe TraceContext)
|
||||
extractEventContext e = do
|
||||
freshSpanId <- randomSpanId
|
||||
pure $
|
||||
TraceContext
|
||||
<$> (traceIdFromHex . txtToBs =<< e ^? JL.key "trace_context" . JL.key "trace_id" . JL._String)
|
||||
<*> pure freshSpanId
|
||||
<*> pure (spanIdFromHex . txtToBs =<< e ^? JL.key "trace_context" . JL.key "span_id" . JL._String)
|
||||
let traceIdMaybe =
|
||||
traceIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key eventKeyTraceId . JL._String
|
||||
for traceIdMaybe $ \traceId -> do
|
||||
freshSpanId <- randomSpanId
|
||||
let parentSpanId =
|
||||
spanIdFromHex . txtToBs
|
||||
=<< e ^? JL.key "trace_context" . JL.key eventKeySpanId . JL._String
|
||||
samplingState =
|
||||
samplingStateFromHeader $
|
||||
e ^? JL.key "trace_context" . JL.key eventKeySamplingState . JL._String
|
||||
pure $ TraceContext traceId freshSpanId parentSpanId samplingState
|
||||
|
||||
eventKeyTraceId, eventKeySpanId, eventKeySamplingState :: J.Key
|
||||
eventKeyTraceId = "trace_id"
|
||||
eventKeySpanId = "span_id"
|
||||
eventKeySamplingState = "sampling_state"
|
||||
|
||||
-- | Perform HTTP request which supports Trace headers using a
|
||||
-- HTTP.Request value
|
||||
|
Loading…
Reference in New Issue
Block a user