mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 20:41:49 +03:00
6e574f1bbe
## Description ### I want to speak to the `Manager` Oh boy. This PR is both fairly straightforward and overreaching, so let's break it down. For most network access, we need a [`HTTP.Manager`](https://hackage.haskell.org/package/http-client-0.1.0.0/docs/Network-HTTP-Client-Manager.html). It is created only once, at the top level, when starting the engine, and is then threaded through the application to wherever we need to make a network call. As of main, the way we do this is not standardized: most of the GraphQL execution code passes it "manually" as a function argument throughout the code. We also have a custom monad constraint, `HasHttpManagerM`, that describes a monad's ability to provide a manager. And, finally, several parts of the code store the manager in some kind of argument structure, such as `RunT`'s `RunCtx`. This PR's first goal is to harmonize all of this: we always create the manager at the root, and we already have it when we do our very first `runReaderT`. Wouldn't it make sense for the rest of the code to not manually pass it anywhere, to not store it anywhere, but to always rely on the current monad providing it? This is, in short, what this PR does: it implements a constraint on the base monads, so that they provide the manager, and removes most explicit passing from the code. ### First come, first served One way this PR goes a tiny bit further than "just" doing the aforementioned harmonization is that it starts the process of implementing the "Services oriented architecture" roughly outlined in this [draft document](https://docs.google.com/document/d/1FAigqrST0juU1WcT4HIxJxe1iEBwTuBZodTaeUvsKqQ/edit?usp=sharing). Instead of using the existing `HasHTTPManagerM`, this PR revamps it into the `ProvidesNetwork` service. The idea is, again, that we should make all "external" dependencies of the engine, all things that the core of the engine doesn't care about, a "service". This allows us to define clear APIs for features, to choose different implementations based on which version of the engine we're running, harmonizes our many scattered monadic constraints... Which is why this service is called "Network": we can refine it, moving forward, to be the constraint that defines how all network communication is to operate, instead of relying on disparate classes constraint or hardcoded decisions. A comment in the code clarifies this intent. ### Side-effects? In my Haskell? This PR also unavoidably touches some other aspects of the codebase. One such example: it introduces `Hasura.App.AppContext`, named after `HasuraPro.Context.AppContext`: a name for the reader structure at the base level. It also transforms `Handler` from a type alias to a newtype, as `Handler` is where we actually enforce HTTP limits; but without `Handler` being a distinct type, any code path could simply do a `runExceptT $ runReader` and forget to enforce them. (As a rule of thumb, i am starting to consider any straggling `runReaderT` or `runExceptT` as a code smell: we should not stack / unstack monads haphazardly, and every layer should be an opaque `newtype` with a corresponding run function.) ## Further work In several places, i have left TODOs when i have encountered things that suggest that we should do further unrelated cleanups. I'll write down the follow-up steps, either in the aforementioned document or on slack. But, in short, at a glance, in approximate order, we could: - delete `ExecutionCtx` as it is only a subset of `ServerCtx`, and remove one more `runReaderT` call - delete `ServerConfigCtx` as it is only a subset of `ServerCtx`, and remove it from `RunCtx` - remove `ServerCtx` from `HandlerCtx`, and make it part of `AppContext`, or even make it the `AppContext` altogether (since, at least for the OSS version, `AppContext` is there again only a subset) - remove `CacheBuildParams` and `CacheBuild` altogether, as they're just a distinct stack that is a `ReaderT` on top of `IO` that contains, you guessed it, the same thing as `ServerCtx` - move `RunT` out of `RQL.Types` and rename it, since after the previous cleanups **it only contains `UserInfo`**; it could be bundled with the authentication service, made a small implementation detail in `Hasura.Server.Auth` - rename `PGMetadaStorageT` to something a bit more accurate, such as `App`, and enforce its IO base This would significantly simply our complex stack. From there, or in parallel, we can start moving existing dependencies as Services. For the purpose of supporting read replicas entitlement, we could move `MonadResolveSource` to a `SourceResolver` service, as attempted in #7653, and transform `UserAuthenticationM` into a `Authentication` service. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7736 GitOrigin-RevId: 68cce710eb9e7d752bda1ba0c49541d24df8209f
784 lines
33 KiB
Haskell
784 lines
33 KiB
Haskell
-- | Execution of GraphQL queries over HTTP transport
|
|
module Hasura.GraphQL.Transport.HTTP
|
|
( QueryCacheKey (..),
|
|
MonadExecuteQuery (..),
|
|
CachedDirective (..),
|
|
runGQ,
|
|
runGQBatched,
|
|
coalescePostgresMutations,
|
|
extractFieldFromResponse,
|
|
buildRaw,
|
|
encodeAnnotatedResponseParts,
|
|
encodeEncJSONResults,
|
|
|
|
-- * imported from HTTP.Protocol; required by pro
|
|
GQLReq (..),
|
|
GQLReqUnparsed,
|
|
GQLReqParsed,
|
|
GQLExecDoc (..),
|
|
OperationName (..),
|
|
GQLQueryText (..),
|
|
AnnotatedResponsePart (..),
|
|
CacheStoreSuccess (..),
|
|
CacheStoreFailure (..),
|
|
SessVarPred,
|
|
filterVariablesFromQuery,
|
|
runSessVarPred,
|
|
)
|
|
where
|
|
|
|
import Control.Lens (Traversal', foldOf, to)
|
|
import Control.Monad.Morph (hoist)
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Data.Aeson qualified as J
|
|
import Data.Aeson.Ordered qualified as JO
|
|
import Data.Bifoldable
|
|
import Data.ByteString.Lazy qualified as LBS
|
|
import Data.Dependent.Map qualified as DM
|
|
import Data.Environment qualified as Env
|
|
import Data.HashMap.Strict.InsOrd qualified as OMap
|
|
import Data.Monoid (Any (..))
|
|
import Data.Text qualified as T
|
|
import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
|
|
import Hasura.Base.Error
|
|
import Hasura.EncJSON
|
|
import Hasura.GraphQL.Execute qualified as E
|
|
import Hasura.GraphQL.Execute.Action qualified as EA
|
|
import Hasura.GraphQL.Execute.Backend qualified as EB
|
|
import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ
|
|
import Hasura.GraphQL.Logging
|
|
( MonadQueryLog (logQueryLog),
|
|
QueryLog (..),
|
|
QueryLogKind (..),
|
|
)
|
|
import Hasura.GraphQL.Namespace
|
|
import Hasura.GraphQL.ParameterizedQueryHash
|
|
import Hasura.GraphQL.Parser.Directives (CachedDirective (..), DirectiveMap, cached)
|
|
import Hasura.GraphQL.Transport.Backend
|
|
import Hasura.GraphQL.Transport.HTTP.Protocol
|
|
import Hasura.GraphQL.Transport.Instances ()
|
|
import Hasura.HTTP
|
|
( HttpResponse (HttpResponse, _hrBody),
|
|
addHttpResponseHeaders,
|
|
)
|
|
import Hasura.Logging qualified as L
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.IR
|
|
import Hasura.RQL.Types.Action
|
|
import Hasura.RQL.Types.Backend
|
|
import Hasura.RQL.Types.ResultCustomization
|
|
import Hasura.RQL.Types.SchemaCache
|
|
import Hasura.RemoteSchema.SchemaCache
|
|
import Hasura.SQL.AnyBackend qualified as AB
|
|
import Hasura.SQL.Backend
|
|
import Hasura.Server.Init.Config
|
|
import Hasura.Server.Limits
|
|
import Hasura.Server.Logging
|
|
import Hasura.Server.Logging qualified as L
|
|
import Hasura.Server.Prometheus
|
|
( GraphQLRequestMetrics (..),
|
|
PrometheusMetrics (..),
|
|
)
|
|
import Hasura.Server.Telemetry.Counters qualified as Telem
|
|
import Hasura.Server.Types (RequestId)
|
|
import Hasura.Services.Network
|
|
import Hasura.Session
|
|
import Hasura.Tracing (MonadTrace, TraceT, trace)
|
|
import Hasura.Tracing qualified as Tracing
|
|
import Language.GraphQL.Draft.Syntax qualified as G
|
|
import Network.HTTP.Types qualified as HTTP
|
|
import Network.Wai.Extended qualified as Wai
|
|
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
|
|
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
|
|
|
|
data QueryCacheKey = QueryCacheKey
|
|
{ qckQueryString :: !GQLReqParsed,
|
|
qckUserRole :: !RoleName,
|
|
qckSession :: !SessionVariables
|
|
}
|
|
|
|
instance J.ToJSON QueryCacheKey where
|
|
toJSON (QueryCacheKey qs ur sess) =
|
|
J.object ["query_string" J..= qs, "user_role" J..= ur, "session" J..= sess]
|
|
|
|
type CacheStoreResponse = Either CacheStoreFailure CacheStoreSuccess
|
|
|
|
data CacheStoreSuccess
|
|
= CacheStoreSkipped
|
|
| CacheStoreHit
|
|
deriving (Eq, Show)
|
|
|
|
data CacheStoreFailure
|
|
= CacheStoreLimitReached
|
|
| CacheStoreNotEnoughCapacity
|
|
| CacheStoreBackendError String
|
|
deriving (Eq, Show)
|
|
|
|
class Monad m => MonadExecuteQuery m where
|
|
-- | This method does two things: it looks up a query result in the
|
|
-- server-side cache, if a cache is used, and it additionally returns HTTP
|
|
-- headers that can instruct a client how long a response can be cached
|
|
-- locally (i.e. client-side).
|
|
cacheLookup ::
|
|
-- | Used to check if the elaborated query supports caching
|
|
[RemoteSchemaInfo] ->
|
|
-- | Used to check if actions query supports caching (unsupported if `forward_client_headers` is set)
|
|
[ActionsInfo] ->
|
|
-- | Key that uniquely identifies the result of a query execution
|
|
QueryCacheKey ->
|
|
-- | Cached Directive from GraphQL query AST
|
|
Maybe CachedDirective ->
|
|
-- | HTTP headers to be sent back to the caller for this GraphQL request,
|
|
-- containing e.g. time-to-live information, and a cached value if found and
|
|
-- within time-to-live. So a return value (non-empty-ttl-headers, Nothing)
|
|
-- represents that we don't have a server-side cache of the query, but that
|
|
-- the client should store it locally. The value ([], Just json) represents
|
|
-- that the client should not store the response locally, but we do have a
|
|
-- server-side cache value that can be used to avoid query execution.
|
|
TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)
|
|
|
|
-- | Store a json response for a query that we've executed in the cache. Note
|
|
-- that, as part of this, 'cacheStore' has to decide whether the response is
|
|
-- cacheable. A very similar decision is also made in 'cacheLookup', since it
|
|
-- has to construct corresponding cache-enabling headers that are sent to the
|
|
-- client. But note that the HTTP headers influence client-side caching,
|
|
-- whereas 'cacheStore' changes the server-side cache.
|
|
cacheStore ::
|
|
-- | Key under which to store the result of a query execution
|
|
QueryCacheKey ->
|
|
-- | Cached Directive from GraphQL query AST
|
|
Maybe CachedDirective ->
|
|
-- | Result of a query execution
|
|
EncJSON ->
|
|
-- | Always succeeds
|
|
TraceT (ExceptT QErr m) CacheStoreResponse
|
|
|
|
default cacheLookup ::
|
|
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
|
[RemoteSchemaInfo] ->
|
|
[ActionsInfo] ->
|
|
QueryCacheKey ->
|
|
Maybe CachedDirective ->
|
|
TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)
|
|
cacheLookup a b c d = hoist (hoist lift) $ cacheLookup a b c d
|
|
|
|
default cacheStore ::
|
|
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
|
QueryCacheKey ->
|
|
Maybe CachedDirective ->
|
|
EncJSON ->
|
|
TraceT (ExceptT QErr m) CacheStoreResponse
|
|
cacheStore a b c = hoist (hoist lift) $ cacheStore a b c
|
|
|
|
instance MonadExecuteQuery m => MonadExecuteQuery (ReaderT r m)
|
|
|
|
instance MonadExecuteQuery m => MonadExecuteQuery (ExceptT r m)
|
|
|
|
instance MonadExecuteQuery m => MonadExecuteQuery (TraceT m)
|
|
|
|
-- | A partial response, e.g. from a remote schema call or postgres
|
|
-- postgres query, which we'll assemble into the final response for
|
|
-- the client. It is annotated with timing metadata.
|
|
data AnnotatedResponsePart = AnnotatedResponsePart
|
|
{ arpTimeIO :: DiffTime,
|
|
arpLocality :: Telem.Locality,
|
|
arpResponse :: EncJSON,
|
|
arpHeaders :: HTTP.ResponseHeaders
|
|
}
|
|
|
|
-- | A full response, annotated with timing metadata.
|
|
data AnnotatedResponse = AnnotatedResponse
|
|
{ arQueryType :: Telem.QueryType,
|
|
arTimeIO :: DiffTime,
|
|
arLocality :: Telem.Locality,
|
|
arResponse :: HttpResponse (Maybe GQResponse, EncJSON)
|
|
}
|
|
|
|
-- | Merge response parts into a full response.
|
|
buildResponseFromParts ::
|
|
(MonadError QErr m) =>
|
|
Telem.QueryType ->
|
|
Either (Either GQExecError QErr) (RootFieldMap AnnotatedResponsePart) ->
|
|
HTTP.ResponseHeaders ->
|
|
m AnnotatedResponse
|
|
buildResponseFromParts telemType partsErr cacheHeaders =
|
|
buildResponse telemType partsErr \parts ->
|
|
let responseData = Right $ encJToLBS $ encodeAnnotatedResponseParts parts
|
|
in AnnotatedResponse
|
|
{ arQueryType = telemType,
|
|
arTimeIO = sum (fmap arpTimeIO parts),
|
|
arLocality = foldMap arpLocality parts,
|
|
arResponse =
|
|
HttpResponse
|
|
(Just responseData, encodeGQResp responseData)
|
|
(cacheHeaders <> foldMap arpHeaders parts)
|
|
}
|
|
|
|
buildResponse ::
|
|
(MonadError QErr m) =>
|
|
Telem.QueryType ->
|
|
Either (Either GQExecError QErr) a ->
|
|
(a -> AnnotatedResponse) ->
|
|
m AnnotatedResponse
|
|
buildResponse telemType res f = case res of
|
|
Right a -> pure $ f a
|
|
Left (Right err) -> throwError err
|
|
Left (Left err) ->
|
|
pure $
|
|
AnnotatedResponse
|
|
{ arQueryType = telemType,
|
|
arTimeIO = 0,
|
|
arLocality = Telem.Remote,
|
|
arResponse =
|
|
HttpResponse
|
|
(Just (Left err), encodeGQResp $ Left err)
|
|
[]
|
|
}
|
|
|
|
-- | A predicate on session variables. The 'Monoid' instance makes it simple
|
|
-- to combine several predicates disjunctively.
|
|
-- | The definition includes `Maybe` which allows us to short-circuit calls like @mempty <> m@ and @m <> mempty@, which
|
|
-- otherwise might build up long repeated chains of calls to @\_ _ -> False@.
|
|
newtype SessVarPred = SessVarPred {unSessVarPred :: Maybe (SessionVariable -> SessionVariableValue -> Bool)}
|
|
deriving (Semigroup, Monoid) via (Maybe (SessionVariable -> SessionVariableValue -> Any))
|
|
|
|
keepAllSessionVariables :: SessVarPred
|
|
keepAllSessionVariables = SessVarPred $ Just $ \_ _ -> True
|
|
|
|
runSessVarPred :: SessVarPred -> SessionVariables -> SessionVariables
|
|
runSessVarPred = filterSessionVariables . fromMaybe (\_ _ -> False) . unSessVarPred
|
|
|
|
-- | Filter out only those session variables used by the query AST provided
|
|
filterVariablesFromQuery ::
|
|
[ RootField
|
|
(QueryDBRoot (RemoteRelationshipField UnpreparedValue) UnpreparedValue)
|
|
(RemoteSchemaRootField (RemoteRelationshipField UnpreparedValue) RemoteSchemaVariable)
|
|
(ActionQuery (RemoteRelationshipField UnpreparedValue))
|
|
d
|
|
] ->
|
|
SessVarPred
|
|
filterVariablesFromQuery = foldMap \case
|
|
RFDB _ exists ->
|
|
AB.dispatchAnyBackend @Backend exists \case
|
|
SourceConfigWith _ _ (QDBR db) -> bifoldMap remoteFieldPred toPred db
|
|
RFRemote remote -> foldOf (traverse . _SessionPresetVariable . to match) remote
|
|
RFAction actionQ -> foldMap remoteFieldPred actionQ
|
|
RFRaw {} -> mempty
|
|
RFMulti {} -> mempty
|
|
where
|
|
_SessionPresetVariable :: Traversal' RemoteSchemaVariable SessionVariable
|
|
_SessionPresetVariable f (SessionPresetVariable a b c) =
|
|
(\a' -> SessionPresetVariable a' b c) <$> f a
|
|
_SessionPresetVariable _ x = pure x
|
|
|
|
toPred :: UnpreparedValue bet -> SessVarPred
|
|
-- if we see a reference to the whole session variables object,
|
|
-- then we need to keep everything:
|
|
toPred UVSession = keepAllSessionVariables
|
|
-- if we only see a specific session variable, we only need to keep that one:
|
|
toPred (UVSessionVar _type sv) = match sv
|
|
toPred _ = mempty
|
|
|
|
match :: SessionVariable -> SessVarPred
|
|
match sv = SessVarPred $ Just $ \sv' _ -> sv == sv'
|
|
|
|
remoteFieldPred :: RemoteRelationshipField UnpreparedValue -> SessVarPred
|
|
remoteFieldPred = \case
|
|
RemoteSchemaField RemoteSchemaSelect {..} ->
|
|
foldOf (traverse . _SessionPresetVariable . to match) _rselSelection
|
|
RemoteSourceField exists ->
|
|
AB.dispatchAnyBackend @Backend exists \RemoteSourceSelect {..} ->
|
|
case _rssSelection of
|
|
SourceRelationshipObject obj -> foldMap toPred obj
|
|
SourceRelationshipArray arr -> foldMap toPred arr
|
|
SourceRelationshipArrayAggregate agg -> foldMap toPred agg
|
|
|
|
-- | Run (execute) a single GraphQL query
|
|
runGQ ::
|
|
forall m.
|
|
( MonadIO m,
|
|
MonadBaseControl IO m,
|
|
MonadError QErr m,
|
|
MonadReader E.ExecutionCtx m,
|
|
E.MonadGQLExecutionCheck m,
|
|
MonadQueryLog m,
|
|
MonadTrace m,
|
|
MonadExecuteQuery m,
|
|
MonadMetadataStorage m,
|
|
EB.MonadQueryTags m,
|
|
HasResourceLimits m,
|
|
ProvidesNetwork m
|
|
) =>
|
|
Env.Environment ->
|
|
L.Logger L.Hasura ->
|
|
RequestId ->
|
|
UserInfo ->
|
|
Wai.IpAddress ->
|
|
[HTTP.Header] ->
|
|
E.GraphQLQueryType ->
|
|
GQLReqUnparsed ->
|
|
m (GQLQueryOperationSuccessLog, HttpResponse (Maybe GQResponse, EncJSON))
|
|
runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
|
|
E.ExecutionCtx _ sqlGenCtx sc scVer enableAL readOnlyMode prometheusMetrics <- ask
|
|
let gqlMetrics = pmGraphQLRequestMetrics prometheusMetrics
|
|
|
|
(totalTime, (response, parameterizedQueryHash, gqlOpType)) <- withElapsedTime $ do
|
|
(reqParsed, runLimits, queryParts) <- observeGQLQueryError gqlMetrics Nothing $ do
|
|
-- 1. Run system authorization on the 'reqUnparsed :: GQLReqUnparsed' query.
|
|
reqParsed <-
|
|
E.checkGQLExecution userInfo (reqHeaders, ipAddress) enableAL sc reqUnparsed reqId
|
|
>>= flip onLeft throwError
|
|
|
|
operationLimit <- askGraphqlOperationLimit reqId userInfo (scApiLimits sc)
|
|
let runLimits = runResourceLimits operationLimit
|
|
|
|
-- 2. Construct the first step of the execution plan from 'reqParsed :: GQLParsed'.
|
|
queryParts <- getSingleOperation reqParsed
|
|
return (reqParsed, runLimits, queryParts)
|
|
|
|
let gqlOpType = G._todType queryParts
|
|
observeGQLQueryError gqlMetrics (Just gqlOpType) $ do
|
|
-- 3. Construct the remainder of the execution plan.
|
|
let maybeOperationName = _unOperationName <$> _grOperationName reqParsed
|
|
(parameterizedQueryHash, execPlan) <-
|
|
E.getResolvedExecPlan
|
|
env
|
|
logger
|
|
prometheusMetrics
|
|
userInfo
|
|
sqlGenCtx
|
|
readOnlyMode
|
|
sc
|
|
scVer
|
|
queryType
|
|
reqHeaders
|
|
reqUnparsed
|
|
queryParts
|
|
maybeOperationName
|
|
reqId
|
|
|
|
-- 4. Execute the execution plan producing a 'AnnotatedResponse'.
|
|
response <- executePlan reqParsed runLimits execPlan
|
|
return (response, parameterizedQueryHash, gqlOpType)
|
|
|
|
-- 5. Record telemetry
|
|
recordTimings totalTime response
|
|
|
|
-- 6. Record Prometheus metrics (query successes)
|
|
liftIO $ recordGQLQuerySuccess gqlMetrics totalTime gqlOpType
|
|
|
|
-- 7. Return the response along with logging metadata.
|
|
let requestSize = LBS.length $ J.encode reqUnparsed
|
|
responseSize = LBS.length $ encJToLBS $ snd $ _hrBody $ arResponse $ response
|
|
return
|
|
( GQLQueryOperationSuccessLog reqUnparsed totalTime responseSize requestSize parameterizedQueryHash,
|
|
arResponse response
|
|
)
|
|
where
|
|
doQErr :: ExceptT QErr m a -> ExceptT (Either GQExecError QErr) m a
|
|
doQErr = withExceptT Right
|
|
|
|
forWithKey = flip OMap.traverseWithKey
|
|
|
|
executePlan ::
|
|
GQLReqParsed ->
|
|
(m AnnotatedResponse -> m AnnotatedResponse) ->
|
|
E.ResolvedExecutionPlan ->
|
|
m AnnotatedResponse
|
|
executePlan reqParsed runLimits execPlan = case execPlan of
|
|
E.QueryExecutionPlan queryPlans asts dirMap -> trace "Query" $ do
|
|
-- Attempt to lookup a cached response in the query cache.
|
|
-- 'keyedLookup' is a monadic action possibly returning a cache hit.
|
|
-- 'keyedStore' is a function to write a new response to the cache.
|
|
let (keyedLookup, keyedStore) = cacheAccess reqParsed queryPlans asts dirMap
|
|
(cachingHeaders, cachedValue) <- keyedLookup
|
|
case fmap decodeGQResp cachedValue of
|
|
-- If we get a cache hit, annotate the response with metadata and return it.
|
|
Just cachedResponseData -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindCached
|
|
pure $
|
|
AnnotatedResponse
|
|
{ arQueryType = Telem.Query,
|
|
arTimeIO = 0,
|
|
arLocality = Telem.Local,
|
|
arResponse = HttpResponse cachedResponseData cachingHeaders
|
|
}
|
|
-- If we get a cache miss, we must run the query against the graphql engine.
|
|
Nothing -> runLimits $ do
|
|
-- 1. 'traverse' the 'ExecutionPlan' executing every step.
|
|
-- TODO: can this be a `catch` rather than a `runExceptT`?
|
|
conclusion <- runExceptT $ forWithKey queryPlans executeQueryStep
|
|
-- 2. Construct an 'AnnotatedResponse' from the results of all steps in the 'ExecutionPlan'.
|
|
result <- buildResponseFromParts Telem.Query conclusion cachingHeaders
|
|
let response@(HttpResponse responseData _) = arResponse result
|
|
-- 3. Cache the 'AnnotatedResponse'.
|
|
cacheStoreRes <- keyedStore (snd responseData)
|
|
let headers = case cacheStoreRes of
|
|
-- Note: Warning header format: "Warning: <warn-code> <warn-agent> <warn-text> [warn-date]"
|
|
-- See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Warning
|
|
Right _ -> []
|
|
(Left CacheStoreLimitReached) -> [("warning", "199 - cache-store-size-limit-exceeded")]
|
|
(Left CacheStoreNotEnoughCapacity) -> [("warning", "199 - cache-store-capacity-exceeded")]
|
|
(Left (CacheStoreBackendError _)) -> [("warning", "199 - cache-store-error")]
|
|
in -- 4. Return the response.
|
|
pure $ result {arResponse = addHttpResponseHeaders headers response}
|
|
E.MutationExecutionPlan mutationPlans -> runLimits $ do
|
|
{- Note [Backwards-compatible transaction optimisation]
|
|
|
|
For backwards compatibility, we perform the following optimisation: if all mutation steps
|
|
are going to the same source, and that source is Postgres, we group all mutations as a
|
|
transaction. This is a somewhat dangerous beaviour, and we would prefer, in the future,
|
|
to make transactionality explicit rather than implicit and context-dependent.
|
|
-}
|
|
case coalescePostgresMutations mutationPlans of
|
|
-- we are in the aforementioned case; we circumvent the normal process
|
|
Just (sourceConfig, resolvedConnectionTemplate, pgMutations) -> do
|
|
res <-
|
|
-- TODO: can this be a `catch` rather than a `runExceptT`?
|
|
runExceptT $
|
|
doQErr $
|
|
runPGMutationTransaction reqId reqUnparsed userInfo logger sourceConfig resolvedConnectionTemplate pgMutations
|
|
-- we do not construct response parts since we have only one part
|
|
buildResponse Telem.Mutation res \(telemTimeIO_DT, parts) ->
|
|
let responseData = Right $ encJToLBS $ encodeEncJSONResults parts
|
|
in AnnotatedResponse
|
|
{ arQueryType = Telem.Mutation,
|
|
arTimeIO = telemTimeIO_DT,
|
|
arLocality = Telem.Local,
|
|
arResponse =
|
|
HttpResponse
|
|
(Just responseData, encodeGQResp responseData)
|
|
[]
|
|
}
|
|
|
|
-- we are not in the transaction case; proceeding normally
|
|
Nothing -> do
|
|
-- TODO: can this be a `catch` rather than a `runExceptT`?
|
|
conclusion <- runExceptT $ forWithKey mutationPlans executeMutationStep
|
|
buildResponseFromParts Telem.Mutation conclusion []
|
|
E.SubscriptionExecutionPlan _sub ->
|
|
throw400 UnexpectedPayload "subscriptions are not supported over HTTP, use websockets instead"
|
|
|
|
executeQueryStep ::
|
|
RootFieldAlias ->
|
|
EB.ExecutionStep ->
|
|
ExceptT (Either GQExecError QErr) m AnnotatedResponsePart
|
|
executeQueryStep fieldName = \case
|
|
E.ExecStepDB _headers exists remoteJoins -> doQErr $ do
|
|
(telemTimeIO_DT, resp) <-
|
|
AB.dispatchAnyBackend @BackendTransport
|
|
exists
|
|
\(EB.DBStepInfo _ sourceConfig genSql tx resolvedConnectionTemplate :: EB.DBStepInfo b) ->
|
|
runDBQuery @b reqId reqUnparsed fieldName userInfo logger sourceConfig tx genSql resolvedConnectionTemplate
|
|
finalResponse <-
|
|
RJ.processRemoteJoins reqId logger env reqHeaders userInfo resp remoteJoins reqUnparsed
|
|
pure $ AnnotatedResponsePart telemTimeIO_DT Telem.Local finalResponse []
|
|
E.ExecStepRemote rsi resultCustomizer gqlReq remoteJoins -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindRemoteSchema
|
|
runRemoteGQ fieldName rsi resultCustomizer gqlReq remoteJoins
|
|
E.ExecStepAction aep _ remoteJoins -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindAction
|
|
(time, resp) <- doQErr $ do
|
|
(time, (resp, _)) <- EA.runActionExecution userInfo aep
|
|
finalResponse <-
|
|
RJ.processRemoteJoins reqId logger env reqHeaders userInfo resp remoteJoins reqUnparsed
|
|
pure (time, finalResponse)
|
|
pure $ AnnotatedResponsePart time Telem.Empty resp []
|
|
E.ExecStepRaw json -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
|
|
buildRaw json
|
|
-- For `ExecStepMulti`, execute all steps and then concat them in a list
|
|
E.ExecStepMulti lst -> do
|
|
_all <- traverse (executeQueryStep fieldName) lst
|
|
pure $ AnnotatedResponsePart 0 Telem.Local (encJFromList (map arpResponse _all)) []
|
|
|
|
executeMutationStep ::
|
|
RootFieldAlias ->
|
|
EB.ExecutionStep ->
|
|
ExceptT (Either GQExecError QErr) m AnnotatedResponsePart
|
|
executeMutationStep fieldName = \case
|
|
E.ExecStepDB responseHeaders exists remoteJoins -> doQErr $ do
|
|
(telemTimeIO_DT, resp) <-
|
|
AB.dispatchAnyBackend @BackendTransport
|
|
exists
|
|
\(EB.DBStepInfo _ sourceConfig genSql tx resolvedConnectionTemplate :: EB.DBStepInfo b) ->
|
|
runDBMutation @b reqId reqUnparsed fieldName userInfo logger sourceConfig tx genSql resolvedConnectionTemplate
|
|
finalResponse <-
|
|
RJ.processRemoteJoins reqId logger env reqHeaders userInfo resp remoteJoins reqUnparsed
|
|
pure $ AnnotatedResponsePart telemTimeIO_DT Telem.Local finalResponse responseHeaders
|
|
E.ExecStepRemote rsi resultCustomizer gqlReq remoteJoins -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindRemoteSchema
|
|
runRemoteGQ fieldName rsi resultCustomizer gqlReq remoteJoins
|
|
E.ExecStepAction aep _ remoteJoins -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindAction
|
|
(time, (resp, hdrs)) <- doQErr $ do
|
|
(time, (resp, hdrs)) <- EA.runActionExecution userInfo aep
|
|
finalResponse <-
|
|
RJ.processRemoteJoins reqId logger env reqHeaders userInfo resp remoteJoins reqUnparsed
|
|
pure (time, (finalResponse, hdrs))
|
|
pure $ AnnotatedResponsePart time Telem.Empty resp $ fromMaybe [] hdrs
|
|
E.ExecStepRaw json -> do
|
|
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
|
|
buildRaw json
|
|
-- For `ExecStepMulti`, execute all steps and then concat them in a list
|
|
E.ExecStepMulti lst -> do
|
|
_all <- traverse (executeQueryStep fieldName) lst
|
|
pure $ AnnotatedResponsePart 0 Telem.Local (encJFromList (map arpResponse _all)) []
|
|
|
|
runRemoteGQ fieldName rsi resultCustomizer gqlReq remoteJoins = do
|
|
(telemTimeIO_DT, remoteResponseHeaders, resp) <-
|
|
doQErr $ E.execRemoteGQ env userInfo reqHeaders (rsDef rsi) gqlReq
|
|
value <- extractFieldFromResponse fieldName resultCustomizer resp
|
|
finalResponse <-
|
|
doQErr $
|
|
RJ.processRemoteJoins
|
|
reqId
|
|
logger
|
|
env
|
|
reqHeaders
|
|
userInfo
|
|
-- TODO: avoid encode and decode here
|
|
(encJFromOrderedValue value)
|
|
remoteJoins
|
|
reqUnparsed
|
|
let filteredHeaders = filter ((== "Set-Cookie") . fst) remoteResponseHeaders
|
|
pure $ AnnotatedResponsePart telemTimeIO_DT Telem.Remote finalResponse filteredHeaders
|
|
|
|
cacheAccess ::
|
|
GQLReqParsed ->
|
|
EB.ExecutionPlan ->
|
|
[QueryRootField UnpreparedValue] ->
|
|
DirectiveMap ->
|
|
( m (HTTP.ResponseHeaders, Maybe EncJSON),
|
|
EncJSON -> m CacheStoreResponse
|
|
)
|
|
cacheAccess reqParsed queryPlans asts dirMap =
|
|
let filteredSessionVars = runSessVarPred (filterVariablesFromQuery asts) (_uiSession userInfo)
|
|
remoteSchemas =
|
|
OMap.elems queryPlans >>= \case
|
|
E.ExecStepDB _headers _dbAST remoteJoins -> do
|
|
maybe [] (map RJ._rsjRemoteSchema . RJ.getRemoteSchemaJoins) remoteJoins
|
|
_ -> []
|
|
getExecStepActionWithActionInfo acc execStep = case execStep of
|
|
EB.ExecStepAction _ actionInfo _remoteJoins -> (actionInfo : acc)
|
|
_ -> acc
|
|
actionsInfo =
|
|
foldl getExecStepActionWithActionInfo [] $
|
|
OMap.elems $
|
|
OMap.filter
|
|
( \case
|
|
E.ExecStepAction _ _ _remoteJoins -> True
|
|
_ -> False
|
|
)
|
|
queryPlans
|
|
cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
|
|
cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
|
in ( Tracing.interpTraceT (liftEitherM . runExceptT) $
|
|
cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective,
|
|
Tracing.interpTraceT (liftEitherM . runExceptT)
|
|
. cacheStore cacheKey cachedDirective
|
|
)
|
|
|
|
recordTimings :: DiffTime -> AnnotatedResponse -> m ()
|
|
recordTimings totalTime result = do
|
|
Telem.recordTimingMetric
|
|
Telem.RequestDimensions
|
|
{ telemTransport = Telem.HTTP,
|
|
telemQueryType = arQueryType result,
|
|
telemLocality = arLocality result
|
|
}
|
|
Telem.RequestTimings
|
|
{ telemTimeIO = convertDuration $ arTimeIO result,
|
|
telemTimeTot = convertDuration totalTime
|
|
}
|
|
|
|
-- Catch, record, and re-throw errors.
|
|
observeGQLQueryError ::
|
|
forall n e a.
|
|
( MonadIO n,
|
|
MonadError e n
|
|
) =>
|
|
GraphQLRequestMetrics ->
|
|
Maybe G.OperationType ->
|
|
n a ->
|
|
n a
|
|
observeGQLQueryError gqlMetrics mOpType action =
|
|
catchError (fmap Right action) (pure . Left) >>= \case
|
|
Right result ->
|
|
pure result
|
|
Left err -> do
|
|
case mOpType of
|
|
Nothing ->
|
|
liftIO $ Prometheus.Counter.inc (gqlRequestsUnknownFailure gqlMetrics)
|
|
Just opType -> case opType of
|
|
G.OperationTypeQuery ->
|
|
liftIO $ Prometheus.Counter.inc (gqlRequestsQueryFailure gqlMetrics)
|
|
G.OperationTypeMutation ->
|
|
liftIO $ Prometheus.Counter.inc (gqlRequestsMutationFailure gqlMetrics)
|
|
G.OperationTypeSubscription ->
|
|
-- We do not collect metrics for subscriptions at the request level.
|
|
pure ()
|
|
throwError err
|
|
|
|
-- Tally and record execution times for successful GraphQL requests.
|
|
recordGQLQuerySuccess ::
|
|
GraphQLRequestMetrics -> DiffTime -> G.OperationType -> IO ()
|
|
recordGQLQuerySuccess gqlMetrics totalTime = \case
|
|
G.OperationTypeQuery -> liftIO $ do
|
|
Prometheus.Counter.inc (gqlRequestsQuerySuccess gqlMetrics)
|
|
Prometheus.Histogram.observe (gqlExecutionTimeSecondsQuery gqlMetrics) (realToFrac totalTime)
|
|
G.OperationTypeMutation -> liftIO $ do
|
|
Prometheus.Counter.inc (gqlRequestsMutationSuccess gqlMetrics)
|
|
Prometheus.Histogram.observe (gqlExecutionTimeSecondsMutation gqlMetrics) (realToFrac totalTime)
|
|
G.OperationTypeSubscription ->
|
|
-- We do not collect metrics for subscriptions at the request level.
|
|
-- Furthermore, we do not serve GraphQL subscriptions over HTTP.
|
|
pure ()
|
|
|
|
coalescePostgresMutations ::
|
|
EB.ExecutionPlan ->
|
|
Maybe
|
|
( SourceConfig ('Postgres 'Vanilla),
|
|
ResolvedConnectionTemplate ('Postgres 'Vanilla),
|
|
InsOrdHashMap RootFieldAlias (EB.DBStepInfo ('Postgres 'Vanilla))
|
|
)
|
|
coalescePostgresMutations plan = do
|
|
-- we extract the name and config of the first mutation root, if any
|
|
(oneSourceName, oneResolvedConnectionTemplate, oneSourceConfig) <- case toList plan of
|
|
(E.ExecStepDB _ exists _remoteJoins : _) ->
|
|
AB.unpackAnyBackend @('Postgres 'Vanilla) exists <&> \dbsi ->
|
|
( EB.dbsiSourceName dbsi,
|
|
EB.dbsiResolvedConnectionTemplate dbsi,
|
|
EB.dbsiSourceConfig dbsi
|
|
)
|
|
_ -> Nothing
|
|
-- we then test whether all mutations are going to that same first source
|
|
-- and that it is Postgres
|
|
mutations <- for plan \case
|
|
E.ExecStepDB _ exists remoteJoins -> do
|
|
dbStepInfo <- AB.unpackAnyBackend @('Postgres 'Vanilla) exists
|
|
guard $
|
|
oneSourceName == EB.dbsiSourceName dbStepInfo
|
|
&& isNothing remoteJoins
|
|
&& oneResolvedConnectionTemplate == EB.dbsiResolvedConnectionTemplate dbStepInfo
|
|
Just dbStepInfo
|
|
_ -> Nothing
|
|
Just (oneSourceConfig, oneResolvedConnectionTemplate, mutations)
|
|
|
|
data GraphQLResponse
|
|
= GraphQLResponseErrors [J.Value]
|
|
| GraphQLResponseData JO.Value
|
|
|
|
decodeGraphQLResponse :: LBS.ByteString -> Either Text GraphQLResponse
|
|
decodeGraphQLResponse bs = do
|
|
val <- mapLeft T.pack $ JO.eitherDecode bs
|
|
valObj <- JO.asObject val
|
|
case JO.lookup "errors" valObj of
|
|
Just (JO.Array errs) -> Right $ GraphQLResponseErrors (toList $ JO.fromOrdered <$> errs)
|
|
Just _ -> Left "Invalid \"errors\" field in response from remote"
|
|
Nothing -> do
|
|
dataVal <- JO.lookup "data" valObj `onNothing` Left "Missing \"data\" field in response from remote"
|
|
Right $ GraphQLResponseData dataVal
|
|
|
|
extractFieldFromResponse ::
|
|
forall m.
|
|
Monad m =>
|
|
RootFieldAlias ->
|
|
ResultCustomizer ->
|
|
LBS.ByteString ->
|
|
ExceptT (Either GQExecError QErr) m JO.Value
|
|
extractFieldFromResponse fieldName resultCustomizer resp = do
|
|
let fieldName' = G.unName $ _rfaAlias fieldName
|
|
dataVal <-
|
|
applyResultCustomizer resultCustomizer
|
|
<$> do
|
|
graphQLResponse <- decodeGraphQLResponse resp `onLeft` do400
|
|
case graphQLResponse of
|
|
GraphQLResponseErrors errs -> doGQExecError errs
|
|
GraphQLResponseData d -> pure d
|
|
dataObj <- onLeft (JO.asObject dataVal) do400
|
|
fieldVal <-
|
|
onNothing (JO.lookup fieldName' dataObj) $
|
|
do400 $
|
|
"expecting key " <> fieldName'
|
|
return fieldVal
|
|
where
|
|
do400 = withExceptT Right . throw400 RemoteSchemaError
|
|
doGQExecError = withExceptT Left . throwError . GQExecError
|
|
|
|
buildRaw :: Applicative m => JO.Value -> m AnnotatedResponsePart
|
|
buildRaw json = do
|
|
let obj = encJFromOrderedValue json
|
|
telemTimeIO_DT = 0
|
|
pure $ AnnotatedResponsePart telemTimeIO_DT Telem.Local obj []
|
|
|
|
encodeAnnotatedResponseParts :: RootFieldMap AnnotatedResponsePart -> EncJSON
|
|
encodeAnnotatedResponseParts = encodeEncJSONResults . fmap arpResponse
|
|
|
|
encodeEncJSONResults :: RootFieldMap EncJSON -> EncJSON
|
|
encodeEncJSONResults =
|
|
encNameMap . fmap (namespacedField id encNameMap) . unflattenNamespaces
|
|
where
|
|
encNameMap = encJFromInsOrdHashMap . OMap.mapKeys G.unName
|
|
|
|
-- | Run (execute) a batched GraphQL query (see 'GQLBatchedReqs').
|
|
runGQBatched ::
|
|
forall m.
|
|
( MonadIO m,
|
|
MonadBaseControl IO m,
|
|
MonadError QErr m,
|
|
MonadReader E.ExecutionCtx m,
|
|
E.MonadGQLExecutionCheck m,
|
|
MonadQueryLog m,
|
|
MonadTrace m,
|
|
MonadExecuteQuery m,
|
|
MonadMetadataStorage m,
|
|
EB.MonadQueryTags m,
|
|
HasResourceLimits m,
|
|
ProvidesNetwork m
|
|
) =>
|
|
Env.Environment ->
|
|
L.Logger L.Hasura ->
|
|
RequestId ->
|
|
ResponseInternalErrorsConfig ->
|
|
UserInfo ->
|
|
Wai.IpAddress ->
|
|
[HTTP.Header] ->
|
|
E.GraphQLQueryType ->
|
|
-- | the batched request with unparsed GraphQL query
|
|
GQLBatchedReqs (GQLReq GQLQueryText) ->
|
|
m (HttpLogGraphQLInfo, HttpResponse EncJSON)
|
|
runGQBatched env logger reqId responseErrorsConfig userInfo ipAddress reqHdrs queryType query =
|
|
case query of
|
|
GQLSingleRequest req -> do
|
|
(gqlQueryOperationLog, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
|
|
let httpLoggingGQInfo = (CommonHttpLogMetadata L.RequestModeSingle (Just (GQLSingleRequest (GQLQueryOperationSuccess gqlQueryOperationLog))), (PQHSetSingleton (gqolParameterizedQueryHash gqlQueryOperationLog)))
|
|
pure (httpLoggingGQInfo, snd <$> httpResp)
|
|
GQLBatchedReqs reqs -> do
|
|
-- It's unclear what we should do if we receive multiple
|
|
-- responses with distinct headers, so just do the simplest thing
|
|
-- in this case, and don't forward any.
|
|
executionCtx <- ask
|
|
E.checkGQLBatchedReqs userInfo reqId reqs (E._ecxSchemaCache executionCtx) >>= flip onLeft throwError
|
|
let includeInternal = shouldIncludeInternal (_uiRole userInfo) responseErrorsConfig
|
|
removeHeaders =
|
|
flip HttpResponse []
|
|
. encJFromList
|
|
. map (either (encJFromJValue . encodeGQErr includeInternal) _hrBody)
|
|
responses <- traverse (\req -> fmap (req,) . try . (fmap . fmap . fmap) snd . runGQ env logger reqId userInfo ipAddress reqHdrs queryType $ req) reqs
|
|
let requestsOperationLogs = map fst $ rights $ map snd responses
|
|
batchOperationLogs =
|
|
map
|
|
( \(req, resp) ->
|
|
case resp of
|
|
Left err -> GQLQueryOperationError $ GQLQueryOperationErrorLog req err
|
|
Right (successOpLog, _) -> GQLQueryOperationSuccess successOpLog
|
|
)
|
|
responses
|
|
parameterizedQueryHashes = map gqolParameterizedQueryHash requestsOperationLogs
|
|
httpLoggingGQInfo = (CommonHttpLogMetadata L.RequestModeBatched ((Just (GQLBatchedReqs batchOperationLogs))), PQHSetBatched parameterizedQueryHashes)
|
|
pure (httpLoggingGQInfo, removeHeaders (map ((fmap snd) . snd) responses))
|
|
where
|
|
try = flip catchError (pure . Left) . fmap Right
|