From 2325755954bb3a777403503d709b412e01219ba9 Mon Sep 17 00:00:00 2001 From: Karthikeyan Chinnakonda Date: Sat, 23 Apr 2022 01:23:12 +0530 Subject: [PATCH] server: streaming subscriptions schema generation and tests (incremental PR - 3) PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4259 Co-authored-by: Rikin Kachhia <54616969+rikinsk@users.noreply.github.com> Co-authored-by: Brandon Simmons <210815+jberryman@users.noreply.github.com> Co-authored-by: paritosh-08 <85472423+paritosh-08@users.noreply.github.com> GitOrigin-RevId: 4d1b4ec3c01f3a839f4392d3b77950fc3ab30236 --- .circleci/server-test-names.txt | 1 + .circleci/test-server.sh | 19 ++ .../docker-compose.yaml | 13 +- server/graphql-engine.cabal | 3 +- .../Backends/BigQuery/Instances/Schema.hs | 4 +- .../Backends/BigQuery/Instances/Types.hs | 1 + .../Backends/DataWrapper/Adapter/Schema.hs | 2 + .../Backends/MSSQL/Instances/Execute.hs | 2 +- .../Hasura/Backends/MSSQL/Instances/Schema.hs | 7 +- .../Backends/MySQL/Instances/Execute.hs | 2 +- .../Hasura/Backends/MySQL/Instances/Schema.hs | 4 +- server/src-lib/Hasura/Backends/MySQL/Plan.hs | 7 +- .../Backends/Postgres/Execute/Subscription.hs | 6 +- .../Backends/Postgres/Instances/Execute.hs | 24 +- .../Backends/Postgres/Instances/Schema.hs | 4 +- .../Backends/Postgres/Instances/Transport.hs | 4 +- server/src-lib/Hasura/GraphQL/Context.hs | 3 +- server/src-lib/Hasura/GraphQL/Execute.hs | 14 +- .../src-lib/Hasura/GraphQL/Execute/Action.hs | 7 +- .../GraphQL/Execute/Subscription/State.hs | 94 ++++--- server/src-lib/Hasura/GraphQL/Explain.hs | 11 +- .../src-lib/Hasura/GraphQL/Parser/Column.hs | 6 +- .../Hasura/GraphQL/Parser/Constants.hs | 30 +++ server/src-lib/Hasura/GraphQL/Schema.hs | 87 ++++-- .../src-lib/Hasura/GraphQL/Schema/Backend.hs | 10 +- server/src-lib/Hasura/GraphQL/Schema/Build.hs | 21 +- .../src-lib/Hasura/GraphQL/Schema/Common.hs | 6 + .../GraphQL/Schema/SubscriptionStream.hs | 254 ++++++++++++++++++ .../Hasura/GraphQL/Transport/WebSocket.hs | 14 +- server/src-lib/Hasura/RQL/DDL/Schema/Table.hs | 2 +- server/src-lib/Hasura/RQL/IR/Root.hs | 8 - server/src-lib/Hasura/Server/API/Config.hs | 4 + server/src-lib/Hasura/Server/App.hs | 23 +- server/src-lib/Hasura/Server/Init/Config.hs | 6 +- server/src-lib/Hasura/Server/Types.hs | 9 +- server/tests-py/conftest.py | 14 + server/tests-py/context.py | 1 + .../queries/remote_schemas/tbls_setup.yaml | 9 + .../queries/remote_schemas/tbls_teardown.yaml | 4 +- .../subscriptions/streaming/setup.yaml | 39 +++ .../subscriptions/streaming/steps.yaml | 91 +++++++ .../subscriptions/streaming/teardown.yaml | 7 + server/tests-py/test_events.py | 92 ++----- server/tests-py/test_subscriptions.py | 111 ++++++++ server/tests-py/utils.py | 40 +++ 45 files changed, 928 insertions(+), 192 deletions(-) create mode 100644 server/src-lib/Hasura/GraphQL/Schema/SubscriptionStream.hs create mode 100644 server/tests-py/queries/subscriptions/streaming/setup.yaml create mode 100644 server/tests-py/queries/subscriptions/streaming/steps.yaml create mode 100644 server/tests-py/queries/subscriptions/streaming/teardown.yaml diff --git a/.circleci/server-test-names.txt b/.circleci/server-test-names.txt index 995a016d9f2..af0ecafe20c 100644 --- a/.circleci/server-test-names.txt +++ b/.circleci/server-test-names.txt @@ -25,6 +25,7 @@ ws-metadata-api-disabled remote-schema-permissions function-permissions roles-inheritance +streaming-subscriptions remote-schema-https query-caching query-logs diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh index 9173f07f8d5..8d4827795ba 100755 --- a/.circleci/test-server.sh +++ b/.circleci/test-server.sh @@ -719,6 +719,24 @@ roles-inheritance) kill_hge_servers ;; +streaming-subscriptions) + echo -e "\n$(time_elapsed): <########## TEST GRAPHQL-ENGINE WITH STREAMING SUBSCRIPTIONS #########################>\n" + + export HASURA_GRAPHQL_EXPERIMENTAL_FEATURES="streaming_subscriptions" + export HASURA_GRAPHQL_ADMIN_SECRET="HGE$RANDOM$RANDOM" + + run_hge_with_args serve + wait_for_port 8080 + + # run all the subscriptions tests with streaming subscriptions enabled + pytest --hge-urls "$HGE_URL" --pg-urls "$HASURA_GRAPHQL_DATABASE_URL" --hge-key="$HASURA_GRAPHQL_ADMIN_SECRET" test_subscriptions.py --test-streaming-subscriptions + + unset HASURA_GRAPHQL_ADMIN_SECRET + unset HASURA_GRAPHQL_EXPERIMENTAL_FEATURES + + kill_hge_servers + ;; + query-caching) echo -e "\n$(time_elapsed): <########## TEST GRAPHQL-ENGINE QUERY CACHING #####################################>\n" export HASURA_GRAPHQL_ADMIN_SECRET="HGE$RANDOM$RANDOM" @@ -858,6 +876,7 @@ remote-schema-https) kill $GQL_SERVER_PID ;; + post-webhook) webhook_tests_check_root diff --git a/install-manifests/docker-compose-mysql-preview/docker-compose.yaml b/install-manifests/docker-compose-mysql-preview/docker-compose.yaml index ef7fd4f69bb..cac187729d4 100644 --- a/install-manifests/docker-compose-mysql-preview/docker-compose.yaml +++ b/install-manifests/docker-compose-mysql-preview/docker-compose.yaml @@ -15,17 +15,17 @@ services: - "postgres" command: - graphql-engine - - --mysql-host + - --mysql-host - - - --mysql-user + - --mysql-user - - - --mysql-port + - --mysql-port - - - --mysql-dbname + - --mysql-dbname - - - --mysql-password + - --mysql-password - - - serve + - serve restart: always environment: HASURA_GRAPHQL_DATABASE_URL: postgres://postgres:postgrespassword@postgres:5432/postgres @@ -38,4 +38,3 @@ services: # HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey volumes: db_data: - diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 25963f3e26e..9851a2043c6 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -202,7 +202,6 @@ library , mtl , openapi3 , optparse-applicative - , optparse-generic , parsec , pg-client , postgresql-binary @@ -225,6 +224,7 @@ library , text-builder >= 0.6 , these , time >= 1.9 + , time-compat , transformers , transformers-base , unordered-containers >= 0.2.12 @@ -787,6 +787,7 @@ library , Hasura.GraphQL.Schema.Remote , Hasura.GraphQL.Schema.RemoteRelationship , Hasura.GraphQL.Schema.Select + , Hasura.GraphQL.Schema.SubscriptionStream , Hasura.GraphQL.Schema.Table , Hasura.GraphQL.Schema.Update , Hasura.GraphQL.Transport.Backend diff --git a/server/src-lib/Hasura/Backends/BigQuery/Instances/Schema.hs b/server/src-lib/Hasura/Backends/BigQuery/Instances/Schema.hs index 736fa3de574..8ae95e32008 100644 --- a/server/src-lib/Hasura/Backends/BigQuery/Instances/Schema.hs +++ b/server/src-lib/Hasura/Backends/BigQuery/Instances/Schema.hs @@ -1,5 +1,5 @@ {-# LANGUAGE ApplicativeDo #-} -{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TemplateHaskellQuotes #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Hasura.Backends.BigQuery.Instances.Schema () where @@ -33,6 +33,7 @@ instance BackendSchema 'BigQuery where -- top level parsers buildTableQueryFields = GSB.buildTableQueryFields buildTableRelayQueryFields = bqBuildTableRelayQueryFields + buildTableStreamingSubscriptionFields = GSB.buildTableStreamingSubscriptionFields buildTableInsertMutationFields = bqBuildTableInsertMutationFields buildTableUpdateMutationFields = bqBuildTableUpdateMutationFields buildTableDeleteMutationFields = bqBuildTableDeleteMutationFields @@ -43,6 +44,7 @@ instance BackendSchema 'BigQuery where -- backend extensions relayExtension = Nothing nodesAggExtension = Just () + streamSubscriptionExtension = Nothing -- table arguments tableArguments = defaultTableArgs diff --git a/server/src-lib/Hasura/Backends/BigQuery/Instances/Types.hs b/server/src-lib/Hasura/Backends/BigQuery/Instances/Types.hs index e6cf55531af..9900b818dec 100644 --- a/server/src-lib/Hasura/Backends/BigQuery/Instances/Types.hs +++ b/server/src-lib/Hasura/Backends/BigQuery/Instances/Types.hs @@ -35,6 +35,7 @@ instance Backend 'BigQuery where type XRelay 'BigQuery = XDisable type XNodesAgg 'BigQuery = XEnable type XNestedInserts 'BigQuery = XDisable + type XStreamingSubscription 'BigQuery = XDisable type ExtraTableMetadata 'BigQuery = () diff --git a/server/src-lib/Hasura/Backends/DataWrapper/Adapter/Schema.hs b/server/src-lib/Hasura/Backends/DataWrapper/Adapter/Schema.hs index 20282c77683..72c2887fced 100644 --- a/server/src-lib/Hasura/Backends/DataWrapper/Adapter/Schema.hs +++ b/server/src-lib/Hasura/Backends/DataWrapper/Adapter/Schema.hs @@ -38,10 +38,12 @@ instance BackendSchema 'DataWrapper where buildTableInsertMutationFields _ _ _ _ _ = pure [] buildTableUpdateMutationFields _ _ _ _ = pure [] buildTableDeleteMutationFields _ _ _ _ = pure [] + buildTableStreamingSubscriptionFields _ _ _ _ = pure [] -- backend extensions relayExtension = Nothing nodesAggExtension = Nothing + streamSubscriptionExtension = Nothing -- table arguments tableArguments = tableArgs' diff --git a/server/src-lib/Hasura/Backends/MSSQL/Instances/Execute.hs b/server/src-lib/Hasura/Backends/MSSQL/Instances/Execute.hs index 41863f06473..f80b012f5ad 100644 --- a/server/src-lib/Hasura/Backends/MSSQL/Instances/Execute.hs +++ b/server/src-lib/Hasura/Backends/MSSQL/Instances/Execute.hs @@ -274,7 +274,7 @@ prepareStateCohortVariables sourceConfig session prepState = do session namedVars posVars - mempty + mempty -- streaming cursor variables are kept empty because streaming subscriptions aren't yet supported for MS-SQL -- | Ensure that the set of variables (with value instantiations) that occur in -- a (RQL) query produce a well-formed and executable (SQL) query when diff --git a/server/src-lib/Hasura/Backends/MSSQL/Instances/Schema.hs b/server/src-lib/Hasura/Backends/MSSQL/Instances/Schema.hs index cf30dd69736..cdefcd842fc 100644 --- a/server/src-lib/Hasura/Backends/MSSQL/Instances/Schema.hs +++ b/server/src-lib/Hasura/Backends/MSSQL/Instances/Schema.hs @@ -1,5 +1,5 @@ {-# LANGUAGE ApplicativeDo #-} -{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TemplateHaskellQuotes #-} {-# OPTIONS_GHC -fno-warn-orphans #-} -- | MSSQL Instances Schema @@ -44,8 +44,8 @@ instance BackendSchema 'MSSQL where -- top level parsers buildTableQueryFields = GSB.buildTableQueryFields buildTableRelayQueryFields = msBuildTableRelayQueryFields - buildTableInsertMutationFields = - GSB.buildTableInsertMutationFields backendInsertParser + buildTableStreamingSubscriptionFields = GSB.buildTableStreamingSubscriptionFields + buildTableInsertMutationFields = GSB.buildTableInsertMutationFields backendInsertParser buildTableDeleteMutationFields = GSB.buildTableDeleteMutationFields buildTableUpdateMutationFields = msBuildTableUpdateMutationFields @@ -56,6 +56,7 @@ instance BackendSchema 'MSSQL where -- backend extensions relayExtension = Nothing nodesAggExtension = Just () + streamSubscriptionExtension = Nothing -- table arguments tableArguments = msTableArgs diff --git a/server/src-lib/Hasura/Backends/MySQL/Instances/Execute.hs b/server/src-lib/Hasura/Backends/MySQL/Instances/Execute.hs index 9c2801f062f..1ab9c040d4e 100644 --- a/server/src-lib/Hasura/Backends/MySQL/Instances/Execute.hs +++ b/server/src-lib/Hasura/Backends/MySQL/Instances/Execute.hs @@ -36,7 +36,7 @@ instance BackendExecute 'MySQL where mkDBQueryPlan = mysqlDBQueryPlan mkDBMutationPlan = error "mkDBMutationPlan: MySQL backend does not support this operation yet." mkLiveQuerySubscriptionPlan _ _ _ _ = error "mkLiveQuerySubscriptionPlan: MySQL backend does not support this operation yet." - mkDBStreamingSubscriptionPlan _ _ _ _ = error "mkLiveQuerySubscriptionPlan: MySQL backend does not support this operation yet." + mkDBStreamingSubscriptionPlan _ _ _ _ = error "mkDBStreamingSubscriptionPlan: MySQL backend does not support this operation yet." mkDBQueryExplain = mysqlDBQueryExplain mkSubscriptionExplain _ = error "mkSubscriptionExplain: MySQL backend does not support this operation yet." mkDBRemoteRelationshipPlan = error "mkDBRemoteRelationshipPlan: MySQL does not support this operation yet." diff --git a/server/src-lib/Hasura/Backends/MySQL/Instances/Schema.hs b/server/src-lib/Hasura/Backends/MySQL/Instances/Schema.hs index 5b5e2240080..f15d357af36 100644 --- a/server/src-lib/Hasura/Backends/MySQL/Instances/Schema.hs +++ b/server/src-lib/Hasura/Backends/MySQL/Instances/Schema.hs @@ -1,5 +1,5 @@ {-# LANGUAGE ApplicativeDo #-} -{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TemplateHaskellQuotes #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Hasura.Backends.MySQL.Instances.Schema () where @@ -30,6 +30,7 @@ import Language.GraphQL.Draft.Syntax qualified as G instance BackendSchema 'MySQL where buildTableQueryFields = GSB.buildTableQueryFields buildTableRelayQueryFields = buildTableRelayQueryFields' + buildTableStreamingSubscriptionFields = GSB.buildTableStreamingSubscriptionFields buildTableInsertMutationFields = buildTableInsertMutationFields' buildTableUpdateMutationFields = buildTableUpdateMutationFields' buildTableDeleteMutationFields = buildTableDeleteMutationFields' @@ -39,6 +40,7 @@ instance BackendSchema 'MySQL where relayExtension = Nothing tableArguments = mysqlTableArgs nodesAggExtension = Just () + streamSubscriptionExtension = Nothing columnParser = columnParser' jsonPathArg = jsonPathArg' orderByOperators = orderByOperators' diff --git a/server/src-lib/Hasura/Backends/MySQL/Plan.hs b/server/src-lib/Hasura/Backends/MySQL/Plan.hs index 92eb98cd82d..555c9ff7118 100644 --- a/server/src-lib/Hasura/Backends/MySQL/Plan.hs +++ b/server/src-lib/Hasura/Backends/MySQL/Plan.hs @@ -42,11 +42,8 @@ planQuery :: m Select planQuery sessionVariables queryDB = do rootField <- traverse (prepareValueQuery sessionVariables) queryDB - sel <- - runValidate (runFromIr (fromRootField rootField)) - `onLeft` (throw400 NotSupported . tshow) - pure $ - sel + runValidate (runFromIr (fromRootField rootField)) + `onLeft` (throw400 NotSupported . tshow) -- | Prepare a value without any query planning; we just execute the -- query with the values embedded. diff --git a/server/src-lib/Hasura/Backends/Postgres/Execute/Subscription.hs b/server/src-lib/Hasura/Backends/Postgres/Execute/Subscription.hs index e849c8467da..dbe3dadbfa0 100644 --- a/server/src-lib/Hasura/Backends/Postgres/Execute/Subscription.hs +++ b/server/src-lib/Hasura/Backends/Postgres/Execute/Subscription.hs @@ -18,6 +18,7 @@ module Hasura.Backends.Postgres.Execute.Subscription executeMultiplexedQuery, executeStreamingMultiplexedQuery, executeQuery, + SubscriptionType (..), ) where @@ -163,7 +164,7 @@ mkMultiplexedQuery rootFields = mkQualifiedIdentifier (aliasToIdentifier fieldAlias) (Identifier "root") ] - mkQualifiedIdentifier prefix = S.SEQIdentifier . S.QIdentifier (S.QualifiedIdentifier prefix Nothing) -- TODO fix this Nothing of course + mkQualifiedIdentifier prefix = S.SEQIdentifier . S.QIdentifier (S.QualifiedIdentifier prefix Nothing) aliasToIdentifier = Identifier . G.unName mkStreamingMultiplexedQuery :: @@ -274,7 +275,8 @@ executeMultiplexedQuery :: MultiplexedQuery -> [(CohortId, CohortVariables)] -> m [(CohortId, B.ByteString)] -executeMultiplexedQuery (MultiplexedQuery query) = executeQuery query +executeMultiplexedQuery (MultiplexedQuery query) cohorts = + executeQuery query cohorts executeStreamingMultiplexedQuery :: (MonadTx m) => diff --git a/server/src-lib/Hasura/Backends/Postgres/Instances/Execute.hs b/server/src-lib/Hasura/Backends/Postgres/Instances/Execute.hs index 2423ebb6c90..d095f7c76fc 100644 --- a/server/src-lib/Hasura/Backends/Postgres/Instances/Execute.hs +++ b/server/src-lib/Hasura/Backends/Postgres/Instances/Execute.hs @@ -59,7 +59,10 @@ import Hasura.GraphQL.Namespace ( RootFieldAlias (..), RootFieldMap, ) -import Hasura.GraphQL.Parser (UnpreparedValue (..)) +import Hasura.GraphQL.Namespace qualified as G +import Hasura.GraphQL.Parser + ( UnpreparedValue (..), + ) import Hasura.Prelude import Hasura.QueryTags ( QueryTagsComment (..), @@ -75,18 +78,19 @@ import Hasura.RQL.IR.Update qualified as IR import Hasura.RQL.Types ( Backend (..), BackendType (Postgres), - FieldName, - JsonAggSelect (..), - SourceName, - getFieldNameTxt, + ColumnInfo (..), liftTx, ) import Hasura.RQL.Types.Column - ( ColumnInfo (..), - ColumnType (..), + ( ColumnType (..), ColumnValue (..), ) -import Hasura.RQL.Types.Common (StringifyNumbers) +import Hasura.RQL.Types.Common + ( FieldName (..), + JsonAggSelect (..), + SourceName, + StringifyNumbers, + ) import Hasura.SQL.AnyBackend qualified as AB import Hasura.Session (UserInfo (..)) import Hasura.Tracing qualified as Tracing @@ -309,7 +313,7 @@ pgDBLiveQuerySubscriptionPlan :: pgDBLiveQuerySubscriptionPlan userInfo _sourceName sourceConfig namespace unpreparedAST = do (preparedAST, PGL.QueryParametersInfo {..}) <- flip runStateT mempty $ - for unpreparedAST $ traverse (PGL.resolveMultiplexedValue $ _uiSession userInfo) + for unpreparedAST $ traverse (PGL.resolveMultiplexedValue (_uiSession userInfo)) subscriptionQueryTagsComment <- ask let multiplexedQuery = PGL.mkMultiplexedQuery $ OMap.mapKeys _rfaAlias preparedAST multiplexedQueryWithQueryTags = @@ -351,7 +355,7 @@ pgDBStreamingSubscriptionPlan userInfo _sourceName sourceConfig (rootFieldAlias, flip runStateT mempty $ traverse (PGL.resolveMultiplexedValue (_uiSession userInfo)) unpreparedAST subscriptionQueryTagsComment <- ask - let multiplexedQuery = PGL.mkStreamingMultiplexedQuery (_rfaAlias rootFieldAlias, preparedAST) + let multiplexedQuery = PGL.mkStreamingMultiplexedQuery (G._rfaAlias rootFieldAlias, preparedAST) multiplexedQueryWithQueryTags = multiplexedQuery {PGL.unMultiplexedQuery = appendSQLWithQueryTags (PGL.unMultiplexedQuery multiplexedQuery) subscriptionQueryTagsComment} roleName = _uiRole userInfo diff --git a/server/src-lib/Hasura/Backends/Postgres/Instances/Schema.hs b/server/src-lib/Hasura/Backends/Postgres/Instances/Schema.hs index 4dfc74c7b12..f5d882f7c3d 100644 --- a/server/src-lib/Hasura/Backends/Postgres/Instances/Schema.hs +++ b/server/src-lib/Hasura/Backends/Postgres/Instances/Schema.hs @@ -1,5 +1,5 @@ {-# LANGUAGE ApplicativeDo #-} -{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TemplateHaskellQuotes #-} {-# LANGUAGE UndecidableInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} @@ -135,6 +135,7 @@ instance -- top level parsers buildTableQueryFields = GSB.buildTableQueryFields buildTableRelayQueryFields = pgkBuildTableRelayQueryFields + buildTableStreamingSubscriptionFields = GSB.buildTableStreamingSubscriptionFields buildTableInsertMutationFields = GSB.buildTableInsertMutationFields backendInsertParser buildTableUpdateMutationFields = pgkBuildTableUpdateMutationFields buildTableDeleteMutationFields = GSB.buildTableDeleteMutationFields @@ -149,6 +150,7 @@ instance -- backend extensions relayExtension = pgkRelayExtension @pgKind nodesAggExtension = Just () + streamSubscriptionExtension = Just () -- indivdual components columnParser = columnParser diff --git a/server/src-lib/Hasura/Backends/Postgres/Instances/Transport.hs b/server/src-lib/Hasura/Backends/Postgres/Instances/Transport.hs index bd5af91ff32..4700284f796 100644 --- a/server/src-lib/Hasura/Backends/Postgres/Instances/Transport.hs +++ b/server/src-lib/Hasura/Backends/Postgres/Instances/Transport.hs @@ -112,9 +112,7 @@ runPGSubscription :: m (DiffTime, Either QErr [(CohortId, B.ByteString)]) runPGSubscription sourceConfig query variables = withElapsedTime $ - runExceptT $ - runQueryTx (_pscExecCtx sourceConfig) $ - PGL.executeMultiplexedQuery query variables + runExceptT $ runQueryTx (_pscExecCtx sourceConfig) $ PGL.executeMultiplexedQuery query variables runPGStreamingSubscription :: MonadIO m => diff --git a/server/src-lib/Hasura/GraphQL/Context.hs b/server/src-lib/Hasura/GraphQL/Context.hs index 588c465bd15..778ef07ed71 100644 --- a/server/src-lib/Hasura/GraphQL/Context.hs +++ b/server/src-lib/Hasura/GraphQL/Context.hs @@ -30,7 +30,8 @@ $(deriveToJSON hasuraJSON ''RoleContext) data GQLContext = GQLContext { gqlQueryParser :: ParserFn (RootFieldMap (IR.QueryRootField UnpreparedValue)), - gqlMutationParser :: Maybe (ParserFn (RootFieldMap (IR.MutationRootField UnpreparedValue))) + gqlMutationParser :: Maybe (ParserFn (RootFieldMap (IR.MutationRootField UnpreparedValue))), + gqlSubscriptionParser :: Maybe (ParserFn (RootFieldMap (IR.QueryRootField UnpreparedValue))) } instance J.ToJSON GQLContext where diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs index 2335d571c08..6ab9b918a09 100644 --- a/server/src-lib/Hasura/GraphQL/Execute.hs +++ b/server/src-lib/Hasura/GraphQL/Execute.hs @@ -33,6 +33,7 @@ import Hasura.GraphQL.Execute.Common qualified as EC import Hasura.GraphQL.Execute.Mutation qualified as EM import Hasura.GraphQL.Execute.Query qualified as EQ import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ +import Hasura.GraphQL.Execute.Resolve qualified as ER import Hasura.GraphQL.Execute.Subscription.Plan qualified as ES import Hasura.GraphQL.Execute.Types qualified as ET import Hasura.GraphQL.Namespace @@ -377,9 +378,14 @@ getResolvedExecPlan maybeOperationName pure (parameterizedQueryHash, MutationExecutionPlan executionPlan) G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives inlinedSelSet -> do - -- Parse as query to check correctness - (unpreparedAST, normalizedDirectives, normalizedSelectionSet) <- - EQ.parseGraphQLQuery gCtx varDefs (_grVariables reqUnparsed) directives inlinedSelSet + (normalizedDirectives, normalizedSelectionSet) <- + ER.resolveVariables + varDefs + (fromMaybe mempty (_grVariables reqUnparsed)) + directives + inlinedSelSet + subscriptionParser <- C.gqlSubscriptionParser gCtx `onNothing` throw400 ValidationFailed "no subscriptions exist" + unpreparedAST <- (subscriptionParser >>> (`onLeft` reportParseErrors)) normalizedSelectionSet let parameterizedQueryHash = calculateParameterizedQueryHash normalizedSelectionSet -- Process directives on the subscription dirMap <- @@ -391,8 +397,8 @@ getResolvedExecPlan -- SUPPORTED FEATURE. We might remove it in the future without warning. DO NOT USE THIS. allowMultipleRootFields <- withDirective dirMap multipleRootFields $ pure . isJust case inlinedSelSet of - [_] -> pure () [] -> throw500 "empty selset for subscription" + [_] -> pure () _ -> unless (allowMultipleRootFields && isSingleNamespace unpreparedAST) $ throw400 ValidationFailed "subscriptions must select one top level field" diff --git a/server/src-lib/Hasura/GraphQL/Execute/Action.hs b/server/src-lib/Hasura/GraphQL/Execute/Action.hs index 5a7d65098af..de03e776d6b 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Action.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Action.hs @@ -270,7 +270,12 @@ resolveAsyncActionQuery userInfo annAction = AsyncErrors -> mkAnnFldFromPGCol errorsColumn jsonbToRecordSet = QualifiedObject "pg_catalog" $ FunctionName "jsonb_to_recordset" - actionLogInput = UVParameter Nothing $ ColumnValue (ColumnScalar PGJSONB) $ PGValJSONB $ Q.JSONB $ J.toJSON [actionLogResponse] + actionLogInput = + UVParameter Nothing $ + ColumnValue (ColumnScalar PGJSONB) $ + PGValJSONB $ + Q.JSONB $ + J.toJSON [actionLogResponse] functionArgs = RS.FunctionArgsExp [RS.AEInput actionLogInput] mempty tableFromExp = RS.FromFunction jsonbToRecordSet functionArgs $ diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs index 603403cb047..0d21b14251e 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs @@ -65,8 +65,8 @@ data SubscriptionsState = SubscriptionsState _ssLiveQueryMap :: PollerMap (), _ssStreamQueryMap :: PollerMap (STM.TVar CursorVariableValues), -- | A hook function which is run after each fetch cycle - _ssPostPollHook :: !SubscriptionPostPollHook, - _ssAsyncActions :: !AsyncActionSubscriptionState + _ssPostPollHook :: SubscriptionPostPollHook, + _ssAsyncActions :: AsyncActionSubscriptionState } initSubscriptionsState :: @@ -105,6 +105,39 @@ type LiveQuerySubscriberDetails = SubscriberDetails CohortKey -- details and then stop it. type StreamingSubscriberDetails = SubscriberDetails (CohortKey, STM.TVar CursorVariableValues) +-- | `findPollerForSubscriber` places a subscriber in the correct poller. +-- If the poller doesn't exist then we create one otherwise we return the +-- existing one. +findPollerForSubscriber :: + Subscriber -> + CohortId -> + PollerMap streamCursorVars -> + PollerKey -> + CohortKey -> + (Subscriber -> Cohort streamCursorVars -> STM.STM streamCursorVars) -> + (Subscriber -> CohortId -> Poller streamCursorVars -> STM.STM streamCursorVars) -> + STM.STM ((Maybe (Poller streamCursorVars)), streamCursorVars) +findPollerForSubscriber subscriber cohortId pollerMap pollerKey cohortKey addToCohort addToPoller = + -- a handler is returned only when it is newly created + STMMap.lookup pollerKey pollerMap >>= \case + Just poller -> do + -- Found a poller, now check if a cohort also exists + cursorVars <- + TMap.lookup cohortKey (_pCohorts poller) >>= \case + -- cohort found too! Simply add the subscriber to the cohort + Just cohort -> addToCohort subscriber cohort + -- cohort not found. Create a cohort with the subscriber and add + -- the cohort to the poller + Nothing -> addToPoller subscriber cohortId poller + return (Nothing, cursorVars) + Nothing -> do + -- no poller found, so create one with the cohort + -- and the subscriber within it. + !poller <- Poller <$> TMap.new <*> STM.newEmptyTMVar + cursorVars <- addToPoller subscriber cohortId poller + STMMap.insert poller pollerKey pollerMap + return $ (Just poller, cursorVars) + -- | Fork a thread handling a regular (live query) subscription addLiveQuery :: forall b. @@ -142,33 +175,28 @@ addLiveQuery let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction $assertNFHere subscriber -- so we don't write thunks to mutable vars - - -- a handler is returned only when it is newly created - handlerM <- + (pollerMaybe, ()) <- STM.atomically $ - STMMap.lookup handlerId lqMap >>= \case - Just handler -> do - TMap.lookup cohortKey (_pCohorts handler) >>= \case - Just cohort -> addToCohort subscriber cohort - Nothing -> addToPoller subscriber cohortId handler - return Nothing - Nothing -> do - !poller <- newPoller - addToPoller subscriber cohortId poller - STMMap.insert poller handlerId lqMap - return $ Just poller + findPollerForSubscriber + subscriber + cohortId + lqMap + handlerId + cohortKey + addToCohort + addToPoller -- we can then attach a polling thread if it is new the livequery can only be -- cancelled after putTMVar - onJust handlerM $ \handler -> do + onJust pollerMaybe $ \poller -> do pollerId <- PollerId <$> UUID.nextRandom threadRef <- forkImmortal ("pollLiveQuery." <> show pollerId) logger $ forever $ do - pollLiveQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) postPollHook + pollLiveQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts poller) postPollHook sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval let !pState = PollerIOState threadRef pollerId $assertNFHere pState -- so we don't write thunks to mutable vars - STM.atomically $ STM.putTMVar (_pIOState handler) pState + STM.atomically $ STM.putTMVar (_pIOState poller) pState liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics @@ -193,11 +221,7 @@ addLiveQuery addToCohort subscriber newCohort TMap.insert newCohort cohortKey $ _pCohorts handler - newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar - -- | Fork a thread handling a streaming subscription --- --- TODO can we DRY and combine this with 'addLiveQuery'? addStreamSubscriptionQuery :: forall b. BackendTransport b => @@ -237,22 +261,16 @@ addStreamSubscriptionQuery let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction $assertNFHere subscriber -- so we don't write thunks to mutable vars - - -- a handler is returned only when it is newly created (handlerM, cohortCursorTVar) <- STM.atomically $ - STMMap.lookup handlerId streamQueryMap >>= \case - Just handler -> do - cohortCursorTVar <- - TMap.lookup cohortKey (_pCohorts handler) >>= \case - Just cohort -> addToCohort subscriber cohort - Nothing -> addToPoller subscriber cohortId handler - return (Nothing, cohortCursorTVar) - Nothing -> do - !poller <- newPoller - cohortCursorTVar <- addToPoller subscriber cohortId poller - STMMap.insert poller handlerId streamQueryMap - return $ (Just poller, cohortCursorTVar) + findPollerForSubscriber + subscriber + cohortId + streamQueryMap + handlerId + cohortKey + addToCohort + addToPoller -- we can then attach a polling thread if it is new the subscription can only be -- cancelled after putTMVar @@ -288,8 +306,6 @@ addStreamSubscriptionQuery TMap.insert newCohort cohortKey $ _pCohorts handler pure cohortCursorVals - newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar - removeLiveQuery :: L.Logger L.Hasura -> ServerMetrics -> diff --git a/server/src-lib/Hasura/GraphQL/Explain.hs b/server/src-lib/Hasura/GraphQL/Explain.hs index 2a7b606b6e2..4db60656986 100644 --- a/server/src-lib/Hasura/GraphQL/Explain.hs +++ b/server/src-lib/Hasura/GraphQL/Explain.hs @@ -13,12 +13,14 @@ import Data.HashMap.Strict qualified as Map import Data.HashMap.Strict.InsOrd qualified as OMap import Hasura.Base.Error import Hasura.EncJSON +import Hasura.GraphQL.Context qualified as C import Hasura.GraphQL.Execute qualified as E import Hasura.GraphQL.Execute.Action qualified as E import Hasura.GraphQL.Execute.Backend import Hasura.GraphQL.Execute.Instances () import Hasura.GraphQL.Execute.Query qualified as E import Hasura.GraphQL.Execute.RemoteJoin.Collect qualified as RJ +import Hasura.GraphQL.Execute.Resolve qualified as ER import Hasura.GraphQL.Namespace (RootFieldAlias) import Hasura.GraphQL.ParameterizedQueryHash import Hasura.GraphQL.Parser @@ -100,7 +102,14 @@ explainGQLQuery sc (GQLExplain query userVarsRaw maybeIsRelay) = do G.TypedOperationDefinition G.OperationTypeMutation _ _ _ _ -> throw400 InvalidParams "only queries can be explained" G.TypedOperationDefinition G.OperationTypeSubscription _ varDefs directives inlinedSelSet -> do - (unpreparedQueries, _, normalizedSelectionSet) <- E.parseGraphQLQuery graphQLContext varDefs (GH._grVariables query) directives inlinedSelSet + (_normalizedDirectives, normalizedSelectionSet) <- + ER.resolveVariables + varDefs + (fromMaybe mempty (GH._grVariables query)) + directives + inlinedSelSet + subscriptionParser <- C.gqlSubscriptionParser graphQLContext `onNothing` throw400 NotFound "no subscriptions found" + unpreparedQueries <- (subscriptionParser >>> (`onLeft` reportParseErrors)) normalizedSelectionSet let parameterizedQueryHash = calculateParameterizedQueryHash normalizedSelectionSet -- TODO: validate directives here -- query-tags are not necessary for EXPLAIN API diff --git a/server/src-lib/Hasura/GraphQL/Parser/Column.hs b/server/src-lib/Hasura/GraphQL/Parser/Column.hs index 16949a8808e..a7e136064ad 100644 --- a/server/src-lib/Hasura/GraphQL/Parser/Column.hs +++ b/server/src-lib/Hasura/GraphQL/Parser/Column.hs @@ -63,8 +63,10 @@ openValueOrigin (ValueWithOrigin _ a) = a openValueOrigin (ValueNoOrigin a) = a mkParameter :: ValueWithOrigin (ColumnValue b) -> UnpreparedValue b -mkParameter (ValueWithOrigin valInfo columnValue) = UVParameter (Just valInfo) columnValue -mkParameter (ValueNoOrigin columnValue) = UVParameter Nothing columnValue +mkParameter (ValueWithOrigin valInfo columnValue) = + UVParameter (Just valInfo) columnValue +mkParameter (ValueNoOrigin columnValue) = + UVParameter Nothing columnValue -- TODO: figure out what the purpose of this method is. peelWithOrigin :: MonadParse m => Parser 'Both m a -> Parser 'Both m (ValueWithOrigin a) diff --git a/server/src-lib/Hasura/GraphQL/Parser/Constants.hs b/server/src-lib/Hasura/GraphQL/Parser/Constants.hs index 11a93721194..64707108a3d 100644 --- a/server/src-lib/Hasura/GraphQL/Parser/Constants.hs +++ b/server/src-lib/Hasura/GraphQL/Parser/Constants.hs @@ -14,6 +14,9 @@ import Language.GraphQL.Draft.Syntax.QQ as G _A :: G.Name _A = [G.name|A|] +_ASC :: G.Name +_ASC = [G.name|ASC|] + _Boolean :: G.Name _Boolean = [G.name|Boolean|] @@ -35,6 +38,9 @@ _Datetime = [G.name|Datetime|] _Date :: G.Name _Date = [G.name|Date|] +_DESC :: G.Name +_DESC = [G.name|DESC|] + _Double :: G.Name _Double = [G.name|Double|] @@ -398,6 +404,15 @@ __st_touches = [G.name|_st_touches|] __st_within :: G.Name __st_within = [G.name|_st_within|] +__subscription :: G.Name +__subscription = [G.name|_subscription|] + +__stream_cursor_input :: G.Name +__stream_cursor_input = [G.name|_stream_cursor_input|] + +__stream_cursor_value_input :: G.Name +__stream_cursor_value_input = [G.name|_stream_cursor_value_input|] + __update_column :: G.Name __update_column = [G.name|_update_column|] @@ -431,6 +446,9 @@ _avg = [G.name|avg|] _a :: G.Name _a = [G.name|a|] +_batch_size :: G.Name +_batch_size = [G.name|batch_size|] + _before :: G.Name _before = [G.name|before|] @@ -461,6 +479,9 @@ _created_at = [G.name|created_at|] _cursor :: G.Name _cursor = [G.name|cursor|] +_cursor_ordering :: G.Name +_cursor_ordering = [G.name|cursor_ordering|] + _data :: G.Name _data = [G.name|data|] @@ -545,6 +566,9 @@ _includeDeprecated = [G.name|includeDeprecated|] _include :: G.Name _include = [G.name|include|] +_initial_value :: G.Name +_initial_value = [G.name|initial_value|] + _inputFields :: G.Name _inputFields = [G.name|inputFields|] @@ -617,6 +641,9 @@ _on_conflict = [G.name|on_conflict|] _order_by :: G.Name _order_by = [G.name|order_by|] +_ordering :: G.Name +_ordering = [G.name|ordering|] + _output :: G.Name _output = [G.name|output|] @@ -677,6 +704,9 @@ _stddev_samp = [G.name|stddev_samp|] _stddev :: G.Name _stddev = [G.name|stddev|] +_stream :: G.Name +_stream = [G.name|_stream|] + _subscriptionType :: G.Name _subscriptionType = [G.name|subscriptionType|] diff --git a/server/src-lib/Hasura/GraphQL/Schema.hs b/server/src-lib/Hasura/GraphQL/Schema.hs index 649355300e7..1ba075b4222 100644 --- a/server/src-lib/Hasura/GraphQL/Schema.hs +++ b/server/src-lib/Hasura/GraphQL/Schema.hs @@ -95,6 +95,7 @@ buildGQLContext ServerConfigCtx {..} queryType sources allRemoteSchemas allActio allActionInfos = Map.elems allActions allTableRoles = Set.fromList $ getTableRoles =<< Map.elems sources allRoles = nonTableRoles <> allTableRoles + roleContexts <- -- Buld role contexts in parallel. We'd prefer deterministic parallelism -- but that isn't really acheivable (see mono #3829). NOTE: the admin role @@ -113,6 +114,7 @@ buildGQLContext ServerConfigCtx {..} queryType sources allRemoteSchemas allActio customTypes role _sccRemoteSchemaPermsCtx + (bool StreamingSubscriptionsDisabled StreamingSubscriptionsEnabled $ EFStreamingSubscriptions `elem` _sccExperimentalFeatures) QueryRelay -> (,mempty,G.SchemaIntrospection mempty) <$> buildRelayRoleContext @@ -145,12 +147,13 @@ buildRoleContext :: AnnotatedCustomTypes -> RoleName -> RemoteSchemaPermsCtx -> + StreamingSubscriptionsCtx -> m ( RoleContext GQLContext, HashSet InconsistentMetadata, G.SchemaIntrospection ) -buildRoleContext options sources remotes allActionInfos customTypes role remoteSchemaPermsCtx = do +buildRoleContext options sources remotes allActionInfos customTypes role remoteSchemaPermsCtx streamingSubscriptionsCtx = do let ( SQLGenCtx stringifyNum dangerousBooleanCollapse optimizePermissionFilters, queryType, functionPermsCtx @@ -165,7 +168,7 @@ buildRoleContext options sources remotes allActionInfos customTypes role remoteS optimizePermissionFilters runMonadSchema role roleQueryContext sources (fst <$> remotes) $ do -- build all sources - (sourcesQueryFields, sourcesMutationFrontendFields, sourcesMutationBackendFields) <- + (sourcesQueryFields, sourcesMutationFrontendFields, sourcesMutationBackendFields, subscriptionFields) <- fmap mconcat $ traverse (buildBackendSource buildSource) $ toList sources -- build all remote schemas -- we only keep the ones that don't result in a name conflict @@ -173,13 +176,14 @@ buildRoleContext options sources remotes allActionInfos customTypes role remoteS buildAndValidateRemoteSchemas remotes sourcesQueryFields sourcesMutationBackendFields role remoteSchemaPermsCtx let remotesQueryFields = concatMap piQuery remoteSchemaFields remotesMutationFields = concat $ mapMaybe piMutation remoteSchemaFields + remotesSubscriptionFields = concat $ mapMaybe piSubscription remoteSchemaFields mutationParserFrontend <- buildMutationParser remotesMutationFields allActionInfos customTypes sourcesMutationFrontendFields mutationParserBackend <- buildMutationParser remotesMutationFields allActionInfos customTypes sourcesMutationBackendFields subscriptionParser <- - buildSubscriptionParser sourcesQueryFields allActionInfos customTypes + buildSubscriptionParser subscriptionFields allActionInfos customTypes remotesSubscriptionFields queryParserFrontend <- buildQueryParser sourcesQueryFields remotesQueryFields allActionInfos customTypes mutationParserFrontend subscriptionParser queryParserBackend <- @@ -212,9 +216,15 @@ buildRoleContext options sources remotes allActionInfos customTypes role remoteS -- (since we're running this in parallel in caller, be strict) let !frontendContext = - GQLContext (finalizeParser queryParserFrontend) (finalizeParser <$> mutationParserFrontend) + GQLContext + (finalizeParser queryParserFrontend) + (finalizeParser <$> mutationParserFrontend) + (finalizeParser <$> subscriptionParser) !backendContext = - GQLContext (finalizeParser queryParserBackend) (finalizeParser <$> mutationParserBackend) + GQLContext + (finalizeParser queryParserBackend) + (finalizeParser <$> mutationParserBackend) + (finalizeParser <$> subscriptionParser) pure ( RoleContext frontendContext $ Just backendContext, @@ -230,18 +240,24 @@ buildRoleContext options sources remotes allActionInfos customTypes role remoteS m ( [FieldParser (P.ParseT Identity) (NamespacedField (QueryRootField UnpreparedValue))], [FieldParser (P.ParseT Identity) (NamespacedField (MutationRootField UnpreparedValue))], - [FieldParser (P.ParseT Identity) (NamespacedField (MutationRootField UnpreparedValue))] + [FieldParser (P.ParseT Identity) (NamespacedField (MutationRootField UnpreparedValue))], + [FieldParser (P.ParseT Identity) (NamespacedField (QueryRootField UnpreparedValue))] ) buildSource (SourceInfo sourceName tables functions sourceConfig queryTagsConfig sourceCustomization) = withSourceCustomization sourceCustomization do let validFunctions = takeValidFunctions functions validTables = takeValidTables tables mkTypename <- asks getter - (,,) + uncustomizedQueryFields <- buildQueryFields sourceName sourceConfig validTables validFunctions queryTagsConfig + uncustomizedStreamSubscriptionFields <- + case streamingSubscriptionsCtx of + StreamingSubscriptionsEnabled -> buildTableStreamSubscriptionFields sourceName sourceConfig validTables queryTagsConfig + StreamingSubscriptionsDisabled -> pure mempty + (,,,) <$> customizeFields sourceCustomization (mkTypename <> P.MkTypename (<> G.__query)) - (buildQueryFields sourceName sourceConfig validTables validFunctions queryTagsConfig) + (pure uncustomizedQueryFields) <*> customizeFields sourceCustomization (mkTypename <> P.MkTypename (<> G.__mutation_frontend)) @@ -250,6 +266,10 @@ buildRoleContext options sources remotes allActionInfos customTypes role remoteS sourceCustomization (mkTypename <> P.MkTypename (<> G.__mutation_backend)) (buildMutationFields Backend sourceName sourceConfig validTables validFunctions queryTagsConfig) + <*> customizeFields + sourceCustomization + (mkTypename <> P.MkTypename (<> G.__subscription)) + (pure $ uncustomizedStreamSubscriptionFields <> uncustomizedQueryFields) buildRelayRoleContext :: forall m. @@ -293,7 +313,7 @@ buildRelayRoleContext options sources allActionInfos customTypes role = do mutationParserBackend <- buildMutationParser mempty allActionInfos customTypes mutationBackendFields subscriptionParser <- - buildSubscriptionParser queryPGFields [] customTypes + buildSubscriptionParser queryPGFields [] customTypes [] queryParserFrontend <- queryWithIntrospectionHelper queryPGFields mutationParserFrontend subscriptionParser queryParserBackend <- @@ -314,9 +334,15 @@ buildRelayRoleContext options sources allActionInfos customTypes role = do (P.parserType <$> subscriptionParser) let frontendContext = - GQLContext (finalizeParser queryParserFrontend) (finalizeParser <$> mutationParserFrontend) + GQLContext + (finalizeParser queryParserFrontend) + (finalizeParser <$> mutationParserFrontend) + (finalizeParser <$> subscriptionParser) backendContext = - GQLContext (finalizeParser queryParserBackend) (finalizeParser <$> mutationParserBackend) + GQLContext + (finalizeParser queryParserBackend) + (finalizeParser <$> mutationParserBackend) + (finalizeParser <$> subscriptionParser) pure $ RoleContext frontendContext $ Just backendContext where @@ -389,10 +415,10 @@ unauthenticatedContext allRemotes remoteSchemaPermsCtx = do context {_rscRemoteRelationships = mempty} runMonadSchema fakeRole fakeQueryContext mempty mempty do - (queryFields, mutationFields, remoteErrors) <- case remoteSchemaPermsCtx of + (queryFields, mutationFields, subscriptionFields, remoteErrors) <- case remoteSchemaPermsCtx of RemoteSchemaPermsEnabled -> -- Permissions are enabled, unauthenticated users have access to nothing. - pure ([], [], mempty) + pure ([], [], [], mempty) RemoteSchemaPermsDisabled -> do -- Permissions are disabled, unauthenticated users have access to remote schemas. (remoteFields, remoteSchemaErrors) <- @@ -400,19 +426,24 @@ unauthenticatedContext allRemotes remoteSchemaPermsCtx = do pure ( fmap (fmap RFRemote) <$> concatMap piQuery remoteFields, fmap (fmap RFRemote) <$> concat (mapMaybe piMutation remoteFields), + fmap (fmap RFRemote) <$> concat (mapMaybe piSubscription remoteFields), remoteSchemaErrors ) mutationParser <- whenMaybe (not $ null mutationFields) $ P.safeSelectionSet mutationRoot (Just $ G.Description "mutation root") mutationFields <&> fmap (flattenNamespaces . fmap typenameToNamespacedRawRF) + subscriptionParser <- + whenMaybe (not $ null subscriptionFields) $ + P.safeSelectionSet subscriptionRoot (Just $ G.Description "subscription root") subscriptionFields + <&> fmap (flattenNamespaces . fmap typenameToNamespacedRawRF) queryParser <- queryWithIntrospectionHelper queryFields mutationParser Nothing void $ buildIntrospectionSchema (P.parserType queryParser) (P.parserType <$> mutationParser) - Nothing - pure (GQLContext (finalizeParser queryParser) (finalizeParser <$> mutationParser), remoteErrors) + (P.parserType <$> subscriptionParser) + pure (GQLContext (finalizeParser queryParser) (finalizeParser <$> mutationParser) (finalizeParser <$> subscriptionParser), remoteErrors) ------------------------------------------------------------------------------- -- Building parser fields @@ -510,6 +541,27 @@ buildQueryFields sourceName sourceConfig tables (takeExposedAs FEAQuery -> funct where mkRF = mkRootField sourceName sourceConfig queryTagsConfig QDBR +buildTableStreamSubscriptionFields :: + forall b r m n. + MonadBuildSchema b r m n => + SourceName -> + SourceConfig b -> + TableCache b -> + Maybe QueryTagsConfig -> + m [P.FieldParser n (QueryRootField UnpreparedValue)] +buildTableStreamSubscriptionFields sourceName sourceConfig tables queryTagsConfig = do + tableSelectExpParsers <- for (Map.toList tables) \(tableName, tableInfo) -> do + tableGQLName <- getTableGQLName @b tableInfo + mkRF $ + buildTableStreamingSubscriptionFields + sourceName + tableName + tableInfo + tableGQLName + pure $ concat tableSelectExpParsers + where + mkRF = mkRootField sourceName sourceConfig queryTagsConfig QDBR + buildRelayQueryFields :: forall b r m n. MonadBuildSchema b r m n => @@ -651,10 +703,11 @@ buildSubscriptionParser :: [P.FieldParser n (NamespacedField (QueryRootField UnpreparedValue))] -> [ActionInfo] -> AnnotatedCustomTypes -> + [P.FieldParser n (NamespacedField (RemoteSchemaRootField (RemoteRelationshipField UnpreparedValue) RemoteSchemaVariable))] -> m (Maybe (Parser 'Output n (RootFieldMap (QueryRootField UnpreparedValue)))) -buildSubscriptionParser queryFields allActions customTypes = do +buildSubscriptionParser sourceSubscriptionFields allActions customTypes remoteSubscriptionFields = do actionSubscriptionFields <- fmap (fmap NotNamespaced) . concat <$> traverse (buildActionSubscriptionFields customTypes) allActions - let subscriptionFields = queryFields <> actionSubscriptionFields + let subscriptionFields = sourceSubscriptionFields <> actionSubscriptionFields <> fmap (fmap $ fmap RFRemote) remoteSubscriptionFields whenMaybe (not $ null subscriptionFields) $ P.safeSelectionSet subscriptionRoot Nothing subscriptionFields <&> fmap (flattenNamespaces . fmap typenameToNamespacedRawRF) diff --git a/server/src-lib/Hasura/GraphQL/Schema/Backend.hs b/server/src-lib/Hasura/GraphQL/Schema/Backend.hs index 3be49a2cb93..1846b869613 100644 --- a/server/src-lib/Hasura/GraphQL/Schema/Backend.hs +++ b/server/src-lib/Hasura/GraphQL/Schema/Backend.hs @@ -92,7 +92,13 @@ class TableInfo b -> G.Name -> m [FieldParser n (QueryDB b (RemoteRelationshipField UnpreparedValue) (UnpreparedValue b))] - + buildTableStreamingSubscriptionFields :: + MonadBuildSchema b r m n => + SourceName -> + TableName b -> + TableInfo b -> + G.Name -> + m [FieldParser n (QueryDB b (RemoteRelationshipField UnpreparedValue) (UnpreparedValue b))] buildTableRelayQueryFields :: MonadBuildSchema b r m n => SourceName -> @@ -101,7 +107,6 @@ class G.Name -> NESeq (ColumnInfo b) -> m [FieldParser n (QueryDB b (RemoteRelationshipField UnpreparedValue) (UnpreparedValue b))] - buildTableInsertMutationFields :: MonadBuildSchema b r m n => Scenario -> @@ -181,6 +186,7 @@ class -- backend extensions relayExtension :: Maybe (XRelay b) nodesAggExtension :: Maybe (XNodesAgg b) + streamSubscriptionExtension :: Maybe (XStreamingSubscription b) -- individual components columnParser :: diff --git a/server/src-lib/Hasura/GraphQL/Schema/Build.hs b/server/src-lib/Hasura/GraphQL/Schema/Build.hs index 6b693c97438..94340f448c0 100644 --- a/server/src-lib/Hasura/GraphQL/Schema/Build.hs +++ b/server/src-lib/Hasura/GraphQL/Schema/Build.hs @@ -49,6 +49,7 @@ module Hasura.GraphQL.Schema.Build buildTableDeleteMutationFields, buildTableInsertMutationFields, buildTableQueryFields, + buildTableStreamingSubscriptionFields, buildTableUpdateMutationFields, ) where @@ -60,6 +61,7 @@ import Hasura.GraphQL.Schema.Backend (MonadBuildSchema) import Hasura.GraphQL.Schema.Common import Hasura.GraphQL.Schema.Mutation import Hasura.GraphQL.Schema.Select +import Hasura.GraphQL.Schema.SubscriptionStream (selectStreamTable) import Hasura.GraphQL.Schema.Update (updateTable, updateTableByPk) import Hasura.Prelude import Hasura.RQL.IR @@ -96,6 +98,24 @@ buildTableQueryFields sourceName tableName tableInfo gqlName = do defaultSelectAggDesc = "fetch aggregated fields from the table: " <>> tableName TableCustomRootFields {..} = _tcCustomRootFields . _tciCustomConfig $ _tiCoreInfo tableInfo +buildTableStreamingSubscriptionFields :: + forall b r m n. + MonadBuildSchema b r m n => + SourceName -> + TableName b -> + TableInfo b -> + G.Name -> + m [FieldParser n (QueryDB b (RemoteRelationshipField UnpreparedValue) (UnpreparedValue b))] +buildTableStreamingSubscriptionFields sourceName tableName tableInfo gqlName = do + let customRootFields = _tcCustomRootFields $ _tciCustomConfig $ _tiCoreInfo tableInfo + selectDesc = Just $ G.Description $ "fetch data from the table in a streaming manner : " <>> tableName + selectStreamName <- + mkRootFieldName $ (fromMaybe gqlName $ _crfName $ _tcrfSelect customRootFields) <> G._stream + catMaybes + <$> sequenceA + [ optionalFieldParser QDBStreamMultipleRows $ selectStreamTable sourceName tableInfo selectStreamName selectDesc + ] + buildTableInsertMutationFields :: forall b r m n. MonadBuildSchema b r m n => @@ -244,7 +264,6 @@ buildFunctionMutationFields :: m [FieldParser n (MutationDB b (RemoteRelationshipField UnpreparedValue) (UnpreparedValue b))] buildFunctionMutationFields sourceName functionName functionInfo tableName = do let funcDesc = Just $ G.Description $ "execute VOLATILE function " <> functionName <<> " which returns " <>> tableName - jsonAggSelect = _fiJsonAggSelect functionInfo catMaybes <$> sequenceA diff --git a/server/src-lib/Hasura/GraphQL/Schema/Common.hs b/server/src-lib/Hasura/GraphQL/Schema/Common.hs index e9d8155f1d7..d9b91978e61 100644 --- a/server/src-lib/Hasura/GraphQL/Schema/Common.hs +++ b/server/src-lib/Hasura/GraphQL/Schema/Common.hs @@ -13,7 +13,9 @@ module Hasura.GraphQL.Schema.Common QueryContext (..), Scenario (..), SelectArgs, + SelectStreamArgs, SelectExp, + StreamSelectExp, TablePerms, askTableInfo, comparisonAggOperators, @@ -77,12 +79,16 @@ type MonadBuildSchemaBase r m n = type SelectExp b = IR.AnnSimpleSelectG b (IR.RemoteRelationshipField P.UnpreparedValue) (P.UnpreparedValue b) +type StreamSelectExp b = IR.AnnSimpleStreamSelectG b (IR.RemoteRelationshipField P.UnpreparedValue) (P.UnpreparedValue b) + type AggSelectExp b = IR.AnnAggregateSelectG b (IR.RemoteRelationshipField P.UnpreparedValue) (P.UnpreparedValue b) type ConnectionSelectExp b = IR.ConnectionSelect b (IR.RemoteRelationshipField P.UnpreparedValue) (P.UnpreparedValue b) type SelectArgs b = IR.SelectArgsG b (P.UnpreparedValue b) +type SelectStreamArgs b = IR.SelectStreamArgsG b (P.UnpreparedValue b) + type TablePerms b = IR.TablePermG b (P.UnpreparedValue b) type AnnotatedFields b = IR.AnnFieldsG b (IR.RemoteRelationshipField P.UnpreparedValue) (P.UnpreparedValue b) diff --git a/server/src-lib/Hasura/GraphQL/Schema/SubscriptionStream.hs b/server/src-lib/Hasura/GraphQL/Schema/SubscriptionStream.hs new file mode 100644 index 00000000000..1678ee4b174 --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Schema/SubscriptionStream.hs @@ -0,0 +1,254 @@ +{-# LANGUAGE ApplicativeDo #-} +{-# LANGUAGE TemplateHaskellQuotes #-} + +-- | Generate the GraphQL schema types related to streaming subscriptions. +module Hasura.GraphQL.Schema.SubscriptionStream + ( selectStreamTable, + ) +where + +import Data.Has +import Data.List.NonEmpty qualified as NE +import Data.Text.Extended ((<>>)) +import Hasura.Base.Error (QErr) +import Hasura.GraphQL.Parser + ( InputFieldsParser, + Kind (..), + Parser, + ) +import Hasura.GraphQL.Parser qualified as P +import Hasura.GraphQL.Parser.Class +import Hasura.GraphQL.Parser.Constants qualified as G +import Hasura.GraphQL.Schema.Backend +import Hasura.GraphQL.Schema.Common +import Hasura.GraphQL.Schema.Select (tablePermissionsInfo, tableSelectionList, tableWhereArg) +import Hasura.GraphQL.Schema.Table (getTableGQLName, tableSelectColumns, tableSelectPermissions) +import Hasura.Prelude +import Hasura.RQL.IR.Select qualified as IR +import Hasura.RQL.Types +import Language.GraphQL.Draft.Syntax qualified as G + +-- | Argument to limit the maximum number of results returned in a single batch. +cursorBatchSizeArg :: + forall n. + MonadParse n => + InputFieldsParser n Int +cursorBatchSizeArg = + fromIntegral + <$> P.field batchSizeName batchSizeDesc P.nonNegativeInt + where + batchSizeName = G._batch_size + batchSizeDesc = Just $ G.Description "maximum number of rows returned in a single batch" + +-- | Cursor ordering enum fields +-- +-- > enum cursor_ordering { +-- > ASC +-- > DESC +-- > } +cursorOrderingArgParser :: + forall n m r. + (MonadSchema n m, Has P.MkTypename r, MonadReader r m) => + m (Parser 'Both n CursorOrdering) +cursorOrderingArgParser = do + enumName <- P.mkTypename G._cursor_ordering + let description = + Just $ + G.Description $ + "ordering argument of a cursor" + pure $ + P.enum enumName description $ + NE.fromList -- It's fine to use fromList here because we know the list is never empty. + [ ( define enumNameVal, + snd enumNameVal + ) + | enumNameVal <- [(G._ASC, COAscending), (G._DESC, CODescending)] + ] + where + define (name, val) = + let orderingTypeDesc = bool "descending" "ascending" $ val == COAscending + in P.Definition name (Just $ G.Description $ orderingTypeDesc <> " ordering of the cursor") P.EnumValueInfo + +-- | Argument to specify the ordering of the cursor. +-- > ordering: cursor_ordering +cursorOrderingArg :: + forall n m r. + (MonadSchema n m, Has P.MkTypename r, MonadReader r m) => + m (InputFieldsParser n (Maybe CursorOrdering)) +cursorOrderingArg = do + cursorOrderingParser' <- cursorOrderingArgParser + pure do + P.fieldOptional G._ordering (Just $ G.Description "cursor ordering") cursorOrderingParser' + +-- | Input fields parser to parse the value of a table's column +-- > column_name: column_type +streamColumnParserArg :: + forall b n m r. + (BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m) => + ColumnInfo b -> + m (InputFieldsParser n (Maybe (ColumnInfo b, ColumnValue b))) +streamColumnParserArg colInfo = do + fieldParser <- typedParser colInfo + let fieldName = ciName colInfo + fieldDesc = ciDescription colInfo + pure do + P.fieldOptional fieldName fieldDesc fieldParser <&> fmap (colInfo,) + where + typedParser columnInfo = do + fmap P.openValueOrigin <$> columnParser (ciType columnInfo) (G.Nullability $ ciIsNullable columnInfo) + +-- | Input object parser whose keys are the column names and the values are the +-- initial values of those columns from where the streaming should start. +-- > input table_stream_cursor_value_input { +-- > col1: col1_type +-- > col2: col2_type +-- ... +-- > } +streamColumnValueParser :: + forall b n m r. + (BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m) => + SourceName -> + G.Name -> + [ColumnInfo b] -> + m (Parser 'Input n [(ColumnInfo b, ColumnValue b)]) +streamColumnValueParser sourceName tableGQLName colInfos = + memoizeOn 'streamColumnValueParser (sourceName, tableGQLName) $ do + columnVals <- sequenceA <$> traverse streamColumnParserArg colInfos + objName <- P.mkTypename $ tableGQLName <> G.__stream_cursor_value_input + pure do + let description = G.Description $ "Initial value of the column from where the streaming should start" + P.object objName (Just description) columnVals <&> catMaybes + +-- | Argument to accept the initial value from where the streaming should start. +-- > initial_value: table_stream_cursor_value_input! +streamColumnValueParserArg :: + forall b n m r. + ( BackendSchema b, + MonadSchema n m, + Has P.MkTypename r, + MonadReader r m, + MonadError QErr m + ) => + SourceName -> + G.Name -> + [ColumnInfo b] -> + m (InputFieldsParser n [(ColumnInfo b, ColumnValue b)]) +streamColumnValueParserArg sourceName tableGQLName colInfos = do + columnValueParser <- streamColumnValueParser sourceName tableGQLName colInfos + pure do + P.field G._initial_value (Just $ G.Description "Stream column input with initial value") columnValueParser + +-- | Argument to accept the cursor data. At the time of writing this, only a single +-- column cursor is supported and if multiple column cursors are provided, +-- then a parse error is thrown. +-- > +tableStreamColumnArg :: + forall n m r b. + (BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m) => + SourceName -> + G.Name -> + [ColumnInfo b] -> + m (InputFieldsParser n [IR.StreamCursorItem b]) +tableStreamColumnArg sourceName tableGQLName colInfos = do + cursorOrderingParser <- cursorOrderingArg + streamColumnParser <- streamColumnValueParserArg sourceName tableGQLName colInfos + pure $ do + orderingArg <- cursorOrderingParser + columnArg <- streamColumnParser + pure $ (uncurry (IR.StreamCursorItem (fromMaybe COAscending orderingArg))) <$> columnArg + +-- | Input object that contains the initial value of a column +-- along with how it needs to be ordered. +-- > input table_stream_cursor_input { +-- > initial_value: table_stream_cursor_value_input! +-- > ordering: cursor_ordering +-- > } +tableStreamCursorExp :: + forall m n r b. + MonadBuildSchema b r m n => + SourceName -> + TableInfo b -> + m (Parser 'Input n [(IR.StreamCursorItem b)]) +tableStreamCursorExp sourceName tableInfo = + memoizeOn 'tableStreamCursorExp (sourceName, tableInfoName tableInfo) $ do + tableGQLName <- getTableGQLName tableInfo + columnInfos <- tableSelectColumns sourceName tableInfo + objName <- P.mkTypename $ tableGQLName <> G.__stream_cursor_input + let description = + G.Description $ "Streaming cursor of the table " <>> tableGQLName + columnParsers <- tableStreamColumnArg sourceName tableGQLName columnInfos + pure $ P.object objName (Just description) columnParsers + +-- | Argument to accept the cursor input object. +-- > cursor: [table_stream_cursor_input]! +tableStreamCursorArg :: + forall b r m n. + MonadBuildSchema b r m n => + SourceName -> + TableInfo b -> + m (InputFieldsParser n [IR.StreamCursorItem b]) +tableStreamCursorArg sourceName tableInfo = do + cursorParser <- tableStreamCursorExp sourceName tableInfo + pure $ do + cursorArgs <- + P.field cursorName cursorDesc $ P.list $ P.nullable cursorParser + pure $ concat $ catMaybes cursorArgs + where + cursorName = G._cursor + cursorDesc = Just $ G.Description "cursor to stream the results returned by the query" + +-- | Arguments to the streaming subscription field. +-- > table_stream (cursor: [table_stream_cursor_input]!, batch_size: Int!, where: table_bool_exp) +tableStreamArguments :: + forall b r m n. + MonadBuildSchema b r m n => + SourceName -> + TableInfo b -> + m (InputFieldsParser n (SelectStreamArgs b)) +tableStreamArguments sourceName tableInfo = do + whereParser <- tableWhereArg sourceName tableInfo + cursorParser <- tableStreamCursorArg sourceName tableInfo + pure $ do + whereArg <- whereParser + cursorArg <- + cursorParser `P.bindFields` \case + [] -> parseError "one streaming column field is expected" + [c] -> pure c + _ -> parseError "multiple column cursors are not supported yet" + batchSizeArg <- cursorBatchSizeArg + pure $ + IR.SelectStreamArgsG whereArg batchSizeArg cursorArg + +-- | Field parser for a streaming subscription for a table. +selectStreamTable :: + forall b r m n. + MonadBuildSchema b r m n => + SourceName -> + -- | table info + TableInfo b -> + -- | field display name + G.Name -> + -- | field description, if any + Maybe G.Description -> + m (Maybe (P.FieldParser n (StreamSelectExp b))) +selectStreamTable sourceName tableInfo fieldName description = runMaybeT $ do + selectPermissions <- MaybeT $ tableSelectPermissions tableInfo + xStreamSubscription <- hoistMaybe $ streamSubscriptionExtension @b + stringifyNum <- asks $ qcStringifyNum . getter + tableStreamArgsParser <- lift $ tableStreamArguments sourceName tableInfo + selectionSetParser <- MaybeT $ tableSelectionList sourceName tableInfo + lift $ + memoizeOn 'selectStreamTable (sourceName, tableName, fieldName) $ do + pure $ + P.subselection fieldName description tableStreamArgsParser selectionSetParser + <&> \(args, fields) -> + IR.AnnSelectStreamG + { IR._assnXStreamingSubscription = xStreamSubscription, + IR._assnFields = fields, + IR._assnFrom = IR.FromTable tableName, + IR._assnPerm = tablePermissionsInfo selectPermissions, + IR._assnArgs = args, + IR._assnStrfyNum = stringifyNum + } + where + tableName = tableInfoName tableInfo diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 519e4a22ad8..0d3755ca1ef 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -92,7 +92,7 @@ import Network.HTTP.Types qualified as HTTP import Network.WebSockets qualified as WS import StmContainers.Map qualified as STMMap --- | 'ES.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use +-- | 'ES.SubscriberDetails' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use -- this to track a connection's operations so we can remove them from 'LiveQueryState', and -- log. -- @@ -224,7 +224,7 @@ sendCloseWithMsg :: m () sendCloseWithMsg logger wsConn errCode mErrServerMsg mCode = do case mErrServerMsg of - Just errServerMsg -> do + Just errServerMsg -> sendMsg wsConn errServerMsg Nothing -> pure () logWSEvent logger wsConn EClosed @@ -287,7 +287,7 @@ onConn wsId requestHead ipAddress onConnHActions = do -- NOTE: the "Keep-Alive" delay is something that's mentioned -- in the Apollo spec. For 'graphql-ws', we're using the Ping -- messages that are part of the spec. - keepAliveAction keepAliveDelay wsConn = do + keepAliveAction keepAliveDelay wsConn = liftIO $ forever $ do kaAction wsConn @@ -518,7 +518,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions sendResultFromFragments Telem.Query timerTot requestId conclusion opName parameterizedQueryHash case conclusion of Left _ -> pure () - Right results -> do + Right results -> -- Note: The result of cacheStore is ignored here since we can't ensure that -- the WS client will respond correctly to multiple messages. void $ @@ -664,7 +664,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions fmtErrorMessage = WS._wsaErrorMsgFormat onMessageActions getExecStepActionWithActionInfo acc execStep = case execStep of - E.ExecStepAction _ actionInfo _remoteJoins -> (actionInfo : acc) + E.ExecStepAction _ actionInfo _remoteJoins -> actionInfo : acc _ -> acc doQErr :: @@ -682,7 +682,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions ExceptT f n a withErr embed f action = do res <- runExceptT $ f $ lift action - onLeft res (\e -> throwError $ embed e) + onLeft res (throwError . embed) forWithKey = flip OMap.traverseWithKey @@ -950,7 +950,7 @@ onPing wsConn mPayload = onPong :: (MonadIO m) => WSConn -> Maybe PingPongPayload -> m () onPong wsConn mPayload = liftIO $ case mPayload of - Just message -> do + Just message -> when (message /= keepAliveMessage) $ sendMsg wsConn (SMPing mPayload) -- NOTE: this is done to avoid sending Ping for every "keepalive" that the server sends diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs index 8fc10b60301..610dd52b6f4 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs @@ -129,7 +129,7 @@ checkConflictingNode :: Text -> m () checkConflictingNode sc tnGQL = do - let GQLContext queryParser _ = scUnauthenticatedGQLContext sc + let GQLContext queryParser _ _ = scUnauthenticatedGQLContext sc -- { -- __schema { -- queryType { diff --git a/server/src-lib/Hasura/RQL/IR/Root.hs b/server/src-lib/Hasura/RQL/IR/Root.hs index ef272315050..7b8f91d492e 100644 --- a/server/src-lib/Hasura/RQL/IR/Root.hs +++ b/server/src-lib/Hasura/RQL/IR/Root.hs @@ -6,7 +6,6 @@ module Hasura.RQL.IR.Root ActionMutation (..), QueryRootField, MutationRootField, - SubscriptionRootField, QueryDBRoot (..), MutationDBRoot (..), RemoteRelationshipField (..), @@ -98,10 +97,3 @@ type MutationRootField v = (RemoteSchemaRootField (RemoteRelationshipField v) RQL.RemoteSchemaVariable) (MutationActionRoot v) JO.Value - -type SubscriptionRootField v = - RootField - (QueryDBRoot (RemoteRelationshipField v) v) - Void - Void - Void diff --git a/server/src-lib/Hasura/Server/API/Config.hs b/server/src-lib/Hasura/Server/API/Config.hs index 5c9ba3d6fd1..bce3a91708b 100644 --- a/server/src-lib/Hasura/Server/API/Config.hs +++ b/server/src-lib/Hasura/Server/API/Config.hs @@ -40,6 +40,7 @@ data ServerConfig = ServerConfig scfgJwt :: ![JWTInfo], scfgIsAllowListEnabled :: !Bool, scfgLiveQueries :: !ES.LiveQueriesOptions, + scfgStreamingQueries :: !ES.SubscriptionsOptions, scfgConsoleAssetsDir :: !(Maybe Text), scfgExperimentalFeatures :: !(Set.HashSet ExperimentalFeature) } @@ -53,6 +54,7 @@ runGetConfig :: AuthMode -> Bool -> ES.LiveQueriesOptions -> + ES.SubscriptionsOptions -> Maybe Text -> Set.HashSet ExperimentalFeature -> ServerConfig @@ -62,6 +64,7 @@ runGetConfig am isAllowListEnabled liveQueryOpts + streamQueryOpts consoleAssetsDir experimentalFeatures = ServerConfig @@ -74,6 +77,7 @@ runGetConfig (getJWTInfo am) isAllowListEnabled liveQueryOpts + streamQueryOpts consoleAssetsDir experimentalFeatures diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index ddbd2b0837b..7b45840049f 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -404,7 +404,15 @@ v1QueryHandler query = do experimentalFeatures <- asks (scExperimentalFeatures . hcServerCtx) eventingMode <- asks (scEventingMode . hcServerCtx) readOnlyMode <- asks (scEnableReadOnlyMode . hcServerCtx) - let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures eventingMode readOnlyMode + let serverConfigCtx = + ServerConfigCtx + functionPermsCtx + remoteSchemaPermsCtx + sqlGenCtx + maintenanceMode + experimentalFeatures + eventingMode + readOnlyMode runQuery env logger @@ -495,7 +503,16 @@ v2QueryHandler query = do maintenanceMode <- asks (scEnableMaintenanceMode . hcServerCtx) eventingMode <- asks (scEventingMode . hcServerCtx) readOnlyMode <- asks (scEnableReadOnlyMode . hcServerCtx) - let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures eventingMode readOnlyMode + let serverConfigCtx = + ServerConfigCtx + functionPermsCtx + remoteSchemaPermsCtx + sqlGenCtx + maintenanceMode + experimentalFeatures + eventingMode + readOnlyMode + V2Q.runQuery env instanceId userInfo schemaCache httpMgr serverConfigCtx query v1Alpha1GQHandler :: @@ -676,6 +693,7 @@ configApiGetHandler serverCtx@ServerCtx {..} consoleAssetsDir = scAuthMode scEnableAllowlist (ES._ssLiveQueryOptions $ scSubscriptionState) + (ES._ssStreamQueryOptions $ scSubscriptionState) consoleAssetsDir scExperimentalFeatures return (emptyHttpLogMetadata @m, JSONResp $ HttpResponse (encJFromJValue res) []) @@ -793,6 +811,7 @@ mkWaiApp postPollHook = fromMaybe (ES.defaultSubscriptionPostPollHook logger) liveQueryHook subscriptionsState <- liftIO $ ES.initSubscriptionsState lqOpts streamQOpts postPollHook + wsServerEnv <- WS.createWSServerEnv logger diff --git a/server/src-lib/Hasura/Server/Init/Config.hs b/server/src-lib/Hasura/Server/Init/Config.hs index b644c5491da..75e99515bfc 100644 --- a/server/src-lib/Hasura/Server/Init/Config.hs +++ b/server/src-lib/Hasura/Server/Init/Config.hs @@ -378,8 +378,12 @@ readExperimentalFeatures = mapM readAPI . T.splitOn "," . T.pack where readAPI si = case T.toLower $ T.strip si of "inherited_roles" -> Right EFInheritedRoles + "streaming_subscriptions" -> Right EFStreamingSubscriptions "optimize_permission_filters" -> Right EFOptimizePermissionFilters - _ -> Left "Only expecting list of comma separated experimental features" + _ -> + Left $ + "Only expecting list of comma separated experimental features, options are:" + ++ "inherited_roles, streaming_subscriptions, optimize_permission_filters" readLogLevel :: String -> Either String L.LogLevel readLogLevel s = case T.toLower $ T.strip $ T.pack s of diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs index 1cec01ed6cb..8c4aeb4dbeb 100644 --- a/server/src-lib/Hasura/Server/Types.hs +++ b/server/src-lib/Hasura/Server/Types.hs @@ -7,6 +7,7 @@ module Hasura.Server.Types PGVersion (PGVersion), RequestId (..), ServerConfigCtx (..), + StreamingSubscriptionsCtx (..), HasServerConfigCtx (..), getRequestId, ) @@ -48,6 +49,7 @@ newtype InstanceId = InstanceId {getInstanceId :: Text} data ExperimentalFeature = EFInheritedRoles | EFOptimizePermissionFilters + | EFStreamingSubscriptions deriving (Show, Eq, Generic) instance Hashable ExperimentalFeature @@ -56,12 +58,14 @@ instance FromJSON ExperimentalFeature where parseJSON = withText "ExperimentalFeature" $ \case "inherited_roles" -> pure EFInheritedRoles "optimize_permission_filters" -> pure EFOptimizePermissionFilters - _ -> fail "ExperimentalFeature can only be one of these value: inherited_roles, optimize_permission_filters" + "streaming_subscriptions" -> pure EFStreamingSubscriptions + _ -> fail "ExperimentalFeature can only be one of these value: inherited_roles, optimize_permission_filters or streaming_subscriptions" instance ToJSON ExperimentalFeature where toJSON = \case EFInheritedRoles -> "inherited_roles" EFOptimizePermissionFilters -> "optimize_permission_filters" + EFStreamingSubscriptions -> "streaming_subscriptions" data MaintenanceMode = MaintenanceModeEnabled | MaintenanceModeDisabled deriving (Show, Eq) @@ -74,6 +78,9 @@ instance FromJSON MaintenanceMode where instance ToJSON MaintenanceMode where toJSON = Bool . (== MaintenanceModeEnabled) +data StreamingSubscriptionsCtx = StreamingSubscriptionsEnabled | StreamingSubscriptionsDisabled + deriving (Show, Eq) + -- | See Note [ReadOnly Mode] data ReadOnlyMode = ReadOnlyModeEnabled | ReadOnlyModeDisabled deriving (Show, Eq) diff --git a/server/tests-py/conftest.py b/server/tests-py/conftest.py index 009e1655a74..7d4a8d779dd 100644 --- a/server/tests-py/conftest.py +++ b/server/tests-py/conftest.py @@ -103,6 +103,14 @@ def pytest_addoption(parser): help="Run manual function permission tests" ) + parser.addoption( + "--test-streaming-subscriptions", + action="store_true", + default=False, + required=False, + help="Run streaming subscription tests" + ) + parser.addoption( "--test-jwk-url", action="store_true", @@ -373,6 +381,12 @@ use_function_permission_fixtures = pytest.mark.usefixtures( 'functions_permissions_fixtures' ) +@pytest.fixture(scope='class') +def streaming_subscriptions_fixtures(hge_ctx): + if not hge_ctx.streaming_subscriptions: + pytest.skip('These tests are meant to be run with --test-streaming-subscriptions set with pytest') + return + @pytest.fixture(scope='class') def pro_tests_fixtures(hge_ctx): if not hge_ctx.pro_tests: diff --git a/server/tests-py/context.py b/server/tests-py/context.py index 029bb7b61d9..d27b5983dc8 100644 --- a/server/tests-py/context.py +++ b/server/tests-py/context.py @@ -779,6 +779,7 @@ class HGECtx: self.metadata_disabled = config.getoption('--test-metadata-disabled') self.may_skip_test_teardown = False self.function_permissions = config.getoption('--test-function-permissions') + self.streaming_subscriptions = config.getoption('--test-streaming-subscriptions') # This will be GC'd, but we also explicitly dispose() in teardown() self.engine = create_engine(self.pg_url) diff --git a/server/tests-py/queries/remote_schemas/tbls_setup.yaml b/server/tests-py/queries/remote_schemas/tbls_setup.yaml index 7fd5b756ab4..9f191151406 100644 --- a/server/tests-py/queries/remote_schemas/tbls_setup.yaml +++ b/server/tests-py/queries/remote_schemas/tbls_setup.yaml @@ -9,12 +9,21 @@ args: code TEXT, name TEXT ); + CREATE TABLE authors ( + id SERIAL PRIMARY KEY, + name TEXT + ); - type: track_table args: schema: public name: hello +- type: track_table + args: + schema: public + name: authors + - type: add_remote_schema args: name: simple2-graphql diff --git a/server/tests-py/queries/remote_schemas/tbls_teardown.yaml b/server/tests-py/queries/remote_schemas/tbls_teardown.yaml index b62d28e2f58..4cc3de85a0f 100644 --- a/server/tests-py/queries/remote_schemas/tbls_teardown.yaml +++ b/server/tests-py/queries/remote_schemas/tbls_teardown.yaml @@ -3,9 +3,9 @@ args: - type: run_sql args: sql: | - drop table hello + drop table hello; + drop table authors; - type: remove_remote_schema args: name: simple2-graphql - diff --git a/server/tests-py/queries/subscriptions/streaming/setup.yaml b/server/tests-py/queries/subscriptions/streaming/setup.yaml new file mode 100644 index 00000000000..dc815bef130 --- /dev/null +++ b/server/tests-py/queries/subscriptions/streaming/setup.yaml @@ -0,0 +1,39 @@ +type: bulk +args: + +- type: run_sql + args: + sql: | + create table hge_tests.test_t2( + c1 int, + c2 text, + created_at timestamptz default now() + ); + create table hge_tests.articles( + id serial primary key, + content text, + title text, + is_public bool default false + ); + + +- type: track_table + args: + schema: hge_tests + name: test_t2 +- type: track_table + args: + schema: hge_tests + name: articles +- type: create_select_permission + args: + table: + schema: hge_tests + name: articles + role: public + permission: + columns: + - title + - content + filter: + is_public: true diff --git a/server/tests-py/queries/subscriptions/streaming/steps.yaml b/server/tests-py/queries/subscriptions/streaming/steps.yaml new file mode 100644 index 00000000000..ab439ebc1ab --- /dev/null +++ b/server/tests-py/queries/subscriptions/streaming/steps.yaml @@ -0,0 +1,91 @@ +- + description: Insert mutation 1 + name: insert_hge_tests_test_t2 + query: | + mutation insert_articles($objects: [hge_tests_test_t2_insert_input!]! ) { + insert_hge_tests_test_t2(objects: $objects) { + returning { + c1 + c2 + } + } + } + variables: | + { + "objects": [ + { + "c1": 1, + "c2": "test1" + } + ] + } + response: | + { + "insert_hge_tests_test_t2": { + "returning": [ + { + "c1": 1, + "c2": "test1" + } + ] + } + } + stream_response: | + [ + { + "c1": 1, + "c2": "test1" + } + ] + +- + description: Insert mutation 1 + name: insert_hge_tests_test_t2 + query: | + mutation insert_articles($objects: [hge_tests_test_t2_insert_input!]! ) { + insert_hge_tests_test_t2(objects: $objects) { + returning { + c1 + c2 + } + } + } + variables: | + { + "objects": [ + { + "c1": 2, + "c2": "test2" + }, + { + "c1": 3, + "c2": "test3" + } + ] + } + response: | + { + "insert_hge_tests_test_t2": { + "returning": [ + { + "c1": 2, + "c2": "test2" + }, + { + "c1": 3, + "c2": "test3" + } + ] + } + } + stream_response: | + [ + { + "c1": 2, + "c2": "test2" + }, + { + "c1": 3, + "c2": "test3" + } + ] diff --git a/server/tests-py/queries/subscriptions/streaming/teardown.yaml b/server/tests-py/queries/subscriptions/streaming/teardown.yaml new file mode 100644 index 00000000000..b13bdff072c --- /dev/null +++ b/server/tests-py/queries/subscriptions/streaming/teardown.yaml @@ -0,0 +1,7 @@ +type: bulk +args: +- type: run_sql + args: + sql: | + drop table hge_tests.test_t2; + drop table hge_tests.articles; diff --git a/server/tests-py/test_events.py b/server/tests-py/test_events.py index dffe87c6b43..365ea39baf3 100644 --- a/server/tests-py/test_events.py +++ b/server/tests-py/test_events.py @@ -5,6 +5,7 @@ import queue import time import json import utils +from utils import * from validate import check_query, check_query_f, check_event, check_event_transformed, check_events usefixtures = pytest.mark.usefixtures @@ -27,47 +28,6 @@ def select_last_event_fromdb(hge_ctx): st_code, resp = hge_ctx.v1q(q) return st_code, resp - -def insert(hge_ctx, table, row, returning=[], headers = {}): - return insert_many(hge_ctx, table, [row], returning, headers) - -def insert_many(hge_ctx, table, rows, returning=[], headers = {}): - q = { - "type": "insert", - "args": { - "table": table, - "objects": rows, - "returning": returning - } - } - st_code, resp = hge_ctx.v1q(q, headers = headers) - return st_code, resp - - -def update(hge_ctx, table, where_exp, set_exp, headers = {}): - q = { - "type": "update", - "args": { - "table": table, - "where": where_exp, - "$set": set_exp - } - } - st_code, resp = hge_ctx.v1q(q, headers = headers) - return st_code, resp - - -def delete(hge_ctx, table, where_exp, headers = {}): - q = { - "type": "delete", - "args": { - "table": table, - "where": where_exp - } - } - st_code, resp = hge_ctx.v1q(q, headers = headers) - return st_code, resp - def insert_mutation(hge_ctx, table, row, headers = {}): return insert_many_mutation(hge_ctx, table, [row], headers) @@ -81,7 +41,7 @@ def insert_many_mutation(hge_ctx, table, rows, headers = {}): else: insert_value_type = table["name"] + "_" + "insert" + "_" + "input" insert_mutation_field = "insert" + "_" + table["name"] - + insert_mutation_query = """ mutation {mutation_name}($values: [{insert_value_type}!]!) {{ {insert_mutation_field}(objects: $values) {{ @@ -89,10 +49,10 @@ def insert_many_mutation(hge_ctx, table, rows, headers = {}): }} }} """.format(mutation_name = mutation_name, insert_value_type = insert_value_type, insert_mutation_field = insert_mutation_field ) - + variables = {'values': rows} graphql_query = {'query': insert_mutation_query, 'variables': variables} - + st_code, resp = hge_ctx.v1graphqlq(graphql_query, headers = headers) return st_code, resp @@ -104,7 +64,7 @@ def update_mutation(hge_ctx, table, where_exp, set_exp, headers = {}): update_mutation_field = "update" + "_" + table["schema"] +"_" + table["name"] else: update_mutation_field = "update" + "_" + table["name"] - + update_mutation_query = """ mutation {mutation_name} {{ {update_mutation_field}(where: {where_exp}, _set: {set_exp}) {{ @@ -115,7 +75,7 @@ def update_mutation(hge_ctx, table, where_exp, set_exp, headers = {}): update_mutation_field = update_mutation_field, where_exp = where_exp, set_exp = set_exp) - + print("--- UPDATE MUTATION QUERY ---- \n", update_mutation_query) graphql_query = {'query': update_mutation_query} @@ -132,7 +92,7 @@ def delete_mutation(hge_ctx, table, where_exp, headers = {}): delete_mutation_field = "delete" + "_" + table["schema"] +"_" + table["name"] else: delete_mutation_field = "delete" + "_" + table["name"] - + delete_mutation_query = """ mutation {mutation_name} {{ {delete_mutation_field}(where: {where_exp}) {{ @@ -142,7 +102,7 @@ def delete_mutation(hge_ctx, table, where_exp, headers = {}): """.format(mutation_name = mutation_name, delete_mutation_field = delete_mutation_field, where_exp = where_exp) - + print("--- DELETE MUTATION QUERY ---- \n", delete_mutation_query) graphql_query = {'query': delete_mutation_query} @@ -187,7 +147,7 @@ class TestEventCreateAndDeleteMSSQL: def test_create_reset(self, hge_ctx): check_query_f(hge_ctx, self.dir() + "/create_and_reset_mssql.yaml") - + table = {"schema": "hge_tests", "name": "test_t1"} init_row = {"c1": 1, "c2": "world"} st_code, resp = insert_mutation(hge_ctx, table, init_row) @@ -272,7 +232,7 @@ class TestEventFloodCommon(object): } } st, resp = hge_ctx.v2q(locked_counts) - + assert st == 200, resp # Make sure we have 2*HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE events checked out: # - 100 prefetched @@ -283,7 +243,7 @@ class TestEventFloodCommon(object): assert resp['result'][1][0] == '200' elif (hge_ctx.backend == "mssql"): assert resp['result'][1][0] == 200 - + # Rather than sleep arbitrarily, loop until assertions pass: utils.until_asserts_pass(30, check_backpressure) # ...then make sure we're truly stable: @@ -384,7 +344,7 @@ class TestEventDataFormatBigIntMSSQL(object): "new": {"id": 50755254975729665, "name": "hello"} } - # TODO: Naveen: Insert mutation on big int values in MSSQL source + # TODO: Naveen: Insert mutation on big int values in MSSQL source # does not work as of now, hence using 'run_sql' to directly insert rows # and trigger the event trigger. When they are supported in future, we # might wanna use the insert_mutation here for consistency. @@ -396,7 +356,7 @@ class TestEventDataFormatBigIntMSSQL(object): "sql":''' INSERT INTO hge_tests.test_bigint ([id], [name]) VALUES (50755254975729665, 'hello') ''' - } + } } st_code, resp = hge_ctx.v2q(insert_bigint_sql) print("----------- resp ----------\n", resp) @@ -440,7 +400,7 @@ class TestCreateEventQueryCommon(object): #assert st_code == 400, resp check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", exp_ev_data) assert st_code == 200, resp - + # Check Update Event Trigger Payload if (hge_ctx.backend == "postgres"): where_exp = {"c1": 1} @@ -450,7 +410,7 @@ class TestCreateEventQueryCommon(object): where_exp = '{c1: {_eq: 1}}' set_exp = '{c2: "world"}' st_code, resp = update_mutation(hge_ctx, table, where_exp, set_exp) - + exp_ev_data = { "old": init_row, "new": {"c1": 1, "c2": "world"} @@ -636,7 +596,7 @@ class TestUpdateEventQuery(object): table = {"schema": "hge_tests", "name": "test_t1"} # Expect that inserting a row (which would have triggered in original - # create_event_trigger) does not trigger + # create_event_trigger) does not trigger init_row = {"c1": 1, "c2": "hello", "c3": {"name": "clarke"}} st_code, resp = insert(hge_ctx, table, init_row) assert st_code == 200, resp @@ -688,7 +648,7 @@ class TestUpdateEventQueryMSSQL(object): @classmethod def dir(cls): return 'queries/event_triggers/update_query' - + @pytest.fixture(autouse=True) def transact(self, request, hge_ctx, evts_webhook): print("In setup method") @@ -718,7 +678,7 @@ class TestUpdateEventQueryMSSQL(object): print("--- TEARDOWN STARTED -----") st_code, resp = hge_ctx.v2q_f(self.dir() + '/teardown-mssql.yaml') assert st_code == 200, resp - + def test_update_basic(self, hge_ctx, evts_webhook): table = {"schema": "hge_tests", "name": "test_t1"} @@ -867,7 +827,7 @@ class TestDeleteEventQueryMSSQL(object): with pytest.raises(queue.Empty): # NOTE: use a bit of a delay here, to catch any stray events generated above check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", exp_ev_data, get_timeout=2) - + @usefixtures('per_class_tests_db_state') class TestEventSelCols: @@ -957,7 +917,7 @@ class TestEventSelColsMSSQL: where_exp = '{c1: {_eq: 1}}' set_exp = '{c1: 2}' - + # expected no event hence previous expected data st_code, resp = update_mutation(hge_ctx, table, where_exp, set_exp) print("----- RESP 2 -----", resp) @@ -971,7 +931,7 @@ class TestEventSelColsMSSQL: "old": {"c1": 2, "c2": "hello", "c3": "bellamy"}, "new": {"c1": 2, "c2": "world", "c3": "bellamy"} } - + st_code, resp = update_mutation(hge_ctx, table, where_exp, set_exp) print("----- RESP 3 -----", resp) assert st_code == 200, resp @@ -1166,9 +1126,9 @@ class TestEventUpdateOnlyMSSQL: with pytest.raises(queue.Empty): # NOTE: use a bit of a delay here, to catch any stray events generated above check_event(hge_ctx, evts_webhook, "t1_update", table, "DELETE", exp_ev_data, get_timeout=2) - + # CASE 3: An Update transaction, which can give rise to both CASE 1 and CASE 2 - # described above. + # described above. # i.e for a single update transaction which changes the primary key of a row # and a non primary key of another row, 2 event triggers should be fired. def test_update_both_cases(self, hge_ctx, evts_webhook): @@ -1192,7 +1152,7 @@ class TestEventUpdateOnlyMSSQL: # INSERT operations will not fire event triggers with pytest.raises(queue.Empty): check_event(hge_ctx, evts_webhook, "t1_update", table, "INSERT", exp_insert_ev_data, get_timeout=0) - + # An UPDATE SQL which will create two events, one for each case # The following update transaction does the following changes # We have the following values in table [(1, 'hello'), (2, 'world')] @@ -1203,8 +1163,8 @@ class TestEventUpdateOnlyMSSQL: "source": "mssql", "sql":''' UPDATE hge_tests.test_t1 - SET c1 = (CASE WHEN c1 = 1 THEN 2 - WHEN c1 = 2 THEN 3 + SET c1 = (CASE WHEN c1 = 1 THEN 2 + WHEN c1 = 2 THEN 3 ELSE c1 END), c2 = (CASE WHEN c1 = 2 THEN N'clarke' ELSE c2 END) ''' diff --git a/server/tests-py/test_subscriptions.py b/server/tests-py/test_subscriptions.py index a0cb41be4a4..7a9699debca 100644 --- a/server/tests-py/test_subscriptions.py +++ b/server/tests-py/test_subscriptions.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +import datetime import time import pytest import json import queue from validate import check_query_f from collections import OrderedDict +from utils import insert_many from ruamel.yaml import YAML usefixtures = pytest.mark.usefixtures @@ -414,6 +416,115 @@ class TestSubscriptionLiveQueries: with pytest.raises(queue.Empty): ev = ws_client.get_ws_event(3) +@usefixtures('per_method_tests_db_state','ws_conn_init','streaming_subscriptions_fixtures') +class TestStreamingSubscription: + + @classmethod + def dir(cls): + return 'queries/subscriptions/streaming' + + def test_basic_streaming_subscription_existing_static_data(self, hge_ctx, ws_client): + ''' + Create connection using connection_init + ''' + ws_client.init_as_admin() + + query = """ + subscription ($batch_size: Int!) { + hge_tests_stream_query: hge_tests_articles_stream(cursor: {initial_value: {id: 0}}, batch_size: $batch_size) { + id + title + } + } + """ + + liveQs = [] + headers={} + articles_to_insert = [] + for i in range(10): + articles_to_insert.append({"id": i + 1, "title": "Article title {}".format(i + 1)}) + st_code, resp = insert_many(hge_ctx, {"schema": "hge_tests", "name": "articles"}, articles_to_insert) + assert st_code == 200, resp + if hge_ctx.hge_key is not None: + headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key + subscrPayload = { 'query': query, 'variables': { 'batch_size': 2 } } + respLive = ws_client.send_query(subscrPayload, query_id='stream_1', headers=headers, timeout=15) + liveQs.append(respLive) + for idx in range(5): + ev = next(respLive) + assert ev['type'] == 'data', ev + assert ev['id'] == 'stream_1', ev + # fetching two rows per batch + expected_payload = [ {"id": 2*idx+1, "title": "Article title {}".format(2*idx+1)}, {"id": 2*idx+2, "title": "Article title {}".format(2*idx+2)}] + assert ev['payload']['data'] == {'hge_tests_stream_query': expected_payload}, ev['payload']['data'] + + # stop the streaming subscription + frame = { + 'id': 'stream_1', + 'type': 'stop' + } + ws_client.send(frame) + + with pytest.raises(queue.Empty): + ev = ws_client.get_ws_event(3) + + def test_streaming_subscriptions_with_concurrent_data_inserts(self, hge_ctx, ws_client): + ''' + Create connection using connection_init + ''' + ws_client.init_as_admin() + headers={} + query = """ + subscription ($batch_size: Int!, $initial_created_at: timestamptz!) { + hge_tests_stream_query: hge_tests_test_t2_stream(cursor: [{initial_value: {created_at: $initial_created_at}, ordering: ASC}], batch_size: $batch_size) { + c1 + c2 + } + } + """ + + with open(self.dir() + "/steps.yaml") as c: + conf = yaml.load(c) + + subscrPayload = { 'query': query, 'variables': { 'batch_size': 2, 'initial_created_at': "2020-01-01" } } + respLive = ws_client.send_query(subscrPayload, query_id='stream_1', headers=headers, timeout=15) + + assert isinstance(conf, list) == True, 'Not an list' + for index, step in enumerate(conf): + mutationPayload = { 'query': step['query'] } + if 'variables' in step and step['variables']: + mutationPayload['variables'] = json.loads(step['variables']) + + expected_resp = json.loads(step['response']) + + mutResp = ws_client.send_query(mutationPayload,'mutation_'+str(index),timeout=15) + ev = next(mutResp) + assert ev['type'] == 'data' and ev['id'] == 'mutation_'+str(index), ev + assert ev['payload']['data'] == expected_resp, ev['payload']['data'] + + ev = next(mutResp) + assert ev['type'] == 'complete' and ev['id'] == 'mutation_'+str(index), ev + + ev = next(respLive) + assert ev['type'] == 'data', ev + assert ev['id'] == 'stream_1', ev + + expectedReturnedResponse = json.loads(step['stream_response']) + expectedLiveResponse = { 'hge_tests_stream_query' : expectedReturnedResponse } + + assert ev['payload']['data'] == expectedLiveResponse, ev['payload']['data'] + + # stop the streaming subscription + frame = { + 'id': 'stream_1', + 'type': 'stop' + } + ws_client.send(frame) + + with pytest.raises(queue.Empty): + ev = ws_client.get_ws_event(3) + + @usefixtures('per_method_tests_db_state','ws_conn_init_graphql_ws') class TestSubscriptionLiveQueriesForGraphQLWS: diff --git a/server/tests-py/utils.py b/server/tests-py/utils.py index ad8fc1e9b5e..c989b63a90e 100644 --- a/server/tests-py/utils.py +++ b/server/tests-py/utils.py @@ -17,3 +17,43 @@ def until_asserts_pass(tries, func): except AssertionError: time.sleep(0.3) pass + +def insert(hge_ctx, table, row, returning=[], headers = {}): + return insert_many(hge_ctx, table, [row], returning, headers) + +def insert_many(hge_ctx, table, rows, returning=[], headers = {}): + q = { + "type": "insert", + "args": { + "table": table, + "objects": rows, + "returning": returning + } + } + st_code, resp = hge_ctx.v1q(q, headers = headers) + return st_code, resp + + +def update(hge_ctx, table, where_exp, set_exp, headers = {}): + q = { + "type": "update", + "args": { + "table": table, + "where": where_exp, + "$set": set_exp + } + } + st_code, resp = hge_ctx.v1q(q, headers = headers) + return st_code, resp + + +def delete(hge_ctx, table, where_exp, headers = {}): + q = { + "type": "delete", + "args": { + "table": table, + "where": where_exp + } + } + st_code, resp = hge_ctx.v1q(q, headers = headers) + return st_code, resp