mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-18 13:02:11 +03:00
fbd1262ea6
### Description
This PR implements operation timeouts, as specced in #1232.
RFC: [rfcs/operation-timeout-api-limits.md](c025a90fe9/rfcs/operation-timeout-api-limits.md)
There are still some things to be done (most notably tests and docs), but apart from that it can
be reviewed. I'd still appreciate feedback on the RFC!
TODO:
- [x] break out the `ApiLimits` refactoring into a separate PR: #2103
- [x] finish the `pg-client-hs` PR: https://github.com/hasura/pg-client-hs/pull/39
- [x] remove configurability, after testing, prior to merging
- [ ] tests: #2390 has some tests that I've run locally to confirm things work on a fundamental level
- [x] changelog
- [x] documentation
- [x] fill in the detailed PR checklist
### Changelog
- [x] `CHANGELOG.md` is updated with user-facing content relevant to this PR. If no changelog is required, then add the `no-changelog-required` label.
### Affected components
- [x] Server
- [ ] Console
- [ ] CLI
- [x] Docs
- [ ] Tests
### Related Issues
Product spec: #1232.
### Solution and Design
Compare `rfcs/operation-timeout-api-limits.md`.
### Steps to test and verify
Configure operation timeouts, e.g. by posting
```
{
"type": "set_api_limits",
"args": {
"operation_timeout": {
"global": 3
}
}
}
```
to `v1/metadata` to set an operation timeout of 3s. Then verify that
1. non-admin queries that take longer than 3s time out with a nice error message
2. that those queries return after ~3s (at least for postgres)
3. also that everything else still works as usual
### Limitations, known bugs & workarounds
- while this will cause slow queries against any backend to fail, it has only been verified to actually interrupt queries against postgres
- this will only successfully short-cut (cancel) queries to postgres if the database server is responsive
#### Catalog upgrade
Does this PR change Hasura Catalog version?
- [x] No
#### Metadata
Does this PR add a new Metadata feature?
- [x] Yes
- Does `run_sql` auto-manage the new metadata through schema diffing?
- [x] Not required
- Does `run_sql` auto-manage the definitions of metadata on renaming?
- [x] Not required
- Does `export_metadata`/`replace_metadata` support the new metadata added?
- [x] Yes
#### GraphQL
- [x] No new GraphQL schema is generated
#### Breaking changes
- [x] No Breaking changes
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/1593
GitOrigin-RevId: f0582d0be3ed9fadf89e0c4aaf96344d18331dc4
618 lines
26 KiB
Haskell
618 lines
26 KiB
Haskell
-- | Execution of GraphQL queries over HTTP transport
|
|
module Hasura.GraphQL.Transport.HTTP
|
|
( QueryCacheKey (..),
|
|
MonadExecuteQuery (..),
|
|
CachedDirective (..),
|
|
runGQ,
|
|
runGQBatched,
|
|
coalescePostgresMutations,
|
|
extractFieldFromResponse,
|
|
buildRaw,
|
|
|
|
-- * imported from HTTP.Protocol; required by pro
|
|
GQLReq (..),
|
|
GQLReqUnparsed,
|
|
GQLReqParsed,
|
|
GQLExecDoc (..),
|
|
OperationName (..),
|
|
GQLQueryText (..),
|
|
ResultsFragment (..),
|
|
CacheStoreSuccess (..),
|
|
CacheStoreFailure (..),
|
|
SessVarPred,
|
|
filterVariablesFromQuery,
|
|
runSessVarPred,
|
|
)
|
|
where
|
|
|
|
import Control.Lens (Traversal', toListOf, _4)
|
|
import Control.Monad.Morph (hoist)
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Data.Aeson qualified as J
|
|
import Data.Aeson.Ordered qualified as JO
|
|
import Data.ByteString.Lazy qualified as LBS
|
|
import Data.Dependent.Map qualified as DM
|
|
import Data.Environment qualified as Env
|
|
import Data.HashMap.Strict.InsOrd qualified as OMap
|
|
import Data.Text qualified as T
|
|
import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
|
|
import Hasura.Base.Error
|
|
import Hasura.EncJSON
|
|
import Hasura.GraphQL.Execute qualified as E
|
|
import Hasura.GraphQL.Execute.Action qualified as EA
|
|
import Hasura.GraphQL.Execute.Backend qualified as EB
|
|
import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ
|
|
import Hasura.GraphQL.Logging
|
|
( MonadQueryLog (logQueryLog),
|
|
QueryLog (..),
|
|
QueryLogKind (..),
|
|
)
|
|
import Hasura.GraphQL.ParameterizedQueryHash
|
|
import Hasura.GraphQL.Parser.Column (UnpreparedValue (..))
|
|
import Hasura.GraphQL.Parser.Directives (CachedDirective (..), cached)
|
|
import Hasura.GraphQL.Transport.Backend
|
|
import Hasura.GraphQL.Transport.HTTP.Protocol
|
|
import Hasura.GraphQL.Transport.Instances ()
|
|
import Hasura.HTTP
|
|
import Hasura.Logging qualified as L
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.IR
|
|
import Hasura.RQL.Types
|
|
import Hasura.SQL.AnyBackend qualified as AB
|
|
import Hasura.Server.Init.Config
|
|
import Hasura.Server.Limits
|
|
import Hasura.Server.Logging
|
|
import Hasura.Server.Logging qualified as L
|
|
import Hasura.Server.Telemetry.Counters qualified as Telem
|
|
import Hasura.Server.Types (RequestId)
|
|
import Hasura.Server.Version (HasVersion)
|
|
import Hasura.Session
|
|
import Hasura.Tracing (MonadTrace, TraceT, trace)
|
|
import Hasura.Tracing qualified as Tracing
|
|
import Language.GraphQL.Draft.Syntax qualified as G
|
|
import Network.HTTP.Types qualified as HTTP
|
|
import Network.Wai.Extended qualified as Wai
|
|
|
|
-- | Key under which the result of a query execution is looked up and stored
-- in the cache (see 'cacheLookup' and 'cacheStore').
data QueryCacheKey = QueryCacheKey
  { -- | The parsed GraphQL request
    qckQueryString :: !GQLReqParsed,
    -- | Role of the user issuing the request
    qckUserRole :: !RoleName,
    -- | Session variables; 'runGQ' fills this with only those session
    -- variables that the query actually refers to
    qckSession :: !SessionVariables
  }
|
|
|
|
-- | Serialises the key as an object with the fields @query_string@,
-- @user_role@ and @session@.
instance J.ToJSON QueryCacheKey where
  toJSON (QueryCacheKey request role sessionVars) =
    J.object
      [ "query_string" J..= request,
        "user_role" J..= role,
        "session" J..= sessionVars
      ]
|
|
|
|
-- | Outcome of a 'cacheStore' call: either the reason why storing failed, or
-- a marker describing the successful outcome.
type CacheStoreResponse = Either CacheStoreFailure CacheStoreSuccess
|
|
|
|
-- | The 'Right' outcomes of a 'CacheStoreResponse'. 'runGQ' does not add any
-- extra response headers for either of them.
--
-- NOTE(review): the exact meaning of each constructor is decided by the
-- 'MonadExecuteQuery' instance in use and is not visible in this module.
data CacheStoreSuccess
  = CacheStoreSkipped
  | CacheStoreHit
  deriving (Eq, Show)
|
|
|
|
-- | The 'Left' outcomes of a 'CacheStoreResponse'. 'runGQ' does not fail the
-- request on any of these; it reports each one to the client through a
-- @warning@ response header instead.
data CacheStoreFailure
  = -- | Reported by 'runGQ' as @199 - cache-store-size-limit-exceeded@
    CacheStoreLimitReached
  | -- | Reported by 'runGQ' as @199 - cache-store-capacity-exceeded@
    CacheStoreNotEnoughCapacity
  | -- | Reported by 'runGQ' as @199 - cache-store-error@; the 'String'
    -- payload is not included in that header
    CacheStoreBackendError String
  deriving (Eq, Show)
|
|
|
|
-- | Monads that can consult and populate a cache of query results.
--
-- Both methods have default implementations for monad transformer stacks
-- (@m ~ t n@), which simply lift the underlying monad's implementation.
class Monad m => MonadExecuteQuery m where
  -- | This method does two things: it looks up a query result in the
  -- server-side cache, if a cache is used, and it additionally returns HTTP
  -- headers that can instruct a client how long a response can be cached
  -- locally (i.e. client-side).
  cacheLookup ::
    -- | Used to check if the elaborated query supports caching
    [RemoteSchemaInfo] ->
    -- | Used to check if actions query supports caching (unsupported if `forward_client_headers` is set)
    [ActionsInfo] ->
    -- | Key that uniquely identifies the result of a query execution
    QueryCacheKey ->
    -- | Cached Directive from GraphQL query AST
    Maybe CachedDirective ->
    -- | HTTP headers to be sent back to the caller for this GraphQL request,
    -- containing e.g. time-to-live information, and a cached value if found and
    -- within time-to-live. So a return value (non-empty-ttl-headers, Nothing)
    -- represents that we don't have a server-side cache of the query, but that
    -- the client should store it locally. The value ([], Just json) represents
    -- that the client should not store the response locally, but we do have a
    -- server-side cache value that can be used to avoid query execution.
    TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)

  -- | Store a json response for a query that we've executed in the cache. Note
  -- that, as part of this, 'cacheStore' has to decide whether the response is
  -- cacheable. A very similar decision is also made in 'cacheLookup', since it
  -- has to construct corresponding cache-enabling headers that are sent to the
  -- client. But note that the HTTP headers influence client-side caching,
  -- whereas 'cacheStore' changes the server-side cache.
  cacheStore ::
    -- | Key under which to store the result of a query execution
    QueryCacheKey ->
    -- | Cached Directive from GraphQL query AST
    Maybe CachedDirective ->
    -- | Result of a query execution
    EncJSON ->
    -- | Always succeeds
    TraceT (ExceptT QErr m) CacheStoreResponse

  -- Default for transformer stacks: run the lookup in the underlying monad
  -- and lift the result through both the 'ExceptT' and the 'TraceT' layer.
  default cacheLookup ::
    (m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
    [RemoteSchemaInfo] ->
    [ActionsInfo] ->
    QueryCacheKey ->
    Maybe CachedDirective ->
    TraceT (ExceptT QErr m) (HTTP.ResponseHeaders, Maybe EncJSON)
  cacheLookup remoteSchemas actionsInfo cacheKey directive =
    hoist (hoist lift) (cacheLookup remoteSchemas actionsInfo cacheKey directive)

  -- Default for transformer stacks: store in the underlying monad and lift
  -- the result through both the 'ExceptT' and the 'TraceT' layer.
  default cacheStore ::
    (m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
    QueryCacheKey ->
    Maybe CachedDirective ->
    EncJSON ->
    TraceT (ExceptT QErr m) CacheStoreResponse
  cacheStore cacheKey directive response =
    hoist (hoist lift) (cacheStore cacheKey directive response)
|
|
|
|
-- The following transformer instances have empty bodies: they all use the
-- class's default methods, which lift 'cacheLookup' and 'cacheStore' from the
-- underlying monad.

instance MonadExecuteQuery m => MonadExecuteQuery (ReaderT r m)

instance MonadExecuteQuery m => MonadExecuteQuery (ExceptT r m)

instance MonadExecuteQuery m => MonadExecuteQuery (TraceT m)

instance MonadExecuteQuery m => MonadExecuteQuery (MetadataStorageT m)
|
|
|
|
-- | A partial result, e.g. from a remote schema or postgres, which we'll
-- assemble into the final result for the client.
--
-- Nothing to do with graphql fragments...
data ResultsFragment = ResultsFragment
  { -- | Time reported for producing this fragment; 'runGQ' sums this over all
    -- fragments of a request for telemetry
    rfTimeIO :: DiffTime,
    -- | Telemetry locality of this fragment; 'runGQ' combines the localities
    -- of all fragments monoidally
    rfLocality :: Telem.Locality,
    -- | The JSON payload of this fragment
    rfResponse :: EncJSON,
    -- | Response headers contributed by this fragment; 'runGQ' appends them
    -- to the headers of the final HTTP response
    rfHeaders :: HTTP.ResponseHeaders
  }
|
|
|
|
-- | A predicate on session variables, deciding from a variable's name and
-- value whether it should be kept. The 'Monoid' instance makes it simple to
-- combine several predicates disjunctively: the combined predicate keeps a
-- variable if any of its parts does, and 'mempty' keeps nothing.
newtype SessVarPred = SessVarPred {unSessVarPred :: SessionVariable -> SessionVariableValue -> Bool}
|
|
|
|
-- | The predicate that accepts every session variable.
keepAllSessionVariables :: SessVarPred
keepAllSessionVariables = SessVarPred (\_name _value -> True)
|
|
|
|
-- | Disjunction: the combined predicate accepts a session variable as soon as
-- either operand accepts it.
instance Semigroup SessVarPred where
  lhs <> rhs =
    SessVarPred $ \name value ->
      unSessVarPred lhs name value || unSessVarPred rhs name value
|
|
|
|
-- | The neutral element of the disjunction rejects every session variable.
instance Monoid SessVarPred where
  mempty = SessVarPred (\_name _value -> False)
|
|
|
|
-- | Apply a predicate to a set of session variables by passing it to
-- 'filterSessionVariables'.
runSessVarPred :: SessVarPred -> SessionVariables -> SessionVariables
runSessVarPred predicate sessionVars =
  filterSessionVariables (unSessVarPred predicate) sessionVars
|
|
|
|
-- | Build a predicate that selects only those session variables which are
-- referred to by the given query root fields.
--
-- Database and action roots contribute one predicate per 'UnpreparedValue'
-- they contain; remote roots contribute one predicate per session preset
-- variable. All other roots contribute nothing. The individual predicates
-- are combined disjunctively.
filterVariablesFromQuery ::
  Backend backend =>
  [RootField (QueryDBRoot (RemoteSelect UnpreparedValue) UnpreparedValue) RemoteField (ActionQuery backend (RemoteSelect UnpreparedValue) (UnpreparedValue backend)) d] ->
  SessVarPred
filterVariablesFromQuery roots = mconcat $ concatMap predsOfRoot roots
  where
    -- all predicates contributed by a single root field
    predsOfRoot = \case
      RFDB _ exists ->
        AB.dispatchAnyBackend @Backend exists \case
          SourceConfigWith _ _ (QDBR db) -> map valuePred (toListOf traverse db)
      RFRemote remote -> map onlyVariable (toListOf (traverse . _SessionPresetVariable) remote)
      RFAction actionQ -> map valuePred (toListOf traverse actionQ)
      _ -> []

    -- focuses on the session variable of a 'SessionPresetVariable', and on
    -- nothing for any other remote schema variable
    _SessionPresetVariable :: Traversal' RemoteSchemaVariable SessionVariable
    _SessionPresetVariable visit (SessionPresetVariable sv x y) =
      fmap (\sv' -> SessionPresetVariable sv' x y) (visit sv)
    _SessionPresetVariable _ other = pure other

    valuePred :: UnpreparedValue bet -> SessVarPred
    valuePred = \case
      -- a reference to the whole session variables object means that
      -- everything has to be kept
      UVSession -> keepAllSessionVariables
      -- a reference to one specific session variable means that only that
      -- one has to be kept
      UVSessionVar _type sv -> onlyVariable sv
      _ -> mempty

    -- accepts exactly the given session variable, whatever its value
    onlyVariable :: SessionVariable -> SessVarPred
    onlyVariable wanted = SessVarPred $ \candidate _ -> wanted == candidate
|
|
|
|
-- | Run (execute) a single GraphQL query
--
-- The request is first checked with 'E.checkGQLExecution' and resolved into
-- an execution plan, which is then handled according to its kind:
--
-- * queries are looked up in the cache first; on a cache hit the cached
--   response is returned without executing anything. Otherwise all steps are
--   executed under the operation's resource limits, and the response is
--   offered to 'cacheStore'. A failure to store it does not fail the request
--   but is reported in a @warning@ response header.
-- * mutations are executed under the operation's resource limits; see
--   Note [Backwards-compatible transaction optimisation] below.
-- * subscriptions are rejected with a 400 error.
--
-- Timing metrics for the request are recorded before returning. Returns the
-- operation's success log together with the HTTP response.
runGQ ::
  forall m.
  ( HasVersion,
    MonadIO m,
    MonadBaseControl IO m,
    MonadError QErr m,
    MonadReader E.ExecutionCtx m,
    E.MonadGQLExecutionCheck m,
    MonadQueryLog m,
    MonadTrace m,
    MonadExecuteQuery m,
    MonadMetadataStorage (MetadataStorageT m),
    EB.MonadQueryTags m,
    HasResourceLimits m
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  RequestId ->
  UserInfo ->
  Wai.IpAddress ->
  [HTTP.Header] ->
  E.GraphQLQueryType ->
  GQLReqUnparsed ->
  m (GQLQueryOperationSuccessLog, HttpResponse (Maybe GQResponse, EncJSON))
runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
  (totalTime, (telemQueryType, telemTimeIO_DT, telemLocality, resp, parameterizedQueryHash)) <- withElapsedTime $ do
    E.ExecutionCtx _ sqlGenCtx sc scVer httpManager enableAL <- ask

    -- run system authorization on the GraphQL API
    reqParsed <-
      E.checkGQLExecution userInfo (reqHeaders, ipAddress) enableAL sc reqUnparsed
        >>= flip onLeft throwError

    operationLimit <- askGraphqlOperationLimit
    let runLimits = runResourceLimits $ operationLimit userInfo (scApiLimits sc)

    (parameterizedQueryHash, execPlan) <-
      E.getResolvedExecPlan
        env
        logger
        userInfo
        sqlGenCtx
        sc
        scVer
        queryType
        httpManager
        reqHeaders
        (reqUnparsed, reqParsed)
        reqId

    case execPlan of
      E.QueryExecutionPlan queryPlans asts dirMap -> trace "Query" $ do
        let filteredSessionVars = runSessVarPred (filterVariablesFromQuery asts) (_uiSession userInfo)
            cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
            remoteSchemas =
              OMap.elems queryPlans >>= \case
                E.ExecStepDB _headers _dbAST remoteJoins -> do
                  maybe [] (map RJ._rsjRemoteSchema . RJ.getRemoteSchemaJoins) remoteJoins
                _ -> []
            -- The infos of all action steps, in reverse plan order (the order
            -- in which they have always been handed to 'cacheLookup').
            actionsInfo =
              reverse
                [ actionInfo
                  | E.ExecStepAction _ actionInfo _remoteJoins <- OMap.elems queryPlans
                ]
            cachedDirective = runIdentity <$> DM.lookup cached dirMap

        (responseHeaders, cachedValue) <- Tracing.interpTraceT (liftEitherM . runExceptT) $ cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective
        case fmap decodeGQResp cachedValue of
          -- cache hit: nothing is executed, so no resource limits apply
          Just cachedResponseData -> do
            logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindCached
            pure (Telem.Query, 0, Telem.Local, HttpResponse cachedResponseData responseHeaders, parameterizedQueryHash)
          Nothing -> runLimits $ do
            conclusion <- runExceptT $
              forWithKey queryPlans $ \fieldName -> \case
                E.ExecStepDB _headers exists remoteJoins -> doQErr $ do
                  (telemTimeIO_DT, resp) <-
                    AB.dispatchAnyBackend @BackendTransport
                      exists
                      \(EB.DBStepInfo _ sourceConfig genSql tx :: EB.DBStepInfo b) ->
                        runDBQuery @b
                          reqId
                          reqUnparsed
                          fieldName
                          userInfo
                          logger
                          sourceConfig
                          tx
                          genSql
                  finalResponse <-
                    RJ.processRemoteJoins reqId logger env httpManager reqHeaders userInfo resp remoteJoins reqUnparsed
                  pure $ ResultsFragment telemTimeIO_DT Telem.Local finalResponse []
                E.ExecStepRemote rsi resultCustomizer gqlReq -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindRemoteSchema
                  runRemoteGQ httpManager fieldName rsi resultCustomizer gqlReq
                E.ExecStepAction aep _ remoteJoins -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindAction

                  -- the response headers of query actions are not forwarded
                  (time, (resp, _)) <- doQErr $ do
                    (time, (resp, hdrs)) <- EA.runActionExecution userInfo aep
                    finalResponse <-
                      RJ.processRemoteJoins reqId logger env httpManager reqHeaders userInfo resp remoteJoins reqUnparsed
                    pure (time, (finalResponse, hdrs))
                  pure $ ResultsFragment time Telem.Empty resp []
                E.ExecStepRaw json -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
                  buildRaw json
            out@(_, _, _, HttpResponse responseData _, _) <-
              buildResultFromFragments Telem.Query conclusion responseHeaders parameterizedQueryHash
            Tracing.interpTraceT (liftEitherM . runExceptT) do
              cacheStoreRes <- cacheStore cacheKey cachedDirective (snd responseData)
              let headers = case cacheStoreRes of
                    -- Note: Warning header format: "Warning: <warn-code> <warn-agent> <warn-text> [warn-date]"
                    -- See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Warning
                    Right _ -> []
                    (Left CacheStoreLimitReached) -> [("warning", "199 - cache-store-size-limit-exceeded")]
                    (Left CacheStoreNotEnoughCapacity) -> [("warning", "199 - cache-store-capacity-exceeded")]
                    (Left (CacheStoreBackendError _)) -> [("warning", "199 - cache-store-error")]
               in pure $ out & _4 %~ addHttpResponseHeaders headers
      E.MutationExecutionPlan mutationPlans -> runLimits $ do
        {- Note [Backwards-compatible transaction optimisation]

           For backwards compatibility, we perform the following optimisation: if all mutation steps
           are going to the same source, and that source is Postgres, we group all mutations as a
           transaction. This is a somewhat dangerous behaviour, and we would prefer, in the future,
           to make transactionality explicit rather than implicit and context-dependent.
        -}
        case coalescePostgresMutations mutationPlans of
          -- we are in the aforementioned case; we circumvent the normal process
          Just (sourceConfig, pgMutations) -> do
            resp <-
              runExceptT $
                doQErr $
                  runPGMutationTransaction reqId reqUnparsed userInfo logger sourceConfig pgMutations
            -- we do not construct result fragments since we have only one result
            buildResult Telem.Mutation parameterizedQueryHash resp \(telemTimeIO_DT, results) ->
              let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ OMap.mapKeys G.unName results
               in ( Telem.Mutation,
                    telemTimeIO_DT,
                    Telem.Local,
                    HttpResponse
                      (Just responseData, encodeGQResp responseData)
                      [],
                    parameterizedQueryHash
                  )

          -- we are not in the transaction case; proceeding normally
          Nothing -> do
            conclusion <- runExceptT $
              forWithKey mutationPlans $ \fieldName -> \case
                E.ExecStepDB responseHeaders exists remoteJoins -> doQErr $ do
                  (telemTimeIO_DT, resp) <-
                    AB.dispatchAnyBackend @BackendTransport
                      exists
                      \(EB.DBStepInfo _ sourceConfig genSql tx :: EB.DBStepInfo b) ->
                        runDBMutation @b
                          reqId
                          reqUnparsed
                          fieldName
                          userInfo
                          logger
                          sourceConfig
                          tx
                          genSql

                  finalResponse <-
                    RJ.processRemoteJoins reqId logger env httpManager reqHeaders userInfo resp remoteJoins reqUnparsed
                  pure $ ResultsFragment telemTimeIO_DT Telem.Local finalResponse responseHeaders
                E.ExecStepRemote rsi resultCustomizer gqlReq -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindRemoteSchema
                  runRemoteGQ httpManager fieldName rsi resultCustomizer gqlReq
                E.ExecStepAction aep _ remoteJoins -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindAction
                  (time, (resp, hdrs)) <- doQErr $ do
                    (time, (resp, hdrs)) <- EA.runActionExecution userInfo aep
                    finalResponse <-
                      RJ.processRemoteJoins reqId logger env httpManager reqHeaders userInfo resp remoteJoins reqUnparsed
                    pure (time, (finalResponse, hdrs))
                  pure $ ResultsFragment time Telem.Empty resp $ fromMaybe [] hdrs
                E.ExecStepRaw json -> do
                  logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindIntrospection
                  buildRaw json
            buildResultFromFragments Telem.Mutation conclusion [] parameterizedQueryHash
      E.SubscriptionExecutionPlan _sub ->
        throw400 UnexpectedPayload "subscriptions are not supported over HTTP, use websockets instead"
  -- The response and misc telemetry data:
  -- (the @telem*@ names below are picked up by the record wildcards)
  let telemTimeIO = convertDuration telemTimeIO_DT
      telemTimeTot = convertDuration totalTime
      telemTransport = Telem.HTTP
      requestSize = LBS.length $ J.encode reqUnparsed
      responseSize = LBS.length $ encJToLBS $ snd $ _hrBody resp
  Telem.recordTimingMetric Telem.RequestDimensions {..} Telem.RequestTimings {..}
  return (GQLQueryOperationSuccessLog reqUnparsed totalTime responseSize requestSize parameterizedQueryHash, resp)
  where
    -- embed a plain 'QErr' failure into the combined error type of a step
    doQErr = withExceptT Right

    forWithKey = flip OMap.traverseWithKey

    -- Execute a remote schema step and extract the requested field from its
    -- response. Of the remote's response headers only @Set-Cookie@ is kept.
    runRemoteGQ httpManager fieldName rsi resultCustomizer gqlReq = do
      (telemTimeIO_DT, remoteResponseHeaders, resp) <-
        doQErr $ E.execRemoteGQ env httpManager userInfo reqHeaders (rsDef rsi) gqlReq
      value <- extractFieldFromResponse fieldName rsi resultCustomizer resp
      let filteredHeaders = filter ((== "Set-Cookie") . fst) remoteResponseHeaders
      pure $ ResultsFragment telemTimeIO_DT Telem.Remote (encJFromOrderedValue value) filteredHeaders

    -- Assemble the fragments of all steps into one response: the payloads
    -- are keyed by field name, the IO times are summed, the localities are
    -- combined, and the fragments' headers are appended to the given ones.
    buildResultFromFragments ::
      Telem.QueryType ->
      Either (Either GQExecError QErr) (InsOrdHashMap G.Name ResultsFragment) ->
      HTTP.ResponseHeaders ->
      ParameterizedQueryHash ->
      m
        ( Telem.QueryType,
          DiffTime,
          Telem.Locality,
          HttpResponse (Maybe GQResponse, EncJSON),
          ParameterizedQueryHash
        )
    buildResultFromFragments telemType fragments cacheHeaders parameterizedQueryHash =
      buildResult telemType parameterizedQueryHash fragments \results ->
        let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ rfResponse <$> OMap.mapKeys G.unName results
         in ( telemType,
              sum (fmap rfTimeIO results),
              foldMap rfLocality results,
              HttpResponse
                (Just responseData, encodeGQResp responseData)
                (cacheHeaders <> foldMap rfHeaders results),
              parameterizedQueryHash
            )

    -- Turn the outcome of executing a plan into the final result: a success
    -- is converted with the given function, a 'QErr' is rethrown, and a
    -- 'GQExecError' becomes a regular response carrying the errors.
    buildResult ::
      Telem.QueryType ->
      ParameterizedQueryHash ->
      Either (Either GQExecError QErr) a ->
      ( a ->
        ( Telem.QueryType,
          DiffTime,
          Telem.Locality,
          HttpResponse (Maybe GQResponse, EncJSON),
          ParameterizedQueryHash
        )
      ) ->
      m
        ( Telem.QueryType,
          DiffTime,
          Telem.Locality,
          HttpResponse (Maybe GQResponse, EncJSON),
          ParameterizedQueryHash
        )
    buildResult telemType parameterizedQueryHash result f = case result of
      Right a -> pure $ f a
      Left (Right err) -> throwError err
      Left (Left err) ->
        pure
          ( telemType,
            0,
            Telem.Remote,
            HttpResponse
              (Just (Left err), encodeGQResp $ Left err)
              [],
            parameterizedQueryHash
          )
|
|
|
|
-- | Decide whether all steps of a mutation plan can be run together against
-- one source.
--
-- Returns the source configuration and the unpacked steps when every step of
-- the plan is a database step on vanilla Postgres, has no remote joins, and
-- targets the same source as the first step. Returns 'Nothing' otherwise,
-- which includes the empty plan.
coalescePostgresMutations ::
  EB.ExecutionPlan ->
  Maybe
    ( SourceConfig ('Postgres 'Vanilla),
      InsOrdHashMap G.Name (EB.DBStepInfo ('Postgres 'Vanilla))
    )
coalescePostgresMutations plan = do
  -- name and config of the first mutation root, if it is a Postgres one
  (firstSourceName, firstSourceConfig) <- case toList plan of
    E.ExecStepDB _ exists _remoteJoins : _ -> do
      firstStep <- AB.unpackAnyBackend @('Postgres 'Vanilla) exists
      pure (EB.dbsiSourceName firstStep, EB.dbsiSourceConfig firstStep)
    _ -> Nothing
  -- every step has to be a Postgres step without remote joins that goes to
  -- that same source; a single mismatch rejects the whole plan
  let onFirstSource = \case
        E.ExecStepDB _ exists remoteJoins -> do
          step <- AB.unpackAnyBackend @('Postgres 'Vanilla) exists
          guard $ isNothing remoteJoins && EB.dbsiSourceName step == firstSourceName
          pure step
        _ -> Nothing
  pgSteps <- traverse onFirstSource plan
  pure (firstSourceConfig, pgSteps)
|
|
|
|
-- | Extract the value of one root field from the raw response of a remote
-- schema.
--
-- The response has to be a JSON object. If that object consists of nothing
-- but a @data@ key, its value is passed through the result customizer;
-- otherwise an @errors@ array, if present, is raised as a 'GQExecError', and
-- anything else is rejected as a remote schema error. When the remote schema
-- has a custom namespace field the customized data is returned as is;
-- otherwise the given field is looked up in it.
extractFieldFromResponse ::
  forall m.
  Monad m =>
  G.Name ->
  RemoteSchemaInfo ->
  RemoteResultCustomizer ->
  LBS.ByteString ->
  ExceptT (Either GQExecError QErr) m JO.Value
extractFieldFromResponse fieldName rsi resultCustomizer resp = do
  decoded <- onLeft (JO.eitherDecode resp) (do400 . T.pack)
  responseObj <- onLeft (JO.asObject decoded) do400
  rawData <- case JO.toList responseObj of
    [("data", v)] -> pure v
    _ -> case JO.lookup "errors" responseObj of
      Just (JO.Array errs) -> doGQExecError $ map JO.fromOrdered (toList errs)
      _ -> do400 "Received invalid JSON value from remote"
  let dataVal = applyRemoteResultCustomizer resultCustomizer rawData
  case _rscNamespaceFieldName (rsCustomizer rsi) of
    Nothing -> do
      -- No custom namespace so we need to look up the field name in the data
      -- object.
      dataObj <- onLeft (JO.asObject dataVal) do400
      onNothing (JO.lookup key dataObj) $
        do400 $ "expecting key " <> key
    Just _ ->
      -- If using a custom namespace field then the response from the remote server
      -- will already be unwrapped so just return it.
      pure dataVal
  where
    key = G.unName fieldName

    -- fail with a 400 remote schema error
    do400 = withExceptT Right . throw400 RemoteSchemaError

    -- fail with the GraphQL errors reported by the remote
    doGQExecError = withExceptT Left . throwError . GQExecError
|
|
|
|
-- | Wrap an already available JSON value into a 'ResultsFragment' with zero
-- IO time, local telemetry locality, and no response headers.
buildRaw :: Applicative m => JO.Value -> m ResultsFragment
buildRaw json =
  pure $ ResultsFragment 0 Telem.Local (encJFromOrderedValue json) []
|
|
|
|
-- | Run (execute) a batched GraphQL query (see 'GQLBatchedReqs').
--
-- A single request is handed to 'runGQ' and its response, including the
-- response headers, is returned. For a batch, every request is run through
-- 'runGQ' in turn; a request failing with a 'QErr' does not abort the batch
-- but is encoded as an error entry in the resulting JSON list. Alongside the
-- response, the metadata for the HTTP log is returned.
runGQBatched ::
  forall m.
  ( HasVersion,
    MonadIO m,
    MonadBaseControl IO m,
    MonadError QErr m,
    MonadReader E.ExecutionCtx m,
    E.MonadGQLExecutionCheck m,
    MonadQueryLog m,
    MonadTrace m,
    MonadExecuteQuery m,
    HttpLog m,
    MonadMetadataStorage (MetadataStorageT m),
    EB.MonadQueryTags m,
    HasResourceLimits m
  ) =>
  Env.Environment ->
  L.Logger L.Hasura ->
  RequestId ->
  ResponseInternalErrorsConfig ->
  UserInfo ->
  Wai.IpAddress ->
  [HTTP.Header] ->
  E.GraphQLQueryType ->
  -- | the batched request with unparsed GraphQL query
  GQLBatchedReqs (GQLReq GQLQueryText) ->
  m (HttpLogMetadata m, HttpResponse EncJSON)
runGQBatched env logger reqId responseErrorsConfig userInfo ipAddress reqHdrs queryType = \case
  GQLSingleRequest req -> do
    (opLog, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
    let queryHashes = PQHSetSingleton (gqolParameterizedQueryHash opLog)
        loggedOperation = GQLSingleRequest (GQLQueryOperationSuccess opLog)
        httpLoggingMetadata = buildHttpLogMetadata @m queryHashes L.RequestModeSingle (Just loggedOperation)
    pure (httpLoggingMetadata, snd <$> httpResp)
  GQLBatchedReqs reqs -> do
    let includeInternal = shouldIncludeInternal (_uiRole userInfo) responseErrorsConfig
        -- run one request of the batch, capturing a failure instead of
        -- propagating it
        executeOne req = do
          outcome <- try $ do
            (opLog, httpResp) <- runGQ env logger reqId userInfo ipAddress reqHdrs queryType req
            pure (opLog, snd <$> httpResp)
          pure (req, outcome)
    responses <- traverse executeOne reqs
    let toOperationLog (req, outcome) = case outcome of
          Left err -> GQLQueryOperationError $ GQLQueryOperationErrorLog req err
          Right (opLog, _) -> GQLQueryOperationSuccess opLog
        batchOperationLogs = map toOperationLog responses
        -- only successful requests contribute a parameterized query hash
        parameterizedQueryHashes =
          [gqolParameterizedQueryHash opLog | (_, Right (opLog, _)) <- responses]
        httpLoggingMetadata = buildHttpLogMetadata @m (PQHSetBatched parameterizedQueryHashes) L.RequestModeBatched (Just (GQLBatchedReqs batchOperationLogs))
        toResponseEntry (_, outcome) =
          either (encJFromJValue . encodeGQErr includeInternal) (_hrBody . snd) outcome
    -- It's unclear what we should do if we receive multiple
    -- responses with distinct headers, so just do the simplest thing
    -- in this case, and don't forward any.
    pure (httpLoggingMetadata, HttpResponse (encJFromList (map toResponseEntry responses)) [])
  where
    -- run an action, returning a thrown error as a 'Left' value
    try action = (Right <$> action) `catchError` (pure . Left)
|