server: configurable websocket keep alive interval (#6092)

Accept a new server flag, --websocket-keepalive, to control the
WebSocket keep-alive interval.

Co-authored-by: Auke Booij <auke@hasura.io>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
Sasha Bogicevic 2020-11-03 18:04:48 +01:00 committed by GitHub
parent fd8d51a37a
commit 81e836a12c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 72 additions and 35 deletions

View File

@ -120,6 +120,8 @@ This release contains the [PDV refactor (#4111)](https://github.com/hasura/graph
- server: accept only non-negative integers for batch size and refetch interval (close #5653) (#5759) - server: accept only non-negative integers for batch size and refetch interval (close #5653) (#5759)
- server: fix bug which arose when renaming a table which had a manual relationship defined (close #4158) - server: fix bug which arose when renaming a table which had a manual relationship defined (close #4158)
- server: limit the length of event trigger names (close #5786) - server: limit the length of event trigger names (close #5786)
- server: Configurable websocket keep-alive interval. Add `--websocket-keepalive` command-line flag
and handle `HASURA_GRAPHQL_WEBSOCKET_KEEPALIVE` env variable (fix #3539)
**NOTE:** If you have event triggers with names greater than 42 chars, then you should update their names to avoid running into Postgres identifier limit bug (#5786) **NOTE:** If you have event triggers with names greater than 42 chars, then you should update their names to avoid running into Postgres identifier limit bug (#5786)
- server: validate remote schema queries (fixes #4143) - server: validate remote schema queries (fixes #4143)
- server: fix issue with tracking custom functions that return `SETOF` materialized view (close #5294) (#5945) - server: fix issue with tracking custom functions that return `SETOF` materialized view (close #5294) (#5945)

View File

@ -339,7 +339,6 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $ _idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60) ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $ HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $
mkWaiApp env mkWaiApp env
soTxIso soTxIso
@ -364,6 +363,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
_icSchemaCache _icSchemaCache
ekgStore ekgStore
soConnectionOptions soConnectionOptions
soWebsocketKeepAlive
-- log inconsistent schema objects -- log inconsistent schema objects
inconsObjs <- scInconsistentObjs <$> liftIO (getSCFromRef cacheRef) inconsObjs <- scInconsistentObjs <$> liftIO (getSCFromRef cacheRef)
@ -429,7 +429,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
, eventQueueThread , eventQueueThread
, scheduledEventsThread , scheduledEventsThread
, cronEventsThread , cronEventsThread
] <> maybe [] pure telemetryThread ] <> onNothing telemetryThread []
finishTime <- liftIO Clock.getCurrentTime finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime

View File

@ -73,6 +73,7 @@ import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Logging as L import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing import qualified Hasura.Tracing as Tracing
import Hasura.Server.Init.Config (KeepAliveDelay (..))
-- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use -- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- this to track a connection's operations so we can remove them from 'LiveQueryState', and -- this to track a connection's operations so we can remove them from 'LiveQueryState', and
@ -228,11 +229,12 @@ data WSServerEnv
-- , _wseQueryCache :: !E.PlanCache -- See Note [Temporarily disabling query plan caching] -- , _wseQueryCache :: !E.PlanCache -- See Note [Temporarily disabling query plan caching]
, _wseServer :: !WSServer , _wseServer :: !WSServer
, _wseEnableAllowlist :: !Bool , _wseEnableAllowlist :: !Bool
, _wseKeepAliveDelay :: !KeepAliveDelay
} }
onConn :: (MonadIO m) onConn :: (MonadIO m, MonadReader WSServerEnv m)
=> L.Logger L.Hasura -> CorsPolicy -> WS.OnConnH m WSConnData => WS.OnConnH m WSConnData
onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do onConn wsId requestHead ipAddress = do
res <- runExceptT $ do res <- runExceptT $ do
(errType, queryType) <- checkPath (errType, queryType) <- checkPath
let reqHdrs = WS.requestHeaders requestHead let reqHdrs = WS.requestHeaders requestHead
@ -241,9 +243,10 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do
either reject accept res either reject accept res
where where
keepAliveAction wsConn = liftIO $ forever $ do keepAliveAction keepAliveDelay wsConn = do
liftIO $ forever $ do
sendMsg wsConn SMConnKeepAlive sendMsg wsConn SMConnKeepAlive
sleep $ seconds 5 sleep $ seconds (unKeepAliveDelay keepAliveDelay)
tokenExpiryHandler wsConn = do tokenExpiryHandler wsConn = do
expTime <- liftIO $ STM.atomically $ do expTime <- liftIO $ STM.atomically $ do
@ -256,6 +259,8 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do
sleep $ convertDuration $ TC.diffUTCTime expTime currTime sleep $ convertDuration $ TC.diffUTCTime expTime currTime
accept (hdrs, errType, queryType) = do accept (hdrs, errType, queryType) = do
(L.Logger logger) <- asks _wseLogger
keepAliveDelay <- asks _wseKeepAliveDelay
logger $ mkWsInfoLog Nothing (WsConnInfo wsId Nothing Nothing) EAccepted logger $ mkWsInfoLog Nothing (WsConnInfo wsId Nothing Nothing) EAccepted
connData <- liftIO $ WSConnData connData <- liftIO $ WSConnData
<$> STM.newTVarIO (CSNotInitialised hdrs ipAddress) <$> STM.newTVarIO (CSNotInitialised hdrs ipAddress)
@ -264,9 +269,9 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do
<*> pure queryType <*> pure queryType
let acceptRequest = WS.defaultAcceptRequest let acceptRequest = WS.defaultAcceptRequest
{ WS.acceptSubprotocol = Just "graphql-ws"} { WS.acceptSubprotocol = Just "graphql-ws"}
return $ Right $ WS.AcceptWith connData acceptRequest keepAliveAction tokenExpiryHandler return $ Right $ WS.AcceptWith connData acceptRequest (keepAliveAction keepAliveDelay) tokenExpiryHandler
reject qErr = do reject qErr = do
(L.Logger logger) <- asks _wseLogger
logger $ mkWsErrorLog Nothing (WsConnInfo wsId Nothing Nothing) (ERejected qErr) logger $ mkWsErrorLog Nothing (WsConnInfo wsId Nothing Nothing) (ERejected qErr)
return $ Left $ WS.RejectRequest return $ Left $ WS.RejectRequest
(H.statusCode $ qeStatus qErr) (H.statusCode $ qeStatus qErr)
@ -283,7 +288,10 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAddress = do
getOrigin = getOrigin =
find ((==) "Origin" . fst) (WS.requestHeaders requestHead) find ((==) "Origin" . fst) (WS.requestHeaders requestHead)
enforceCors origin reqHdrs = case cpConfig corsPolicy of enforceCors origin reqHdrs = do
(L.Logger logger) <- asks _wseLogger
corsPolicy <- asks _wseCorsPolicy
case cpConfig corsPolicy of
CCAllowAll -> return reqHdrs CCAllowAll -> return reqHdrs
CCDisabled readCookie -> CCDisabled readCookie ->
if readCookie if readCookie
@ -444,7 +452,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
return $ ResultsFragment telemTimeIO_DT Telem.Remote (JO.toEncJSON value) [] return $ ResultsFragment telemTimeIO_DT Telem.Remote (JO.toEncJSON value) []
WSServerEnv logger pgExecCtx lqMap getSchemaCache httpMgr _ sqlGenCtx {- planCache -} WSServerEnv logger pgExecCtx lqMap getSchemaCache httpMgr _ sqlGenCtx {- planCache -}
_ enableAL = serverEnv _ enableAL _keepAliveDelay = serverEnv
WSConnData userInfoR opMap errRespTy queryType = WS.getData wsConn WSConnData userInfoR opMap errRespTy queryType = WS.getData wsConn
@ -690,14 +698,15 @@ createWSServerEnv
-> CorsPolicy -> CorsPolicy
-> SQLGenCtx -> SQLGenCtx
-> Bool -> Bool
-> KeepAliveDelay
-- -> E.PlanCache -- -> E.PlanCache
-> m WSServerEnv -> m WSServerEnv
createWSServerEnv logger isPgCtx lqState getSchemaCache httpManager createWSServerEnv logger isPgCtx lqState getSchemaCache httpManager
corsPolicy sqlGenCtx enableAL {- planCache -} = do corsPolicy sqlGenCtx enableAL keepAliveDelay {- planCache -} = do
wsServer <- liftIO $ STM.atomically $ WS.createWSServer logger wsServer <- liftIO $ STM.atomically $ WS.createWSServer logger
return $ return $
WSServerEnv logger isPgCtx lqState getSchemaCache httpManager corsPolicy WSServerEnv logger isPgCtx lqState getSchemaCache httpManager corsPolicy
sqlGenCtx {- planCache -} wsServer enableAL sqlGenCtx {- planCache -} wsServer enableAL keepAliveDelay
createWSServerApp createWSServerApp
:: ( HasVersion :: ( HasVersion
@ -723,7 +732,7 @@ createWSServerApp env authMode serverEnv = \ !ipAddress !pendingConn ->
handlers = handlers =
WS.WSHandlers WS.WSHandlers
-- Mask async exceptions during event processing to help maintain integrity of mutable vars: -- Mask async exceptions during event processing to help maintain integrity of mutable vars:
(\rid rh ip -> mask_ $ onConn (_wseLogger serverEnv) (_wseCorsPolicy serverEnv) rid rh ip) (\rid rh ip -> mask_ $ flip runReaderT serverEnv $ onConn rid rh ip)
(\conn bs -> mask_ $ onMessage env authMode serverEnv conn bs) (\conn bs -> mask_ $ onMessage env authMode serverEnv conn bs)
(mask_ . onClose (_wseLogger serverEnv) (_wseLiveQMap serverEnv)) (mask_ . onClose (_wseLogger serverEnv) (_wseLiveQMap serverEnv))

View File

@ -333,7 +333,7 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
possiblyCompressedLazyBytes userInfo reqId waiReq req qTime respBytes respHeaders reqHeaders = do possiblyCompressedLazyBytes userInfo reqId waiReq req qTime respBytes respHeaders reqHeaders = do
let (compressedResp, mEncodingHeader, mCompressionType) = let (compressedResp, mEncodingHeader, mCompressionType) =
compressResponse (Wai.requestHeaders waiReq) respBytes compressResponse (Wai.requestHeaders waiReq) respBytes
encodingHeader = maybe [] pure mEncodingHeader encodingHeader = onNothing mEncodingHeader []
reqIdHeader = (requestIdHeader, txtToBs $ unRequestId reqId) reqIdHeader = (requestIdHeader, txtToBs $ unRequestId reqId)
allRespHeaders = pure reqIdHeader <> encodingHeader <> respHeaders allRespHeaders = pure reqIdHeader <> encodingHeader <> respHeaders
lift $ logHttpSuccess logger userInfo reqId waiReq req respBytes compressedResp qTime mCompressionType reqHeaders lift $ logHttpSuccess logger userInfo reqId waiReq req respBytes compressedResp qTime mCompressionType reqHeaders
@ -602,9 +602,10 @@ mkWaiApp
-> (RebuildableSchemaCache Run, Maybe UTCTime) -> (RebuildableSchemaCache Run, Maybe UTCTime)
-> EKG.Store -> EKG.Store
-> WS.ConnectionOptions -> WS.ConnectionOptions
-> KeepAliveDelay
-> m HasuraApp -> m HasuraApp
mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
enableTelemetry instanceId apis lqOpts _ {- planCacheOptions -} responseErrorsConfig liveQueryHook (schemaCache, cacheBuiltTime) ekgStore connectionOptions = do enableTelemetry instanceId apis lqOpts _ {- planCacheOptions -} responseErrorsConfig liveQueryHook (schemaCache, cacheBuiltTime) ekgStore connectionOptions keepAliveDelay = do
-- See Note [Temporarily disabling query plan caching] -- See Note [Temporarily disabling query plan caching]
-- (planCache, schemaCacheRef) <- initialiseCache -- (planCache, schemaCacheRef) <- initialiseCache
@ -617,7 +618,7 @@ mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpMana
lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx postPollHook lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx postPollHook
wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager
corsPolicy sqlGenCtx enableAL {- planCache -} corsPolicy sqlGenCtx enableAL keepAliveDelay {- planCache -}
let serverCtx = ServerCtx let serverCtx = ServerCtx
{ scPGExecCtx = pgExecCtx { scPGExecCtx = pgExecCtx

View File

@ -180,13 +180,15 @@ mkServeOptions rso = do
then WS.PermessageDeflateCompression WS.defaultPermessageDeflate then WS.PermessageDeflateCompression WS.defaultPermessageDeflate
else WS.NoCompression else WS.NoCompression
} }
webSocketKeepAlive <- KeepAliveDelay . fromIntegral . fromMaybe 5
<$> withEnv (rsoWebSocketKeepAlive rso) (fst webSocketKeepAliveEnv)
return $ ServeOptions port host connParams txIso adminScrt authHook jwtSecret return $ ServeOptions port host connParams txIso adminScrt authHook jwtSecret
unAuthRole corsCfg enableConsole consoleAssetsDir unAuthRole corsCfg enableConsole consoleAssetsDir
enableTelemetry strfyNum enabledAPIs lqOpts enableAL enableTelemetry strfyNum enabledAPIs lqOpts enableAL
enabledLogs serverLogLevel planCacheOptions enabledLogs serverLogLevel planCacheOptions
internalErrorsConfig eventsHttpPoolSize eventsFetchInterval internalErrorsConfig eventsHttpPoolSize eventsFetchInterval
logHeadersFromEnv connectionOptions logHeadersFromEnv connectionOptions webSocketKeepAlive
where where
#ifdef DeveloperAPIs #ifdef DeveloperAPIs
defaultAPIs = [METADATA,GRAPHQL,PGDUMP,CONFIG,DEVELOPER] defaultAPIs = [METADATA,GRAPHQL,PGDUMP,CONFIG,DEVELOPER]
@ -325,7 +327,7 @@ serveCmdFooter =
, jwtSecretEnv, unAuthRoleEnv, corsDomainEnv, corsDisableEnv, enableConsoleEnv , jwtSecretEnv, unAuthRoleEnv, corsDomainEnv, corsDisableEnv, enableConsoleEnv
, enableTelemetryEnv, wsReadCookieEnv, stringifyNumEnv, enabledAPIsEnv , enableTelemetryEnv, wsReadCookieEnv, stringifyNumEnv, enabledAPIsEnv
, enableAllowlistEnv, enabledLogsEnv, logLevelEnv, devModeEnv , enableAllowlistEnv, enabledLogsEnv, logLevelEnv, devModeEnv
, adminInternalErrorsEnv , adminInternalErrorsEnv, webSocketKeepAliveEnv
] ]
eventEnvs = [ eventsHttpPoolSizeEnv, eventsFetchIntervalEnv ] eventEnvs = [ eventsHttpPoolSizeEnv, eventsFetchIntervalEnv ]
@ -943,6 +945,7 @@ serveOptsToLog so =
, "log_level" J..= soLogLevel so , "log_level" J..= soLogLevel so
, "plan_cache_options" J..= soPlanCacheOptions so , "plan_cache_options" J..= soPlanCacheOptions so
, "websocket_compression_options" J..= show (WS.connectionCompressionOptions . soConnectionOptions $ so) , "websocket_compression_options" J..= show (WS.connectionCompressionOptions . soConnectionOptions $ so)
, "websocket_keep_alive" J..= show (soWebsocketKeepAlive so)
] ]
mkGenericStrLog :: L.LogLevel -> Text -> String -> StartupLog mkGenericStrLog :: L.LogLevel -> Text -> String -> StartupLog
@ -989,6 +992,7 @@ serveOptionsParser =
<*> parseGraphqlEventsFetchInterval <*> parseGraphqlEventsFetchInterval
<*> parseLogHeadersFromEnv <*> parseLogHeadersFromEnv
<*> parseWebSocketCompression <*> parseWebSocketCompression
<*> parseWebSocketKeepAlive
-- | This implements the mapping between application versions -- | This implements the mapping between application versions
-- and catalog schema versions. -- and catalog schema versions.
@ -1035,3 +1039,17 @@ parseWebSocketCompression =
switch ( long "websocket-compression" <> switch ( long "websocket-compression" <>
help (snd webSocketCompressionEnv) help (snd webSocketCompressionEnv)
) )
-- | Environment variable name (first component) and help text (second
-- component) for the websocket keep-alive setting. The help text is also
-- reused by the @--websocket-keepalive@ command-line flag.
webSocketKeepAliveEnv :: (String, String)
webSocketKeepAliveEnv = (envVarName, helpText)
  where
    envVarName = "HASURA_GRAPHQL_WEBSOCKET_KEEPALIVE"
    helpText   = "Control websocket keep-alive timeout (default 5 seconds)"
-- | Command-line parser for the optional @--websocket-keepalive@ flag.
--
-- Yields 'Nothing' when the flag is not given, so the caller can fall back
-- to the environment variable or the built-in default.
--
-- The value is interpreted as a number of seconds to wait between websocket
-- keep-alive messages. Only strictly positive integers are accepted: a zero
-- or negative interval would not give the keep-alive loop a meaningful pause
-- between messages, so it is rejected here with a readable error instead of
-- being passed through.
parseWebSocketKeepAlive :: Parser (Maybe Int)
parseWebSocketKeepAlive =
  optional $
    option (eitherReader readPositiveInt)
      ( long "websocket-keepalive" <>
        help (snd webSocketKeepAliveEnv)
      )
  where
    -- Parse an 'Int' and additionally require it to be greater than zero.
    readPositiveInt :: String -> Either String Int
    readPositiveInt raw = do
      n <- readEither raw
      if n > 0
        then Right n
        else Left "websocket keep-alive interval must be a positive integer (seconds)"

View File

@ -66,6 +66,7 @@ data RawServeOptions impl
, rsoEventsFetchInterval :: !(Maybe Milliseconds) , rsoEventsFetchInterval :: !(Maybe Milliseconds)
, rsoLogHeadersFromEnv :: !Bool , rsoLogHeadersFromEnv :: !Bool
, rsoWebSocketCompression :: !Bool , rsoWebSocketCompression :: !Bool
, rsoWebSocketKeepAlive :: !(Maybe Int)
} }
-- | @'ResponseInternalErrorsConfig' represents the encoding of the internal -- | @'ResponseInternalErrorsConfig' represents the encoding of the internal
@ -83,6 +84,11 @@ shouldIncludeInternal role = \case
InternalErrorsAdminOnly -> isAdmin role InternalErrorsAdminOnly -> isAdmin role
InternalErrorsDisabled -> False InternalErrorsDisabled -> False
-- | Delay between two consecutive websocket keep-alive messages sent by the
-- server on an accepted connection.
--
-- The derived 'Show' instance is used when logging the serve options at
-- startup, so its textual form is part of the observable log output.
newtype KeepAliveDelay
= KeepAliveDelay
{ unKeepAliveDelay :: Seconds
-- ^ The delay expressed in seconds.
} deriving (Eq, Show)
data ServeOptions impl data ServeOptions impl
= ServeOptions = ServeOptions
{ soPort :: !Int { soPort :: !Int
@ -109,6 +115,7 @@ data ServeOptions impl
, soEventsFetchInterval :: !(Maybe Milliseconds) , soEventsFetchInterval :: !(Maybe Milliseconds)
, soLogHeadersFromEnv :: !Bool , soLogHeadersFromEnv :: !Bool
, soConnectionOptions :: !WS.ConnectionOptions , soConnectionOptions :: !WS.ConnectionOptions
, soWebsocketKeepAlive :: !KeepAliveDelay
} }
data DowngradeOptions data DowngradeOptions