graphql-engine/server/src-lib/Hasura/Backends/Postgres/Execute/Subscription.hs

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

315 lines
13 KiB
Haskell
Raw Normal View History

{-# LANGUAGE TemplateHaskell #-}
-- | Postgres Execute subscription
--
-- Multiplex is an optimization which allows us to group similar queries into a
-- single query, and routing the response rows afterwards. See
-- https://hasura.io/docs/latest/graphql/core/databases/postgres/subscriptions/execution-and-performance.html
-- for more details
--
-- See 'Hasura.Backends.Postgres.Instances.Execute'.
module Hasura.Backends.Postgres.Execute.Subscription
( MultiplexedQuery (..),
QueryParametersInfo (..),
mkMultiplexedQuery,
mkStreamingMultiplexedQuery,
resolveMultiplexedValue,
validateVariablesTx,
executeMultiplexedQuery,
executeStreamingMultiplexedQuery,
executeQuery,
SubscriptionType (..),
)
where
import Control.Lens
import Data.ByteString qualified as B
import Data.HashMap.Strict qualified as Map
import Data.HashMap.Strict.InsOrd qualified as OMap
import Data.HashSet qualified as Set
import Data.Semigroup.Generic
import Data.Text.Extended
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Error
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Backends.Postgres.SQL.Value
import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
import Hasura.Backends.Postgres.Translate.Select qualified as DS
import Hasura.Backends.Postgres.Types.Column
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Subscription.Plan
server: Metadata origin for definitions (type parameter version v2) The code that builds the GraphQL schema, and `buildGQLContext` in particular, is partial: not every value of `(ServerConfigCtx, GraphQLQueryType, SourceCache, HashMap RemoteSchemaName (RemoteSchemaCtx, MetadataObject), ActionCache, AnnotatedCustomTypes)` results in a valid GraphQL schema. When it fails, we want to be able to return better error messages than we currently do. The key thing that is missing is a way to trace back GraphQL type information to their origin from the Hasura metadata. Currently, we have a number of correctness checks of our GraphQL schema. But these correctness checks only have access to pure GraphQL type information, and hence can only report errors in terms of that. Possibly the worst is the "conflicting definitions" error, which, in practice, can only be debugged by Hasura engineers. This is terrible DX for customers. This PR allows us to print better error messages, by adding a field to the `Definition` type that traces the GraphQL type to its origin in the metadata. So the idea is simple: just add `MetadataObjId`, or `Maybe` that, or some other sum type of that, to `Definition`. However, we want to avoid having to import a `Hasura.RQL` module from `Hasura.GraphQL.Parser`. So we instead define this additional field of `Definition` through a new type parameter, which is threaded through in `Hasura.GraphQL.Parser`. We then define type synonyms in `Hasura.GraphQL.Schema.Parser` that fill in this type parameter, so that it is not visible for the majority of the codebase. The idea of associating metadata information to `Definition`s really comes to fruition when combined with hasura/graphql-engine-mono#4517. Their combination would allow us to use the API of fatal errors (just like the current `MonadError QErr`) to report _inconsistencies_ in the metadata. Such inconsistencies are then _automatically_ ignored. So no ad-hoc decisions need to be made on how to cut out inconsistent metadata from the GraphQL schema. This will allow us to report much better errors, as well as improve the likelihood of a successful HGE startup. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4770 Co-authored-by: Samir Talwar <47582+SamirTalwar@users.noreply.github.com> GitOrigin-RevId: 728402b0cae83ae8e83463a826ceeb609001acae
2022-06-28 18:52:26 +03:00
import Hasura.GraphQL.Parser.Names
import Hasura.Prelude
server: support remote relationships on SQL Server and BigQuery (#1497) Remote relationships are now supported on SQL Server and BigQuery. The major change though is the re-architecture of remote join execution logic. Prior to this PR, each backend is responsible for processing the remote relationships that are part of their AST. This is not ideal as there is nothing specific about a remote join's execution that ties it to a backend. The only backend specific part is whether or not the specification of the remote relationship is valid (i.e, we'll need to validate whether the scalars are compatible). The approach now changes to this: 1. Before delegating the AST to the backend, we traverse the AST, collect all the remote joins while modifying the AST to add necessary join fields where needed. 1. Once the remote joins are collected from the AST, the database call is made to fetch the response. The necessary data for the remote join(s) is collected from the database's response and one or more remote schema calls are constructed as necessary. 1. The remote schema calls are then executed and the data from the database and from the remote schemas is joined to produce the final response. ### Known issues 1. Ideally the traversal of the IR to collect remote joins should return an AST which does not include remote join fields. This operation can be type safe but isn't taken up as part of the PR. 1. There is a lot of code duplication between `Transport/HTTP.hs` and `Transport/Websocket.hs` which needs to be fixed ASAP. This too hasn't been taken up by this PR. 1. The type which represents the execution plan is only modified to handle our current remote joins and as such it will have to be changed to accommodate general remote joins. 1. Use of lenses would have reduced the boilerplate code to collect remote joins from the base AST. 1. The current remote join logic assumes that the join columns of a remote relationship appear with their names in the database response. This however is incorrect as they could be aliased. This can be taken up by anyone, I've left a comment in the code. ### Notes to the reviewers I think it is best reviewed commit by commit. 1. The first one is very straight forward. 1. The second one refactors the remote join execution logic but other than moving things around, it doesn't change the user facing functionality. This moves Postgres specific parts to `Backends/Postgres` module from `Execute`. Some IR related code to `Hasura.RQL.IR` module. Simplifies various type class function signatures as a backend doesn't have to handle remote joins anymore 1. The third one fixes partial case matches that for some weird reason weren't shown as warnings before this refactor 1. The fourth one generalizes the validation logic of remote relationships and implements `scalarTypeGraphQLName` function on SQL Server and BigQuery which is used by the validation logic. This enables remote relationships on BigQuery and SQL Server. https://github.com/hasura/graphql-engine-mono/pull/1497 GitOrigin-RevId: 77dd8eed326602b16e9a8496f52f46d22b795598
2021-06-11 06:26:50 +03:00
import Hasura.RQL.IR
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Subscription
import Hasura.SQL.Backend
import Hasura.SQL.Types
import Hasura.Session
import Language.GraphQL.Draft.Syntax qualified as G
----------------------------------------------------------------------------------------------------
-- Variables
subsAlias :: S.TableAlias
subsAlias = S.mkTableAlias "_subs"
subsIdentifier :: TableIdentifier
subsIdentifier = S.tableAliasToIdentifier subsAlias
resultIdAlias, resultVarsAlias :: S.ColumnAlias
resultIdAlias = S.mkColumnAlias "result_id"
resultVarsAlias = S.mkColumnAlias "result_vars"
fldRespAlias :: S.TableAlias
fldRespAlias = S.mkTableAlias "_fld_resp"
fldRespIdentifier :: TableIdentifier
fldRespIdentifier = S.tableAliasToIdentifier fldRespAlias
-- | Internal: Used to collect information about various parameters
-- of a subscription field's AST as we resolve them to SQL expressions.
data QueryParametersInfo (b :: BackendType) = QueryParametersInfo
{ _qpiReusableVariableValues :: HashMap G.Name (ColumnValue b),
_qpiSyntheticVariableValues :: Seq (ColumnValue b),
-- | The session variables that are referenced in the query root fld's AST.
-- This information is used to determine a cohort's required session
-- variables
_qpiReferencedSessionVariables :: Set.HashSet SessionVariable
}
deriving (Generic)
deriving (Semigroup, Monoid) via (GenericSemigroupMonoid (QueryParametersInfo b))
makeLenses ''QueryParametersInfo
-- | Checks if the provided arguments are valid values for their corresponding types.
-- | Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..."
validateVariablesTx ::
forall pgKind f m.
(Traversable f, MonadTx m, MonadIO m) =>
f (ColumnValue ('Postgres pgKind)) ->
m (ValidatedVariables f)
validateVariablesTx variableValues = do
-- no need to test the types when there are no variables to test.
unless (null variableValues) do
let valSel = mkValidationSel $ toList variableValues
PG.Discard () <- liftTx $ PG.rawQE dataExnErrHandler (PG.fromBuilder $ toSQL valSel) [] False
pure ()
pure . ValidatedVariables $ fmap (txtEncodedVal . cvValue) variableValues
where
mkExtr = flip S.Extractor Nothing . toTxtValue
mkValidationSel vars =
S.mkSelect {S.selExtr = map mkExtr vars}
-- Explicitly look for the class of errors raised when the format of a value
-- provided for a type is incorrect.
dataExnErrHandler = mkTxErrorHandler (has _PGDataException)
----------------------------------------------------------------------------------------------------
-- Multiplexed queries
newtype MultiplexedQuery = MultiplexedQuery {unMultiplexedQuery :: PG.Query}
deriving (Eq, Hashable)
instance ToTxt MultiplexedQuery where
toTxt = PG.getQueryText . unMultiplexedQuery
toSQLFromItem ::
( Backend ('Postgres pgKind),
DS.PostgresAnnotatedFieldJSON pgKind
) =>
S.TableAlias ->
QueryDB ('Postgres pgKind) Void S.SQLExp ->
S.FromItem
toSQLFromItem = flip \case
QDBSingleRow s -> S.mkSelFromItem $ DS.mkSQLSelect JASSingleObject s
QDBMultipleRows s -> S.mkSelFromItem $ DS.mkSQLSelect JASMultipleRows s
QDBAggregation s -> S.mkSelFromItem $ DS.mkAggregateSelect s
QDBConnection s -> S.mkSelectWithFromItem $ DS.mkConnectionSelect s
QDBStreamMultipleRows s -> S.mkSelFromItem $ DS.mkStreamSQLSelect s
mkMultiplexedQuery ::
( Backend ('Postgres pgKind),
DS.PostgresAnnotatedFieldJSON pgKind
) =>
OMap.InsOrdHashMap G.Name (QueryDB ('Postgres pgKind) Void S.SQLExp) ->
MultiplexedQuery
mkMultiplexedQuery rootFields =
MultiplexedQuery . PG.fromBuilder . toSQL $
S.mkSelect
{ S.selExtr =
-- SELECT _subs.result_id, _fld_resp.root AS result
[ S.Extractor (mkQualifiedIdentifier subsIdentifier (Identifier "result_id")) Nothing,
S.Extractor (mkQualifiedIdentifier fldRespIdentifier (Identifier "root")) (Just $ S.toColumnAlias $ Identifier "result")
],
S.selFrom =
Just $
S.FromExp
[ S.FIJoin $
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)
]
}
where
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
subsInputFromItem =
S.FIUnnest
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
(S.toTableAlias $ Identifier "_subs")
[S.toColumnAlias $ Identifier "result_id", S.toColumnAlias $ Identifier "result_vars"]
-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
responseLateralFromItem = S.mkLateralFromItem selectRootFields fldRespAlias
selectRootFields =
S.mkSelect
{ S.selExtr = [S.Extractor rootFieldsJsonAggregate (Just $ S.toColumnAlias $ Identifier "root")],
S.selFrom =
Just . S.FromExp $
OMap.toList rootFields <&> \(fieldAlias, resolvedAST) ->
toSQLFromItem (S.mkTableAlias $ G.unName fieldAlias) resolvedAST
}
-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
rootFieldsJsonAggregate = S.SEFnApp "json_build_object" rootFieldsJsonPairs Nothing
rootFieldsJsonPairs = flip concatMap (OMap.keys rootFields) $ \fieldAlias ->
[ S.SELit (G.unName fieldAlias),
mkQualifiedIdentifier (aliasToIdentifier fieldAlias) (Identifier "root")
]
mkQualifiedIdentifier prefix = S.SEQIdentifier . S.QIdentifier (S.QualifiedIdentifier prefix Nothing)
aliasToIdentifier = TableIdentifier . G.unName
mkStreamingMultiplexedQuery ::
( Backend ('Postgres pgKind),
DS.PostgresAnnotatedFieldJSON pgKind
) =>
(G.Name, (QueryDB ('Postgres pgKind) Void S.SQLExp)) ->
MultiplexedQuery
mkStreamingMultiplexedQuery (fieldAlias, resolvedAST) =
MultiplexedQuery . PG.fromBuilder . toSQL $
S.mkSelect
{ S.selExtr =
-- SELECT _subs.result_id, _fld_resp.root, _fld_resp.cursor AS result
[ S.Extractor (mkQualifiedIdentifier subsIdentifier (Identifier "result_id")) Nothing,
S.Extractor (mkQualifiedIdentifier fldRespIdentifier (Identifier "root")) (Just $ S.toColumnAlias $ Identifier "result"),
S.Extractor (mkQualifiedIdentifier fldRespIdentifier (Identifier "cursor")) (Just $ S.toColumnAlias $ Identifier "cursor")
],
S.selFrom =
Just $
S.FromExp
[ S.FIJoin $
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)
]
}
where
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
subsInputFromItem =
S.FIUnnest
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
subsAlias
[resultIdAlias, resultVarsAlias]
-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
responseLateralFromItem = S.mkLateralFromItem selectRootFields (S.toTableAlias $ Identifier "_fld_resp")
selectRootFields =
S.mkSelect
{ S.selExtr = [(S.Extractor rootFieldJsonAggregate (Just $ S.toColumnAlias $ Identifier "root")), cursorExtractor],
S.selFrom =
Just . S.FromExp $
pure $
toSQLFromItem (S.mkTableAlias $ G.unName fieldAlias) resolvedAST
}
-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
rootFieldJsonAggregate = S.SEFnApp "json_build_object" rootFieldJsonPair Nothing
rootFieldJsonPair =
[ S.SELit (G.unName fieldAlias),
mkQualifiedIdentifier (aliasToIdentifier fieldAlias) (Identifier "root")
]
-- to_json("root"."cursor") AS "cursor"
cursorSQLExp = S.SEFnApp "to_json" [mkQualifiedIdentifier (aliasToIdentifier fieldAlias) (Identifier "cursor")] Nothing
cursorExtractor = S.Extractor cursorSQLExp (Just $ S.toColumnAlias $ Identifier "cursor")
mkQualifiedIdentifier prefix = S.SEQIdentifier . S.QIdentifier (S.QualifiedIdentifier prefix Nothing)
aliasToIdentifier = TableIdentifier . G.unName
-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL
-- expressions that refer to the @result_vars@ input object, collecting information
-- about various parameters of the query along the way.
resolveMultiplexedValue ::
( MonadState (QueryParametersInfo ('Postgres pgKind)) m,
MonadError QErr m
) =>
SessionVariables ->
UnpreparedValue ('Postgres pgKind) ->
m S.SQLExp
resolveMultiplexedValue allSessionVars = \case
UVParameter varM colVal -> do
server: Metadata origin for definitions (type parameter version v2) The code that builds the GraphQL schema, and `buildGQLContext` in particular, is partial: not every value of `(ServerConfigCtx, GraphQLQueryType, SourceCache, HashMap RemoteSchemaName (RemoteSchemaCtx, MetadataObject), ActionCache, AnnotatedCustomTypes)` results in a valid GraphQL schema. When it fails, we want to be able to return better error messages than we currently do. The key thing that is missing is a way to trace back GraphQL type information to their origin from the Hasura metadata. Currently, we have a number of correctness checks of our GraphQL schema. But these correctness checks only have access to pure GraphQL type information, and hence can only report errors in terms of that. Possibly the worst is the "conflicting definitions" error, which, in practice, can only be debugged by Hasura engineers. This is terrible DX for customers. This PR allows us to print better error messages, by adding a field to the `Definition` type that traces the GraphQL type to its origin in the metadata. So the idea is simple: just add `MetadataObjId`, or `Maybe` that, or some other sum type of that, to `Definition`. However, we want to avoid having to import a `Hasura.RQL` module from `Hasura.GraphQL.Parser`. So we instead define this additional field of `Definition` through a new type parameter, which is threaded through in `Hasura.GraphQL.Parser`. We then define type synonyms in `Hasura.GraphQL.Schema.Parser` that fill in this type parameter, so that it is not visible for the majority of the codebase. The idea of associating metadata information to `Definition`s really comes to fruition when combined with hasura/graphql-engine-mono#4517. Their combination would allow us to use the API of fatal errors (just like the current `MonadError QErr`) to report _inconsistencies_ in the metadata. Such inconsistencies are then _automatically_ ignored. So no ad-hoc decisions need to be made on how to cut out inconsistent metadata from the GraphQL schema. This will allow us to report much better errors, as well as improve the likelihood of a successful HGE startup. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4770 Co-authored-by: Samir Talwar <47582+SamirTalwar@users.noreply.github.com> GitOrigin-RevId: 728402b0cae83ae8e83463a826ceeb609001acae
2022-06-28 18:52:26 +03:00
varJsonPath <- case fmap getName varM of
Just varName -> do
modifying qpiReusableVariableValues $ Map.insert varName colVal
pure ["query", G.unName varName]
Nothing -> do
syntheticVarIndex <- use (qpiSyntheticVariableValues . to length)
modifying qpiSyntheticVariableValues (|> colVal)
pure ["synthetic", tshow syntheticVarIndex]
pure $ fromResVars (CollectableTypeScalar $ unsafePGColumnToBackend $ cvType colVal) varJsonPath
UVSessionVar ty sessVar -> do
_ <-
getSessionVariableValue sessVar allSessionVars
`onNothing` throw400
NotFound
("missing session variable: " <>> sessionVariableToText sessVar)
modifying qpiReferencedSessionVariables (Set.insert sessVar)
pure $ fromResVars ty ["session", sessionVariableToText sessVar]
UVLiteral sqlExp -> pure sqlExp
UVSession -> do
-- if the entire session is referenced, then add all session vars in referenced vars
modifying qpiReferencedSessionVariables (const $ getSessionVariablesSet allSessionVars)
pure $ fromResVars (CollectableTypeScalar PGJSON) ["session"]
where
fromResVars pgType jPath =
addTypeAnnotation pgType $
S.SEOpApp
(S.SQLOp "#>>")
[ S.SEQIdentifier $ S.QIdentifier (S.QualifiedIdentifier subsIdentifier Nothing) (Identifier "result_vars"),
S.SEArray $ map S.SELit jPath
]
addTypeAnnotation pgType =
flip S.SETyAnn (S.mkTypeAnn pgType) . case pgType of
CollectableTypeScalar scalarType -> withConstructorFn scalarType
CollectableTypeArray _ -> id
----------------------------------------------------------------------------------------------------
-- Execution
executeMultiplexedQuery ::
(MonadTx m) =>
MultiplexedQuery ->
[(CohortId, CohortVariables)] ->
m [(CohortId, B.ByteString)]
executeMultiplexedQuery (MultiplexedQuery query) cohorts =
executeQuery query cohorts
executeStreamingMultiplexedQuery ::
(MonadTx m) =>
MultiplexedQuery ->
[(CohortId, CohortVariables)] ->
m [(CohortId, B.ByteString, PG.ViaJSON CursorVariableValues)]
executeStreamingMultiplexedQuery (MultiplexedQuery query) cohorts = do
executeQuery query cohorts
-- | Internal; used by both 'executeMultiplexedQuery', 'executeStreamingMultiplexedQuery'
-- and 'pgDBSubscriptionExplain'.
executeQuery ::
(MonadTx m, PG.FromRes a) =>
PG.Query ->
[(CohortId, CohortVariables)] ->
m a
executeQuery query cohorts =
let (cohortIds, cohortVars) = unzip cohorts
preparedArgs = (CohortIdArray cohortIds, CohortVariablesArray cohortVars)
in liftTx $ PG.withQE defaultTxErrorHandler query preparedArgs True