mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-13 19:33:55 +03:00
* Add support for multiple top-level fields in a subscription to improve testability of subscriptions * Add an internal flag to enable multiple subscriptions * Add missing call to withConstructorFn in live queries (fix #3239) Co-authored-by: Alexis King <lexi.lambda@gmail.com>
This commit is contained in:
parent
9a16e25769
commit
4d10a610f4
@ -37,6 +37,7 @@ Read more about the session argument for computed fields in the [docs](https://h
|
||||
|
||||
(Add entries here in the order of: server, console, cli, docs, others)
|
||||
|
||||
- server: fix mishandling of GeoJSON inputs in subscriptions (fix #3239)
|
||||
- console: avoid count queries for large tables (#4692)
|
||||
- console: add read replica support section to pro popup (#4118)
|
||||
- console: allow modifying default value for PK (fix #4075) (#4679)
|
||||
|
@ -331,32 +331,6 @@ getMutOp ctx sqlGenCtx userInfo manager reqHeaders selSet =
|
||||
ordByCtx = _gOrdByCtx ctx
|
||||
insCtxMap = _gInsCtxMap ctx
|
||||
|
||||
getSubsOpM
|
||||
:: ( MonadError QErr m
|
||||
, MonadReader r m
|
||||
, Has QueryCtxMap r
|
||||
, Has FieldMap r
|
||||
, Has OrdByCtx r
|
||||
, Has SQLGenCtx r
|
||||
, Has UserInfo r
|
||||
, MonadIO m
|
||||
, HasVersion
|
||||
)
|
||||
=> PGExecCtx
|
||||
-> QueryReusability
|
||||
-> VQ.Field
|
||||
-> QueryActionExecuter
|
||||
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
|
||||
getSubsOpM pgExecCtx initialReusability fld actionExecuter =
|
||||
case VQ._fName fld of
|
||||
"__typename" ->
|
||||
throwVE "you cannot create a subscription on '__typename' field"
|
||||
_ -> do
|
||||
(astUnresolved, finalReusability) <- runReusabilityTWith initialReusability $
|
||||
GR.queryFldToPGAST fld actionExecuter
|
||||
let varTypes = finalReusability ^? _Reusable
|
||||
EL.buildLiveQueryPlan pgExecCtx (VQ._fAlias fld) astUnresolved varTypes
|
||||
|
||||
getSubsOp
|
||||
:: ( MonadError QErr m
|
||||
, MonadIO m
|
||||
@ -368,10 +342,11 @@ getSubsOp
|
||||
-> UserInfo
|
||||
-> QueryReusability
|
||||
-> QueryActionExecuter
|
||||
-> VQ.Field
|
||||
-> VQ.SelSet
|
||||
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
|
||||
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fld =
|
||||
runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter
|
||||
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fields =
|
||||
runE gCtx sqlGenCtx userInfo $ EL.buildLiveQueryPlan pgExecCtx queryReusability actionExecuter fields
|
||||
-- runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter
|
||||
|
||||
execRemoteGQ
|
||||
:: ( HasVersion
|
||||
|
@ -44,16 +44,21 @@ import Data.Has
|
||||
import Data.UUID (UUID)
|
||||
|
||||
import qualified Hasura.GraphQL.Resolve as GR
|
||||
import qualified Hasura.GraphQL.Resolve.Action as RA
|
||||
import qualified Hasura.GraphQL.Resolve.Types as GR
|
||||
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
|
||||
import qualified Hasura.GraphQL.Validate as GV
|
||||
import qualified Hasura.GraphQL.Validate.Types as GV
|
||||
import qualified Hasura.SQL.DML as S
|
||||
|
||||
import Hasura.Db
|
||||
import Hasura.EncJSON
|
||||
import Hasura.GraphQL.Utils
|
||||
import Hasura.RQL.Types
|
||||
import Hasura.SQL.Error
|
||||
import Hasura.SQL.Types
|
||||
import Hasura.SQL.Value
|
||||
import Hasura.Server.Version (HasVersion)
|
||||
|
||||
-- -------------------------------------------------------------------------------------------------
|
||||
-- Multiplexed queries
|
||||
@ -61,26 +66,39 @@ import Hasura.SQL.Value
|
||||
newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query }
|
||||
deriving (Show, Eq, Hashable, J.ToJSON)
|
||||
|
||||
mkMultiplexedQuery :: Q.Query -> MultiplexedQuery
|
||||
mkMultiplexedQuery baseQuery =
|
||||
MultiplexedQuery . Q.fromText $ foldMap Q.getQueryText [queryPrefix, baseQuery, querySuffix]
|
||||
mkMultiplexedQuery :: Map.HashMap G.Alias GR.QueryRootFldResolved -> MultiplexedQuery
|
||||
mkMultiplexedQuery rootFields = MultiplexedQuery . Q.fromBuilder . toSQL $ S.mkSelect
|
||||
{ S.selExtr =
|
||||
-- SELECT _subs.result_id, _fld_resp.root AS result
|
||||
[ S.Extractor (mkQualIden (Iden "_subs") (Iden "result_id")) Nothing
|
||||
, S.Extractor (mkQualIden (Iden "_fld_resp") (Iden "root")) (Just . S.Alias $ Iden "result") ]
|
||||
, S.selFrom = Just $ S.FromExp [S.FIJoin $
|
||||
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)]
|
||||
}
|
||||
where
|
||||
queryPrefix =
|
||||
[Q.sql|
|
||||
select
|
||||
_subs.result_id, _fld_resp.root as result
|
||||
from
|
||||
unnest(
|
||||
$1::uuid[], $2::json[]
|
||||
) _subs (result_id, result_vars)
|
||||
left outer join lateral
|
||||
(
|
||||
|]
|
||||
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
|
||||
subsInputFromItem = S.FIUnnest
|
||||
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
|
||||
(S.Alias $ Iden "_subs")
|
||||
[S.SEIden $ Iden "result_id", S.SEIden $ Iden "result_vars"]
|
||||
|
||||
querySuffix =
|
||||
[Q.sql|
|
||||
) _fld_resp ON ('true')
|
||||
|]
|
||||
-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
|
||||
responseLateralFromItem = S.mkLateralFromItem selectRootFields (S.Alias $ Iden "_fld_resp")
|
||||
selectRootFields = S.mkSelect
|
||||
{ S.selExtr = [S.Extractor rootFieldsJsonAggregate (Just . S.Alias $ Iden "root")]
|
||||
, S.selFrom = Just . S.FromExp $
|
||||
flip map (Map.toList rootFields) $ \(fieldAlias, resolvedAST) ->
|
||||
S.mkSelFromItem (GR.toSQLSelect resolvedAST) (S.Alias $ aliasToIden fieldAlias)
|
||||
}
|
||||
|
||||
-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
|
||||
rootFieldsJsonAggregate = S.SEFnApp "json_build_object" rootFieldsJsonPairs Nothing
|
||||
rootFieldsJsonPairs = flip concatMap (Map.keys rootFields) $ \fieldAlias ->
|
||||
[ S.SELit (G.unName $ G.unAlias fieldAlias)
|
||||
, mkQualIden (aliasToIden fieldAlias) (Iden "root") ]
|
||||
|
||||
mkQualIden prefix = S.SEQIden . S.QIden (S.QualIden prefix Nothing) -- TODO fix this Nothing of course
|
||||
aliasToIden = Iden . G.unName . G.unAlias
|
||||
|
||||
-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL expressions that refer to
|
||||
-- the @result_vars@ input object, collecting variable values along the way.
|
||||
@ -103,11 +121,13 @@ resolveMultiplexedValue = \case
|
||||
GR.UVSQL sqlExp -> pure sqlExp
|
||||
GR.UVSession -> pure $ fromResVars (PGTypeScalar PGJSON) ["session"]
|
||||
where
|
||||
fromResVars ty jPath =
|
||||
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
|
||||
fromResVars pgType jPath = addTypeAnnotation pgType $ S.SEOpApp (S.SQLOp "#>>")
|
||||
[ S.SEQIden $ S.QIden (S.QualIden (Iden "_subs") Nothing) (Iden "result_vars")
|
||||
, S.SEArray $ map S.SELit jPath
|
||||
]
|
||||
addTypeAnnotation pgType = flip S.SETyAnn (S.mkTypeAnn pgType) . case pgType of
|
||||
PGTypeScalar scalarType -> withConstructorFn scalarType
|
||||
PGTypeArray _ -> id
|
||||
|
||||
newtype CohortId = CohortId { unCohortId :: UUID }
|
||||
deriving (Show, Eq, Hashable, J.ToJSON, Q.FromCol)
|
||||
@ -230,7 +250,6 @@ data LiveQueryPlan
|
||||
data ParameterizedLiveQueryPlan
|
||||
= ParameterizedLiveQueryPlan
|
||||
{ _plqpRole :: !RoleName
|
||||
, _plqpAlias :: !G.Alias
|
||||
, _plqpQuery :: !MultiplexedQuery
|
||||
} deriving (Show)
|
||||
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan)
|
||||
@ -249,30 +268,41 @@ buildLiveQueryPlan
|
||||
:: ( MonadError QErr m
|
||||
, MonadReader r m
|
||||
, Has UserInfo r
|
||||
, Has GR.FieldMap r
|
||||
, Has GR.OrdByCtx r
|
||||
, Has GR.QueryCtxMap r
|
||||
, Has SQLGenCtx r
|
||||
, HasVersion
|
||||
, MonadIO m
|
||||
)
|
||||
=> PGExecCtx
|
||||
-> G.Alias
|
||||
-> GR.QueryRootFldUnresolved
|
||||
-> Maybe GV.ReusableVariableTypes
|
||||
-> GV.QueryReusability
|
||||
-> RA.QueryActionExecuter
|
||||
-> GV.SelSet
|
||||
-> m (LiveQueryPlan, Maybe ReusableLiveQueryPlan)
|
||||
buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do
|
||||
buildLiveQueryPlan pgExecCtx initialReusability actionExecutioner fields = do
|
||||
((resolvedASTs, (queryVariableValues, syntheticVariableValues)), finalReusability) <-
|
||||
GV.runReusabilityTWith initialReusability . flip runStateT mempty $
|
||||
fmap Map.fromList . for (toList fields) $ \field -> case GV._fName field of
|
||||
"__typename" -> throwVE "you cannot create a subscription on '__typename' field"
|
||||
_ -> do
|
||||
unresolvedAST <- GR.queryFldToPGAST field actionExecutioner
|
||||
resolvedAST <- GR.traverseQueryRootFldAST resolveMultiplexedValue unresolvedAST
|
||||
pure (GV._fAlias field, resolvedAST)
|
||||
|
||||
userInfo <- asks getter
|
||||
|
||||
(astResolved, (queryVariableValues, syntheticVariableValues)) <- flip runStateT mempty $
|
||||
GR.traverseQueryRootFldAST resolveMultiplexedValue astUnresolved
|
||||
let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved
|
||||
let multiplexedQuery = mkMultiplexedQuery resolvedASTs
|
||||
roleName = _uiRole userInfo
|
||||
parameterizedPlan = ParameterizedLiveQueryPlan roleName fieldAlias pgQuery
|
||||
parameterizedPlan = ParameterizedLiveQueryPlan roleName multiplexedQuery
|
||||
|
||||
-- We need to ensure that the values provided for variables
|
||||
-- are correct according to Postgres. Without this check
|
||||
-- an invalid value for a variable for one instance of the
|
||||
-- subscription will take down the entire multiplexed query
|
||||
-- We need to ensure that the values provided for variables are correct according to Postgres.
|
||||
-- Without this check an invalid value for a variable for one instance of the subscription will
|
||||
-- take down the entire multiplexed query.
|
||||
validatedQueryVars <- validateVariables pgExecCtx queryVariableValues
|
||||
validatedSyntheticVars <- validateVariables pgExecCtx (toList syntheticVariableValues)
|
||||
let cohortVariables = CohortVariables (_uiSession userInfo) validatedQueryVars validatedSyntheticVars
|
||||
plan = LiveQueryPlan parameterizedPlan cohortVariables
|
||||
varTypes = finalReusability ^? GV._Reusable
|
||||
reusablePlan = ReusableLiveQueryPlan parameterizedPlan validatedSyntheticVars <$> varTypes
|
||||
pure (plan, reusablePlan)
|
||||
|
||||
|
@ -44,7 +44,6 @@ import qualified Data.HashMap.Strict as Map
|
||||
import qualified Data.Time.Clock as Clock
|
||||
import qualified Data.UUID as UUID
|
||||
import qualified Data.UUID.V4 as UUID
|
||||
import qualified Language.GraphQL.Draft.Syntax as G
|
||||
import qualified ListT
|
||||
import qualified StmContainers.Map as STMMap
|
||||
import qualified System.Metrics.Distribution as Metrics
|
||||
@ -65,11 +64,7 @@ import Hasura.Session
|
||||
-- -------------------------------------------------------------------------------------------------
|
||||
-- Subscribers
|
||||
|
||||
data Subscriber
|
||||
= Subscriber
|
||||
{ _sRootAlias :: !G.Alias
|
||||
, _sOnChangeCallback :: !OnChange
|
||||
}
|
||||
newtype Subscriber = Subscriber { _sOnChangeCallback :: OnChange }
|
||||
|
||||
-- | live query onChange metadata, used for adding more extra analytics data
|
||||
data LiveQueryMetadata
|
||||
@ -85,7 +80,6 @@ data LiveQueryResponse
|
||||
}
|
||||
|
||||
type LGQResponse = GQResult LiveQueryResponse
|
||||
|
||||
type OnChange = LGQResponse -> IO ()
|
||||
|
||||
newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
|
||||
@ -183,7 +177,6 @@ data CohortSnapshot
|
||||
|
||||
pushResultToCohort
|
||||
:: GQResult EncJSON
|
||||
-- ^ a response that still needs to be wrapped with each 'Subscriber'’s root 'G.Alias'
|
||||
-> Maybe ResponseHash
|
||||
-> LiveQueryMetadata
|
||||
-> CohortSnapshot
|
||||
@ -202,13 +195,8 @@ pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot =
|
||||
pushResultToSubscribers sinks
|
||||
where
|
||||
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
|
||||
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
|
||||
let aliasText = G.unName $ G.unAlias alias
|
||||
wrapWithAlias response = LiveQueryResponse
|
||||
{ _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
|
||||
, _lqrExecutionTime = dTime
|
||||
}
|
||||
in action (wrapWithAlias <$> result)
|
||||
response = result <&> \payload -> LiveQueryResponse (encJToLBS payload) dTime
|
||||
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber action) -> action response
|
||||
|
||||
-- -------------------------------------------------------------------------------------------------
|
||||
-- Pollers
|
||||
|
@ -107,11 +107,11 @@ addLiveQuery logger lqState plan onResultAction = do
|
||||
where
|
||||
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
|
||||
LiveQueriesOptions batchSize refetchInterval = lqOpts
|
||||
LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) cohortKey = plan
|
||||
LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan
|
||||
|
||||
handlerId = PollerKey role query
|
||||
|
||||
!subscriber = Subscriber alias onResultAction
|
||||
!subscriber = Subscriber onResultAction
|
||||
addToCohort sinkId handlerC =
|
||||
TMap.insert subscriber sinkId $ _cNewSubscribers handlerC
|
||||
|
||||
|
@ -12,6 +12,7 @@ module Hasura.GraphQL.Resolve
|
||||
, QueryRootFldUnresolved
|
||||
, QueryRootFldResolved
|
||||
, toPGQuery
|
||||
, toSQLSelect
|
||||
|
||||
, RIntro.schemaR
|
||||
, RIntro.typeR
|
||||
@ -68,12 +69,12 @@ traverseQueryRootFldAST f = \case
|
||||
|
||||
toPGQuery :: QueryRootFldResolved -> Q.Query
|
||||
toPGQuery = \case
|
||||
QRFPk s -> DS.selectQuerySQL DS.JASSingleObject s
|
||||
QRFSimple s -> DS.selectQuerySQL DS.JASMultipleRows s
|
||||
QRFAgg s -> DS.selectAggQuerySQL s
|
||||
QRFActionSelect s -> DS.selectQuerySQL DS.JASSingleObject s
|
||||
QRFActionExecuteObject s -> DS.selectQuerySQL DS.JASSingleObject s
|
||||
QRFActionExecuteList s -> DS.selectQuerySQL DS.JASMultipleRows s
|
||||
QRFPk s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFSimple s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s
|
||||
QRFAgg s -> Q.fromBuilder $ toSQL $ DS.mkAggSelect s
|
||||
QRFActionSelect s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFActionExecuteObject s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFActionExecuteList s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s
|
||||
|
||||
validateHdrs
|
||||
:: (Foldable t, QErrM m) => UserInfo -> t Text -> m ()
|
||||
@ -189,3 +190,12 @@ getOpCtx f = do
|
||||
opCtxMap <- asks getter
|
||||
onNothing (Map.lookup f opCtxMap) $ throw500 $
|
||||
"lookup failed: opctx: " <> showName f
|
||||
|
||||
toSQLSelect :: QueryRootFldResolved -> S.Select
|
||||
toSQLSelect = \case
|
||||
QRFPk s -> DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFSimple s -> DS.mkSQLSelect DS.JASMultipleRows s
|
||||
QRFAgg s -> DS.mkAggSelect s
|
||||
QRFActionSelect s -> DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFActionExecuteObject s -> DS.mkSQLSelect DS.JASSingleObject s
|
||||
QRFActionExecuteList s -> DS.mkSQLSelect DS.JASSingleObject s
|
||||
|
@ -171,7 +171,7 @@ resolveActionMutationSync field executionContext sessionVariables = do
|
||||
(_fType field) $ _fSelSet field
|
||||
astResolved <- RS.traverseAnnSimpleSel resolveValTxt selectAstUnresolved
|
||||
let jsonAggType = mkJsonAggSelect outputType
|
||||
return $ (,respHeaders) $ asSingleRowJsonResp (RS.selectQuerySQL jsonAggType astResolved) []
|
||||
return $ (,respHeaders) $ asSingleRowJsonResp (Q.fromBuilder $ toSQL $ RS.mkSQLSelect jsonAggType astResolved) []
|
||||
where
|
||||
ActionExecutionContext actionName outputType outputFields definitionList resolvedWebhook confHeaders
|
||||
forwardClientHeaders = executionContext
|
||||
|
@ -21,8 +21,8 @@ import Data.Has
|
||||
|
||||
import qualified Data.HashMap.Strict as Map
|
||||
import qualified Data.HashSet as HS
|
||||
import qualified Data.Sequence as Seq
|
||||
import qualified Language.GraphQL.Draft.Syntax as G
|
||||
import qualified Data.Sequence as Seq
|
||||
|
||||
import Hasura.GraphQL.Schema
|
||||
import Hasura.GraphQL.Transport.HTTP.Protocol
|
||||
@ -58,8 +58,7 @@ getTypedOp opNameM selSets opDefs =
|
||||
throwVE $ "operationName cannot be used when " <>
|
||||
"an anonymous operation exists in the document"
|
||||
(Nothing, [selSet], []) ->
|
||||
return $ G.TypedOperationDefinition
|
||||
G.OperationTypeQuery Nothing [] [] selSet
|
||||
return $ G.TypedOperationDefinition G.OperationTypeQuery Nothing [] [] selSet
|
||||
(Nothing, [], [opDef]) ->
|
||||
return opDef
|
||||
(Nothing, _, _) ->
|
||||
@ -145,7 +144,7 @@ validateFrag (G.FragmentDefinition n onTy dirs selSet) = do
|
||||
data RootSelSet
|
||||
= RQuery !SelSet
|
||||
| RMutation !SelSet
|
||||
| RSubscription !Field
|
||||
| RSubscription !SelSet
|
||||
deriving (Show, Eq)
|
||||
|
||||
validateGQ
|
||||
@ -172,12 +171,15 @@ validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do
|
||||
G.OperationTypeQuery -> return $ RQuery selSet
|
||||
G.OperationTypeMutation -> return $ RMutation selSet
|
||||
G.OperationTypeSubscription ->
|
||||
case Seq.viewl selSet of
|
||||
Seq.EmptyL -> throw500 "empty selset for subscription"
|
||||
fld Seq.:< rst -> do
|
||||
unless (null rst) $
|
||||
throwVE "subscription must select only one top level field"
|
||||
return $ RSubscription fld
|
||||
case selSet of
|
||||
Seq.Empty -> throw500 "empty selset for subscription"
|
||||
(_ Seq.:<| rst) -> do
|
||||
-- As an internal testing feature, we support subscribing to multiple
|
||||
-- selection sets. First check if the corresponding directive is set.
|
||||
let multipleAllowed = elem (G.Directive "_multiple_top_level_fields" []) (G._todDirectives opDef)
|
||||
unless (multipleAllowed || null rst) $
|
||||
throwVE "subscriptions must select one top level field"
|
||||
return $ RSubscription selSet
|
||||
|
||||
isQueryInAllowlist :: GQLExecDoc -> HS.HashSet GQLQuery -> Bool
|
||||
isQueryInAllowlist q = HS.member gqlQuery
|
||||
|
@ -825,6 +825,9 @@ class (Monad m) => MonadReusability m where
|
||||
instance (MonadReusability m) => MonadReusability (ReaderT r m) where
|
||||
recordVariableUse a b = lift $ recordVariableUse a b
|
||||
markNotReusable = lift markNotReusable
|
||||
instance (MonadReusability m) => MonadReusability (StateT s m) where
|
||||
recordVariableUse a b = lift $ recordVariableUse a b
|
||||
markNotReusable = lift markNotReusable
|
||||
|
||||
newtype ReusabilityT m a = ReusabilityT { unReusabilityT :: StateT QueryReusability m a }
|
||||
deriving (Functor, Applicative, Monad, MonadError e, MonadReader r, MonadIO)
|
||||
|
@ -1,7 +1,5 @@
|
||||
module Hasura.RQL.DML.Select
|
||||
( selectP2
|
||||
, selectQuerySQL
|
||||
, selectAggQuerySQL
|
||||
, convSelectQuery
|
||||
, asSingleRowJsonResp
|
||||
, module Hasura.RQL.DML.Select.Internal
|
||||
@ -275,14 +273,6 @@ selectP2 jsonAggSelect (sel, p) =
|
||||
where
|
||||
selectSQL = toSQL $ mkSQLSelect jsonAggSelect sel
|
||||
|
||||
selectQuerySQL :: JsonAggSelect -> AnnSimpleSel -> Q.Query
|
||||
selectQuerySQL jsonAggSelect sel =
|
||||
Q.fromBuilder $ toSQL $ mkSQLSelect jsonAggSelect sel
|
||||
|
||||
selectAggQuerySQL :: AnnAggSel -> Q.Query
|
||||
selectAggQuerySQL =
|
||||
Q.fromBuilder . toSQL . mkAggSelect
|
||||
|
||||
asSingleRowJsonResp :: Q.Query -> [Q.PrepArg] -> Q.TxE QErr EncJSON
|
||||
asSingleRowJsonResp query args =
|
||||
encJFromBS . runIdentity . Q.getRow
|
||||
|
@ -716,7 +716,7 @@ baseNodeToSel joinCond baseNode =
|
||||
, S.selWhere = Just $ injectJoinCond joinCond whr
|
||||
}
|
||||
baseSelAls = S.Alias $ mkBaseTableAls pfx
|
||||
baseFromItem = S.FISelect (S.Lateral False) baseSel baseSelAls
|
||||
baseFromItem = S.mkSelFromItem baseSel baseSelAls
|
||||
|
||||
-- function to create a joined from item from two from items
|
||||
leftOuterJoin current new =
|
||||
|
@ -23,7 +23,10 @@ paren t = TB.char '(' <> t <> TB.char ')'
|
||||
|
||||
data Select
|
||||
= Select
|
||||
{ selDistinct :: !(Maybe DistinctExpr)
|
||||
{ selCTEs :: ![(Alias, Select)]
|
||||
-- ^ Unlike 'SelectWith', does not allow data-modifying statements (as those are only allowed at
|
||||
-- the top level of a query).
|
||||
, selDistinct :: !(Maybe DistinctExpr)
|
||||
, selExtr :: ![Extractor]
|
||||
, selFrom :: !(Maybe FromExp)
|
||||
, selWhere :: !(Maybe WhereFrag)
|
||||
@ -37,7 +40,7 @@ instance NFData Select
|
||||
instance Cacheable Select
|
||||
|
||||
mkSelect :: Select
|
||||
mkSelect = Select Nothing [] Nothing
|
||||
mkSelect = Select [] Nothing [] Nothing
|
||||
Nothing Nothing Nothing
|
||||
Nothing Nothing Nothing
|
||||
|
||||
@ -163,17 +166,20 @@ instance ToSQL WhereFrag where
|
||||
"WHERE" <-> paren (toSQL be)
|
||||
|
||||
instance ToSQL Select where
|
||||
toSQL sel =
|
||||
"SELECT"
|
||||
<-> toSQL (selDistinct sel)
|
||||
<-> (", " <+> selExtr sel)
|
||||
<-> toSQL (selFrom sel)
|
||||
<-> toSQL (selWhere sel)
|
||||
<-> toSQL (selGroupBy sel)
|
||||
<-> toSQL (selHaving sel)
|
||||
<-> toSQL (selOrderBy sel)
|
||||
<-> toSQL (selLimit sel)
|
||||
<-> toSQL (selOffset sel)
|
||||
toSQL sel = case selCTEs sel of
|
||||
[] -> "SELECT"
|
||||
<-> toSQL (selDistinct sel)
|
||||
<-> (", " <+> selExtr sel)
|
||||
<-> toSQL (selFrom sel)
|
||||
<-> toSQL (selWhere sel)
|
||||
<-> toSQL (selGroupBy sel)
|
||||
<-> toSQL (selHaving sel)
|
||||
<-> toSQL (selOrderBy sel)
|
||||
<-> toSQL (selLimit sel)
|
||||
<-> toSQL (selOffset sel)
|
||||
-- reuse SelectWith if there are any CTEs, since the generated SQL is the same
|
||||
ctes -> toSQL $ SelectWith (map (CTESelect <$>) ctes) sel { selCTEs = [] }
|
||||
|
||||
|
||||
mkSIdenExp :: (IsIden a) => a -> SQLExp
|
||||
mkSIdenExp = SEIden . toIden
|
||||
|
@ -60,6 +60,7 @@ uSelect :: S.Select -> Uniq S.Select
|
||||
uSelect sel = do
|
||||
-- this has to be the first thing to process
|
||||
newFromM <- mapM uFromExp fromM
|
||||
newCTEs <- for ctes $ \(alias, cte) -> (,) <$> addAlias alias <*> uSelect cte
|
||||
|
||||
newWhereM <- forM whereM $
|
||||
\(S.WhereFrag be) -> S.WhereFrag <$> uBoolExp be
|
||||
@ -70,10 +71,10 @@ uSelect sel = do
|
||||
newOrdM <- mapM uOrderBy ordByM
|
||||
newDistM <- mapM uDistinct distM
|
||||
newExtrs <- mapM uExtractor extrs
|
||||
return $ S.Select newDistM newExtrs newFromM newWhereM newGrpM
|
||||
return $ S.Select newCTEs newDistM newExtrs newFromM newWhereM newGrpM
|
||||
newHavnM newOrdM limitM offM
|
||||
where
|
||||
S.Select distM extrs fromM whereM grpM havnM ordByM limitM offM = sel
|
||||
S.Select ctes distM extrs fromM whereM grpM havnM ordByM limitM offM = sel
|
||||
uDistinct = \case
|
||||
S.DistinctSimple -> return S.DistinctSimple
|
||||
S.DistinctOn l -> S.DistinctOn <$> mapM uSqlExp l
|
||||
|
@ -7,7 +7,7 @@ response:
|
||||
- name: New York
|
||||
query:
|
||||
query: |
|
||||
{
|
||||
query {
|
||||
geog_table(where: {
|
||||
geog_col: {
|
||||
_cast: {
|
||||
|
@ -8,7 +8,7 @@ response:
|
||||
- name: Paris
|
||||
query:
|
||||
query: |
|
||||
{
|
||||
query {
|
||||
geog_as_geom_table(where: {
|
||||
geom_col: {
|
||||
_cast: {
|
||||
|
@ -9,7 +9,7 @@ response:
|
||||
message: 'field "integer" not found in type: ''geography_cast_exp'''
|
||||
query:
|
||||
query: |
|
||||
{
|
||||
query {
|
||||
geog_table(where: {geog_col: {_cast: {integer: {_eq: 0}}}}) {
|
||||
name
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ class TestGraphQLQueryBasic:
|
||||
|
||||
def test_select_query_author_with_include_directive(self, hge_ctx, transport):
|
||||
check_query_f(hge_ctx, self.dir() + '/select_query_author_include_directive.yaml', transport)
|
||||
|
||||
|
||||
# Can't run server upgrade tests, as this test has a schema change
|
||||
@pytest.mark.skip_server_upgrade_test
|
||||
def test_select_various_postgres_types(self, hge_ctx, transport):
|
||||
@ -388,7 +388,7 @@ class TestGraphQLQueryBoolExpJsonB:
|
||||
def dir(cls):
|
||||
return 'queries/graphql_query/boolexp/jsonb'
|
||||
|
||||
@pytest.mark.parametrize("transport", ['http', 'websocket'])
|
||||
@pytest.mark.parametrize("transport", ['http', 'websocket', 'subscription'])
|
||||
@usefixtures('per_class_tests_db_state')
|
||||
class TestGraphQLQueryBoolExpPostGIS:
|
||||
|
||||
|
@ -10,8 +10,10 @@ import os
|
||||
import base64
|
||||
import jsondiff
|
||||
import jwt
|
||||
import queue
|
||||
import random
|
||||
import warnings
|
||||
import pytest
|
||||
|
||||
from context import GQLWsClient, PytestConf
|
||||
|
||||
@ -194,27 +196,34 @@ def check_query(hge_ctx, conf, transport='http', add_auth=True, claims_namespace
|
||||
test_forbidden_when_admin_secret_reqd(hge_ctx, conf)
|
||||
headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key
|
||||
|
||||
assert transport in ['websocket', 'http'], "Unknown transport type " + transport
|
||||
if transport == 'websocket':
|
||||
assert 'response' in conf
|
||||
assert conf['url'].endswith('/graphql')
|
||||
print('running on websocket')
|
||||
return validate_gql_ws_q(
|
||||
hge_ctx,
|
||||
conf['url'],
|
||||
conf['query'],
|
||||
headers,
|
||||
conf['response'],
|
||||
True
|
||||
)
|
||||
elif transport == 'http':
|
||||
assert transport in ['http', 'websocket', 'subscription'], "Unknown transport type " + transport
|
||||
if transport == 'http':
|
||||
print('running on http')
|
||||
return validate_http_anyq(hge_ctx, conf['url'], conf['query'], headers,
|
||||
conf['status'], conf.get('response'))
|
||||
elif transport == 'websocket':
|
||||
print('running on websocket')
|
||||
return validate_gql_ws_q(hge_ctx, conf, headers, retry=True)
|
||||
elif transport == 'subscription':
|
||||
print('running via subscription')
|
||||
return validate_gql_ws_q(hge_ctx, conf, headers, retry=True, via_subscription=True)
|
||||
|
||||
|
||||
def validate_gql_ws_q(hge_ctx, conf, headers, retry=False, via_subscription=False):
|
||||
assert 'response' in conf
|
||||
assert conf['url'].endswith('/graphql')
|
||||
endpoint = conf['url']
|
||||
query = conf['query']
|
||||
exp_http_response = conf['response']
|
||||
|
||||
if via_subscription:
|
||||
query_text = query['query']
|
||||
assert query_text.startswith('query '), query_text
|
||||
# make the query into a subscription and add the
|
||||
# _multiple_subscriptions directive that enables having more
|
||||
# than 1 root field in a subscription
|
||||
query['query'] = 'subscription' + query_text[len('query'):].replace("{"," @_multiple_top_level_fields {",1)
|
||||
|
||||
def validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, retry=False):
|
||||
if endpoint == '/v1alpha1/graphql':
|
||||
ws_client = GQLWsClient(hge_ctx, '/v1alpha1/graphql')
|
||||
else:
|
||||
@ -223,7 +232,7 @@ def validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, retr
|
||||
if not headers or len(headers) == 0:
|
||||
ws_client.init({})
|
||||
|
||||
query_resp = ws_client.send_query(query, headers=headers, timeout=15)
|
||||
query_resp = ws_client.send_query(query, query_id='hge_test', headers=headers, timeout=15)
|
||||
resp = next(query_resp)
|
||||
print('websocket resp: ', resp)
|
||||
|
||||
@ -240,10 +249,15 @@ def validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, retr
|
||||
assert resp['type'] in ['data', 'error'], resp
|
||||
else:
|
||||
assert resp['type'] == 'data', resp
|
||||
|
||||
assert 'payload' in resp, resp
|
||||
resp_done = next(query_resp)
|
||||
assert resp_done['type'] == 'complete'
|
||||
|
||||
if via_subscription:
|
||||
ws_client.send({ 'id': 'hge_test', 'type': 'stop' })
|
||||
with pytest.raises(queue.Empty):
|
||||
ws_client.get_ws_event(0)
|
||||
else:
|
||||
resp_done = next(query_resp)
|
||||
assert resp_done['type'] == 'complete'
|
||||
|
||||
return assert_graphql_resp_expected(resp['payload'], exp_http_response, query, skip_if_err_msg=hge_ctx.avoid_err_msg_checks)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user