From 264d70644b784d851cc30b878dadab72f838c214 Mon Sep 17 00:00:00 2001 From: Alexis King Date: Wed, 28 Aug 2019 07:19:21 -0500 Subject: [PATCH] Multiplex all subscriptions, grouping them by their resolved SQL query --- server/graphql-engine.cabal | 15 +- server/src-lib/Control/Concurrent/Extended.hs | 16 + server/src-lib/Data/Time/Clock/Units.hs | 14 +- server/src-lib/Hasura/Cache.hs | 3 + server/src-lib/Hasura/Db.hs | 8 + server/src-lib/Hasura/GraphQL/Execute.hs | 49 +-- .../Hasura/GraphQL/Execute/LiveQuery.hs | 395 +++++------------- .../GraphQL/Execute/LiveQuery/Fallback.hs | 249 ----------- .../GraphQL/Execute/LiveQuery/Options.hs | 37 ++ .../Hasura/GraphQL/Execute/LiveQuery/Plan.hs | 208 +++++++++ .../Hasura/GraphQL/Execute/LiveQuery/Poll.hs | 386 +++++++++++++++++ .../Hasura/GraphQL/Execute/LiveQuery/State.hs | 147 +++++++ .../Hasura/GraphQL/Execute/LiveQuery/TMap.hs | 42 ++ .../Hasura/GraphQL/Execute/LiveQuery/Types.hs | 115 ----- server/src-lib/Hasura/GraphQL/Execute/Plan.hs | 2 +- .../src-lib/Hasura/GraphQL/Execute/Query.hs | 68 +-- .../Hasura/GraphQL/Resolve/InputValue.hs | 2 + .../Hasura/GraphQL/Transport/HTTP/Protocol.hs | 17 +- server/src-lib/Hasura/GraphQL/Validate.hs | 151 ++++--- server/src-lib/Hasura/Server/App.hs | 2 +- server/src-lib/Hasura/Server/Init.hs | 18 +- server/stack.yaml | 2 +- server/stack.yaml.lock | 6 +- 23 files changed, 1115 insertions(+), 837 deletions(-) create mode 100644 server/src-lib/Control/Concurrent/Extended.hs delete mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Fallback.hs create mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Options.hs create mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs create mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs create mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs create mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs delete mode 100644 server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Types.hs diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 556d33ce609..de32263f0e9 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -244,9 +244,11 @@ library , Hasura.GraphQL.Execute.Plan , Hasura.GraphQL.Execute.Query , Hasura.GraphQL.Execute.LiveQuery - , Hasura.GraphQL.Execute.LiveQuery.Types - , Hasura.GraphQL.Execute.LiveQuery.Multiplexed - , Hasura.GraphQL.Execute.LiveQuery.Fallback + , Hasura.GraphQL.Execute.LiveQuery.Options + , Hasura.GraphQL.Execute.LiveQuery.Plan + , Hasura.GraphQL.Execute.LiveQuery.Poll + , Hasura.GraphQL.Execute.LiveQuery.State + , Hasura.GraphQL.Execute.LiveQuery.TMap , Hasura.GraphQL.Resolve , Hasura.GraphQL.Resolve.Types , Hasura.GraphQL.Resolve.Context @@ -265,13 +267,14 @@ library , Hasura.HTTP + , Control.Concurrent.Extended , Control.Lens.Extended - , Data.Text.Extended , Data.Aeson.Extended - , Data.Sequence.NonEmpty - , Data.TByteString , Data.HashMap.Strict.InsOrd.Extended , Data.Parser.JSONPath + , Data.Sequence.NonEmpty + , Data.TByteString + , Data.Text.Extended , Data.Time.Clock.Units , Hasura.SQL.DML diff --git a/server/src-lib/Control/Concurrent/Extended.hs b/server/src-lib/Control/Concurrent/Extended.hs new file mode 100644 index 00000000000..ed83a5535ab --- /dev/null +++ b/server/src-lib/Control/Concurrent/Extended.hs @@ -0,0 +1,16 @@ +module Control.Concurrent.Extended + ( module Control.Concurrent + , threadDelay + ) where + +import Prelude + +import qualified Control.Concurrent as Base + +import Control.Concurrent hiding (threadDelay) +import Data.Time.Clock (DiffTime) +import Data.Time.Clock.Units (Microseconds (..)) + +-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int'. +threadDelay :: DiffTime -> IO () +threadDelay = Base.threadDelay . round . Microseconds diff --git a/server/src-lib/Data/Time/Clock/Units.hs b/server/src-lib/Data/Time/Clock/Units.hs index dd17fe59181..5e81ff656d2 100644 --- a/server/src-lib/Data/Time/Clock/Units.hs +++ b/server/src-lib/Data/Time/Clock/Units.hs @@ -1,7 +1,7 @@ {-# LANGUAGE AllowAmbiguousTypes #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingVia #-} -{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE TypeOperators #-} {-| Types for time intervals of various units. Each newtype wraps 'DiffTime', but they have different 'Num' instances. The intent is to use the record selectors to write literals with @@ -39,11 +39,11 @@ module Data.Time.Clock.Units , Nanoseconds(..) ) where -import Prelude +import Prelude -import Data.Proxy -import Data.Time.Clock -import GHC.TypeLits +import Data.Proxy +import Data.Time.Clock +import GHC.TypeLits type Seconds = DiffTime diff --git a/server/src-lib/Hasura/Cache.hs b/server/src-lib/Hasura/Cache.hs index a7dcb2d5d9e..a33d609774f 100644 --- a/server/src-lib/Hasura/Cache.hs +++ b/server/src-lib/Hasura/Cache.hs @@ -1,3 +1,6 @@ +{-| An in-memory, unbounded, capability-local cache implementation. By making the cache +capability-local, data may be recomputed up to once per capability (which usually means up to once +per OS thread), but write contention from multiple threads is unlikely. -} module Hasura.Cache ( UnboundedCache , initCache diff --git a/server/src-lib/Hasura/Db.hs b/server/src-lib/Hasura/Db.hs index 6d1ec196b49..e820169914f 100644 --- a/server/src-lib/Hasura/Db.hs +++ b/server/src-lib/Hasura/Db.hs @@ -45,6 +45,14 @@ instance (MonadTx m) => MonadTx (ReaderT s m) where instance (MonadTx m) => MonadTx (ValidateT e m) where liftTx = lift . liftTx +-- | Like 'Q.TxE', but defers acquiring a Postgres connection until the first execution of 'liftTx'. +-- If no call to 'liftTx' is ever reached (i.e. a successful result is returned or an error is +-- raised before ever executing a query), no connection is ever acquired. +-- +-- This is useful for certain code paths that only conditionally need database access. For example, +-- although most queries will eventually hit Postgres, introspection queries or queries that +-- exclusively use remote schemas never will; using 'LazyTx' keeps those branches from unnecessarily +-- allocating a connection. data LazyTx e a = LTErr !e | LTNoTx !a diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs index f5810b1efa6..72e12611658 100644 --- a/server/src-lib/Hasura/GraphQL/Execute.hs +++ b/server/src-lib/Hasura/GraphQL/Execute.hs @@ -113,7 +113,7 @@ gatherTypeLocs gCtx nodes = -- This is for when the graphql query is validated type ExecPlanPartial - = GQExecPlan (GCtx, VQ.RootSelSet, [G.VariableDefinition]) + = GQExecPlan (GCtx, VQ.RootSelSet, Maybe VQ.ReusableVariableTypes) getExecPlanPartial :: (MonadError QErr m) @@ -140,9 +140,8 @@ getExecPlanPartial userInfo sc enableAL req = do case typeLoc of VT.TLHasuraType -> do - rootSelSet <- runReaderT (VQ.validateGQ queryParts) gCtx - let varDefs = G._todVariableDefinitions $ VQ.qpOpDef queryParts - return $ GExPHasura (gCtx, rootSelSet, varDefs) + (rootSelSet, varTypes) <- runReaderT (VQ.validateGQ queryParts) gCtx + return $ GExPHasura (gCtx, rootSelSet, varTypes) VT.TLRemoteType _ rsi -> return $ GExPRemote rsi opDef where @@ -167,7 +166,7 @@ getExecPlanPartial userInfo sc enableAL req = do data ExecOp = ExOpQuery !LazyRespTx !(Maybe EQ.GeneratedSqlMap) | ExOpMutation !LazyRespTx - | ExOpSubs !EL.LiveQueryOp + | ExOpSubs !EL.LiveQueryPlan -- The graphql query is resolved into an execution operation type ExecPlanResolved @@ -196,7 +195,7 @@ getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx (tx, genSql) <- EQ.queryOpFromPlan usrVars queryVars queryPlan return $ ExOpQuery tx (Just genSql) EP.RPSubs subsPlan -> - ExOpSubs <$> EL.subsOpFromPlan pgExecCtx usrVars queryVars subsPlan + ExOpSubs <$> EL.reuseLiveQueryPlan pgExecCtx usrVars queryVars subsPlan Nothing -> noExistingPlan where GQLReq opNameM queryStr queryVars = reqUnparsed @@ -204,21 +203,19 @@ getResolvedExecPlan pgExecCtx planCache userInfo sqlGenCtx liftIO $ EP.addPlan scVer (userRole userInfo) opNameM queryStr plan planCache noExistingPlan = do - req <- toParsed reqUnparsed + req <- toParsed reqUnparsed partialExecPlan <- getExecPlanPartial userInfo sc enableAL req - forM partialExecPlan $ \(gCtx, rootSelSet, varDefs) -> + forM partialExecPlan $ \(gCtx, rootSelSet, varTypes) -> case rootSelSet of VQ.RMutation selSet -> ExOpMutation <$> getMutOp gCtx sqlGenCtx userInfo selSet VQ.RQuery selSet -> do - (queryTx, planM, genSql) <- getQueryOp gCtx sqlGenCtx - userInfo selSet varDefs - mapM_ (addPlanToCache . EP.RPQuery) planM + (queryTx, plan, genSql) <- getQueryOp gCtx sqlGenCtx userInfo selSet varTypes + traverse_ (addPlanToCache . EP.RPQuery) plan return $ ExOpQuery queryTx (Just genSql) VQ.RSubscription fld -> do - (lqOp, planM) <- getSubsOp pgExecCtx gCtx sqlGenCtx - userInfo reqUnparsed varDefs fld - mapM_ (addPlanToCache . EP.RPSubs) planM + (lqOp, plan) <- getSubsOp pgExecCtx gCtx sqlGenCtx userInfo varTypes fld + traverse_ (addPlanToCache . EP.RPSubs) plan return $ ExOpSubs lqOp -- Monad for resolving a hasura query/mutation @@ -258,10 +255,10 @@ getQueryOp -> SQLGenCtx -> UserInfo -> VQ.SelSet - -> [G.VariableDefinition] + -> Maybe VQ.ReusableVariableTypes -> m (LazyRespTx, Maybe EQ.ReusableQueryPlan, EQ.GeneratedSqlMap) -getQueryOp gCtx sqlGenCtx userInfo fields varDefs = - runE gCtx sqlGenCtx userInfo $ EQ.convertQuerySelSet varDefs fields +getQueryOp gCtx sqlGenCtx userInfo fields varTypes = + runE gCtx sqlGenCtx userInfo $ EQ.convertQuerySelSet varTypes fields mutationRootName :: Text mutationRootName = "mutation_root" @@ -318,17 +315,16 @@ getSubsOpM , MonadIO m ) => PGExecCtx - -> GQLReqUnparsed - -> [G.VariableDefinition] + -> Maybe VQ.ReusableVariableTypes -> VQ.Field - -> m (EL.LiveQueryOp, Maybe EL.SubsPlan) -getSubsOpM pgExecCtx req varDefs fld = + -> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan) +getSubsOpM pgExecCtx varTypes fld = case VQ._fName fld of "__typename" -> throwVE "you cannot create a subscription on '__typename' field" _ -> do astUnresolved <- GR.queryFldToPGAST fld - EL.subsOpFromPGAST pgExecCtx req varDefs (VQ._fAlias fld, astUnresolved) + EL.buildLiveQueryPlan pgExecCtx (VQ._fAlias fld) astUnresolved varTypes getSubsOp :: ( MonadError QErr m @@ -338,12 +334,11 @@ getSubsOp -> GCtx -> SQLGenCtx -> UserInfo - -> GQLReqUnparsed - -> [G.VariableDefinition] + -> Maybe VQ.ReusableVariableTypes -> VQ.Field - -> m (EL.LiveQueryOp, Maybe EL.SubsPlan) -getSubsOp pgExecCtx gCtx sqlGenCtx userInfo req varDefs fld = - runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx req varDefs fld + -> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan) +getSubsOp pgExecCtx gCtx sqlGenCtx userInfo varTypes fld = + runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx varTypes fld execRemoteGQ :: ( MonadIO m diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs index 974e4dafc45..438fb8ef444 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs @@ -1,305 +1,118 @@ +{-# LANGUAGE CPP #-} + +{-| += Reasonably efficient PostgreSQL live queries + +The module implements /query multiplexing/, which is our implementation strategy for live queries +(i.e. GraphQL subscriptions) made against Postgres. Fundamentally, our implementation is built +around polling, which is never ideal, but it’s a lot easier to implement than trying to do something +event-based. To minimize the resource cost of polling, we use /multiplexing/, which is essentially +a two-tier batching strategy. + +== The high-level idea + +The objective is to minimize the number of concurrent polling workers to reduce database load as +much as possible. A very naïve strategy would be to group identical queries together so we only have +one poller per /unique/ active subscription. That’s a good start, but of course, in practice, most +queries differ slightly. However, it happens that they very frequently /only differ in their +variables/ (that is, GraphQL query variables and session variables), and in those cases, we try to +generated parameterized SQL. This means that the same prepared SQL query can be reused, just with a +different set of variables. + +To give a concrete example, consider the following query: + +> subscription vote_count($post_id: Int!) { +> vote_count(where: {post_id: {_eq: $post_id}}) { +> votes +> } +> } + +No matter what the client provides for @$post_id@, we will always generate the same SQL: + +> SELECT votes FROM vote_count WHERE post_id = $1 + +If multiple clients subscribe to @vote_count@, we can certainly reuse the same prepared query. For +example, imagine we had 10 concurrent subscribers, each listening on a distinct @$post_id@: + +> let postIds = [3, 11, 32, 56, 13, 97, 24, 43, 109, 48] + +We could iterate over @postIds@ in Haskell, executing the same prepared query 10 times: + +> for postIds $ \postId -> +> Q.listQE defaultTxErrorHandler preparedQuery (Identity postId) True + +Sadly, that on its own isn’t good enough. The overhead of running each query is large enough that +Postgres becomes overwhelmed if we have to serve lots of concurrent subscribers. Therefore, what we +want to be able to do is somehow make one query instead of ten. + +=== Multiplexing + +This is where multiplexing comes in. By taking advantage of Postgres +, +we can do the iteration in Postgres rather than in Haskell, allowing us to pay the query overhead +just once for all ten subscribers. Essentially, lateral joins add 'map'-like functionality to SQL, +so we can run our query once per @$post_id@: + +> SELECT results.votes +> FROM unnest($1::integer[]) query_variables (post_id) +> LEFT JOIN LATERAL ( +> SELECT coalesce(json_agg(votes), '[]') +> FROM vote_count WHERE vote_count.post_id = query_variables.post_id +> ) results ON true + +If we generalize this approach just a little bit more, we can apply this transformation to arbitrary +queries parameterized over arbitrary session and query variables! + +== Implementation overview + +To support query multiplexing, we maintain a tree of the following types, where @>@ should be read +as “contains”: + +@ +'LiveQueriesState' > 'Poller' > 'Cohort' > 'Subscriber' +@ + +Here’s a brief summary of each type’s role: + + * A 'Subscriber' is an actual client with an open websocket connection. + + * A 'Cohort' is a set of 'Subscriber's that are all subscribed to the same query /with the exact + same variables/. (By batching these together, we can do better than multiplexing, since we can + just query the data once.) + + * A 'Poller' is a worker thread for a single, multiplexed query. It fetches data for a set of + 'Cohort's that all use the same parameterized query, but have different sets of variables. + + * Finally, the 'LiveQueriesState' is the top-level container that holds all the active 'Poller's. + +Additional details are provided by the documentation for individual bindings. +-} module Hasura.GraphQL.Execute.LiveQuery - ( RefetchInterval - , refetchIntervalFromMilli - , LQM.BatchSize - , LQM.mkBatchSize - , LQM.MxOpts - , LQM.mkMxOpts - , LQF.FallbackOpts - , LQF.mkFallbackOpts - , LQOpts - , mkLQOpts + ( LiveQueryPlan + , ReusableLiveQueryPlan + , reuseLiveQueryPlan + , buildLiveQueryPlan , LiveQueriesState , initLiveQueriesState , dumpLiveQueriesState - , LiveQueryOp + , LiveQueriesOptions(..) + , BatchSize(..) + , RefetchInterval(..) + , mkLiveQueriesOptions + , LiveQueryId , addLiveQuery , removeLiveQuery - - , SubsPlan - , subsOpFromPlan - , subsOpFromPGAST ) where -import Control.Lens -import Data.Has +import Hasura.GraphQL.Execute.LiveQuery.Options +import Hasura.GraphQL.Execute.LiveQuery.Plan +import Hasura.GraphQL.Execute.LiveQuery.State -import qualified Control.Concurrent.STM as STM -import qualified Data.Aeson as J -import qualified Data.HashMap.Strict as Map -import qualified Data.HashSet as Set -import qualified Data.Text as T -import qualified Database.PG.Query as Q -import qualified Language.GraphQL.Draft.Syntax as G - -import qualified Hasura.GraphQL.Execute.LiveQuery.Fallback as LQF -import qualified Hasura.GraphQL.Execute.LiveQuery.Multiplexed as LQM -import qualified Hasura.GraphQL.Resolve as GR -import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH -import qualified Hasura.GraphQL.Validate as GV -import qualified Hasura.SQL.DML as S - -import Hasura.Db -import Hasura.EncJSON -import Hasura.GraphQL.Execute.LiveQuery.Types +#ifdef __HADDOCK_VERSION__ import Hasura.Prelude -import Hasura.RQL.DML.Select (asSingleRowJsonResp) -import Hasura.RQL.Types -import Hasura.SQL.Error -import Hasura.SQL.Types -import Hasura.SQL.Value - -data LQOpts - = LQOpts - { _loMxOpts :: LQM.MxOpts - , _loFallbackOpts :: LQF.FallbackOpts - } deriving (Show, Eq) - --- | Required for logging server configuration on startup -instance J.ToJSON LQOpts where - toJSON (LQOpts mxOpts fbOpts) = - J.object [ "multiplexed_options" J..= mxOpts - , "fallback_options" J..= fbOpts - ] - -mkLQOpts :: LQM.MxOpts -> LQF.FallbackOpts -> LQOpts -mkLQOpts = LQOpts - -data LiveQueriesState - = LiveQueriesState - { _lqsMultiplexed :: !LQM.LiveQueriesState - , _lqsFallback :: !LQF.LiveQueriesState - , _lqsPGExecTx :: !PGExecCtx - } - -dumpLiveQueriesState - :: Bool -> LiveQueriesState -> IO J.Value -dumpLiveQueriesState extended (LiveQueriesState mx fallback _) = do - mxJ <- LQM.dumpLiveQueriesState extended mx - fallbackJ <- LQF.dumpLiveQueriesState fallback - return $ J.object - [ "fallback" J..= fallbackJ - , "multiplexed" J..= mxJ - ] - -initLiveQueriesState - :: LQOpts - -> PGExecCtx - -> IO LiveQueriesState -initLiveQueriesState (LQOpts mxOpts fallbackOpts) pgExecCtx = do - (mxMap, fallbackMap) <- STM.atomically $ - (,) <$> LQM.initLiveQueriesState mxOpts - <*> LQF.initLiveQueriesState fallbackOpts - return $ LiveQueriesState mxMap fallbackMap pgExecCtx - -data LiveQueryOp - = LQMultiplexed !LQM.MxOp - | LQFallback !LQF.FallbackOp - -data LiveQueryId - = LQIMultiplexed !LQM.LiveQueryId - | LQIFallback !LQF.LiveQueryId - -addLiveQuery - :: LiveQueriesState - -> LiveQueryOp - -- the action to be executed when result changes - -> OnChange - -> IO LiveQueryId -addLiveQuery lqState liveQOp onResultAction = - case liveQOp of - LQMultiplexed mxOp -> - LQIMultiplexed <$> LQM.addLiveQuery pgExecCtx mxMap mxOp onResultAction - LQFallback fallbackOp -> - LQIFallback <$> LQF.addLiveQuery - pgExecCtx fallbackMap fallbackOp onResultAction - where - LiveQueriesState mxMap fallbackMap pgExecCtx = lqState - -removeLiveQuery - :: LiveQueriesState - -- the query and the associated operation - -> LiveQueryId - -> IO () -removeLiveQuery lqState = \case - LQIMultiplexed lqId -> LQM.removeLiveQuery mxMap lqId - LQIFallback lqId -> LQF.removeLiveQuery fallbackMap lqId - where - LiveQueriesState mxMap fallbackMap _ = lqState - -data SubsPlan - = SubsPlan - { _sfMxOpCtx :: !LQM.MxOpCtx - , _sfVariableTypes :: !GV.VarPGTypes - } - -instance J.ToJSON SubsPlan where - toJSON (SubsPlan opCtx varTypes) = - J.object [ "mx_op_ctx" J..= opCtx - , "variable_types" J..= varTypes - ] - -collectNonNullableVars - :: (MonadState GV.VarPGTypes m) - => GR.UnresolvedVal -> m () -collectNonNullableVars = \case - GR.UVPG annPGVal -> do - let GR.AnnPGVal varM isNullable colTy _ = annPGVal - case (varM, isNullable) of - (Just var, False) -> modify (Map.insert var colTy) - _ -> return () - _ -> return () - -type TextEncodedVariables - = Map.HashMap G.Variable TxtEncodedPGVal - --- | converts the partial unresolved value containing --- variables, session variables to an SQL expression --- referring correctly to the values from '_subs' temporary table --- The variables are at _subs.result_vars.variables and --- session variables at _subs.result_vars.user -toMultiplexedQueryVar - :: (MonadState GV.AnnPGVarVals m) - => GR.UnresolvedVal -> m S.SQLExp -toMultiplexedQueryVar = \case - GR.UVPG annPGVal -> - let GR.AnnPGVal varM isNullable _ colVal = annPGVal - in case (varM, isNullable) of - -- we don't check for nullability as - -- this is only used for reusable plans - -- the check has to be made before this - (Just var, _) -> do - modify $ Map.insert var colVal - return $ fromResVars (PGTypeScalar $ pstType colVal) - [ "variables" - , G.unName $ G.unVariable var - ] - _ -> return $ toTxtValue colVal - GR.UVSessVar ty sessVar -> - return $ fromResVars ty [ "user", T.toLower sessVar] - GR.UVSQL sqlExp -> return sqlExp - where - fromResVars ty jPath = - flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>") - [ S.SEQIden $ S.QIden (S.QualIden $ Iden "_subs") - (Iden "result_vars") - , S.SEArray $ map S.SELit jPath - ] - --- | Creates a live query operation and if possible, a reusable plan --- -subsOpFromPGAST - :: ( MonadError QErr m - , MonadReader r m - , Has UserInfo r - , MonadIO m - ) - - => PGExecCtx - -- ^ to validate arguments - - -> GH.GQLReqUnparsed - -- ^ used as part of an identifier in the underlying live query systems - -- to avoid unnecessary load on Postgres where possible - - -> [G.VariableDefinition] - -- ^ variable definitions as seen in the subscription, needed in - -- checking whether the subscription can be multiplexed or not - - -> (G.Alias, GR.QueryRootFldUnresolved) - -- ^ The alias and the partially processed live query field - - -> m (LiveQueryOp, Maybe SubsPlan) -subsOpFromPGAST pgExecCtx reqUnparsed varDefs (fldAls, astUnresolved) = do - userInfo <- asks getter - - -- collect the variables (with their types) used inside the subscription - (_, varTypes) <- flip runStateT mempty $ GR.traverseQueryRootFldAST - collectNonNullableVars astUnresolved - - -- Can the subscription be multiplexed? - -- Only if all variables are non null and can be prepared - if Set.fromList (Map.keys varTypes) == allVars - then mkMultiplexedOp userInfo varTypes - else mkFallbackOp userInfo - where - allVars = Set.fromList $ map G._vdVariable varDefs - - -- multiplexed subscription - mkMultiplexedOp userInfo varTypes = do - (astResolved, annVarVals) <- - flip runStateT mempty $ GR.traverseQueryRootFldAST - toMultiplexedQueryVar astUnresolved - let mxOpCtx = LQM.mkMxOpCtx (userRole userInfo) - (GH._grQuery reqUnparsed) fldAls $ - GR.toPGQuery astResolved - - -- We need to ensure that the values provided for variables - -- are correct according to Postgres. Without this check - -- an invalid value for a variable for one instance of the - -- subscription will take down the entire multiplexed query - txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals - let mxOp = (mxOpCtx, userVars userInfo, txtEncodedVars) - return (LQMultiplexed mxOp, Just $ SubsPlan mxOpCtx varTypes) - - -- fallback tx subscription - mkFallbackOp userInfo = do - (astResolved, prepArgs) <- - flip runStateT mempty $ GR.traverseQueryRootFldAST - GR.resolveValPrep astUnresolved - let tx = withUserInfo userInfo $ liftTx $ - asSingleRowJsonResp (GR.toPGQuery astResolved) $ toList prepArgs - fallbackOp = LQF.mkFallbackOp userInfo reqUnparsed $ withAlias tx - return (LQFallback fallbackOp, Nothing) - - fldAlsT = G.unName $ G.unAlias fldAls - withAlias tx = - encJFromAssocList . pure . (,) fldAlsT <$> tx - --- | Checks if the provided arguments are valid values for their corresponding types. --- Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..." -validateAnnVarValsOnPg - :: ( MonadError QErr m - , MonadIO m - ) - => PGExecCtx - -> GV.AnnPGVarVals - -> m TextEncodedVariables -validateAnnVarValsOnPg pgExecCtx annVarVals = do - let valSel = mkValidationSel $ Map.elems annVarVals - - Q.Discard _ <- runTx' $ liftTx $ - Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False - return $ fmap (txtEncodedPGVal . pstValue) annVarVals - - where - mkExtrs = map (flip S.Extractor Nothing . toTxtValue) - mkValidationSel vars = - S.mkSelect { S.selExtr = mkExtrs vars } - runTx' tx = do - res <- liftIO $ runExceptT (runLazyTx' pgExecCtx tx) - liftEither res - - -- Explicitly look for the class of errors raised when the format of a value provided - -- for a type is incorrect. - dataExnErrHandler = mkTxErrorHandler (has _PGDataException) - --- | Use the existing plan with new variables and session variables --- to create a live query operation -subsOpFromPlan - :: ( MonadError QErr m - , MonadIO m - ) - => PGExecCtx - -> UserVars - -> Maybe GH.VariableValues - -> SubsPlan - -> m LiveQueryOp -subsOpFromPlan pgExecCtx usrVars varValsM (SubsPlan mxOpCtx varTypes) = do - annVarVals <- GV.getAnnPGVarVals varTypes varValsM - txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals - return $ LQMultiplexed (mxOpCtx, usrVars, txtEncodedVars) +import Hasura.GraphQL.Execute.LiveQuery.Poll +#endif diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Fallback.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Fallback.hs deleted file mode 100644 index d195a00c96c..00000000000 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Fallback.hs +++ /dev/null @@ -1,249 +0,0 @@ -module Hasura.GraphQL.Execute.LiveQuery.Fallback - ( RefetchInterval - , refetchIntervalFromMilli - , FallbackOpts - , mkFallbackOpts - - , LiveQueriesState - , initLiveQueriesState - , dumpLiveQueriesState - - , FallbackOp - , mkFallbackOp - , LiveQueryId - , addLiveQuery - , removeLiveQuery - ) where - -import qualified Control.Concurrent.Async as A -import qualified Control.Concurrent.STM as STM -import qualified Data.Aeson as J -import qualified ListT -import qualified StmContainers.Map as STMMap - -import Control.Concurrent (threadDelay) - -import Hasura.EncJSON -import Hasura.GraphQL.Execute.LiveQuery.Types -import Hasura.GraphQL.Transport.HTTP.Protocol -import Hasura.Prelude -import Hasura.RQL.Types - -data LiveQuery - = LiveQuery - { _lqUser :: !UserInfo - , _lqRequest :: !GQLReqUnparsed - } deriving (Show, Eq, Generic) - -instance J.ToJSON LiveQuery where - toJSON (LiveQuery user req) = - J.object [ "user" J..= userVars user - , "request" J..= req - ] - -instance Hashable LiveQuery - -data LQHandler - = LQHandler - -- the tx to be executed - { _lqhRespTx :: !LazyRespTx - -- previous result - , _lqhPrevRes :: !RespTV - -- the actions that have been run previously - -- we run these if the response changes - , _lqhCurOps :: !Sinks - -- we run these operations regardless - -- and then merge them with current operations - , _lqhNewOps :: !Sinks - } - -data FallbackOpts - = FallbackOpts - { _foRefetchInterval :: !RefetchInterval - } deriving (Show, Eq) - -instance J.ToJSON FallbackOpts where - toJSON (FallbackOpts refetchInterval) = - J.object [ "refetch_delay" J..= refetchInterval - ] - --- 1 second -defaultRefetchInterval :: RefetchInterval -defaultRefetchInterval = - refetchIntervalFromMilli 1000 - -mkFallbackOpts - :: Maybe RefetchInterval - -> FallbackOpts -mkFallbackOpts refetchIntervalM = - FallbackOpts - (fromMaybe defaultRefetchInterval refetchIntervalM) - -data LiveQueriesState - = LiveQueriesState - { _lqsOptions :: !FallbackOpts - , _lqsLiveQueryMap :: !LiveQueryMap - } - -dumpLiveQueriesState :: LiveQueriesState -> IO J.Value -dumpLiveQueriesState (LiveQueriesState opts lqMap) = do - lqMapJ <- dumpLiveQueryMap lqMap - return $ J.object - [ "options" J..= opts - , "live_queries_map" J..= lqMapJ - ] - -initLiveQueriesState - :: FallbackOpts - -> STM.STM LiveQueriesState -initLiveQueriesState lqOptions = - LiveQueriesState - lqOptions - <$> STMMap.new - -data LiveQueryId - = LiveQueryId - { _lqiQuery :: !LiveQuery - , _lqiSink :: !SinkId - } - -type LiveQueryMap = STMMap.Map LiveQuery (LQHandler, ThreadTM) - -dumpLiveQueryMap :: LiveQueryMap -> IO J.Value -dumpLiveQueryMap lqMap = - fmap J.toJSON $ STM.atomically $ do - entries <- ListT.toList $ STMMap.listT lqMap - forM entries $ \(lq, (lqHandler, threadRef)) -> do - prevResHash <- STM.readTVar $ _lqhPrevRes lqHandler - threadId <- A.asyncThreadId <$> STM.readTMVar threadRef - curOps <- toListTMap $ _lqhCurOps lqHandler - newOps <- toListTMap $ _lqhNewOps lqHandler - return $ J.object - [ "query" J..= lq - , "thread_id" J..= show threadId - , "current_ops" J..= map fst curOps - , "new_ops" J..= map fst newOps - , "previous_result_hash" J..= prevResHash - ] - -removeLiveQuery - :: LiveQueriesState - -- the query and the associated operation - -> LiveQueryId - -> IO () -removeLiveQuery lqState (LiveQueryId liveQ k) = do - - -- clean the handler's state - threadRefM <- STM.atomically $ do - lqHandlerM <- STMMap.lookup liveQ lqMap - maybe (return Nothing) cleanLQHandler lqHandlerM - - -- cancel the polling thread - onJust threadRefM A.cancel - - where - lqMap = _lqsLiveQueryMap lqState - cleanLQHandler (handler, threadRef) = do - let curOps = _lqhCurOps handler - newOps = _lqhNewOps handler - deleteTMap k curOps - deleteTMap k newOps - cancelPollThread <- (&&) - <$> nullTMap curOps - <*> nullTMap newOps - -- if this happens to be the last operation, take the - -- ref for the polling thread to cancel it - if cancelPollThread then do - STMMap.delete liveQ lqMap - Just <$> STM.takeTMVar threadRef - else return Nothing - --- the transaction associated with this query -type FallbackOp = (LiveQuery, LazyRespTx) - -mkFallbackOp - :: UserInfo -> GQLReqUnparsed - -> LazyRespTx -> FallbackOp -mkFallbackOp userInfo req tx = - (LiveQuery userInfo req, tx) - - -addLiveQuery - :: PGExecCtx - -> LiveQueriesState - -- the query - -> FallbackOp - -- the action to be executed when result changes - -> OnChange - -> IO LiveQueryId -addLiveQuery pgExecCtx lqState (liveQ, respTx) onResultAction= do - - sinkId <- newSinkId - - -- a handler is returned only when it is newly created - handlerM <- STM.atomically $ do - lqHandlerM <- STMMap.lookup liveQ lqMap - maybe (newHandler sinkId) (addToExistingHandler sinkId) lqHandlerM - - -- we can then attach a polling thread if it is new - -- the livequery can only be cancelled after putTMVar - onJust handlerM $ \(handler, pollerThreadTM) -> do - threadRef <- A.async $ forever $ do - pollQuery pgExecCtx handler - threadDelay $ refetchIntervalToMicro refetchInterval - STM.atomically $ STM.putTMVar pollerThreadTM threadRef - - return $ LiveQueryId liveQ sinkId - - where - - LiveQueriesState lqOpts lqMap = lqState - FallbackOpts refetchInterval = lqOpts - - addToExistingHandler sinkId (handler, _) = do - insertTMap onResultAction sinkId $ _lqhNewOps handler - return Nothing - - newHandler sinkId = do - handler <- LQHandler - <$> return respTx - <*> STM.newTVar Nothing - <*> newTMap - <*> newTMap - insertTMap onResultAction sinkId $ _lqhNewOps handler - asyncRefTM <- STM.newEmptyTMVar - STMMap.insert (handler, asyncRefTM) liveQ lqMap - return $ Just (handler, asyncRefTM) - -pollQuery - :: PGExecCtx - -> LQHandler - -> IO () -pollQuery pgExecCtx (LQHandler respTx respTV curOpsTV newOpsTV) = do - - resOrErr <- runExceptT $ runLazyTx pgExecCtx respTx - - let (resp, respHashM) = case encJToLBS <$> resOrErr of - Left e -> (GQExecError [encodeGQErr False e], Nothing) - Right lbs -> (GQSuccess lbs, Just $ mkRespHash lbs) - - -- extract the current and new operations - (curOps, newOps) <- STM.atomically $ do - curOpsL <- toListTMap curOpsTV - newOpsL <- toListTMap newOpsTV - forM_ newOpsL $ \(k, action) -> insertTMap action k curOpsTV - resetTMap newOpsTV - return (curOpsL, newOpsL) - - runOperations resp newOps - - -- write to the current websockets if needed - prevRespHashM <- STM.readTVarIO respTV - when (isExecError resp || respHashM /= prevRespHashM) $ do - runOperations resp curOps - STM.atomically $ STM.writeTVar respTV respHashM - - where - runOperation resp action = action resp - runOperations resp = - void . A.mapConcurrently (runOperation resp . snd) diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Options.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Options.hs new file mode 100644 index 00000000000..786be25e476 --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Options.hs @@ -0,0 +1,37 @@ +module Hasura.GraphQL.Execute.LiveQuery.Options + ( LiveQueriesOptions(..) + , BatchSize(..) + , RefetchInterval(..) + , mkLiveQueriesOptions + ) where + +import Hasura.Prelude + +import qualified Data.Aeson as J + +import Data.Time.Clock (DiffTime) +import Data.Time.Clock.Units (seconds) + +data LiveQueriesOptions + = LiveQueriesOptions + { _lqoBatchSize :: !BatchSize + , _lqoRefetchInterval :: !RefetchInterval + } deriving (Show, Eq) + +mkLiveQueriesOptions :: Maybe BatchSize -> Maybe RefetchInterval -> LiveQueriesOptions +mkLiveQueriesOptions batchSize refetchInterval = LiveQueriesOptions + { _lqoBatchSize = fromMaybe (BatchSize 100) batchSize + , _lqoRefetchInterval = fromMaybe (RefetchInterval $ seconds 1) refetchInterval + } + +instance J.ToJSON LiveQueriesOptions where + toJSON (LiveQueriesOptions batchSize refetchInterval) = + J.object [ "batch_size" J..= batchSize + , "refetch_delay" J..= refetchInterval + ] + +newtype BatchSize = BatchSize { unBatchSize :: Int } + deriving (Show, Eq, J.ToJSON) + +newtype RefetchInterval = RefetchInterval { unRefetchInterval :: DiffTime } + deriving (Show, Eq, J.ToJSON) diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs new file mode 100644 index 00000000000..76542ba8d2c --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs @@ -0,0 +1,208 @@ +-- | Construction of multiplexed live query plans; see "Hasura.GraphQL.Execute.LiveQuery" for +-- details. +module Hasura.GraphQL.Execute.LiveQuery.Plan + ( MultiplexedQuery + , mkMultiplexedQuery + , unMultiplexedQuery + , toMultiplexedQueryVar + + , LiveQueryPlan(..) + , ParameterizedLiveQueryPlan(..) + , ReusableLiveQueryPlan + , ValidatedQueryVariables + , buildLiveQueryPlan + , reuseLiveQueryPlan + ) where + +import Hasura.Prelude + +import qualified Data.Aeson.Casing as J +import qualified Data.Aeson.Extended as J +import qualified Data.Aeson.TH as J +import qualified Data.HashMap.Strict as Map +import qualified Data.Text as T +import qualified Database.PG.Query as Q +import qualified Language.GraphQL.Draft.Syntax as G + +import Control.Lens +import Data.Has + +import qualified Hasura.GraphQL.Resolve as GR +import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH +import qualified Hasura.GraphQL.Validate as GV +import qualified Hasura.SQL.DML as S + +import Hasura.Db +import Hasura.RQL.Types +import Hasura.SQL.Error +import Hasura.SQL.Types +import Hasura.SQL.Value + +-- ------------------------------------------------------------------------------------------------- +-- Multiplexed queries + +newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query } + deriving (Show, Eq, Hashable, J.ToJSON) + +mkMultiplexedQuery :: Q.Query -> MultiplexedQuery +mkMultiplexedQuery baseQuery = + MultiplexedQuery . Q.fromText $ foldMap Q.getQueryText [queryPrefix, baseQuery, querySuffix] + where + queryPrefix = + [Q.sql| + select + _subs.result_id, _fld_resp.root as result + from + unnest( + $1::uuid[], $2::json[] + ) _subs (result_id, result_vars) + left outer join lateral + ( + |] + + querySuffix = + [Q.sql| + ) _fld_resp ON ('true') + |] + +-- | converts the partial unresolved value containing +-- variables, session variables to an SQL expression +-- referring correctly to the values from '_subs' temporary table +-- The variables are at _subs.result_vars.variables and +-- session variables at _subs.result_vars.user +toMultiplexedQueryVar + :: (MonadState GV.ReusableVariableValues m) + => GR.UnresolvedVal -> m S.SQLExp +toMultiplexedQueryVar = \case + GR.UVPG annPGVal -> + let GR.AnnPGVal varM isNullable _ colVal = annPGVal + in case (varM, isNullable) of + -- we don't check for nullability as + -- this is only used for reusable plans + -- the check has to be made before this + (Just var, _) -> do + modify $ Map.insert var colVal + return $ fromResVars (PGTypeScalar $ pstType colVal) + [ "variables" + , G.unName $ G.unVariable var + ] + _ -> return $ toTxtValue colVal + GR.UVSessVar ty sessVar -> + return $ fromResVars ty [ "user", T.toLower sessVar] + GR.UVSQL sqlExp -> return sqlExp + where + fromResVars ty jPath = + flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>") + [ S.SEQIden $ S.QIden (S.QualIden $ Iden "_subs") + (Iden "result_vars") + , S.SEArray $ map S.SELit jPath + ] + +-- ------------------------------------------------------------------------------------------------- +-- Variable validation + +-- | When running multiplexed queries, we have to be especially careful about user input, since +-- invalid values will cause the query to fail, causing collateral damage for anyone else +-- multiplexed into the same query. Therefore, we pre-validate variables against Postgres by +-- executing a no-op query of the shape +-- +-- > SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn +-- +-- so if any variable values are invalid, the error will be caught early. +newtype ValidatedQueryVariables = ValidatedQueryVariables (Map.HashMap G.Variable TxtEncodedPGVal) + deriving (Show, Eq, Hashable, J.ToJSON) + +-- | Checks if the provided arguments are valid values for their corresponding types. +-- Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..." +validateQueryVariables + :: (MonadError QErr m, MonadIO m) + => PGExecCtx + -> GV.ReusableVariableValues + -> m ValidatedQueryVariables +validateQueryVariables pgExecCtx annVarVals = do + let valSel = mkValidationSel $ Map.elems annVarVals + Q.Discard () <- runTx' $ liftTx $ + Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False + pure . ValidatedQueryVariables $ fmap (txtEncodedPGVal . pstValue) annVarVals + where + mkExtrs = map (flip S.Extractor Nothing . toTxtValue) + mkValidationSel vars = + S.mkSelect { S.selExtr = mkExtrs vars } + runTx' tx = do + res <- liftIO $ runExceptT (runLazyTx' pgExecCtx tx) + liftEither res + + -- Explicitly look for the class of errors raised when the format of a value provided + -- for a type is incorrect. + dataExnErrHandler = mkTxErrorHandler (has _PGDataException) + +-- ------------------------------------------------------------------------------------------------- +-- Live query plans + +-- | A self-contained, ready-to-execute live query plan. Contains enough information to find an +-- existing poller that this can be added to /or/ to create a new poller if necessary. +data LiveQueryPlan + = LiveQueryPlan + { _lqpParameterizedPlan :: !ParameterizedLiveQueryPlan + , _lqpSessionVariables :: !UserVars + , _lqpQueryVariables :: !ValidatedQueryVariables + } + +data ParameterizedLiveQueryPlan + = ParameterizedLiveQueryPlan + { _plqpRole :: !RoleName + , _plqpAlias :: !G.Alias + , _plqpQuery :: !MultiplexedQuery + } deriving (Show) +$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan) + +data ReusableLiveQueryPlan + = ReusableLiveQueryPlan + { _rlqpParameterizedPlan :: !ParameterizedLiveQueryPlan + , _rlqpQueryVariableTypes :: !GV.ReusableVariableTypes + } deriving (Show) +$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ReusableLiveQueryPlan) + +-- | Constructs a new execution plan for a live query and returns a reusable version of the plan if +-- possible. +buildLiveQueryPlan + :: ( MonadError QErr m + , MonadReader r m + , Has UserInfo r + , MonadIO m + ) + => PGExecCtx + -> G.Alias + -> GR.QueryRootFldUnresolved + -> Maybe GV.ReusableVariableTypes + -> m (LiveQueryPlan, Maybe ReusableLiveQueryPlan) +buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do + userInfo <- asks getter + + (astResolved, annVarVals) <- + flip runStateT mempty $ GR.traverseQueryRootFldAST + toMultiplexedQueryVar astUnresolved + let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved + parameterizedPlan = ParameterizedLiveQueryPlan (userRole userInfo) fieldAlias pgQuery + + -- We need to ensure that the values provided for variables + -- are correct according to Postgres. Without this check + -- an invalid value for a variable for one instance of the + -- subscription will take down the entire multiplexed query + validatedVars <- validateQueryVariables pgExecCtx annVarVals + let plan = LiveQueryPlan parameterizedPlan (userVars userInfo) validatedVars + reusablePlan = ReusableLiveQueryPlan parameterizedPlan <$> varTypes + pure (plan, reusablePlan) + +reuseLiveQueryPlan + :: (MonadError QErr m, MonadIO m) + => PGExecCtx + -> UserVars + -> Maybe GH.VariableValues + -> ReusableLiveQueryPlan + -> m LiveQueryPlan +reuseLiveQueryPlan pgExecCtx sessionVars queryVars reusablePlan = do + let ReusableLiveQueryPlan parameterizedPlan varTypes = reusablePlan + annVarVals <- GV.validateVariablesForReuse varTypes queryVars + validatedVars <- validateQueryVariables pgExecCtx annVarVals + pure $ LiveQueryPlan parameterizedPlan sessionVars validatedVars diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs new file mode 100644 index 00000000000..c5fb65ab762 --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -0,0 +1,386 @@ +-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details. +module Hasura.GraphQL.Execute.LiveQuery.Poll ( + -- * Pollers + Poller(..) + , PollerIOState(..) + , pollQuery + + , PollerKey(..) + , PollerMap + , dumpPollerMap + + , RefetchMetrics + , initRefetchMetrics + + -- * Cohorts + , Cohort(..) + , CohortId + , newCohortId + , CohortVariables(..) + , CohortKey + , CohortMap + + -- * Subscribers + , Subscriber(..) + , SubscriberId + , newSinkId + , SubscriberMap + , OnChange + ) where + +import Hasura.Prelude + +import qualified Control.Concurrent.Async as A +import qualified Control.Concurrent.STM as STM +import qualified Crypto.Hash as CH +import qualified Data.Aeson.Extended as J +import qualified Data.ByteString as BS +import qualified Data.HashMap.Strict as Map +import qualified Data.Time.Clock as Clock +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Database.PG.Query as Q +import qualified Language.GraphQL.Draft.Syntax as G +import qualified ListT +import qualified StmContainers.Map as STMMap +import qualified System.Metrics.Distribution as Metrics + +-- remove these when array encoding is merged +import qualified Database.PG.Query.PTI as PTI +import qualified PostgreSQL.Binary.Encoding as PE + +import Data.List.Split (chunksOf) + +import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap + +import Hasura.Db +import Hasura.EncJSON +import Hasura.GraphQL.Execute.LiveQuery.Options +import Hasura.GraphQL.Execute.LiveQuery.Plan +import Hasura.GraphQL.Transport.HTTP.Protocol +import Hasura.RQL.Types + +-- ------------------------------------------------------------------------------------------------- +-- Subscribers + +data Subscriber + = Subscriber + { _sRootAlias :: !G.Alias + , _sOnChangeCallback :: !OnChange + } + +type OnChange = GQResponse -> IO () + +newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID } + deriving (Show, Eq, Hashable, J.ToJSON) + +newSinkId :: IO SubscriberId +newSinkId = SubscriberId <$> UUID.nextRandom + +type SubscriberMap = TMap.TMap SubscriberId Subscriber + +-- ------------------------------------------------------------------------------------------------- +-- Cohorts + +-- | A batched group of 'Subscriber's who are not only listening to the same query but also have +-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to +-- each of its 'Subscriber's. +-- +-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and +-- therefore a single row in the query result). +data Cohort + = Cohort + { _cCohortId :: !CohortId + -- ^ a unique identifier used to identify the cohort in the generated query + , _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash)) + -- ^ a hash of the previous query result, if any, used to determine if we need to push an updated + -- result to the subscribers or not + , _cExistingSubscribers :: !SubscriberMap + -- ^ the subscribers we’ve already pushed a result to; we push new results to them iff the + -- response changes + , _cNewSubscribers :: !SubscriberMap + -- ^ subscribers we haven’t yet pushed any results to; we push results to them regardless if the + -- result changed, then merge them in the map of existing subscribers + } + +newtype CohortId = CohortId { unCohortId :: UUID.UUID } + deriving (Show, Eq, Hashable, Q.FromCol) + +newCohortId :: IO CohortId +newCohortId = CohortId <$> UUID.nextRandom + +data CohortVariables + = CohortVariables + { _cvSessionVariables :: !UserVars + , _cvQueryVariables :: !ValidatedQueryVariables + } deriving (Show, Eq, Generic) +instance Hashable CohortVariables + +instance J.ToJSON CohortVariables where + toJSON (CohortVariables sessionVars queryVars) = + J.object ["user" J..= sessionVars, "variables" J..= queryVars] + +-- | A hash used to determine if the result changed without having to keep the entire result in +-- memory. Using a cryptographic hash ensures that a hash collision is almost impossible: with 256 +-- bits, even if a subscription changes once per second for an entire year, the probability of a +-- hash collision is ~4.294417×10-63. We use Blake2b because it is faster than SHA-256 +newtype ResponseHash = ResponseHash { unResponseHash :: CH.Digest CH.Blake2b_256 } + deriving (Show, Eq) + +instance J.ToJSON ResponseHash where + toJSON = J.toJSON . show . unResponseHash + +mkRespHash :: BS.ByteString -> ResponseHash +mkRespHash = ResponseHash . CH.hash + +-- | A key we use to determine if two 'Subscriber's belong in the same 'Cohort' (assuming they +-- already meet the criteria to be in the same 'Poller'). Note the distinction between this and +-- 'CohortId'; the latter is a completely synthetic key used only to identify the cohort in the +-- generated SQL query. +type CohortKey = CohortVariables +type CohortMap = TMap.TMap CohortKey Cohort + +dumpCohortMap :: CohortMap -> IO J.Value +dumpCohortMap cohortMap = do + cohorts <- STM.atomically $ TMap.toList cohortMap + fmap J.toJSON . forM cohorts $ \(CohortVariables usrVars varVals, cohort) -> do + cohortJ <- dumpCohort cohort + return $ J.object + [ "session_vars" J..= usrVars + , "variable_values" J..= varVals + , "cohort" J..= cohortJ + ] + where + dumpCohort (Cohort respId respTV curOps newOps) = + STM.atomically $ do + prevResHash <- STM.readTVar respTV + curOpIds <- TMap.toList curOps + newOpIds <- TMap.toList newOps + return $ J.object + [ "resp_id" J..= unCohortId respId + , "current_ops" J..= map fst curOpIds + , "new_ops" J..= map fst newOpIds + , "previous_result_hash" J..= prevResHash + ] + +data CohortSnapshot + = CohortSnapshot + { _csVariables :: !CohortVariables + , _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash)) + , _csExistingSubscribers :: ![Subscriber] + , _csNewSubscribers :: ![Subscriber] + } + +pushResultToCohort + :: GQResult EncJSON + -- ^ a response that still needs to be wrapped with each 'Subscriber'’s root 'G.Alias' + -> Maybe ResponseHash + -> CohortSnapshot + -> IO () +pushResultToCohort result respHashM cohortSnapshot = do + prevRespHashM <- STM.readTVarIO respRef + -- write to the current websockets if needed + sinks <- + if isExecError result || respHashM /= prevRespHashM + then do + STM.atomically $ STM.writeTVar respRef respHashM + return (newSinks <> curSinks) + else + return newSinks + pushResultToSubscribers sinks + where + CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot + pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) -> + let aliasText = G.unName $ G.unAlias alias + wrapWithAlias response = + encJToLBS $ encJFromAssocList [(aliasText, response)] + in action (wrapWithAlias <$> result) + +-- ------------------------------------------------------------------------------------------------- +-- Pollers + +-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that periodically polls +-- Postgres and pushes results to each of its listening 'Cohort's. +-- +-- In SQL, an 'Poller' corresponds to a single, multiplexed query, though in practice, 'Poller's +-- with large numbers of 'Cohort's are batched into multiple concurrent queries for performance +-- reasons. +data Poller + = Poller + { _pCohorts :: !CohortMap + , _pIOState :: !(STM.TMVar PollerIOState) + -- ^ This is in a separate 'STM.TMVar' because it’s important that we are able to construct + -- 'Poller' values in 'STM.STM' --- we need the insertion into the 'PollerMap' to be atomic to + -- ensure that we don’t accidentally create two for the same query due to a race. However, we + -- can’t spawn the worker thread or create the metrics store in 'STM.STM', so we insert it into + -- the 'Poller' only after we’re certain we won’t create any duplicates. + } +data PollerIOState + = PollerIOState + { _pThread :: !(A.Async ()) + -- ^ a handle on the poller’s worker thread that can be used to 'A.cancel' it if all its cohorts + -- stop listening + , _pMetrics :: !RefetchMetrics + } + +data RefetchMetrics + = RefetchMetrics + { _rmSnapshot :: !Metrics.Distribution + , _rmPush :: !Metrics.Distribution + , _rmQuery :: !Metrics.Distribution + , _rmTotal :: !Metrics.Distribution + } + +initRefetchMetrics :: IO RefetchMetrics +initRefetchMetrics = RefetchMetrics <$> Metrics.new <*> Metrics.new <*> Metrics.new <*> Metrics.new + +data PollerKey + -- we don't need operation name here as a subscription will + -- only have a single top level field + = PollerKey + { _lgRole :: !RoleName + , _lgQuery :: !MultiplexedQuery + } deriving (Show, Eq, Generic) + +instance Hashable PollerKey + +instance J.ToJSON PollerKey where + toJSON (PollerKey role query) = + J.object [ "role" J..= role + , "query" J..= query + ] + +type PollerMap = STMMap.Map PollerKey Poller + +dumpPollerMap :: Bool -> PollerMap -> IO J.Value +dumpPollerMap extended lqMap = + fmap J.toJSON $ do + entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap + forM entries $ \(PollerKey role query, Poller cohortsMap ioState) -> do + PollerIOState threadId metrics <- STM.atomically $ STM.readTMVar ioState + metricsJ <- dumpRefetchMetrics metrics + cohortsJ <- + if extended + then Just <$> dumpCohortMap cohortsMap + else return Nothing + return $ J.object + [ "role" J..= role + , "thread_id" J..= show (A.asyncThreadId threadId) + , "multiplexed_query" J..= query + , "cohorts" J..= cohortsJ + , "metrics" J..= metricsJ + ] + where + dumpRefetchMetrics metrics = do + snapshotS <- Metrics.read $ _rmSnapshot metrics + queryS <- Metrics.read $ _rmQuery metrics + pushS <- Metrics.read $ _rmPush metrics + totalS <- Metrics.read $ _rmTotal metrics + return $ J.object + [ "snapshot" J..= dumpStats snapshotS + , "query" J..= dumpStats queryS + , "push" J..= dumpStats pushS + , "total" J..= dumpStats totalS + ] + + dumpStats stats = + J.object + [ "mean" J..= Metrics.mean stats + , "variance" J..= Metrics.variance stats + , "count" J..= Metrics.count stats + , "min" J..= Metrics.min stats + , "max" J..= Metrics.max stats + ] + +newtype CohortIdArray = CohortIdArray { unCohortIdArray :: [CohortId] } + deriving (Show, Eq) + +instance Q.ToPrepArg CohortIdArray where + toPrepVal (CohortIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unCohortId l + where + encoder = PE.array 2950 . PE.dimensionArray foldl' (PE.encodingArray . PE.uuid) + +newtype CohortVariablesArray = CohortVariablesArray { unCohortVariablesArray :: [CohortVariables] } + deriving (Show, Eq) + +instance Q.ToPrepArg CohortVariablesArray where + toPrepVal (CohortVariablesArray l) = + Q.toPrepValHelper PTI.unknown encoder (map J.toJSON l) + where + encoder = PE.array 114 . PE.dimensionArray foldl' (PE.encodingArray . PE.json_ast) + +-- | Where the magic happens: the top-level action run periodically by each active 'Poller'. +pollQuery + :: RefetchMetrics + -> BatchSize + -> PGExecCtx + -> MultiplexedQuery + -> Poller + -> IO () +pollQuery metrics batchSize pgExecCtx pgQuery handler = do + procInit <- Clock.getCurrentTime + + -- get a snapshot of all the cohorts + -- this need not be done in a transaction + cohorts <- STM.atomically $ TMap.toList cohortMap + cohortSnapshotMap <- Map.fromList <$> mapM (STM.atomically . getCohortSnapshot) cohorts + + let queryVarsBatches = chunksOf (unBatchSize batchSize) $ getQueryVars cohortSnapshotMap + + snapshotFinish <- Clock.getCurrentTime + Metrics.add (_rmSnapshot metrics) $ + realToFrac $ Clock.diffUTCTime snapshotFinish procInit + flip A.mapConcurrently_ queryVarsBatches $ \queryVars -> do + queryInit <- Clock.getCurrentTime + mxRes <- runExceptT . runLazyTx' pgExecCtx . liftTx $ Q.listQE defaultTxErrorHandler + (unMultiplexedQuery pgQuery) (mkMxQueryPrepArgs queryVars) True + queryFinish <- Clock.getCurrentTime + Metrics.add (_rmQuery metrics) $ + realToFrac $ Clock.diffUTCTime queryFinish queryInit + let operations = getCohortOperations cohortSnapshotMap mxRes + -- concurrently push each unique result + A.mapConcurrently_ (uncurry3 pushResultToCohort) operations + pushFinish <- Clock.getCurrentTime + Metrics.add (_rmPush metrics) $ + realToFrac $ Clock.diffUTCTime pushFinish queryFinish + procFinish <- Clock.getCurrentTime + Metrics.add (_rmTotal metrics) $ + realToFrac $ Clock.diffUTCTime procFinish procInit + + where + Poller cohortMap _ = handler + + uncurry3 :: (a -> b -> c -> d) -> (a, b, c) -> d + uncurry3 f (a, b, c) = f a b c + + getCohortSnapshot (cohortVars, handlerC) = do + let Cohort resId respRef curOpsTV newOpsTV = handlerC + curOpsL <- TMap.toList curOpsTV + newOpsL <- TMap.toList newOpsTV + forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV + TMap.reset newOpsTV + let cohortSnapshot = CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL) + return (resId, cohortSnapshot) + + getQueryVars cohortSnapshotMap = + Map.toList $ fmap _csVariables cohortSnapshotMap + + mkMxQueryPrepArgs l = + let (respIdL, respVarL) = unzip l + in (CohortIdArray respIdL, CohortVariablesArray respVarL) + + getCohortOperations cohortSnapshotMap = \case + Left e -> + -- TODO: this is internal error + let resp = GQExecError [encodeGQErr False e] + in [ (resp, Nothing, snapshot) + | (_, snapshot) <- Map.toList cohortSnapshotMap + ] + Right responses -> + flip mapMaybe responses $ \(respId, result) -> + -- TODO: change it to use bytestrings directly + let -- No reason to use lazy bytestrings here, since (1) we fetch the entire result set + -- from Postgres strictly and (2) even if we didn’t, hashing will have to force the + -- whole thing anyway. + respHash = mkRespHash (encJToBS result) + in (GQSuccess result, Just respHash,) <$> Map.lookup respId cohortSnapshotMap diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs new file mode 100644 index 00000000000..383a28e8c36 --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs @@ -0,0 +1,147 @@ +-- | Top-level management of live query poller threads. The implementation of the polling itself is +-- in "Hasura.GraphQL.Execute.LiveQuery.Poll". See "Hasura.GraphQL.Execute.LiveQuery" for high-level +-- details. +module Hasura.GraphQL.Execute.LiveQuery.State + ( LiveQueriesState + , initLiveQueriesState + , dumpLiveQueriesState + + , LiveQueryId + , addLiveQuery + , removeLiveQuery + ) where + +import Hasura.Prelude + +import qualified Control.Concurrent.Async as A +import qualified Control.Concurrent.STM as STM +import qualified Data.Aeson.Extended as J +import qualified StmContainers.Map as STMMap + +import Control.Concurrent.Extended (threadDelay) + +import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap + +import Hasura.Db +import Hasura.GraphQL.Execute.LiveQuery.Options +import Hasura.GraphQL.Execute.LiveQuery.Plan +import Hasura.GraphQL.Execute.LiveQuery.Poll + +-- | The top-level datatype that holds the state for all active live queries. +data LiveQueriesState + = LiveQueriesState + { _lqsOptions :: !LiveQueriesOptions + , _lqsPGExecTx :: !PGExecCtx + , _lqsLiveQueryMap :: !PollerMap + } + +initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> IO LiveQueriesState +initLiveQueriesState options pgCtx = LiveQueriesState options pgCtx <$> STMMap.newIO + +dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value +dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap) = do + lqMapJ <- dumpPollerMap extended lqMap + return $ J.object + [ "options" J..= opts + , "live_queries_map" J..= lqMapJ + ] + +data LiveQueryId + = LiveQueryId + { _lqiPoller :: !PollerKey + , _lqiCohort :: !CohortKey + , _lqiSubscriber :: !SubscriberId + } + +addLiveQuery + :: LiveQueriesState + -> LiveQueryPlan + -> OnChange + -- ^ the action to be executed when result changes + -> IO LiveQueryId +addLiveQuery lqState plan onResultAction = do + responseId <- newCohortId + sinkId <- newSinkId + + -- a handler is returned only when it is newly created + handlerM <- STM.atomically $ do + handlerM <- STMMap.lookup handlerId lqMap + case handlerM of + Just handler -> do + cohortM <- TMap.lookup cohortId $ _pCohorts handler + case cohortM of + Just cohort -> addToCohort sinkId cohort + Nothing -> addToPoller sinkId responseId handler + return Nothing + Nothing -> do + poller <- newPoller + addToPoller sinkId responseId poller + STMMap.insert poller handlerId lqMap + return $ Just poller + + -- we can then attach a polling thread if it is new + -- the livequery can only be cancelled after putTMVar + onJust handlerM $ \handler -> do + metrics <- initRefetchMetrics + threadRef <- A.async $ forever $ do + pollQuery metrics batchSize pgExecCtx query handler + threadDelay $ unRefetchInterval refetchInterval + STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics) + + pure $ LiveQueryId handlerId cohortId sinkId + where + LiveQueriesState lqOpts pgExecCtx lqMap = lqState + LiveQueriesOptions batchSize refetchInterval = lqOpts + LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) sessionVars queryVars = plan + + handlerId = PollerKey role query + cohortId = CohortVariables sessionVars queryVars + + addToCohort sinkId handlerC = + TMap.insert (Subscriber alias onResultAction) sinkId $ _cNewSubscribers handlerC + + addToPoller sinkId responseId handler = do + newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new + addToCohort sinkId newCohort + TMap.insert newCohort cohortId $ _pCohorts handler + + newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar + +removeLiveQuery + :: LiveQueriesState + -- the query and the associated operation + -> LiveQueryId + -> IO () +removeLiveQuery lqState (LiveQueryId handlerId cohortId sinkId) = do + threadRef <- STM.atomically $ do + detM <- getQueryDet + fmap join $ forM detM $ \(Poller cohorts ioState, cohort) -> + cleanHandlerC cohorts ioState cohort + traverse_ A.cancel threadRef + where + lqMap = _lqsLiveQueryMap lqState + + getQueryDet = do + pollerM <- STMMap.lookup handlerId lqMap + fmap join $ forM pollerM $ \poller -> do + cohortM <- TMap.lookup cohortId (_pCohorts poller) + return $ (poller,) <$> cohortM + + cleanHandlerC cohortMap ioState handlerC = do + let curOps = _cExistingSubscribers handlerC + newOps = _cNewSubscribers handlerC + TMap.delete sinkId curOps + TMap.delete sinkId newOps + cohortIsEmpty <- (&&) + <$> TMap.null curOps + <*> TMap.null newOps + when cohortIsEmpty $ TMap.delete cohortId cohortMap + handlerIsEmpty <- TMap.null cohortMap + -- when there is no need for handler + -- i.e, this happens to be the last operation, take the + -- ref for the polling thread to cancel it + if handlerIsEmpty + then do + STMMap.delete handlerId lqMap + fmap _pThread <$> STM.tryReadTMVar ioState + else return Nothing diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs new file mode 100644 index 00000000000..af2a93fb994 --- /dev/null +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/TMap.hs @@ -0,0 +1,42 @@ +module Hasura.GraphQL.Execute.LiveQuery.TMap + ( TMap + , new + , reset + , null + , lookup + , insert + , delete + , toList + ) where + +import Hasura.Prelude hiding (lookup, null, toList) + +import qualified Data.HashMap.Strict as Map + +import Control.Concurrent.STM + +-- | A coarse-grained transactional map implemented by simply wrapping a 'Map.HashMap' in a 'TVar'. +-- Compared to "StmContainers.Map", this provides much faster iteration over the elements at the +-- cost of significantly increased contention on writes. +newtype TMap k v = TMap { unTMap :: TVar (Map.HashMap k v) } + +new :: STM (TMap k v) +new = TMap <$> newTVar Map.empty + +reset :: TMap k v -> STM () +reset = flip writeTVar Map.empty . unTMap + +null :: TMap k v -> STM Bool +null = fmap Map.null . readTVar . unTMap + +lookup :: (Eq k, Hashable k) => k -> TMap k v -> STM (Maybe v) +lookup k = fmap (Map.lookup k) . readTVar . unTMap + +insert :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM () +insert v k mapTv = modifyTVar' (unTMap mapTv) $ Map.insert k v + +delete :: (Eq k, Hashable k) => k -> TMap k v -> STM () +delete k mapTv = modifyTVar' (unTMap mapTv) $ Map.delete k + +toList :: TMap k v -> STM [(k, v)] +toList = fmap Map.toList . readTVar . unTMap diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Types.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Types.hs deleted file mode 100644 index b0ea4b9911e..00000000000 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Types.hs +++ /dev/null @@ -1,115 +0,0 @@ -module Hasura.GraphQL.Execute.LiveQuery.Types - ( OnChange - , ThreadTM - , SinkId - , newSinkId - , Sinks - - , RespHash - , mkRespHash - , RespTV - - , RefetchInterval - , refetchIntervalFromMilli - , refetchIntervalToMicro - - , TMap - , newTMap - , resetTMap - , nullTMap - , insertTMap - , deleteTMap - , lookupTMap - , toListTMap - ) where - -import Data.Word (Word32) - -import qualified Control.Concurrent.Async as A -import qualified Control.Concurrent.STM as STM -import qualified Crypto.Hash as CH -import qualified Data.Aeson as J -import qualified Data.ByteString.Lazy as LBS -import qualified Data.HashMap.Strict as Map -import qualified Data.UUID as UUID -import qualified Data.UUID.V4 as UUID - -import Hasura.GraphQL.Transport.HTTP.Protocol -import Hasura.Prelude - -type OnChange = GQResp -> IO () -type ThreadTM = STM.TMVar (A.Async ()) - --- a cryptographic hash should ensure that --- a hash collision is almost improbable --- Blake2b because it is faster than Sha256 --- With 256 bits, and 86400 * 365 (a subscription open for 365 days) --- there is ~ 4.294417×10-63 chance of a hash collision. - -newtype RespHash - = RespHash {unRespHash :: CH.Digest CH.Blake2b_256} - deriving (Show, Eq) - -instance J.ToJSON RespHash where - toJSON = J.toJSON . show . unRespHash - -mkRespHash :: LBS.ByteString -> RespHash -mkRespHash = RespHash . CH.hashlazy - -type RespTV = STM.TVar (Maybe RespHash) - -newtype RefetchInterval - = RefetchInterval {unRefetchInterval :: Word32} - deriving (Show, Eq, J.ToJSON) - -refetchIntervalFromMilli :: Word32 -> RefetchInterval -refetchIntervalFromMilli = RefetchInterval - -refetchIntervalToMicro :: RefetchInterval -> Int -refetchIntervalToMicro ri = fromIntegral $ 1000 * unRefetchInterval ri - --- compared to stm.stmmap, this provides a much faster --- iteration over the elements at the cost of slower --- concurrent insertions -newtype TMap k v - = TMap {unTMap :: STM.TVar (Map.HashMap k v)} - -newTMap :: STM.STM (TMap k v) -newTMap = - TMap <$> STM.newTVar Map.empty - -resetTMap :: TMap k v -> STM.STM () -resetTMap = - flip STM.writeTVar Map.empty . unTMap - -nullTMap :: TMap k v -> STM.STM Bool -nullTMap = - fmap Map.null . STM.readTVar . unTMap - -insertTMap :: (Eq k, Hashable k) => v -> k -> TMap k v -> STM.STM () -insertTMap v k mapTv = - STM.modifyTVar' (unTMap mapTv) $ Map.insert k v - -deleteTMap :: (Eq k, Hashable k) => k -> TMap k v -> STM.STM () -deleteTMap k mapTv = - STM.modifyTVar' (unTMap mapTv) $ Map.delete k - -lookupTMap :: (Eq k, Hashable k) => k -> TMap k v -> STM.STM (Maybe v) -lookupTMap k = - fmap (Map.lookup k) . STM.readTVar . unTMap - -toListTMap :: TMap k v -> STM.STM [(k, v)] -toListTMap = - fmap Map.toList . STM.readTVar . unTMap - -newtype SinkId - = SinkId {_unSinkId :: UUID.UUID} - deriving (Show, Eq, Hashable) - -newSinkId :: IO SinkId -newSinkId = SinkId <$> UUID.nextRandom - -instance J.ToJSON SinkId where - toJSON = J.toJSON . show - -type Sinks = TMap SinkId OnChange diff --git a/server/src-lib/Hasura/GraphQL/Execute/Plan.hs b/server/src-lib/Hasura/GraphQL/Execute/Plan.hs index 137b8746189..22bb7b515a8 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Plan.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Plan.hs @@ -44,7 +44,7 @@ newtype PlanCache data ReusablePlan = RPQuery !EQ.ReusableQueryPlan - | RPSubs !LQ.SubsPlan + | RPSubs !LQ.ReusableLiveQueryPlan instance J.ToJSON ReusablePlan where toJSON = \case diff --git a/server/src-lib/Hasura/GraphQL/Execute/Query.hs b/server/src-lib/Hasura/GraphQL/Execute/Query.hs index 646f6e08285..bd08d65956d 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Query.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Query.hs @@ -12,7 +12,6 @@ import qualified Data.Aeson as J import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as LBS import qualified Data.HashMap.Strict as Map -import qualified Data.HashSet as Set import qualified Data.IntMap as IntMap import qualified Data.TByteString as TBS import qualified Data.Text as T @@ -63,18 +62,12 @@ instance J.ToJSON RootFieldPlan where RFPRaw encJson -> J.toJSON $ TBS.fromBS encJson RFPPostgres pgPlan -> J.toJSON pgPlan -type VariableTypes = Map.HashMap G.Variable PGColumnType - -data QueryPlan - = QueryPlan - { _qpVariables :: ![G.VariableDefinition] - , _qpFldPlans :: ![(G.Alias, RootFieldPlan)] - } +type FieldPlans = [(G.Alias, RootFieldPlan)] data ReusableQueryPlan = ReusableQueryPlan - { _rqpVariableTypes :: !VariableTypes - , _rqpFldPlans :: ![(G.Alias, RootFieldPlan)] + { _rqpVariableTypes :: !GV.ReusableVariableTypes + , _rqpFldPlans :: !FieldPlans } instance J.ToJSON ReusableQueryPlan where @@ -83,32 +76,9 @@ instance J.ToJSON ReusableQueryPlan where , "field_plans" J..= fldPlans ] -getReusablePlan :: QueryPlan -> Maybe ReusableQueryPlan -getReusablePlan (QueryPlan vars fldPlans) = - if all fldPlanReusable $ map snd fldPlans - then Just $ ReusableQueryPlan varTypes fldPlans - else Nothing - where - allVars = Set.fromList $ map G._vdVariable vars - - -- this is quite aggressive, we can improve this by - -- computing used variables in each field - allUsed fldPlanVars = - Set.null $ Set.difference allVars $ Set.fromList fldPlanVars - - fldPlanReusable = \case - RFPRaw _ -> True - RFPPostgres pgPlan -> allUsed $ Map.keys $ _ppVariables pgPlan - - varTypesOfPlan = \case - RFPRaw _ -> mempty - RFPPostgres pgPlan -> snd <$> _ppVariables pgPlan - - varTypes = Map.unions $ map (varTypesOfPlan . snd) fldPlans - withPlan :: (MonadError QErr m) - => UserVars -> PGPlan -> GV.AnnPGVarVals -> m PreparedSql + => UserVars -> PGPlan -> GV.ReusableVariableValues -> m PreparedSql withPlan usrVars (PGPlan q reqVars prepMap) annVars = do prepMap' <- foldM getVar prepMap (Map.toList reqVars) let args = withUserVars usrVars $ IntMap.elems prepMap' @@ -125,9 +95,9 @@ withPlan usrVars (PGPlan q reqVars prepMap) annVars = do mkCurPlanTx :: (MonadError QErr m) => UserVars - -> QueryPlan + -> FieldPlans -> m (LazyRespTx, GeneratedSqlMap) -mkCurPlanTx usrVars (QueryPlan _ fldPlans) = do +mkCurPlanTx usrVars fldPlans = do -- generate the SQL and prepared vars or the bytestring resolved <- forM fldPlans $ \(alias, fldPlan) -> do fldResp <- case fldPlan of @@ -160,8 +130,8 @@ getVarArgNum getVarArgNum var colTy = do PlanningSt curArgNum vars prepped <- get case Map.lookup var vars of - Just argNum -> return $ fst argNum - Nothing -> do + Just (argNum, _) -> pure argNum + Nothing -> do put $ PlanningSt (curArgNum + 1) (Map.insert var (curArgNum, colTy) vars) prepped return curArgNum @@ -186,10 +156,11 @@ prepareWithPlan => UnresolvedVal -> m S.SQLExp prepareWithPlan = \case R.UVPG annPGVal -> do - let AnnPGVal varM isNullable colTy colVal = annPGVal - argNum <- case (varM, isNullable) of - (Just var, False) -> getVarArgNum var colTy - _ -> getNextArgNum + let AnnPGVal varM _ colTy colVal = annPGVal + argNum <- case varM of + -- TODO: Use reusability information? + Just var -> getVarArgNum var colTy + Nothing -> getNextArgNum addPrepArg argNum $ toBinaryValue colVal return $ toPrepParam argNum (pstType colVal) @@ -216,10 +187,10 @@ convertQuerySelSet , Has SQLGenCtx r , Has UserInfo r ) - => [G.VariableDefinition] + => Maybe GV.ReusableVariableTypes -> V.SelSet -> m (LazyRespTx, Maybe ReusableQueryPlan, GeneratedSqlMap) -convertQuerySelSet varDefs fields = do +convertQuerySelSet varTypes fields = do usrVars <- asks (userVars . getter) fldPlans <- forM (toList fields) $ \fld -> do fldPlan <- case V._fName fld of @@ -233,10 +204,9 @@ convertQuerySelSet varDefs fields = do prepareWithPlan unresolvedAst return $ RFPPostgres $ PGPlan (R.toPGQuery q) vars prepped return (V._fAlias fld, fldPlan) - let queryPlan = QueryPlan varDefs fldPlans - reusablePlanM = getReusablePlan queryPlan - (tx, sql) <- mkCurPlanTx usrVars queryPlan - return (tx, reusablePlanM, sql) + let reusablePlan = ReusableQueryPlan <$> varTypes <*> pure fldPlans + (tx, sql) <- mkCurPlanTx usrVars fldPlans + return (tx, reusablePlan, sql) -- use the existing plan and new variables to create a pg query queryOpFromPlan @@ -246,7 +216,7 @@ queryOpFromPlan -> ReusableQueryPlan -> m (LazyRespTx, GeneratedSqlMap) queryOpFromPlan usrVars varValsM (ReusableQueryPlan varTypes fldPlans) = do - validatedVars <- GV.getAnnPGVarVals varTypes varValsM + validatedVars <- GV.validateVariablesForReuse varTypes varValsM -- generate the SQL and prepared vars or the bytestring resolved <- forM fldPlans $ \(alias, fldPlan) -> (alias,) <$> case fldPlan of diff --git a/server/src-lib/Hasura/GraphQL/Resolve/InputValue.hs b/server/src-lib/Hasura/GraphQL/Resolve/InputValue.hs index acbbe77c93f..bd871dbcc68 100644 --- a/server/src-lib/Hasura/GraphQL/Resolve/InputValue.hs +++ b/server/src-lib/Hasura/GraphQL/Resolve/InputValue.hs @@ -44,6 +44,8 @@ tyMismatch expectedTy v = getAnnInpValKind (_aivValue v) <> " for value of type " <> G.showGT (_aivType v) +-- Note: This currently must be manually kept in sync with 'mkReusableVariableTypes' from +-- "Hasura.GraphQL.Validate"! It should be possible to merge them if we do #2801. asPGColumnTypeAndValueM :: (MonadError QErr m) => AnnInpVal diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP/Protocol.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP/Protocol.hs index f6ac8e5dc0f..14cbe501acf 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/HTTP/Protocol.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP/Protocol.hs @@ -9,7 +9,8 @@ module Hasura.GraphQL.Transport.HTTP.Protocol , VariableValues , encodeGQErr , encodeGQResp - , GQResp(..) + , GQResult(..) + , GQResponse , isExecError , RemoteGqlResp(..) , GraphqlResponse(..) @@ -82,18 +83,20 @@ encodeGQErr :: Bool -> QErr -> J.Value encodeGQErr includeInternal qErr = J.object [ "errors" J..= [encodeGQLErr includeInternal qErr]] -data GQResp - = GQSuccess !BL.ByteString +data GQResult a + = GQSuccess !a | GQPreExecError ![J.Value] | GQExecError ![J.Value] - deriving (Show, Eq) + deriving (Show, Eq, Functor, Foldable, Traversable) -isExecError :: GQResp -> Bool +type GQResponse = GQResult BL.ByteString + +isExecError :: GQResult a -> Bool isExecError = \case GQExecError _ -> True _ -> False -encodeGQResp :: GQResp -> EncJSON +encodeGQResp :: GQResponse -> EncJSON encodeGQResp gqResp = encJFromAssocList $ case gqResp of GQSuccess r -> [("data", encJFromLBS r)] @@ -118,7 +121,7 @@ encodeRemoteGqlResp (RemoteGqlResp d e ex) = -- | Represents a proper GraphQL response data GraphqlResponse - = GRHasura !GQResp + = GRHasura !GQResponse | GRRemote !RemoteGqlResp encodeGraphqlResponse :: GraphqlResponse -> EncJSON diff --git a/server/src-lib/Hasura/GraphQL/Validate.hs b/server/src-lib/Hasura/GraphQL/Validate.hs index a4befe6c115..13de005e43c 100644 --- a/server/src-lib/Hasura/GraphQL/Validate.hs +++ b/server/src-lib/Hasura/GraphQL/Validate.hs @@ -2,23 +2,24 @@ module Hasura.GraphQL.Validate ( validateGQ , showVars , RootSelSet(..) + , SelSet + , Field(..) , getTypedOp - , QueryParts (..) + , QueryParts(..) , getQueryParts - , getAnnVarVals , isQueryInAllowlist - , VarPGTypes - , AnnPGVarVals - , getAnnPGVarVals - , Field(..) - , SelSet + , ReusableVariableTypes + , ReusableVariableValues + , validateVariablesForReuse ) where -import Data.Has import Hasura.Prelude +import Data.Aeson +import Data.Has + import qualified Data.HashMap.Strict as Map import qualified Data.HashSet as HS import qualified Data.Sequence as Seq @@ -69,81 +70,90 @@ getTypedOp opNameM selSets opDefs = throwVE $ "exactly one operation has to be present " <> "in the document when operationName is not specified" --- For all the variables defined there will be a value in the final map +-- | For all the variables defined there will be a value in the final map -- If no default, not in variables and nullable, then null value -getAnnVarVals - :: ( MonadReader r m, Has TypeMap r - , MonadError QErr m - ) - => [G.VariableDefinition] - -> VariableValues - -> m AnnVarVals -getAnnVarVals varDefsL inpVals = withPathK "variableValues" $ do - +validateVariables + :: (MonadReader r m, Has TypeMap r, MonadError QErr m) + => [G.VariableDefinition] -> VariableValues -> m AnnVarVals +validateVariables varDefsL inpVals = withPathK "variableValues" $ do varDefs <- onLeft (mkMapWith G._vdVariable varDefsL) $ \dups -> throwVE $ "the following variables are defined more than once: " <> showVars dups let unexpectedVars = filter (not . (`Map.member` varDefs)) $ Map.keys inpVals - unless (null unexpectedVars) $ throwVE $ "unexpected variables in variableValues: " <> showVars unexpectedVars - forM varDefs $ \(G.VariableDefinition var ty defM) -> do - let baseTy = getBaseTy ty - baseTyInfo <- getTyInfoVE baseTy - -- check that the variable is defined on input types - when (isObjTy baseTyInfo) $ throwVE $ objTyErrMsg baseTy - - let defM' = bool (defM <|> Just G.VCNull) defM $ G.isNotNull ty - annDefM <- withPathK "defaultValue" $ - mapM (validateInputValue constValueParser ty) defM' - let inpValM = Map.lookup var inpVals - annInpValM <- withPathK (G.unName $ G.unVariable var) $ - mapM (validateInputValue jsonParser ty) inpValM - let varValM = annInpValM <|> annDefM - onNothing varValM $ throwVE $ - "expecting a value for non-nullable variable: " <> - showVars [var] <> - " of type: " <> G.showGT ty <> - " in variableValues" + traverse validateVariable varDefs where - objTyErrMsg namedTy = - "variables can only be defined on input types" - <> "(enums, scalars, input objects), but " - <> showNamedTy namedTy <> " is an object type" + validateVariable (G.VariableDefinition var ty defM) = do + let baseTy = getBaseTy ty + baseTyInfo <- getTyInfoVE baseTy + -- check that the variable is defined on input types + when (isObjTy baseTyInfo) $ throwVE $ + "variables can only be defined on input types" + <> "(enums, scalars, input objects), but " + <> showNamedTy baseTy <> " is an object type" + + let defM' = bool (defM <|> Just G.VCNull) defM $ G.isNotNull ty + annDefM <- withPathK "defaultValue" $ + mapM (validateInputValue constValueParser ty) defM' + let inpValM = Map.lookup var inpVals + annInpValM <- withPathK (G.unName $ G.unVariable var) $ + mapM (validateInputValue jsonParser ty) inpValM + let varValM = annInpValM <|> annDefM + onNothing varValM $ throwVE $ + "expecting a value for non-nullable variable: " <> + showVars [var] <> + " of type: " <> G.showGT ty <> + " in variableValues" + showVars :: (Functor f, Foldable f) => f G.Variable -> Text showVars = showNames . fmap G.unVariable -type VarPGTypes = Map.HashMap G.Variable PGColumnType -type AnnPGVarVals = Map.HashMap G.Variable (WithScalarType PGScalarValue) +-- | Returned by 'validateGQ' when a query’s variables are sufficiently simple that this query is +-- eligible to be /reused/, which means that the resolved SQL query will not change even if any of +-- its variable values change. This is used to determine which query plans can be cached. +newtype ReusableVariableTypes = ReusableVariableTypes (Map.HashMap G.Variable PGColumnType) + deriving (Show, Eq, ToJSON) +type ReusableVariableValues = Map.HashMap G.Variable (WithScalarType PGScalarValue) --- this is in similar spirit to getAnnVarVals, however --- here it is much simpler and can get rid of typemap requirement --- combine the two if possible -getAnnPGVarVals +mkReusableVariableTypes :: [G.VariableDefinition] -> AnnVarVals -> Maybe ReusableVariableTypes +mkReusableVariableTypes varDefs inputValues = ReusableVariableTypes <$> do + -- If any of the variables are nullable, this query isn’t reusable, since a null variable used + -- in a condition like {_eq: $var} removes the condition entirely, requiring different SQL. + guard (not $ any (G.isNullable . G._vdType) varDefs) + + -- Note: This currently must be manually kept in sync with 'asPGColumnTypeAndValueM' from + -- "Hasura.GraphQL.Resolve.InputValue"! It should be possible to merge them if we do #2801. + for inputValues $ \inputValue -> case _aivValue inputValue of + AGScalar pgType _ -> Just $ PGColumnScalar pgType + AGEnum _ (AGEReference reference _) -> Just $ PGColumnEnumReference reference + _ -> Nothing + +-- | This is similar in spirit to 'validateVariables' but uses preexisting 'ReusableVariableTypes' +-- information to parse Postgres values directly for use with a reusable query plan. (Ideally, it +-- would be nice to be able to share more of the logic instead of duplicating it.) +validateVariablesForReuse :: (MonadError QErr m) - => VarPGTypes - -> Maybe VariableValues - -> m AnnPGVarVals -getAnnPGVarVals varTypes varValsM = - flip Map.traverseWithKey varTypes $ \varName varType -> do - let unexpectedVars = filter - (not . (`Map.member` varTypes)) $ Map.keys varVals + => ReusableVariableTypes -> Maybe VariableValues -> m ReusableVariableValues +validateVariablesForReuse (ReusableVariableTypes varTypes) varValsM = + withPathK "variableValues" $ do + let unexpectedVars = filter (not . (`Map.member` varTypes)) $ Map.keys varVals unless (null unexpectedVars) $ - throwVE $ "unexpected variables in variableValues: " <> - showVars unexpectedVars - varVal <- onNothing (Map.lookup varName varVals) $ - throwVE $ "expecting a value for non-nullable variable: " <> - showVars [varName] <> - -- TODO: we don't have the graphql type - -- " of type: " <> T.pack (show varType) <> - " in variableValues" - parsePGScalarValue varType varVal - where - varVals = fromMaybe Map.empty varValsM + throwVE $ "unexpected variables: " <> showVars unexpectedVars + + flip Map.traverseWithKey varTypes $ \varName varType -> + withPathK (G.unName $ G.unVariable varName) $ do + varVal <- onNothing (Map.lookup varName varVals) $ + throwVE "expected a value for non-nullable variable" + -- TODO: we don't have the graphql type + -- <> " of type: " <> T.pack (show varType) + parsePGScalarValue varType varVal + where + varVals = fromMaybe Map.empty varValsM validateFrag :: (MonadError QErr m, MonadReader r m, Has TypeMap r) @@ -167,14 +177,15 @@ validateGQ :: (MonadError QErr m, MonadReader GCtx m) -- => GraphQLRequest => QueryParts - -> m RootSelSet + -> m (RootSelSet, Maybe ReusableVariableTypes) validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do ctx <- ask -- annotate the variables of this operation - annVarVals <- getAnnVarVals (G._todVariableDefinitions opDef) $ - fromMaybe Map.empty varValsM + let varDefs = G._todVariableDefinitions opDef + annVarVals <- validateVariables varDefs $ fromMaybe Map.empty varValsM + let reusableTypes = mkReusableVariableTypes varDefs annVarVals -- annotate the fragments fragDefs <- onLeft (mkMapWith G._fdName fragDefsL) $ \dups -> @@ -188,7 +199,7 @@ validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do selSet <- flip runReaderT valCtx $ denormSelSet [] opRoot $ G._todSelectionSet opDef - case G._todType opDef of + rootSelSet <- case G._todType opDef of G.OperationTypeQuery -> return $ RQuery selSet G.OperationTypeMutation -> return $ RMutation selSet G.OperationTypeSubscription -> @@ -199,6 +210,8 @@ validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do throwVE "subscription must select only one top level field" return $ RSubscription fld + pure (rootSelSet, reusableTypes) + isQueryInAllowlist :: GQLExecDoc -> HS.HashSet GQLQuery -> Bool isQueryInAllowlist q = HS.member gqlQuery where diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 5ca54e9aa28..2e69e08208d 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -458,7 +458,7 @@ mkWaiApp -> Bool -> InstanceId -> S.HashSet API - -> EL.LQOpts + -> EL.LiveQueriesOptions -> IO HasuraApp mkWaiApp isoLevel loggerCtx sqlGenCtx enableAL pool ci httpManager mode corsCfg enableConsole consoleAssetsDir enableTelemetry instanceId apis lqOpts = do diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index 1b6b77dca90..3e1eb3d6740 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -14,6 +14,7 @@ import qualified Data.Text.Encoding as TE import qualified Text.PrettyPrint.ANSI.Leijen as PP import Data.Char (toLower) +import Data.Time.Clock.Units (milliseconds) import Network.Wai.Handler.Warp (HostPreference) import Options.Applicative @@ -95,7 +96,7 @@ data ServeOptions , soEnableTelemetry :: !Bool , soStringifyNum :: !Bool , soEnabledAPIs :: !(Set.HashSet API) - , soLiveQueryOpts :: !LQ.LQOpts + , soLiveQueryOpts :: !LQ.LiveQueriesOptions , soEnableAllowlist :: !Bool , soEnabledLogTypes :: !(Set.HashSet L.EngineLogType) , soLogLevel :: !L.LogLevel @@ -186,10 +187,10 @@ instance FromEnv [API] where fromEnv = readAPIs instance FromEnv LQ.BatchSize where - fromEnv = fmap LQ.mkBatchSize . readEither + fromEnv = fmap LQ.BatchSize . readEither instance FromEnv LQ.RefetchInterval where - fromEnv = fmap LQ.refetchIntervalFromMilli . readEither + fromEnv = fmap (LQ.RefetchInterval . milliseconds . fromInteger) . readEither instance FromEnv JWTConfig where fromEnv = readJson @@ -353,14 +354,9 @@ mkServeOptions rso = do _ -> corsCfg mkLQOpts = do - mxRefetchIntM <- withEnv (rsoMxRefetchInt rso) $ - fst mxRefetchDelayEnv - mxBatchSizeM <- withEnv (rsoMxBatchSize rso) $ - fst mxBatchSizeEnv - fallbackRefetchIntM <- withEnv (rsoFallbackRefetchInt rso) $ - fst fallbackRefetchDelayEnv - return $ LQ.mkLQOpts (LQ.mkMxOpts mxBatchSizeM mxRefetchIntM) - (LQ.mkFallbackOpts fallbackRefetchIntM) + mxRefetchIntM <- withEnv (rsoMxRefetchInt rso) $ fst mxRefetchDelayEnv + mxBatchSizeM <- withEnv (rsoMxBatchSize rso) $ fst mxBatchSizeEnv + return $ LQ.mkLiveQueriesOptions mxBatchSizeM mxRefetchIntM mkExamplesDoc :: [[String]] -> PP.Doc diff --git a/server/stack.yaml b/server/stack.yaml index e1d27ec081d..3757eb616e5 100644 --- a/server/stack.yaml +++ b/server/stack.yaml @@ -17,7 +17,7 @@ rebuild-ghc-options: true extra-deps: # use https URLs so that build systems can clone these repos - git: https://github.com/hasura/pg-client-hs.git - commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc + commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2 - git: https://github.com/hasura/graphql-parser-hs.git commit: 1ccdbb4c4d743b679f3141992df39feaee971640 - git: https://github.com/hasura/ci-info-hs.git diff --git a/server/stack.yaml.lock b/server/stack.yaml.lock index 54d4bbec474..3e87f0c2cf8 100644 --- a/server/stack.yaml.lock +++ b/server/stack.yaml.lock @@ -13,11 +13,11 @@ packages: git: https://github.com/hasura/pg-client-hs.git pantry-tree: size: 1107 - sha256: 8d5502889184e1b751d55c4b7d0e10985711284c6fae0b40cf7cb171949f0ccf - commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc + sha256: e008d56e5b0535223b856be94a8e71e31d7dabe10b2951a38447df5089de1876 + commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2 original: git: https://github.com/hasura/pg-client-hs.git - commit: 1eb97c11f52b360ce7f796d9427dc294ce8e45fc + commit: de5c023ed7d2f75a77972ff52b6e5ed19d010ca2 - completed: cabal-file: size: 3295