graphql-engine/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Robert d900a65399 server: Reduce and localize CPP usage
Some of our use of CPP causes trouble for ormolu, compare https://github.com/tweag/ormolu/issues/774.
Specifically, for understandable reasons, it can't deal well with `#ifdef` use that is not at the top-level.

This PR removes the problematic usage in ways that I hope are also a net non-loss regardless of helping
out ormolu (or other tooling).

- The default value for enabled APIs moves to the top level, next to the command line help, so
  they'll stay in sync more easily.
- All the CPP around using `assertNFHere` is moved to one module.

https://github.com/hasura/graphql-engine-mono/pull/2361

GitOrigin-RevId: ed6e039e6d8960322fd8d1312df762ad197c29b1
2021-09-16 18:52:37 +00:00

-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details.
module Hasura.GraphQL.Execute.LiveQuery.Poll (
-- * Pollers
Poller(..)
, PollerId(..)
, PollerIOState(..)
, pollQuery
, PollerKey(..)
, PollerMap
, dumpPollerMap
, PollDetails(..)
, BatchExecutionDetails(..)
, CohortExecutionDetails(..)
, LiveQueryPostPollHook
, defaultLiveQueryPostPollHook
-- * Cohorts
, Cohort(..)
, CohortId
, newCohortId
, CohortVariables
, CohortKey
, CohortMap
-- * Subscribers
, Subscriber(..)
, SubscriberId
, newSubscriberId
, SubscriberMetadata
, mkSubscriberMetadata
, unSubscriberMetadata
, SubscriberMap
, OnChange
, LGQResponse
, LiveQueryResponse(..)
, LiveQueryMetadata(..)
, SubscriberExecutionDetails (..)
-- * Batch
, BatchId (..)
) where
import Data.List.Split (chunksOf)
import GHC.AssertNF.CPP
import Hasura.Prelude
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Control.Immortal as Immortal
import qualified Crypto.Hash as CH
import qualified Data.Aeson.Extended as J
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified ListT
import qualified StmContainers.Map as STMMap
import Control.Lens
import Data.Monoid (Sum (..))
import Data.Text.Extended
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import qualified Hasura.Logging as L
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
import Hasura.Server.Types (RequestId)
import Hasura.Session
-- ----------------------------------------------------------------------------------------------
-- Subscribers
newtype SubscriberId
= SubscriberId { unSubscriberId :: UUID.UUID }
deriving (Show, Eq, Generic, Hashable, J.ToJSON)
newSubscriberId :: IO SubscriberId
newSubscriberId =
SubscriberId <$> UUID.nextRandom
-- | Allows a user of the live query subsystem (currently the websocket
-- transport) to attach arbitrary metadata about a subscriber. This information
-- is available as part of 'Subscriber' in 'CohortExecutionDetails' and can be
-- logged by customizing the poller log.
newtype SubscriberMetadata
= SubscriberMetadata { unSubscriberMetadata :: J.Value }
deriving (Show, Eq, J.ToJSON)
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata websocketId operationId operationName reqId =
SubscriberMetadata $ J.object
[ "websocket_id" J..= websocketId
, "operation_id" J..= operationId
, "operation_name" J..= operationName
, "request_id" J..= reqId
]
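-- Example shape of the resulting metadata value (field values are
-- illustrative only):
--
--   {"websocket_id":"...","operation_id":"...","operation_name":"viewer","request_id":"..."}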
data Subscriber
= Subscriber
{ _sId :: !SubscriberId
, _sMetadata :: !SubscriberMetadata
, _sRequestId :: !RequestId
, _sOperationName :: !(Maybe OperationName)
, _sOnChangeCallback :: !OnChange
}
-- | Live query onChange metadata, used for attaching extra analytics data
data LiveQueryMetadata
= LiveQueryMetadata
{ _lqmExecutionTime :: !Clock.DiffTime
}
data LiveQueryResponse
= LiveQueryResponse
{ _lqrPayload :: !BS.ByteString
, _lqrExecutionTime :: !Clock.DiffTime
}
type LGQResponse = GQResult LiveQueryResponse
type OnChange = LGQResponse -> IO ()
type SubscriberMap = TMap.TMap SubscriberId Subscriber
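{- A minimal sketch of constructing a 'Subscriber' (illustrative only; the
   callback name 'sendToClient' is hypothetical, standing in for whatever the
   transport uses to deliver results):

     mkDemoSubscriber :: RequestId -> (LGQResponse -> IO ()) -> IO Subscriber
     mkDemoSubscriber reqId sendToClient = do
       subscriberId <- newSubscriberId
       let metadata = SubscriberMetadata $ J.toJSON ("demo subscriber" :: Text)
       pure $ Subscriber subscriberId metadata reqId Nothing sendToClient
-}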
-- -------------------------------------------------------------------------------------------------
-- Cohorts
-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
--
-- See also 'CohortMap'.
data Cohort
= Cohort
{ _cCohortId :: !CohortId
-- ^ a unique identifier used to identify the cohort in the generated query
, _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
-- ^ a hash of the previous query result, if any, used to determine if we need to push an updated
-- result to the subscribers or not
, _cExistingSubscribers :: !SubscriberMap
-- ^ the subscribers we've already pushed a result to; we push new results to them iff the
-- response changes
, _cNewSubscribers :: !SubscriberMap
-- ^ subscribers we haven't yet pushed any results to; we push results to them
-- regardless of whether the result changed, then merge them into the map of
-- existing subscribers
}
-- | The @BatchId@ is a numeric ID that uniquely identifies a batch within a
-- single poll; it is used to identify the batch to which a cohort belongs.
newtype BatchId
= BatchId { _unBatchId :: Int }
deriving (Show, Eq, J.ToJSON)
{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, from https://blake2.net, it is stated,
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
yet is at least as secure as the latest standard SHA-3".
-}
-- | A hash used to determine if the result changed without having to keep the entire result in
-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256
-- bits, even if a subscription changes once per second for an entire year, the probability of a
-- hash collision is ~4.294417×10⁻⁶³. See Note [Blake2b faster than SHA-256].
newtype ResponseHash = ResponseHash { unResponseHash :: CH.Digest CH.Blake2b_256 }
deriving (Show, Eq)
instance J.ToJSON ResponseHash where
toJSON = J.toJSON . show . unResponseHash
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash
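{- A small usage sketch (illustrative only): the poller never compares whole
   payloads, only their hashes:

     needsPush :: Maybe ResponseHash -> BS.ByteString -> Bool
     needsPush prevHash newResponse = Just (mkRespHash newResponse) /= prevHash
-}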
-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort'
-- (assuming they already meet the criteria to be in the same 'Poller'). Note
-- the distinction between this and 'CohortId'; the latter is a completely
-- synthetic key used only to identify the cohort in the generated SQL query.
type CohortKey = CohortVariables
-- | This has the invariant, maintained in 'removeLiveQuery', that it contains
-- no 'Cohort' with zero total (existing + new) subscribers.
type CohortMap = TMap.TMap CohortKey Cohort
dumpCohortMap :: CohortMap -> IO J.Value
dumpCohortMap cohortMap = do
cohorts <- STM.atomically $ TMap.toList cohortMap
fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do
cohortJ <- dumpCohort cohort
return $ J.object
[ "variables" J..= variableValues
, "cohort" J..= cohortJ
]
where
dumpCohort (Cohort respId respTV curOps newOps) =
STM.atomically $ do
prevResHash <- STM.readTVar respTV
curOpIds <- TMap.toList curOps
newOpIds <- TMap.toList newOps
return $ J.object
[ "resp_id" J..= respId
, "current_ops" J..= map fst curOpIds
, "new_ops" J..= map fst newOpIds
, "previous_result_hash" J..= prevResHash
]
data CohortSnapshot
= CohortSnapshot
{ _csVariables :: !CohortVariables
, _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
, _csExistingSubscribers :: ![Subscriber]
, _csNewSubscribers :: ![Subscriber]
}
pushResultToCohort
:: GQResult BS.ByteString
-> Maybe ResponseHash
-> LiveQueryMetadata
-> CohortSnapshot
-> IO ( [SubscriberExecutionDetails], [SubscriberExecutionDetails])
-- ^ subscribers to which data has been pushed, subscribers which already
-- have this data (this information is exposed by metrics reporting)
pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
prevRespHashM <- STM.readTVarIO respRef
-- write to the current websockets if needed
(subscribersToPush, subscribersToIgnore) <-
if isExecError result || respHashM /= prevRespHashM
then do
$assertNFHere respHashM -- so we don't write thunks to mutable vars
STM.atomically $ STM.writeTVar respRef respHashM
return (newSinks <> curSinks, mempty)
else
return (newSinks, curSinks)
pushResultToSubscribers subscribersToPush
pure $ over (each.each) (\Subscriber{..}
-> SubscriberExecutionDetails _sId _sMetadata)
(subscribersToPush, subscribersToIgnore)
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
response = result <&> \payload -> LiveQueryResponse payload dTime
pushResultToSubscribers =
A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response
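-- For example, if a cohort's previous response hash equals the new hash and
-- the result is not an error, all existing subscribers land in the "ignored"
-- list and only the new subscribers receive a push.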
-- -----------------------------------------------------------------------------
-- Pollers
-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that
-- periodically polls Postgres and pushes results to each of its listening
-- 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in
-- practice, 'Poller's with large numbers of 'Cohort's are batched into
-- multiple concurrent queries for performance reasons.
data Poller
= Poller
{ _pCohorts :: !CohortMap
, _pIOState :: !(STM.TMVar PollerIOState)
-- ^ This is in a separate 'STM.TMVar' because it's important that we are
-- able to construct 'Poller' values in 'STM.STM' --- we need the insertion
-- into the 'PollerMap' to be atomic to ensure that we don't accidentally
-- create two for the same query due to a race. However, we can't spawn the
-- worker thread or create the metrics store in 'STM.STM', so we insert it
-- into the 'Poller' only after we're certain we won't create any duplicates.
--
-- This var is "write once", moving monotonically from empty to full.
-- TODO this could probably be tightened up to something like
-- 'STM PollerIOState'
}
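{- A sketch of the intended two-phase construction (illustrative only; assumes
   a hypothetical 'spawnPollerThread' and a 'TMap.new' constructor, plus a
   'pollerKey' and 'pollerMap' already in scope):

     do poller <- STM.atomically $ do
          cohorts <- TMap.new
          ioState <- STM.newEmptyTMVar
          let p = Poller cohorts ioState
          STMMap.insert p pollerKey pollerMap  -- atomic with construction
          pure p
        thread   <- spawnPollerThread poller
        pollerId <- PollerId <$> UUID.nextRandom
        STM.atomically $
          STM.putTMVar (_pIOState poller) (PollerIOState thread pollerId)
-}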
data PollerIOState
= PollerIOState
{ _pThread :: !Immortal.Thread
-- ^ a handle on the poller's worker thread that can be used to
-- 'Immortal.stop' it if all its cohorts stop listening
, _pId :: !PollerId
}
data PollerKey
-- we don't need the operation name here as a subscription will only have a
-- single top-level field
= PollerKey
{ _lgSource :: !SourceName
, _lgRole :: !RoleName
, _lgQuery :: !Text
} deriving (Show, Eq, Generic)
instance Hashable PollerKey
instance J.ToJSON PollerKey where
toJSON (PollerKey source role query) =
J.object [ "source" J..= source
, "role" J..= role
, "query" J..= query
]
type PollerMap = STMMap.Map PollerKey Poller
dumpPollerMap :: Bool -> PollerMap -> IO J.Value
dumpPollerMap extended lqMap =
fmap J.toJSON $ do
entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
forM entries $ \(PollerKey source role query, Poller cohortsMap ioState) -> do
PollerIOState threadId pollerId <- STM.atomically $ STM.readTMVar ioState
cohortsJ <-
if extended
then Just <$> dumpCohortMap cohortsMap
else return Nothing
return $ J.object
[ "source" J..= source
, "role" J..= role
, "thread_id" J..= show (Immortal.threadId threadId)
, "poller_id" J..= pollerId
, "multiplexed_query" J..= query
, "cohorts" J..= cohortsJ
]
-- | An ID to track unique 'Poller's, so that we can gather metrics about each
-- poller
newtype PollerId = PollerId { unPollerId :: UUID.UUID }
deriving (Show, Eq, Generic, J.ToJSON)
data SubscriberExecutionDetails
= SubscriberExecutionDetails
{ _sedSubscriberId :: !SubscriberId
, _sedSubscriberMetadata :: !SubscriberMetadata
} deriving (Show, Eq)
-- | Execution information related to a cohort on a poll cycle
data CohortExecutionDetails
= CohortExecutionDetails
{ _cedCohortId :: !CohortId
, _cedVariables :: !CohortVariables
, _cedResponseSize :: !(Maybe Int)
-- ^ Nothing in case of an error
, _cedPushedTo :: ![SubscriberExecutionDetails]
-- ^ The response on this cycle has been pushed to these subscribers.
-- New subscribers (those which weren't around during the previous poll
-- cycle) will always be part of this
, _cedIgnored :: ![SubscriberExecutionDetails]
-- ^ The response on this cycle has *not* been pushed to these subscribers.
-- This happens when the response hasn't changed from the previous poll
-- cycle
, _cedBatchId :: !BatchId
} deriving (Show, Eq)
-- | Execution information related to a single batched execution
data BatchExecutionDetails
= BatchExecutionDetails
{ _bedPgExecutionTime :: !Clock.DiffTime
-- ^ postgres execution time of each batch
, _bedPushTime :: !Clock.DiffTime
-- ^ time taken to push to all cohorts belonging to this batch
, _bedBatchId :: !BatchId
-- ^ id of the batch
, _bedCohorts :: ![CohortExecutionDetails]
-- ^ execution details of the cohorts belonging to this batch
, _bedBatchResponseSizeBytes :: !(Maybe Int)
} deriving (Show, Eq)
-- | see Note [Minimal LiveQuery Poller Log]
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails{..} =
let batchRespSize =
maybe mempty
(\respSize -> ["batch_response_size_bytes" J..= respSize])
_bedBatchResponseSizeBytes
in
J.object ([ "pg_execution_time" J..= _bedPgExecutionTime
, "push_time" J..= _bedPushTime
]
-- log batch resp size only when there are no errors
<> batchRespSize)
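-- Note that "batch_response_size_bytes" appears only when the batch succeeded,
-- e.g. (illustrative values):
--
--   { "pg_execution_time": ..., "push_time": ..., "batch_response_size_bytes": 1024 }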
data PollDetails
= PollDetails
{ _pdPollerId :: !PollerId
-- ^ the unique ID (basically a thread that runs as a 'Poller') for the
-- 'Poller'
, _pdGeneratedSql :: !Text
-- ^ the multiplexed SQL query to be run against the database with all the
-- variables together
, _pdSnapshotTime :: !Clock.DiffTime
-- ^ the time taken to get a snapshot of cohorts from our 'LiveQueriesState'
-- data structure
, _pdBatches :: ![BatchExecutionDetails]
-- ^ list of execution batches and their details
, _pdTotalTime :: !Clock.DiffTime
-- ^ total time spent on a poll cycle
, _pdLiveQueryOptions :: !LiveQueriesOptions
, _pdSource :: !SourceName
, _pdRole :: !RoleName
, _pdParameterizedQueryHash :: !ParameterizedQueryHash
} deriving (Show, Eq)
{- Note [Minimal LiveQuery Poller Log]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We only want to log minimal information in the livequery-poller-log, as it
could be expensive to log the details of every subscriber (all poller-related
information can always be retrieved by dumping the current live queries state).
We capture a lot more detail in PollDetails and BatchExecutionDetails than is
currently logged, so that other implementations such as pro can use it if they
need to.
-}
-- | see Note [Minimal LiveQuery Poller Log]
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal PollDetails{..} =
J.object [ "poller_id" J..= _pdPollerId
, "snapshot_time" J..= _pdSnapshotTime
, "batches" J..= map batchExecutionDetailMinimal _pdBatches
, "total_time" J..= _pdTotalTime
, "source" J..= _pdSource
, "role" J..= _pdRole
]
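-- A representative minimal poll log entry (illustrative values), per
-- Note [Minimal LiveQuery Poller Log]:
--
--   { "poller_id": "...", "snapshot_time": ..., "batches": [...],
--     "total_time": ..., "source": "default", "role": "user" }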
instance L.ToEngineLog PollDetails L.Hasura where
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
type LiveQueryPostPollHook = PollDetails -> IO ()
-- | The default 'LiveQueryPostPollHook': log the poll details.
defaultLiveQueryPostPollHook :: L.Logger L.Hasura -> LiveQueryPostPollHook
defaultLiveQueryPostPollHook = L.unLogger
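{- A sketch of a custom post-poll hook (illustrative only; 'recordPollTime' is
   a hypothetical metrics action) that records the total poll time before
   delegating to the default logging behaviour:

     metricsPostPollHook :: L.Logger L.Hasura -> (Clock.DiffTime -> IO ()) -> LiveQueryPostPollHook
     metricsPostPollHook logger recordPollTime pollDetails = do
       recordPollTime (_pdTotalTime pollDetails)
       L.unLogger logger pollDetails
-}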
-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollQuery
:: forall b
. BackendTransport b
=> PollerId
-> LiveQueriesOptions
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap
-> LiveQueryPostPollHook
-> IO ()
pollQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
(snapshotTime, cohortBatches) <- withElapsedTime $ do
-- get a snapshot of all the cohorts
-- this need not be done in a transaction
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
-- cohorts are broken down into batches specified by the batch size
let cohortBatches = chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
-- associate every batch with its BatchId
pure $ zip (BatchId <$> [1 .. ]) cohortBatches
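-- e.g. with a batch size of 100 and 250 cohort snapshots, this yields
-- three batches (BatchId 1, 2, 3) of 100, 100, and 50 cohorts respectively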
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query $ over (each._2) _csVariables cohorts
let lqMeta = LiveQueryMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
-- batch response size is the sum of the response sizes of the cohorts
batchResponseSize =
case mxRes of
Left _ -> Nothing
Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
(pushTime, cohortsExecutionDetails) <- withElapsedTime $
A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
(pushedSubscribers, ignoredSubscribers) <-
pushResultToCohort res (fst <$> respData) lqMeta snapshot
pure CohortExecutionDetails
{ _cedCohortId = cohortId
, _cedVariables = _csVariables snapshot
, _cedPushedTo = pushedSubscribers
, _cedIgnored = ignoredSubscribers
, _cedResponseSize = snd <$> respData
, _cedBatchId = batchId
}
pure $ BatchExecutionDetails queryExecutionTime
pushTime
batchId
cohortsExecutionDetails
batchResponseSize
pure (snapshotTime, batchesDetails)
let pollDetails = PollDetails
{ _pdPollerId = pollerId
, _pdGeneratedSql = toTxt query
, _pdSnapshotTime = snapshotTime
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
, _pdSource = sourceName
, _pdRole = roleName
, _pdParameterizedQueryHash = parameterizedQueryHash
}
postPollHook pollDetails
where
LiveQueriesOptions batchSize _ = lqOpts
getCohortSnapshot (cohortVars, handlerC) = do
let Cohort resId respRef curOpsTV newOpsTV = handlerC
curOpsL <- TMap.toList curOpsTV
newOpsL <- TMap.toList newOpsTV
forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
TMap.reset newOpsTV
let cohortSnapshot = CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
return (resId, cohortSnapshot)
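-- Note that taking the snapshot also promotes new subscribers: each new
-- subscriber is merged into the existing-subscribers map and the
-- new-subscribers map is reset, matching the push-then-merge behaviour
-- described on 'Cohort'.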
getCohortOperations cohorts = \case
Left e ->
-- TODO: this is an internal error
let resp = throwError $ GQExecError [encodeGQLErr False e]
in [(resp, cohortId, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
Right responses -> do
let cohortSnapshotMap = Map.fromList cohorts
flip mapMaybe responses $ \(cohortId, respBS) ->
let respHash = mkRespHash respBS
respSize = BS.length respBS
-- TODO: currently we ignore the cases when the cohortId from the
-- Postgres response is not present in the cohort map of this batch
-- (this shouldn't happen, but if it does it indicates a logic error
-- and we should log it)
in (pure respBS, cohortId, Just (respHash, respSize),) <$>
Map.lookup cohortId cohortSnapshotMap