mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-07 08:13:18 +03:00
fbd1262ea6
### Description
This PR implements operation timeouts, as specced in #1232.
RFC: [rfcs/operation-timeout-api-limits.md](c025a90fe9/rfcs/operation-timeout-api-limits.md)
There are still some things to be done (most notably tests and docs), but apart from that this PR
can be reviewed. I'd still appreciate feedback on the RFC!
TODO:
- [x] break out the `ApiLimits` refactoring into a separate PR: #2103
- [x] finish the `pg-client-hs` PR: https://github.com/hasura/pg-client-hs/pull/39
- [x] remove configurability, after testing, prior to merging
- [ ] tests: #2390 has some tests that I've run locally to confirm things work on a fundamental level
- [x] changelog
- [x] documentation
- [x] fill in the detailed PR checklist
### Changelog
- [x] `CHANGELOG.md` is updated with user-facing content relevant to this PR. If no changelog is required, then add the `no-changelog-required` label.
### Affected components
- [x] Server
- [ ] Console
- [ ] CLI
- [x] Docs
- [ ] Tests
### Related Issues
Product spec: #1232.
### Solution and Design
Compare `rfcs/operation-timeout-api-limits.md`.
### Steps to test and verify
Configure operation timeouts, e.g. by posting
```
{
"type": "set_api_limits",
"args": {
"operation_timeout": {
"global": 3
}
}
}
```
to `v1/metadata` to set an operation timeout of 3s. Then verify that
1. non-admin queries that take longer than 3s time out with a nice error message
2. those queries return after ~3s (at least for postgres)
3. everything else still works as usual
### Limitations, known bugs & workarounds
- while this will cause slow queries against any backend to fail, it's only verified to actually interrupt queries against postgres
- this will only successfully short-cut (cancel) queries to postgres if the database server is responsive
#### Catalog upgrade
Does this PR change Hasura Catalog version?
- [x] No
#### Metadata
Does this PR add a new Metadata feature?
- [x] Yes
- Does `run_sql` automatically manage the new metadata through schema diffing?
- [x] Not required
- Does `run_sql` automatically manage the definitions of metadata on renaming?
- [x] Not required
- Does `export_metadata`/`replace_metadata` support the new metadata added?
- [x] Yes
#### GraphQL
- [x] No new GraphQL schema is generated
#### Breaking changes
- [x] No Breaking changes
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/1593
GitOrigin-RevId: f0582d0be3ed9fadf89e0c4aaf96344d18331dc4
169 lines
5.1 KiB
Haskell
169 lines
5.1 KiB
Haskell
module Hasura.GraphQL.Transport.WSServerApp
|
|
( createWSServerApp,
|
|
stopWSServerApp,
|
|
createWSServerEnv,
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent.Async.Lifted.Safe qualified as LA
|
|
import Control.Concurrent.STM qualified as STM
|
|
import Control.Exception.Lifted
|
|
import Control.Monad.Trans.Control qualified as MC
|
|
import Data.Aeson (toJSON)
|
|
import Data.ByteString.Char8 qualified as B (pack)
|
|
import Data.Environment qualified as Env
|
|
import Data.Text (pack, unpack)
|
|
import Hasura.GraphQL.Execute qualified as E
|
|
import Hasura.GraphQL.Execute.Backend qualified as EB
|
|
import Hasura.GraphQL.Execute.LiveQuery.State qualified as LQ
|
|
import Hasura.GraphQL.Logging
|
|
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery)
|
|
import Hasura.GraphQL.Transport.Instances ()
|
|
import Hasura.GraphQL.Transport.WebSocket
|
|
import Hasura.GraphQL.Transport.WebSocket.Protocol
|
|
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
|
|
import Hasura.GraphQL.Transport.WebSocket.Types
|
|
import Hasura.Logging qualified as L
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.Types
|
|
import Hasura.Server.Auth (AuthMode, UserAuthentication)
|
|
import Hasura.Server.Cors
|
|
import Hasura.Server.Init.Config
|
|
( KeepAliveDelay,
|
|
WSConnectionInitTimeout,
|
|
)
|
|
import Hasura.Server.Limits
|
|
import Hasura.Server.Metrics (ServerMetrics (..))
|
|
import Hasura.Server.Version (HasVersion)
|
|
import Hasura.Tracing qualified as Tracing
|
|
import Network.HTTP.Client qualified as H
|
|
import Network.WebSockets qualified as WS
|
|
import System.Metrics.Gauge qualified as EKG.Gauge
|
|
|
|
-- | Build the websocket server application from the server environment.
--
-- The result is a 'WS.HasuraServerApp', i.e. a generalized 'WS.ServerApp':
-- given the client's IP address and the pending connection it hands control
-- to 'WS.createServerApp' together with the connect/message/close handlers
-- defined below.
--
-- Every handler runs under 'mask_' so that asynchronous exceptions cannot
-- interrupt event processing half-way and leave mutable variables in an
-- inconsistent state.
createWSServerApp ::
  ( HasVersion,
    MonadIO m,
    MC.MonadBaseControl IO m,
    LA.Forall (LA.Pure m),
    UserAuthentication (Tracing.TraceT m),
    E.MonadGQLExecutionCheck m,
    WS.MonadWSLog m,
    MonadQueryLog m,
    Tracing.HasReporter m,
    MonadExecuteQuery m,
    MonadMetadataStorage (MetadataStorageT m),
    EB.MonadQueryTags m,
    HasResourceLimits m
  ) =>
  Env.Environment ->
  HashSet (L.EngineLogType L.Hasura) ->
  AuthMode ->
  WSServerEnv ->
  WSConnectionInitTimeout ->
  WS.HasuraServerApp m
