graphql-engine/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Brandon Simmons 58ef316118 Add request timings and count histograms to telemetry. Closes #3552
We upload a set of accumulating timers and counters to track service
time for different types of operations, across several dimensions (e.g.
did we hit the plan cache, was a remote involved, etc.)

Also...

Standardize on DiffTime as the duration type, and try to use it
consistently.

See discussion here:
https://github.com/hasura/graphql-engine/pull/3584#pullrequestreview-340679369

It should be possible to overwrite that module so the new threadDelay
sticks, per the pattern in #3705 (blocked on #3558).

Rename the Control.Concurrent.Extended.threadDelay to `sleep` since a
naive use with a literal argument would be very bad!
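
(An illustrative sketch, not code from this change: the new delay takes a
DiffTime, whose Num instance reads an integer literal as seconds, while
base's threadDelay reads it as microseconds. Had the name been kept, an
existing call site like

    threadDelay 1000000  -- base: 1 second; as DiffTime: ~11.6 days

would still type-check but sleep a million seconds. The rename to `sleep`
means every call site has to be touched, so the unit change cannot slip
through silently.)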

Also fix a bug in 'computeTimeDiff'.

Add convenient 'Read' instances to the time unit utility types. Make
'Second' a newtype to support this.
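
(A plausible shape for that change, hypothetical since the time-unit module
itself is not part of this file: a type synonym cannot carry its own
instances, and DiffTime has no Read instance to derive from, so a newtype
with a hand-written parser is one way to do it:

    newtype Second = Second { unSecond :: DiffTime }
      deriving (Eq, Ord, Show)

    -- Parse a plain number of seconds, e.g. read "1.5" :: Second.
    instance Read Second where
      readsPrec p s = [ (Second (realToFrac (x :: Double)), rest)
                      | (x, rest) <- readsPrec p s ]

The real instances may differ; `unSecond` here is an invented accessor.)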
2020-02-03 18:50:10 -06:00

-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details.
module Hasura.GraphQL.Execute.LiveQuery.Poll (
-- * Pollers
Poller(..)
, PollerIOState(..)
, pollQuery
, PollerKey(..)
, PollerMap
, dumpPollerMap
, RefetchMetrics
, initRefetchMetrics
-- * Cohorts
, Cohort(..)
, CohortId
, newCohortId
, CohortVariables
, CohortKey
, CohortMap
-- * Subscribers
, Subscriber(..)
, SubscriberId
, newSinkId
, SubscriberMap
, OnChange
, LGQResponse
, LiveQueryResponse(..)
, LiveQueryMetadata(..)
) where
import Hasura.Prelude
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Crypto.Hash as CH
import qualified Data.Aeson.Extended as J
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Language.GraphQL.Draft.Syntax as G
import qualified ListT
import qualified StmContainers.Map as STMMap
import qualified System.Metrics.Distribution as Metrics
import Data.List.Split (chunksOf)
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.RQL.Types
-- -------------------------------------------------------------------------------------------------
-- Subscribers
data Subscriber
= Subscriber
{ _sRootAlias :: !G.Alias
, _sOnChangeCallback :: !OnChange
}
-- | Live query 'OnChange' metadata, used for attaching extra analytics data.
data LiveQueryMetadata
= LiveQueryMetadata
{ _lqmExecutionTime :: !Clock.DiffTime
-- ^ Time spent waiting on the generated query to execute on postgres or the remote.
}
data LiveQueryResponse
= LiveQueryResponse
{ _lqrPayload :: !BL.ByteString
, _lqrExecutionTime :: !Clock.DiffTime
}
type LGQResponse = GQResult LiveQueryResponse
type OnChange = LGQResponse -> IO ()
newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
deriving (Show, Eq, Hashable, J.ToJSON)
newSinkId :: IO SubscriberId
newSinkId = SubscriberId <$> UUID.nextRandom
type SubscriberMap = TMap.TMap SubscriberId Subscriber
-- -------------------------------------------------------------------------------------------------
-- Cohorts
-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
data Cohort
= Cohort
{ _cCohortId :: !CohortId
-- ^ a unique identifier used to identify the cohort in the generated query
, _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
-- ^ a hash of the previous query result, if any, used to determine if we need to push an updated
-- result to the subscribers or not
, _cExistingSubscribers :: !SubscriberMap
-- ^ the subscribers we've already pushed a result to; we push new results to them iff the
-- response changes
, _cNewSubscribers :: !SubscriberMap
-- ^ subscribers we haven't yet pushed any results to; we push results to them regardless of
-- whether the result changed, then merge them into the map of existing subscribers
}
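-- An illustrative sketch of the multiplexed SQL shape a 'Cohort' corresponds
-- to (heavily simplified; the real query is generated elsewhere in the
-- live-query code):
--
-- > SELECT _subs.result_id, _fld_resp.root AS result
-- > FROM UNNEST($1::uuid[], $2::json[]) _subs (result_id, result_vars)
-- > LEFT OUTER JOIN LATERAL (<the subscribers' query>) _fld_resp ON true
--
-- Each 'Cohort' contributes one (result_id, result_vars) element to the
-- arrays, and therefore receives exactly one row of the result.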
{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, from https://blake2.net, it is stated,
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
yet is at least as secure as the latest standard SHA-3".
-}
-- | A hash used to determine if the result changed without having to keep the entire result in
-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256
-- bits, even if a subscription changes once per second for an entire year, the probability of a
-- hash collision is ~4.294417×10⁻⁶³. See Note [Blake2b faster than SHA-256].
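-- A quick sanity check of that figure via the birthday bound (illustrative
-- arithmetic, not from the original source): one change per second for a year
-- is n ≈ 3.15×10⁷ hashes, giving a collision probability of roughly
-- n²/2²⁵⁷ ≈ (3.15×10⁷)² / 2.32×10⁷⁷ ≈ 4.3×10⁻⁶³.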
newtype ResponseHash = ResponseHash { unResponseHash :: CH.Digest CH.Blake2b_256 }
deriving (Show, Eq)
instance J.ToJSON ResponseHash where
toJSON = J.toJSON . show . unResponseHash
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash
-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort' (assuming they
-- already meet the criteria to be in the same 'Poller'). Note the distinction between this and
-- 'CohortId'; the latter is a completely synthetic key used only to identify the cohort in the
-- generated SQL query.
type CohortKey = CohortVariables
type CohortMap = TMap.TMap CohortKey Cohort
dumpCohortMap :: CohortMap -> IO J.Value
dumpCohortMap cohortMap = do
cohorts <- STM.atomically $ TMap.toList cohortMap
fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do
cohortJ <- dumpCohort cohort
return $ J.object
[ "variables" J..= variableValues
, "cohort" J..= cohortJ
]
where
dumpCohort (Cohort respId respTV curOps newOps) =
STM.atomically $ do
prevResHash <- STM.readTVar respTV
curOpIds <- TMap.toList curOps
newOpIds <- TMap.toList newOps
return $ J.object
[ "resp_id" J..= respId
, "current_ops" J..= map fst curOpIds
, "new_ops" J..= map fst newOpIds
, "previous_result_hash" J..= prevResHash
]
data CohortSnapshot
= CohortSnapshot
{ _csVariables :: !CohortVariables
, _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
, _csExistingSubscribers :: ![Subscriber]
, _csNewSubscribers :: ![Subscriber]
}
pushResultToCohort
:: GQResult EncJSON
-- ^ a response that still needs to be wrapped with each 'Subscriber's root 'G.Alias'
-> Maybe ResponseHash
-> LiveQueryMetadata
-> CohortSnapshot
-> IO ()
pushResultToCohort result respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
prevRespHashM <- STM.readTVarIO respRef
-- write to the current websockets if needed
sinks <-
if isExecError result || respHashM /= prevRespHashM
then do
STM.atomically $ STM.writeTVar respRef respHashM
return (newSinks <> curSinks)
else
return newSinks
pushResultToSubscribers sinks
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
let aliasText = G.unName $ G.unAlias alias
wrapWithAlias response = LiveQueryResponse
{ _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
, _lqrExecutionTime = dTime
}
in action (wrapWithAlias <$> result)
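-- For example (illustrative): a subscriber whose root field is aliased
-- "latest" has that as its '_sRootAlias', so a raw cohort result r reaches
-- its 'OnChange' callback wrapped as the payload {"latest": r}; every
-- subscriber sees the field name it asked for.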
-- -------------------------------------------------------------------------------------------------
-- Pollers
-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that periodically polls
-- Postgres and pushes results to each of its listening 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in practice, 'Poller's
-- with large numbers of 'Cohort's are batched into multiple concurrent queries for performance
-- reasons.
data Poller
= Poller
{ _pCohorts :: !CohortMap
, _pIOState :: !(STM.TMVar PollerIOState)
-- ^ This is in a separate 'STM.TMVar' because it's important that we are able to construct
-- 'Poller' values in 'STM.STM': we need the insertion into the 'PollerMap' to be atomic to
-- ensure that we don't accidentally create two for the same query due to a race. However, we
-- can't spawn the worker thread or create the metrics store in 'STM.STM', so we insert it into
-- the 'Poller' only after we're certain we won't create any duplicates.
}
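-- A sketch of that two-phase construction (hypothetical helper; names like
-- 'getOrCreatePoller' and 'pollingLoop' are invented for illustration): the
-- STM transaction atomically finds or inserts the 'Poller', and only the
-- caller that actually inserted it performs the IO and fills the 'STM.TMVar':
--
-- > getOrCreatePoller key pollerMap = do
-- >   created <- STM.atomically $ STMMap.lookup key pollerMap >>= \case
-- >     Just _  -> pure Nothing  -- lost the race; an identical poller exists
-- >     Nothing -> do
-- >       poller <- Poller <$> TMap.new <*> STM.newEmptyTMVar
-- >       STMMap.insert poller key pollerMap
-- >       pure (Just poller)
-- >   for_ created $ \(Poller _ ioStateVar) -> do
-- >     metrics <- initRefetchMetrics
-- >     thread  <- A.async (pollingLoop metrics)
-- >     STM.atomically $ STM.putTMVar ioStateVar (PollerIOState thread metrics)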
data PollerIOState
= PollerIOState
{ _pThread :: !(A.Async ())
-- ^ a handle on the poller's worker thread that can be used to 'A.cancel' it if all its cohorts
-- stop listening
, _pMetrics :: !RefetchMetrics
}
data RefetchMetrics
= RefetchMetrics
{ _rmSnapshot :: !Metrics.Distribution
, _rmPush :: !Metrics.Distribution
, _rmQuery :: !Metrics.Distribution
, _rmTotal :: !Metrics.Distribution
}
initRefetchMetrics :: IO RefetchMetrics
initRefetchMetrics = RefetchMetrics <$> Metrics.new <*> Metrics.new <*> Metrics.new <*> Metrics.new
data PollerKey
-- we don't need operation name here as a subscription will
-- only have a single top level field
= PollerKey
{ _lgRole :: !RoleName
, _lgQuery :: !MultiplexedQuery
} deriving (Show, Eq, Generic)
instance Hashable PollerKey
instance J.ToJSON PollerKey where
toJSON (PollerKey role query) =
J.object [ "role" J..= role
, "query" J..= query
]
type PollerMap = STMMap.Map PollerKey Poller
dumpPollerMap :: Bool -> PollerMap -> IO J.Value
dumpPollerMap extended lqMap =
fmap J.toJSON $ do
entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
forM entries $ \(PollerKey role query, Poller cohortsMap ioState) -> do
PollerIOState threadId metrics <- STM.atomically $ STM.readTMVar ioState
metricsJ <- dumpRefetchMetrics metrics
cohortsJ <-
if extended
then Just <$> dumpCohortMap cohortsMap
else return Nothing
return $ J.object
[ "role" J..= role
, "thread_id" J..= show (A.asyncThreadId threadId)
, "multiplexed_query" J..= query
, "cohorts" J..= cohortsJ
, "metrics" J..= metricsJ
]
where
dumpRefetchMetrics metrics = do
snapshotS <- Metrics.read $ _rmSnapshot metrics
queryS <- Metrics.read $ _rmQuery metrics
pushS <- Metrics.read $ _rmPush metrics
totalS <- Metrics.read $ _rmTotal metrics
return $ J.object
[ "snapshot" J..= dumpStats snapshotS
, "query" J..= dumpStats queryS
, "push" J..= dumpStats pushS
, "total" J..= dumpStats totalS
]
dumpStats stats =
J.object
[ "mean" J..= Metrics.mean stats
, "variance" J..= Metrics.variance stats
, "count" J..= Metrics.count stats
, "min" J..= Metrics.min stats
, "max" J..= Metrics.max stats
]
-- | Where the magic happens: the top-level action run periodically by each active 'Poller'.
pollQuery
:: RefetchMetrics
-> BatchSize
-> PGExecCtx
-> MultiplexedQuery
-> Poller
-> IO ()
pollQuery metrics batchSize pgExecCtx pgQuery handler = do
procInit <- Clock.getCurrentTime
-- get a snapshot of all the cohorts
-- this need not be done in a transaction
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshotMap <- Map.fromList <$> mapM (STM.atomically . getCohortSnapshot) cohorts
let queryVarsBatches = chunksOf (unBatchSize batchSize) $ getQueryVars cohortSnapshotMap
snapshotFinish <- Clock.getCurrentTime
Metrics.add (_rmSnapshot metrics) $
realToFrac $ Clock.diffUTCTime snapshotFinish procInit
flip A.mapConcurrently_ queryVarsBatches $ \queryVars -> do
queryInit <- Clock.getCurrentTime
mxRes <- runExceptT . runLazyTx' pgExecCtx $ executeMultiplexedQuery pgQuery queryVars
queryFinish <- Clock.getCurrentTime
let dt = Clock.diffUTCTime queryFinish queryInit
queryTime = realToFrac dt
lqMeta = LiveQueryMetadata $ fromUnits dt
operations = getCohortOperations cohortSnapshotMap lqMeta mxRes
Metrics.add (_rmQuery metrics) queryTime
-- concurrently push each unique result
A.mapConcurrently_ (uncurry4 pushResultToCohort) operations
pushFinish <- Clock.getCurrentTime
Metrics.add (_rmPush metrics) $
realToFrac $ Clock.diffUTCTime pushFinish queryFinish
procFinish <- Clock.getCurrentTime
Metrics.add (_rmTotal metrics) $
realToFrac $ Clock.diffUTCTime procFinish procInit
where
Poller cohortMap _ = handler
uncurry4 :: (a -> b -> c -> d -> e) -> (a, b, c, d) -> e
uncurry4 f (a, b, c, d) = f a b c d
getCohortSnapshot (cohortVars, handlerC) = do
let Cohort resId respRef curOpsTV newOpsTV = handlerC
curOpsL <- TMap.toList curOpsTV
newOpsL <- TMap.toList newOpsTV
forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
TMap.reset newOpsTV
let cohortSnapshot = CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
return (resId, cohortSnapshot)
getQueryVars cohortSnapshotMap =
Map.toList $ fmap _csVariables cohortSnapshotMap
getCohortOperations cohortSnapshotMap actionMeta = \case
Left e ->
-- TODO: this is an internal error
let resp = GQExecError [encodeGQErr False e]
in [ (resp, Nothing, actionMeta, snapshot)
| (_, snapshot) <- Map.toList cohortSnapshotMap
]
Right responses ->
flip mapMaybe responses $ \(respId, result) ->
-- TODO: change it to use bytestrings directly
let -- No reason to use lazy bytestrings here, since (1) we fetch the entire result set
-- from Postgres strictly and (2) even if we didn't, hashing will have to force the
-- whole thing anyway.
respHash = mkRespHash (encJToBS result)
in (GQSuccess result, Just respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap