mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 17:02:49 +03:00
server: close subscriptions (with 1012) on metadata change
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9159 GitOrigin-RevId: 5ab25ef57782c126de8852f3546809feb66a9d44
This commit is contained in:
parent
4a269b84fc
commit
14b4de37f7
@ -206,6 +206,20 @@ for JSON encoding-decoding.
|
||||
| **Default** | `false` |
|
||||
| **Supported in** | CE, Enterprise Edition |
|
||||
|
||||
### Close WebSocket connections on metadata change
|
||||
|
||||
When metadata changes, close all WebSocket connections (with error code `1012`). This is useful when you want to ensure
|
||||
that all clients reconnect to the latest metadata.
|
||||
|
||||
| | |
|
||||
| ------------------- | ----------------------------------------------------- |
|
||||
| **Flag** | `--disable-close-websockets-on-metadata-change` |
|
||||
| **Env var** | `HASURA_GRAPHQL_CLOSE_WEBSOCKETS_ON_METADATA_CHANGE` |
|
||||
| **Accepted values** | Boolean |
|
||||
| **Options** | `true` or `false` |
|
||||
| **Default** | `true` |
|
||||
| **Supported in** | CE, Enterprise Edition, Cloud |
|
||||
|
||||
### Connections per Read-Replica
|
||||
|
||||
The maximum number of Postgres connections per [read-replica](databases/database-config/read-replicas.mdx) that can be
|
||||
|
@ -308,7 +308,8 @@ serveOptions =
|
||||
soDefaultNamingConvention = Init._default Init.defaultNamingConventionOption,
|
||||
soExtensionsSchema = ExtensionsSchema "public",
|
||||
soMetadataDefaults = emptyMetadataDefaults,
|
||||
soApolloFederationStatus = ApolloFederationDisabled
|
||||
soApolloFederationStatus = ApolloFederationDisabled,
|
||||
soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption
|
||||
}
|
||||
|
||||
-- | What log level should be used by the engine; this is not exported, and
|
||||
|
@ -22,7 +22,6 @@ import Harness.Logging.Messages
|
||||
import Harness.Services.Composed qualified as Services
|
||||
import Harness.Test.BackendType
|
||||
import Hasura.Prelude
|
||||
import Network.WebSockets qualified as WS
|
||||
|
||||
-- | static information across an entire test suite run
|
||||
data GlobalTestEnvironment = GlobalTestEnvironment
|
||||
@ -88,7 +87,7 @@ instance Show GlobalTestEnvironment where
|
||||
|
||||
-- | How should we make requests to `graphql-engine`? Both WebSocket- and HTTP-
|
||||
-- based requests are supported.
|
||||
data Protocol = HTTP | WebSocket WS.Connection
|
||||
data Protocol = HTTP | WebSocket
|
||||
|
||||
-- | Credentials for our testing modes. See 'SpecHook.setupTestingMode' for the
|
||||
-- practical consequences of this type.
|
||||
|
@ -69,7 +69,7 @@ import Harness.Http qualified as Http
|
||||
import Harness.Logging
|
||||
import Harness.Quoter.Yaml (fromYaml, yaml)
|
||||
import Harness.Services.GraphqlEngine
|
||||
import Harness.TestEnvironment (Protocol (..), Server (..), TestEnvironment (..), TestingRole (..), getServer, requestProtocol, serverUrl, traceIf)
|
||||
import Harness.TestEnvironment (Protocol (..), Server (..), TestEnvironment (..), TestingRole (..), getServer, requestProtocol, server, serverUrl, traceIf)
|
||||
import Harness.WebSockets (responseListener, sendMessages)
|
||||
import Hasura.App qualified as App
|
||||
import Hasura.Logging (Hasura)
|
||||
@ -150,38 +150,46 @@ postGraphqlViaHttpOrWebSocketWithHeadersStatus ::
|
||||
(HasCallStack) => Int -> TestEnvironment -> Http.RequestHeaders -> Value -> IO Value
|
||||
postGraphqlViaHttpOrWebSocketWithHeadersStatus statusCode testEnv headers requestBody = do
|
||||
withFrozenCallStack $ case requestProtocol (globalEnvironment testEnv) of
|
||||
WebSocket connection -> postWithHeadersStatusViaWebSocket testEnv connection (addAuthzHeaders testEnv headers) requestBody
|
||||
WebSocket -> postWithHeadersStatusViaWebSocket testEnv (addAuthzHeaders testEnv headers) requestBody
|
||||
HTTP -> postWithHeadersStatus statusCode testEnv "/v1/graphql" headers requestBody
|
||||
|
||||
-- | Post some JSON to graphql-engine, getting back more JSON, via websockets.
|
||||
--
|
||||
-- This will be used by 'postWithHeadersStatus' if the 'TestEnvironment' sets
|
||||
-- the 'requestProtocol' to 'WebSocket'.
|
||||
postWithHeadersStatusViaWebSocket :: TestEnvironment -> WS.Connection -> Http.RequestHeaders -> Value -> IO Value
|
||||
postWithHeadersStatusViaWebSocket testEnv connection headers requestBody = do
|
||||
postWithHeadersStatusViaWebSocket :: TestEnvironment -> Http.RequestHeaders -> Value -> IO Value
|
||||
postWithHeadersStatusViaWebSocket testEnv headers requestBody = do
|
||||
let preparedHeaders :: HashMap Text ByteString
|
||||
preparedHeaders =
|
||||
HashMap.fromList
|
||||
[ (decodeUtf8 (original key), value)
|
||||
| (key, value) <- headers
|
||||
]
|
||||
sendMessages
|
||||
testEnv
|
||||
connection
|
||||
[ object
|
||||
[ "type" .= String "connection_init",
|
||||
"payload" .= object ["headers" .= preparedHeaders]
|
||||
],
|
||||
object
|
||||
[ "id" .= String "some-request-id",
|
||||
"type" .= String "start",
|
||||
"payload" .= requestBody
|
||||
]
|
||||
]
|
||||
server' = server (globalEnvironment testEnv)
|
||||
port' = port server'
|
||||
host = T.unpack (T.drop 7 (T.pack (urlPrefix server')))
|
||||
path = "/v1/graphql"
|
||||
|
||||
responseListener testEnv connection \_ type' payload -> do
|
||||
when (type' `notElem` ["data", "error"]) $ fail ("Websocket message type " <> T.unpack type' <> " received. Payload: " <> Char8.unpack (encode payload))
|
||||
pure payload
|
||||
-- paritosh: We are initiating a websocket connection for each request and sending the request instead of initiating a
|
||||
-- websocket connection for the test and reusing the connection for requests. This is done to avoid managing the
|
||||
-- connection (as we do have metadata changes as part of test and HGE closes websockets on metadata changes).
|
||||
WS.runClient host (fromIntegral port') path \connection -> do
|
||||
sendMessages
|
||||
testEnv
|
||||
connection
|
||||
[ object
|
||||
[ "type" .= String "connection_init",
|
||||
"payload" .= object ["headers" .= preparedHeaders]
|
||||
],
|
||||
object
|
||||
[ "id" .= String "some-request-id",
|
||||
"type" .= String "start",
|
||||
"payload" .= requestBody
|
||||
]
|
||||
]
|
||||
responseListener testEnv connection \_ type' payload -> do
|
||||
when (type' `notElem` ["data", "error"]) $ fail ("Websocket message type " <> T.unpack type' <> " received. Payload: " <> Char8.unpack (encode payload))
|
||||
pure payload
|
||||
|
||||
-- | Post some JSON to graphql-engine, getting back more JSON.
|
||||
--
|
||||
|
@ -3,11 +3,9 @@ module Harness.Test.Protocol
|
||||
)
|
||||
where
|
||||
|
||||
import GHC.Word (Word16)
|
||||
import Harness.TestEnvironment (GlobalTestEnvironment, Protocol (..))
|
||||
import Harness.TestEnvironment qualified as TestEnvironment
|
||||
import Hasura.Prelude
|
||||
import Network.WebSockets qualified as WS
|
||||
import Test.Hspec (ActionWith, SpecWith, aroundAllWith, describe)
|
||||
import Test.Hspec.Core.Spec (Item (..), mapSpecItem_)
|
||||
|
||||
@ -25,14 +23,10 @@ withEachProtocol spec = do
|
||||
|
||||
let connectWS :: ActionWith GlobalTestEnvironment -> ActionWith GlobalTestEnvironment
|
||||
connectWS k globalTestEnvironment = do
|
||||
let port' :: Word16
|
||||
port' = TestEnvironment.port (TestEnvironment.server globalTestEnvironment)
|
||||
|
||||
WS.runClient "127.0.0.1" (fromIntegral port') "/v1/graphql" \connection ->
|
||||
k
|
||||
globalTestEnvironment
|
||||
{ TestEnvironment.requestProtocol = WebSocket connection
|
||||
}
|
||||
k
|
||||
globalTestEnvironment
|
||||
{ TestEnvironment.requestProtocol = WebSocket
|
||||
}
|
||||
|
||||
aroundAllWith connectWS $ describe "Over WebSockets" $ flip mapSpecItem_ spec \item ->
|
||||
item {itemExample = \params -> itemExample item params}
|
||||
|
@ -508,6 +508,7 @@ initialiseAppContext env serveOptions@ServeOptions {..} AppInit {..} = do
|
||||
soDefaultNamingConvention
|
||||
soMetadataDefaults
|
||||
soApolloFederationStatus
|
||||
soCloseWebsocketsOnMetadataChangeStatus
|
||||
|
||||
-- Create the schema cache
|
||||
rebuildableSchemaCache <-
|
||||
|
@ -158,7 +158,8 @@ data AppContext = AppContext
|
||||
acEnableTelemetry :: TelemetryStatus,
|
||||
acEventEngineCtx :: EventEngineCtx,
|
||||
acAsyncActionsFetchInterval :: OptionalInterval,
|
||||
acApolloFederationStatus :: ApolloFederationStatus
|
||||
acApolloFederationStatus :: ApolloFederationStatus,
|
||||
acCloseWebsocketsOnMetadataChangeStatus :: CloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
|
||||
-- | Collection of the LoggerCtx, the regular Logger and the PGLogger
|
||||
@ -268,7 +269,8 @@ buildAppContextRule = proc (ServeOptions {..}, env, _keys) -> do
|
||||
acEnableTelemetry = soEnableTelemetry,
|
||||
acEventEngineCtx = eventEngineCtx,
|
||||
acAsyncActionsFetchInterval = soAsyncActionsFetchInterval,
|
||||
acApolloFederationStatus = soApolloFederationStatus
|
||||
acApolloFederationStatus = soApolloFederationStatus,
|
||||
acCloseWebsocketsOnMetadataChangeStatus = soCloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
where
|
||||
buildSqlGenCtx = Inc.cache proc (experimentalFeatures, stringifyNum, dangerousBooleanCollapse) -> do
|
||||
@ -340,5 +342,6 @@ buildCacheDynamicConfig AppContext {..} = do
|
||||
_cdcExperimentalFeatures = acExperimentalFeatures,
|
||||
_cdcDefaultNamingConvention = acDefaultNamingConvention,
|
||||
_cdcMetadataDefaults = acMetadataDefaults,
|
||||
_cdcApolloFederationStatus = acApolloFederationStatus
|
||||
_cdcApolloFederationStatus = acApolloFederationStatus,
|
||||
_cdcCloseWebsocketsOnMetadataChangeStatus = acCloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
|
@ -584,7 +584,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S
|
||||
(GaugeVector.dec numSubscriptionMetric promMetricGranularLabel)
|
||||
(GaugeVector.dec numSubscriptionMetric promMetricLabel)
|
||||
|
||||
-- | An async action query whose relationships are refered to table in a source.
|
||||
-- | An async action query whose relationships are referred to table in a source.
|
||||
-- We need to generate an SQL statement with the action response and execute it
|
||||
-- in the source database so as to fetch response joined with relationship rows.
|
||||
-- For more details see Note [Resolving async action query]
|
||||
|
@ -17,6 +17,9 @@ module Hasura.GraphQL.Transport.WebSocket
|
||||
onClose,
|
||||
sendMsg,
|
||||
sendCloseWithMsg,
|
||||
mkCloseWebsocketsOnMetadataChangeAction,
|
||||
runWebsocketCloseOnMetadataChangeAction,
|
||||
WebsocketCloseOnMetadataChangeAction,
|
||||
)
|
||||
where
|
||||
|
||||
@ -68,6 +71,7 @@ import Hasura.GraphQL.Transport.Instances ()
|
||||
import Hasura.GraphQL.Transport.WebSocket.Protocol
|
||||
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
|
||||
import Hasura.GraphQL.Transport.WebSocket.Types
|
||||
import Hasura.GraphQL.Transport.WebSocket.Types qualified as WS
|
||||
import Hasura.Logging qualified as L
|
||||
import Hasura.Metadata.Class
|
||||
import Hasura.Prelude
|
||||
@ -1213,3 +1217,18 @@ onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn granula
|
||||
StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId granularPrometheusMetricsState operationName
|
||||
where
|
||||
opMap = _wscOpMap $ WS.getData wsConn
|
||||
|
||||
newtype WebsocketCloseOnMetadataChangeAction = WebsocketCloseOnMetadataChangeAction
|
||||
{ runWebsocketCloseOnMetadataChangeAction :: IO ()
|
||||
}
|
||||
|
||||
-- | By default, we close all the websocket connections when the metadata changes. This function is used to create the
|
||||
-- action that will be run when the metadata changes.
|
||||
mkCloseWebsocketsOnMetadataChangeAction :: WS.WSServer WS.WSConnData -> WebsocketCloseOnMetadataChangeAction
|
||||
mkCloseWebsocketsOnMetadataChangeAction wsServer =
|
||||
WebsocketCloseOnMetadataChangeAction
|
||||
$ WS.closeAllConnectionsWithReason
|
||||
wsServer
|
||||
"Closing all websocket connections as the metadata has changed"
|
||||
"Server state changed, restarting the server"
|
||||
id
|
||||
|
@ -24,6 +24,7 @@ module Hasura.GraphQL.Transport.WebSocket.Server
|
||||
sendMsgAndCloseConn,
|
||||
createServerApp,
|
||||
createWSServer,
|
||||
closeAllConnectionsWithReason,
|
||||
getData,
|
||||
getRawWebSocketConnection,
|
||||
getWSId,
|
||||
@ -278,6 +279,22 @@ closeAllWith ::
|
||||
closeAllWith closer msg conns =
|
||||
void $ A.mapConcurrently (closer msg . snd) conns
|
||||
|
||||
closeAllConnectionsWithReason ::
|
||||
WSServer a ->
|
||||
String ->
|
||||
BL.ByteString ->
|
||||
(SecuritySensitiveUserConfig -> SecuritySensitiveUserConfig) ->
|
||||
IO ()
|
||||
closeAllConnectionsWithReason (WSServer (L.Logger writeLog) userConfRef serverStatus) logMsg reason updateConf = do
|
||||
writeLog
|
||||
$ WSReaperThreadLog
|
||||
$ fromString
|
||||
$ logMsg
|
||||
conns <- STM.atomically $ do
|
||||
STM.modifyTVar' userConfRef updateConf
|
||||
flushConnMap serverStatus
|
||||
closeAllWith (flip forceConnReconnect) reason conns
|
||||
|
||||
-- | Resets the current connections map to an empty one if the server is
|
||||
-- running and returns the list of connections that were in the map
|
||||
-- before flushing it.
|
||||
@ -354,7 +371,7 @@ data WSHandlers m a = WSHandlers
|
||||
-- stringify big query numeric, experimental features and invalidates/closes all
|
||||
-- connections if there are any changes.
|
||||
websocketConnectionReaper :: IO (AuthMode, AllowListStatus, CorsPolicy, SQLGenCtx, Set.HashSet ExperimentalFeature, NamingCase) -> IO SchemaCache -> WSServer a -> IO Void
|
||||
websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger writeLog) userConfRef serverStatus) =
|
||||
websocketConnectionReaper getLatestConfig getSchemaCache ws@(WSServer _ userConfRef _) =
|
||||
forever $ do
|
||||
(currAuthMode, currEnableAllowlist, currCorsPolicy, currSqlGenCtx, currExperimentalFeatures, currDefaultNamingCase) <- getLatestConfig
|
||||
currAllowlist <- scAllowlist <$> getSchemaCache
|
||||
@ -370,21 +387,6 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri
|
||||
(currDefaultNamingCase, prevDefaultNamingCase)
|
||||
sleep $ seconds 1
|
||||
where
|
||||
closeAllConnectionsWithReason ::
|
||||
String ->
|
||||
BL.ByteString ->
|
||||
(SecuritySensitiveUserConfig -> SecuritySensitiveUserConfig) ->
|
||||
IO ()
|
||||
closeAllConnectionsWithReason logMsg reason updateConf = do
|
||||
writeLog
|
||||
$ WSReaperThreadLog
|
||||
$ fromString
|
||||
$ logMsg
|
||||
conns <- STM.atomically $ do
|
||||
STM.modifyTVar' userConfRef updateConf
|
||||
flushConnMap serverStatus
|
||||
closeAllWith (flip forceConnReconnect) reason conns
|
||||
|
||||
-- Close all connections based on -
|
||||
-- if CorsPolicy changed -> close
|
||||
-- if AuthMode changed -> close
|
||||
@ -419,12 +421,14 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri
|
||||
-- if CORS policy has changed, close all connections
|
||||
| hasCorsPolicyChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the cors policy changed"
|
||||
"cors policy changed"
|
||||
(\conf -> conf {ssucCorsPolicy = currCorsPolicy})
|
||||
-- if any auth config has changed, close all connections
|
||||
| hasAuthModeChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the auth mode changed"
|
||||
"auth mode changed"
|
||||
(\conf -> conf {ssucAuthMode = currAuthMode})
|
||||
@ -434,6 +438,7 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri
|
||||
-- connections.
|
||||
| hasAllowlistEnabled ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as allow list is enabled"
|
||||
"allow list enabled"
|
||||
(\conf -> conf {ssucEnableAllowlist = currEnableAllowlist})
|
||||
@ -441,42 +446,49 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri
|
||||
-- allowlist, we need to close all the connections.
|
||||
| hasAllowlistUpdated ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the allow list has been updated"
|
||||
"allow list updated"
|
||||
(\conf -> conf {ssucAllowlist = currAllowlist})
|
||||
-- if HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES has changed, close all connections
|
||||
| hasStringifyNumChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES setting changed"
|
||||
"HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES env var changed"
|
||||
(\conf -> conf {ssucSQLGenCtx = currSqlGenCtx})
|
||||
-- if HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE has changed, close all connections
|
||||
| hasDangerousBooleanCollapseChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE setting changed"
|
||||
"HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE env var changed"
|
||||
(\conf -> conf {ssucSQLGenCtx = currSqlGenCtx})
|
||||
-- if 'bigquery_string_numeric_input' option added/removed from experimental features, close all connections
|
||||
| hasBigqueryStringNumericInputChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the 'bigquery_string_numeric_input' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES"
|
||||
"'bigquery_string_numeric_input' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var"
|
||||
(\conf -> conf {ssucSQLGenCtx = currSqlGenCtx})
|
||||
-- if 'hide_aggregation_predicates' option added/removed from experimental features, close all connections
|
||||
| hasHideAggregationPredicatesChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the 'hide-aggregation-predicates' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES"
|
||||
"'hide-aggregation-predicates' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var"
|
||||
(\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures})
|
||||
-- if 'hide_stream_fields' option added/removed from experimental features, close all connections
|
||||
| hasHideStreamFieldsChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the 'hide-stream-fields' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES"
|
||||
"'hide-stream-fields' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var"
|
||||
(\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures})
|
||||
-- if naming convention has been changed, close all connections
|
||||
| hasDefaultNamingCaseChanged ->
|
||||
closeAllConnectionsWithReason
|
||||
ws
|
||||
"closing all websocket connections as the 'naming_convention' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES and the HASURA_GRAPHQL_DEFAULT_NAMING_CONVENTION has changed"
|
||||
"naming convention has been changed"
|
||||
(\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures, ssucDefaultNamingCase = currDefaultNamingCase})
|
||||
|
@ -72,6 +72,7 @@ data CacheDynamicConfig = CacheDynamicConfig
|
||||
_cdcExperimentalFeatures :: HashSet ExperimentalFeature,
|
||||
_cdcDefaultNamingConvention :: NamingCase,
|
||||
_cdcMetadataDefaults :: MetadataDefaults,
|
||||
_cdcApolloFederationStatus :: ApolloFederationStatus
|
||||
_cdcApolloFederationStatus :: ApolloFederationStatus,
|
||||
_cdcCloseWebsocketsOnMetadataChangeStatus :: CloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
deriving (Eq)
|
||||
|
@ -18,6 +18,7 @@ import Hasura.Base.Error
|
||||
import Hasura.EncJSON
|
||||
import Hasura.Eventing.Backend
|
||||
import Hasura.Function.API qualified as Functions
|
||||
import Hasura.GraphQL.Transport.WebSocket qualified as WS
|
||||
import Hasura.Logging qualified as L
|
||||
import Hasura.LogicalModel.API qualified as LogicalModel
|
||||
import Hasura.Metadata.Class
|
||||
@ -105,9 +106,10 @@ runMetadataQuery ::
|
||||
) =>
|
||||
AppContext ->
|
||||
RebuildableSchemaCache ->
|
||||
WS.WebsocketCloseOnMetadataChangeAction ->
|
||||
RQLMetadata ->
|
||||
m (EncJSON, RebuildableSchemaCache)
|
||||
runMetadataQuery appContext schemaCache RQLMetadata {..} = do
|
||||
runMetadataQuery appContext schemaCache closeWebsocketsOnMetadataChange RQLMetadata {..} = do
|
||||
AppEnv {..} <- askAppEnv
|
||||
let logger = _lsLogger appEnvLoggers
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
@ -184,6 +186,13 @@ runMetadataQuery appContext schemaCache RQLMetadata {..} = do
|
||||
$ setMetadataResourceVersionInSchemaCache newResourceVersion
|
||||
& runCacheRWT dynamicConfig modSchemaCache
|
||||
|
||||
-- Close all subscriptions with 1012 code (subscribers should reconnect)
|
||||
-- and close poller threads
|
||||
when ((_cdcCloseWebsocketsOnMetadataChangeStatus dynamicConfig) == CWMCEnabled)
|
||||
$ Tracing.newSpan "closeWebsocketsOnMetadataChange"
|
||||
$ liftIO
|
||||
$ WS.runWebsocketCloseOnMetadataChangeAction closeWebsocketsOnMetadataChange
|
||||
|
||||
pure (r, modSchemaCache')
|
||||
(MaintenanceModeEnabled (), ReadOnlyModeDisabled) ->
|
||||
throw500 "metadata cannot be modified in maintenance mode"
|
||||
|
@ -66,6 +66,7 @@ import Hasura.GraphQL.Logging (MonadExecutionLog, MonadQueryLog)
|
||||
import Hasura.GraphQL.Transport.HTTP qualified as GH
|
||||
import Hasura.GraphQL.Transport.HTTP.Protocol qualified as GH
|
||||
import Hasura.GraphQL.Transport.WSServerApp qualified as WS
|
||||
import Hasura.GraphQL.Transport.WebSocket qualified as WS
|
||||
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
|
||||
import Hasura.GraphQL.Transport.WebSocket.Types qualified as WS
|
||||
import Hasura.HTTP
|
||||
@ -495,9 +496,10 @@ v1MetadataHandler ::
|
||||
UserInfoM m
|
||||
) =>
|
||||
((RebuildableSchemaCache -> m (EncJSON, RebuildableSchemaCache)) -> m EncJSON) ->
|
||||
WS.WebsocketCloseOnMetadataChangeAction ->
|
||||
RQLMetadata ->
|
||||
m (HttpResponse EncJSON)
|
||||
v1MetadataHandler schemaCacheRefUpdater query = Tracing.newSpan "Metadata" $ do
|
||||
v1MetadataHandler schemaCacheRefUpdater closeWebsocketsOnMetadataChangeAction query = Tracing.newSpan "Metadata" $ do
|
||||
(liftEitherM . authorizeV1MetadataApi query) =<< ask
|
||||
appContext <- asks hcAppContext
|
||||
r <-
|
||||
@ -505,6 +507,7 @@ v1MetadataHandler schemaCacheRefUpdater query = Tracing.newSpan "Metadata" $ do
|
||||
runMetadataQuery
|
||||
appContext
|
||||
schemaCache
|
||||
closeWebsocketsOnMetadataChangeAction
|
||||
query
|
||||
pure $ HttpResponse r []
|
||||
|
||||
@ -805,6 +808,7 @@ mkWaiApp setupHook appStateRef consoleType ekgStore wsServerEnv = do
|
||||
Spock.spockAsApp
|
||||
$ Spock.spockT lowerIO
|
||||
$ httpApp setupHook appStateRef appEnv consoleType ekgStore
|
||||
$ WS.mkCloseWebsocketsOnMetadataChangeAction (WS._wseServer wsServerEnv)
|
||||
|
||||
let wsServerApp = WS.createWSServerApp (_lsEnabledLogTypes appEnvLoggingSettings) wsServerEnv appEnvWebSocketConnectionInitTimeout appEnvLicenseKeyCache
|
||||
stopWSServer = WS.stopWSServerApp wsServerEnv
|
||||
@ -846,8 +850,9 @@ httpApp ::
|
||||
AppEnv ->
|
||||
ConsoleType m ->
|
||||
EKG.Store EKG.EmptyMetrics ->
|
||||
WS.WebsocketCloseOnMetadataChangeAction ->
|
||||
Spock.SpockT m ()
|
||||
httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore = do
|
||||
httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore closeWebsocketsOnMetadataChangeAction = do
|
||||
-- Additional spock action to run
|
||||
setupHook appStateRef
|
||||
|
||||
@ -970,7 +975,7 @@ httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore = do
|
||||
$ spockAction encodeQErr id
|
||||
$ mkPostHandler
|
||||
$ fmap (emptyHttpLogGraphQLInfo,)
|
||||
<$> mkAPIRespHandler (v1MetadataHandler schemaCacheUpdater)
|
||||
<$> mkAPIRespHandler (v1MetadataHandler schemaCacheUpdater closeWebsocketsOnMetadataChangeAction)
|
||||
|
||||
Spock.post "v2/query" $ do
|
||||
onlyWhenApiEnabled isMetadataEnabled appStateRef
|
||||
|
@ -214,6 +214,8 @@ mkServeOptions sor@ServeOptionsRaw {..} = do
|
||||
soApolloFederationStatus <- do
|
||||
apolloFederationStatusOptionM <- withOptionDefault (pure <$> rsoApolloFederationStatus) apolloFederationStatusOption
|
||||
pure $ getApolloFederationStatus soExperimentalFeatures apolloFederationStatusOptionM
|
||||
soCloseWebsocketsOnMetadataChangeStatus <- do
|
||||
withOptionDefault rsoCloseWebsocketsOnMetadataChangeStatus closeWebsocketsOnMetadataChangeOption
|
||||
pure ServeOptions {..}
|
||||
|
||||
-- | Fetch Postgres 'Query.ConnParams' components from the environment
|
||||
|
@ -60,6 +60,7 @@ module Hasura.Server.Init.Arg.Command.Serve
|
||||
parseMetadataDefaults,
|
||||
metadataDefaultsOption,
|
||||
apolloFederationStatusOption,
|
||||
closeWebsocketsOnMetadataChangeOption,
|
||||
|
||||
-- * Pretty Printer
|
||||
serveCmdFooter,
|
||||
@ -146,6 +147,7 @@ serveCommandParser =
|
||||
<*> parseExtensionsSchema
|
||||
<*> parseMetadataDefaults
|
||||
<*> parseApolloFederationStatus
|
||||
<*> parseEnableCloseWebsocketsOnMetadataChange
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- Serve Options
|
||||
@ -1158,6 +1160,22 @@ parseApolloFederationStatus =
|
||||
<> Opt.help (Config._helpMessage apolloFederationStatusOption)
|
||||
)
|
||||
|
||||
closeWebsocketsOnMetadataChangeOption :: Config.Option (Types.CloseWebsocketsOnMetadataChangeStatus)
|
||||
closeWebsocketsOnMetadataChangeOption =
|
||||
Config.Option
|
||||
{ Config._default = Types.CWMCEnabled,
|
||||
Config._envVar = "HASURA_GRAPHQL_CLOSE_WEBSOCKETS_ON_METADATA_CHANGE",
|
||||
Config._helpMessage = "Close all the websocket connections (with error code 1012) on metadata change (default: true)."
|
||||
}
|
||||
|
||||
parseEnableCloseWebsocketsOnMetadataChange :: Opt.Parser (Maybe Types.CloseWebsocketsOnMetadataChangeStatus)
|
||||
parseEnableCloseWebsocketsOnMetadataChange =
|
||||
(bool Nothing (Just Types.CWMCDisabled))
|
||||
<$> Opt.switch
|
||||
( Opt.long "disable-close-websockets-on-metadata-change"
|
||||
<> Opt.help (Config._helpMessage closeWebsocketsOnMetadataChangeOption)
|
||||
)
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- Pretty Printer
|
||||
|
||||
@ -1256,6 +1274,7 @@ serveCmdFooter =
|
||||
Config.optionPP enableMetadataQueryLoggingOption,
|
||||
Config.optionPP defaultNamingConventionOption,
|
||||
Config.optionPP metadataDBExtensionsSchemaOption,
|
||||
Config.optionPP apolloFederationStatusOption
|
||||
Config.optionPP apolloFederationStatusOption,
|
||||
Config.optionPP closeWebsocketsOnMetadataChangeOption
|
||||
]
|
||||
eventEnvs = [Config.optionPP graphqlEventsHttpPoolSizeOption, Config.optionPP graphqlEventsFetchIntervalOption]
|
||||
|
@ -320,7 +320,8 @@ data ServeOptionsRaw impl = ServeOptionsRaw
|
||||
rsoDefaultNamingConvention :: Maybe NamingCase,
|
||||
rsoExtensionsSchema :: Maybe MonadTx.ExtensionsSchema,
|
||||
rsoMetadataDefaults :: Maybe MetadataDefaults,
|
||||
rsoApolloFederationStatus :: Maybe Server.Types.ApolloFederationStatus
|
||||
rsoApolloFederationStatus :: Maybe Server.Types.ApolloFederationStatus,
|
||||
rsoCloseWebsocketsOnMetadataChangeStatus :: Maybe Server.Types.CloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
|
||||
-- | Whether or not to serve Console assets.
|
||||
@ -618,7 +619,8 @@ data ServeOptions impl = ServeOptions
|
||||
soDefaultNamingConvention :: NamingCase,
|
||||
soExtensionsSchema :: MonadTx.ExtensionsSchema,
|
||||
soMetadataDefaults :: MetadataDefaults,
|
||||
soApolloFederationStatus :: Server.Types.ApolloFederationStatus
|
||||
soApolloFederationStatus :: Server.Types.ApolloFederationStatus,
|
||||
soCloseWebsocketsOnMetadataChangeStatus :: Server.Types.CloseWebsocketsOnMetadataChangeStatus
|
||||
}
|
||||
|
||||
-- | 'ResponseInternalErrorsConfig' represents the encoding of the
|
||||
|
@ -372,3 +372,6 @@ instance FromEnv Server.Types.ApolloFederationStatus where
|
||||
|
||||
instance FromEnv GranularPrometheusMetricsState where
|
||||
fromEnv = fmap (bool GranularMetricsOff GranularMetricsOn) . fromEnv @Bool
|
||||
|
||||
instance FromEnv Server.Types.CloseWebsocketsOnMetadataChangeStatus where
|
||||
fromEnv = fmap (bool Server.Types.CWMCDisabled Server.Types.CWMCEnabled) . fromEnv @Bool
|
||||
|
@ -18,6 +18,8 @@ module Hasura.Server.Types
|
||||
ApolloFederationStatus (..),
|
||||
isApolloFederationEnabled,
|
||||
GranularPrometheusMetricsState (..),
|
||||
CloseWebsocketsOnMetadataChangeStatus (..),
|
||||
isCloseWebsocketsOnMetadataChangeStatusEnabled,
|
||||
MonadGetPolicies (..),
|
||||
)
|
||||
where
|
||||
@ -180,6 +182,25 @@ instance ToJSON GranularPrometheusMetricsState where
|
||||
GranularMetricsOff -> Bool False
|
||||
GranularMetricsOn -> Bool True
|
||||
|
||||
-- | Whether or not to close websocket connections on metadata change.
|
||||
data CloseWebsocketsOnMetadataChangeStatus = CWMCEnabled | CWMCDisabled
|
||||
deriving stock (Show, Eq, Ord, Generic)
|
||||
|
||||
instance NFData CloseWebsocketsOnMetadataChangeStatus
|
||||
|
||||
instance Hashable CloseWebsocketsOnMetadataChangeStatus
|
||||
|
||||
instance FromJSON CloseWebsocketsOnMetadataChangeStatus where
|
||||
parseJSON = fmap (bool CWMCDisabled CWMCEnabled) . parseJSON
|
||||
|
||||
isCloseWebsocketsOnMetadataChangeStatusEnabled :: CloseWebsocketsOnMetadataChangeStatus -> Bool
|
||||
isCloseWebsocketsOnMetadataChangeStatusEnabled = \case
|
||||
CWMCEnabled -> True
|
||||
CWMCDisabled -> False
|
||||
|
||||
instance ToJSON CloseWebsocketsOnMetadataChangeStatus where
|
||||
toJSON = toJSON . isCloseWebsocketsOnMetadataChangeStatusEnabled
|
||||
|
||||
class (Monad m) => MonadGetPolicies m where
|
||||
runGetApiTimeLimit ::
|
||||
m (Maybe MaxTime)
|
||||
|
@ -2096,3 +2096,16 @@ serveParserSpec =
|
||||
Opt.Success enableApolloFederation -> enableApolloFederation == (Just Types.ApolloFederationEnabled)
|
||||
Opt.Failure _pf -> False
|
||||
Opt.CompletionInvoked _cr -> False
|
||||
|
||||
Hspec.it "It accepts '--disable-close-websockets-on-metadata-change'" $ do
|
||||
let -- Given
|
||||
parserInfo = Opt.info (UUT.serveCommandParser @Logging.Hasura Opt.<**> Opt.helper) Opt.fullDesc
|
||||
-- When
|
||||
argInput = ["--disable-close-websockets-on-metadata-change"]
|
||||
-- Then
|
||||
result = Opt.execParserPure Opt.defaultPrefs parserInfo argInput
|
||||
|
||||
fmap UUT.rsoCloseWebsocketsOnMetadataChangeStatus result `Hspec.shouldSatisfy` \case
|
||||
Opt.Success disableCloseWebsocketsOnMetadataChange -> disableCloseWebsocketsOnMetadataChange == (Just Types.CWMCDisabled)
|
||||
Opt.Failure _pf -> False
|
||||
Opt.CompletionInvoked _cr -> False
|
||||
|
@ -91,7 +91,8 @@ emptyServeOptionsRaw =
|
||||
rsoDefaultNamingConvention = Nothing,
|
||||
rsoExtensionsSchema = Nothing,
|
||||
rsoMetadataDefaults = Nothing,
|
||||
rsoApolloFederationStatus = Nothing
|
||||
rsoApolloFederationStatus = Nothing,
|
||||
rsoCloseWebsocketsOnMetadataChangeStatus = Nothing
|
||||
}
|
||||
|
||||
mkServeOptionsSpec :: Hspec.Spec
|
||||
|
@ -90,7 +90,8 @@ serveOptions =
|
||||
soDefaultNamingConvention = Init._default Init.defaultNamingConventionOption,
|
||||
soExtensionsSchema = ExtensionsSchema "public",
|
||||
soMetadataDefaults = emptyMetadataDefaults,
|
||||
soApolloFederationStatus = ApolloFederationDisabled
|
||||
soApolloFederationStatus = ApolloFederationDisabled,
|
||||
soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption
|
||||
}
|
||||
|
||||
-- | What log level should be used by the engine; this is not exported, and
|
||||
|
@ -126,6 +126,7 @@ main = do
|
||||
(_default defaultNamingConventionOption)
|
||||
emptyMetadataDefaults
|
||||
ApolloFederationDisabled
|
||||
(_default closeWebsocketsOnMetadataChangeOption)
|
||||
cacheBuildParams = CacheBuildParams httpManager (mkPgSourceResolver print) mkMSSQLSourceResolver staticConfig
|
||||
|
||||
(_appInit, appEnv) <-
|
||||
|
@ -193,7 +193,7 @@ streamingSubscriptionPollingSpec srcConfig = do
|
||||
-- well create a new function with the other variables being set to `mempty`
|
||||
mkCohortVariables' = mkCohortVariables mempty mempty mempty mempty
|
||||
|
||||
describe "Streaming subcription poll" $ do
|
||||
describe "Streaming subscription poll" $ do
|
||||
cohortId1 <- runIO newCohortId
|
||||
(subscriberId1, subscriberId2) <- runIO $ (,) <$> newSubscriberId <*> newSubscriberId
|
||||
let subscriber1 = mkSubscriber subscriberId1
|
||||
|
@ -78,13 +78,20 @@ class GQLWsClient():
|
||||
def get_ws_query_event(self, query_id, timeout):
|
||||
return self.ws_id_query_queues[query_id].get(timeout=timeout)
|
||||
|
||||
def send(self, frame):
|
||||
def send(self, frame, count=0):
|
||||
self.wait_for_connection()
|
||||
if frame.get('type') == 'stop':
|
||||
self.ws_active_query_ids.discard( frame.get('id') )
|
||||
elif frame.get('type') == 'start' and 'id' in frame:
|
||||
self.ws_id_query_queues[frame['id']] = queue.Queue(maxsize=-1)
|
||||
self._ws.send(json.dumps(frame))
|
||||
try:
|
||||
self._ws.send(json.dumps(frame))
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
if count > 2:
|
||||
raise websocket.WebSocketConnectionClosedException("Connection is already closed and cannot be recreated even after 3 attempts")
|
||||
# Connection closed, try to recreate the connection and send the frame again
|
||||
self.recreate_conn()
|
||||
self.send(frame, count+1)
|
||||
|
||||
def init_as_admin(self):
|
||||
headers={}
|
||||
|
@ -264,7 +264,7 @@ class TestWebsocketLogging:
|
||||
tests for the `websocket-log` type. currently tests presence of operation_name
|
||||
"""
|
||||
def _get_ws_server_logs(x):
|
||||
return x['type'] == 'ws-server' and 'metadata' in x['detail']
|
||||
return x['type'] == 'ws-server' and 'metadata' in x['detail'] and type(x['detail']) != str
|
||||
|
||||
ws_logs = list(filter(_get_ws_server_logs, logs_from_requests))
|
||||
assert len(ws_logs) > 0
|
||||
|
Loading…
Reference in New Issue
Block a user