mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 09:22:43 +03:00
server: support caching when forwarding client headers
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9021 Co-authored-by: Anon Ray <616387+ecthiender@users.noreply.github.com> GitOrigin-RevId: d82eaee50d6bb5bbb2c01f2900875ba8ab73a1b0
This commit is contained in:
parent
8f4692d871
commit
a6eaeceabe
@ -106,7 +106,7 @@ import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
|
||||
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
|
||||
import Hasura.GraphQL.Logging (MonadExecutionLog (..), MonadQueryLog (..))
|
||||
import Hasura.GraphQL.Transport.HTTP
|
||||
( CacheStoreSuccess (CacheStoreSkipped),
|
||||
( CacheResult (..),
|
||||
MonadExecuteQuery (..),
|
||||
)
|
||||
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
|
||||
@ -697,8 +697,7 @@ instance HttpLog AppM where
|
||||
mkHttpAccessLogContext userInfoM loggingSettings reqId waiReq reqBody (BL.length response) compressedResponse qTime cType headers rb batchQueryOpLogs
|
||||
|
||||
instance MonadExecuteQuery AppM where
|
||||
cacheLookup _ _ _ _ = pure $ Right ([], Nothing)
|
||||
cacheStore _ _ _ = pure $ Right (Right CacheStoreSkipped)
|
||||
cacheLookup _ _ _ _ _ _ = pure $ Right ([], ResponseUncached Nothing)
|
||||
|
||||
instance UserAuthentication AppM where
|
||||
resolveUserInfo logger manager headers authMode reqs =
|
||||
|
@ -1,8 +1,9 @@
|
||||
-- | Execution of GraphQL queries over HTTP transport
|
||||
module Hasura.GraphQL.Transport.HTTP
|
||||
( QueryCacheKey (..),
|
||||
MonadExecuteQuery (..),
|
||||
( MonadExecuteQuery (..),
|
||||
CacheResult (..),
|
||||
CachedDirective (..),
|
||||
ResponseCacher (..),
|
||||
runGQ,
|
||||
runGQBatched,
|
||||
coalescePostgresMutations,
|
||||
@ -19,9 +20,7 @@ module Hasura.GraphQL.Transport.HTTP
|
||||
OperationName (..),
|
||||
GQLQueryText (..),
|
||||
AnnotatedResponsePart (..),
|
||||
CacheStoreSuccess (..),
|
||||
CacheStoreFailure (..),
|
||||
CacheStoreResponse,
|
||||
CacheStoreResponse (..),
|
||||
SessVarPred,
|
||||
filterVariablesFromQuery,
|
||||
runSessVarPred,
|
||||
@ -70,12 +69,10 @@ import Hasura.Metadata.Class
|
||||
import Hasura.Prelude
|
||||
import Hasura.QueryTags
|
||||
import Hasura.RQL.IR
|
||||
import Hasura.RQL.Types.Action
|
||||
import Hasura.RQL.Types.Backend
|
||||
import Hasura.RQL.Types.BackendType
|
||||
import Hasura.RQL.Types.Common
|
||||
import Hasura.RQL.Types.ResultCustomization
|
||||
import Hasura.RQL.Types.Roles (RoleName)
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
import Hasura.RemoteSchema.SchemaCache
|
||||
import Hasura.SQL.AnyBackend qualified as AB
|
||||
@ -92,35 +89,31 @@ import Hasura.Server.Telemetry.Counters qualified as Telem
|
||||
import Hasura.Server.Types (ReadOnlyMode (..), RequestId (..))
|
||||
import Hasura.Services
|
||||
import Hasura.Session (SessionVariable, SessionVariableValue, SessionVariables, UserInfo (..), filterSessionVariables)
|
||||
import Hasura.Tracing (MonadTrace, TraceT, attachMetadata)
|
||||
import Hasura.Tracing (MonadTrace, attachMetadata)
|
||||
import Language.GraphQL.Draft.Syntax qualified as G
|
||||
import Network.HTTP.Types qualified as HTTP
|
||||
import Network.Wai.Extended qualified as Wai
|
||||
import System.Metrics.Prometheus.Counter qualified as Prometheus.Counter
|
||||
import System.Metrics.Prometheus.Histogram qualified as Prometheus.Histogram
|
||||
|
||||
data QueryCacheKey = QueryCacheKey
|
||||
{ qckQueryString :: !GQLReqParsed,
|
||||
qckUserRole :: !RoleName,
|
||||
qckSession :: !SessionVariables
|
||||
}
|
||||
-- | Encapsulates a function that stores a query response in the cache.
|
||||
-- `cacheLookup` decides when such an invitation to store is generated.
|
||||
newtype ResponseCacher = ResponseCacher {runStoreResponse :: forall m. (MonadTrace m, MonadIO m) => EncJSON -> m (Either QErr CacheStoreResponse)}
|
||||
|
||||
instance J.ToJSON QueryCacheKey where
|
||||
toJSON (QueryCacheKey qs ur sess) =
|
||||
J.object ["query_string" J..= qs, "user_role" J..= ur, "session" J..= sess]
|
||||
|
||||
type CacheStoreResponse = Either CacheStoreFailure CacheStoreSuccess
|
||||
|
||||
data CacheStoreSuccess
|
||||
= CacheStoreSkipped
|
||||
| CacheStoreHit
|
||||
deriving (Eq, Show)
|
||||
|
||||
data CacheStoreFailure
|
||||
= CacheStoreLimitReached
|
||||
data CacheStoreResponse
|
||||
= -- | Cache storage is unconditional, just
|
||||
-- not always available.
|
||||
CacheStoreSuccess
|
||||
| CacheStoreLimitReached
|
||||
| CacheStoreNotEnoughCapacity
|
||||
| CacheStoreBackendError String
|
||||
deriving (Eq, Show)
|
||||
|
||||
data CacheResult
|
||||
= -- | We have a cached response for this query
|
||||
ResponseCached EncJSON
|
||||
| -- | We don't have a cached response. The `ResponseCacher` can be used to
|
||||
-- store the response in the cache after a fresh execution.
|
||||
ResponseUncached (Maybe ResponseCacher)
|
||||
|
||||
class Monad m => MonadExecuteQuery m where
|
||||
-- | This method does two things: it looks up a query result in the
|
||||
@ -128,62 +121,39 @@ class Monad m => MonadExecuteQuery m where
|
||||
-- headers that can instruct a client how long a response can be cached
|
||||
-- locally (i.e. client-side).
|
||||
cacheLookup ::
|
||||
-- | Used to check if the elaborated query supports caching
|
||||
[RemoteSchemaInfo] ->
|
||||
-- | Used to check if actions query supports caching (unsupported if `forward_client_headers` is set)
|
||||
[ActionsInfo] ->
|
||||
-- | Key that uniquely identifies the result of a query execution
|
||||
QueryCacheKey ->
|
||||
-- | Cached Directive from GraphQL query AST
|
||||
-- | How we _would've_ executed the query. Ideally we'd use this as a
|
||||
-- caching key, but it's not serializable... [cont'd]
|
||||
EB.ExecutionPlan ->
|
||||
-- | Somewhat less processed plan of how we _would've_ executed the query.
|
||||
[QueryRootField UnpreparedValue] ->
|
||||
-- | `@cached` directive from the query AST
|
||||
Maybe CachedDirective ->
|
||||
-- | HTTP headers to be sent back to the caller for this GraphQL request,
|
||||
-- containing e.g. time-to-live information, and a cached value if found and
|
||||
-- within time-to-live. So a return value (non-empty-ttl-headers, Nothing)
|
||||
-- represents that we don't have a server-side cache of the query, but that
|
||||
-- the client should store it locally. The value ([], Just json) represents
|
||||
-- that the client should not store the response locally, but we do have a
|
||||
-- server-side cache value that can be used to avoid query execution.
|
||||
m (Either QErr (HTTP.ResponseHeaders, Maybe EncJSON))
|
||||
|
||||
-- | Store a json response for a query that we've executed in the cache. Note
|
||||
-- that, as part of this, 'cacheStore' has to decide whether the response is
|
||||
-- cacheable. A very similar decision is also made in 'cacheLookup', since it
|
||||
-- has to construct corresponding cache-enabling headers that are sent to the
|
||||
-- client. But note that the HTTP headers influence client-side caching,
|
||||
-- whereas 'cacheStore' changes the server-side cache.
|
||||
cacheStore ::
|
||||
-- | Key under which to store the result of a query execution
|
||||
QueryCacheKey ->
|
||||
-- | Cached Directive from GraphQL query AST
|
||||
Maybe CachedDirective ->
|
||||
-- | Result of a query execution
|
||||
EncJSON ->
|
||||
-- | Always succeeds
|
||||
m (Either QErr CacheStoreResponse)
|
||||
|
||||
-- | [cont'd] ... which is why we additionally pass serializable structures
|
||||
-- from earlier in the query processing pipeline. This includes the query
|
||||
-- AST, which additionally specifies the `@cached` directive with TTL info...
|
||||
GQLReqParsed ->
|
||||
-- | ... and the `UserInfo`
|
||||
UserInfo ->
|
||||
-- | Used for remote schemas and actions
|
||||
[HTTP.Header] ->
|
||||
-- | Non-empty response headers instruct the client to store the response
|
||||
-- locally.
|
||||
m (Either QErr (HTTP.ResponseHeaders, CacheResult))
|
||||
default cacheLookup ::
|
||||
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
||||
[RemoteSchemaInfo] ->
|
||||
[ActionsInfo] ->
|
||||
QueryCacheKey ->
|
||||
EB.ExecutionPlan ->
|
||||
[QueryRootField UnpreparedValue] ->
|
||||
Maybe CachedDirective ->
|
||||
m (Either QErr (HTTP.ResponseHeaders, Maybe EncJSON))
|
||||
cacheLookup a b c d = lift $ cacheLookup a b c d
|
||||
|
||||
default cacheStore ::
|
||||
(m ~ t n, MonadTrans t, MonadExecuteQuery n) =>
|
||||
QueryCacheKey ->
|
||||
Maybe CachedDirective ->
|
||||
EncJSON ->
|
||||
m (Either QErr CacheStoreResponse)
|
||||
cacheStore a b c = lift $ cacheStore a b c
|
||||
GQLReqParsed ->
|
||||
UserInfo ->
|
||||
[HTTP.Header] ->
|
||||
m (Either QErr (HTTP.ResponseHeaders, CacheResult))
|
||||
cacheLookup a b c d e f = lift $ cacheLookup a b c d e f
|
||||
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (ReaderT r m)
|
||||
|
||||
instance MonadExecuteQuery m => MonadExecuteQuery (ExceptT e m)
|
||||
|
||||
instance (MonadExecuteQuery m, MonadIO m) => MonadExecuteQuery (TraceT m)
|
||||
|
||||
-- | A partial response, e.g. from a remote schema call or postgres
|
||||
-- postgres query, which we'll assemble into the final response for
|
||||
-- the client. It is annotated with timing metadata.
|
||||
@ -410,24 +380,22 @@ runGQ env sqlGenCtx sc scVer enableAL readOnlyMode prometheusMetrics logger agen
|
||||
E.QueryExecutionPlan queryPlans asts dirMap -> do
|
||||
-- https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/instrumentation/graphql/
|
||||
attachMetadata [("graphql.operation.type", "query")]
|
||||
let cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
||||
-- Attempt to lookup a cached response in the query cache.
|
||||
-- 'keyedLookup' is a monadic action possibly returning a cache hit.
|
||||
-- 'keyedStore' is a function to write a new response to the cache.
|
||||
let (keyedLookup, keyedStore) = cacheAccess reqParsed queryPlans asts dirMap
|
||||
(cachingHeaders, cachedValue) <- keyedLookup
|
||||
case fmap decodeGQResp cachedValue of
|
||||
(cachingHeaders, cachedValue) <- liftEitherM $ cacheLookup queryPlans asts cachedDirective reqParsed userInfo reqHeaders
|
||||
case cachedValue of
|
||||
-- If we get a cache hit, annotate the response with metadata and return it.
|
||||
Just cachedResponseData -> do
|
||||
ResponseCached cachedResponseData -> do
|
||||
logQueryLog logger $ QueryLog reqUnparsed Nothing reqId QueryLogKindCached
|
||||
pure $
|
||||
AnnotatedResponse
|
||||
{ arQueryType = Telem.Query,
|
||||
arTimeIO = 0,
|
||||
arLocality = Telem.Local,
|
||||
arResponse = HttpResponse cachedResponseData cachingHeaders
|
||||
arResponse = HttpResponse (decodeGQResp cachedResponseData) cachingHeaders
|
||||
}
|
||||
-- If we get a cache miss, we must run the query against the graphql engine.
|
||||
Nothing -> runLimits $ do
|
||||
ResponseUncached storeResponseM -> runLimits $ do
|
||||
-- 1. 'traverse' the 'ExecutionPlan' executing every step.
|
||||
-- TODO: can this be a `catch` rather than a `runExceptT`?
|
||||
conclusion <- runExceptT $ forWithKey queryPlans executeQueryStep
|
||||
@ -435,14 +403,24 @@ runGQ env sqlGenCtx sc scVer enableAL readOnlyMode prometheusMetrics logger agen
|
||||
result <- buildResponseFromParts Telem.Query conclusion
|
||||
let response@(HttpResponse responseData _) = arResponse result
|
||||
-- 3. Cache the 'AnnotatedResponse'.
|
||||
cacheStoreRes <- keyedStore (snd responseData)
|
||||
case storeResponseM of
|
||||
-- No caching intended
|
||||
Nothing ->
|
||||
-- TODO: we probably don't want to use `cachingHeaders` here.
|
||||
-- If no caching was intended, then we shouldn't instruct the
|
||||
-- client to cache, either. The only reason we're passing
|
||||
-- headers here is to avoid breaking changes.
|
||||
pure $ result {arResponse = addHttpResponseHeaders cachingHeaders response}
|
||||
-- Caching intended; store result and instruct client through HTTP headers
|
||||
Just ResponseCacher {..} -> do
|
||||
cacheStoreRes <- liftEitherM $ runStoreResponse (snd responseData)
|
||||
let headers = case cacheStoreRes of
|
||||
-- Note: Warning header format: "Warning: <warn-code> <warn-agent> <warn-text> [warn-date]"
|
||||
-- See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Warning
|
||||
Right _ -> cachingHeaders
|
||||
(Left CacheStoreLimitReached) -> [("warning", "199 - cache-store-size-limit-exceeded")]
|
||||
(Left CacheStoreNotEnoughCapacity) -> [("warning", "199 - cache-store-capacity-exceeded")]
|
||||
(Left (CacheStoreBackendError _)) -> [("warning", "199 - cache-store-error")]
|
||||
CacheStoreSuccess -> cachingHeaders
|
||||
CacheStoreLimitReached -> [("warning", "199 - cache-store-size-limit-exceeded")]
|
||||
CacheStoreNotEnoughCapacity -> [("warning", "199 - cache-store-capacity-exceeded")]
|
||||
CacheStoreBackendError _ -> [("warning", "199 - cache-store-error")]
|
||||
in -- 4. Return the response.
|
||||
pure $ result {arResponse = addHttpResponseHeaders headers response}
|
||||
E.MutationExecutionPlan mutationPlans -> runLimits $ do
|
||||
@ -570,43 +548,6 @@ runGQ env sqlGenCtx sc scVer enableAL readOnlyMode prometheusMetrics logger agen
|
||||
let filteredHeaders = filter ((== "Set-Cookie") . fst) remoteResponseHeaders
|
||||
pure $ AnnotatedResponsePart telemTimeIO_DT Telem.Remote finalResponse filteredHeaders
|
||||
|
||||
cacheAccess ::
|
||||
GQLReqParsed ->
|
||||
EB.ExecutionPlan ->
|
||||
[QueryRootField UnpreparedValue] ->
|
||||
DirectiveMap ->
|
||||
( m (HTTP.ResponseHeaders, Maybe EncJSON),
|
||||
EncJSON -> m CacheStoreResponse
|
||||
)
|
||||
cacheAccess reqParsed queryPlans asts dirMap =
|
||||
let filteredSessionVars = runSessVarPred (filterVariablesFromQuery asts) (_uiSession userInfo)
|
||||
collectRemoteJoins = maybe [] (map RJ._rsjRemoteSchema . RJ.getRemoteSchemaJoins)
|
||||
remoteSchemas =
|
||||
InsOrdHashMap.elems queryPlans >>= \case
|
||||
E.ExecStepDB _headers _dbAST remoteJoins ->
|
||||
collectRemoteJoins remoteJoins
|
||||
E.ExecStepRemote remoteSchemaInfo _ _ remoteJoins ->
|
||||
[remoteSchemaInfo] <> collectRemoteJoins remoteJoins
|
||||
E.ExecStepAction _ _ remoteJoins -> collectRemoteJoins remoteJoins
|
||||
_ -> []
|
||||
getExecStepActionWithActionInfo acc execStep = case execStep of
|
||||
E.ExecStepAction _ actionInfo _remoteJoins -> (actionInfo : acc)
|
||||
_ -> acc
|
||||
actionsInfo =
|
||||
foldl getExecStepActionWithActionInfo [] $
|
||||
InsOrdHashMap.elems $
|
||||
InsOrdHashMap.filter
|
||||
( \case
|
||||
E.ExecStepAction _ _ _remoteJoins -> True
|
||||
_ -> False
|
||||
)
|
||||
queryPlans
|
||||
cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
|
||||
cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
||||
in ( liftEitherM $ cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective,
|
||||
liftEitherM . cacheStore cacheKey cachedDirective
|
||||
)
|
||||
|
||||
recordTimings :: DiffTime -> AnnotatedResponse -> m ()
|
||||
recordTimings totalTime result = do
|
||||
Telem.recordTimingMetric
|
||||
|
@ -498,42 +498,21 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
|
||||
E.QueryExecutionPlan queryPlan asts dirMap -> do
|
||||
-- https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/instrumentation/graphql/
|
||||
Tracing.attachMetadata [("graphql.operation.type", "query")]
|
||||
let filteredSessionVars = runSessVarPred (filterVariablesFromQuery asts) (_uiSession userInfo)
|
||||
cacheKey = QueryCacheKey reqParsed (_uiRole userInfo) filteredSessionVars
|
||||
collectRemoteJoins = maybe [] (map RJ._rsjRemoteSchema . RJ.getRemoteSchemaJoins)
|
||||
remoteSchemas =
|
||||
InsOrdHashMap.elems queryPlan >>= \case
|
||||
E.ExecStepDB _headers _dbAST remoteJoins ->
|
||||
collectRemoteJoins remoteJoins
|
||||
E.ExecStepRemote remoteSchemaInfo _ _ remoteJoins ->
|
||||
[remoteSchemaInfo] <> collectRemoteJoins remoteJoins
|
||||
E.ExecStepAction _ _ remoteJoins -> collectRemoteJoins remoteJoins
|
||||
_ -> []
|
||||
|
||||
actionsInfo =
|
||||
foldl getExecStepActionWithActionInfo [] $
|
||||
InsOrdHashMap.elems $
|
||||
InsOrdHashMap.filter
|
||||
( \case
|
||||
E.ExecStepAction _ _ _remoteJoins -> True
|
||||
_ -> False
|
||||
)
|
||||
queryPlan
|
||||
cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
||||
let cachedDirective = runIdentity <$> DM.lookup cached dirMap
|
||||
|
||||
-- We ignore the response headers (containing TTL information) because
|
||||
-- WebSockets don't support them.
|
||||
cachedValue <-
|
||||
cacheLookup remoteSchemas actionsInfo cacheKey cachedDirective >>= \case
|
||||
cacheLookup queryPlan asts cachedDirective reqParsed userInfo reqHdrs >>= \case
|
||||
Right (_responseHeaders, cachedValue) -> pure cachedValue
|
||||
Left _err -> throwError ()
|
||||
case cachedValue of
|
||||
Just cachedResponseData -> do
|
||||
ResponseCached cachedResponseData -> do
|
||||
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindCached
|
||||
let reportedExecutionTime = 0
|
||||
liftIO $ recordGQLQuerySuccess reportedExecutionTime gqlOpType
|
||||
sendSuccResp cachedResponseData opName parameterizedQueryHash $ ES.SubscriptionMetadata reportedExecutionTime
|
||||
Nothing -> do
|
||||
ResponseUncached storeResponseM -> do
|
||||
conclusion <- runExceptT $
|
||||
runLimits $
|
||||
forWithKey queryPlan $ \fieldName ->
|
||||
@ -576,14 +555,14 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
|
||||
pure $ AnnotatedResponsePart 0 Telem.Local (encJFromList (map arpResponse allResponses)) []
|
||||
in getResponse
|
||||
sendResultFromFragments Telem.Query timerTot requestId conclusion opName parameterizedQueryHash gqlOpType
|
||||
case conclusion of
|
||||
Left _ -> pure ()
|
||||
Right results ->
|
||||
-- Note: The result of cacheStore is ignored here since we can't ensure that
|
||||
case (storeResponseM, conclusion) of
|
||||
(Just ResponseCacher {..}, Right results) ->
|
||||
-- Note: The result of `runStoreResponse` is ignored here since we can't ensure that
|
||||
-- the WS client will respond correctly to multiple messages.
|
||||
void $
|
||||
cacheStore cacheKey cachedDirective $
|
||||
runStoreResponse $
|
||||
encodeAnnotatedResponseParts results
|
||||
_ -> pure ()
|
||||
|
||||
liftIO $ sendCompleted (Just requestId) (Just parameterizedQueryHash)
|
||||
E.MutationExecutionPlan mutationPlan -> do
|
||||
@ -737,10 +716,6 @@ onStart enabledLogTypes agentLicenseKey serverEnv wsConn shouldCaptureVariables
|
||||
postExecErrAction = WS._wsaPostExecErrMessageAction onMessageActions
|
||||
fmtErrorMessage = WS._wsaErrorMsgFormat onMessageActions
|
||||
|
||||
getExecStepActionWithActionInfo acc execStep = case execStep of
|
||||
E.ExecStepAction _ actionInfo _remoteJoins -> actionInfo : acc
|
||||
_ -> acc
|
||||
|
||||
doQErr ::
|
||||
Monad n =>
|
||||
ExceptT QErr n a ->
|
||||
|
Loading…
Reference in New Issue
Block a user