mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 20:41:49 +03:00
fd30fb343b
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3982 Co-authored-by: Brandon Simmons <210815+jberryman@users.noreply.github.com> GitOrigin-RevId: f90b2e8f394e7bd69780f003d2d980475f104f42
267 lines
10 KiB
Haskell
267 lines
10 KiB
Haskell
{-# LANGUAGE ApplicativeDo #-}
|
|
{-# LANGUAGE TemplateHaskellQuotes #-}
|
|
|
|
-- | Generate the GraphQL schema types related to streaming subscriptions.
|
|
module Hasura.GraphQL.Schema.SubscriptionStream
|
|
( selectStreamTable,
|
|
)
|
|
where
|
|
|
|
import Data.Has
|
|
import Data.List.NonEmpty qualified as NE
|
|
import Data.Text.Extended ((<>>))
|
|
import Hasura.Base.Error (QErr)
|
|
import Hasura.GraphQL.Parser
|
|
( InputFieldsParser,
|
|
Kind (..),
|
|
Parser,
|
|
)
|
|
import Hasura.GraphQL.Parser qualified as P
|
|
import Hasura.GraphQL.Parser.Class
|
|
import Hasura.GraphQL.Parser.Constants qualified as G
|
|
import Hasura.GraphQL.Schema.Backend
|
|
import Hasura.GraphQL.Schema.Common
|
|
import Hasura.GraphQL.Schema.Select (tablePermissionsInfo, tableSelectionList, tableWhereArg)
|
|
import Hasura.GraphQL.Schema.Table (getTableGQLName, tableSelectColumns, tableSelectPermissions)
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.IR.Select qualified as IR
|
|
import Hasura.RQL.Types.Column
|
|
import Hasura.RQL.Types.Common
|
|
import Hasura.RQL.Types.SchemaCache
|
|
import Hasura.RQL.Types.SourceCustomization (NamingCase, applyFieldNameCaseCust, applyTypeNameCaseCust)
|
|
import Hasura.RQL.Types.Subscription
|
|
import Hasura.RQL.Types.Table
|
|
import Language.GraphQL.Draft.Syntax qualified as G
|
|
|
|
-- | Argument to limit the maximum number of results returned in a single batch.
|
|
cursorBatchSizeArg ::
|
|
forall n.
|
|
MonadParse n =>
|
|
NamingCase ->
|
|
InputFieldsParser n Int
|
|
cursorBatchSizeArg tCase =
|
|
fromIntegral
|
|
<$> P.field batchSizeName batchSizeDesc P.nonNegativeInt
|
|
where
|
|
batchSizeName = applyFieldNameCaseCust tCase G._batch_size
|
|
batchSizeDesc = Just $ G.Description "maximum number of rows returned in a single batch"
|
|
|
|
-- | Cursor ordering enum fields
|
|
--
|
|
-- > enum cursor_ordering {
|
|
-- > ASC
|
|
-- > DESC
|
|
-- > }
|
|
cursorOrderingArgParser ::
|
|
forall n m r.
|
|
(MonadSchema n m, Has P.MkTypename r, Has NamingCase r, MonadReader r m) =>
|
|
m (Parser 'Both n CursorOrdering)
|
|
cursorOrderingArgParser = do
|
|
tCase <- asks getter
|
|
enumName <- P.mkTypename $ applyTypeNameCaseCust tCase G._cursor_ordering
|
|
let description =
|
|
Just $
|
|
G.Description $
|
|
"ordering argument of a cursor"
|
|
pure $
|
|
P.enum enumName description $
|
|
NE.fromList -- It's fine to use fromList here because we know the list is never empty.
|
|
[ ( define enumNameVal,
|
|
snd enumNameVal
|
|
)
|
|
| enumNameVal <- [(G._ASC, COAscending), (G._DESC, CODescending)]
|
|
]
|
|
where
|
|
define (name, val) =
|
|
let orderingTypeDesc = bool "descending" "ascending" $ val == COAscending
|
|
in P.Definition name (Just $ G.Description $ orderingTypeDesc <> " ordering of the cursor") P.EnumValueInfo
|
|
|
|
-- | Argument to specify the ordering of the cursor.
|
|
-- > ordering: cursor_ordering
|
|
cursorOrderingArg ::
|
|
forall n m r.
|
|
(MonadSchema n m, Has P.MkTypename r, Has NamingCase r, MonadReader r m) =>
|
|
m (InputFieldsParser n (Maybe CursorOrdering))
|
|
cursorOrderingArg = do
|
|
cursorOrderingParser' <- cursorOrderingArgParser
|
|
pure do
|
|
P.fieldOptional G._ordering (Just $ G.Description "cursor ordering") cursorOrderingParser'
|
|
|
|
-- | Input fields parser to parse the value of a table's column
|
|
-- > column_name: column_type
|
|
streamColumnParserArg ::
|
|
forall b n m r.
|
|
(BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m, Has NamingCase r) =>
|
|
ColumnInfo b ->
|
|
m (InputFieldsParser n (Maybe (ColumnInfo b, ColumnValue b)))
|
|
streamColumnParserArg colInfo = do
|
|
fieldParser <- typedParser colInfo
|
|
let fieldName = ciName colInfo
|
|
fieldDesc = ciDescription colInfo
|
|
pure do
|
|
P.fieldOptional fieldName fieldDesc fieldParser <&> fmap (colInfo,)
|
|
where
|
|
typedParser columnInfo = do
|
|
fmap P.openValueOrigin <$> columnParser (ciType columnInfo) (G.Nullability $ ciIsNullable columnInfo)
|
|
|
|
-- | Input object parser whose keys are the column names and the values are the
|
|
-- initial values of those columns from where the streaming should start.
|
|
-- > input table_stream_cursor_value_input {
|
|
-- > col1: col1_type
|
|
-- > col2: col2_type
|
|
-- ...
|
|
-- > }
|
|
streamColumnValueParser ::
|
|
forall b n m r.
|
|
(BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m, Has NamingCase r) =>
|
|
SourceName ->
|
|
G.Name ->
|
|
[ColumnInfo b] ->
|
|
m (Parser 'Input n [(ColumnInfo b, ColumnValue b)])
|
|
streamColumnValueParser sourceName tableGQLName colInfos =
|
|
memoizeOn 'streamColumnValueParser (sourceName, tableGQLName) $ do
|
|
tCase <- asks getter
|
|
columnVals <- sequenceA <$> traverse streamColumnParserArg colInfos
|
|
objName <- P.mkTypename $ tableGQLName <> applyTypeNameCaseCust tCase G.__stream_cursor_value_input
|
|
pure do
|
|
let description = G.Description $ "Initial value of the column from where the streaming should start"
|
|
P.object objName (Just description) columnVals <&> catMaybes
|
|
|
|
-- | Argument to accept the initial value from where the streaming should start.
|
|
-- > initial_value: table_stream_cursor_value_input!
|
|
streamColumnValueParserArg ::
|
|
forall b n m r.
|
|
( BackendSchema b,
|
|
MonadSchema n m,
|
|
Has P.MkTypename r,
|
|
MonadReader r m,
|
|
MonadError QErr m,
|
|
Has NamingCase r
|
|
) =>
|
|
SourceName ->
|
|
G.Name ->
|
|
[ColumnInfo b] ->
|
|
m (InputFieldsParser n [(ColumnInfo b, ColumnValue b)])
|
|
streamColumnValueParserArg sourceName tableGQLName colInfos = do
|
|
tCase <- asks getter
|
|
columnValueParser <- streamColumnValueParser sourceName tableGQLName colInfos
|
|
pure do
|
|
P.field (applyFieldNameCaseCust tCase G._initial_value) (Just $ G.Description "Stream column input with initial value") columnValueParser
|
|
|
|
-- | Argument to accept the cursor data. At the time of writing this, only a single
|
|
-- column cursor is supported and if multiple column cursors are provided,
|
|
-- then a parse error is thrown.
|
|
-- >
|
|
tableStreamColumnArg ::
|
|
forall n m r b.
|
|
(BackendSchema b, MonadSchema n m, Has P.MkTypename r, MonadReader r m, MonadError QErr m, Has NamingCase r) =>
|
|
SourceName ->
|
|
G.Name ->
|
|
[ColumnInfo b] ->
|
|
m (InputFieldsParser n [IR.StreamCursorItem b])
|
|
tableStreamColumnArg sourceName tableGQLName colInfos = do
|
|
cursorOrderingParser <- cursorOrderingArg
|
|
streamColumnParser <- streamColumnValueParserArg sourceName tableGQLName colInfos
|
|
pure $ do
|
|
orderingArg <- cursorOrderingParser
|
|
columnArg <- streamColumnParser
|
|
pure $ (uncurry (IR.StreamCursorItem (fromMaybe COAscending orderingArg))) <$> columnArg
|
|
|
|
-- | Input object that contains the initial value of a column
|
|
-- along with how it needs to be ordered.
|
|
-- > input table_stream_cursor_input {
|
|
-- > initial_value: table_stream_cursor_value_input!
|
|
-- > ordering: cursor_ordering
|
|
-- > }
|
|
tableStreamCursorExp ::
|
|
forall m n r b.
|
|
MonadBuildSchema b r m n =>
|
|
SourceName ->
|
|
TableInfo b ->
|
|
m (Parser 'Input n [(IR.StreamCursorItem b)])
|
|
tableStreamCursorExp sourceName tableInfo =
|
|
memoizeOn 'tableStreamCursorExp (sourceName, tableInfoName tableInfo) $ do
|
|
tCase <- asks getter
|
|
tableGQLName <- getTableGQLName tableInfo
|
|
columnInfos <- tableSelectColumns sourceName tableInfo
|
|
objName <- P.mkTypename $ tableGQLName <> applyTypeNameCaseCust tCase G.__stream_cursor_input
|
|
let description =
|
|
G.Description $ "Streaming cursor of the table " <>> tableGQLName
|
|
columnParsers <- tableStreamColumnArg sourceName tableGQLName columnInfos
|
|
pure $ P.object objName (Just description) columnParsers
|
|
|
|
-- | Argument to accept the cursor input object.
|
|
-- > cursor: [table_stream_cursor_input]!
|
|
tableStreamCursorArg ::
|
|
forall b r m n.
|
|
MonadBuildSchema b r m n =>
|
|
SourceName ->
|
|
TableInfo b ->
|
|
m (InputFieldsParser n [IR.StreamCursorItem b])
|
|
tableStreamCursorArg sourceName tableInfo = do
|
|
cursorParser <- tableStreamCursorExp sourceName tableInfo
|
|
pure $ do
|
|
cursorArgs <-
|
|
P.field cursorName cursorDesc $ P.list $ P.nullable cursorParser
|
|
pure $ concat $ catMaybes cursorArgs
|
|
where
|
|
cursorName = G._cursor
|
|
cursorDesc = Just $ G.Description "cursor to stream the results returned by the query"
|
|
|
|
-- | Arguments to the streaming subscription field.
|
|
-- > table_stream (cursor: [table_stream_cursor_input]!, batch_size: Int!, where: table_bool_exp)
|
|
tableStreamArguments ::
|
|
forall b r m n.
|
|
MonadBuildSchema b r m n =>
|
|
SourceName ->
|
|
TableInfo b ->
|
|
m (InputFieldsParser n (SelectStreamArgs b))
|
|
tableStreamArguments sourceName tableInfo = do
|
|
tCase <- asks getter
|
|
whereParser <- tableWhereArg sourceName tableInfo
|
|
cursorParser <- tableStreamCursorArg sourceName tableInfo
|
|
pure $ do
|
|
whereArg <- whereParser
|
|
cursorArg <-
|
|
cursorParser `P.bindFields` \case
|
|
[] -> parseError "one streaming column field is expected"
|
|
[c] -> pure c
|
|
_ -> parseError "multiple column cursors are not supported yet"
|
|
batchSizeArg <- cursorBatchSizeArg tCase
|
|
pure $
|
|
IR.SelectStreamArgsG whereArg batchSizeArg cursorArg
|
|
|
|
-- | Field parser for a streaming subscription for a table.
|
|
selectStreamTable ::
|
|
forall b r m n.
|
|
MonadBuildSchema b r m n =>
|
|
SourceName ->
|
|
-- | table info
|
|
TableInfo b ->
|
|
-- | field display name
|
|
G.Name ->
|
|
-- | field description, if any
|
|
Maybe G.Description ->
|
|
m (Maybe (P.FieldParser n (StreamSelectExp b)))
|
|
selectStreamTable sourceName tableInfo fieldName description = runMaybeT $ do
|
|
selectPermissions <- MaybeT $ tableSelectPermissions tableInfo
|
|
xStreamSubscription <- hoistMaybe $ streamSubscriptionExtension @b
|
|
stringifyNum <- asks $ qcStringifyNum . getter
|
|
tableStreamArgsParser <- lift $ tableStreamArguments sourceName tableInfo
|
|
selectionSetParser <- MaybeT $ tableSelectionList sourceName tableInfo
|
|
lift $
|
|
memoizeOn 'selectStreamTable (sourceName, tableName, fieldName) $ do
|
|
pure $
|
|
P.subselection fieldName description tableStreamArgsParser selectionSetParser
|
|
<&> \(args, fields) ->
|
|
IR.AnnSelectStreamG
|
|
{ IR._assnXStreamingSubscription = xStreamSubscription,
|
|
IR._assnFields = fields,
|
|
IR._assnFrom = IR.FromTable tableName,
|
|
IR._assnPerm = tablePermissionsInfo selectPermissions,
|
|
IR._assnArgs = args,
|
|
IR._assnStrfyNum = stringifyNum
|
|
}
|
|
where
|
|
tableName = tableInfoName tableInfo
|