diff --git a/CHANGELOG.md b/CHANGELOG.md
index 733dc9700ef..d605211e4d3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
 ## Next release
 (Add entries below in the order of server, console, cli, docs, others)
 
+- server: improve the `livequery-poller-log`
+
 ## v2.0.0-beta.2
 
 ### Bug fixes and improvements
diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal
index eab2a95fa3d..2fdf4c4889f 100644
--- a/server/graphql-engine.cabal
+++ b/server/graphql-engine.cabal
@@ -542,6 +542,7 @@ library
                      , Hasura.GraphQL.Execute.Resolve
                      , Hasura.GraphQL.Execute.Types
                      , Hasura.GraphQL.Explain
+                     , Hasura.GraphQL.ParameterizedQueryHash
                      , Hasura.GraphQL.Parser
                      , Hasura.GraphQL.Parser.Class
                      , Hasura.GraphQL.Parser.Class.Parse
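Note [Parameterized query hash, in miniature]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The new `Hasura.GraphQL.ParameterizedQueryHash` module registered above hashes a
query after collapsing every scalar leaf to null, so queries that differ only in
literal values share one hash. A toy model of that normalization, using a
stand-in value type rather than the real graphql-parser AST (which appears later
in this diff):

  {-# LANGUAGE LambdaCase #-}

  data Value
    = VNull
    | VInt Int
    | VString String
    | VBool Bool
    | VList [Value]
    | VObject [(String, Value)]
    deriving (Eq, Show)

  -- every scalar leaf collapses to VNull; only the query's shape survives
  normalize :: Value -> Value
  normalize = \case
    VList xs    -> VList (map normalize xs)
    VObject kvs -> VObject [ (k, normalize v) | (k, v) <- kvs ]
    _scalar     -> VNull

  -- ghci> normalize (VObject [("id", VObject [("_eq", VInt 2)])])
  --         == normalize (VObject [("id", VObject [("_eq", VInt 203943)])])
  -- True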
diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs
index f893c41a6d7..cf7024311ff 100644
--- a/server/src-lib/Hasura/GraphQL/Execute.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute.hs
@@ -46,10 +46,10 @@ import qualified Hasura.Tracing as Tracing
 
 import Hasura.Base.Error
 import Hasura.EncJSON
+import Hasura.GraphQL.ParameterizedQueryHash
 import Hasura.GraphQL.Parser.Column (UnpreparedValue)
 import Hasura.GraphQL.Parser.Directives
 import Hasura.GraphQL.Parser.Monad
-import Hasura.GraphQL.Parser.Schema (Variable)
 import Hasura.GraphQL.RemoteServer (execRemoteGQ)
 import Hasura.GraphQL.Transport.HTTP.Protocol
 import Hasura.Metadata.Class
@@ -263,7 +263,7 @@ getResolvedExecPlan
   -> HTTP.Manager
   -> [HTTP.Header]
   -> (GQLReqUnparsed, GQLReqParsed)
-  -> m (Telem.CacheHit, (G.SelectionSet G.NoFragments Variable, ResolvedExecutionPlan))
+  -> m (Telem.CacheHit, (ParameterizedQueryHash, ResolvedExecutionPlan))
 getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
   sc _scVer queryType httpManager reqHeaders (reqUnparsed, reqParsed) = -- do
@@ -285,7 +285,7 @@ getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
     -- addPlanToCache plan =
     --   liftIO $ EP.addPlan scVer (userRole userInfo)
     --   opNameM queryStr plan planCache
-    noExistingPlan :: m (G.SelectionSet G.NoFragments Variable, ResolvedExecutionPlan)
+    noExistingPlan :: m (ParameterizedQueryHash, ResolvedExecutionPlan)
     noExistingPlan = do
       -- GraphQL requests may incorporate fragments which insert a pre-defined
       -- part of a GraphQL query. Here we make sure to remember those
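Note [One hash, computed after the case]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The hunk below restructures `noExistingPlan`: the `case queryParts of` is now
bound to a pair, every branch yields the normalized selection set alongside its
plan, and the hash is derived once in a single trailing step instead of the raw
selection set being returned to every caller. The shape of that refactor, with
hypothetical names and `String`/`Int` stand-ins for the real AST, plan, and
hash types:

  data OpKind = Q | M | S

  resolveSketch :: OpKind -> (Int, String)
  resolveSketch opKind = (hashOf normalizedSelSet, plan)
    where
      hashOf = length  -- stand-in for calculateParameterizedQueryHash
      (normalizedSelSet, plan) = case opKind of
        Q -> ("query selset",        "QueryExecutionPlan")
        M -> ("mutation selset",     "MutationExecutionPlan")
        S -> ("subscription selset", "SubscriptionExecutionPlan")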
@@ -296,42 +296,47 @@ getResolvedExecPlan env logger {- planCache-} userInfo sqlGenCtx
             mapMaybe takeFragment $ unGQLExecDoc $ _grQuery reqParsed
       (gCtx, queryParts) <- getExecPlanPartial userInfo sc queryType reqParsed
-      case queryParts of
-        G.TypedOperationDefinition G.OperationTypeQuery _ varDefs directives selSet -> do
-          -- (Here the above fragment inlining is actually executed.)
-          inlinedSelSet <- EI.inlineSelectionSet fragments selSet
-          (executionPlan, queryRootFields, normalizedSelectionSet) <-
-            EQ.convertQuerySelSet env logger gCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
-          pure $ (normalizedSelectionSet, QueryExecutionPlan executionPlan queryRootFields)
+      (normalizedSelectionSet, resolvedExecPlan) <-
+        case queryParts of
+          G.TypedOperationDefinition G.OperationTypeQuery _ varDefs directives selSet -> do
+            -- (Here the above fragment inlining is actually executed.)
+            inlinedSelSet <- EI.inlineSelectionSet fragments selSet
+            (executionPlan, queryRootFields, normalizedSelectionSet) <-
+              EQ.convertQuerySelSet env logger gCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
+            pure $ (normalizedSelectionSet, QueryExecutionPlan executionPlan queryRootFields)
-        -- See Note [Temporarily disabling query plan caching]
-        -- traverse_ (addPlanToCache . EP.RPQuery) plan
-        G.TypedOperationDefinition G.OperationTypeMutation _ varDefs directives selSet -> do
-          -- (Here the above fragment inlining is actually executed.)
-          inlinedSelSet <- EI.inlineSelectionSet fragments selSet
-          (executionPlan, normalizedSelectionSet) <-
-            EM.convertMutationSelectionSet env logger gCtx sqlGenCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
-          pure $ (normalizedSelectionSet, MutationExecutionPlan executionPlan)
-        -- See Note [Temporarily disabling query plan caching]
-        -- traverse_ (addPlanToCache . EP.RPQuery) plan
-        G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives selSet -> do
-          -- (Here the above fragment inlining is actually executed.)
-          inlinedSelSet <- EI.inlineSelectionSet fragments selSet
-          -- Parse as query to check correctness
-          (unpreparedAST, _reusability, normalizedDirectives, normalizedSelectionSet) <-
-            EQ.parseGraphQLQuery gCtx varDefs (_grVariables reqUnparsed) directives inlinedSelSet
-          -- Process directives on the subscription
-          (dirMap, _) <- (`onLeft` reportParseErrors) =<<
-            runParseT (parseDirectives customDirectives (G.DLExecutable G.EDLSUBSCRIPTION) normalizedDirectives)
-          -- A subscription should have exactly one root field.
-          -- However, for testing purposes, we may allow several root fields; we check for this by
-          -- looking for directive "_multiple_top_level_fields" on the subscription. THIS IS NOT A
-          -- SUPPORTED FEATURE. We might remove it in the future without warning. DO NOT USE THIS.
-          allowMultipleRootFields <- withDirective dirMap multipleRootFields $ pure . isJust
-          case inlinedSelSet of
-            [_] -> pure ()
-            []  -> throw500 "empty selset for subscription"
-            _   -> unless allowMultipleRootFields $
-              throw400 ValidationFailed "subscriptions must select one top level field"
-          subscriptionPlan <- buildSubscriptionPlan userInfo unpreparedAST
-          pure (normalizedSelectionSet, SubscriptionExecutionPlan subscriptionPlan)
+          -- See Note [Temporarily disabling query plan caching]
+          -- traverse_ (addPlanToCache . EP.RPQuery) plan
+          G.TypedOperationDefinition G.OperationTypeMutation _ varDefs directives selSet -> do
+            -- (Here the above fragment inlining is actually executed.)
+            inlinedSelSet <- EI.inlineSelectionSet fragments selSet
+            (executionPlan, normalizedSelectionSet) <-
+              EM.convertMutationSelectionSet env logger gCtx sqlGenCtx userInfo httpManager reqHeaders directives inlinedSelSet varDefs (_grVariables reqUnparsed) (scSetGraphqlIntrospectionOptions sc)
+            pure $ (normalizedSelectionSet, MutationExecutionPlan executionPlan)
+          -- See Note [Temporarily disabling query plan caching]
+          -- traverse_ (addPlanToCache . EP.RPQuery) plan
+          G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives selSet -> do
+            -- (Here the above fragment inlining is actually executed.)
+            inlinedSelSet <- EI.inlineSelectionSet fragments selSet
+            -- Parse as query to check correctness
+            (unpreparedAST, _reusability, normalizedDirectives, normalizedSelectionSet) <-
+              EQ.parseGraphQLQuery gCtx varDefs (_grVariables reqUnparsed) directives inlinedSelSet
+            -- Process directives on the subscription
+            (dirMap, _) <- (`onLeft` reportParseErrors) =<<
+              runParseT (parseDirectives customDirectives (G.DLExecutable G.EDLSUBSCRIPTION) normalizedDirectives)
+            -- A subscription should have exactly one root field.
+            -- However, for testing purposes, we may allow several root fields; we check for this by
+            -- looking for directive "_multiple_top_level_fields" on the subscription. THIS IS NOT A
+            -- SUPPORTED FEATURE. We might remove it in the future without warning. DO NOT USE THIS.
+            allowMultipleRootFields <- withDirective dirMap multipleRootFields $ pure . isJust
+            case inlinedSelSet of
+              [_] -> pure ()
+              []  -> throw500 "empty selset for subscription"
+              _   -> unless allowMultipleRootFields $
+                throw400 ValidationFailed "subscriptions must select one top level field"
+            subscriptionPlan <- buildSubscriptionPlan userInfo unpreparedAST
+            pure (normalizedSelectionSet, SubscriptionExecutionPlan subscriptionPlan)
+      -- the parameterized query hash is used in multiple places, so instead of
+      -- calculating it separately at each of those places, we calculate it once
+      -- here and thread it through to wherever it is needed
+      pure $ (calculateParameterizedQueryHash normalizedSelectionSet, resolvedExecPlan)
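Note [Exactly one subscription root field]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The subscription branch above insists on a single root field unless the
unsupported `_multiple_top_level_fields` test directive is present. Its essence,
pulled out with toy types (`Either String ()` standing in for the real error
monad):

  {-# LANGUAGE LambdaCase #-}

  checkRootFields :: Bool -> [sel] -> Either String ()
  checkRootFields allowMultiple = \case
    [_] -> Right ()                              -- the normal, supported case
    []  -> Left "empty selset for subscription"  -- an internal (500) error
    _   | allowMultiple -> Right ()              -- test-only escape hatch
        | otherwise     -> Left "subscriptions must select one top level field"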
diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
index 5b640790964..a8231bf1a6e 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
@@ -38,45 +38,52 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
   , LGQResponse
   , LiveQueryResponse(..)
   , LiveQueryMetadata(..)
+  , SubscriberExecutionDetails (..)
+
+  -- * Batch
+  , BatchId (..)
   ) where
 
-import Data.List.Split (chunksOf)
+import Data.List.Split                           (chunksOf)
 #ifndef PROFILING
 import GHC.AssertNF
 #endif
 import Hasura.Prelude
 
-import qualified Control.Concurrent.Async as A
-import qualified Control.Concurrent.STM as STM
-import qualified Control.Immortal as Immortal
-import qualified Crypto.Hash as CH
-import qualified Data.Aeson.Extended as J
-import qualified Data.Aeson.TH as J
-import qualified Data.ByteString as BS
-import qualified Data.HashMap.Strict as Map
-import qualified Data.Time.Clock as Clock
-import qualified Data.UUID as UUID
-import qualified Data.UUID.V4 as UUID
+import qualified Control.Concurrent.Async                    as A
+import qualified Control.Concurrent.STM                      as STM
+import qualified Control.Immortal                            as Immortal
+import qualified Crypto.Hash                                 as CH
+import qualified Data.Aeson.Extended                         as J
+import qualified Data.ByteString                             as BS
+import qualified Data.HashMap.Strict                         as Map
+import qualified Data.Time.Clock                             as Clock
+import qualified Data.UUID                                   as UUID
+import qualified Data.UUID.V4                                as UUID
 import qualified ListT
-import qualified StmContainers.Map as STMMap
+import qualified StmContainers.Map                           as STMMap
 
 import Control.Lens
+import Data.Monoid (Sum (..))
 import Data.Text.Extended
 
-import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
-import qualified Hasura.Logging as L
+import qualified Hasura.GraphQL.Execute.LiveQuery.TMap       as TMap
+import qualified Hasura.Logging                              as L
 
 import Hasura.Base.Error
 import Hasura.GraphQL.Execute.Backend
 import Hasura.GraphQL.Execute.LiveQuery.Options
 import Hasura.GraphQL.Execute.LiveQuery.Plan
+import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
 import Hasura.GraphQL.Transport.Backend
 import Hasura.GraphQL.Transport.HTTP.Protocol
+import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
+import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
 import Hasura.RQL.Types.Backend
-import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
+import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
+import Hasura.Server.Types (RequestId)
 import Hasura.Session
 
-
 -- ----------------------------------------------------------------------------------------------
 -- Subscribers
@@ -96,13 +103,21 @@ newtype SubscriberMetadata
   = SubscriberMetadata { unSubscriberMetadata :: J.Value }
   deriving (Show, Eq, J.ToJSON)
 
-mkSubscriberMetadata :: J.Value -> SubscriberMetadata
-mkSubscriberMetadata = SubscriberMetadata
+mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
+mkSubscriberMetadata websocketId operationId operationName reqId =
+  SubscriberMetadata $ J.object
+    [ "websocket_id" J..= websocketId
+    , "operation_id" J..= operationId
+    , "operation_name" J..= operationName
+    , "request_id" J..= reqId
+    ]
 
 data Subscriber
   = Subscriber
   { _sId               :: !SubscriberId
   , _sMetadata         :: !SubscriberMetadata
+  , _sRequestId        :: !RequestId
+  , _sOperationName    :: !(Maybe OperationName)
   , _sOnChangeCallback :: !OnChange
   }
@@ -150,6 +165,12 @@ data Cohort
   -- result changed, then merge them in the map of existing subscribers
   }
 
+-- | The @BatchId@ is a number-based ID to uniquely identify a batch in a single poll;
+-- it is used to identify the batch to which a cohort belongs.
+newtype BatchId
+  = BatchId { _unBatchId :: Int }
+  deriving (Show, Eq, J.ToJSON)
+
 {- Note [Blake2b faster than SHA-256]
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 At the time of writing, from https://blake2.net, it is stated,
@@ -214,9 +235,7 @@ pushResultToCohort
   -> Maybe ResponseHash
   -> LiveQueryMetadata
   -> CohortSnapshot
-  -> IO ( [(SubscriberId, SubscriberMetadata)]
-        , [(SubscriberId, SubscriberMetadata)]
-        )
+  -> IO ( [SubscriberExecutionDetails], [SubscriberExecutionDetails])
   -- ^ subscribers to which data has been pushed, subscribers which already
   -- have this data (this information is exposed by metrics reporting)
 pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
@@ -233,14 +252,15 @@ pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot =
       else
         return (newSinks, curSinks)
   pushResultToSubscribers subscribersToPush
-  pure $ over (each.each) (\Subscriber{..} -> (_sId, _sMetadata))
-    (subscribersToPush, subscribersToIgnore)
+  pure $ over (each.each) (\Subscriber{..}
+                           -> SubscriberExecutionDetails _sId _sMetadata)
+         (subscribersToPush, subscribersToIgnore)
   where
     CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
 
     response = result <&> \payload -> LiveQueryResponse payload dTime
     pushResultToSubscribers =
-      A.mapConcurrently_ $ \(Subscriber _ _ action) -> action response
+      A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response
 
 -- -----------------------------------------------------------------------------
 -- Pollers
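Note [Converting both subscriber lists at once]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`pushResultToCohort` above returns a pair of lists (pushed, ignored) and uses
`over (each.each)` from lens to rewrite every `Subscriber` in both components in
one pass: the outer `each` traverses the two components of the homogeneous pair,
the inner `each` the elements of each list. A minimal, runnable illustration:

  import Control.Lens (each, over)

  demo :: ([Int], [Int])
  demo = over (each . each) (* 10) ([1, 2], [3])
  -- ([10,20],[30])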
@@ -320,6 +340,12 @@ dumpPollerMap extended lqMap =
 newtype PollerId = PollerId { unPollerId :: UUID.UUID }
   deriving (Show, Eq, Generic, J.ToJSON)
 
+data SubscriberExecutionDetails
+  = SubscriberExecutionDetails
+  { _sedSubscriberId       :: !SubscriberId
+  , _sedSubscriberMetadata :: !SubscriberMetadata
+  } deriving (Show, Eq)
+
 -- | Execution information related to a cohort on a poll cycle
 data CohortExecutionDetails
   = CohortExecutionDetails
@@ -327,58 +353,66 @@ data CohortExecutionDetails
   , _cedVariables    :: !CohortVariables
   , _cedResponseSize :: !(Maybe Int)
   -- ^ Nothing in case of an error
-  , _cedPushedTo     :: ![(SubscriberId, SubscriberMetadata)]
+  , _cedPushedTo     :: ![SubscriberExecutionDetails]
   -- ^ The response on this cycle has been pushed to these above subscribers
   -- New subscribers (those which haven't been around during the previous poll
   -- cycle) will always be part of this
-  , _cedIgnored      :: ![(SubscriberId, SubscriberMetadata)]
+  , _cedIgnored      :: ![SubscriberExecutionDetails]
   -- ^ The response on this cycle has *not* been pushed to these above
   -- subscribers. This would happen when the response hasn't changed from the
   -- previously polled cycle
+  , _cedBatchId      :: !BatchId
   } deriving (Show, Eq)
 
-$(J.deriveToJSON hasuraJSON ''CohortExecutionDetails)
-
 -- | Execution information related to a single batched execution
 data BatchExecutionDetails
   = BatchExecutionDetails
-  { _bedPgExecutionTime :: !Clock.DiffTime
+  { _bedPgExecutionTime        :: !Clock.DiffTime
   -- ^ postgres execution time of each batch
-  , _bedPushTime        :: !Clock.DiffTime
+  , _bedPushTime               :: !Clock.DiffTime
   -- ^ time taken to push to all cohorts belonging to this batch
-  , _bedCohorts         :: ![CohortExecutionDetails]
+  , _bedBatchId                :: !BatchId
+  -- ^ id of the batch
+  , _bedCohorts                :: ![CohortExecutionDetails]
   -- ^ execution details of the cohorts belonging to this batch
+  , _bedBatchResponseSizeBytes :: !(Maybe Int)
   } deriving (Show, Eq)
 
 -- | see Note [Minimal LiveQuery Poller Log]
 batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
 batchExecutionDetailMinimal BatchExecutionDetails{..} =
-  J.object [ "pg_execution_time" J..= _bedPgExecutionTime
-           , "push_time" J..= _bedPushTime
-           ]
-
-$(J.deriveToJSON hasuraJSON ''BatchExecutionDetails)
+  let batchRespSize =
+        maybe mempty
+        (\respSize -> ["batch_response_size_bytes" J..= respSize])
+        _bedBatchResponseSizeBytes
+  in
+    J.object ([ "pg_execution_time" J..= _bedPgExecutionTime
+              , "push_time" J..= _bedPushTime
+              ]
+              -- log batch resp size only when there are no errors
+              <> batchRespSize)
 
 data PollDetails
   = PollDetails
-  { _pdPollerId         :: !PollerId
+  { _pdPollerId               :: !PollerId
   -- ^ the unique ID (basically a thread that runs as a 'Poller') for the
   -- 'Poller'
-  , _pdGeneratedSql     :: !Text
+  , _pdGeneratedSql           :: !Text
   -- ^ the multiplexed SQL query to be run against the database with all the
   -- variables together
-  , _pdSnapshotTime     :: !Clock.DiffTime
+  , _pdSnapshotTime           :: !Clock.DiffTime
   -- ^ the time taken to get a snapshot of cohorts from our 'LiveQueriesState'
   -- data structure
-  , _pdBatches          :: ![BatchExecutionDetails]
+  , _pdBatches                :: ![BatchExecutionDetails]
   -- ^ list of execution batches and their details
-  , _pdTotalTime        :: !Clock.DiffTime
+  , _pdTotalTime              :: !Clock.DiffTime
   -- ^ total time spent on a poll cycle
-  , _pdLiveQueryOptions :: !LiveQueriesOptions
+  , _pdLiveQueryOptions       :: !LiveQueriesOptions
+  , _pdSource                 :: !SourceName
+  , _pdRole                   :: !RoleName
+  , _pdParameterizedQueryHash :: !ParameterizedQueryHash
   } deriving (Show, Eq)
 
-$(J.deriveToJSON hasuraJSON ''PollDetails)
-
 {- Note [Minimal LiveQuery Poller Log]
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 We only want to log the minimal information in the livequery-poller-log as it
@@ -396,6 +430,8 @@ pollDetailMinimal PollDetails{..} =
     , "snapshot_time" J..= _pdSnapshotTime
     , "batches" J..= map batchExecutionDetailMinimal _pdBatches
     , "total_time" J..= _pdTotalTime
+    , "source" J..= _pdSource
+    , "role" J..= _pdRole
     ]
 
 instance L.ToEngineLog PollDetails L.Hasura where
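Note [What the minimal poller log now carries]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
With the two fields added above, a `livequery-poller-log` entry's detail should
look roughly like the value below (values are illustrative, and only the keys
visible in this hunk plus the batch keys defined earlier are shown; the real
log may carry more):

  {-# LANGUAGE OverloadedStrings #-}
  import qualified Data.Aeson as J
  import Data.Aeson ((.=))

  samplePollDetail :: J.Value
  samplePollDetail = J.object
    [ "snapshot_time" .= (0.0002 :: Double)
    , "batches" .= [ J.object
        [ "pg_execution_time"         .= (0.0041 :: Double)
        , "push_time"                 .= (0.0007 :: Double)
        -- present only when the batch succeeded:
        , "batch_response_size_bytes" .= (1024 :: Int)
        ] ]
    , "total_time" .= (0.0058 :: Double)
    , "source"     .= ("default" :: String)
    , "role"       .= ("user" :: String)
    ]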
@@ -414,12 +450,14 @@ pollQuery
    . BackendTransport b
   => PollerId
   -> LiveQueriesOptions
-  -> SourceConfig b
+  -> (SourceName, SourceConfig b)
+  -> RoleName
+  -> ParameterizedQueryHash
   -> MultiplexedQuery b
   -> CohortMap
   -> LiveQueryPostPollHook
   -> IO ()
-pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
+pollQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
   (totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
 
     -- snapshot the current cohorts and split them into batches
@@ -429,15 +467,21 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
       cohorts <- STM.atomically $ TMap.toList cohortMap
       cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
       -- cohorts are broken down into batches specified by the batch size
-      pure $ chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
+      let cohortBatches = chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
+      -- associate every batch with its BatchId
+      pure $ zip (BatchId <$> [1 .. ]) cohortBatches
 
     -- concurrently process each batch
-    batchesDetails <- A.forConcurrently cohortBatches $ \cohorts -> do
+    batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
       (queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query $ over (each._2) _csVariables cohorts
 
       let lqMeta = LiveQueryMetadata $ convertDuration queryExecutionTime
           operations = getCohortOperations cohorts mxRes
-
+          -- batch response size is the sum of the response sizes of the cohorts
+          batchResponseSize =
+            case mxRes of
+              Left _ -> Nothing
+              Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
       (pushTime, cohortsExecutionDetails) <- withElapsedTime $
         A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
           (pushedSubscribers, ignoredSubscribers) <-
@@ -448,8 +492,13 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
                 , _cedPushedTo = pushedSubscribers
                 , _cedIgnored = ignoredSubscribers
                 , _cedResponseSize = snd <$> respData
+                , _cedBatchId = batchId
                 }
-      pure $ BatchExecutionDetails queryExecutionTime pushTime cohortsExecutionDetails
+      pure $ BatchExecutionDetails queryExecutionTime
+                                   pushTime
+                                   batchId
+                                   cohortsExecutionDetails
+                                   batchResponseSize
 
     pure (snapshotTime, batchesDetails)
@@ -460,6 +509,9 @@ pollQuery pollerId lqOpts sourceConfig query cohortMap postPollHook = do
         , _pdBatches = batchesDetails
         , _pdLiveQueryOptions = lqOpts
         , _pdTotalTime = totalTime
+        , _pdSource = sourceName
+        , _pdRole = roleName
+        , _pdParameterizedQueryHash = parameterizedQueryHash
         }
   postPollHook pollDetails
   where
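Note [Batch ids and batch response size]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Two small mechanics from the `pollQuery` changes above, reproduced with list
stand-ins so they can be run in isolation: batches are the `chunksOf`-sized
groups of cohort snapshots, numbered from 1 by zipping with `[1 ..]`, and a
batch's response size is the byte count of its cohort responses folded through
the `Sum` monoid:

  import Data.List.Split (chunksOf)  -- the same combinator the poller uses
  import Data.Monoid (Sum (..))

  newtype BatchId = BatchId Int deriving (Show)

  -- mirrors: zip (BatchId <$> [1 .. ]) cohortBatches
  batches :: Int -> [a] -> [(BatchId, [a])]
  batches size = zip (BatchId <$> [1 ..]) . chunksOf size

  -- mirrors: getSum $ foldMap (Sum . BS.length . snd) resp
  -- (String length standing in for ByteString length)
  totalSize :: [(cohortId, String)] -> Int
  totalSize = getSum . foldMap (Sum . length . snd)

  -- ghci> map fst (batches 2 "abcde")
  -- [BatchId 1,BatchId 2,BatchId 3]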
diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
index f2119bef8ae..6fbdc952297 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
@@ -46,11 +46,14 @@ import Hasura.GraphQL.Execute.Backend
 import Hasura.GraphQL.Execute.LiveQuery.Options
 import Hasura.GraphQL.Execute.LiveQuery.Plan
 import Hasura.GraphQL.Execute.LiveQuery.Poll
+import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
 import Hasura.GraphQL.Transport.Backend
+import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
 import Hasura.GraphQL.Transport.WebSocket.Protocol
 import Hasura.RQL.Types.Action
 import Hasura.RQL.Types.Common (SourceName, unNonNegativeDiffTime)
 import Hasura.Server.Init (ServerMetrics (..))
+import Hasura.Server.Types (RequestId)
 
 
 -- | The top-level datatype that holds the state for all active live queries.
@@ -95,18 +98,23 @@ addLiveQuery
   -> SubscriberMetadata
   -> LiveQueriesState
   -> SourceName
+  -> ParameterizedQueryHash
+  -> Maybe OperationName
+  -- ^ operation name of the query
+  -> RequestId
   -> LiveQueryPlan b (MultiplexedQuery b)
   -> OnChange
   -- ^ the action to be executed when result changes
   -> IO LiveQueryId
-addLiveQuery logger serverMetrics subscriberMetadata lqState source plan onResultAction = do
+addLiveQuery logger serverMetrics subscriberMetadata lqState
+  source parameterizedQueryHash operationName requestId plan onResultAction = do
   -- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!
 
   -- disposable UUIDs:
   cohortId <- newCohortId
   subscriberId <- newSubscriberId
 
-  let !subscriber = Subscriber subscriberId subscriberMetadata onResultAction
+  let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction
 
 #ifndef PROFILING
   $assertNFHere subscriber  -- so we don't write thunks to mutable vars
@@ -131,7 +139,7 @@ addLiveQuery logger serverMetrics subscriberMetadata lqState source plan onResul
   onJust handlerM $ \handler -> do
     pollerId <- PollerId <$> UUID.nextRandom
     threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
-      pollQuery @b pollerId lqOpts sourceConfig query (_pCohorts handler) postPollHook
+      pollQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) postPollHook
       sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval
     let !pState = PollerIOState threadRef pollerId
 #ifndef PROFILING
diff --git a/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs b/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs
new file mode 100644
index 00000000000..885aac21121
--- /dev/null
+++ b/server/src-lib/Hasura/GraphQL/ParameterizedQueryHash.hs
@@ -0,0 +1,135 @@
+{-|
+This module calculates the parameterized query hash, which is a way to
+hash an incoming query (after resolving variables) with all leaf nodes
+(i.e. scalar values) discarded. In other words, two queries having the same
+parameterized query hash are essentially the same query but may differ in
+leaf values.
+
+For example:
+
+1. query {
+     authors (where: {id: {_eq: 2}}) {
+       id
+       name
+     }
+   }
+
+2. query {
+     authors (where: {id: {_eq: 203943}}) {
+       id
+       name
+     }
+   }
+
+3. query {
+     authors (where: {id: {_eq: $id}}) {
+       id
+       name
+     }
+   }
+
+   For any value of `id`
+
+4. query {
+     authors (where: $whereBoolExp) {
+       id
+       name
+     }
+   }
+
+   only when `whereBoolExp` is of the form
+
+   {
+      "id": {
+        "_eq": <scalar value>
+      }
+   }
+
+All the above queries should result in the same parameterized query hash.
+
+The following steps are done to calculate the parameterized query hash:
+
+1. Normalize the GraphQL query by substituting the variables (if any) in appropriate places.
+2. Substitute any scalar GraphQL values (Int, Float, Enum, String and Boolean) with null.
+3. For input objects and lists, traverse through them and apply step 2.
+4. Calculate the hash of the query obtained from step 3.
+
+Note: The parameterized query hash is a PRO-only feature.
+-}
+
+module Hasura.GraphQL.ParameterizedQueryHash
+  ( calculateParameterizedQueryHash
+  , ParameterizedQueryHash
+  )
+where
+
+import Hasura.Prelude
+
+import qualified Data.Aeson as J
+import qualified Data.ByteString as B
+import qualified Data.HashMap.Strict as Map
+import qualified Language.GraphQL.Draft.Printer as G
+import qualified Language.GraphQL.Draft.Syntax as G
+import qualified Text.Builder as Text
+
+import Hasura.GraphQL.Parser (InputValue (..), Variable (..))
+
+import Hasura.Server.Utils (cryptoHash)
+
+newtype ParameterizedQueryHash
+  = ParameterizedQueryHash { unParamQueryHash :: B.ByteString }
+  deriving (Show, Eq)
+
+instance J.ToJSON ParameterizedQueryHash where
+  toJSON = J.String . bsToTxt . unParamQueryHash
+
+normalizeSelectionSet :: G.SelectionSet G.NoFragments Variable -> G.SelectionSet G.NoFragments Void
+normalizeSelectionSet = (normalizeSelection =<<)
+  where
+    normalizeSelection :: G.Selection G.NoFragments Variable -> G.SelectionSet G.NoFragments Void
+    normalizeSelection (G.SelectionField fld) = pure $ G.SelectionField (normalizeField fld)
+    normalizeSelection (G.SelectionInlineFragment (G.InlineFragment _ _ selSet)) =
+      normalizeSelectionSet selSet
+
+    normalizeField (G.Field _alias name args _directives selSet) =
+      G.Field Nothing name (Map.map normalizeValue args) mempty $ normalizeSelectionSet selSet
+
+    normalizeConstValue :: G.Value Void -> G.Value Void
+    normalizeConstValue = \case
+      G.VNull       -> G.VNull
+      G.VInt _      -> G.VNull
+      G.VFloat _    -> G.VNull
+      G.VString _   -> G.VNull
+      G.VBoolean _  -> G.VNull
+      G.VEnum _     -> G.VNull
+      G.VList l     -> G.VList $ map normalizeConstValue l
+      G.VObject obj -> G.VObject $ Map.map normalizeConstValue obj
+
+    jsonToNormalizedGQLVal :: J.Value -> G.Value Void
+    jsonToNormalizedGQLVal = \case
+      J.Null        -> G.VNull
+      J.Bool _      -> G.VNull
+      J.String _    -> G.VNull
+      J.Number _    -> G.VNull
+      J.Array l     -> G.VList $ jsonToNormalizedGQLVal <$> toList l
+      J.Object vals -> G.VObject $ Map.fromList $
+        flip map (Map.toList vals) $ \(key, val) ->
+          (G.unsafeMkName key, jsonToNormalizedGQLVal val)
+
+    normalizeValue :: G.Value Variable -> G.Value Void
+    normalizeValue = \case
+      G.VNull       -> G.VNull
+      G.VInt _      -> G.VNull
+      G.VFloat _    -> G.VNull
+      G.VString _   -> G.VNull
+      G.VBoolean _  -> G.VNull
+      G.VEnum _     -> G.VNull
+      G.VList l     -> G.VList $ map normalizeValue l
+      G.VObject obj -> G.VObject $ Map.map normalizeValue obj
+      G.VVariable (Variable _info _type value) ->
+        case value of
+          GraphQLValue val -> normalizeConstValue val
+          JSONValue v      -> jsonToNormalizedGQLVal v
+
+calculateParameterizedQueryHash :: G.SelectionSet G.NoFragments Variable -> ParameterizedQueryHash
+calculateParameterizedQueryHash = ParameterizedQueryHash . cryptoHash . Text.run . G.selectionSet . normalizeSelectionSet
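Note [How inline fragments disappear]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`normalizeSelectionSet` above is written as `(normalizeSelection =<<)`: each
selection expands to a whole selection set, so a field stays a singleton while
an inline fragment dissolves into its (recursively normalized) children,
spliced into the parent set by the list monad's bind. The same pattern on plain
lists, as a runnable sketch:

  data Sel = Leaf Int | Frag [Sel]

  flatten :: [Sel] -> [Int]
  flatten = (go =<<)
    where
      go (Leaf n)  = [n]         -- a field maps to a single element
      go (Frag ss) = flatten ss  -- a fragment splices in its children

  -- ghci> flatten [Leaf 1, Frag [Leaf 2, Leaf 3], Leaf 4]
  -- [1,2,3,4]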
diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs
index 36ae91f44be..68570e99bf1 100644
--- a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs
+++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs
@@ -52,8 +52,8 @@ import Hasura.Base.Error
 import Hasura.EncJSON
 import Hasura.GraphQL.Logging (MonadQueryLog (logQueryLog),
                                QueryLog (..), QueryLogKind (..))
+import Hasura.GraphQL.ParameterizedQueryHash
 import Hasura.GraphQL.Parser.Column (UnpreparedValue (..))
-import Hasura.GraphQL.Parser.Schema (Variable)
 import Hasura.GraphQL.Transport.Backend
 import Hasura.GraphQL.Transport.HTTP.Protocol
 import Hasura.GraphQL.Transport.Instances ()
@@ -203,16 +203,16 @@ runGQ
   -> [HTTP.Header]
   -> E.GraphQLQueryType
   -> GQLReqUnparsed
-  -> m (G.SelectionSet G.NoFragments Variable, HttpResponse (Maybe GQResponse, EncJSON))
+  -> m (ParameterizedQueryHash, HttpResponse (Maybe GQResponse, EncJSON))
 runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
-  (telemTimeTot_DT, (telemCacheHit, (telemQueryType, telemTimeIO_DT, telemLocality, resp, normalizedSelectionSet))) <- withElapsedTime $ do
+  (telemTimeTot_DT, (telemCacheHit, (telemQueryType, telemTimeIO_DT, telemLocality, resp, parameterizedQueryHash))) <- withElapsedTime $ do
     E.ExecutionCtx _ sqlGenCtx {- planCache -} sc scVer httpManager enableAL <- ask
 
     -- run system authorization on the GraphQL API
     reqParsed <- E.checkGQLExecution userInfo (reqHeaders, ipAddress) enableAL sc reqUnparsed
                  >>= flip onLeft throwError
 
-    (telemCacheHit, (normalizedSelectionSet, execPlan)) <-
+    (telemCacheHit, (parameterizedQueryHash, execPlan)) <-
      E.getResolvedExecPlan
        env logger {- planCache -}
        userInfo sqlGenCtx sc scVer queryType
@@ -236,7 +236,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
           case fmap decodeGQResp cachedValue of
             Just cachedResponseData -> do
               logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindCached
-              pure (Telem.Query, 0, Telem.Local, HttpResponse cachedResponseData responseHeaders, normalizedSelectionSet)
+              pure (Telem.Query, 0, Telem.Local, HttpResponse cachedResponseData responseHeaders, parameterizedQueryHash)
 
             Nothing -> do
               conclusion <- runExceptT $ forWithKey queryPlans $ \fieldName -> \case
@@ -278,7 +278,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
                   logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
                   buildRaw json
               out@(_, _, _, HttpResponse responseData _, _) <-
-                buildResultFromFragments Telem.Query conclusion responseHeaders normalizedSelectionSet
+                buildResultFromFragments Telem.Query conclusion responseHeaders parameterizedQueryHash
               Tracing.interpTraceT (liftEitherM . runExceptT) $ cacheStore cacheKey $ snd responseData
               pure out
@@ -296,7 +296,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
             resp <- runExceptT $ doQErr $ runPGMutationTransaction reqId reqUnparsed userInfo logger sourceConfig pgMutations
             -- we do not construct result fragments since we have only one result
-            buildResult Telem.Mutation normalizedSelectionSet resp \(telemTimeIO_DT, results) ->
+            buildResult Telem.Mutation parameterizedQueryHash resp \(telemTimeIO_DT, results) ->
               let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ OMap.mapKeys G.unName results
               in  ( Telem.Mutation
                   , telemTimeIO_DT
@@ -304,7 +304,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
                   , HttpResponse
                     (Just responseData, encodeGQResp responseData)
                     []
-                  , normalizedSelectionSet
+                  , parameterizedQueryHash
                   )
 
       -- we are not in the transaction case; proceeding normally
@@ -347,7 +347,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
             E.ExecStepRaw json -> do
              logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
              buildRaw json
-          buildResultFromFragments Telem.Mutation conclusion [] normalizedSelectionSet
+          buildResultFromFragments Telem.Mutation conclusion [] parameterizedQueryHash
 
       E.SubscriptionExecutionPlan _sub ->
        throw400 UnexpectedPayload "subscriptions are not supported over HTTP, use websockets instead"
@@ -356,7 +356,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
       telemTimeTot = convertDuration telemTimeTot_DT
       telemTransport = Telem.HTTP
   Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
-  return (normalizedSelectionSet, resp)
+  return (parameterizedQueryHash, resp)
   where
     getExecStepActionWithActionInfo acc execStep = case execStep of
       EB.ExecStepAction _ actionInfo _remoteJoins -> (actionInfo:acc)
@@ -377,15 +377,15 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
      :: Telem.QueryType
      -> Either (Either GQExecError QErr) (InsOrdHashMap G.Name ResultsFragment)
      -> HTTP.ResponseHeaders
-      -> G.SelectionSet G.NoFragments Variable
+      -> ParameterizedQueryHash
      -> m ( Telem.QueryType
           , DiffTime
           , Telem.Locality
           , HttpResponse (Maybe GQResponse, EncJSON)
-           , G.SelectionSet G.NoFragments Variable
+           , ParameterizedQueryHash
           )
-    buildResultFromFragments telemType fragments cacheHeaders normalizedSelSet =
-      buildResult telemType normalizedSelSet fragments \results ->
+    buildResultFromFragments telemType fragments cacheHeaders parameterizedQueryHash =
+      buildResult telemType parameterizedQueryHash fragments \results ->
        let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ rfResponse <$> OMap.mapKeys G.unName results
        in  ( telemType
            , sum (fmap rfTimeIO results)
@@ -393,28 +393,28 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
            , HttpResponse
              (Just responseData, encodeGQResp responseData)
              (cacheHeaders <> foldMap rfHeaders results)
-           , normalizedSelSet
+           , parameterizedQueryHash
            )
 
    buildResult
      :: Telem.QueryType
-      -> G.SelectionSet G.NoFragments Variable
+      -> ParameterizedQueryHash
      -> Either (Either GQExecError QErr) a
      -> (a -> ( Telem.QueryType
               , DiffTime
               , Telem.Locality
               , HttpResponse (Maybe GQResponse, EncJSON)
-               , G.SelectionSet G.NoFragments Variable
+               , ParameterizedQueryHash
               )
         )
      -> m ( Telem.QueryType
           , DiffTime
          , Telem.Locality
          , HttpResponse (Maybe GQResponse, EncJSON)
-          , G.SelectionSet G.NoFragments Variable
+          , ParameterizedQueryHash
          )
-    buildResult telemType normalizedSelSet result f = case result of
+    buildResult telemType parameterizedQueryHash result f = case result of
      Right a -> pure $ f a
      Left (Right err) -> throwError err
      Left (Left err) -> pure ( telemType
@@ -423,7 +423,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
                              , HttpResponse
                                (Just (Left err), encodeGQResp $ Left err)
                                []
-                             , normalizedSelSet
+                             , parameterizedQueryHash
                              )
 
 coalescePostgresMutations
@@ -502,8 +502,8 @@ runGQBatched
 runGQBatched env logger reqId responseErrorsConfig userInfo ipAddress reqHdrs queryType query =
  case query of
    GQLSingleRequest req -> do
-      (normalizedSelectionSet, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
-      let httpLoggingMetadata = buildHTTPLoggingMetadata @m [normalizedSelectionSet]
+      (parameterizedQueryHash, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
+      let httpLoggingMetadata = buildHTTPLoggingMetadata @m [parameterizedQueryHash]
      pure (httpLoggingMetadata, snd <$> httpResp)
    GQLBatchedReqs reqs -> do
      -- It's unclear what we should do if we receive multiple
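Note [Why a list of hashes for HTTP log metadata]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`runGQBatched` above passes `[parameterizedQueryHash]` because a batched request
executes several operations, one hash each, while a single request contributes a
singleton; `buildHTTPLoggingMetadata` (see the `HttpLog` class change further
below) folds the list into whatever `Monoid` the implementation chooses. A
hypothetical instance's metadata type, just to show the Monoid requirement at
work:

  newtype QueryHashes hash = QueryHashes [hash]

  instance Semigroup (QueryHashes hash) where
    QueryHashes a <> QueryHashes b = QueryHashes (a <> b)

  instance Monoid (QueryHashes hash) where
    mempty = QueryHashes []

  buildMetadata :: [hash] -> QueryHashes hash
  buildMetadata = QueryHashes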
diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs
index dee0ec391ae..dd7de735033 100644
--- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs
+++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs
@@ -378,7 +378,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
                  userInfo sqlGenCtx sc scVer queryType
                  httpMgr reqHdrs (q, reqParsed)
-  (telemCacheHit, (_normalizeSelSet, execPlan)) <- onLeft execPlanE (withComplete . preExecErr requestId)
+  (telemCacheHit, (parameterizedQueryHash, execPlan)) <- onLeft execPlanE (withComplete . preExecErr requestId)
 
   case execPlan of
     E.QueryExecutionPlan queryPlan asts -> Tracing.trace "Query" $ do
@@ -539,7 +539,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
        E.SEOnSourceDB actionIds liveQueryBuilder -> do
          actionLogMapE <- fmap fst <$> runExceptT (EA.fetchActionLogResponses actionIds)
          actionLogMap <- onLeft actionLogMapE (withComplete . preExecErr requestId)
-          lqIdE <- liftIO $ startLiveQuery liveQueryBuilder actionLogMap
+          lqIdE <- liftIO $ startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
          lqId <- onLeft lqIdE (withComplete . preExecErr requestId)
 
          -- Update async action query subscription state
@@ -552,7 +552,8 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
              logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindAction
              liftIO $ do
                let asyncActionQueryLive = LQ.LAAQOnSourceDB $
-                      LQ.LiveAsyncActionQueryOnSource lqId actionLogMap $ restartLiveQuery liveQueryBuilder
+                      LQ.LiveAsyncActionQueryOnSource lqId actionLogMap $
+                      restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder
 
                    onUnexpectedException err = do
                      sendError requestId err
@@ -659,24 +660,20 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
      liftIO $ sendCompleted Nothing
      throwError ()
 
-    restartLiveQuery liveQueryBuilder lqId actionLogMap = do
+    restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do
      LQ.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
-      either (const Nothing) Just <$> startLiveQuery liveQueryBuilder actionLogMap
+      either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
 
-    startLiveQuery liveQueryBuilder actionLogMap = do
+    startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap = do
      liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap
 
      for liveQueryE $ \(sourceName, E.LQP exists) -> do
-        let subscriberMetadata = LQ.mkSubscriberMetadata $ J.object
-              [ "websocket_id" J..= WS.getWSId wsConn
-              , "operation_id" J..= opId
-              ]
-
+        let !opName = _grOperationName q
+            subscriberMetadata = LQ.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
        -- NOTE!: we mask async exceptions higher in the call stack, but it's
        -- crucial we don't lose lqId after addLiveQuery returns successfully.
        !lqId <- liftIO $ AB.dispatchAnyBackend @BackendTransport exists
          \(E.MultiplexedLiveQueryPlan liveQueryPlan) ->
-            LQ.addLiveQuery logger (_wseServerMetrics serverEnv) subscriberMetadata lqMap sourceName liveQueryPlan liveQOnChange
-        let !opName = _grOperationName q
+            LQ.addLiveQuery logger (_wseServerMetrics serverEnv) subscriberMetadata lqMap sourceName parameterizedQueryHash opName requestId liveQueryPlan liveQOnChange
 #ifndef PROFILING
        liftIO $ $assertNFHere (lqId, opName)  -- so we don't write thunks to mutable vars
 #endif
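Note [Richer subscriber metadata]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
With the `startLiveQuery` change above, `LQ.mkSubscriberMetadata` now receives
the websocket id, operation id, operation name and request id, and (per its
definition in Poll.hs earlier in this diff) packs them into one JSON object.
Roughly what a subscriber's metadata looks like in the poller log (all field
values below are illustrative):

  {-# LANGUAGE OverloadedStrings #-}
  import qualified Data.Aeson as J
  import Data.Aeson ((.=))

  sampleSubscriberMetadata :: J.Value
  sampleSubscriberMetadata = J.object
    [ "websocket_id"   .= ("4d4e9e88-a71e-4f0f-9e4a-27e03e1f1b6f" :: String)
    , "operation_id"   .= ("1" :: String)
    , "operation_name" .= ("LiveAuthors" :: String)
    , "request_id"     .= ("0a1b2c3d" :: String)
    ]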
diff --git a/server/src-lib/Hasura/Server/Logging.hs b/server/src-lib/Hasura/Server/Logging.hs
index 8f273575044..59172705235 100644
--- a/server/src-lib/Hasura/Server/Logging.hs
+++ b/server/src-lib/Hasura/Server/Logging.hs
@@ -20,32 +20,32 @@ module Hasura.Server.Logging
 
 import Hasura.Prelude
 
-import qualified Data.ByteString.Lazy          as BL
-import qualified Data.Environment              as Env
-import qualified Data.HashMap.Strict           as HM
-import qualified Data.TByteString              as TBS
-import qualified Data.Text                     as T
-import qualified Language.GraphQL.Draft.Syntax as G
-import qualified Network.HTTP.Types            as HTTP
-import qualified Network.Wai.Extended          as Wai
+import qualified Data.ByteString.Lazy  as BL
+import qualified Data.Environment      as Env
+import qualified Data.HashMap.Strict   as HM
+import qualified Data.TByteString      as TBS
+import qualified Data.Text             as T
+import qualified Network.HTTP.Types    as HTTP
+import qualified Network.Wai.Extended  as Wai
 
 import Data.Aeson
 import Data.Aeson.TH
-import Data.Int                                (Int64)
+import Data.Int              (Int64)
 import Data.Text.Extended
 
 import Hasura.Base.Error
-import Hasura.GraphQL.Parser.Schema (Variable)
+import Hasura.GraphQL.ParameterizedQueryHash
 import Hasura.HTTP
 import Hasura.Logging
 import Hasura.Metadata.Class
 import Hasura.RQL.Types
 import Hasura.Server.Compression
 import Hasura.Server.Types
-import Hasura.Server.Utils (DeprecatedEnvVars (..), EnvVarsMovedToMetadata (..),
-                            deprecatedEnvVars, envVarsMovedToMetadata)
+import Hasura.Server.Utils   (DeprecatedEnvVars (..),
+                              EnvVarsMovedToMetadata (..),
+                              deprecatedEnvVars, envVarsMovedToMetadata)
 import Hasura.Session
-import Hasura.Tracing (TraceT)
+import Hasura.Tracing        (TraceT)
 
 
 data StartupLog
@@ -130,7 +130,7 @@ class (Monad m, Monoid (HTTPLoggingMetadata m)) => HttpLog m where
 
   type HTTPLoggingMetadata m
 
-  buildHTTPLoggingMetadata :: [(G.SelectionSet G.NoFragments Variable)] -> HTTPLoggingMetadata m
+  buildHTTPLoggingMetadata :: [ParameterizedQueryHash] -> HTTPLoggingMetadata m
 
   logHttpError
     :: Logger Hasura
diff --git a/server/src-lib/Hasura/Server/Rest.hs b/server/src-lib/Hasura/Server/Rest.hs
index 56cbb7819ba..56fab714d0f 100644
--- a/server/src-lib/Hasura/Server/Rest.hs
+++ b/server/src-lib/Hasura/Server/Rest.hs
@@ -156,8 +156,8 @@ runCustomEndpoint env execCtx requestId userInfo reqHeaders ipAddress RestReques
        -- with the query string from the schema cache, and pass it
        -- through to the /v1/graphql endpoint.
        (httpLoggingMetadata, handlerResp) <- flip runReaderT execCtx $ do
-          (normalizedSelectionSet, resp) <- GH.runGQ env (E._ecxLogger execCtx) requestId userInfo ipAddress reqHeaders E.QueryHasura (mkPassthroughRequest queryx resolvedVariables)
-          let httpLoggingMetadata = buildHTTPLoggingMetadata @m [normalizedSelectionSet]
+          (parameterizedQueryHash, resp) <- GH.runGQ env (E._ecxLogger execCtx) requestId userInfo ipAddress reqHeaders E.QueryHasura (mkPassthroughRequest queryx resolvedVariables)
+          let httpLoggingMetadata = buildHTTPLoggingMetadata @m [parameterizedQueryHash]
          return (httpLoggingMetadata, fst <$> resp)
        case sequence handlerResp of
          Just resp -> pure $ (httpLoggingMetadata, fmap encodeHTTPResp resp)
diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs
index b8baf4c57c0..7727dc09f9a 100644
--- a/server/src-lib/Hasura/Server/Types.hs
+++ b/server/src-lib/Hasura/Server/Types.hs
@@ -15,7 +15,7 @@ import Hasura.Server.Utils
 
 newtype RequestId
   = RequestId { unRequestId :: Text }
-  deriving (Show, Eq, ToJSON, FromJSON)
+  deriving (Show, Eq, ToJSON, FromJSON, Hashable)
 
 getRequestId :: (MonadIO m) => [HTTP.Header] -> m (RequestId, [HTTP.Header])
 getRequestId headers = do
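Note [RequestId is now Hashable]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The last hunk derives `Hashable` for `RequestId` through its underlying `Text`,
presumably so request ids can key hash-based containers now that they are
threaded through subscriber details. A minimal standalone equivalent of that
derivation:

  {-# LANGUAGE GeneralizedNewtypeDeriving #-}
  import Data.Hashable (Hashable)
  import Data.Text (Text)

  newtype RequestId = RequestId { unRequestId :: Text }
    deriving (Show, Eq, Hashable)  -- Hashable comes from the Text instance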