createWSServerApp env enabledLogTypes authMode serverEnv connInitTimeout =
  -- The explicit lambda (rather than extra equation arguments) keeps the
  -- 'where' bindings shared between connections once the first five
  -- arguments have been supplied.
  \ !ipAddress !pendingConn ->
    WS.createServerApp
      connInitTimeout
      (_wseServer serverEnv)
      wsHandlers
      ipAddress
      pendingConn
  where
    wsHandlers = WS.WSHandlers handleConnect handleMessage handleClose

    logger = _wseLogger serverEnv
    metrics = _wseServerMetrics serverEnv

    -- Gauge tracking the number of websocket connections.
    connectionGauge = smWebsocketConnections metrics

    -- Protocol-specific actions, selected by the negotiated sub-protocol.
    actionsFor = mkWSActions logger

    -- In the handlers below `sp` stands for sub-protocol.

    -- New connection: bump the connection gauge, then run 'onConn' with the
    -- server environment as its reader context.
    handleConnect rid rh ip sp = mask_ $ do
      liftIO (EKG.Gauge.inc connectionGauge)
      runReaderT (onConn rid rh ip (actionsFor sp)) serverEnv

    -- Incoming message: delegate to 'onMessage'.
    handleMessage conn bs sp =
      mask_ (onMessage env enabledLogTypes authMode serverEnv conn bs (actionsFor sp))

    -- Connection closed: lower the connection gauge, then run 'onClose'.
    handleClose conn = mask_ $ do
      liftIO (EKG.Gauge.dec connectionGauge)
      onClose logger metrics (_wseLiveQMap serverEnv) conn
|
|
|
|
-- | Shut down the websocket server held in the given environment by calling
-- 'WS.shutdown' on its '_wseServer' field.
stopWSServerApp :: WSServerEnv -> IO ()
stopWSServerApp = WS.shutdown . _wseServer
|
|
|
|
-- | Assemble a 'WSServerEnv'.
--
-- A fresh websocket server is created with 'WS.createWSServer' (run
-- atomically in STM); all other components of the environment are the
-- arguments, passed through unchanged.
createWSServerEnv ::
  (MonadIO m) =>
  L.Logger L.Hasura ->
  LQ.LiveQueriesState ->
  IO (SchemaCache, SchemaCacheVer) ->
  H.Manager ->
  CorsPolicy ->
  SQLGenCtx ->
  Bool ->
  KeepAliveDelay ->
  ServerMetrics ->
  m WSServerEnv
createWSServerEnv
  logger
  lqState
  getSchemaCache
  httpManager
  corsPolicy
  sqlGenCtx
  enableAL
  keepAliveDelay
  serverMetrics =
    assembleEnv <$> liftIO (STM.atomically (WS.createWSServer logger))
    where
      -- Positional construction: the argument order must match the field
      -- order of 'WSServerEnv'.
      assembleEnv wsServer =
        WSServerEnv
          logger
          lqState
          getSchemaCache
          httpManager
          corsPolicy
          sqlGenCtx
          wsServer
          enableAL
          keepAliveDelay
          serverMetrics
|
|
|
|
-- | Build the set of sub-protocol-specific websocket actions.
--
-- Each action inspects @subProtocol@ and picks the message shape used by
-- that protocol ('Apollo' or 'GraphQLWS').
mkWSActions :: L.Logger L.Hasura -> WSSubProtocol -> WS.WSActions WSConnData
mkWSActions logger subProtocol =
  WS.WSActions
    postExecErrAction
    onErrorAction
    connectionCloseAction
    sendKeepAlive
    serverMsgType
    acceptRequest
  where
    -- Report an execution error for an operation: Apollo wraps it in a data
    -- message, GraphQLWS sends a dedicated error message.
    postExecErrAction wsConn opId execErr =
      let errorReply = case subProtocol of
            Apollo -> SMData (DataMsg opId (throwError execErr))
            GraphQLWS -> SMErr (ErrorMsg opId (toJSON execErr))
       in sendMsg wsConn errorReply

    -- Report a connection-level error: Apollo sends a connection-error
    -- message, GraphQLWS closes the connection with a 4400 error whose text
    -- is the optional prefix followed by the error message.
    onErrorAction wsConn err mErrMsg = case subProtocol of
      Apollo -> sendMsg wsConn (SMConnErr err)
      GraphQLWS ->
        let closeReason = fromMaybe "" mErrMsg <> unpack (unConnErrMsg err)
         in sendCloseWithMsg logger wsConn (GenericError4400 closeReason) Nothing

    -- Close the connection with a 4400 error and a final error message.
    -- Only acts for GraphQLWS; it is a no-op for any other sub-protocol.
    connectionCloseAction wsConn opId errMsg =
      when (subProtocol == GraphQLWS) $
        sendCloseWithMsg logger wsConn (GenericError4400 errMsg) (Just finalMsg)
      where
        finalMsg = SMErr (ErrorMsg opId (toJSON (pack errMsg)))

    -- Send the protocol's keep-alive message.
    sendKeepAlive wsConn =
      let heartbeat = case subProtocol of
            Apollo -> SMConnKeepAlive
            GraphQLWS -> SMPing (Just keepAliveMessage)
       in sendMsg wsConn heartbeat

    -- Constructor used to wrap data messages sent to the client.
    serverMsgType = case subProtocol of
      Apollo -> SMData
      GraphQLWS -> SMNext

    -- Accept request that echoes the negotiated sub-protocol to the client.
    acceptRequest =
      WS.defaultAcceptRequest
        { WS.acceptSubprotocol = Just (B.pack (showSubProtocol subProtocol))
        }
|