Add request timings and count histograms to telemetry. Closes #3552

We upload a set of accumulating timers and counters to track service
time for different types of operations, across several dimensions (e.g.
did we hit the plan cache, was a remote involved, etc.).
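
For example, a cache-hit query served locally over HTTP might be recorded
roughly like this (a sketch against the new Hasura.Server.Telemetry.Counters
API below; the timing literals are made up):

    recordTimingMetric
      (RequestDimensions Hit Query Local HTTP)
      RequestTimings { telemTimeIO = 0.002, telemTimeTot = 0.005 }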

Also...

Standardize on DiffTime as the standard duration type, and try to use it
consistently.

See discussion here:
https://github.com/hasura/graphql-engine/pull/3584#pullrequestreview-340679369

It should be possible to overwrite that module so the new threadDelay
sticks, per the pattern in #3705; blocked on #3558.

Rename Control.Concurrent.Extended.threadDelay to `sleep`, since a
naive use with a literal argument would be very bad!
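
For instance (a sketch; all three calls below wait five seconds):

    threadDelay (5 * 1000 * 1000)  -- old API: a bare Int of microseconds
    sleep (seconds 5)              -- new API: an explicit DiffTime
    sleep 5                        -- also five SECONDS, so `threadDelay n`
                                   -- must not be ported blindly to `sleep n`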

We fix a bug in 'computeDiffTime' (it compared and adjusted microsecond
values as if they were seconds).

Add convenient 'Read' instances to the time unit utility types. Make
'Seconds' a newtype to support this.
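
For example, a duration can now be parsed straight out of an environment
variable (a sketch; the variable name and helper are hypothetical):

    import Data.Maybe (fromMaybe)
    import Data.Time.Clock.Units (Seconds)
    import System.Environment (lookupEnv)
    import Text.Read (readMaybe)

    -- "1.5" parses as 1.5 seconds; fall back to 30s when unset or unparsable.
    getRefreshInterval :: IO Seconds
    getRefreshInterval =
      fromMaybe 30 . (>>= readMaybe) <$> lookupEnv "SOME_HYPOTHETICAL_INTERVAL"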
Authored by Brandon Simmons (2020-01-15 20:56:57 -05:00), committed by Alexis King
parent 21efd3aa1e
commit 58ef316118
24 changed files with 614 additions and 187 deletions

View File

@ -196,6 +196,7 @@ library
, Control.Arrow.Trans
, Control.Monad.Stateless
, Control.Monad.Unique
, Data.Time.Clock.Units
-- exposed for tests
, Data.Parser.CacheControl
@ -222,6 +223,8 @@ library
, Hasura.Server.Migrate
, Hasura.Server.Compression
, Hasura.Server.PGDump
-- Exposed for testing:
, Hasura.Server.Telemetry.Counters
, Hasura.RQL.Types
, Hasura.RQL.Types.Run
@ -358,7 +361,6 @@ library
, Data.Sequence.NonEmpty
, Data.TByteString
, Data.Text.Extended
, Data.Time.Clock.Units
, Hasura.SQL.DML
, Hasura.SQL.Error
@ -407,9 +409,11 @@ test-suite graphql-engine-tests
main-is: Main.hs
other-modules:
Data.Parser.CacheControlSpec
Data.TimeSpec
Hasura.IncrementalSpec
Hasura.RQL.MetadataSpec
Hasura.Server.MigrateSpec
Hasura.Server.TelemetrySpec
-- Benchmarks related to caching (e.g. the plan cache).
--

View File

@ -1,5 +1,7 @@
module Control.Concurrent.Extended
( module Control.Concurrent
, sleep
-- * Deprecated
, threadDelay
) where
@ -8,9 +10,15 @@ import Prelude
import qualified Control.Concurrent as Base
import Control.Concurrent hiding (threadDelay)
import Data.Time.Clock (DiffTime)
import Data.Time.Clock.Units (Microseconds (..))
import Data.Time.Clock.Units (Microseconds (..), DiffTime)
-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int'.
threadDelay :: DiffTime -> IO ()
threadDelay = Base.threadDelay . round . Microseconds
-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int' number of microseconds.
--
-- NOTE: you cannot simply replace e.g. @threadDelay 1000@ with @sleep 1000@ since those literals
-- have different meanings!
sleep :: DiffTime -> IO ()
sleep = Base.threadDelay . round . Microseconds
{-# DEPRECATED threadDelay "Please use `sleep` instead (and read the docs!)" #-}
threadDelay :: Int -> IO ()
threadDelay = Base.threadDelay

View File

@ -25,55 +25,91 @@ You can also go the other way using the constructors rather than the selectors:
0.5
@
Generally, it doesn't make sense to pass these wrappers around or put them inside data structures,
since any function that needs a duration should just accept a 'DiffTime', but they're useful for
literals and conversions to/from other types. -}
NOTE: the 'Real' and 'Fractional' instances essentially just add or strip the unit label (as
above), so you can't use 'realToFrac' to convert between the unit types here. Instead try
'fromUnits', which is less of a foot-gun.
The 'Read' instances for these types mirror the behavior of the 'RealFrac' instance wrt numeric
literals for convenient serialization (e.g. when working with env vars):
@
>>> read "1.2" :: Milliseconds
Milliseconds {milliseconds = 0.0012s}
@
Generally, if you need to pass around a duration between functions you should use 'DiffTime'
directly. However if storing a duration in a type that will be serialized, e.g. one having
a 'ToJSON' instance, it is better to use one of these explicit wrapper types so that it's
obvious what units will be used. -}
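-- A sketch of the foot-gun versus 'fromUnits' (hypothetical GHCi session): the
-- instances keep the numeral and swap the unit label, so 'realToFrac' silently
-- rescales the actual duration:
--
-- >>> realToFrac (1 :: Milliseconds) :: Seconds   -- WRONG: 1ms came out as 1s
-- Seconds {seconds = 1s}
-- >>> fromUnits (1 :: Milliseconds) :: Seconds    -- correct conversion
-- Seconds {seconds = 0.001s}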
module Data.Time.Clock.Units
( Days(..)
, Hours(..)
, Minutes(..)
, Seconds
, seconds
, Seconds(..)
, Milliseconds(..)
, Microseconds(..)
, Nanoseconds(..)
-- * Converting between units
, Duration(..)
, fromUnits
-- * Reexports
-- | We use 'DiffTime' as the standard type for unit-agnostic durations in our
-- code. You'll need to convert to a 'NominalDiffTime' (with 'fromUnits') in
-- order to use these durations with 'UTCTime'.
--
-- NOTE: some care must be taken especially when 'NominalDiffTime' interacts
-- with 'UTCTime':
--
-- - a 'DiffTime' or 'NominalDiffTime' may be negative
-- - 'addUTCTime' and 'diffUTCTime' do not attempt to handle leap seconds
, DiffTime
) where
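-- A sketch of combining these durations with 'UTCTime', via the 'Duration'
-- instance for 'NominalDiffTime' ('addUTCTime' and 'getCurrentTime' are from
-- Data.Time.Clock):
--
-- > deadline <- addUTCTime (fromUnits (30 :: Seconds)) <$> getCurrentTime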
import Prelude
import Control.Arrow (first)
import Data.Aeson
import Data.Hashable
import Data.Proxy
import Data.Time.Clock
import GHC.TypeLits
import Numeric (readFloat)
type Seconds = DiffTime
seconds :: DiffTime -> DiffTime
seconds = id
newtype Seconds = Seconds { seconds :: DiffTime }
-- NOTE: we want Show to give a pastable data structure string, even
-- though Read is custom.
deriving (Duration, Show, Eq, Ord, ToJSON, FromJSON)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit (SecondsP 1))
-- TODO if needed: deriving (ToJSON, FromJSON) via (TimeUnit ..) making sure
-- to copy Aeson instances (with withBoundedScientific), and e.g.
-- toJSON (5 :: Minutes) == Number 5
newtype Days = Days { days :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 86400))
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit (SecondsP 86400))
newtype Hours = Hours { hours :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 3600))
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit (SecondsP 3600))
newtype Minutes = Minutes { minutes :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 60))
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit (SecondsP 60))
newtype Milliseconds = Milliseconds { milliseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000000000)
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit 1000000000)
newtype Microseconds = Microseconds { microseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000000)
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit 1000000)
newtype Nanoseconds = Nanoseconds { nanoseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000)
deriving (Duration, Show, Eq, Ord)
deriving (Read, Num, Fractional, Real, Hashable, RealFrac) via (TimeUnit 1000)
-- Internal for deriving via
newtype TimeUnit (picosPerUnit :: Nat) = TimeUnit DiffTime
deriving (Show, Eq, Ord)
@ -92,6 +128,9 @@ instance (KnownNat picosPerUnit) => Num (TimeUnit picosPerUnit) where
signum (TimeUnit a) = TimeUnit $ signum a
fromInteger a = TimeUnit . picosecondsToDiffTime $ a * natNum @picosPerUnit
instance (KnownNat picosPerUnit) => Read (TimeUnit picosPerUnit) where
readsPrec _ = map (first fromRational) . readFloat
instance (KnownNat picosPerUnit) => Fractional (TimeUnit picosPerUnit) where
TimeUnit a / TimeUnit b = TimeUnit . picosecondsToDiffTime $
diffTimeToPicoseconds a * natNum @picosPerUnit `div` diffTimeToPicoseconds b
@ -107,3 +146,26 @@ instance (KnownNat picosPerUnit) => RealFrac (TimeUnit picosPerUnit) where
round = round . toRational
ceiling = ceiling . toRational
floor = floor . toRational
-- we can ignore the unit:
instance Hashable (TimeUnit a) where
hashWithSalt salt (TimeUnit dt) = hashWithSalt salt $
(realToFrac :: DiffTime -> Double) dt
-- | Duration types isomorphic to 'DiffTime', powering 'fromUnits'.
class Duration d where
fromDiffTime :: DiffTime -> d
toDiffTime :: d -> DiffTime
instance Duration DiffTime where
fromDiffTime = id
toDiffTime = id
instance Duration NominalDiffTime where
fromDiffTime = realToFrac
toDiffTime = realToFrac
-- | Safe conversion between duration units.
fromUnits :: (Duration x, Duration y)=> x -> y
fromUnits = fromDiffTime . toDiffTime

View File

@ -241,12 +241,13 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
$ Warp.defaultSettings
maxEvThrds <- liftIO $ getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
evFetchMilliSec <- liftIO $ getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
fetchI <- fmap milliseconds $ liftIO $
getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
logEnvHeaders <- liftIO $ getFromEnv False "LOG_HEADERS_FROM_ENV"
-- prepare event triggers data
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
void $ liftIO $ C.forkIO $ processEventQueue logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx
@ -254,6 +255,7 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
-- start a background thread to check for updates
void $ liftIO $ C.forkIO $ checkForUpdates loggerCtx _icHttpManager
-- TODO async/immortal:
-- start a background thread for telemetry
when soEnableTelemetry $ do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice

View File

@ -7,7 +7,7 @@ module Hasura.Events.Lib
, Event(..)
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.Async (async, waitAny)
import Control.Concurrent.STM.TVar
import Control.Exception (try)
@ -149,19 +149,19 @@ data EventEngineCtx
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxFetchIntervalMilliSec :: Int
, _eeCtxFetchInterval :: DiffTime
}
defaultMaxEventThreads :: Int
defaultMaxEventThreads = 100
defaultFetchIntervalMilliSec :: Int
defaultFetchIntervalMilliSec :: Milliseconds
defaultFetchIntervalMilliSec = 1000
retryAfterHeader :: CI.CI T.Text
retryAfterHeader = "Retry-After"
initEventEngineCtx :: Int -> Int -> STM EventEngineCtx
initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
initEventEngineCtx maxT fetchI = do
q <- TQ.newTQueue
c <- newTVar 0
@ -185,7 +185,7 @@ pushEvents logger pool eectx = forever $ do
case eventsOrError of
Left err -> L.unLogger logger $ EventInternalErr err
Right events -> atomically $ mapM_ (TQ.writeTQueue q) events
threadDelay (fetchI * 1000)
sleep fetchI
consumeEvents
:: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager -> Q.PGPool -> IO SchemaCache
@ -285,7 +285,7 @@ retryOrSetError :: Event -> RetryConf -> HTTPErr -> Q.TxE QErr ()
retryOrSetError e retryConf err = do
let mretryHeader = getRetryAfterHeaderFromError err
tries = eTries e
mretryHeaderSeconds = parseRetryHeader mretryHeader
mretryHeaderSeconds = mretryHeader >>= parseRetryHeader
triesExhausted = tries >= rcNumRetries retryConf
noRetryHeader = isNothing mretryHeaderSeconds
-- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
@ -308,12 +308,8 @@ retryOrSetError e retryConf err = do
in case mHeader of
Just (HeaderConf _ (HVValue value)) -> Just value
_ -> Nothing
parseRetryHeader Nothing = Nothing
parseRetryHeader (Just hValue)
= let seconds = readMaybe $ T.unpack hValue
in case seconds of
Nothing -> Nothing
Just sec -> if sec > 0 then Just sec else Nothing
parseRetryHeader = mfilter (> 0) . readMaybe . T.unpack
encodeHeader :: EventHeaderInfo -> HTTP.Header
encodeHeader (EventHeaderInfo hconf cache) =

View File

@ -57,6 +57,7 @@ import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Validate as VQ
import qualified Hasura.GraphQL.Validate.Types as VT
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
-- The current execution plan of a graphql operation, it is
-- currently, either local pg execution or a remote execution
@ -184,7 +185,7 @@ getResolvedExecPlan
-> SchemaCache
-> SchemaCacheVer
-> GQLReqUnparsed
-> m ExecPlanResolved
-> m (Telem.CacheHit, ExecPlanResolved)
getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx
enableAL sc scVer reqUnparsed = do
planM <- liftIO $ EP.getPlan scVer (userRole userInfo)
@ -192,13 +193,13 @@ getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx
let usrVars = userVars userInfo
case planM of
-- plans are only for queries and subscriptions
Just plan -> GExPHasura <$> case plan of
Just plan -> (Telem.Hit,) . GExPHasura <$> case plan of
EP.RPQuery queryPlan -> do
(tx, genSql) <- EQ.queryOpFromPlan usrVars queryVars queryPlan
return $ ExOpQuery tx (Just genSql)
EP.RPSubs subsPlan ->
ExOpSubs <$> EL.reuseLiveQueryPlan pgExecCtx usrVars queryVars subsPlan
Nothing -> noExistingPlan
Nothing -> (Telem.Miss,) <$> noExistingPlan
where
GQLReq opNameM queryStr queryVars = reqUnparsed
addPlanToCache plan =
@ -357,7 +358,8 @@ execRemoteGQ
-> GQLReqUnparsed
-> RemoteSchemaInfo
-> G.TypedOperationDefinition
-> m (HttpResponse EncJSON)
-> m (DiffTime, HttpResponse EncJSON)
-- ^ Also returns time spent in http request, for telemetry.
execRemoteGQ reqId userInfo reqHdrs q rsi opDef = do
execCtx <- ask
let logger = _ecxLogger execCtx
@ -387,11 +389,12 @@ execRemoteGQ reqId userInfo reqHdrs q rsi opDef = do
}
L.unLogger logger $ QueryLog q Nothing reqId
res <- liftIO $ try $ HTTP.httpLbs req manager
(time, res) <- withElapsedTime $ liftIO $ try $ HTTP.httpLbs req manager
resp <- either httpThrow return res
let cookieHdrs = getCookieHdr (resp ^.. Wreq.responseHeader "Set-Cookie")
respHdrs = Just $ mkRespHeaders cookieHdrs
return $ HttpResponse (encJFromLBS $ resp ^. Wreq.responseBody) respHdrs
!httpResp = HttpResponse (encJFromLBS $ resp ^. Wreq.responseBody) respHdrs
return (time, httpResp)
where
RemoteSchemaInfo url hdrConf fwdClientHdrs timeout = rsi

View File

@ -9,9 +9,6 @@ import Hasura.Prelude
import qualified Data.Aeson as J
import Data.Time.Clock (DiffTime)
import Data.Time.Clock.Units (seconds)
data LiveQueriesOptions
= LiveQueriesOptions
{ _lqoBatchSize :: !BatchSize
@ -21,7 +18,7 @@ data LiveQueriesOptions
mkLiveQueriesOptions :: Maybe BatchSize -> Maybe RefetchInterval -> LiveQueriesOptions
mkLiveQueriesOptions batchSize refetchInterval = LiveQueriesOptions
{ _lqoBatchSize = fromMaybe (BatchSize 100) batchSize
, _lqoRefetchInterval = fromMaybe (RefetchInterval $ seconds 1) refetchInterval
, _lqoRefetchInterval = fromMaybe (RefetchInterval 1) refetchInterval
}
instance J.ToJSON LiveQueriesOptions where
@ -33,5 +30,7 @@ instance J.ToJSON LiveQueriesOptions where
newtype BatchSize = BatchSize { unBatchSize :: Int }
deriving (Show, Eq, J.ToJSON)
-- TODO this is treated as milliseconds in fromEnv and as seconds in ToJSON.
-- ideally this would have e.g. ... unRefetchInterval :: Milliseconds
newtype RefetchInterval = RefetchInterval { unRefetchInterval :: DiffTime }
deriving (Show, Eq, J.ToJSON)

View File

@ -71,13 +71,14 @@ data Subscriber
-- | live query onChange metadata, used for adding more extra analytics data
data LiveQueryMetadata
= LiveQueryMetadata
{ _lqmExecutionTime :: !Clock.NominalDiffTime
{ _lqmExecutionTime :: !Clock.DiffTime
-- ^ Time spent waiting on the generated query to execute on postgres or the remote.
}
data LiveQueryResponse
= LiveQueryResponse
{ _lqrPayload :: !BL.ByteString
, _lqrExecutionTime :: !Clock.NominalDiffTime
, _lqrExecutionTime :: !Clock.DiffTime
}
type LGQResponse = GQResult LiveQueryResponse
@ -324,7 +325,7 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do
queryFinish <- Clock.getCurrentTime
let dt = Clock.diffUTCTime queryFinish queryInit
queryTime = realToFrac dt
lqMeta = LiveQueryMetadata dt
lqMeta = LiveQueryMetadata $ fromUnits dt
operations = getCohortOperations cohortSnapshotMap lqMeta mxRes
Metrics.add (_rmQuery metrics) queryTime

View File

@ -18,7 +18,7 @@ import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson.Extended as J
import qualified StmContainers.Map as STMMap
import Control.Concurrent.Extended (threadDelay)
import Control.Concurrent.Extended (sleep)
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
@ -85,7 +85,7 @@ addLiveQuery lqState plan onResultAction = do
metrics <- initRefetchMetrics
threadRef <- A.async $ forever $ do
pollQuery metrics batchSize pgExecCtx query handler
threadDelay $ unRefetchInterval refetchInterval
sleep $ unRefetchInterval refetchInterval
STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics)
pure $ LiveQueryId handlerId cohortKey sinkId

View File

@ -1,3 +1,4 @@
{-# LANGUAGE RecordWildCards #-}
module Hasura.GraphQL.Transport.HTTP
( runGQ
, runGQBatched
@ -17,6 +18,8 @@ import Hasura.Server.Version (HasVersion)
import qualified Database.PG.Query as Q
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Language.GraphQL.Draft.Syntax as G
runGQ
:: ( HasVersion
@ -30,14 +33,25 @@ runGQ
-> GQLReq GQLQueryText
-> m (HttpResponse EncJSON)
runGQ reqId userInfo reqHdrs req = do
E.ExecutionCtx _ sqlGenCtx pgExecCtx planCache sc scVer _ enableAL <- ask
execPlan <- E.getResolvedExecPlan pgExecCtx planCache
userInfo sqlGenCtx enableAL sc scVer req
case execPlan of
E.GExPHasura resolvedOp ->
flip HttpResponse Nothing <$> runHasuraGQ reqId req userInfo resolvedOp
E.GExPRemote rsi opDef ->
E.execRemoteGQ reqId userInfo reqHdrs req rsi opDef
-- The response and misc telemetry data:
let telemTransport = Telem.HTTP
(telemTimeTot_DT, (telemCacheHit, telemLocality, (telemTimeIO_DT, telemQueryType, !resp))) <- withElapsedTime $ do
E.ExecutionCtx _ sqlGenCtx pgExecCtx planCache sc scVer _ enableAL <- ask
(telemCacheHit, execPlan) <- E.getResolvedExecPlan pgExecCtx planCache
userInfo sqlGenCtx enableAL sc scVer req
case execPlan of
E.GExPHasura resolvedOp -> do
(telemTimeIO, telemQueryType, resp) <- runHasuraGQ reqId req userInfo resolvedOp
return (telemCacheHit, Telem.Local, (telemTimeIO, telemQueryType, HttpResponse resp Nothing))
E.GExPRemote rsi opDef -> do
let telemQueryType | G._todType opDef == G.OperationTypeMutation = Telem.Mutation
| otherwise = Telem.Query
(telemTimeIO, resp) <- E.execRemoteGQ reqId userInfo reqHdrs req rsi opDef
return (telemCacheHit, Telem.Remote, (telemTimeIO, telemQueryType, resp))
let telemTimeIO = fromUnits telemTimeIO_DT
telemTimeTot = fromUnits telemTimeTot_DT
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
return resp
runGQBatched
:: ( HasVersion
@ -75,10 +89,12 @@ runHasuraGQ
-> GQLReqUnparsed
-> UserInfo
-> E.ExecOp
-> m EncJSON
-> m (DiffTime, Telem.QueryType, EncJSON)
-- ^ Also return 'Mutation' when the operation was a mutation, and the time
-- spent in the PG query; for telemetry.
runHasuraGQ reqId query userInfo resolvedOp = do
E.ExecutionCtx logger _ pgExecCtx _ _ _ _ _ <- ask
respE <- liftIO $ runExceptT $ case resolvedOp of
(telemTimeIO, respE) <- withElapsedTime $ liftIO $ runExceptT $ case resolvedOp of
E.ExOpQuery tx genSql -> do
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
@ -91,4 +107,6 @@ runHasuraGQ reqId query userInfo resolvedOp = do
throw400 UnexpectedPayload
"subscriptions are not supported over HTTP, use websockets instead"
resp <- liftEither respE
return $ encodeGQResp $ GQSuccess $ encJToLBS resp
let !json = encodeGQResp $ GQSuccess $ encJToLBS resp
telemQueryType = case resolvedOp of E.ExOpMutation{} -> Telem.Mutation ; _ -> Telem.Query
return (telemTimeIO, telemQueryType, json)

View File

@ -1,4 +1,5 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Hasura.GraphQL.Transport.WebSocket
( createWSServerApp
@ -27,7 +28,7 @@ import qualified Network.HTTP.Types as H
import qualified Network.WebSockets as WS
import qualified StmContainers.Map as STMMap
import Control.Concurrent (threadDelay)
import Control.Concurrent.Extended (sleep)
import qualified ListT
import Hasura.EncJSON
@ -41,8 +42,7 @@ import Hasura.Server.Auth (AuthMode, UserAuth
resolveUserInfo)
import Hasura.Server.Context
import Hasura.Server.Cors
import Hasura.Server.Utils (RequestId, diffTimeToMicro,
getRequestId, withElapsedTime)
import Hasura.Server.Utils (RequestId, getRequestId)
import Hasura.Server.Version (HasVersion)
import qualified Hasura.GraphQL.Execute as E
@ -50,6 +50,7 @@ import qualified Hasura.GraphQL.Execute.LiveQuery as LQ
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as LQ
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
type OperationMap
@ -199,7 +200,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do
where
keepAliveAction wsConn = liftIO $ forever $ do
sendMsg wsConn SMConnKeepAlive
threadDelay $ 5 * 1000 * 1000
sleep $ seconds 5
jwtExpiryHandler wsConn = do
expTime <- liftIO $ STM.atomically $ do
@ -210,7 +211,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do
CSInitialised _ expTimeM _ ->
maybe STM.retry return expTimeM
currTime <- TC.getCurrentTime
threadDelay $ diffTimeToMicro $ TC.diffUTCTime expTime currTime
sleep $ fromUnits $ TC.diffUTCTime expTime currTime
accept hdrs errType = do
logger $ mkWsInfoLog Nothing (WsConnInfo wsId Nothing Nothing) EAccepted
@ -273,7 +274,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do
onStart :: HasVersion => WSServerEnv -> WSConn -> StartMsg -> IO ()
onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
timerTot <- startTimer
opM <- liftIO $ STM.atomically $ STMMap.lookup opId opMap
when (isJust opM) $ withComplete $ sendStartErr $
@ -293,23 +294,26 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
(sc, scVer) <- liftIO getSchemaCache
execPlanE <- runExceptT $ E.getResolvedExecPlan pgExecCtx
planCache userInfo sqlGenCtx enableAL sc scVer q
execPlan <- either (withComplete . preExecErr requestId) return execPlanE
(telemCacheHit, execPlan) <- either (withComplete . preExecErr requestId) return execPlanE
let execCtx = E.ExecutionCtx logger sqlGenCtx pgExecCtx
planCache sc scVer httpMgr enableAL
case execPlan of
E.GExPHasura resolvedOp ->
runHasuraGQ requestId q userInfo resolvedOp
runHasuraGQ timerTot telemCacheHit requestId q userInfo resolvedOp
E.GExPRemote rsi opDef ->
runRemoteGQ execCtx requestId userInfo reqHdrs opDef rsi
runRemoteGQ timerTot telemCacheHit execCtx requestId userInfo reqHdrs opDef rsi
where
runHasuraGQ :: RequestId -> GQLReqUnparsed -> UserInfo -> E.ExecOp
telemTransport = Telem.WebSocket
runHasuraGQ :: ExceptT () IO DiffTime
-> Telem.CacheHit -> RequestId -> GQLReqUnparsed -> UserInfo -> E.ExecOp
-> ExceptT () IO ()
runHasuraGQ reqId query userInfo = \case
runHasuraGQ timerTot telemCacheHit reqId query userInfo = \case
E.ExOpQuery opTx genSql ->
execQueryOrMut reqId query genSql $ runLazyTx' pgExecCtx opTx
execQueryOrMut Telem.Query genSql $ runLazyTx' pgExecCtx opTx
E.ExOpMutation opTx ->
execQueryOrMut reqId query Nothing $
execQueryOrMut Telem.Mutation Nothing $
runLazyTx pgExecCtx Q.ReadWrite $ withUserInfo userInfo opTx
E.ExOpSubs lqOp -> do
-- log the graphql query
@ -319,28 +323,47 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
STMMap.insert (lqId, _grOperationName q) opId opMap
logOpEv ODStarted (Just reqId)
execQueryOrMut reqId query genSql action = do
logOpEv ODStarted (Just reqId)
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
(dt, resp) <- withElapsedTime $ liftIO $ runExceptT action
let lqMeta = LQ.LiveQueryMetadata dt
either (postExecErr reqId) (`sendSuccResp` lqMeta) resp
sendCompleted (Just reqId)
where
telemLocality = Telem.Local
execQueryOrMut telemQueryType genSql action = do
logOpEv ODStarted (Just reqId)
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
(withElapsedTime $ liftIO $ runExceptT action) >>= \case
(_, Left err) -> postExecErr reqId err
(telemTimeIO_DT, Right encJson) -> do
-- Telemetry. NOTE: don't time network IO:
telemTimeTot <- Seconds <$> timerTot
sendSuccResp encJson $ LQ.LiveQueryMetadata telemTimeIO_DT
let telemTimeIO = fromUnits telemTimeIO_DT
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
runRemoteGQ :: E.ExecutionCtx -> RequestId -> UserInfo -> [H.Header]
sendCompleted (Just reqId)
runRemoteGQ :: ExceptT () IO DiffTime
-> Telem.CacheHit -> E.ExecutionCtx -> RequestId -> UserInfo -> [H.Header]
-> G.TypedOperationDefinition -> RemoteSchemaInfo
-> ExceptT () IO ()
runRemoteGQ execCtx reqId userInfo reqHdrs opDef rsi = do
when (G._todType opDef == G.OperationTypeSubscription) $
withComplete $ preExecErr reqId $
err400 NotSupported "subscription to remote server is not supported"
runRemoteGQ timerTot telemCacheHit execCtx reqId userInfo reqHdrs opDef rsi = do
let telemLocality = Telem.Remote
telemQueryType <- case G._todType opDef of
G.OperationTypeSubscription ->
withComplete $ preExecErr reqId $
err400 NotSupported "subscription to remote server is not supported"
G.OperationTypeMutation -> return Telem.Mutation
G.OperationTypeQuery -> return Telem.Query
-- if it's not a subscription, use HTTP to execute the query on the remote
(dt, resp) <- withElapsedTime $ runExceptT $ flip runReaderT execCtx $
E.execRemoteGQ reqId userInfo reqHdrs q rsi opDef
let ocMeta = LQ.LiveQueryMetadata dt
either (postExecErr reqId) (\val -> sendRemoteResp reqId (_hrBody val) ocMeta) resp
(runExceptT $ flip runReaderT execCtx $
E.execRemoteGQ reqId userInfo reqHdrs q rsi opDef) >>= \case
Left err -> postExecErr reqId err
Right (telemTimeIO_DT, !val) -> do
-- Telemetry. NOTE: don't time network IO:
telemTimeTot <- Seconds <$> timerTot
sendRemoteResp reqId (_hrBody val) $ LQ.LiveQueryMetadata telemTimeIO_DT
let telemTimeIO = fromUnits telemTimeIO_DT
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
sendCompleted (Just reqId)
sendRemoteResp reqId resp meta =

View File

@ -11,6 +11,10 @@ module Hasura.Prelude
, spanMaybeM
, findWithIndex
, mapFromL
-- * Measuring and working with moments and durations
, withElapsedTime
, startTimer
, module Data.Time.Clock.Units
) where
import Control.Applicative as M (Alternative (..))
@ -54,12 +58,14 @@ import GHC.Generics as M (Generic)
import Prelude as M hiding (fail, init, lookup)
import Test.QuickCheck.Arbitrary.Generic as M
import Text.Read as M (readEither, readMaybe)
import Data.Time.Clock.Units
import qualified Data.ByteString as B
import qualified Data.HashMap.Strict as Map
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified GHC.Clock as Clock
import qualified Test.QuickCheck as QC
alphaNumerics :: String
@ -106,3 +112,33 @@ findWithIndex p l = do
-- TODO: Move to Data.HashMap.Strict.Extended; rename to fromListWith?
mapFromL :: (Eq k, Hashable k) => (a -> k) -> [a] -> Map.HashMap k a
mapFromL f = Map.fromList . map (\v -> (f v, v))
-- | Time an IO action, returning the time with microsecond precision. The
-- result of the input action will be evaluated to WHNF.
--
-- The result 'DiffTime' is guaranteed to be >= 0.
withElapsedTime :: MonadIO m=> m a -> m (DiffTime, a)
withElapsedTime ma = do
bef <- liftIO Clock.getMonotonicTimeNSec
!a <- ma
aft <- liftIO Clock.getMonotonicTimeNSec
let !dur = nanoseconds $ fromIntegral (aft - bef)
return (dur, a)
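-- A usage sketch (only 'withElapsedTime' and 'fromUnits' are real here; the
-- query and logging helpers are hypothetical):
--
-- > (ioTime, resp) <- withElapsedTime $ liftIO $ runSomeQuery conn
-- > logDuration (fromUnits ioTime :: Milliseconds)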
-- | Start timing and return an action to return the elapsed time since 'startTimer' was called.
--
-- @
-- timer <- startTimer
-- someStuffToTime
-- elapsed <- timer
-- moreStuff
-- elapsedBoth <- timer
-- @
startTimer :: (MonadIO m, MonadIO n)=> m (n DiffTime)
startTimer = do
!bef <- liftIO Clock.getMonotonicTimeNSec
return $ do
aft <- liftIO Clock.getMonotonicTimeNSec
return $ nanoseconds $ fromIntegral (aft - bef)

View File

@ -201,7 +201,8 @@ mkSpockAction
-> Spock.ActionT m ()
mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
req <- Spock.request
reqBody <- liftIO $ Wai.strictRequestBody req
-- Bytes are actually read from the socket here. Time this.
(ioWaitTime, reqBody) <- withElapsedTime $ liftIO $ Wai.strictRequestBody req
let headers = Wai.requestHeaders req
authMode = scAuthMode serverCtx
manager = scManager serverCtx
@ -215,9 +216,7 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
let handlerState = HandlerCtx serverCtx userInfo headers requestId
curRole = userRole userInfo
t1 <- liftIO Clock.getCurrentTime -- for measuring response time purposes
(result, q) <- case apiHandler of
(serviceTime, (result, q)) <- withElapsedTime $ case apiHandler of
AHGet handler -> do
res <- lift $ runReaderT (runExceptT handler) handlerState
return (res, Nothing)

View File

@ -137,7 +137,7 @@ mkJwtCtx conf httpManager logger = do
case mTime of
Nothing -> return ref
Just t -> do
jwkRefreshCtrl logger httpManager url ref t
jwkRefreshCtrl logger httpManager url ref (fromUnits t)
return ref
let claimsFmt = fromMaybe JCFJson (jcClaimsFormat conf)
return $ JWTCtx jwkRef (jcClaimNs conf) (jcAudience conf) claimsFmt (jcIssuer conf)

View File

@ -27,10 +27,10 @@ import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Auth.JWT.Internal (parseHmacKey, parseRsaKey)
import Hasura.Server.Auth.JWT.Logging
import Hasura.Server.Utils (diffTimeToMicro, fmapL, userRoleHeader)
import Hasura.Server.Utils (fmapL, userRoleHeader)
import Hasura.Server.Version (HasVersion)
import qualified Control.Concurrent as C
import qualified Control.Concurrent.Extended as C
import qualified Crypto.JWT as Jose
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
@ -98,11 +98,15 @@ defaultClaimNs :: T.Text
defaultClaimNs = "https://hasura.io/jwt/claims"
-- | if the time is greater than 100 seconds, should refresh the JWK 10 seconds
-- before the expiry, else refresh at given seconds
computeDiffTime :: NominalDiffTime -> Int
-- before the expiry, else refresh at given interval.
--
-- Apparently we want to make sure to refresh the JWKs preemptively so we're
-- not verifying JWT signatures against a stale JWK, while the other case
-- (interval <= 100 sec) is useful for development. TODO: someone who knows
-- should document this properly.
computeDiffTime :: DiffTime -> DiffTime
computeDiffTime t =
let intTime = diffTimeToMicro t
in if intTime > 100 then intTime - 10 else intTime
if t > seconds 100 then t - seconds 10 else t
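-- e.g., per the rule above:
--   computeDiffTime (seconds 300) == seconds 290  -- refresh 10s before expiry
--   computeDiffTime (seconds 30)  == seconds 30   -- short intervals pass through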
-- | create a background thread to refresh the JWK
jwkRefreshCtrl
@ -111,22 +115,21 @@ jwkRefreshCtrl
-> HTTP.Manager
-> URI
-> IORef Jose.JWKSet
-> NominalDiffTime
-> DiffTime
-> m ()
jwkRefreshCtrl logger manager url ref time =
void $ liftIO $ C.forkIO $ do
C.threadDelay $ diffTimeToMicro time
C.sleep time
forever $ do
res <- runExceptT $ updateJwkRef logger manager url ref
mTime <- either (const $ logNotice >> return Nothing) return res
-- if can't parse time from header, defaults to 1 min
let delay = maybe (60 * aSecond) computeDiffTime mTime
C.threadDelay delay
let delay = maybe (minutes 1) (computeDiffTime . fromUnits) mTime
C.sleep delay
where
logNotice = do
let err = JwkRefreshLog LevelInfo (JLNInfo "retrying again in 60 secs") Nothing
liftIO $ unLogger logger err
aSecond = 1000 * 1000
-- | Given a JWK url, fetch JWK from it and update the IORef

View File

@ -8,15 +8,15 @@ import Control.Monad (forever)
import Data.Text.Conversions (toText)
import qualified CI
import qualified Control.Concurrent as C
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
import qualified Data.Aeson.TH as A
import qualified Data.Text as T
import qualified Network.HTTP.Client as H
import qualified Network.URI.Encode as URI
import qualified Network.Wreq as Wreq
import qualified System.Log.FastLogger as FL
import qualified Control.Concurrent.Extended as C
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
import qualified Data.Aeson.TH as A
import qualified Data.Text as T
import qualified Network.HTTP.Client as H
import qualified Network.URI.Encode as URI
import qualified Network.Wreq as Wreq
import qualified System.Log.FastLogger as FL
import Hasura.HTTP
import Hasura.Logging (LoggerCtx (..))
@ -44,7 +44,7 @@ checkForUpdates (LoggerCtx loggerSet _ _ _) manager = do
when (latestVersion /= currentVersion) $
FL.pushLogStrLn loggerSet $ FL.toLogStr $ updateMsg latestVersion
C.threadDelay aDay
C.sleep $ days 1
where
updateMsg v = "Update: A new version is available: " <> toText v
@ -56,8 +56,6 @@ checkForUpdates (LoggerCtx loggerSet _ _ _) manager = do
Nothing -> "server"
Just ci -> "server-" <> (T.toLower . T.pack $ show ci)
aDay = 86400 * 1000 * 1000
-- ignoring if there is any error in response and returning the current version
decodeResp bs = case A.eitherDecode bs of
Left _ -> return $ UpdateInfo currentVersion

View File

@ -154,8 +154,8 @@ class (Monad m) => HttpLog m where
-> BL.ByteString
-- ^ the compressed response bytes
-- ^ TODO: make the above two type represented
-> Maybe (UTCTime, UTCTime)
-- ^ possible execution time
-> Maybe (DiffTime, DiffTime)
-- ^ IO/network wait time and service time (respectively) for this request, if available.
-> Maybe CompressionType
-- ^ possible compression type
-> [HTTP.Header]
@ -192,7 +192,10 @@ data OperationLog
{ olRequestId :: !RequestId
, olUserVars :: !(Maybe UserVars)
, olResponseSize :: !(Maybe Int64)
, olQueryExecutionTime :: !(Maybe Double)
, olRequestReadTime :: !(Maybe Seconds)
-- ^ Request IO wait time, i.e. time spent reading the full request from the socket.
, olQueryExecutionTime :: !(Maybe Seconds)
-- ^ Service time, not including request IO wait time.
, olQuery :: !(Maybe Value)
, olRawQuery :: !(Maybe Text)
, olError :: !(Maybe QErr)
@ -215,7 +218,7 @@ mkHttpAccessLogContext
-> RequestId
-> Wai.Request
-> BL.ByteString
-> Maybe (UTCTime, UTCTime)
-> Maybe (DiffTime, DiffTime)
-> Maybe CompressionType
-> [HTTP.Header]
-> HttpLogContext
@ -233,7 +236,8 @@ mkHttpAccessLogContext userInfoM reqId req res mTimeT compressTypeM headers =
{ olRequestId = reqId
, olUserVars = userVars <$> userInfoM
, olResponseSize = respSize
, olQueryExecutionTime = respTime
, olRequestReadTime = Seconds . fst <$> mTiming
, olQueryExecutionTime = Seconds . snd <$> mTiming
, olQuery = Nothing
, olRawQuery = Nothing
, olError = Nothing
@ -251,7 +255,7 @@ mkHttpErrorLogContext
-> Wai.Request
-> QErr
-> Either BL.ByteString Value
-> Maybe (UTCTime, UTCTime)
-> Maybe (DiffTime, DiffTime)
-> Maybe CompressionType
-> [HTTP.Header]
-> HttpLogContext
@ -269,7 +273,8 @@ mkHttpErrorLogContext userInfoM reqId req err query mTimeT compressTypeM headers
{ olRequestId = reqId
, olUserVars = userVars <$> userInfoM
, olResponseSize = Just $ BL.length $ encode err
, olQueryExecutionTime = computeTimeDiff mTimeT
, olRequestReadTime = Seconds . fst <$> mTiming
, olQueryExecutionTime = Seconds . snd <$> mTiming
, olQuery = either (const Nothing) Just query
, olRawQuery = either (Just . bsToTxt . BL.toStrict) (const Nothing) query
, olError = Just err

View File

@ -18,13 +18,13 @@ import Data.Aeson.Casing
import Data.Aeson.TH
import Data.IORef
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PG.Query as PG
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP
import qualified Control.Concurrent.Extended as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PG.Query as PG
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
@ -126,7 +126,7 @@ listener sqlGenCtx pool logger httpMgr updateEventRef
liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
either onError return listenResE
logWarn
C.threadDelay $ 1 * 1000 * 1000 -- 1 second
C.sleep $ seconds 1
where
threadType = TTListener

View File

@ -1,3 +1,4 @@
{-# LANGUAGE RecordWildCards #-}
{-|
Send anonymized metrics to the telemetry server regarding usage of various
features of Hasura.
@ -19,19 +20,20 @@ import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Init
import Hasura.Server.Telemetry.Counters
import Hasura.Server.Version
import qualified CI
import qualified Control.Concurrent as C
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
import qualified Data.Aeson.TH as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as Map
import qualified Data.Text as T
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wreq as Wreq
import qualified Control.Concurrent.Extended as C
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
import qualified Data.Aeson.TH as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as Map
import qualified Data.Text as T
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wreq as Wreq
data RelationshipMetric
@ -61,6 +63,7 @@ data Metrics
, _mtEventTriggers :: !Int
, _mtRemoteSchemas :: !Int
, _mtFunctions :: !Int
, _mtServiceTimings :: !ServiceTimingMetrics
} deriving (Show, Eq)
$(A.deriveToJSON (A.aesonDrop 3 A.snakeCase) ''Metrics)
@ -92,6 +95,9 @@ mkPayload dbId instanceId version metrics = do
VersionRelease _ -> "server"
pure $ TelemetryPayload topic $ HasuraTelemetry dbId instanceId version ci metrics
-- | An infinite loop that sends updated telemetry data ('Metrics') every 24
-- hours. The send time depends on when the server was started and will
-- naturally drift.
runTelemetry
:: (HasVersion)
=> Logger Hasura
@ -105,12 +111,13 @@ runTelemetry (Logger logger) manager getSchemaCache dbId instanceId = do
let options = wreqOptions manager []
forever $ do
schemaCache <- getSchemaCache
let metrics = computeMetrics schemaCache
serviceTimings <- dumpServiceTimingMetrics
let metrics = computeMetrics schemaCache serviceTimings
payload <- A.encode <$> mkPayload dbId instanceId currentVersion metrics
logger $ debugLBS $ "metrics_info: " <> payload
resp <- try $ Wreq.postWith options (T.unpack telemetryUrl) payload
either logHttpEx handleHttpResp resp
C.threadDelay aDay
C.sleep $ days 1
where
logHttpEx :: HTTP.HttpException -> IO ()
@ -125,31 +132,29 @@ runTelemetry (Logger logger) manager getSchemaCache dbId instanceId = do
let httpErr = Just $ mkHttpError telemetryUrl (Just resp) Nothing
logger $ mkTelemetryLog "http_error" "failed to post telemetry" httpErr
aDay = 86400 * 1000 * 1000
computeMetrics :: SchemaCache -> Metrics
computeMetrics sc =
let nTables = countUserTables (isNothing . _tciViewInfo . _tiCoreInfo)
nViews = countUserTables (isJust . _tciViewInfo . _tiCoreInfo)
nEnumTables = countUserTables (isJust . _tciEnumValues . _tiCoreInfo)
computeMetrics :: SchemaCache -> ServiceTimingMetrics -> Metrics
computeMetrics sc _mtServiceTimings =
let _mtTables = countUserTables (isNothing . _tciViewInfo . _tiCoreInfo)
_mtViews = countUserTables (isJust . _tciViewInfo . _tiCoreInfo)
_mtEnumTables = countUserTables (isJust . _tciEnumValues . _tiCoreInfo)
allRels = join $ Map.elems $ Map.map (getRels . _tciFieldInfoMap . _tiCoreInfo) userTables
(manualRels, autoRels) = partition riIsManual allRels
relMetrics = RelationshipMetric (length manualRels) (length autoRels)
_mtRelationships = RelationshipMetric (length manualRels) (length autoRels)
rolePerms = join $ Map.elems $ Map.map permsOfTbl userTables
nRoles = length $ nub $ fst <$> rolePerms
_pmRoles = length $ nub $ fst <$> rolePerms
allPerms = snd <$> rolePerms
insPerms = calcPerms _permIns allPerms
selPerms = calcPerms _permSel allPerms
updPerms = calcPerms _permUpd allPerms
delPerms = calcPerms _permDel allPerms
permMetrics =
PermissionMetric selPerms insPerms updPerms delPerms nRoles
evtTriggers = Map.size $ Map.filter (not . Map.null)
_pmInsert = calcPerms _permIns allPerms
_pmSelect = calcPerms _permSel allPerms
_pmUpdate = calcPerms _permUpd allPerms
_pmDelete = calcPerms _permDel allPerms
_mtPermissions =
PermissionMetric{..}
_mtEventTriggers = Map.size $ Map.filter (not . Map.null)
$ Map.map _tiEventTriggerInfoMap userTables
rmSchemas = Map.size $ scRemoteSchemas sc
funcs = Map.size $ Map.filter (not . isSystemDefined . fiSystemDefined) $ scFunctions sc
_mtRemoteSchemas = Map.size $ scRemoteSchemas sc
_mtFunctions = Map.size $ Map.filter (not . isSystemDefined . fiSystemDefined) $ scFunctions sc
in Metrics nTables nViews nEnumTables relMetrics permMetrics evtTriggers rmSchemas funcs
in Metrics{..}
where
userTables = Map.filter (not . isSystemDefined . _tciSystemDefined . _tiCoreInfo) $ scTables sc

View File

@ -0,0 +1,183 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE RecordWildCards #-}
{-|
Counters used in telemetry collection. Additional counters can be added here and
serviced in "Hasura.Server.Telemetry".
-}
module Hasura.Server.Telemetry.Counters
(
-- * Service timing and counts, by various dimensions
-- ** Local metric recording
recordTimingMetric
, RequestDimensions(..), RequestTimings(..)
-- *** Dimensions
, CacheHit(..), QueryType(..), Locality(..), Transport(..)
-- ** Metric upload
, dumpServiceTimingMetrics
, ServiceTimingMetrics(..)
, ServiceTimingMetric(..)
, RunningTimeBucket(..)
, RequestTimingsCount(..)
)
where
import qualified Data.Aeson as A
import qualified Data.Aeson.Casing as A
import qualified Data.Aeson.TH as A
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.IORef
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
import GHC.IO.Unsafe (unsafePerformIO)
import Hasura.Prelude
-- | The properties that characterize this request. The dimensions over which
-- we collect metrics for each serviced request.
data RequestDimensions =
RequestDimensions {
telemCacheHit :: !CacheHit
, telemQueryType :: !QueryType
, telemLocality :: !Locality
, telemTransport :: !Transport
}
deriving (Show, Generic, Eq)
instance Hashable RequestDimensions
-- | Accumulated time metrics.
data RequestTimings =
RequestTimings {
telemTimeIO :: !Seconds
-- ^ Time spent waiting on PG/remote http calls
, telemTimeTot :: !Seconds
-- ^ Total service time for request (including 'telemTimeIO')
}
-- | Sum
instance Semigroup RequestTimings where
RequestTimings a b <> RequestTimings x y = RequestTimings (a+x) (b+y)
-- | 'RequestTimings' along with the count
data RequestTimingsCount =
RequestTimingsCount {
telemTimeIO :: !Seconds
, telemTimeTot :: !Seconds
, telemCount :: !Word
-- ^ The number of requests that have contributed to the accumulated timings above.
-- So e.g. @telemTimeTot / count@ would give the mean service time.
}
deriving (Show, Generic, Eq)
-- | Sum
instance Semigroup RequestTimingsCount where
RequestTimingsCount a b c <> RequestTimingsCount x y z =
RequestTimingsCount (a+x) (b+y) (c+z)
-- | Internal. Counts and durations across many 'RequestDimensions'.
--
-- NOTE: We use the global mutable variable pattern for metric collection
-- counters for convenience at collection site (don't wear hairshirts that
-- discourage useful reporting).
requestCounters :: IORef (HM.HashMap (RequestDimensions, RunningTimeBucket) RequestTimingsCount)
{-# NOINLINE requestCounters #-}
requestCounters = unsafePerformIO $ newIORef HM.empty
-- | Internal. Since these metrics are accumulated while graphql-engine is
-- running and sent periodically, we need to include a tag that is unique for
-- each start of hge. This lets us e.g. query for just the latest uploaded
-- sample for each start of hge.
--
-- We use time rather than a UUID since having this be monotonic increasing is
-- convenient.
approxStartTime :: POSIXTime
{-# NOINLINE approxStartTime #-}
approxStartTime = unsafePerformIO getPOSIXTime
-- | Did this request hit the plan cache?
data CacheHit = Hit | Miss
deriving (Enum, Show, Eq, Generic)
instance Hashable CacheHit
instance A.ToJSON CacheHit
instance A.FromJSON CacheHit
-- | Was this request a mutation (involved DB writes)?
data QueryType = Mutation | Query
deriving (Enum, Show, Eq, Generic)
instance Hashable QueryType
instance A.ToJSON QueryType
instance A.FromJSON QueryType
-- | Was this a PG local query, or did it involve remote execution?
data Locality = Local | Remote
deriving (Enum, Show, Eq, Generic)
instance Hashable Locality
instance A.ToJSON Locality
instance A.FromJSON Locality
-- | Was this a query over http or websockets?
data Transport = HTTP | WebSocket
deriving (Enum, Show, Eq, Generic)
instance Hashable Transport
instance A.ToJSON Transport
instance A.FromJSON Transport
-- | The timings and counts here were from requests with total time longer than
-- 'bucketGreaterThan' (but less than any larger bucket cutoff times).
newtype RunningTimeBucket = RunningTimeBucket { bucketGreaterThan :: Seconds }
deriving (Fractional, Num, Ord, Eq, Show, Generic, A.ToJSON, A.FromJSON, Hashable)
-- NOTE: an HDR histogram is a nice way to collect metrics when you don't know
-- a priori what the most useful binning is. It's not clear how we'd make use
-- of that here though.
totalTimeBuckets :: [RunningTimeBucket]
totalTimeBuckets = [0, 1000, 10*1000, 100*1000]
-- | Save a timing metric sample in our in-memory store. These will be
-- accumulated and uploaded periodically in "Hasura.Server.Telemetry".
recordTimingMetric :: MonadIO m=> RequestDimensions -> RequestTimings -> m ()
recordTimingMetric reqDimensions RequestTimings{..} = liftIO $ do
let ourBucket = fromMaybe 0 $ -- although we expect 'head' would be safe here
listToMaybe $ dropWhile (> realToFrac telemTimeTot) $
reverse $ sort totalTimeBuckets
atomicModifyIORef' requestCounters $ (,()) .
HM.insertWith (<>) (reqDimensions, ourBucket) RequestTimingsCount{telemCount = 1, ..}
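-- e.g. with 'totalTimeBuckets' above, a request with @telemTimeTot = 1001@ is
-- counted under @bucketGreaterThan = 1000@ (the largest cutoff not exceeding
-- it), while one with @telemTimeTot = 100@ lands in the @0@ bucket.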
-- | The final shape of this part of our metrics data JSON. This should allow
-- reasonably efficient querying using GIN indexes and JSONB containment
-- operations (which treat arrays as sets).
data ServiceTimingMetrics
= ServiceTimingMetrics
{ collectionTag :: Int
-- ^ This is set to a new unique value when the counters reset (e.g. because of a restart)
, serviceTimingMetrics :: [ServiceTimingMetric]
}
deriving (Show, Generic, Eq)
data ServiceTimingMetric
= ServiceTimingMetric
{ dimensions :: RequestDimensions
, bucket :: RunningTimeBucket
, metrics :: RequestTimingsCount
}
deriving (Show, Generic, Eq)
$(A.deriveJSON (A.aesonDrop 5 A.snakeCase) ''RequestTimingsCount)
$(A.deriveJSON (A.aesonDrop 5 A.snakeCase) ''RequestDimensions)
instance A.ToJSON ServiceTimingMetric
instance A.FromJSON ServiceTimingMetric
instance A.ToJSON ServiceTimingMetrics
instance A.FromJSON ServiceTimingMetrics
dumpServiceTimingMetrics :: MonadIO m=> m ServiceTimingMetrics
dumpServiceTimingMetrics = liftIO $ do
cs <- readIORef requestCounters
let serviceTimingMetrics = flip map (HM.toList cs) $
\((dimensions, bucket), metrics)-> ServiceTimingMetric{..}
collectionTag = round approxStartTime
return ServiceTimingMetrics{..}

View File

@ -4,7 +4,6 @@ module Hasura.Server.Utils where
import Data.Aeson
import Data.Char
import Data.List (find)
import Data.Time.Clock
import Language.Haskell.TH.Syntax (Lift)
import System.Environment
import System.Exit
@ -117,13 +116,6 @@ fmapL :: (a -> a') -> Either a b -> Either a' b
fmapL fn (Left e) = Left (fn e)
fmapL _ (Right x) = pure x
-- diff time to micro seconds
diffTimeToMicro :: NominalDiffTime -> Int
diffTimeToMicro diff =
floor (realToFrac diff :: Double) * aSecond
where
aSecond = 1000 * 1000
generateFingerprint :: IO Text
generateFingerprint = UUID.toText <$> UUID.nextRandom
@ -223,10 +215,3 @@ makeReasonMessage errors showError =
[singleError] -> "because " <> showError singleError
_ -> "for the following reasons:\n" <> T.unlines
(map (("" <>) . showError) errors)
withElapsedTime :: MonadIO m => m a -> m (NominalDiffTime, a)
withElapsedTime ma = do
t1 <- liftIO getCurrentTime
a <- ma
t2 <- liftIO getCurrentTime
return (diffUTCTime t2 t1, a)

View File

@ -0,0 +1,44 @@
module Data.TimeSpec (spec) where
-- | Time-related properties we care about.
import Prelude
import Data.Time.Clock.Units
import Data.Time
import Data.Aeson
import Test.Hspec
spec :: Spec
spec = do
timeUnitsSpec
diffTimeSpec
timeUnitsSpec :: Spec
timeUnitsSpec =
describe "time units" $ do
it "converts correctly" $ do
seconds 123 `shouldBe` 123
milliseconds 123 `shouldBe` 0.123
microseconds 123 `shouldBe` 0.000123
nanoseconds 123 `shouldBe` 0.000000123
it "has a correct Read instance" $ do
seconds (read "123") `shouldBe` 123
milliseconds (read "123") `shouldBe` 0.123
microseconds (read "123") `shouldBe` 0.000123
nanoseconds (read "123") `shouldBe` 0.000000123
it "JSON serializes as proper units" $ do
toJSON (1 :: Seconds) `shouldBe` Number 1
decode "1.0" `shouldBe` Just (1 :: Seconds)
it "converts with fromUnits" $ do
fromUnits (2 :: Minutes) `shouldBe` (120 :: NominalDiffTime)
fromUnits (60 :: Seconds) `shouldBe` (1 :: Minutes)
diffTimeSpec :: Spec
diffTimeSpec =
describe "DiffTime" $ do
it "JSON serializes as seconds" $ do
-- although we would prefer to use Seconds instead...
toJSON (1 :: DiffTime) `shouldBe` Number 1
decode "1.0" `shouldBe` Just (1 :: DiffTime)

View File

@ -0,0 +1,48 @@
{-# LANGUAGE DuplicateRecordFields #-}
module Hasura.Server.TelemetrySpec (spec) where
import Hasura.Prelude
import qualified Data.Aeson as A
import Hasura.Server.Telemetry.Counters
import Test.Hspec
spec :: Spec
spec = do
telemetryCountersTests
-- NOTE: this test is effectful/stateful; if we need to, we can implement an
-- operation to clear the metric store.
telemetryCountersTests :: Spec
telemetryCountersTests = do
describe "request timing counters" $ do
it "is at first empty" $ do
fmap serviceTimingMetrics dumpServiceTimingMetrics `shouldReturn` []
-- exercise accumulating and buckets:
let expected =
-- NOTE: ordering is arbitrary here (and hence fragile)
[ServiceTimingMetric {
dimensions = RequestDimensions Hit Mutation Local HTTP
, bucket = RunningTimeBucket {bucketGreaterThan = 0}
, metrics = RequestTimingsCount {telemTimeIO = 2, telemTimeTot = 200, telemCount = 2}},
ServiceTimingMetric {
dimensions = RequestDimensions Miss Mutation Local HTTP
, bucket = RunningTimeBucket {bucketGreaterThan = 1000}
, metrics = RequestTimingsCount {telemTimeIO = 2, telemTimeTot = 2002, telemCount = 2}},
ServiceTimingMetric {
dimensions = RequestDimensions Hit Query Remote WebSocket
, bucket = RunningTimeBucket {bucketGreaterThan = 100000}
, metrics = RequestTimingsCount {telemTimeIO = 1, telemTimeTot = 100001, telemCount = 1}}]
it "accumulates as expected" $ do
recordTimingMetric (RequestDimensions Hit Mutation Local HTTP) (RequestTimings 1 100)
recordTimingMetric (RequestDimensions Hit Mutation Local HTTP) (RequestTimings 1 100)
recordTimingMetric (RequestDimensions Miss Mutation Local HTTP) (RequestTimings 1 1001)
recordTimingMetric (RequestDimensions Miss Mutation Local HTTP) (RequestTimings 1 1001)
recordTimingMetric (RequestDimensions Hit Query Remote WebSocket) (RequestTimings 1 100001)
fmap serviceTimingMetrics dumpServiceTimingMetrics `shouldReturn` expected
it "serializes and deserializes properly" $ do
fmap (fmap serviceTimingMetrics . A.eitherDecode . A.encode) dumpServiceTimingMetrics
`shouldReturn` Right expected

View File

@ -20,8 +20,9 @@ import qualified Test.Hspec.Runner as Hspec
import Hasura.Db (PGExecCtx (..))
import Hasura.RQL.Types (SQLGenCtx (..), adminUserInfo)
import Hasura.RQL.Types.Run
import Hasura.Server.Init (RawConnInfo, mkConnInfo, mkRawConnInfo,
parseRawConnInfo, runWithEnv)
import Hasura.Server.Init (RawConnInfo, mkConnInfo,
mkRawConnInfo, parseRawConnInfo,
runWithEnv)
import Hasura.Server.Migrate
import Hasura.Server.Version
@ -29,6 +30,8 @@ import qualified Data.Parser.CacheControlSpec as CacheControlParser
import qualified Hasura.IncrementalSpec as IncrementalSpec
import qualified Hasura.RQL.MetadataSpec as MetadataSpec
import qualified Hasura.Server.MigrateSpec as MigrateSpec
import qualified Data.TimeSpec as TimeSpec
import qualified Hasura.Server.TelemetrySpec as TelemetrySpec
data TestSuites
= AllSuites !RawConnInfo
@ -55,6 +58,8 @@ unitSpecs = do
describe "Data.Parser.CacheControl" CacheControlParser.spec
describe "Hasura.Incremental" IncrementalSpec.spec
describe "Hasura.RQL.Metadata" MetadataSpec.spec
describe "Data.Time" TimeSpec.spec
describe "Hasura.Server.Telemetry" TelemetrySpec.spec
buildPostgresSpecs :: (HasVersion) => RawConnInfo -> IO Spec
buildPostgresSpecs pgConnOptions = do