Merge pull request #2856 from lexi-lambda/more-subscription-multiplexing

Refactor query plan caching and improve subscription multiplexing
Vamshi Surabhi 2019-09-17 09:13:29 +05:30 committed by GitHub
commit 41551f19bd
36 changed files with 1482 additions and 1049 deletions

View File

@@ -244,9 +244,11 @@ library
, Hasura.GraphQL.Execute.Plan
, Hasura.GraphQL.Execute.Query
, Hasura.GraphQL.Execute.LiveQuery
, Hasura.GraphQL.Execute.LiveQuery.Types
, Hasura.GraphQL.Execute.LiveQuery.Multiplexed
, Hasura.GraphQL.Execute.LiveQuery.Fallback
, Hasura.GraphQL.Execute.LiveQuery.Options
, Hasura.GraphQL.Execute.LiveQuery.Plan
, Hasura.GraphQL.Execute.LiveQuery.Poll
, Hasura.GraphQL.Execute.LiveQuery.State
, Hasura.GraphQL.Execute.LiveQuery.TMap
, Hasura.GraphQL.Resolve
, Hasura.GraphQL.Resolve.Types
, Hasura.GraphQL.Resolve.Context
@@ -265,13 +267,15 @@ library
, Hasura.HTTP
, Control.Concurrent.Extended
, Control.Lens.Extended
, Data.Text.Extended
, Data.Aeson.Extended
, Data.Sequence.NonEmpty
, Data.TByteString
, Data.HashMap.Strict.InsOrd.Extended
, Data.Parser.JSONPath
, Data.Sequence.NonEmpty
, Data.TByteString
, Data.Text.Extended
, Data.Time.Clock.Units
, Hasura.SQL.DML
, Hasura.SQL.Error
@@ -299,6 +303,7 @@ library
EmptyCase
FlexibleContexts
FlexibleInstances
FunctionalDependencies
GeneralizedNewtypeDeriving
InstanceSigs
LambdaCase
@@ -307,6 +312,7 @@ library
NoImplicitPrelude
OverloadedStrings
QuasiQuotes
RankNTypes
ScopedTypeVariables
TemplateHaskell
TupleSections
@@ -341,6 +347,7 @@ executable graphql-engine
EmptyCase
FlexibleContexts
FlexibleInstances
FunctionalDependencies
GeneralizedNewtypeDeriving
InstanceSigs
LambdaCase
@@ -349,6 +356,7 @@ executable graphql-engine
NoImplicitPrelude
OverloadedStrings
QuasiQuotes
RankNTypes
ScopedTypeVariables
TemplateHaskell
TupleSections

View File

@@ -0,0 +1,16 @@
module Control.Concurrent.Extended
( module Control.Concurrent
, threadDelay
) where
import Prelude
import qualified Control.Concurrent as Base
import Control.Concurrent hiding (threadDelay)
import Data.Time.Clock (DiffTime)
import Data.Time.Clock.Units (Microseconds (..))
-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int'.
threadDelay :: DiffTime -> IO ()
threadDelay = Base.threadDelay . round . Microseconds
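A quick usage sketch, assuming both of the new modules from this PR are in scope; the sum is ordinary 'DiffTime' arithmetic:
import Control.Concurrent.Extended (threadDelay)
import Data.Time.Clock.Units (milliseconds, seconds)
-- delays for 1.5 seconds without any manual microsecond arithmetic
example :: IO ()
example = threadDelay (seconds 1 + milliseconds 500)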

View File

@@ -0,0 +1,109 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE TypeOperators #-}
{-| Types for time intervals of various units. Each newtype wraps 'DiffTime', but they have
different 'Num' instances. The intent is to use the record selectors to write literals with
particular units, like this:
@
>>> 'milliseconds' 500
0.5s
>>> 'hours' 3
10800s
>>> 'minutes' 1.5 + 'seconds' 30
120s
@
You can also go the other way using the constructors rather than the selectors:
@
>>> 'toRational' '$' 'Minutes' ('seconds' 17)
17 % 60
>>> 'realToFrac' ('Days' ('hours' 12)) :: 'Double'
0.5
@
Generally, it doesn't make sense to pass these wrappers around or put them inside data structures,
since any function that needs a duration should just accept a 'DiffTime', but they're useful for
literals and conversions to/from other types. -}
module Data.Time.Clock.Units
( Days(..)
, Hours(..)
, Minutes(..)
, Seconds
, seconds
, Milliseconds(..)
, Microseconds(..)
, Nanoseconds(..)
) where
import Prelude
import Data.Proxy
import Data.Time.Clock
import GHC.TypeLits
type Seconds = DiffTime
seconds :: DiffTime -> DiffTime
seconds = id
newtype Days = Days { days :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 86400))
newtype Hours = Hours { hours :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 3600))
newtype Minutes = Minutes { minutes :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit (SecondsP 60))
newtype Milliseconds = Milliseconds { milliseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000000000)
newtype Microseconds = Microseconds { microseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000000)
newtype Nanoseconds = Nanoseconds { nanoseconds :: DiffTime }
deriving (Show, Eq, Ord)
deriving (Num, Fractional, Real, RealFrac) via (TimeUnit 1000)
newtype TimeUnit (picosPerUnit :: Nat) = TimeUnit DiffTime
deriving (Show, Eq, Ord)
type SecondsP n = n GHC.TypeLits.* 1000000000000
natNum :: forall n a. (KnownNat n, Num a) => a
natNum = fromInteger $ natVal (Proxy @n)
instance (KnownNat picosPerUnit) => Num (TimeUnit picosPerUnit) where
TimeUnit a + TimeUnit b = TimeUnit $ a + b
TimeUnit a - TimeUnit b = TimeUnit $ a - b
TimeUnit a * TimeUnit b = TimeUnit . picosecondsToDiffTime $
diffTimeToPicoseconds a * diffTimeToPicoseconds b `div` natNum @picosPerUnit
negate (TimeUnit a) = TimeUnit $ negate a
abs (TimeUnit a) = TimeUnit $ abs a
signum (TimeUnit a) = TimeUnit $ signum a
fromInteger a = TimeUnit . picosecondsToDiffTime $ a * natNum @picosPerUnit
instance (KnownNat picosPerUnit) => Fractional (TimeUnit picosPerUnit) where
TimeUnit a / TimeUnit b = TimeUnit . picosecondsToDiffTime $
diffTimeToPicoseconds a * natNum @picosPerUnit `div` diffTimeToPicoseconds b
fromRational a = TimeUnit . picosecondsToDiffTime $ round (a * natNum @picosPerUnit)
instance (KnownNat picosPerUnit) => Real (TimeUnit picosPerUnit) where
toRational (TimeUnit a) = toRational (diffTimeToPicoseconds a) / natNum @picosPerUnit
instance (KnownNat picosPerUnit) => RealFrac (TimeUnit picosPerUnit) where
properFraction a = (i, a - fromIntegral i)
where i = truncate a
truncate = truncate . toRational
round = round . toRational
ceiling = ceiling . toRational
floor = floor . toRational
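A small sketch exercising the conversions above; the expected values follow from the 'TimeUnit' instances, since 'round' and 'realToFrac' both go through 'toRational':
import Data.Time.Clock.Units
demoConversions :: (Integer, Double)
demoConversions =
  ( round (Milliseconds (seconds 2))  -- 2000: two seconds, measured in milliseconds
  , realToFrac (Hours (minutes 90))   -- 1.5: ninety minutes, measured in hours
  )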

View File

@@ -1,3 +1,6 @@
{-| An in-memory, unbounded, capability-local cache implementation. By making the cache
capability-local, data may be recomputed up to once per capability (which usually means up to once
per OS thread), but write contention from multiple threads is unlikely. -}
module Hasura.Cache
( UnboundedCache
, initCache

View File

@@ -45,6 +45,14 @@ instance (MonadTx m) => MonadTx (ReaderT s m) where
instance (MonadTx m) => MonadTx (ValidateT e m) where
liftTx = lift . liftTx
-- | Like 'Q.TxE', but defers acquiring a Postgres connection until the first execution of 'liftTx'.
-- If no call to 'liftTx' is ever reached (i.e. a successful result is returned or an error is
-- raised before ever executing a query), no connection is ever acquired.
--
-- This is useful for certain code paths that only conditionally need database access. For example,
-- although most queries will eventually hit Postgres, introspection queries or queries that
-- exclusively use remote schemas never will; using 'LazyTx' keeps those branches from unnecessarily
-- allocating a connection.
data LazyTx e a
= LTErr !e
| LTNoTx !a
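-- A sketch of why this stays connection-free until a query actually runs; it assumes the elided
-- third constructor wraps a real transaction (hypothetically @LTTx !(Q.TxE e a)@):
--
-- > LTNoTx a >>= f = f a      -- pure result: no connection ever acquired
-- > LTErr  e >>= _ = LTErr e  -- early error: short-circuits without a connection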

View File

@@ -112,8 +112,7 @@ gatherTypeLocs gCtx nodes =
in maybe qr (Map.union qr) mr
-- This is for when the graphql query is validated
type ExecPlanPartial
= GQExecPlan (GCtx, VQ.RootSelSet, [G.VariableDefinition])
type ExecPlanPartial = GQExecPlan (GCtx, VQ.RootSelSet)
getExecPlanPartial
:: (MonadError QErr m)
@@ -141,8 +140,7 @@ getExecPlanPartial userInfo sc enableAL req = do
case typeLoc of
VT.TLHasuraType -> do
rootSelSet <- runReaderT (VQ.validateGQ queryParts) gCtx
let varDefs = G._todVariableDefinitions $ VQ.qpOpDef queryParts
return $ GExPHasura (gCtx, rootSelSet, varDefs)
return $ GExPHasura (gCtx, rootSelSet)
VT.TLRemoteType _ rsi ->
return $ GExPRemote rsi opDef
where
@@ -167,7 +165,7 @@ getExecPlanPartial userInfo sc enableAL req = do
data ExecOp
= ExOpQuery !LazyRespTx !(Maybe EQ.GeneratedSqlMap)
| ExOpMutation !LazyRespTx
| ExOpSubs !EL.LiveQueryOp
| ExOpSubs !EL.LiveQueryPlan
-- The graphql query is resolved into an execution operation
type ExecPlanResolved
@@ -196,7 +194,7 @@ getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx
(tx, genSql) <- EQ.queryOpFromPlan usrVars queryVars queryPlan
return $ ExOpQuery tx (Just genSql)
EP.RPSubs subsPlan ->
ExOpSubs <$> EL.subsOpFromPlan pgExecCtx usrVars queryVars subsPlan
ExOpSubs <$> EL.reuseLiveQueryPlan pgExecCtx usrVars queryVars subsPlan
Nothing -> noExistingPlan
where
GQLReq opNameM queryStr queryVars = reqUnparsed
@@ -204,21 +202,19 @@ getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx
liftIO $ EP.addPlan scVer (userRole userInfo)
opNameM queryStr plan planCache
noExistingPlan = do
req <- toParsed reqUnparsed
req <- toParsed reqUnparsed
partialExecPlan <- getExecPlanPartial userInfo sc enableAL req
forM partialExecPlan $ \(gCtx, rootSelSet, varDefs) ->
forM partialExecPlan $ \(gCtx, rootSelSet) ->
case rootSelSet of
VQ.RMutation selSet ->
ExOpMutation <$> getMutOp gCtx sqlGenCtx userInfo selSet
VQ.RQuery selSet -> do
(queryTx, planM, genSql) <- getQueryOp gCtx sqlGenCtx
userInfo selSet varDefs
mapM_ (addPlanToCache . EP.RPQuery) planM
(queryTx, plan, genSql) <- getQueryOp gCtx sqlGenCtx userInfo selSet
traverse_ (addPlanToCache . EP.RPQuery) plan
return $ ExOpQuery queryTx (Just genSql)
VQ.RSubscription fld -> do
(lqOp, planM) <- getSubsOp pgExecCtx gCtx sqlGenCtx
userInfo reqUnparsed varDefs fld
mapM_ (addPlanToCache . EP.RPSubs) planM
(lqOp, plan) <- getSubsOp pgExecCtx gCtx sqlGenCtx userInfo fld
traverse_ (addPlanToCache . EP.RPSubs) plan
return $ ExOpSubs lqOp
-- Monad for resolving a hasura query/mutation
@@ -258,10 +254,9 @@ getQueryOp
-> SQLGenCtx
-> UserInfo
-> VQ.SelSet
-> [G.VariableDefinition]
-> m (LazyRespTx, Maybe EQ.ReusableQueryPlan, EQ.GeneratedSqlMap)
getQueryOp gCtx sqlGenCtx userInfo fields varDefs =
runE gCtx sqlGenCtx userInfo $ EQ.convertQuerySelSet varDefs fields
getQueryOp gCtx sqlGenCtx userInfo fields =
runE gCtx sqlGenCtx userInfo $ EQ.convertQuerySelSet fields
mutationRootName :: Text
mutationRootName = "mutation_root"
@@ -282,7 +277,7 @@ resolveMutSelSet fields = do
aliasedTxs <- forM (toList fields) $ \fld -> do
fldRespTx <- case VQ._fName fld of
"__typename" -> return $ return $ encJFromJValue mutationRootName
_ -> liftTx <$> GR.mutFldToTx fld
_ -> fmap liftTx . evalResolveT $ GR.mutFldToTx fld
return (G.unName $ G.unAlias $ VQ._fAlias fld, fldRespTx)
-- combines all transactions into a single transaction
@@ -318,17 +313,15 @@ getSubsOpM
, MonadIO m
)
=> PGExecCtx
-> GQLReqUnparsed
-> [G.VariableDefinition]
-> VQ.Field
-> m (EL.LiveQueryOp, Maybe EL.SubsPlan)
getSubsOpM pgExecCtx req varDefs fld =
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOpM pgExecCtx fld =
case VQ._fName fld of
"__typename" ->
throwVE "you cannot create a subscription on '__typename' field"
_ -> do
astUnresolved <- GR.queryFldToPGAST fld
EL.subsOpFromPGAST pgExecCtx req varDefs (VQ._fAlias fld, astUnresolved)
(astUnresolved, varTypes) <- runResolveT $ GR.queryFldToPGAST fld
EL.buildLiveQueryPlan pgExecCtx (VQ._fAlias fld) astUnresolved varTypes
getSubsOp
:: ( MonadError QErr m
@@ -338,12 +331,10 @@ getSubsOp
-> GCtx
-> SQLGenCtx
-> UserInfo
-> GQLReqUnparsed
-> [G.VariableDefinition]
-> VQ.Field
-> m (EL.LiveQueryOp, Maybe EL.SubsPlan)
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo req varDefs fld =
runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx req varDefs fld
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo fld =
runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx fld
execRemoteGQ
:: ( MonadIO m

View File

@@ -1,305 +1,118 @@
{-# LANGUAGE CPP #-}
{-|
= Reasonably efficient PostgreSQL live queries
The module implements /query multiplexing/, which is our implementation strategy for live queries
(i.e. GraphQL subscriptions) made against Postgres. Fundamentally, our implementation is built
around polling, which is never ideal, but it's a lot easier to implement than trying to do something
event-based. To minimize the resource cost of polling, we use /multiplexing/, which is essentially
a two-tier batching strategy.
== The high-level idea
The objective is to minimize the number of concurrent polling workers to reduce database load as
much as possible. A very naïve strategy would be to group identical queries together so we only have
one poller per /unique/ active subscription. That's a good start, but of course, in practice, most
queries differ slightly. However, it happens that they very frequently /only differ in their
variables/ (that is, GraphQL query variables and session variables), and in those cases, we try to
generate parameterized SQL. This means that the same prepared SQL query can be reused, just with a
different set of variables.
To give a concrete example, consider the following query:
> subscription vote_count($post_id: Int!) {
> vote_count(where: {post_id: {_eq: $post_id}}) {
> votes
> }
> }
No matter what the client provides for @$post_id@, we will always generate the same SQL:
> SELECT votes FROM vote_count WHERE post_id = $1
If multiple clients subscribe to @vote_count@, we can certainly reuse the same prepared query. For
example, imagine we had 10 concurrent subscribers, each listening on a distinct @$post_id@:
> let postIds = [3, 11, 32, 56, 13, 97, 24, 43, 109, 48]
We could iterate over @postIds@ in Haskell, executing the same prepared query 10 times:
> for postIds $ \postId ->
> Q.listQE defaultTxErrorHandler preparedQuery (Identity postId) True
Sadly, that on its own isn't good enough. The overhead of running each query is large enough that
Postgres becomes overwhelmed if we have to serve lots of concurrent subscribers. Therefore, what we
want to be able to do is somehow make one query instead of ten.
=== Multiplexing
This is where multiplexing comes in. By taking advantage of Postgres
<https://www.postgresql.org/docs/11/queries-table-expressions.html#QUERIES-LATERAL lateral joins>,
we can do the iteration in Postgres rather than in Haskell, allowing us to pay the query overhead
just once for all ten subscribers. Essentially, lateral joins add 'map'-like functionality to SQL,
so we can run our query once per @$post_id@:
> SELECT results.votes
> FROM unnest($1::integer[]) query_variables (post_id)
> LEFT JOIN LATERAL (
> SELECT coalesce(json_agg(votes), '[]')
> FROM vote_count WHERE vote_count.post_id = query_variables.post_id
> ) results ON true
If we generalize this approach just a little bit more, we can apply this transformation to arbitrary
queries parameterized over arbitrary session and query variables!
== Implementation overview
To support query multiplexing, we maintain a tree of the following types, where @>@ should be read
as contains:
@
'LiveQueriesState' > 'Poller' > 'Cohort' > 'Subscriber'
@
Here's a brief summary of each type's role:
* A 'Subscriber' is an actual client with an open websocket connection.
* A 'Cohort' is a set of 'Subscriber's that are all subscribed to the same query /with the exact
same variables/. (By batching these together, we can do better than multiplexing, since we can
just query the data once.)
* A 'Poller' is a worker thread for a single, multiplexed query. It fetches data for a set of
'Cohort's that all use the same parameterized query, but have different sets of variables.
* Finally, the 'LiveQueriesState' is the top-level container that holds all the active 'Poller's.
Additional details are provided by the documentation for individual bindings.
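As a concrete sketch of the final query shape (the skeleton matches 'mkMultiplexedQuery' in
"Hasura.GraphQL.Execute.LiveQuery.Plan"; the argument values are illustrative), two cohorts of the
@vote_count@ example are multiplexed by passing one array element per cohort:
> -- $1 :: uuid[]: one synthetic cohort id per cohort
> -- $2 :: json[]: each cohort's query and session variables
> SELECT _subs.result_id, _fld_resp.root AS result
> FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
> LEFT OUTER JOIN LATERAL ( <the parameterized query> ) _fld_resp ON true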
-}
module Hasura.GraphQL.Execute.LiveQuery
( RefetchInterval
, refetchIntervalFromMilli
, LQM.BatchSize
, LQM.mkBatchSize
, LQM.MxOpts
, LQM.mkMxOpts
, LQF.FallbackOpts
, LQF.mkFallbackOpts
, LQOpts
, mkLQOpts
( LiveQueryPlan
, ReusableLiveQueryPlan
, reuseLiveQueryPlan
, buildLiveQueryPlan
, LiveQueriesState
, initLiveQueriesState
, dumpLiveQueriesState
, LiveQueryOp
, LiveQueriesOptions(..)
, BatchSize(..)
, RefetchInterval(..)
, mkLiveQueriesOptions
, LiveQueryId
, addLiveQuery
, removeLiveQuery
, SubsPlan
, subsOpFromPlan
, subsOpFromPGAST
) where
import Control.Lens
import Data.Has
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.State
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as J
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as Set
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Hasura.GraphQL.Execute.LiveQuery.Fallback as LQF
import qualified Hasura.GraphQL.Execute.LiveQuery.Multiplexed as LQM
import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Validate as GV
import qualified Hasura.SQL.DML as S
import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Types
#ifdef __HADDOCK_VERSION__
import Hasura.Prelude
import Hasura.RQL.DML.Select (asSingleRowJsonResp)
import Hasura.RQL.Types
import Hasura.SQL.Error
import Hasura.SQL.Types
import Hasura.SQL.Value
data LQOpts
= LQOpts
{ _loMxOpts :: LQM.MxOpts
, _loFallbackOpts :: LQF.FallbackOpts
} deriving (Show, Eq)
-- | Required for logging server configuration on startup
instance J.ToJSON LQOpts where
toJSON (LQOpts mxOpts fbOpts) =
J.object [ "multiplexed_options" J..= mxOpts
, "fallback_options" J..= fbOpts
]
mkLQOpts :: LQM.MxOpts -> LQF.FallbackOpts -> LQOpts
mkLQOpts = LQOpts
data LiveQueriesState
= LiveQueriesState
{ _lqsMultiplexed :: !LQM.LiveQueriesState
, _lqsFallback :: !LQF.LiveQueriesState
, _lqsPGExecTx :: !PGExecCtx
}
dumpLiveQueriesState
:: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState mx fallback _) = do
mxJ <- LQM.dumpLiveQueriesState extended mx
fallbackJ <- LQF.dumpLiveQueriesState fallback
return $ J.object
[ "fallback" J..= fallbackJ
, "multiplexed" J..= mxJ
]
initLiveQueriesState
:: LQOpts
-> PGExecCtx
-> IO LiveQueriesState
initLiveQueriesState (LQOpts mxOpts fallbackOpts) pgExecCtx = do
(mxMap, fallbackMap) <- STM.atomically $
(,) <$> LQM.initLiveQueriesState mxOpts
<*> LQF.initLiveQueriesState fallbackOpts
return $ LiveQueriesState mxMap fallbackMap pgExecCtx
data LiveQueryOp
= LQMultiplexed !LQM.MxOp
| LQFallback !LQF.FallbackOp
data LiveQueryId
= LQIMultiplexed !LQM.LiveQueryId
| LQIFallback !LQF.LiveQueryId
addLiveQuery
:: LiveQueriesState
-> LiveQueryOp
-- the action to be executed when result changes
-> OnChange
-> IO LiveQueryId
addLiveQuery lqState liveQOp onResultAction =
case liveQOp of
LQMultiplexed mxOp ->
LQIMultiplexed <$> LQM.addLiveQuery pgExecCtx mxMap mxOp onResultAction
LQFallback fallbackOp ->
LQIFallback <$> LQF.addLiveQuery
pgExecCtx fallbackMap fallbackOp onResultAction
where
LiveQueriesState mxMap fallbackMap pgExecCtx = lqState
removeLiveQuery
:: LiveQueriesState
-- the query and the associated operation
-> LiveQueryId
-> IO ()
removeLiveQuery lqState = \case
LQIMultiplexed lqId -> LQM.removeLiveQuery mxMap lqId
LQIFallback lqId -> LQF.removeLiveQuery fallbackMap lqId
where
LiveQueriesState mxMap fallbackMap _ = lqState
data SubsPlan
= SubsPlan
{ _sfMxOpCtx :: !LQM.MxOpCtx
, _sfVariableTypes :: !GV.VarPGTypes
}
instance J.ToJSON SubsPlan where
toJSON (SubsPlan opCtx varTypes) =
J.object [ "mx_op_ctx" J..= opCtx
, "variable_types" J..= varTypes
]
collectNonNullableVars
:: (MonadState GV.VarPGTypes m)
=> GR.UnresolvedVal -> m ()
collectNonNullableVars = \case
GR.UVPG annPGVal -> do
let GR.AnnPGVal varM isNullable colTy _ = annPGVal
case (varM, isNullable) of
(Just var, False) -> modify (Map.insert var colTy)
_ -> return ()
_ -> return ()
type TextEncodedVariables
= Map.HashMap G.Variable TxtEncodedPGVal
-- | converts the partial unresolved value containing
-- variables, session variables to an SQL expression
-- referring correctly to the values from '_subs' temporary table
-- The variables are at _subs.result_vars.variables and
-- session variables at _subs.result_vars.user
toMultiplexedQueryVar
:: (MonadState GV.AnnPGVarVals m)
=> GR.UnresolvedVal -> m S.SQLExp
toMultiplexedQueryVar = \case
GR.UVPG annPGVal ->
let GR.AnnPGVal varM isNullable _ colVal = annPGVal
in case (varM, isNullable) of
-- we don't check for nullability as
-- this is only used for reusable plans
-- the check has to be made before this
(Just var, _) -> do
modify $ Map.insert var colVal
return $ fromResVars (PGTypeScalar $ pstType colVal)
[ "variables"
, G.unName $ G.unVariable var
]
_ -> return $ toTxtValue colVal
GR.UVSessVar ty sessVar ->
return $ fromResVars ty [ "user", T.toLower sessVar]
GR.UVSQL sqlExp -> return sqlExp
where
fromResVars ty jPath =
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIden $ S.QIden (S.QualIden $ Iden "_subs")
(Iden "result_vars")
, S.SEArray $ map S.SELit jPath
]
-- | Creates a live query operation and if possible, a reusable plan
--
subsOpFromPGAST
:: ( MonadError QErr m
, MonadReader r m
, Has UserInfo r
, MonadIO m
)
=> PGExecCtx
-- ^ to validate arguments
-> GH.GQLReqUnparsed
-- ^ used as part of an identifier in the underlying live query systems
-- to avoid unnecessary load on Postgres where possible
-> [G.VariableDefinition]
-- ^ variable definitions as seen in the subscription, needed in
-- checking whether the subscription can be multiplexed or not
-> (G.Alias, GR.QueryRootFldUnresolved)
-- ^ The alias and the partially processed live query field
-> m (LiveQueryOp, Maybe SubsPlan)
subsOpFromPGAST pgExecCtx reqUnparsed varDefs (fldAls, astUnresolved) = do
userInfo <- asks getter
-- collect the variables (with their types) used inside the subscription
(_, varTypes) <- flip runStateT mempty $ GR.traverseQueryRootFldAST
collectNonNullableVars astUnresolved
-- Can the subscription be multiplexed?
-- Only if all variables are non null and can be prepared
if Set.fromList (Map.keys varTypes) == allVars
then mkMultiplexedOp userInfo varTypes
else mkFallbackOp userInfo
where
allVars = Set.fromList $ map G._vdVariable varDefs
-- multiplexed subscription
mkMultiplexedOp userInfo varTypes = do
(astResolved, annVarVals) <-
flip runStateT mempty $ GR.traverseQueryRootFldAST
toMultiplexedQueryVar astUnresolved
let mxOpCtx = LQM.mkMxOpCtx (userRole userInfo)
(GH._grQuery reqUnparsed) fldAls $
GR.toPGQuery astResolved
-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals
let mxOp = (mxOpCtx, userVars userInfo, txtEncodedVars)
return (LQMultiplexed mxOp, Just $ SubsPlan mxOpCtx varTypes)
-- fallback tx subscription
mkFallbackOp userInfo = do
(astResolved, prepArgs) <-
flip runStateT mempty $ GR.traverseQueryRootFldAST
GR.resolveValPrep astUnresolved
let tx = withUserInfo userInfo $ liftTx $
asSingleRowJsonResp (GR.toPGQuery astResolved) $ toList prepArgs
fallbackOp = LQF.mkFallbackOp userInfo reqUnparsed $ withAlias tx
return (LQFallback fallbackOp, Nothing)
fldAlsT = G.unName $ G.unAlias fldAls
withAlias tx =
encJFromAssocList . pure . (,) fldAlsT <$> tx
-- | Checks if the provided arguments are valid values for their corresponding types.
-- Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..."
validateAnnVarValsOnPg
:: ( MonadError QErr m
, MonadIO m
)
=> PGExecCtx
-> GV.AnnPGVarVals
-> m TextEncodedVariables
validateAnnVarValsOnPg pgExecCtx annVarVals = do
let valSel = mkValidationSel $ Map.elems annVarVals
Q.Discard _ <- runTx' $ liftTx $
Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False
return $ fmap (txtEncodedPGVal . pstValue) annVarVals
where
mkExtrs = map (flip S.Extractor Nothing . toTxtValue)
mkValidationSel vars =
S.mkSelect { S.selExtr = mkExtrs vars }
runTx' tx = do
res <- liftIO $ runExceptT (runLazyTx' pgExecCtx tx)
liftEither res
-- Explicitly look for the class of errors raised when the format of a value provided
-- for a type is incorrect.
dataExnErrHandler = mkTxErrorHandler (has _PGDataException)
-- | Use the existing plan with new variables and session variables
-- to create a live query operation
subsOpFromPlan
:: ( MonadError QErr m
, MonadIO m
)
=> PGExecCtx
-> UserVars
-> Maybe GH.VariableValues
-> SubsPlan
-> m LiveQueryOp
subsOpFromPlan pgExecCtx usrVars varValsM (SubsPlan mxOpCtx varTypes) = do
annVarVals <- GV.getAnnPGVarVals varTypes varValsM
txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals
return $ LQMultiplexed (mxOpCtx, usrVars, txtEncodedVars)
import Hasura.GraphQL.Execute.LiveQuery.Poll
#endif

View File

@@ -1,249 +0,0 @@
module Hasura.GraphQL.Execute.LiveQuery.Fallback
( RefetchInterval
, refetchIntervalFromMilli
, FallbackOpts
, mkFallbackOpts
, LiveQueriesState
, initLiveQueriesState
, dumpLiveQueriesState
, FallbackOp
, mkFallbackOp
, LiveQueryId
, addLiveQuery
, removeLiveQuery
) where
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as J
import qualified ListT
import qualified StmContainers.Map as STMMap
import Control.Concurrent (threadDelay)
import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Types
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types
data LiveQuery
= LiveQuery
{ _lqUser :: !UserInfo
, _lqRequest :: !GQLReqUnparsed
} deriving (Show, Eq, Generic)
instance J.ToJSON LiveQuery where
toJSON (LiveQuery user req) =
J.object [ "user" J..= userVars user
, "request" J..= req
]
instance Hashable LiveQuery
data LQHandler
= LQHandler
-- the tx to be executed
{ _lqhRespTx :: !LazyRespTx
-- previous result
, _lqhPrevRes :: !RespTV
-- the actions that have been run previously
-- we run these if the response changes
, _lqhCurOps :: !Sinks
-- we run these operations regardless
-- and then merge them with current operations
, _lqhNewOps :: !Sinks
}
data FallbackOpts
= FallbackOpts
{ _foRefetchInterval :: !RefetchInterval
} deriving (Show, Eq)
instance J.ToJSON FallbackOpts where
toJSON (FallbackOpts refetchInterval) =
J.object [ "refetch_delay" J..= refetchInterval
]
-- 1 second
defaultRefetchInterval :: RefetchInterval
defaultRefetchInterval =
refetchIntervalFromMilli 1000
mkFallbackOpts
:: Maybe RefetchInterval
-> FallbackOpts
mkFallbackOpts refetchIntervalM =
FallbackOpts
(fromMaybe defaultRefetchInterval refetchIntervalM)
data LiveQueriesState
= LiveQueriesState
{ _lqsOptions :: !FallbackOpts
, _lqsLiveQueryMap :: !LiveQueryMap
}
dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
lqMapJ <- dumpLiveQueryMap lqMap
return $ J.object
[ "options" J..= opts
, "live_queries_map" J..= lqMapJ
]
initLiveQueriesState
:: FallbackOpts
-> STM.STM LiveQueriesState
initLiveQueriesState lqOptions =
LiveQueriesState
lqOptions
<$> STMMap.new
data LiveQueryId
= LiveQueryId
{ _lqiQuery :: !LiveQuery
, _lqiSink :: !SinkId
}
type LiveQueryMap = STMMap.Map LiveQuery (LQHandler, ThreadTM)
dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
dumpLiveQueryMap lqMap =
fmap J.toJSON $ STM.atomically $ do
entries <- ListT.toList $ STMMap.listT lqMap
forM entries $ \(lq, (lqHandler, threadRef)) -> do
prevResHash <- STM.readTVar $ _lqhPrevRes lqHandler
threadId <- A.asyncThreadId <$> STM.readTMVar threadRef
curOps <- toListTMap $ _lqhCurOps lqHandler
newOps <- toListTMap $ _lqhNewOps lqHandler
return $ J.object
[ "query" J..= lq
, "thread_id" J..= show threadId
, "current_ops" J..= map fst curOps
, "new_ops" J..= map fst newOps
, "previous_result_hash" J..= prevResHash
]
removeLiveQuery
:: LiveQueriesState
-- the query and the associated operation
-> LiveQueryId
-> IO ()
removeLiveQuery lqState (LiveQueryId liveQ k) = do
-- clean the handler's state
threadRefM <- STM.atomically $ do
lqHandlerM <- STMMap.lookup liveQ lqMap
maybe (return Nothing) cleanLQHandler lqHandlerM
-- cancel the polling thread
onJust threadRefM A.cancel
where
lqMap = _lqsLiveQueryMap lqState
cleanLQHandler (handler, threadRef) = do
let curOps = _lqhCurOps handler
newOps = _lqhNewOps handler
deleteTMap k curOps
deleteTMap k newOps
cancelPollThread <- (&&)
<$> nullTMap curOps
<*> nullTMap newOps
-- if this happens to be the last operation, take the
-- ref for the polling thread to cancel it
if cancelPollThread then do
STMMap.delete liveQ lqMap
Just <$> STM.takeTMVar threadRef
else return Nothing
-- the transaction associated with this query
type FallbackOp = (LiveQuery, LazyRespTx)
mkFallbackOp
:: UserInfo -> GQLReqUnparsed
-> LazyRespTx -> FallbackOp
mkFallbackOp userInfo req tx =
(LiveQuery userInfo req, tx)
addLiveQuery
:: PGExecCtx
-> LiveQueriesState
-- the query
-> FallbackOp
-- the action to be executed when result changes
-> OnChange
-> IO LiveQueryId
addLiveQuery pgExecCtx lqState (liveQ, respTx) onResultAction= do
sinkId <- newSinkId
-- a handler is returned only when it is newly created
handlerM <- STM.atomically $ do
lqHandlerM <- STMMap.lookup liveQ lqMap
maybe (newHandler sinkId) (addToExistingHandler sinkId) lqHandlerM
-- we can then attach a polling thread if it is new
-- the livequery can only be cancelled after putTMVar
onJust handlerM $ \(handler, pollerThreadTM) -> do
threadRef <- A.async $ forever $ do
pollQuery pgExecCtx handler
threadDelay $ refetchIntervalToMicro refetchInterval
STM.atomically $ STM.putTMVar pollerThreadTM threadRef
return $ LiveQueryId liveQ sinkId
where
LiveQueriesState lqOpts lqMap = lqState
FallbackOpts refetchInterval = lqOpts
addToExistingHandler sinkId (handler, _) = do
insertTMap onResultAction sinkId $ _lqhNewOps handler
return Nothing
newHandler sinkId = do
handler <- LQHandler
<$> return respTx
<*> STM.newTVar Nothing
<*> newTMap
<*> newTMap
insertTMap onResultAction sinkId $ _lqhNewOps handler
asyncRefTM <- STM.newEmptyTMVar
STMMap.insert (handler, asyncRefTM) liveQ lqMap
return $ Just (handler, asyncRefTM)
pollQuery
:: PGExecCtx
-> LQHandler
-> IO ()
pollQuery pgExecCtx (LQHandler respTx respTV curOpsTV newOpsTV) = do
resOrErr <- runExceptT $ runLazyTx pgExecCtx respTx
let (resp, respHashM) = case encJToLBS <$> resOrErr of
Left e -> (GQExecError [encodeGQErr False e], Nothing)
Right lbs -> (GQSuccess lbs, Just $ mkRespHash lbs)
-- extract the current and new operations
(curOps, newOps) <- STM.atomically $ do
curOpsL <- toListTMap curOpsTV
newOpsL <- toListTMap newOpsTV
forM_ newOpsL $ \(k, action) -> insertTMap action k curOpsTV
resetTMap newOpsTV
return (curOpsL, newOpsL)
runOperations resp newOps
-- write to the current websockets if needed
prevRespHashM <- STM.readTVarIO respTV
when (isExecError resp || respHashM /= prevRespHashM) $ do
runOperations resp curOps
STM.atomically $ STM.writeTVar respTV respHashM
where
runOperation resp action = action resp
runOperations resp =
void . A.mapConcurrently (runOperation resp . snd)

View File

@@ -0,0 +1,37 @@
module Hasura.GraphQL.Execute.LiveQuery.Options
( LiveQueriesOptions(..)
, BatchSize(..)
, RefetchInterval(..)
, mkLiveQueriesOptions
) where
import Hasura.Prelude
import qualified Data.Aeson as J
import Data.Time.Clock (DiffTime)
import Data.Time.Clock.Units (seconds)
data LiveQueriesOptions
= LiveQueriesOptions
{ _lqoBatchSize :: !BatchSize
, _lqoRefetchInterval :: !RefetchInterval
} deriving (Show, Eq)
mkLiveQueriesOptions :: Maybe BatchSize -> Maybe RefetchInterval -> LiveQueriesOptions
mkLiveQueriesOptions batchSize refetchInterval = LiveQueriesOptions
{ _lqoBatchSize = fromMaybe (BatchSize 100) batchSize
, _lqoRefetchInterval = fromMaybe (RefetchInterval $ seconds 1) refetchInterval
}
instance J.ToJSON LiveQueriesOptions where
toJSON (LiveQueriesOptions batchSize refetchInterval) =
J.object [ "batch_size" J..= batchSize
, "refetch_delay" J..= refetchInterval
]
newtype BatchSize = BatchSize { unBatchSize :: Int }
deriving (Show, Eq, J.ToJSON)
newtype RefetchInterval = RefetchInterval { unRefetchInterval :: DiffTime }
deriving (Show, Eq, J.ToJSON)

View File

@@ -0,0 +1,198 @@
-- | Construction of multiplexed live query plans; see "Hasura.GraphQL.Execute.LiveQuery" for
-- details.
module Hasura.GraphQL.Execute.LiveQuery.Plan
( MultiplexedQuery
, mkMultiplexedQuery
, unMultiplexedQuery
, toMultiplexedQueryVar
, LiveQueryPlan(..)
, ParameterizedLiveQueryPlan(..)
, ReusableLiveQueryPlan
, ValidatedQueryVariables
, buildLiveQueryPlan
, reuseLiveQueryPlan
) where
import Hasura.Prelude
import qualified Data.Aeson.Casing as J
import qualified Data.Aeson.Extended as J
import qualified Data.Aeson.TH as J
import qualified Data.HashMap.Strict as Map
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import Control.Lens
import Data.Has
import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Validate as GV
import qualified Hasura.SQL.DML as S
import Hasura.Db
import Hasura.RQL.Types
import Hasura.SQL.Error
import Hasura.SQL.Types
import Hasura.SQL.Value
-- -------------------------------------------------------------------------------------------------
-- Multiplexed queries
newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query }
deriving (Show, Eq, Hashable, J.ToJSON)
mkMultiplexedQuery :: Q.Query -> MultiplexedQuery
mkMultiplexedQuery baseQuery =
MultiplexedQuery . Q.fromText $ foldMap Q.getQueryText [queryPrefix, baseQuery, querySuffix]
where
queryPrefix =
[Q.sql|
select
_subs.result_id, _fld_resp.root as result
from
unnest(
$1::uuid[], $2::json[]
) _subs (result_id, result_vars)
left outer join lateral
(
|]
querySuffix =
[Q.sql|
) _fld_resp ON ('true')
|]
-- | Converts a partially unresolved value (which may contain GraphQL variables and session
-- variables) into a SQL expression that refers to the @_subs@ temporary table: query variables
-- live at @_subs.result_vars.variables@ and session variables at @_subs.result_vars.user@.
toMultiplexedQueryVar :: (MonadState GV.ReusableVariableValues m) => GR.UnresolvedVal -> m S.SQLExp
toMultiplexedQueryVar = \case
GR.UVPG annPGVal ->
let GR.AnnPGVal varM _ colVal = annPGVal
in case varM of
Just var -> do
modify $ Map.insert var colVal
pure $ fromResVars (PGTypeScalar $ pstType colVal)
["variables", G.unName $ G.unVariable var]
Nothing -> return $ toTxtValue colVal
GR.UVSessVar ty sessVar -> pure $ fromResVars ty ["user", T.toLower sessVar]
GR.UVSQL sqlExp -> pure sqlExp
where
fromResVars ty jPath =
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIden $ S.QIden (S.QualIden $ Iden "_subs") (Iden "result_vars")
, S.SEArray $ map S.SELit jPath
]
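-- As a sketch of the generated SQL (assuming the usual textual rendering of the expression
-- builders above), a query variable @$post_id@ of Postgres type @integer@ and the session
-- variable @x-hasura-user-id@ become:
--
-- > ("_subs"."result_vars" #>> ARRAY['variables','post_id'])::integer
-- > ("_subs"."result_vars" #>> ARRAY['user','x-hasura-user-id'])::text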
-- -------------------------------------------------------------------------------------------------
-- Variable validation
-- | When running multiplexed queries, we have to be especially careful about user input, since
-- invalid values will cause the query to fail, causing collateral damage for anyone else
-- multiplexed into the same query. Therefore, we pre-validate variables against Postgres by
-- executing a no-op query of the shape
--
-- > SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn
--
-- so if any variable values are invalid, the error will be caught early.
newtype ValidatedQueryVariables = ValidatedQueryVariables (Map.HashMap G.Variable TxtEncodedPGVal)
deriving (Show, Eq, Hashable, J.ToJSON)
-- | Checks if the provided arguments are valid values for their corresponding types.
-- Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..."
validateQueryVariables
:: (MonadError QErr m, MonadIO m)
=> PGExecCtx
-> GV.ReusableVariableValues
-> m ValidatedQueryVariables
validateQueryVariables pgExecCtx annVarVals = do
let valSel = mkValidationSel $ Map.elems annVarVals
Q.Discard () <- runTx' $ liftTx $
Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False
pure . ValidatedQueryVariables $ fmap (txtEncodedPGVal . pstValue) annVarVals
where
mkExtrs = map (flip S.Extractor Nothing . toTxtValue)
mkValidationSel vars =
S.mkSelect { S.selExtr = mkExtrs vars }
runTx' tx = do
res <- liftIO $ runExceptT (runLazyTx' pgExecCtx tx)
liftEither res
-- Explicitly look for the class of errors raised when the format of a value provided
-- for a type is incorrect.
dataExnErrHandler = mkTxErrorHandler (has _PGDataException)
-- -------------------------------------------------------------------------------------------------
-- Live query plans
-- | A self-contained, ready-to-execute live query plan. Contains enough information to find an
-- existing poller that this can be added to /or/ to create a new poller if necessary.
data LiveQueryPlan
= LiveQueryPlan
{ _lqpParameterizedPlan :: !ParameterizedLiveQueryPlan
, _lqpSessionVariables :: !UserVars
, _lqpQueryVariables :: !ValidatedQueryVariables
}
data ParameterizedLiveQueryPlan
= ParameterizedLiveQueryPlan
{ _plqpRole :: !RoleName
, _plqpAlias :: !G.Alias
, _plqpQuery :: !MultiplexedQuery
} deriving (Show)
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan)
data ReusableLiveQueryPlan
= ReusableLiveQueryPlan
{ _rlqpParameterizedPlan :: !ParameterizedLiveQueryPlan
, _rlqpQueryVariableTypes :: !GV.ReusableVariableTypes
} deriving (Show)
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ReusableLiveQueryPlan)
-- | Constructs a new execution plan for a live query and returns a reusable version of the plan if
-- possible.
buildLiveQueryPlan
:: ( MonadError QErr m
, MonadReader r m
, Has UserInfo r
, MonadIO m
)
=> PGExecCtx
-> G.Alias
-> GR.QueryRootFldUnresolved
-> Maybe GV.ReusableVariableTypes
-> m (LiveQueryPlan, Maybe ReusableLiveQueryPlan)
buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do
userInfo <- asks getter
(astResolved, annVarVals) <- flip runStateT mempty $
GR.traverseQueryRootFldAST toMultiplexedQueryVar astUnresolved
let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved
parameterizedPlan = ParameterizedLiveQueryPlan (userRole userInfo) fieldAlias pgQuery
-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
validatedVars <- validateQueryVariables pgExecCtx annVarVals
let plan = LiveQueryPlan parameterizedPlan (userVars userInfo) validatedVars
reusablePlan = ReusableLiveQueryPlan parameterizedPlan <$> varTypes
pure (plan, reusablePlan)
reuseLiveQueryPlan
:: (MonadError QErr m, MonadIO m)
=> PGExecCtx
-> UserVars
-> Maybe GH.VariableValues
-> ReusableLiveQueryPlan
-> m LiveQueryPlan
reuseLiveQueryPlan pgExecCtx sessionVars queryVars reusablePlan = do
let ReusableLiveQueryPlan parameterizedPlan varTypes = reusablePlan
annVarVals <- GV.validateVariablesForReuse varTypes queryVars
validatedVars <- validateQueryVariables pgExecCtx annVarVals
pure $ LiveQueryPlan parameterizedPlan sessionVars validatedVars
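A sketch of the intended lifecycle of these two functions; @cachePlan@ is a hypothetical hook standing in for the plan-cache plumbing in "Hasura.GraphQL.Execute.Plan":
firstRun
  :: (MonadError QErr m, MonadReader r m, Has UserInfo r, MonadIO m)
  => PGExecCtx
  -> G.Alias
  -> GR.QueryRootFldUnresolved
  -> Maybe GV.ReusableVariableTypes
  -> (ReusableLiveQueryPlan -> m ())  -- ^ hypothetical cache hook
  -> m LiveQueryPlan
firstRun pgExecCtx fieldAlias astUnresolved varTypes cachePlan = do
  (plan, reusablePlan) <- buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes
  traverse_ cachePlan reusablePlan  -- cache only when the plan is reusable
  pure plan
A later request with the same query shape skips AST processing entirely: it calls 'reuseLiveQueryPlan' with the cached plan and the new variables, which only re-validates those variables against Postgres.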

View File

@@ -0,0 +1,386 @@
-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details.
module Hasura.GraphQL.Execute.LiveQuery.Poll (
-- * Pollers
Poller(..)
, PollerIOState(..)
, pollQuery
, PollerKey(..)
, PollerMap
, dumpPollerMap
, RefetchMetrics
, initRefetchMetrics
-- * Cohorts
, Cohort(..)
, CohortId
, newCohortId
, CohortVariables(..)
, CohortKey
, CohortMap
-- * Subscribers
, Subscriber(..)
, SubscriberId
, newSinkId
, SubscriberMap
, OnChange
) where
import Hasura.Prelude
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Crypto.Hash as CH
import qualified Data.Aeson.Extended as J
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import qualified ListT
import qualified StmContainers.Map as STMMap
import qualified System.Metrics.Distribution as Metrics
-- remove these when array encoding is merged
import qualified Database.PG.Query.PTI as PTI
import qualified PostgreSQL.Binary.Encoding as PE
import Data.List.Split (chunksOf)
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.RQL.Types
-- -------------------------------------------------------------------------------------------------
-- Subscribers
data Subscriber
= Subscriber
{ _sRootAlias :: !G.Alias
, _sOnChangeCallback :: !OnChange
}
type OnChange = GQResponse -> IO ()
newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
deriving (Show, Eq, Hashable, J.ToJSON)
newSinkId :: IO SubscriberId
newSinkId = SubscriberId <$> UUID.nextRandom
type SubscriberMap = TMap.TMap SubscriberId Subscriber
-- -------------------------------------------------------------------------------------------------
-- Cohorts
-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
data Cohort
= Cohort
{ _cCohortId :: !CohortId
-- ^ a unique identifier used to identify the cohort in the generated query
, _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
-- ^ a hash of the previous query result, if any, used to determine if we need to push an updated
-- result to the subscribers or not
, _cExistingSubscribers :: !SubscriberMap
-- ^ the subscribers we've already pushed a result to; we push new results to them iff the
-- response changes
, _cNewSubscribers :: !SubscriberMap
-- ^ subscribers we haven't yet pushed any results to; we push results to them regardless of
-- whether the result changed, then merge them into the map of existing subscribers
}
newtype CohortId = CohortId { unCohortId :: UUID.UUID }
deriving (Show, Eq, Hashable, Q.FromCol)
newCohortId :: IO CohortId
newCohortId = CohortId <$> UUID.nextRandom
data CohortVariables
= CohortVariables
{ _cvSessionVariables :: !UserVars
, _cvQueryVariables :: !ValidatedQueryVariables
} deriving (Show, Eq, Generic)
instance Hashable CohortVariables
instance J.ToJSON CohortVariables where
toJSON (CohortVariables sessionVars queryVars) =
J.object ["user" J..= sessionVars, "variables" J..= queryVars]
-- | A hash used to determine if the result changed without having to keep the entire result in
-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256
-- bits, even if a subscription changes once per second for an entire year, the probability of a
-- hash collision is ~4.294417×10-63. We use Blake2b because it is faster than SHA-256
newtype ResponseHash = ResponseHash { unResponseHash :: CH.Digest CH.Blake2b_256 }
deriving (Show, Eq)
instance J.ToJSON ResponseHash where
toJSON = J.toJSON . show . unResponseHash
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash = ResponseHash . CH.hash
-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort' (assuming they
-- already meet the criteria to be in the same 'Poller'). Note the distinction between this and
-- 'CohortId'; the latter is a completely synthetic key used only to identify the cohort in the
-- generated SQL query.
type CohortKey = CohortVariables
type CohortMap = TMap.TMap CohortKey Cohort
dumpCohortMap :: CohortMap -> IO J.Value
dumpCohortMap cohortMap = do
cohorts <- STM.atomically $ TMap.toList cohortMap
fmap J.toJSON . forM cohorts $ \(CohortVariables usrVars varVals, cohort) -> do
cohortJ <- dumpCohort cohort
return $ J.object
[ "session_vars" J..= usrVars
, "variable_values" J..= varVals
, "cohort" J..= cohortJ
]
where
dumpCohort (Cohort respId respTV curOps newOps) =
STM.atomically $ do
prevResHash <- STM.readTVar respTV
curOpIds <- TMap.toList curOps
newOpIds <- TMap.toList newOps
return $ J.object
[ "resp_id" J..= unCohortId respId
, "current_ops" J..= map fst curOpIds
, "new_ops" J..= map fst newOpIds
, "previous_result_hash" J..= prevResHash
]
data CohortSnapshot
= CohortSnapshot
{ _csVariables :: !CohortVariables
, _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash))
, _csExistingSubscribers :: ![Subscriber]
, _csNewSubscribers :: ![Subscriber]
}
pushResultToCohort
:: GQResult EncJSON
-- ^ a response that still needs to be wrapped with each 'Subscriber's root 'G.Alias'
-> Maybe ResponseHash
-> CohortSnapshot
-> IO ()
pushResultToCohort result respHashM cohortSnapshot = do
prevRespHashM <- STM.readTVarIO respRef
-- write to the current websockets if needed
sinks <-
if isExecError result || respHashM /= prevRespHashM
then do
STM.atomically $ STM.writeTVar respRef respHashM
return (newSinks <> curSinks)
else
return newSinks
pushResultToSubscribers sinks
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
let aliasText = G.unName $ G.unAlias alias
wrapWithAlias response =
encJToLBS $ encJFromAssocList [(aliasText, response)]
in action (wrapWithAlias <$> result)
-- -------------------------------------------------------------------------------------------------
-- Pollers
-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that periodically polls
-- Postgres and pushes results to each of its listening 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query, though in practice, 'Poller's
-- with large numbers of 'Cohort's are batched into multiple concurrent queries for performance
-- reasons.
data Poller
= Poller
{ _pCohorts :: !CohortMap
, _pIOState :: !(STM.TMVar PollerIOState)
-- ^ This is in a separate 'STM.TMVar' because it's important that we are able to construct
-- 'Poller' values in 'STM.STM': we need the insertion into the 'PollerMap' to be atomic to
-- ensure that we don't accidentally create two for the same query due to a race. However, we
-- can't spawn the worker thread or create the metrics store in 'STM.STM', so we insert it into
-- the 'Poller' only after we're certain we won't create any duplicates.
}
data PollerIOState
= PollerIOState
{ _pThread :: !(A.Async ())
-- ^ a handle on the poller's worker thread that can be used to 'A.cancel' it if all its cohorts
-- stop listening
, _pMetrics :: !RefetchMetrics
}
data RefetchMetrics
= RefetchMetrics
{ _rmSnapshot :: !Metrics.Distribution
, _rmPush :: !Metrics.Distribution
, _rmQuery :: !Metrics.Distribution
, _rmTotal :: !Metrics.Distribution
}
initRefetchMetrics :: IO RefetchMetrics
initRefetchMetrics = RefetchMetrics <$> Metrics.new <*> Metrics.new <*> Metrics.new <*> Metrics.new
data PollerKey
-- we don't need operation name here as a subscription will
-- only have a single top level field
= PollerKey
{ _lgRole :: !RoleName
, _lgQuery :: !MultiplexedQuery
} deriving (Show, Eq, Generic)
instance Hashable PollerKey
instance J.ToJSON PollerKey where
toJSON (PollerKey role query) =
J.object [ "role" J..= role
, "query" J..= query
]
type PollerMap = STMMap.Map PollerKey Poller
dumpPollerMap :: Bool -> PollerMap -> IO J.Value
dumpPollerMap extended lqMap =
fmap J.toJSON $ do
entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
forM entries $ \(PollerKey role query, Poller cohortsMap ioState) -> do
PollerIOState threadId metrics <- STM.atomically $ STM.readTMVar ioState
metricsJ <- dumpRefetchMetrics metrics
cohortsJ <-
if extended
then Just <$> dumpCohortMap cohortsMap
else return Nothing
return $ J.object
[ "role" J..= role
, "thread_id" J..= show (A.asyncThreadId threadId)
, "multiplexed_query" J..= query
, "cohorts" J..= cohortsJ
, "metrics" J..= metricsJ
]
where
dumpRefetchMetrics metrics = do
snapshotS <- Metrics.read $ _rmSnapshot metrics
queryS <- Metrics.read $ _rmQuery metrics
pushS <- Metrics.read $ _rmPush metrics
totalS <- Metrics.read $ _rmTotal metrics
return $ J.object
[ "snapshot" J..= dumpStats snapshotS
, "query" J..= dumpStats queryS
, "push" J..= dumpStats pushS
, "total" J..= dumpStats totalS
]
dumpStats stats =
J.object
[ "mean" J..= Metrics.mean stats
, "variance" J..= Metrics.variance stats
, "count" J..= Metrics.count stats
, "min" J..= Metrics.min stats
, "max" J..= Metrics.max stats
]
newtype CohortIdArray = CohortIdArray { unCohortIdArray :: [CohortId] }
deriving (Show, Eq)
instance Q.ToPrepArg CohortIdArray where
toPrepVal (CohortIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unCohortId l
where
encoder = PE.array 2950 . PE.dimensionArray foldl' (PE.encodingArray . PE.uuid)
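-- 2950 is the Postgres type OID for @uuid@, the element type of the array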
newtype CohortVariablesArray = CohortVariablesArray { unCohortVariablesArray :: [CohortVariables] }
deriving (Show, Eq)
instance Q.ToPrepArg CohortVariablesArray where
toPrepVal (CohortVariablesArray l) =
Q.toPrepValHelper PTI.unknown encoder (map J.toJSON l)
where
encoder = PE.array 114 . PE.dimensionArray foldl' (PE.encodingArray . PE.json_ast)
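-- 114 is the Postgres type OID for @json@, the element type of the array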
-- | Where the magic happens: the top-level action run periodically by each active 'Poller'.
pollQuery
:: RefetchMetrics
-> BatchSize
-> PGExecCtx
-> MultiplexedQuery
-> Poller
-> IO ()
pollQuery metrics batchSize pgExecCtx pgQuery handler = do
procInit <- Clock.getCurrentTime
-- get a snapshot of all the cohorts
-- this need not be done in a transaction
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshotMap <- Map.fromList <$> mapM (STM.atomically . getCohortSnapshot) cohorts
let queryVarsBatches = chunksOf (unBatchSize batchSize) $ getQueryVars cohortSnapshotMap
snapshotFinish <- Clock.getCurrentTime
Metrics.add (_rmSnapshot metrics) $
realToFrac $ Clock.diffUTCTime snapshotFinish procInit
flip A.mapConcurrently_ queryVarsBatches $ \queryVars -> do
queryInit <- Clock.getCurrentTime
mxRes <- runExceptT . runLazyTx' pgExecCtx . liftTx $ Q.listQE defaultTxErrorHandler
(unMultiplexedQuery pgQuery) (mkMxQueryPrepArgs queryVars) True
queryFinish <- Clock.getCurrentTime
Metrics.add (_rmQuery metrics) $
realToFrac $ Clock.diffUTCTime queryFinish queryInit
let operations = getCohortOperations cohortSnapshotMap mxRes
-- concurrently push each unique result
A.mapConcurrently_ (uncurry3 pushResultToCohort) operations
pushFinish <- Clock.getCurrentTime
Metrics.add (_rmPush metrics) $
realToFrac $ Clock.diffUTCTime pushFinish queryFinish
procFinish <- Clock.getCurrentTime
Metrics.add (_rmTotal metrics) $
realToFrac $ Clock.diffUTCTime procFinish procInit
where
Poller cohortMap _ = handler
uncurry3 :: (a -> b -> c -> d) -> (a, b, c) -> d
uncurry3 f (a, b, c) = f a b c
getCohortSnapshot (cohortVars, handlerC) = do
let Cohort resId respRef curOpsTV newOpsTV = handlerC
curOpsL <- TMap.toList curOpsTV
newOpsL <- TMap.toList newOpsTV
forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
TMap.reset newOpsTV
let cohortSnapshot = CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
return (resId, cohortSnapshot)
getQueryVars cohortSnapshotMap =
Map.toList $ fmap _csVariables cohortSnapshotMap
mkMxQueryPrepArgs l =
let (respIdL, respVarL) = unzip l
in (CohortIdArray respIdL, CohortVariablesArray respVarL)
getCohortOperations cohortSnapshotMap = \case
Left e ->
-- TODO: this is an internal error
let resp = GQExecError [encodeGQErr False e]
in [ (resp, Nothing, snapshot)
| (_, snapshot) <- Map.toList cohortSnapshotMap
]
Right responses ->
flip mapMaybe responses $ \(respId, result) ->
-- TODO: change it to use bytestrings directly
let -- No reason to use lazy bytestrings here, since (1) we fetch the entire result set
-- from Postgres strictly and (2) even if we didn't, hashing will have to force the
-- whole thing anyway.
respHash = mkRespHash (encJToBS result)
in (GQSuccess result, Just respHash,) <$> Map.lookup respId cohortSnapshotMap

View File

@@ -0,0 +1,147 @@
-- | Top-level management of live query poller threads. The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.LiveQuery.Poll". See "Hasura.GraphQL.Execute.LiveQuery" for high-level
-- details.
module Hasura.GraphQL.Execute.LiveQuery.State
( LiveQueriesState
, initLiveQueriesState
, dumpLiveQueriesState
, LiveQueryId
, addLiveQuery
, removeLiveQuery
) where
import Hasura.Prelude
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson.Extended as J
import qualified StmContainers.Map as STMMap
import Control.Concurrent.Extended (threadDelay)
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import Hasura.Db
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.Poll
-- | The top-level datatype that holds the state for all active live queries.
data LiveQueriesState
= LiveQueriesState
{ _lqsOptions :: !LiveQueriesOptions
, _lqsPGExecTx :: !PGExecCtx
, _lqsLiveQueryMap :: !PollerMap
}
initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> IO LiveQueriesState
initLiveQueriesState options pgCtx = LiveQueriesState options pgCtx <$> STMMap.newIO
dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap) = do
lqMapJ <- dumpPollerMap extended lqMap
return $ J.object
[ "options" J..= opts
, "live_queries_map" J..= lqMapJ
]
data LiveQueryId
= LiveQueryId
{ _lqiPoller :: !PollerKey
, _lqiCohort :: !CohortKey
, _lqiSubscriber :: !SubscriberId
}
addLiveQuery
:: LiveQueriesState
-> LiveQueryPlan
-> OnChange
-- ^ the action to be executed when result changes
-> IO LiveQueryId
addLiveQuery lqState plan onResultAction = do
responseId <- newCohortId
sinkId <- newSinkId
-- a handler is returned only when it is newly created
handlerM <- STM.atomically $ do
handlerM <- STMMap.lookup handlerId lqMap
case handlerM of
Just handler -> do
cohortM <- TMap.lookup cohortId $ _pCohorts handler
case cohortM of
Just cohort -> addToCohort sinkId cohort
Nothing -> addToPoller sinkId responseId handler
return Nothing
Nothing -> do
poller <- newPoller
addToPoller sinkId responseId poller
STMMap.insert poller handlerId lqMap
return $ Just poller
-- we can then attach a polling thread if it is new
-- the livequery can only be cancelled after putTMVar
onJust handlerM $ \handler -> do
metrics <- initRefetchMetrics
threadRef <- A.async $ forever $ do
pollQuery metrics batchSize pgExecCtx query handler
threadDelay $ unRefetchInterval refetchInterval
STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics)
pure $ LiveQueryId handlerId cohortId sinkId
where
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
LiveQueriesOptions batchSize refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) sessionVars queryVars = plan
handlerId = PollerKey role query
cohortId = CohortVariables sessionVars queryVars
addToCohort sinkId handlerC =
TMap.insert (Subscriber alias onResultAction) sinkId $ _cNewSubscribers handlerC
addToPoller sinkId responseId handler = do
newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new
addToCohort sinkId newCohort
TMap.insert newCohort cohortId $ _pCohorts handler
newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar
removeLiveQuery
:: LiveQueriesState
-- the query and the associated operation
-> LiveQueryId
-> IO ()
removeLiveQuery lqState (LiveQueryId handlerId cohortId sinkId) = do
threadRef <- STM.atomically $ do
detM <- getQueryDet
fmap join $ forM detM $ \(Poller cohorts ioState, cohort) ->
cleanHandlerC cohorts ioState cohort
traverse_ A.cancel threadRef
where
lqMap = _lqsLiveQueryMap lqState
getQueryDet = do
pollerM <- STMMap.lookup handlerId lqMap
fmap join $ forM pollerM $ \poller -> do
cohortM <- TMap.lookup cohortId (_pCohorts poller)
return $ (poller,) <$> cohortM
cleanHandlerC cohortMap ioState handlerC = do
let curOps = _cExistingSubscribers handlerC
newOps = _cNewSubscribers handlerC
TMap.delete sinkId curOps
TMap.delete sinkId newOps
cohortIsEmpty <- (&&)
<$> TMap.null curOps
<*> TMap.null newOps
when cohortIsEmpty $ TMap.delete cohortId cohortMap
handlerIsEmpty <- TMap.null cohortMap
-- when there is no need for the handler,
-- i.e., this happens to be the last operation, take the
-- ref for the polling thread to cancel it
if handlerIsEmpty
then do
STMMap.delete handlerId lqMap
fmap _pThread <$> STM.tryReadTMVar ioState
else return Nothing

View File

@@ -0,0 +1,42 @@
module Hasura.GraphQL.Execute.LiveQuery.TMap
( TMap
, new
, reset
, null
, lookup
, insert
, delete
, toList
) where
import Hasura.Prelude hiding (lookup, null, toList)
import qualified Data.HashMap.Strict as Map
import Control.Concurrent.STM
-- | A coarse-grained transactional map implemented by simply wrapping a 'Map.HashMap' in a 'TVar'.
-- Compared to "StmContainers.Map", this provides much faster iteration over the elements at the
-- cost of significantly increased contention on writes.
newtype TMap k v = TMap { unTMap :: TVar (Map.HashMap k v) }
new :: STM (TMap k v)
new = TMap <$> newTVar Map.empty
reset :: TMap k v -> STM ()
reset = flip writeTVar Map.empty . unTMap
null :: TMap k v -> STM Bool
null = fmap Map.null . readTVar . unTMap
lookup :: (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v)
lookup k = fmap (Map.lookup k) . readTVar . unTMap
insert :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM ()
insert v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v
delete :: (Eq k, Hashable k) => k -> TMap k v -> STM ()
delete k mapTv = modifyTVar' (unTMap mapTv) $ Map.delete k
toList :: TMap k v -> STM [(k, v)]
toList = fmap Map.toList . readTVar . unTMap
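-- A short usage sketch ('demo' is hypothetical; everything else is the module
-- API above). All operations run inside a single STM transaction, so the map
-- contents stay consistent with any other transactional state:
demo :: STM Bool
demo = do
  m <- new
  insert True (1 :: Int) m  -- n.b. the argument order: value first, then key
  _found <- lookup 1 m      -- Just True
  delete 1 m
  null m                    -- True again after the delete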
View File
@ -1,115 +0,0 @@
module Hasura.GraphQL.Execute.LiveQuery.Types
( OnChange
, ThreadTM
, SinkId
, newSinkId
, Sinks
, RespHash
, mkRespHash
, RespTV
, RefetchInterval
, refetchIntervalFromMilli
, refetchIntervalToMicro
, TMap
, newTMap
, resetTMap
, nullTMap
, insertTMap
, deleteTMap
, lookupTMap
, toListTMap
) where
import Data.Word (Word32)
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Crypto.Hash as CH
import qualified Data.Aeson as J
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as Map
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
type OnChange = GQResp -> IO ()
type ThreadTM = STM.TMVar (A.Async ())
-- a cryptographic hash should ensure that a hash collision is
-- highly improbable; we use Blake2b because it is faster than SHA-256.
-- With 256 bits, and 86400 * 365 hashes (a subscription open for 365 days),
-- there is a ~4.294417×10⁻⁶³ chance of a hash collision.
newtype RespHash
= RespHash {unRespHash :: CH.Digest CH.Blake2b_256}
deriving (Show, Eq)
instance J.ToJSON RespHash where
toJSON = J.toJSON . show . unRespHash
mkRespHash :: LBS.ByteString -> RespHash
mkRespHash = RespHash . CH.hashlazy
type RespTV = STM.TVar (Maybe RespHash)
newtype RefetchInterval
= RefetchInterval {unRefetchInterval :: Word32}
deriving (Show, Eq, J.ToJSON)
refetchIntervalFromMilli :: Word32 -> RefetchInterval
refetchIntervalFromMilli = RefetchInterval
refetchIntervalToMicro :: RefetchInterval -> Int
refetchIntervalToMicro ri = fromIntegral $ 1000 * unRefetchInterval ri
-- compared to "StmContainers.Map", this provides much faster
-- iteration over the elements at the cost of slower
-- concurrent insertions
newtype TMap k v
= TMap {unTMap :: STM.TVar (Map.HashMap k v)}
newTMap :: STM.STM (TMap k v)
newTMap =
TMap <$> STM.newTVar Map.empty
resetTMap :: TMap k v -> STM.STM ()
resetTMap =
flip STM.writeTVar Map.empty . unTMap
nullTMap :: TMap k v -> STM.STM Bool
nullTMap =
fmap Map.null . STM.readTVar . unTMap
insertTMap :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM.STM ()
insertTMap v k mapTv =
STM.modifyTVar' (unTMap mapTv) $ Map.insert k v
deleteTMap :: (Eq k, Hashable k) => k -> TMap k v -> STM.STM ()
deleteTMap k mapTv =
STM.modifyTVar' (unTMap mapTv) $ Map.delete k
lookupTMap :: (Eq k, Hashable k) => k -> TMap k v -> STM.STM (Maybe v)
lookupTMap k =
fmap (Map.lookup k) . STM.readTVar . unTMap
toListTMap :: TMap k v -> STM.STM [(k, v)]
toListTMap =
fmap Map.toList . STM.readTVar . unTMap
newtype SinkId
= SinkId {_unSinkId :: UUID.UUID}
deriving (Show, Eq, Hashable)
newSinkId :: IO SinkId
newSinkId = SinkId <$> UUID.nextRandom
instance J.ToJSON SinkId where
toJSON = J.toJSON . show
type Sinks = TMap SinkId OnChange
View File
@ -44,7 +44,7 @@ newtype PlanCache
data ReusablePlan
= RPQuery !EQ.ReusableQueryPlan
| RPSubs !LQ.SubsPlan
| RPSubs !LQ.ReusableLiveQueryPlan
instance J.ToJSON ReusablePlan where
toJSON = \case
View File
@ -12,7 +12,6 @@ import qualified Data.Aeson as J
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as Set
import qualified Data.IntMap as IntMap
import qualified Data.TByteString as TBS
import qualified Data.Text as T
@ -34,7 +33,7 @@ import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.SQL.Value
type PlanVariables = Map.HashMap G.Variable (Int, PGColumnType)
type PlanVariables = Map.HashMap G.Variable Int
type PrepArgMap = IntMap.IntMap Q.PrepArg
data PGPlan
@ -63,18 +62,12 @@ instance J.ToJSON RootFieldPlan where
RFPRaw encJson -> J.toJSON $ TBS.fromBS encJson
RFPPostgres pgPlan -> J.toJSON pgPlan
type VariableTypes = Map.HashMap G.Variable PGColumnType
data QueryPlan
= QueryPlan
{ _qpVariables :: ![G.VariableDefinition]
, _qpFldPlans :: ![(G.Alias, RootFieldPlan)]
}
type FieldPlans = [(G.Alias, RootFieldPlan)]
data ReusableQueryPlan
= ReusableQueryPlan
{ _rqpVariableTypes :: !VariableTypes
, _rqpFldPlans :: ![(G.Alias, RootFieldPlan)]
{ _rqpVariableTypes :: !ReusableVariableTypes
, _rqpFldPlans :: !FieldPlans
}
instance J.ToJSON ReusableQueryPlan where
@ -83,38 +76,15 @@ instance J.ToJSON ReusableQueryPlan where
, "field_plans" J..= fldPlans
]
getReusablePlan :: QueryPlan -> Maybe ReusableQueryPlan
getReusablePlan (QueryPlan vars fldPlans) =
if all fldPlanReusable $ map snd fldPlans
then Just $ ReusableQueryPlan varTypes fldPlans
else Nothing
where
allVars = Set.fromList $ map G._vdVariable vars
-- this is quite aggressive; we could improve it by
-- computing the variables actually used in each field
allUsed fldPlanVars =
Set.null $ Set.difference allVars $ Set.fromList fldPlanVars
fldPlanReusable = \case
RFPRaw _ -> True
RFPPostgres pgPlan -> allUsed $ Map.keys $ _ppVariables pgPlan
varTypesOfPlan = \case
RFPRaw _ -> mempty
RFPPostgres pgPlan -> snd <$> _ppVariables pgPlan
varTypes = Map.unions $ map (varTypesOfPlan . snd) fldPlans
withPlan
:: (MonadError QErr m)
=> UserVars -> PGPlan -> GV.AnnPGVarVals -> m PreparedSql
=> UserVars -> PGPlan -> ReusableVariableValues -> m PreparedSql
withPlan usrVars (PGPlan q reqVars prepMap) annVars = do
prepMap' <- foldM getVar prepMap (Map.toList reqVars)
let args = withUserVars usrVars $ IntMap.elems prepMap'
return $ PreparedSql q args
where
getVar accum (var, (prepNo, _)) = do
getVar accum (var, prepNo) = do
let varName = G.unName $ G.unVariable var
colVal <- onNothing (Map.lookup var annVars) $
throw500 $ "missing variable in annVars : " <> varName
@ -125,9 +95,9 @@ withPlan usrVars (PGPlan q reqVars prepMap) annVars = do
mkCurPlanTx
:: (MonadError QErr m)
=> UserVars
-> QueryPlan
-> FieldPlans
-> m (LazyRespTx, GeneratedSqlMap)
mkCurPlanTx usrVars (QueryPlan _ fldPlans) = do
mkCurPlanTx usrVars fldPlans = do
-- generate the SQL and prepared vars, or the bytestring
resolved <- forM fldPlans $ \(alias, fldPlan) -> do
fldResp <- case fldPlan of
@ -154,42 +124,33 @@ initPlanningSt :: PlanningSt
initPlanningSt =
PlanningSt 2 Map.empty IntMap.empty
getVarArgNum
:: (MonadState PlanningSt m)
=> G.Variable -> PGColumnType -> m Int
getVarArgNum var colTy = do
getVarArgNum :: (MonadState PlanningSt m) => G.Variable -> m Int
getVarArgNum var = do
PlanningSt curArgNum vars prepped <- get
case Map.lookup var vars of
Just argNum -> return $ fst argNum
Nothing -> do
put $ PlanningSt (curArgNum + 1)
(Map.insert var (curArgNum, colTy) vars) prepped
return curArgNum
Just argNum -> pure argNum
Nothing -> do
put $ PlanningSt (curArgNum + 1) (Map.insert var curArgNum vars) prepped
pure curArgNum
addPrepArg
:: (MonadState PlanningSt m)
=> Int -> Q.PrepArg -> m ()
addPrepArg :: (MonadState PlanningSt m) => Int -> Q.PrepArg -> m ()
addPrepArg argNum arg = do
PlanningSt curArgNum vars prepped <- get
put $ PlanningSt curArgNum vars $ IntMap.insert argNum arg prepped
getNextArgNum
:: (MonadState PlanningSt m)
=> m Int
getNextArgNum :: (MonadState PlanningSt m) => m Int
getNextArgNum = do
PlanningSt curArgNum vars prepped <- get
put $ PlanningSt (curArgNum + 1) vars prepped
return curArgNum
prepareWithPlan
:: (MonadState PlanningSt m)
=> UnresolvedVal -> m S.SQLExp
prepareWithPlan :: (MonadState PlanningSt m) => UnresolvedVal -> m S.SQLExp
prepareWithPlan = \case
R.UVPG annPGVal -> do
let AnnPGVal varM isNullable colTy colVal = annPGVal
argNum <- case (varM, isNullable) of
(Just var, False) -> getVarArgNum var colTy
_ -> getNextArgNum
let AnnPGVal varM _ colVal = annPGVal
argNum <- case varM of
Just var -> getVarArgNum var
Nothing -> getNextArgNum
addPrepArg argNum $ toBinaryValue colVal
return $ toPrepParam argNum (pstType colVal)
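-- A small illustration of the numbering scheme ('numberingDemo' is a
-- hypothetical helper; it assumes 'runState' is in scope and takes the
-- 'G.Variable' as an argument rather than constructing one). Numbering starts
-- at 2, apparently leaving slot 1 for the session-variables argument that
-- 'withUserVars' prepends:
numberingDemo :: G.Variable -> ((Int, Int, Int), PlanningSt)
numberingDemo var = flip runState initPlanningSt $ do
  a <- getVarArgNum var  -- 2: the first use of a variable allocates a slot
  b <- getNextArgNum     -- 3: positional values always get a fresh slot
  c <- getVarArgNum var  -- 2: later uses of the same variable reuse its slot
  pure (a, b, c)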
@ -216,27 +177,24 @@ convertQuerySelSet
, Has SQLGenCtx r
, Has UserInfo r
)
=> [G.VariableDefinition]
-> V.SelSet
=> V.SelSet
-> m (LazyRespTx, Maybe ReusableQueryPlan, GeneratedSqlMap)
convertQuerySelSet varDefs fields = do
convertQuerySelSet fields = do
usrVars <- asks (userVars . getter)
fldPlans <- forM (toList fields) $ \fld -> do
(fldPlans, varTypes) <- runResolveT . forM (toList fields) $ \fld -> do
fldPlan <- case V._fName fld of
"__type" -> fldPlanFromJ <$> R.typeR fld
"__schema" -> fldPlanFromJ <$> R.schemaR fld
"__typename" -> return $ fldPlanFromJ queryRootName
"__typename" -> pure $ fldPlanFromJ queryRootName
_ -> do
unresolvedAst <- R.queryFldToPGAST fld
(q, PlanningSt _ vars prepped) <-
flip runStateT initPlanningSt $ R.traverseQueryRootFldAST
prepareWithPlan unresolvedAst
return $ RFPPostgres $ PGPlan (R.toPGQuery q) vars prepped
return (V._fAlias fld, fldPlan)
let queryPlan = QueryPlan varDefs fldPlans
reusablePlanM = getReusablePlan queryPlan
(tx, sql) <- mkCurPlanTx usrVars queryPlan
return (tx, reusablePlanM, sql)
(q, PlanningSt _ vars prepped) <- flip runStateT initPlanningSt $
R.traverseQueryRootFldAST prepareWithPlan unresolvedAst
pure . RFPPostgres $ PGPlan (R.toPGQuery q) vars prepped
pure (V._fAlias fld, fldPlan)
let reusablePlan = ReusableQueryPlan <$> varTypes <*> pure fldPlans
(tx, sql) <- mkCurPlanTx usrVars fldPlans
pure (tx, reusablePlan, sql)
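-- n.b. 'varTypes' above is the 'Maybe ReusableVariableTypes' produced by
-- 'runResolveT', so the applicative spine makes 'reusablePlan' 'Nothing'
-- exactly when resolution marked the query non-reusable.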
-- use the existing plan and new variables to create a pg query
queryOpFromPlan
@ -246,7 +204,7 @@ queryOpFromPlan
-> ReusableQueryPlan
-> m (LazyRespTx, GeneratedSqlMap)
queryOpFromPlan usrVars varValsM (ReusableQueryPlan varTypes fldPlans) = do
validatedVars <- GV.getAnnPGVarVals varTypes varValsM
validatedVars <- GV.validateVariablesForReuse varTypes varValsM
-- generate the SQL and prepared vars, or the bytestring
resolved <- forM fldPlans $ \(alias, fldPlan) ->
(alias,) <$> case fldPlan of
View File
@ -12,6 +12,7 @@ import qualified Language.GraphQL.Draft.Syntax as G
import Hasura.EncJSON
import Hasura.GraphQL.Context
import Hasura.GraphQL.Resolve.Types
import Hasura.Prelude
import Hasura.RQL.DML.Internal
import Hasura.RQL.Types
@ -88,7 +89,7 @@ explainField userInfo gCtx sqlGenCtx fld =
_ -> do
unresolvedAST <-
runExplain (queryCtxMap, userInfo, fldMap, orderByCtx, sqlGenCtx) $
RS.queryFldToPGAST fld
evalResolveT $ RS.queryFldToPGAST fld
resolvedAST <- RS.traverseQueryRootFldAST (resolveVal userInfo)
unresolvedAST
let txtSQL = Q.getQueryText $ RS.toPGQuery resolvedAST
@ -114,7 +115,7 @@ explainGQLQuery
explainGQLQuery pgExecCtx sc sqlGenCtx enableAL (GQLExplain query userVarsRaw) = do
execPlan <- E.getExecPlanPartial userInfo sc enableAL query
(gCtx, rootSelSet) <- case execPlan of
E.GExPHasura (gCtx, rootSelSet, _) ->
E.GExPHasura (gCtx, rootSelSet) ->
return (gCtx, rootSelSet)
E.GExPRemote _ _ ->
throw400 InvalidParams "only hasura queries can be explained"
View File
@ -42,7 +42,7 @@ validateHdrs userInfo hdrs = do
throw400 NotFound $ hdr <<> " header is expected but not found"
queryFldToPGAST
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r, Has UserInfo r
, Has QueryCtxMap r
)
@ -69,7 +69,7 @@ queryFldToPGAST fld = do
RS.convertFuncQueryAgg ctx fld
queryFldToSQL
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r, Has UserInfo r
, Has QueryCtxMap r
)
@ -85,7 +85,7 @@ queryFldToSQL fn fld = do
return $ RS.toPGQuery resolvedAST
mutFldToTx
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has UserInfo r
, Has MutationCtxMap r
@ -112,7 +112,7 @@ mutFldToTx fld = do
RM.convertDelete ctx fld
getOpCtx
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has (OpCtxMap a) r
)
View File
@ -20,9 +20,7 @@ import qualified Hasura.SQL.DML as S
type OpExp = OpExpG UnresolvedVal
parseOpExps
:: (MonadError QErr m)
=> PGColumnType -> AnnInpVal -> m [OpExp]
parseOpExps :: (MonadResolve m) => PGColumnType -> AnnInpVal -> m [OpExp]
parseOpExps colTy annVal = do
opExpsM <- flip withObjectM annVal $ \nt objM -> forM objM $ \obj ->
forM (OMap.toList obj) $ \(k, v) ->
@ -82,12 +80,12 @@ parseOpExps colTy annVal = do
<> showName k
return $ catMaybes $ fromMaybe [] opExpsM
where
asOpRhs = fmap (fmap UVPG) . asPGColumnValueM
asOpRhs = fmap (fmap mkParameterizablePGValue) . asPGColumnValueM
parseAsObjectM v f = asObjectM v >>= mapM f
asPGArray rhsTy v = do
valsM <- parseMany asPGColumnValue v
valsM <- parseMany (openOpaqueValue <=< asPGColumnValue) v
forM valsM $ \vals -> do
let arrayExp = S.SEArray $ map (txtEncoder . pstValue . _apvValue) vals
return $ UVSQL $ S.SETyAnn arrayExp $ S.mkTypeAnn $
@ -97,26 +95,25 @@ parseOpExps colTy annVal = do
-- somehow get rid of this.)
PGTypeArray (unsafePGColumnToRepresentation rhsTy)
resolveIsNull v = case _aivValue v of
AGScalar _ Nothing -> return Nothing
AGScalar _ (Just (PGValBoolean b)) ->
return $ Just $ bool ANISNOTNULL ANISNULL b
AGScalar _ _ -> throw500 "boolean value is expected"
_ -> tyMismatch "pgvalue" v
resolveIsNull v = asPGColumnValueM v >>= traverse openOpaqueValue >>= \case
Nothing -> pure Nothing
Just annPGVal -> case pstValue $ _apvValue annPGVal of
PGValBoolean b -> pure . Just $ bool ANISNOTNULL ANISNULL b
_ -> throw500 "boolean value is expected"
parseAsSTDWithinObj obj = do
distanceVal <- onNothing (OMap.lookup "distance" obj) $
throw500 "expected \"distance\" input field in st_d_within"
dist <- UVPG <$> asPGColumnValue distanceVal
dist <- mkParameterizablePGValue <$> asPGColumnValue distanceVal
fromVal <- onNothing (OMap.lookup "from" obj) $
throw500 "expected \"from\" input field in st_d_within"
from <- UVPG <$> asPGColumnValue fromVal
from <- mkParameterizablePGValue <$> asPGColumnValue fromVal
case colTy of
PGColumnScalar PGGeography -> do
useSpheroidVal <-
onNothing (OMap.lookup "use_spheroid" obj) $
throw500 "expected \"use_spheroid\" input field in st_d_within"
useSpheroid <- UVPG <$> asPGColumnValue useSpheroidVal
useSpheroid <- mkParameterizablePGValue <$> asPGColumnValue useSpheroidVal
return $ ASTDWithinGeog $ DWithinGeogOp dist from useSpheroid
PGColumnScalar PGGeometry ->
return $ ASTDWithinGeom $ DWithinGeomOp dist from
@ -125,22 +122,23 @@ parseOpExps colTy annVal = do
parseAsSTIntersectsNbandGeomObj obj = do
nbandVal <- onNothing (OMap.lookup "nband" obj) $
throw500 "expected \"nband\" input field"
nband <- UVPG <$> asPGColumnValue nbandVal
nband <- mkParameterizablePGValue <$> asPGColumnValue nbandVal
geommin <- parseGeommin obj
return $ ASTIntersectsNbandGeom $ STIntersectsNbandGeommin nband geommin
parseAsSTIntersectsGeomNbandObj obj = do
nbandMM <- (fmap . fmap) UVPG <$> mapM asPGColumnValueM (OMap.lookup "nband" obj)
nbandMM <- fmap (fmap mkParameterizablePGValue) <$>
traverse asPGColumnValueM (OMap.lookup "nband" obj)
geommin <- parseGeommin obj
return $ ASTIntersectsGeomNband $ STIntersectsGeomminNband geommin $ join nbandMM
parseGeommin obj = do
geomminVal <- onNothing (OMap.lookup "geommin" obj) $
throw500 "expected \"geommin\" input field"
UVPG <$> asPGColumnValue geomminVal
mkParameterizablePGValue <$> asPGColumnValue geomminVal
parseCastExpression
:: (MonadError QErr m)
:: (MonadResolve m)
=> AnnInpVal -> m (Maybe (CastExp UnresolvedVal))
parseCastExpression =
withObjectM $ \_ objM -> forM objM $ \obj -> do
@ -151,7 +149,7 @@ parseCastExpression =
return $ Map.fromList targetExps
parseColExp
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has FieldMap r
)
@ -169,7 +167,7 @@ parseColExp nt n val = do
fmapAnnBoolExp partialSQLExpToUnresolvedVal permExp
parseBoolExp
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has FieldMap r
)
View File
@ -99,21 +99,21 @@ withArg args arg f = prependArgsInPath $ nameAsPath arg $
getArg args arg >>= f
withArgM
:: (MonadError QErr m)
:: (MonadResolve m)
=> ArgsMap
-> G.Name
-> (AnnInpVal -> m a)
-> m (Maybe a)
withArgM args arg f = prependArgsInPath $ nameAsPath arg $
mapM f $ handleNull =<< Map.lookup arg args
where
handleNull v = bool (Just v) Nothing $
hasNullVal $ _aivValue v
withArgM args argName f = do
wrappedArg <- for (Map.lookup argName args) $ \arg -> do
when (isJust (_aivVariable arg) && G.isNullable (_aivType arg)) markNotReusable
pure . bool (Just arg) Nothing $ hasNullVal (_aivValue arg)
prependArgsInPath . nameAsPath argName $ traverse f (join wrappedArg)
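-- n.b. a nullable variable marks the query non-reusable here because whether
-- the argument turns out to be null decides whether 'f' runs at all, which
-- can change the shape of the generated SQL.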
type PrepArgs = Seq.Seq Q.PrepArg
prepare :: (MonadState PrepArgs m) => AnnPGVal -> m S.SQLExp
prepare (AnnPGVal _ _ _ scalarValue) = prepareColVal scalarValue
prepare (AnnPGVal _ _ scalarValue) = prepareColVal scalarValue
resolveValPrep
:: (MonadState PrepArgs m)
@ -141,7 +141,7 @@ prepareColVal (WithScalarType scalarType colVal) = do
return $ toPrepParam (Seq.length preparedArgs + 1) scalarType
txtConverter :: Applicative f => AnnPGVal -> f S.SQLExp
txtConverter (AnnPGVal _ _ _ scalarValue) = pure $ toTxtValue scalarValue
txtConverter (AnnPGVal _ _ scalarValue) = pure $ toTxtValue scalarValue
withSelSet :: (Monad m) => SelSet -> (Field -> m a) -> m [(Text, a)]
withSelSet selSet f =
View File
@ -1,9 +1,15 @@
module Hasura.GraphQL.Resolve.InputValue
( withNotNull
, tyMismatch
, OpaqueValue
, OpaquePGValue
, mkParameterizablePGValue
, openOpaqueValue
, asPGColumnTypeAndValueM
, asPGColumnValueM
, asPGColumnValue
, asEnumVal
, asEnumValM
, withObject
@ -44,96 +50,124 @@ tyMismatch expectedTy v =
getAnnInpValKind (_aivValue v) <> " for value of type " <>
G.showGT (_aivType v)
asPGColumnTypeAndValueM
:: (MonadError QErr m)
=> AnnInpVal
-> m (PGColumnType, WithScalarType (Maybe PGScalarValue))
asPGColumnTypeAndValueM v = case _aivValue v of
AGScalar colTy val -> pure (PGColumnScalar colTy, WithScalarType colTy val)
AGEnum _ (AGEReference reference maybeValue) -> do
let maybeScalarValue = PGValText . RQL.getEnumValue <$> maybeValue
pure (PGColumnEnumReference reference, WithScalarType PGText maybeScalarValue)
_ -> tyMismatch "pgvalue" v
-- | As part of query reusability tracking (see 'QueryReusability'), functions that parse input
-- values call 'markNotReusable' when the value comes from a variable. However, always calling
-- 'markNotReusable' when parsing column values (using 'asPGColumnValue' and its variants) would be
-- much too conservative: often the value is simply validated and wrapped immediately in 'UVPG',
-- which allows it to be parameterized over.
--
-- Always omitting the check would be incorrect, as some callers inspect the column values and use
-- them to generate different SQL, which is where 'OpaqueValue' comes in. Functions like
-- 'asPGColumnValue' return an 'OpaquePGValue', which can be safely converted to an 'UnresolvedVal'
-- via 'mkParameterizablePGValue' without marking the query as non-reusable. Other callers that wish
-- to inspect the value can instead call 'openOpaqueValue' to get the value out, and /that/ will
-- mark the query non-reusable, instead.
--
-- In other words, 'OpaqueValue' is a mechanism of delaying the 'markNotReusable' call until we're
-- confident its value will actually affect the generated SQL.
data OpaqueValue a
= OpaqueValue
{ _opgvValue :: !a
, _opgvIsVariable :: !Bool
} deriving (Show)
type OpaquePGValue = OpaqueValue AnnPGVal
asPGColumnTypeAndAnnValueM :: (MonadError QErr m) => AnnInpVal -> m (PGColumnType, Maybe AnnPGVal)
mkParameterizablePGValue :: OpaquePGValue -> UnresolvedVal
mkParameterizablePGValue (OpaqueValue v _) = UVPG v
openOpaqueValue :: (MonadResolve m) => OpaqueValue a -> m a
openOpaqueValue (OpaqueValue v isVariable) = when isVariable markNotReusable $> v
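-- The two ways out of an 'OpaqueValue', side by side ('opaqueDemo' is a
-- hypothetical helper). Only the inspecting path can flip the plan to
-- non-reusable:
opaqueDemo :: (MonadResolve m) => AnnInpVal -> m (UnresolvedVal, AnnPGVal)
opaqueDemo v = do
  opaque <- asPGColumnValue v
  let param = mkParameterizablePGValue opaque  -- pure; reusability unaffected
  inspected <- openOpaqueValue opaque  -- marks non-reusable if 'v' came from a variable
  pure (param, inspected)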
asPGColumnTypeAndValueM
:: (MonadResolve m)
=> AnnInpVal
-> m (PGColumnType, WithScalarType (Maybe (OpaqueValue PGScalarValue)))
asPGColumnTypeAndValueM v = do
(columnType, scalarValueM) <- case _aivValue v of
AGScalar colTy val -> pure (PGColumnScalar colTy, WithScalarType colTy val)
AGEnum _ (AGEReference reference maybeValue) -> do
let maybeScalarValue = PGValText . RQL.getEnumValue <$> maybeValue
pure (PGColumnEnumReference reference, WithScalarType PGText maybeScalarValue)
_ -> tyMismatch "pgvalue" v
for_ (_aivVariable v) $ \variableName -> if
-- If the value is a nullable variable, then the caller might make a different decision based on
-- whether the result is 'Nothing' or 'Just', which would change the generated query, so we have
-- to unconditionally mark the query non-reusable.
| G.isNullable (_aivType v) -> markNotReusable
| otherwise -> recordVariableUse variableName columnType
let isVariable = isJust $ _aivVariable v
pure (columnType, fmap (flip OpaqueValue isVariable) <$> scalarValueM)
asPGColumnTypeAndAnnValueM :: (MonadResolve m) => AnnInpVal -> m (PGColumnType, Maybe OpaquePGValue)
asPGColumnTypeAndAnnValueM v = do
(columnType, scalarValueM) <- asPGColumnTypeAndValueM v
let mkAnnPGColVal = AnnPGVal (_aivVariable v) (G.isNullable (_aivType v)) columnType
pure (columnType, mkAnnPGColVal <$> sequence scalarValueM)
let mkAnnPGColVal = AnnPGVal (_aivVariable v) (G.isNullable (_aivType v))
replaceOpaqueValue (WithScalarType scalarType (OpaqueValue scalarValue isVariable)) =
OpaqueValue (mkAnnPGColVal (WithScalarType scalarType scalarValue)) isVariable
pure (columnType, replaceOpaqueValue <$> sequence scalarValueM)
asPGColumnValueM :: (MonadError QErr m) => AnnInpVal -> m (Maybe AnnPGVal)
asPGColumnValueM :: (MonadResolve m) => AnnInpVal -> m (Maybe OpaquePGValue)
asPGColumnValueM = fmap snd . asPGColumnTypeAndAnnValueM
asPGColumnValue :: (MonadError QErr m) => AnnInpVal -> m AnnPGVal
asPGColumnValue :: (MonadResolve m) => AnnInpVal -> m OpaquePGValue
asPGColumnValue v = do
(columnType, annPGValM) <- asPGColumnTypeAndAnnValueM v
onNothing annPGValM $ throw500 ("unexpected null for type " <>> columnType)
openInputValue :: (MonadResolve m) => AnnInpVal -> m AnnGValue
openInputValue v = when (isJust $ _aivVariable v) markNotReusable $> _aivValue v
-- | Note: only handles “synthetic” enums (see 'EnumValuesInfo'). Enum table references are handled
-- by 'asPGColumnTypeAndValueM' and its variants.
asEnumVal :: (MonadError QErr m) => AnnInpVal -> m (G.NamedType, G.EnumValue)
-- by 'asPGColumnType' and its variants.
asEnumVal :: (MonadResolve m) => AnnInpVal -> m (G.NamedType, G.EnumValue)
asEnumVal = asEnumValM >=> \case
(ty, Just val) -> pure (ty, val)
(ty, Nothing) -> throw500 $ "unexpected null for ty " <> showNamedTy ty
-- | Like 'asEnumVal', only handles “synthetic” enums.
asEnumValM :: (MonadError QErr m) => AnnInpVal -> m (G.NamedType, Maybe G.EnumValue)
asEnumValM v = case _aivValue v of
asEnumValM :: (MonadResolve m) => AnnInpVal -> m (G.NamedType, Maybe G.EnumValue)
asEnumValM v = openInputValue v >>= \case
AGEnum ty (AGESynthetic valM) -> return (ty, valM)
_ -> tyMismatch "enum" v
withObject
:: (MonadError QErr m)
=> (G.NamedType -> AnnGObject -> m a) -> AnnInpVal -> m a
withObject fn v = case _aivValue v of
withObject :: (MonadResolve m) => (G.NamedType -> AnnGObject -> m a) -> AnnInpVal -> m a
withObject fn v = openInputValue v >>= \case
AGObject nt (Just obj) -> fn nt obj
AGObject _ Nothing ->
throw500 $ "unexpected null for ty"
<> G.showGT (_aivType v)
_ -> tyMismatch "object" v
asObject
:: (MonadError QErr m)
=> AnnInpVal -> m AnnGObject
asObject :: (MonadResolve m) => AnnInpVal -> m AnnGObject
asObject = withObject (\_ o -> return o)
withObjectM
:: (MonadError QErr m)
=> (G.NamedType -> Maybe AnnGObject -> m a) -> AnnInpVal -> m a
withObjectM fn v = case _aivValue v of
withObjectM :: (MonadResolve m) => (G.NamedType -> Maybe AnnGObject -> m a) -> AnnInpVal -> m a
withObjectM fn v = openInputValue v >>= \case
AGObject nt objM -> fn nt objM
_ -> tyMismatch "object" v
asObjectM
:: (MonadError QErr m)
=> AnnInpVal -> m (Maybe AnnGObject)
asObjectM :: (MonadResolve m) => AnnInpVal -> m (Maybe AnnGObject)
asObjectM = withObjectM (\_ o -> return o)
withArrayM
:: (MonadError QErr m)
=> (G.ListType -> Maybe [AnnInpVal] -> m a) -> AnnInpVal -> m a
withArrayM fn v = case _aivValue v of
withArrayM :: (MonadResolve m) => (G.ListType -> Maybe [AnnInpVal] -> m a) -> AnnInpVal -> m a
withArrayM fn v = openInputValue v >>= \case
AGArray lt listM -> fn lt listM
_ -> tyMismatch "array" v
withArray
:: (MonadError QErr m)
=> (G.ListType -> [AnnInpVal] -> m a) -> AnnInpVal -> m a
withArray fn v = case _aivValue v of
withArray :: (MonadResolve m) => (G.ListType -> [AnnInpVal] -> m a) -> AnnInpVal -> m a
withArray fn v = openInputValue v >>= \case
AGArray lt (Just l) -> fn lt l
AGArray _ Nothing -> throw500 $ "unexpected null for ty"
<> G.showGT (_aivType v)
_ -> tyMismatch "array" v
asArray
:: (MonadError QErr m)
=> AnnInpVal -> m [AnnInpVal]
asArray :: (MonadResolve m) => AnnInpVal -> m [AnnInpVal]
asArray = withArray (\_ vals -> return vals)
parseMany
:: (MonadError QErr m)
=> (AnnInpVal -> m a) -> AnnInpVal -> m (Maybe [a])
parseMany fn v = case _aivValue v of
parseMany :: (MonadResolve m) => (AnnInpVal -> m a) -> AnnInpVal -> m (Maybe [a])
parseMany fn v = openInputValue v >>= \case
AGArray _ arrM -> mapM (mapM fn) arrM
_ -> tyMismatch "array" v
@ -145,16 +179,12 @@ onlyText = \case
PGValVarchar t -> return t
_ -> throw500 "expecting text for asPGColText"
asPGColText
:: (MonadError QErr m)
=> AnnInpVal -> m Text
asPGColText :: (MonadResolve m) => AnnInpVal -> m Text
asPGColText val = do
pgColVal <- pstValue . _apvValue <$> asPGColumnValue val
onlyText pgColVal
pgColVal <- openOpaqueValue =<< asPGColumnValue val
onlyText (pstValue $ _apvValue pgColVal)
asPGColTextM
:: (MonadError QErr m)
=> AnnInpVal -> m (Maybe Text)
asPGColTextM :: (MonadResolve m) => AnnInpVal -> m (Maybe Text)
asPGColTextM val = do
pgColValM <- fmap (pstValue . _apvValue) <$> asPGColumnValueM val
mapM onlyText pgColValM
pgColValM <- traverse openOpaqueValue =<< asPGColumnValueM val
traverse onlyText (pstValue . _apvValue <$> pgColValM)
View File
@ -85,7 +85,7 @@ data AnnInsObj
} deriving (Show, Eq)
mkAnnInsObj
:: (MonadError QErr m, Has InsCtxMap r, MonadReader r m)
:: (MonadResolve m, Has InsCtxMap r, MonadReader r m)
=> RelationInfoMap
-> AnnGObject
-> m AnnInsObj
@ -95,7 +95,7 @@ mkAnnInsObj relInfoMap annObj =
emptyInsObj = AnnInsObj [] [] []
traverseInsObj
:: (MonadError QErr m, Has InsCtxMap r, MonadReader r m)
:: (MonadResolve m, Has InsCtxMap r, MonadReader r m)
=> RelationInfoMap
-> (G.Name, AnnInpVal)
-> AnnInsObj
@ -109,7 +109,7 @@ traverseInsObj rim (gName, annVal) defVal@(AnnInsObj cols objRels arrRels) =
parseValue = do
(_, WithScalarType scalarType maybeScalarValue) <- asPGColumnTypeAndValueM annVal
let columnName = PGCol $ G.unName gName
scalarValue = fromMaybe (PGNull scalarType) maybeScalarValue
scalarValue <- maybe (pure $ PGNull scalarType) openOpaqueValue maybeScalarValue
pure $ AnnInsObj ((columnName, WithScalarType scalarType scalarValue):cols) objRels arrRels
parseObject = do
@ -154,7 +154,7 @@ traverseInsObj rim (gName, annVal) defVal@(AnnInsObj cols objRels arrRels) =
bool withNonEmptyArrData (return defVal) $ null arrDataVals
parseOnConflict
:: (MonadError QErr m)
:: (MonadResolve m)
=> QualifiedTable -> Maybe UpdPermForIns
-> AnnInpVal -> m RI.ConflictClauseP1
parseOnConflict tn updFiltrM val = withPathK "on_conflict" $
@ -474,7 +474,7 @@ prefixErrPath fld =
withPathK "selectionSet" . fieldAsPath fld . withPathK "args"
convertInsert
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r, Has InsCtxMap r
)
=> RoleName
@ -492,7 +492,7 @@ convertInsert role tn fld = prefixErrPath fld $ do
where
withNonEmptyObjs annVals mutFlds = do
InsCtx vn tableCols defValMap relInfoMap updPerm <- getInsCtx tn
annObjs <- mapM asObject annVals
annObjs <- traverse asObject annVals
annInsObjs <- forM annObjs $ mkAnnInsObj relInfoMap
conflictClauseM <- forM onConflictM $ parseOnConflict tn updPerm
defValMapRes <- mapM (convPartialSQLExp sessVarFromCurrentSetting)
View File
@ -19,8 +19,6 @@ import Hasura.GraphQL.Validate.Field
import Hasura.GraphQL.Validate.InputValue
import Hasura.GraphQL.Validate.Types
import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.SQL.Value
data TypeKind
= TKSCALAR
@ -335,22 +333,16 @@ schemaR fld =
_ -> return J.Null
typeR
:: ( MonadReader r m, Has TypeMap r
, MonadError QErr m)
:: (MonadResolve m, MonadReader r m, Has TypeMap r)
=> Field -> m J.Value
typeR fld = do
name <- withArg args "name" $ \arg -> do
pgColVal <- pstValue . _apvValue <$> asPGColumnValue arg
case pgColVal of
PGValText t -> return t
_ -> throw500 "expecting string for name arg of __type"
name <- asPGColText =<< getArg args "name"
typeR' (G.Name name) fld
where
args = _fArguments fld
typeR'
:: ( MonadReader r m, Has TypeMap r
, MonadError QErr m)
:: (MonadReader r m, Has TypeMap r, MonadError QErr m)
=> G.Name -> Field -> m J.Value
typeR' n fld = do
tyMap <- asks getter
View File
@ -32,7 +32,7 @@ import Hasura.SQL.Types
import Hasura.SQL.Value
convertMutResp
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> G.NamedType -> SelSet -> m (RR.MutFldsG UnresolvedVal)
@ -53,13 +53,13 @@ convertMutResp ty selSet =
UVSQL sqlExp -> pure $ UVSQL sqlExp
convertRowObj
:: (MonadError QErr m)
:: (MonadResolve m)
=> AnnInpVal
-> m [(PGCol, UnresolvedVal)]
convertRowObj val =
flip withObject val $ \_ obj ->
forM (OMap.toList obj) $ \(k, v) -> do
prepExpM <- fmap UVPG <$> asPGColumnValueM v
prepExpM <- fmap mkParameterizablePGValue <$> asPGColumnValueM v
let prepExp = fromMaybe (UVSQL S.SENull) prepExpM
return (PGCol $ G.unName k, prepExp)
@ -78,23 +78,23 @@ lhsExpOp op annTy (col, e) =
annExp = S.SETyAnn e annTy
convObjWithOp
:: (MonadError QErr m)
:: (MonadResolve m)
=> ApplySQLOp -> AnnInpVal -> m [(PGCol, UnresolvedVal)]
convObjWithOp opFn val =
flip withObject val $ \_ obj -> forM (OMap.toList obj) $ \(k, v) -> do
colVal <- pstValue . _apvValue <$> asPGColumnValue v
colVal <- openOpaqueValue =<< asPGColumnValue v
let pgCol = PGCol $ G.unName k
-- TODO: why are we using txtEncoder here?
encVal = txtEncoder colVal
encVal = txtEncoder . pstValue $ _apvValue colVal
sqlExp = opFn (pgCol, encVal)
return (pgCol, UVSQL sqlExp)
convDeleteAtPathObj
:: (MonadError QErr m)
:: (MonadResolve m)
=> AnnInpVal -> m [(PGCol, UnresolvedVal)]
convDeleteAtPathObj val =
flip withObject val $ \_ obj -> forM (OMap.toList obj) $ \(k, v) -> do
vals <- flip withArray v $ \_ annVals -> mapM asPGColumnValue annVals
vals <- traverse (openOpaqueValue <=< asPGColumnValue) =<< asArray v
let valExps = map (txtEncoder . pstValue . _apvValue) vals
pgCol = PGCol $ G.unName k
annEncVal = S.SETyAnn (S.SEArray valExps) S.textArrTypeAnn
@ -103,7 +103,7 @@ convDeleteAtPathObj val =
return (pgCol, UVSQL sqlExp)
convertUpdateP1
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
@ -159,7 +159,7 @@ convertUpdateP1 opCtx fld = do
args = _fArguments fld
convertUpdate
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
@ -180,7 +180,7 @@ convertUpdate opCtx fld = do
bool whenNonEmptyItems whenEmptyItems $ null $ RU.uqp1SetExps annUpdResolved
convertDelete
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
View File
@ -47,16 +47,16 @@ jsonPathToColExp t = case parseJSONPath t of
elToColExp (Index i) = S.SELit $ T.pack (show i)
argsToColOp :: (MonadError QErr m) => ArgsMap -> m (Maybe RS.ColOp)
argsToColOp :: (MonadResolve m) => ArgsMap -> m (Maybe RS.ColOp)
argsToColOp args = maybe (return Nothing) toOp $ Map.lookup "path" args
where
toJsonPathExp = fmap (RS.ColOp S.jsonbPathOp) . jsonPathToColExp
toOp v = asPGColTextM v >>= mapM toJsonPathExp
toOp v = asPGColTextM v >>= traverse toJsonPathExp
type AnnFlds = RS.AnnFldsG UnresolvedVal
fromSelSet
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> G.NamedType -> SelSet -> m AnnFlds
@ -88,7 +88,7 @@ fromSelSet fldTy flds =
type TableAggFlds = RS.TableAggFldsG UnresolvedVal
fromAggSelSet
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> G.NamedType -> SelSet -> m TableAggFlds
@ -105,7 +105,7 @@ fromAggSelSet fldTy selSet = fmap toFields $
type TableArgs = RS.TableArgsG UnresolvedVal
parseTableArgs
:: ( MonadError QErr m, MonadReader r m
:: ( MonadResolve m, MonadReader r m
, Has FieldMap r, Has OrdByCtx r
)
=> ArgsMap -> m TableArgs
@ -114,7 +114,7 @@ parseTableArgs args = do
ordByExpML <- withArgM args "order_by" parseOrderBy
let ordByExpM = NE.nonEmpty =<< ordByExpML
limitExpM <- withArgM args "limit" parseLimit
offsetExpM <- withArgM args "offset" $ asPGColumnValue >=> txtConverter
offsetExpM <- withArgM args "offset" $ asPGColumnValue >=> openOpaqueValue >=> txtConverter
distOnColsML <- withArgM args "distinct_on" parseColumns
let distOnColsM = NE.nonEmpty =<< distOnColsML
mapM_ (validateDistOn ordByExpM) distOnColsM
@ -137,7 +137,7 @@ parseTableArgs args = do
type AnnSimpleSelect = RS.AnnSimpleSelG UnresolvedVal
fromField
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> QualifiedTable -> AnnBoolExpPartialSQL
@ -165,7 +165,7 @@ getOrdByItemMap nt = do
throw500 $ "could not lookup " <> showNamedTy nt
parseOrderBy
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has OrdByCtx r
)
@ -175,7 +175,7 @@ parseOrderBy = fmap concat . withArray f
f _ = mapM (withObject (getAnnObItems id))
getAnnObItems
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has OrdByCtx r
)
@ -215,7 +215,7 @@ mkOrdByItemG ordTy aobCol nullsOrd =
OrderByItemG (Just $ OrderType ordTy) aobCol (Just $ NullsOrder nullsOrd)
parseAggOrdBy
:: (MonadError QErr m)
:: (MonadResolve m)
=> (RS.AnnAggOrdBy -> RS.AnnObColG UnresolvedVal)
-> AnnGObject
-> m [RS.AnnOrderByItemG UnresolvedVal]
@ -252,10 +252,10 @@ parseOrderByEnum = \case
G.EnumValue v -> throw500 $
"enum value " <> showName v <> " not found in type order_by"
parseLimit :: ( MonadError QErr m ) => AnnInpVal -> m Int
parseLimit :: (MonadResolve m) => AnnInpVal -> m Int
parseLimit v = do
pgColVal <- pstValue . _apvValue <$> asPGColumnValue v
limit <- maybe noIntErr return $ pgColValueToInt pgColVal
pgColVal <- openOpaqueValue =<< asPGColumnValue v
limit <- maybe noIntErr return . pgColValueToInt . pstValue $ _apvValue pgColVal
-- validate int value
onlyPositiveInt limit
return limit
@ -266,13 +266,11 @@ type AnnSimpleSel = RS.AnnSimpleSelG UnresolvedVal
type PGColValMap = Map.HashMap G.Name AnnInpVal
pgColValToBoolExp
:: (MonadError QErr m)
=> PGColArgMap -> PGColValMap -> m AnnBoolExpUnresolved
pgColValToBoolExp :: (MonadResolve m) => PGColArgMap -> PGColValMap -> m AnnBoolExpUnresolved
pgColValToBoolExp colArgMap colValMap = do
colExps <- forM colVals $ \(name, val) ->
BoolFld <$> do
opExp <- AEQ True . UVPG <$> asPGColumnValue val
opExp <- AEQ True . mkParameterizablePGValue <$> asPGColumnValue val
colInfo <- onNothing (Map.lookup name colArgMap) $
throw500 $ "column name " <> showName name
<> " not found in column arguments map"
@ -282,7 +280,7 @@ pgColValToBoolExp colArgMap colValMap = do
colVals = Map.toList colValMap
fromFieldByPKey
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has FieldMap r
, Has OrdByCtx r
@ -304,7 +302,7 @@ fromFieldByPKey tn colArgMap permFilter fld = fieldAsPath fld $ do
fldTy = _fType fld
convertSelect
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> SelOpCtx -> Field -> m QueryRootFldUnresolved
@ -315,7 +313,7 @@ convertSelect opCtx fld =
SelOpCtx qt _ permFilter permLimit = opCtx
convertSelectByPKey
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> SelPkOpCtx -> Field -> m QueryRootFldUnresolved
@ -326,22 +324,22 @@ convertSelectByPKey opCtx fld =
SelPkOpCtx qt _ permFilter colArgMap = opCtx
-- agg select related
parseColumns :: MonadError QErr m => AnnInpVal -> m [PGCol]
parseColumns :: (MonadResolve m) => AnnInpVal -> m [PGCol]
parseColumns val =
flip withArray val $ \_ vals ->
forM vals $ \v -> do
(_, enumVal) <- asEnumVal v
return $ PGCol $ G.unName $ G.unEnumValue enumVal
convertCount :: MonadError QErr m => ArgsMap -> m S.CountType
convertCount :: (MonadResolve m) => ArgsMap -> m S.CountType
convertCount args = do
columnsM <- withArgM args "columns" parseColumns
isDistinct <- or <$> withArgM args "distinct" parseDistinct
maybe (return S.CTStar) (mkCType isDistinct) columnsM
where
parseDistinct v = do
val <- pstValue . _apvValue <$> asPGColumnValue v
case val of
val <- openOpaqueValue =<< asPGColumnValue v
case pstValue $ _apvValue val of
PGValBoolean b -> return b
_ ->
throw500 "expecting Boolean for \"distinct\""
@ -360,9 +358,7 @@ convertColFlds ty selSet = fmap toFields $
"__typename" -> return $ RS.PCFExp $ G.unName $ G.unNamedType ty
n -> return $ RS.PCFCol $ PGCol $ G.unName n
convertAggFld
:: (Monad m, MonadError QErr m)
=> G.NamedType -> SelSet -> m RS.AggFlds
convertAggFld :: (MonadResolve m) => G.NamedType -> SelSet -> m RS.AggFlds
convertAggFld ty selSet = fmap toFields $
withSelSet selSet $ \fld -> do
let fType = _fType fld
@ -381,7 +377,7 @@ convertAggFld ty selSet = fmap toFields $
type AnnAggSel = RS.AnnAggSelG UnresolvedVal
fromAggField
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> QualifiedTable -> AnnBoolExpPartialSQL
@ -399,7 +395,7 @@ fromAggField tn permFilter permLimit fld = fieldAsPath fld $ do
args = _fArguments fld
convertAggSelect
:: ( MonadError QErr m, MonadReader r m, Has FieldMap r
:: ( MonadResolve m, MonadReader r m, Has FieldMap r
, Has OrdByCtx r, Has SQLGenCtx r
)
=> SelOpCtx -> Field -> m QueryRootFldUnresolved
@ -411,7 +407,7 @@ convertAggSelect opCtx fld =
SelOpCtx qt _ permFilter permLimit = opCtx
parseFunctionArgs
::( MonadError QErr m)
:: (MonadResolve m)
=> FuncArgSeq
-> AnnInpVal
-> m (RS.FunctionArgsExpG UnresolvedVal)
@ -423,7 +419,7 @@ parseFunctionArgs argSeq val = flip withObject val $ \_ obj -> do
parsePositionalArg obj (FuncArgItem gqlName _ _) =
maybe (pure Nothing) (fmap Just . parseArg) $ OMap.lookup gqlName obj
parseArg = fmap (maybe (UVSQL S.SENull) UVPG) . asPGColumnValueM
parseArg = fmap (maybe (UVSQL S.SENull) mkParameterizablePGValue) . asPGColumnValueM
parseNamedArg obj (FuncArgItem gqlName maybeSqlName hasDefault) =
case OMap.lookup gqlName obj of
@ -436,7 +432,7 @@ parseFunctionArgs argSeq val = flip withObject val $ \_ obj -> do
else pure Nothing
fromFuncQueryField
:: (MonadError QErr m)
:: (MonadResolve m)
=> (Field -> m s)
-> QualifiedFunction -> FuncArgSeq
-> Field
@ -447,7 +443,7 @@ fromFuncQueryField fn qf argSeq fld = fieldAsPath fld $ do
RS.AnnFnSel qf funcArgs <$> fn fld
convertFuncQuerySimple
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has FieldMap r
, Has OrdByCtx r
@ -461,7 +457,7 @@ convertFuncQuerySimple funcOpCtx fld =
FuncQOpCtx qt _ permFilter permLimit qf argSeq = funcOpCtx
convertFuncQueryAgg
:: ( MonadError QErr m
:: ( MonadResolve m
, MonadReader r m
, Has FieldMap r
, Has OrdByCtx r
View File
@ -1,3 +1,5 @@
{-# LANGUAGE UndecidableInstances #-}
module Hasura.GraphQL.Resolve.Types where
import Hasura.Prelude
@ -7,9 +9,11 @@ import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Language.GraphQL.Draft.Syntax as G
import Hasura.GraphQL.Validate.Types
import Hasura.RQL.Types.BoolExp
import Hasura.RQL.Types.Column
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Error
import Hasura.RQL.Types.Permission
import Hasura.SQL.Types
import Hasura.SQL.Value
@ -145,11 +149,6 @@ data AnnPGVal
= AnnPGVal
{ _apvVariable :: !(Maybe G.Variable)
, _apvIsNullable :: !Bool
, _apvType :: !PGColumnType
-- ^ Note: '_apvValue' is a @'WithScalarType' 'PGScalarValue'@, so it includes its type as a
-- 'PGScalarType'. However, we /also/ need to keep the original 'PGColumnType' information around
-- in case we need to re-parse a new value with its type because we're reusing a cached query
-- plan.
, _apvValue :: !(WithScalarType PGScalarValue)
} deriving (Show, Eq)
@ -161,14 +160,64 @@ partialSQLExpToUnresolvedVal = \case
PSESessVar ty sessVar -> UVSessVar ty sessVar
PSESQLExp s -> UVSQL s
-- A value that will be converted to an sql expression eventually
-- | A value that will be converted to a SQL expression eventually
data UnresolvedVal
-- From a session variable
= UVSessVar !(PGType PGScalarType) !SessVar
-- This is postgres
-- | a SQL value literal that can be parameterized over
| UVPG !AnnPGVal
-- This is a full resolved sql expression
-- | an arbitrary SQL expression, which /cannot/ be parameterized over
| UVSQL !S.SQLExp
deriving (Show, Eq)
type AnnBoolExpUnresolved = AnnBoolExp UnresolvedVal
-- | Tracks whether or not a query is /reusable/. Reusable queries are nice, since we can cache
-- their resolved ASTs and avoid re-resolving them if we receive an identical query. However, we
-- can't always safely reuse queries if they have variables, since some variable values can affect
-- the generated SQL. For example, consider the following query:
--
-- > query users_where($condition: users_bool_exp!) {
-- > users(where: $condition) {
-- > id
-- > }
-- > }
--
-- Different values for @$condition@ will produce completely different queries, so we can't reuse
-- its plan (unless the variable values were also all identical, of course, but we don't bother
-- caching those).
--
-- If a query does turn out to be reusable, we build up a 'ReusableVariableTypes' value that maps
-- variable names to their types so that we can use a fast path for validating new sets of
-- variables (namely 'Hasura.GraphQL.Validate.validateVariablesForReuse').
data QueryReusability
= Reusable !ReusableVariableTypes
| NotReusable
deriving (Show, Eq)
instance Semigroup QueryReusability where
Reusable a <> Reusable b = Reusable (a <> b)
_ <> _ = NotReusable
instance Monoid QueryReusability where
mempty = Reusable mempty
class (MonadError QErr m) => MonadResolve m where
recordVariableUse :: G.Variable -> PGColumnType -> m ()
markNotReusable :: m ()
newtype ResolveT m a = ResolveT { unResolveT :: StateT QueryReusability m a }
deriving (Functor, Applicative, Monad, MonadError e, MonadReader r)
instance (MonadError QErr m) => MonadResolve (ResolveT m) where
recordVariableUse varName varType = ResolveT $
modify' (<> Reusable (ReusableVariableTypes $ Map.singleton varName varType))
markNotReusable = ResolveT $ put NotReusable
runResolveT :: (Functor m) => ResolveT m a -> m (a, Maybe ReusableVariableTypes)
runResolveT = fmap (fmap getVarTypes) . flip runStateT mempty . unResolveT
where
getVarTypes = \case
Reusable varTypes -> Just varTypes
NotReusable -> Nothing
evalResolveT :: (Monad m) => ResolveT m a -> m a
evalResolveT = flip evalStateT mempty . unResolveT
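-- A sketch of how the monoid plays out ('reusabilityDemo' is hypothetical):
-- a single 'markNotReusable' poisons the entire resolution, since
-- 'NotReusable' is absorbing for '(<>)':
reusabilityDemo :: (MonadError QErr m) => G.Variable -> PGColumnType -> m (Maybe ReusableVariableTypes)
reusabilityDemo var ty = fmap snd . runResolveT $ do
  recordVariableUse var ty  -- state: Reusable (just this variable)
  markNotReusable           -- state: NotReusable, and it stays that way
  -- hence the 'Maybe ReusableVariableTypes' returned here is 'Nothing'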
View File
@ -9,7 +9,8 @@ module Hasura.GraphQL.Transport.HTTP.Protocol
, VariableValues
, encodeGQErr
, encodeGQResp
, GQResp(..)
, GQResult(..)
, GQResponse
, isExecError
, RemoteGqlResp(..)
, GraphqlResponse(..)
@ -82,18 +83,20 @@ encodeGQErr :: Bool -> QErr -> J.Value
encodeGQErr includeInternal qErr =
J.object [ "errors" J..= [encodeGQLErr includeInternal qErr]]
data GQResp
= GQSuccess !BL.ByteString
data GQResult a
= GQSuccess !a
| GQPreExecError ![J.Value]
| GQExecError ![J.Value]
deriving (Show, Eq)
deriving (Show, Eq, Functor, Foldable, Traversable)
isExecError :: GQResp -> Bool
type GQResponse = GQResult BL.ByteString
isExecError :: GQResult a -> Bool
isExecError = \case
GQExecError _ -> True
_ -> False
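-- Since 'GQResult' is now a 'Functor', transports can post-process successful
-- payloads without touching the error cases (sketch; 'gzipBody' is a
-- hypothetical transformation):
compressResponse :: (BL.ByteString -> BL.ByteString) -> GQResponse -> GQResponse
compressResponse gzipBody = fmap gzipBody  -- errors pass through untouched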
encodeGQResp :: GQResp -> EncJSON
encodeGQResp :: GQResponse -> EncJSON
encodeGQResp gqResp =
encJFromAssocList $ case gqResp of
GQSuccess r -> [("data", encJFromLBS r)]
@ -118,7 +121,7 @@ encodeRemoteGqlResp (RemoteGqlResp d e ex) =
-- | Represents a proper GraphQL response
data GraphqlResponse
= GRHasura !GQResp
= GRHasura !GQResponse
| GRRemote !RemoteGqlResp
encodeGraphqlResponse :: GraphqlResponse -> EncJSON
View File
@ -2,23 +2,23 @@ module Hasura.GraphQL.Validate
( validateGQ
, showVars
, RootSelSet(..)
, SelSet
, Field(..)
, getTypedOp
, QueryParts (..)
, QueryParts(..)
, getQueryParts
, getAnnVarVals
, ReusableVariableTypes(..)
, ReusableVariableValues
, validateVariablesForReuse
, isQueryInAllowlist
, VarPGTypes
, AnnPGVarVals
, getAnnPGVarVals
, Field(..)
, SelSet
) where
import Data.Has
import Hasura.Prelude
import Data.Has
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as HS
import qualified Data.Sequence as Seq
@ -33,9 +33,6 @@ import Hasura.GraphQL.Validate.Types
import Hasura.RQL.Types
import Hasura.RQL.Types.QueryCollection
import Hasura.SQL.Types (WithScalarType)
import Hasura.SQL.Value (PGScalarValue)
data QueryParts
= QueryParts
{ qpOpDef :: !G.TypedOperationDefinition
@ -69,81 +66,70 @@ getTypedOp opNameM selSets opDefs =
throwVE $ "exactly one operation has to be present " <>
"in the document when operationName is not specified"
-- For all the variables defined there will be a value in the final map
-- | For every variable defined, there will be a value in the final map.
-- If a variable has no default, is absent from the supplied values, and is nullable,
-- then it maps to a null value.
getAnnVarVals
:: ( MonadReader r m, Has TypeMap r
, MonadError QErr m
)
=> [G.VariableDefinition]
-> VariableValues
-> m AnnVarVals
getAnnVarVals varDefsL inpVals = withPathK "variableValues" $ do
validateVariables
:: (MonadReader r m, Has TypeMap r, MonadError QErr m)
=> [G.VariableDefinition] -> VariableValues -> m AnnVarVals
validateVariables varDefsL inpVals = withPathK "variableValues" $ do
varDefs <- onLeft (mkMapWith G._vdVariable varDefsL) $ \dups ->
throwVE $ "the following variables are defined more than once: " <>
showVars dups
let unexpectedVars = filter (not . (`Map.member` varDefs)) $ Map.keys inpVals
unless (null unexpectedVars) $
throwVE $ "unexpected variables in variableValues: " <>
showVars unexpectedVars
forM varDefs $ \(G.VariableDefinition var ty defM) -> do
let baseTy = getBaseTy ty
baseTyInfo <- getTyInfoVE baseTy
-- check that the variable is defined on input types
when (isObjTy baseTyInfo) $ throwVE $ objTyErrMsg baseTy
let defM' = bool (defM <|> Just G.VCNull) defM $ G.isNotNull ty
annDefM <- withPathK "defaultValue" $
mapM (validateInputValue constValueParser ty) defM'
let inpValM = Map.lookup var inpVals
annInpValM <- withPathK (G.unName $ G.unVariable var) $
mapM (validateInputValue jsonParser ty) inpValM
let varValM = annInpValM <|> annDefM
onNothing varValM $ throwVE $
"expecting a value for non-nullable variable: " <>
showVars [var] <>
" of type: " <> G.showGT ty <>
" in variableValues"
traverse validateVariable varDefs
where
objTyErrMsg namedTy =
"variables can only be defined on input types"
<> "(enums, scalars, input objects), but "
<> showNamedTy namedTy <> " is an object type"
validateVariable (G.VariableDefinition var ty defM) = do
let baseTy = getBaseTy ty
baseTyInfo <- getTyInfoVE baseTy
-- check that the variable is defined on input types
when (isObjTy baseTyInfo) $ throwVE $
"variables can only be defined on input types"
<> "(enums, scalars, input objects), but "
<> showNamedTy baseTy <> " is an object type"
let defM' = bool (defM <|> Just G.VCNull) defM $ G.isNotNull ty
annDefM <- withPathK "defaultValue" $
mapM (validateInputValue constValueParser ty) defM'
let inpValM = Map.lookup var inpVals
annInpValM <- withPathK (G.unName $ G.unVariable var) $
mapM (validateInputValue jsonParser ty) inpValM
let varValM = annInpValM <|> annDefM
onNothing varValM $ throwVE $
"expecting a value for non-nullable variable: " <>
showVars [var] <>
" of type: " <> G.showGT ty <>
" in variableValues"
showVars :: (Functor f, Foldable f) => f G.Variable -> Text
showVars = showNames . fmap G.unVariable
type VarPGTypes = Map.HashMap G.Variable PGColumnType
type AnnPGVarVals = Map.HashMap G.Variable (WithScalarType PGScalarValue)
-- this is similar in spirit to getAnnVarVals; however,
-- it is much simpler here and avoids the typemap requirement.
-- combine the two if possible
getAnnPGVarVals
-- | This is similar in spirit to 'validateVariables' but uses preexisting 'ReusableVariableTypes'
-- information to parse Postgres values directly for use with a reusable query plan. (Ideally, it
-- would be nice to be able to share more of the logic instead of duplicating it.)
validateVariablesForReuse
:: (MonadError QErr m)
=> VarPGTypes
-> Maybe VariableValues
-> m AnnPGVarVals
getAnnPGVarVals varTypes varValsM =
flip Map.traverseWithKey varTypes $ \varName varType -> do
let unexpectedVars = filter
(not . (`Map.member` varTypes)) $ Map.keys varVals
=> ReusableVariableTypes -> Maybe VariableValues -> m ReusableVariableValues
validateVariablesForReuse (ReusableVariableTypes varTypes) varValsM =
withPathK "variableValues" $ do
let unexpectedVars = filter (not . (`Map.member` varTypes)) $ Map.keys varVals
unless (null unexpectedVars) $
throwVE $ "unexpected variables in variableValues: " <>
showVars unexpectedVars
varVal <- onNothing (Map.lookup varName varVals) $
throwVE $ "expecting a value for non-nullable variable: " <>
showVars [varName] <>
-- TODO: we don't have the graphql type
-- " of type: " <> T.pack (show varType) <>
" in variableValues"
parsePGScalarValue varType varVal
where
varVals = fromMaybe Map.empty varValsM
throwVE $ "unexpected variables: " <> showVars unexpectedVars
flip Map.traverseWithKey varTypes $ \varName varType ->
withPathK (G.unName $ G.unVariable varName) $ do
varVal <- onNothing (Map.lookup varName varVals) $
throwVE "expected a value for non-nullable variable"
-- TODO: we don't have the graphql type
-- <> " of type: " <> T.pack (show varType)
parsePGScalarValue varType varVal
where
varVals = fromMaybe Map.empty varValsM
validateFrag
:: (MonadError QErr m, MonadReader r m, Has TypeMap r)
@ -162,19 +148,12 @@ data RootSelSet
| RSubscription !Field
deriving (Show, Eq)
-- {-# SCC validateGQ #-}
validateGQ
:: (MonadError QErr m, MonadReader GCtx m)
-- => GraphQLRequest
=> QueryParts
-> m RootSelSet
validateGQ :: (MonadError QErr m, MonadReader GCtx m) => QueryParts -> m RootSelSet
validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do
ctx <- ask
-- annotate the variables of this operation
annVarVals <- getAnnVarVals (G._todVariableDefinitions opDef) $
fromMaybe Map.empty varValsM
annVarVals <- validateVariables (G._todVariableDefinitions opDef) $ fromMaybe Map.empty varValsM
-- annotate the fragments
fragDefs <- onLeft (mkMapWith G._fdName fragDefsL) $ \dups ->
View File
@ -59,6 +59,9 @@ module Hasura.GraphQL.Validate.Types
, hasNullVal
, getAnnInpValKind
, stripTypenames
, ReusableVariableTypes(..)
, ReusableVariableValues
, module Hasura.GraphQL.Utils
) where
@ -745,3 +748,10 @@ stripTypenames = map filterExecDef
let newSelset = filterSelSet $ G._fSelectionSet f
in Just $ G.SelectionField f{G._fSelectionSet = newSelset}
_ -> Just s
-- | Used by 'Hasura.GraphQL.Validate.validateVariablesForReuse' to parse new sets of variables for
-- reusable query plans; see also 'Hasura.GraphQL.Resolve.Types.QueryReusability'.
newtype ReusableVariableTypes
= ReusableVariableTypes { unReusableVarTypes :: Map.HashMap G.Variable RQL.PGColumnType }
deriving (Show, Eq, Semigroup, Monoid, J.ToJSON)
type ReusableVariableValues = Map.HashMap G.Variable (WithScalarType PGScalarValue)
View File
@ -6,10 +6,15 @@ module Hasura.RQL.DML.Mutation
)
where
import Hasura.Prelude
import qualified Data.HashMap.Strict as Map
import qualified Data.Sequence as DS
import qualified Database.PG.Query as Q
import qualified Hasura.SQL.DML as S
import Hasura.EncJSON
import Hasura.Prelude
import Hasura.RQL.DML.Internal
import Hasura.RQL.DML.Returning
import Hasura.RQL.DML.Select
@ -18,10 +23,6 @@ import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.SQL.Value
import qualified Data.HashMap.Strict as Map
import qualified Database.PG.Query as Q
import qualified Hasura.SQL.DML as S
data Mutation
= Mutation
{ _mTable :: !QualifiedTable
@ -88,7 +89,7 @@ mutateAndFetchCols qt cols (cte, p) strfyNum =
AnnSelG selFlds tabFrom tabPerm noTableArgs strfyNum
mkSelCTEFromColVals
:: MonadError QErr m
:: (MonadError QErr m)
=> QualifiedTable -> [PGColumnInfo] -> [ColVals] -> m S.CTE
mkSelCTEFromColVals qt allCols colVals =
S.CTESelect <$> case colVals of
View File
@ -458,7 +458,7 @@ mkWaiApp
-> Bool
-> InstanceId
-> S.HashSet API
-> EL.LQOpts
-> EL.LiveQueriesOptions
-> IO HasuraApp
mkWaiApp isoLevel loggerCtx sqlGenCtx enableAL pool ci httpManager mode corsCfg
enableConsole consoleAssetsDir enableTelemetry instanceId apis lqOpts = do
View File
@ -14,6 +14,7 @@ import qualified Data.Text.Encoding as TE
import qualified Text.PrettyPrint.ANSI.Leijen as PP
import Data.Char (toLower)
import Data.Time.Clock.Units (milliseconds)
import Network.Wai.Handler.Warp (HostPreference)
import Options.Applicative
@ -95,7 +96,7 @@ data ServeOptions
, soEnableTelemetry :: !Bool
, soStringifyNum :: !Bool
, soEnabledAPIs :: !(Set.HashSet API)
, soLiveQueryOpts :: !LQ.LQOpts
, soLiveQueryOpts :: !LQ.LiveQueriesOptions
, soEnableAllowlist :: !Bool
, soEnabledLogTypes :: !(Set.HashSet L.EngineLogType)
, soLogLevel :: !L.LogLevel
@ -186,10 +187,10 @@ instance FromEnv [API] where
fromEnv = readAPIs
instance FromEnv LQ.BatchSize where
fromEnv = fmap LQ.mkBatchSize . readEither
fromEnv = fmap LQ.BatchSize . readEither
instance FromEnv LQ.RefetchInterval where
fromEnv = fmap LQ.refetchIntervalFromMilli . readEither
fromEnv = fmap (LQ.RefetchInterval . milliseconds . fromInteger) . readEither
instance FromEnv JWTConfig where
fromEnv = readJson
@ -353,14 +354,9 @@ mkServeOptions rso = do
_ -> corsCfg
mkLQOpts = do
mxRefetchIntM <- withEnv (rsoMxRefetchInt rso) $
fst mxRefetchDelayEnv
mxBatchSizeM <- withEnv (rsoMxBatchSize rso) $
fst mxBatchSizeEnv
fallbackRefetchIntM <- withEnv (rsoFallbackRefetchInt rso) $
fst fallbackRefetchDelayEnv
return $ LQ.mkLQOpts (LQ.mkMxOpts mxBatchSizeM mxRefetchIntM)
(LQ.mkFallbackOpts fallbackRefetchIntM)
mxRefetchIntM <- withEnv (rsoMxRefetchInt rso) $ fst mxRefetchDelayEnv
mxBatchSizeM <- withEnv (rsoMxBatchSize rso) $ fst mxBatchSizeEnv
return $ LQ.mkLiveQueriesOptions mxBatchSizeM mxRefetchIntM
mkExamplesDoc :: [[String]] -> PP.Doc
View File
@ -17,7 +17,7 @@ rebuild-ghc-options: true
extra-deps:
# use https URLs so that build systems can clone these repos
- git: https://github.com/hasura/pg-client-hs.git
commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc
commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2
- git: https://github.com/hasura/graphql-parser-hs.git
commit: 1ccdbb4c4d743b679f3141992df39feaee971640
- git: https://github.com/hasura/ci-info-hs.git
View File
@ -13,11 +13,11 @@ packages:
git: https://github.com/hasura/pg-client-hs.git
pantry-tree:
size: 1107
sha256: 8d5502889184e1b751d55c4b7d0e10985711284c6fae0b40cf7cb171949f0ccf
commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc
sha256: e008d56e5b0535223b856be94a8e71e31d7dabe10b2951a38447df5089de1876
commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2
original:
git: https://github.com/hasura/pg-client-hs.git
commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc
commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2
- completed:
cabal-file:
size: 3295
View File
@ -16,6 +16,10 @@
{
"c1": 1,
"c2": "test1"
},
{
"c1": 2,
"c2": "test2"
}
]
}
@ -26,6 +30,10 @@
{
"c1": 1,
"c2": "test1"
},
{
"c1": 2,
"c2": "test2"
}
]
}
@ -57,12 +65,23 @@
]
}
}
live_response: |
[
{
"c1": 1,
"c2": "test11"
},
{
"c1": 2,
"c2": "test2"
}
]
-
description: Delete mutation
name: delete_hge_tests_test_t2
query: |
mutation delete_test_t2 {
delete_hge_tests_test_t2(where: {c1: {_eq: 1}}) {
delete_hge_tests_test_t2(where: {c1: {_in: [1, 2]}}) {
affected_rows
}
}
@ -70,6 +89,6 @@
response: |
{
"delete_hge_tests_test_t2": {
"affected_rows": 1
"affected_rows": 2
}
}
View File
@ -192,21 +192,22 @@ class TestSubscriptionLiveQueries(object):
conf = yaml.safe_load(c)
queryTmplt = """
subscription {
hge_tests_live_query_{0}: hge_tests_test_t2(order_by: {c1: desc}, limit: 1) {
subscription ($result_limit: Int!) {
hge_tests_live_query_{0}: hge_tests_test_t2(order_by: {c1: asc}, limit: $result_limit) {
c1,
c2
}
}
"""
queries = [(0, 1), (1, 2), (2, 2)]
liveQs = []
for i in [0,1,2]:
for i, resultLimit in queries:
query = queryTmplt.replace('{0}',str(i))
headers={}
if hge_ctx.hge_key is not None:
headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key
subscrPayload = { 'query': query }
subscrPayload = { 'query': query, 'variables': { 'result_limit': resultLimit } }
respLive = ws_client.send_query(subscrPayload, query_id='live_'+str(i), headers=headers, timeout=15)
liveQs.append(respLive)
ev = next(respLive)
@ -230,16 +231,22 @@ class TestSubscriptionLiveQueries(object):
ev = next(mutResp)
assert ev['type'] == 'complete' and ev['id'] == 'mutation_'+str(index), ev
for i, respLive in enumerate(liveQs):
for (i, resultLimit), respLive in zip(queries, liveQs):
ev = next(respLive)
assert ev['type'] == 'data', ev
assert ev['id'] == 'live_' + str(i), ev
assert ev['payload']['data'] == {
'hge_tests_live_query_'+str(i): expected_resp[step['name']]['returning'] if 'returning' in expected_resp[
step['name']] else []
}, ev['payload']['data']
for i in [0,1,2]:
expectedReturnedResponse = []
if 'live_response' in step:
expectedReturnedResponse = json.loads(step['live_response'])
elif 'returning' in expected_resp[step['name']]:
expectedReturnedResponse = expected_resp[step['name']]['returning']
expectedLimitedResponse = expectedReturnedResponse[:resultLimit]
expectedLiveResponse = { 'hge_tests_live_query_'+str(i): expectedLimitedResponse }
assert ev['payload']['data'] == expectedLiveResponse, ev['payload']['data']
for i, _ in queries:
# stop live operation
frame = {
'id': 'live_'+str(i),