server: log additional info in the livequery poller logs

https://github.com/hasura/graphql-engine-mono/pull/1529

GitOrigin-RevId: 27060632d5ac0da3f695c6755350f4e32dc0efc1
Karthikeyan Chinnakonda 2021-06-16 18:57:26 +05:30 committed by hasura-bot
parent c651c9b2ff
commit 62e7fe62db
11 changed files with 347 additions and 147 deletions

View File

@@ -3,6 +3,8 @@
## Next release
(Add entries below in the order of server, console, cli, docs, others)
- server: make improvements in the `livequery-poller-log`
## v2.0.0-beta.2
### Bug fixes and improvements

View File

@@ -542,6 +542,7 @@ library
, Hasura.GraphQL.Execute.Resolve
, Hasura.GraphQL.Execute.Types
, Hasura.GraphQL.Explain
, Hasura.GraphQL.ParameterizedQueryHash
, Hasura.GraphQL.Parser
, Hasura.GraphQL.Parser.Class
, Hasura.GraphQL.Parser.Class.Parse

View File

@@ -46,10 +46,10 @@ import qualified Hasura.Tracing as Tracing
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.ParameterizedQueryHash
import Hasura.GraphQL.Parser.Column (UnpreparedValue)
import Hasura.GraphQL.Parser.Directives
import Hasura.GraphQL.Parser.Monad
import Hasura.GraphQL.Parser.Schema (Variable)
import Hasura.GraphQL.RemoteServer (execRemoteGQ)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Metadata.Class
@@ -263,7 +263,7 @@ getResolvedExecPlan
-> HTTP.Manager
-> [HTTP.Header]
-> (GQLReqUnparsed, GQLReqParsed)
-> m (Telem.CacheHit, (G.SelectionSet G.NoFragments Variable, ResolvedExecutionPlan))
-> m (Telem.CacheHit, (ParameterizedQueryHash, ResolvedExecutionPlan))
getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
sc _scVer queryType httpManager reqHeaders (reqUnparsed, reqParsed) = -- do
@@ -285,7 +285,7 @@ getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
-- addPlanToCache plan =
-- liftIO $ EP.addPlan scVer (userRole userInfo)
-- opNameM queryStr plan planCache
noExistingPlan :: m (G.SelectionSet G.NoFragments Variable, ResolvedExecutionPlan)
noExistingPlan :: m (ParameterizedQueryHash, ResolvedExecutionPlan)
noExistingPlan = do
-- GraphQL requests may incorporate fragments which insert a pre-defined
-- part of a GraphQL query. Here we make sure to remember those
@@ -296,42 +296,47 @@ getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
mapMaybe takeFragment $ unGQLExecDoc $ _grQuery reqParsed
(gCtx, queryParts) <- getExecPlanPartial userInfo sc queryType reqParsed
case queryParts of
G.TypedOperationDefinition G.OperationTypeQuery _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
(executionPlan, queryRootFields, normalizedSelectionSet) <-
EQ.convertQuerySelSet env logger gCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
pure $ (normalizedSelectionSet, QueryExecutionPlan executionPlan queryRootFields)
(normalizedSelectionSet, resolvedExecPlan) <-
case queryParts of
G.TypedOperationDefinition G.OperationTypeQuery _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
(executionPlan, queryRootFields, normalizedSelectionSet) <-
EQ.convertQuerySelSet env logger gCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
pure $ (normalizedSelectionSet, QueryExecutionPlan executionPlan queryRootFields)
-- See Note [Temporarily disabling query plan caching]
-- traverse_ (addPlanToCache . EP.RPQuery) plan
G.TypedOperationDefinition G.OperationTypeMutation _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
(executionPlan, normalizedSelectionSet) <-
EM.convertMutationSelectionSet env logger gCtx sqlGenCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
pure $ (normalizedSelectionSet, MutationExecutionPlan executionPlan)
-- See Note [Temporarily disabling query plan caching]
-- traverse_ (addPlanToCache . EP.RPQuery) plan
G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
-- Parse as query to check correctness
(unpreparedAST, _reusability, normalizedDirectives, normalizedSelectionSet) <-
EQ.parseGraphQLQuery gCtx varDefs (_grVariables reqUnparsed) directives inlinedSelSet
-- Process directives on the subscription
(dirMap, _) <- (`onLeft` reportParseErrors) =<<
runParseT (parseDirectives customDirectives (G.DLExecutable G.EDLSUBSCRIPTION) normalizedDirectives)
-- A subscription should have exactly one root field.
-- However, for testing purposes, we may allow several root fields; we check for this by
-- looking for directive "_multiple_top_level_fields" on the subscription. THIS IS NOT A
-- SUPPORTED FEATURE. We might remove it in the future without warning. DO NOT USE THIS.
allowMultipleRootFields <- withDirective dirMap multipleRootFields $ pure . isJust
case inlinedSelSet of
[_] -> pure ()
[] -> throw500 "empty selset for subscription"
_ -> unless allowMultipleRootFields $
throw400 ValidationFailed "subscriptions must select one top level field"
subscriptionPlan <- buildSubscriptionPlan userInfo unpreparedAST
pure (normalizedSelectionSet, SubscriptionExecutionPlan subscriptionPlan)
-- See Note [Temporarily disabling query plan caching]
-- traverse_ (addPlanToCache . EP.RPQuery) plan
G.TypedOperationDefinition G.OperationTypeMutation _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
(executionPlan, normalizedSelectionSet) <-
EM.convertMutationSelectionSet env logger gCtx sqlGenCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
pure $ (normalizedSelectionSet, MutationExecutionPlan executionPlan)
-- See Note [Temporarily disabling query plan caching]
-- traverse_ (addPlanToCache . EP.RPQuery) plan
G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives selSet -> do
-- (Here the above fragment inlining is actually executed.)
inlinedSelSet <- EI.inlineSelectionSet fragments selSet
-- Parse as query to check correctness
(unpreparedAST, _reusability, normalizedDirectives, normalizedSelectionSet) <-
EQ.parseGraphQLQuery gCtx varDefs (_grVariables reqUnparsed) directives inlinedSelSet
-- Process directives on the subscription
(dirMap, _) <- (`onLeft` reportParseErrors) =<<
runParseT (parseDirectives customDirectives (G.DLExecutable G.EDLSUBSCRIPTION) normalizedDirectives)
-- A subscription should have exactly one root field.
-- However, for testing purposes, we may allow several root fields; we check for this by
-- looking for directive "_multiple_top_level_fields" on the subscription. THIS IS NOT A
-- SUPPORTED FEATURE. We might remove it in the future without warning. DO NOT USE THIS.
allowMultipleRootFields <- withDirective dirMap multipleRootFields $ pure . isJust
case inlinedSelSet of
[_] -> pure ()
[] -> throw500 "empty selset for subscription"
_ -> unless allowMultipleRootFields $
throw400 ValidationFailed "subscriptions must select one top level field"
subscriptionPlan <- buildSubscriptionPlan userInfo unpreparedAST
pure (normalizedSelectionSet, SubscriptionExecutionPlan subscriptionPlan)
-- The parameterized query hash is needed in multiple places; rather than
-- computing it separately at each call site, we calculate it once here and
-- thread it through to wherever it is required.
pure $ (calculateParameterizedQueryHash normalizedSelectionSet, resolvedExecPlan)
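Callers now receive this hash in place of the normalized selection set; for
example, the HTTP transport later in this diff destructures the result as:

(telemCacheHit, (parameterizedQueryHash, execPlan)) <-
  E.getResolvedExecPlan
    env logger userInfo sqlGenCtx sc scVer queryType
    httpManager reqHeaders (reqUnparsed, reqParsed)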

View File

@@ -38,45 +38,52 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
, LGQResponse
, LiveQueryResponse(..)
, LiveQueryMetadata(..)
, SubscriberExecutionDetails (..)
-- * Batch
, BatchId (..)
) where
import Data.List.Split (chunksOf)
import Data.List.Split (chunksOf)
#ifndef PROFILING
import GHC.AssertNF
#endif
import Hasura.Prelude
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Control.Immortal as Immortal
import qualified Crypto.Hash as CH
import qualified Data.Aeson.Extended as J
import qualified Data.Aeson.TH as J
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Control.Immortal as Immortal
import qualified Crypto.Hash as CH
import qualified Data.Aeson.Extended as J
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified ListT
import qualified StmContainers.Map as STMMap
import qualified StmContainers.Map as STMMap
import Control.Lens
import Data.Monoid (Sum (..))
import Data.Text.Extended
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import qualified Hasura.Logging as L
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import qualified Hasura.Logging as L
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
import Hasura.Server.Types (RequestId)
import Hasura.Session
-- ----------------------------------------------------------------------------------------------
-- Subscribers
@@ -96,13 +103,21 @@ newtype SubscriberMetadata
= SubscriberMetadata { unSubscriberMetadata :: J.Value }
deriving (Show, Eq, J.ToJSON)
mkSubscriberMetadata :: J.Value -> SubscriberMetadata
mkSubscriberMetadata = SubscriberMetadata
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata websocketId operationId operationName reqId =
SubscriberMetadata $ J.object
[ "websocket_id" J..= websocketId
, "operation_id" J..= operationId
, "operation_name" J..= operationName
, "request_id" J..= reqId
]
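As a rough sketch (values are made up; exact renderings depend on the ToJSON
instances of WS.WSId, OperationId, OperationName and RequestId), a
subscriber's metadata now serializes along these lines:

-- mkSubscriberMetadata wsId opId (Just opName) reqId yields, roughly:
--   { "websocket_id":   "7f0c4d7e-…"
--   , "operation_id":   "1"
--   , "operation_name": "live_authors"
--   , "request_id":     "4e2f…"
--   }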
data Subscriber
= Subscriber
{ _sId :: !SubscriberId
, _sMetadata :: !SubscriberMetadata
, _sRequestId :: !RequestId
, _sOperationName :: !(Maybe OperationName)
, _sOnChangeCallback :: !OnChange
}
@@ -150,6 +165,12 @@ data Cohort
-- result changed, then merge them in the map of existing subscribers
}
-- | The @BatchId@ is a number-based ID that uniquely identifies a batch within a single
-- poll and is used to identify the batch to which a cohort belongs.
newtype BatchId
= BatchId { _unBatchId :: Int }
deriving (Show, Eq, J.ToJSON)
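Since the ToJSON instance is derived from the underlying Int, a batch id
serializes as a plain number; a quick sketch:

-- J.toJSON (BatchId 1) == J.Number 1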
{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, from https://blake2.net, it is stated,
@@ -214,9 +235,7 @@ pushResultToCohort
-> Maybe ResponseHash
-> LiveQueryMetadata
-> CohortSnapshot
-> IO ( [(SubscriberId, SubscriberMetadata)]
, [(SubscriberId, SubscriberMetadata)]
)
-> IO ( [SubscriberExecutionDetails], [SubscriberExecutionDetails])
-- ^ subscribers to which data has been pushed, subscribers which already
-- have this data (this information is exposed by metrics reporting)
pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
@@ -233,14 +252,15 @@ pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot =
else
return (newSinks, curSinks)
pushResultToSubscribers subscribersToPush
pure $ over (each.each) (\Subscriber{..} -> (_sId, _sMetadata))
(subscribersToPush, subscribersToIgnore)
pure $ over (each.each) (\Subscriber{..}
-> SubscriberExecutionDetails _sId _sMetadata)
(subscribersToPush, subscribersToIgnore)
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
response = result <&> \payload -> LiveQueryResponse payload dTime
pushResultToSubscribers =
A.mapConcurrently_ $ \(Subscriber _ _ action) -> action response
A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response
-- -----------------------------------------------------------------------------
-- Pollers
@@ -320,6 +340,12 @@ dumpPollerMap extended lqMap =
newtype PollerId = PollerId { unPollerId :: UUID.UUID }
deriving (Show, Eq, Generic, J.ToJSON)
data SubscriberExecutionDetails
= SubscriberExecutionDetails
{ _sedSubscriberId :: !SubscriberId
, _sedSubscriberMetadata :: !SubscriberMetadata
} deriving (Show, Eq)
-- | Execution information related to a cohort on a poll cycle
data CohortExecutionDetails
= CohortExecutionDetails
@@ -327,58 +353,66 @@ data CohortExecutionDetails
, _cedVariables :: !CohortVariables
, _cedResponseSize :: !(Maybe Int)
-- ^ Nothing in case of an error
, _cedPushedTo :: ![(SubscriberId, SubscriberMetadata)]
, _cedPushedTo :: ![SubscriberExecutionDetails]
-- ^ The response on this cycle has been pushed to the subscribers above
-- New subscribers (those which haven't been around during the previous poll
-- cycle) will always be part of this
, _cedIgnored :: ![(SubscriberId, SubscriberMetadata)]
, _cedIgnored :: ![SubscriberExecutionDetails]
-- ^ The response on this cycle has *not* been pushed to the subscribers
-- above. This happens when the response hasn't changed from the previous
-- poll cycle
, _cedBatchId :: !BatchId
} deriving (Show, Eq)
$(J.deriveToJSON hasuraJSON ''CohortExecutionDetails)
-- | Execution information related to a single batched execution
data BatchExecutionDetails
= BatchExecutionDetails
{ _bedPgExecutionTime :: !Clock.DiffTime
{ _bedPgExecutionTime :: !Clock.DiffTime
-- ^ postgres execution time of each batch
, _bedPushTime :: !Clock.DiffTime
, _bedPushTime :: !Clock.DiffTime
-- ^ time taken to push to all cohorts belonging to this batch
, _bedCohorts :: ![CohortExecutionDetails]
, _bedBatchId :: !BatchId
-- ^ id of the batch
, _bedCohorts :: ![CohortExecutionDetails]
-- ^ execution details of the cohorts belonging to this batch
, _bedBatchResponseSizeBytes :: !(Maybe Int)
} deriving (Show, Eq)
-- | see Note [Minimal LiveQuery Poller Log]
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails{..} =
J.object [ "pg_execution_time" J..= _bedPgExecutionTime
, "push_time" J..= _bedPushTime
]
$(J.deriveToJSON hasuraJSON ''BatchExecutionDetails)
let batchRespSize =
maybe mempty
(\respSize -> ["batch_response_size_bytes" J..= respSize])
_bedBatchResponseSizeBytes
in
J.object ([ "pg_execution_time" J..= _bedPgExecutionTime
, "push_time" J..= _bedPushTime
]
-- log batch response size only when there are no errors
<> batchRespSize)
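Put together, a minimal batch entry comes out roughly as below (values are
illustrative, and batch_response_size_bytes is omitted when the batch
errored):

-- { "pg_execution_time": 3.2e-3
-- , "push_time": 9.0e-4
-- , "batch_response_size_bytes": 512
-- }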
data PollDetails
= PollDetails
{ _pdPollerId :: !PollerId
{ _pdPollerId :: !PollerId
-- ^ the unique ID (basically a thread that runs as a 'Poller') for the
-- 'Poller'
, _pdGeneratedSql :: !Text
, _pdGeneratedSql :: !Text
-- ^ the multiplexed SQL query to be run against the database with all the
-- variables together
, _pdSnapshotTime :: !Clock.DiffTime
, _pdSnapshotTime :: !Clock.DiffTime
-- ^ the time taken to get a snapshot of cohorts from our 'LiveQueriesState'
-- data structure
, _pdBatches :: ![BatchExecutionDetails]
, _pdBatches :: ![BatchExecutionDetails]
-- ^ list of execution batches and their details
, _pdTotalTime :: !Clock.DiffTime
, _pdTotalTime :: !Clock.DiffTime
-- ^ total time spent on a poll cycle
, _pdLiveQueryOptions :: !LiveQueriesOptions
, _pdLiveQueryOptions :: !LiveQueriesOptions
, _pdSource :: !SourceName
, _pdRole :: !RoleName
, _pdParameterizedQueryHash :: !ParameterizedQueryHash
} deriving (Show, Eq)
$(J.deriveToJSON hasuraJSON ''PollDetails)
{- Note [Minimal LiveQuery Poller Log]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We only want to log the minimal information in the livequery-poller-log as it
@@ -396,6 +430,8 @@ pollDetailMinimal PollDetails{..} =
, "snapshot_time" J..= _pdSnapshotTime
, "batches" J..= map batchExecutionDetailMinimal _pdBatches
, "total_time" J..= _pdTotalTime
, "source" J..= _pdSource
, "role" J..= _pdRole
]
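A minimal poller log entry therefore now carries the source and role
alongside the timings; roughly (values illustrative, earlier fields elided
as above):

-- { …
-- , "snapshot_time": 4.0e-4
-- , "batches": [ <batchExecutionDetailMinimal entries> ]
-- , "total_time": 1.002
-- , "source": "default"
-- , "role": "user"
-- }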
instance L.ToEngineLog PollDetails L.Hasura where
@@ -414,12 +450,14 @@ pollQuery
. BackendTransport b
=> PollerId
-> LiveQueriesOptions
-> SourceConfig b
-> (SourceName, SourceConfig b)
-> RoleName
-> ParameterizedQueryHash
-> MultiplexedQuery b
-> CohortMap
-> LiveQueryPostPollHook
-> IO ()
pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
pollQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
@@ -429,15 +467,21 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
-- cohorts are broken down into batches specified by the batch size
pure $ chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
let cohortBatches = chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
-- associate every batch with its BatchId
pure $ zip (BatchId <$> [1 .. ]) cohortBatches
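-- For example, with a batch size of 2 and snapshots [c1,c2,c3,c4,c5] this
-- yields [(BatchId 1,[c1,c2]), (BatchId 2,[c3,c4]), (BatchId 3,[c5])]
-- (illustrative names; ids are assigned afresh on every poll, starting at 1)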
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \cohorts -> do
batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query $ over (each._2) _csVariables cohorts
let lqMeta = LiveQueryMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
-- batch response size is the sum of the response sizes of the cohorts
batchResponseSize =
case mxRes of
Left _ -> Nothing
Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
(pushTime, cohortsExecutionDetails) <- withElapsedTime $
A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
(pushedSubscribers, ignoredSubscribers) <-
@@ -448,8 +492,13 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
, _cedPushedTo = pushedSubscribers
, _cedIgnored = ignoredSubscribers
, _cedResponseSize = snd <$> respData
, _cedBatchId = batchId
}
pure $ BatchExecutionDetails queryExecutionTime pushTime cohortsExecutionDetails
pure $ BatchExecutionDetails queryExecutionTime
pushTime
batchId
cohortsExecutionDetails
batchResponseSize
pure (snapshotTime, batchesDetails)
@@ -460,6 +509,9 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
, _pdBatches = batchesDetails
, _pdLiveQueryOptions = lqOpts
, _pdTotalTime = totalTime
, _pdSource = sourceName
, _pdRole = roleName
, _pdParameterizedQueryHash = parameterizedQueryHash
}
postPollHook pollDetails
where

View File

@@ -46,11 +46,14 @@ import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.Poll
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName, unNonNegativeDiffTime)
import Hasura.Server.Init (ServerMetrics (..))
import Hasura.Server.Types (RequestId)
-- | The top-level datatype that holds the state for all active live queries.
@@ -95,18 +98,23 @@ addLiveQuery
-> SubscriberMetadata
-> LiveQueriesState
-> SourceName
-> ParameterizedQueryHash
-> Maybe OperationName
-- ^ operation name of the query
-> RequestId
-> LiveQueryPlan b (MultiplexedQuery b)
-> OnChange
-- ^ the action to be executed when result changes
-> IO LiveQueryId
addLiveQuery logger serverMetrics subscriberMetadata lqState source plan onResultAction = do
addLiveQuery logger serverMetrics subscriberMetadata lqState
source parameterizedQueryHash operationName requestId plan onResultAction = do
-- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!
-- disposable UUIDs:
cohortId <- newCohortId
subscriberId <- newSubscriberId
let !subscriber = Subscriber subscriberId subscriberMetadata onResultAction
let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction
#ifndef PROFILING
$assertNFHere subscriber -- so we don't write thunks to mutable vars
@@ -131,7 +139,7 @@ addLiveQuery logger serverMetrics subscriberMetadata lqState source plan onResul
onJust handlerM $ \handler -> do
pollerId <- PollerId <$> UUID.nextRandom
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
pollQuery @b pollerId lqOpts sourceConfig query (_pCohorts handler) postPollHook
pollQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) postPollHook
sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval
let !pState = PollerIOState threadRef pollerId
#ifndef PROFILING

View File

@@ -0,0 +1,135 @@
{-|
This module calculates the parameterized query hash, which is a way to
hash an incoming query (after resolving variables) with all leaf nodes
(i.e. scalar values) discarded. In other words, two queries with the same
parameterized query hash are essentially the same query, differing only in
their leaf values.
For example:
1. query {
authors (where: {id: {_eq: 2}}) {
id
name
}
}
2. query {
authors (where: {id: {_eq: 203943}}) {
id
name
}
}
3. query {
authors (where: {id: {_eq: $id}}) {
id
name
}
}
For any value of `id`
4. query {
authors (where: $whereBoolExp) {
id
name
}
}
only when `whereBoolExp` is of the form
{
"id": {
"_eq": <id>
}
}
All the above queries should result in the same parameterized query hash.
The following steps are done to calculate the parameterized query hash:
1. Normalize the GraphQL query by substituting the variables (if any) in the appropriate places.
2. Replace all scalar GraphQL values (Int, Float, Enum, String and Boolean) with null.
3. For input objects and lists, traverse them and apply step 2 to each element.
4. Calculate the hash of the query obtained from step 3.
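For instance, each of the four queries above reduces (modulo the exact
printer layout) to the following normalized form, and it is this form that
gets hashed:

query {
  authors (where: {id: {_eq: null}}) {
    id
    name
  }
}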
Note: the parameterized query hash is a PRO-only feature
-}
module Hasura.GraphQL.ParameterizedQueryHash
( calculateParameterizedQueryHash
, ParameterizedQueryHash
)
where
import Hasura.Prelude
import qualified Data.Aeson as J
import qualified Data.ByteString as B
import qualified Data.HashMap.Strict as Map
import qualified Language.GraphQL.Draft.Printer as G
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Text.Builder as Text
import Hasura.GraphQL.Parser (InputValue (..), Variable (..))
import Hasura.Server.Utils (cryptoHash)
newtype ParameterizedQueryHash
= ParameterizedQueryHash { unParamQueryHash :: B.ByteString }
deriving (Show, Eq)
instance J.ToJSON ParameterizedQueryHash where
toJSON = J.String . bsToTxt . unParamQueryHash
normalizeSelectionSet :: G.SelectionSet G.NoFragments Variable -> G.SelectionSet G.NoFragments Void
normalizeSelectionSet = (normalizeSelection =<<)
where
normalizeSelection :: G.Selection G.NoFragments Variable -> G.SelectionSet G.NoFragments Void
normalizeSelection (G.SelectionField fld) = pure $ G.SelectionField (normalizeField fld)
normalizeSelection (G.SelectionInlineFragment (G.InlineFragment _ _ selSet)) =
normalizeSelectionSet selSet
normalizeField (G.Field _alias name args _directives selSet) =
G.Field Nothing name (Map.map normalizeValue args) mempty $ normalizeSelectionSet selSet
normalizeConstValue :: G.Value Void -> G.Value Void
normalizeConstValue = \case
G.VNull -> G.VNull
G.VInt _ -> G.VNull
G.VFloat _ -> G.VNull
G.VString _ -> G.VNull
G.VBoolean _ -> G.VNull
G.VEnum _ -> G.VNull
G.VList l -> G.VList $ map normalizeConstValue l
G.VObject obj -> G.VObject $ Map.map normalizeConstValue obj
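So scalars collapse to null while list and object structure is preserved,
for example:

-- normalizeConstValue (G.VList [G.VInt 1, G.VString "x"])
--   == G.VList [G.VNull, G.VNull]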
jsonToNormalizedGQLVal :: J.Value -> G.Value Void
jsonToNormalizedGQLVal = \case
J.Null -> G.VNull
J.Bool _ -> G.VNull
J.String _ -> G.VNull
J.Number _ -> G.VNull
J.Array l -> G.VList $ jsonToNormalizedGQLVal <$> toList l
J.Object vals -> G.VObject $ Map.fromList $
flip map (Map.toList vals) $ \(key, val) ->
(G.unsafeMkName key, jsonToNormalizedGQLVal val)
normalizeValue :: G.Value Variable -> G.Value Void
normalizeValue = \case
G.VNull -> G.VNull
G.VInt _ -> G.VNull
G.VFloat _ -> G.VNull
G.VString _ -> G.VNull
G.VBoolean _ -> G.VNull
G.VEnum _ -> G.VNull
G.VList l -> G.VList $ map normalizeValue l
G.VObject obj -> G.VObject $ Map.map normalizeValue obj
G.VVariable (Variable _info _type value) ->
case value of
GraphQLValue val -> normalizeConstValue val
JSONValue v -> jsonToNormalizedGQLVal v
calculateParameterizedQueryHash :: G.SelectionSet G.NoFragments Variable -> ParameterizedQueryHash
calculateParameterizedQueryHash = ParameterizedQueryHash . cryptoHash . Text.run . G.selectionSet . normalizeSelectionSet
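In sketch form, the property promised in the module header (where selSetA
and selSetB are hypothetical bindings for the parsed, variable-resolved
selection sets of queries 1 and 2 above):

-- calculateParameterizedQueryHash selSetA == calculateParameterizedQueryHash selSetB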

View File

@@ -52,8 +52,8 @@ import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Logging (MonadQueryLog (logQueryLog),
QueryLog (..), QueryLogKind (..))
import Hasura.GraphQL.ParameterizedQueryHash
import Hasura.GraphQL.Parser.Column (UnpreparedValue (..))
import Hasura.GraphQL.Parser.Schema (Variable)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.Instances ()
@@ -203,16 +203,16 @@ runGQ
-> [HTTP.Header]
-> E.GraphQLQueryType
-> GQLReqUnparsed
-> m (G.SelectionSet G.NoFragments Variable, HttpResponse (Maybe GQResponse, EncJSON))
-> m (ParameterizedQueryHash, HttpResponse (Maybe GQResponse, EncJSON))
runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
(telemTimeTot_DT, (telemCacheHit, (telemQueryType, telemTimeIO_DT, telemLocality, resp, normalizedSelectionSet))) <- withElapsedTime $ do
(telemTimeTot_DT, (telemCacheHit, (telemQueryType, telemTimeIO_DT, telemLocality, resp, parameterizedQueryHash))) <- withElapsedTime $ do
E.ExecutionCtx _ sqlGenCtx {- planCache -} sc scVer httpManager enableAL <- ask
-- run system authorization on the GraphQL API
reqParsed <- E.checkGQLExecution userInfo (reqHeaders, ipAddress) enableAL sc reqUnparsed
>>= flip onLeft throwError
(telemCacheHit, (normalizedSelectionSet, execPlan)) <-
(telemCacheHit, (parameterizedQueryHash, execPlan)) <-
E.getResolvedExecPlan
env logger {- planCache -}
userInfo sqlGenCtx sc scVer queryType
@@ -236,7 +236,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
case fmap decodeGQResp cachedValue of
Just cachedResponseData -> do
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindCached
pure (Telem.Query, 0, Telem.Local, HttpResponse cachedResponseData responseHeaders, normalizedSelectionSet)
pure (Telem.Query, 0, Telem.Local, HttpResponse cachedResponseData responseHeaders, parameterizedQueryHash)
Nothing -> do
conclusion <- runExceptT $ forWithKey queryPlans $ \fieldName -> \case
@@ -278,7 +278,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
buildRaw json
out@(_, _, _, HttpResponse responseData _, _) <-
buildResultFromFragments Telem.Query conclusion responseHeaders normalizedSelectionSet
buildResultFromFragments Telem.Query conclusion responseHeaders parameterizedQueryHash
Tracing.interpTraceT (liftEitherM . runExceptT) $ cacheStore cacheKey $ snd responseData
pure out
@@ -296,7 +296,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
resp <- runExceptT $ doQErr $
runPGMutationTransaction reqId reqUnparsed userInfo logger sourceConfig pgMutations
-- we do not construct result fragments since we have only one result
buildResult Telem.Mutation normalizedSelectionSet resp \(telemTimeIO_DT, results) ->
buildResult Telem.Mutation parameterizedQueryHash resp \(telemTimeIO_DT, results) ->
let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ OMap.mapKeys G.unName results
in ( Telem.Mutation
, telemTimeIO_DT
@@ -304,7 +304,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
, HttpResponse
(Just responseData, encodeGQResp responseData)
[]
, normalizedSelectionSet
, parameterizedQueryHash
)
-- we are not in the transaction case; proceeding normally
@@ -347,7 +347,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
E.ExecStepRaw json -> do
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
buildRaw json
buildResultFromFragments Telem.Mutation conclusion [] normalizedSelectionSet
buildResultFromFragments Telem.Mutation conclusion [] parameterizedQueryHash
E.SubscriptionExecutionPlan _sub ->
throw400 UnexpectedPayload "subscriptions are not supported over HTTP, use websockets instead"
@@ -356,7 +356,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
telemTimeTot = convertDuration telemTimeTot_DT
telemTransport = Telem.HTTP
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
return (normalizedSelectionSet, resp)
return (parameterizedQueryHash, resp)
where
getExecStepActionWithActionInfo acc execStep = case execStep of
EB.ExecStepAction _ actionInfo _remoteJoins -> (actionInfo:acc)
@@ -377,15 +377,15 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
:: Telem.QueryType
-> Either (Either GQExecError QErr) (InsOrdHashMap G.Name ResultsFragment)
-> HTTP.ResponseHeaders
-> G.SelectionSet G.NoFragments Variable
-> ParameterizedQueryHash
-> m ( Telem.QueryType
, DiffTime
, Telem.Locality
, HttpResponse (Maybe GQResponse, EncJSON)
, G.SelectionSet G.NoFragments Variable
, ParameterizedQueryHash
)
buildResultFromFragments telemType fragments cacheHeaders normalizedSelSet =
buildResult telemType normalizedSelSet fragments \results ->
buildResultFromFragments telemType fragments cacheHeaders parameterizedQueryHash =
buildResult telemType parameterizedQueryHash fragments \results ->
let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ rfResponse <$> OMap.mapKeys G.unName results
in ( telemType
, sum (fmap rfTimeIO results)
@@ -393,28 +393,28 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
, HttpResponse
(Just responseData, encodeGQResp responseData)
(cacheHeaders <> foldMap rfHeaders results)
, normalizedSelSet
, parameterizedQueryHash
)
buildResult
:: Telem.QueryType
-> G.SelectionSet G.NoFragments Variable
-> ParameterizedQueryHash
-> Either (Either GQExecError QErr) a
-> (a ->
( Telem.QueryType
, DiffTime
, Telem.Locality
, HttpResponse (Maybe GQResponse, EncJSON)
, G.SelectionSet G.NoFragments Variable
, ParameterizedQueryHash
)
)
-> m ( Telem.QueryType
, DiffTime
, Telem.Locality
, HttpResponse (Maybe GQResponse, EncJSON)
, G.SelectionSet G.NoFragments Variable
, ParameterizedQueryHash
)
buildResult telemType normalizedSelSet result f = case result of
buildResult telemType parameterizedQueryHash result f = case result of
Right a -> pure $ f a
Left (Right err) -> throwError err
Left (Left err) -> pure ( telemType
@@ -423,7 +423,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
, HttpResponse
(Just (Left err), encodeGQResp $ Left err)
[]
, normalizedSelSet
, parameterizedQueryHash
)
coalescePostgresMutations
@@ -502,8 +502,8 @@ runGQBatched
runGQBatched env logger reqId responseErrorsConfig userInfo ipAddress reqHdrs queryType query =
case query of
GQLSingleRequest req -> do
(normalizedSelectionSet, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
let httpLoggingMetadata = buildHTTPLoggingMetadata @m [normalizedSelectionSet]
(parameterizedQueryHash, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
let httpLoggingMetadata = buildHTTPLoggingMetadata @m [parameterizedQueryHash]
pure (httpLoggingMetadata, snd <$> httpResp)
GQLBatchedReqs reqs -> do
-- It's unclear what we should do if we receive multiple

View File

@@ -378,7 +378,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
userInfo sqlGenCtx sc scVer queryType
httpMgr reqHdrs (q, reqParsed)
(telemCacheHit, (_normalizeSelSet, execPlan)) <- onLeft execPlanE (withComplete . preExecErr requestId)
(telemCacheHit, (parameterizedQueryHash, execPlan)) <- onLeft execPlanE (withComplete . preExecErr requestId)
case execPlan of
E.QueryExecutionPlan queryPlan asts -> Tracing.trace "Query" $ do
@@ -539,7 +539,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
E.SEOnSourceDB actionIds liveQueryBuilder -> do
actionLogMapE <- fmap fst <$> runExceptT (EA.fetchActionLogResponses actionIds)
actionLogMap <- onLeft actionLogMapE (withComplete . preExecErr requestId)
lqIdE <- liftIO $ startLiveQuery liveQueryBuilder actionLogMap
lqIdE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
lqId <- onLeft lqIdE (withComplete . preExecErr requestId)
-- Update async action query subscription state
@@ -552,7 +552,8 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindAction
liftIO $ do
let asyncActionQueryLive = LQ.LAAQOnSourceDB $
LQ.LiveAsyncActionQueryOnSource lqId actionLogMap $ restartLiveQuery liveQueryBuilder
LQ.LiveAsyncActionQueryOnSource lqId actionLogMap $
restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder
onUnexpectedException err = do
sendError requestId err
@@ -659,24 +660,20 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
liftIO $ sendCompleted Nothing
throwError ()
restartLiveQuery liveQueryBuilder lqId actionLogMap = do
restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do
LQ.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
either (const Nothing) Just <$> startLiveQuery liveQueryBuilder actionLogMap
either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
startLiveQuery liveQueryBuilder actionLogMap = do
startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap = do
liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap
for liveQueryE $ \(sourceName, E.LQP exists) -> do
let subscriberMetadata = LQ.mkSubscriberMetadata $ J.object
[ "websocket_id" J..= WS.getWSId wsConn
, "operation_id" J..= opId
]
let !opName = _grOperationName q
subscriberMetadata = LQ.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
-- NOTE!: we mask async exceptions higher in the call stack, but it's
-- crucial we don't lose lqId after addLiveQuery returns successfully.
!lqId <- liftIO $ AB.dispatchAnyBackend @BackendTransport exists
\(E.MultiplexedLiveQueryPlan liveQueryPlan) ->
LQ.addLiveQuery logger (_wseServerMetrics serverEnv) subscriberMetadata lqMap sourceName liveQueryPlan liveQOnChange
let !opName = _grOperationName q
LQ.addLiveQuery logger (_wseServerMetrics serverEnv) subscriberMetadata lqMap sourceName parameterizedQueryHash opName requestId liveQueryPlan liveQOnChange
#ifndef PROFILING
liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars
#endif

View File

@@ -20,32 +20,32 @@ module Hasura.Server.Logging
import Hasura.Prelude
import qualified Data.ByteString.Lazy as BL
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as HM
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
import qualified Data.ByteString.Lazy as BL
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as HM
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
import Data.Aeson
import Data.Aeson.TH
import Data.Int (Int64)
import Data.Int (Int64)
import Data.Text.Extended
import Hasura.Base.Error
import Hasura.GraphQL.Parser.Schema (Variable)
import Hasura.GraphQL.ParameterizedQueryHash
import Hasura.HTTP
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.RQL.Types
import Hasura.Server.Compression
import Hasura.Server.Types
import Hasura.Server.Utils (DeprecatedEnvVars (..), EnvVarsMovedToMetadata (..),
deprecatedEnvVars, envVarsMovedToMetadata)
import Hasura.Server.Utils (DeprecatedEnvVars (..),
EnvVarsMovedToMetadata (..),
deprecatedEnvVars, envVarsMovedToMetadata)
import Hasura.Session
import Hasura.Tracing (TraceT)
import Hasura.Tracing (TraceT)
data StartupLog
@@ -130,7 +130,7 @@ class (Monad m, Monoid (HTTPLoggingMetadata m)) => HttpLog m where
type HTTPLoggingMetadata m
buildHTTPLoggingMetadata :: [(G.SelectionSet G.NoFragments Variable)] -> HTTPLoggingMetadata m
buildHTTPLoggingMetadata :: [ParameterizedQueryHash] -> HTTPLoggingMetadata m
logHttpError
:: Logger Hasura

View File

@@ -156,8 +156,8 @@ runCustomEndpoint env execCtx requestId userInfo reqHeaders ipAddress RestReques
-- with the query string from the schema cache, and pass it
-- through to the /v1/graphql endpoint.
(httpLoggingMetadata, handlerResp) <- flip runReaderT execCtx $ do
(normalizedSelectionSet, resp) <- GH.runGQ env (E._ecxLogger execCtx) requestId userInfo ipAddress reqHeaders E.QueryHasura (mkPassthroughRequest queryx resolvedVariables)
let httpLoggingMetadata = buildHTTPLoggingMetadata @m [normalizedSelectionSet]
(parameterizedQueryHash, resp) <- GH.runGQ env (E._ecxLogger execCtx) requestId userInfo ipAddress reqHeaders E.QueryHasura (mkPassthroughRequest queryx resolvedVariables)
let httpLoggingMetadata = buildHTTPLoggingMetadata @m [parameterizedQueryHash]
return (httpLoggingMetadata, fst <$> resp)
case sequence handlerResp of
Just resp -> pure $ (httpLoggingMetadata, fmap encodeHTTPResp resp)

View File

@@ -15,7 +15,7 @@ import Hasura.Server.Utils
newtype RequestId
= RequestId { unRequestId :: Text }
deriving (Show, Eq, ToJSON, FromJSON)
deriving (Show, Eq, ToJSON, FromJSON, Hashable)
getRequestId :: (MonadIO m) => [HTTP.Header] -> m (RequestId, [HTTP.Header])
getRequestId headers = do