From 14b4de37f762c4d1b1da7aff5bb723483c45f925 Mon Sep 17 00:00:00 2001 From: paritosh-08 <85472423+paritosh-08@users.noreply.github.com> Date: Tue, 13 Jun 2023 14:52:36 +0530 Subject: [PATCH] server: close subscriptions (with 1012) on metadata change PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9159 GitOrigin-RevId: 5ab25ef57782c126de8852f3546809feb66a9d44 --- .../graphql-engine-flags/reference.mdx | 14 ++++++ .../lib/test-harness/src/Harness/Constants.hs | 3 +- .../src/Harness/GlobalTestEnvironment.hs | 3 +- .../test-harness/src/Harness/GraphqlEngine.hs | 48 +++++++++++-------- .../test-harness/src/Harness/Test/Protocol.hs | 14 ++---- server/src-lib/Hasura/App.hs | 1 + server/src-lib/Hasura/App/State.hs | 9 ++-- .../GraphQL/Execute/Subscription/State.hs | 2 +- .../Hasura/GraphQL/Transport/WebSocket.hs | 19 ++++++++ .../GraphQL/Transport/WebSocket/Server.hs | 44 ++++++++++------- .../Hasura/RQL/DDL/Schema/Cache/Config.hs | 3 +- server/src-lib/Hasura/Server/API/Metadata.hs | 11 ++++- server/src-lib/Hasura/Server/App.hs | 11 +++-- server/src-lib/Hasura/Server/Init.hs | 2 + .../Hasura/Server/Init/Arg/Command/Serve.hs | 21 +++++++- server/src-lib/Hasura/Server/Init/Config.hs | 6 ++- server/src-lib/Hasura/Server/Init/Env.hs | 3 ++ server/src-lib/Hasura/Server/Types.hs | 21 ++++++++ server/src-test/Hasura/Server/Init/ArgSpec.hs | 13 +++++ server/src-test/Hasura/Server/InitSpec.hs | 3 +- server/test-postgres/Constants.hs | 3 +- server/test-postgres/Main.hs | 1 + .../Test/Hasura/StreamingSubscriptionSuite.hs | 2 +- server/tests-py/context.py | 11 ++++- server/tests-py/test_logging.py | 2 +- 25 files changed, 203 insertions(+), 67 deletions(-) diff --git a/docs/docs/deployment/graphql-engine-flags/reference.mdx b/docs/docs/deployment/graphql-engine-flags/reference.mdx index 05c743cce36..1a1e3dd0f1e 100644 --- a/docs/docs/deployment/graphql-engine-flags/reference.mdx +++ b/docs/docs/deployment/graphql-engine-flags/reference.mdx @@ -206,6 +206,20 @@ for JSON encoding-decoding. | **Default** | `false` | | **Supported in** | CE, Enterprise Edition | +### Close WebSocket connections on metadata change + +When metadata changes, close all WebSocket connections (with error code `1012`). This is useful when you want to ensure +that all clients reconnect to the latest metadata. + +| | | +| ------------------- | ----------------------------------------------------- | +| **Flag** | `--disable-close-websockets-on-metadata-change` | +| **Env var** | `HASURA_GRAPHQL_CLOSE_WEBSOCKETS_ON_METADATA_CHANGE` | +| **Accepted values** | Boolean | +| **Options** | `true` or `false` | +| **Default** | `true` | +| **Supported in** | CE, Enterprise Edition, Cloud | + ### Connections per Read-Replica The maximum number of Postgres connections per [read-replica](databases/database-config/read-replicas.mdx) that can be diff --git a/server/lib/test-harness/src/Harness/Constants.hs b/server/lib/test-harness/src/Harness/Constants.hs index 3bda0730eb1..d5ec6650b27 100644 --- a/server/lib/test-harness/src/Harness/Constants.hs +++ b/server/lib/test-harness/src/Harness/Constants.hs @@ -308,7 +308,8 @@ serveOptions = soDefaultNamingConvention = Init._default Init.defaultNamingConventionOption, soExtensionsSchema = ExtensionsSchema "public", soMetadataDefaults = emptyMetadataDefaults, - soApolloFederationStatus = ApolloFederationDisabled + soApolloFederationStatus = ApolloFederationDisabled, + soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption } -- | What log level should be used by the engine; this is not exported, and diff --git a/server/lib/test-harness/src/Harness/GlobalTestEnvironment.hs b/server/lib/test-harness/src/Harness/GlobalTestEnvironment.hs index db55e9469d4..cb1b3e4a128 100644 --- a/server/lib/test-harness/src/Harness/GlobalTestEnvironment.hs +++ b/server/lib/test-harness/src/Harness/GlobalTestEnvironment.hs @@ -22,7 +22,6 @@ import Harness.Logging.Messages import Harness.Services.Composed qualified as Services import Harness.Test.BackendType import Hasura.Prelude -import Network.WebSockets qualified as WS -- | static information across an entire test suite run data GlobalTestEnvironment = GlobalTestEnvironment @@ -88,7 +87,7 @@ instance Show GlobalTestEnvironment where -- | How should we make requests to `graphql-engine`? Both WebSocket- and HTTP- -- based requests are supported. -data Protocol = HTTP | WebSocket WS.Connection +data Protocol = HTTP | WebSocket -- | Credentials for our testing modes. See 'SpecHook.setupTestingMode' for the -- practical consequences of this type. diff --git a/server/lib/test-harness/src/Harness/GraphqlEngine.hs b/server/lib/test-harness/src/Harness/GraphqlEngine.hs index 071d18e62db..f52630d266d 100644 --- a/server/lib/test-harness/src/Harness/GraphqlEngine.hs +++ b/server/lib/test-harness/src/Harness/GraphqlEngine.hs @@ -69,7 +69,7 @@ import Harness.Http qualified as Http import Harness.Logging import Harness.Quoter.Yaml (fromYaml, yaml) import Harness.Services.GraphqlEngine -import Harness.TestEnvironment (Protocol (..), Server (..), TestEnvironment (..), TestingRole (..), getServer, requestProtocol, serverUrl, traceIf) +import Harness.TestEnvironment (Protocol (..), Server (..), TestEnvironment (..), TestingRole (..), getServer, requestProtocol, server, serverUrl, traceIf) import Harness.WebSockets (responseListener, sendMessages) import Hasura.App qualified as App import Hasura.Logging (Hasura) @@ -150,38 +150,46 @@ postGraphqlViaHttpOrWebSocketWithHeadersStatus :: (HasCallStack) => Int -> TestEnvironment -> Http.RequestHeaders -> Value -> IO Value postGraphqlViaHttpOrWebSocketWithHeadersStatus statusCode testEnv headers requestBody = do withFrozenCallStack $ case requestProtocol (globalEnvironment testEnv) of - WebSocket connection -> postWithHeadersStatusViaWebSocket testEnv connection (addAuthzHeaders testEnv headers) requestBody + WebSocket -> postWithHeadersStatusViaWebSocket testEnv (addAuthzHeaders testEnv headers) requestBody HTTP -> postWithHeadersStatus statusCode testEnv "/v1/graphql" headers requestBody -- | Post some JSON to graphql-engine, getting back more JSON, via websockets. -- -- This will be used by 'postWithHeadersStatus' if the 'TestEnvironment' sets -- the 'requestProtocol' to 'WebSocket'. -postWithHeadersStatusViaWebSocket :: TestEnvironment -> WS.Connection -> Http.RequestHeaders -> Value -> IO Value -postWithHeadersStatusViaWebSocket testEnv connection headers requestBody = do +postWithHeadersStatusViaWebSocket :: TestEnvironment -> Http.RequestHeaders -> Value -> IO Value +postWithHeadersStatusViaWebSocket testEnv headers requestBody = do let preparedHeaders :: HashMap Text ByteString preparedHeaders = HashMap.fromList [ (decodeUtf8 (original key), value) | (key, value) <- headers ] - sendMessages - testEnv - connection - [ object - [ "type" .= String "connection_init", - "payload" .= object ["headers" .= preparedHeaders] - ], - object - [ "id" .= String "some-request-id", - "type" .= String "start", - "payload" .= requestBody - ] - ] + server' = server (globalEnvironment testEnv) + port' = port server' + host = T.unpack (T.drop 7 (T.pack (urlPrefix server'))) + path = "/v1/graphql" - responseListener testEnv connection \_ type' payload -> do - when (type' `notElem` ["data", "error"]) $ fail ("Websocket message type " <> T.unpack type' <> " received. Payload: " <> Char8.unpack (encode payload)) - pure payload + -- paritosh: We are initiating a websocket connection for each request and sending the request instead of initiating a + -- websocket connection for the test and reusing the connection for requests. This is done to avoid managing the + -- connection (as we do have metadata changes as part of test and HGE closes websockets on metadata changes). + WS.runClient host (fromIntegral port') path \connection -> do + sendMessages + testEnv + connection + [ object + [ "type" .= String "connection_init", + "payload" .= object ["headers" .= preparedHeaders] + ], + object + [ "id" .= String "some-request-id", + "type" .= String "start", + "payload" .= requestBody + ] + ] + responseListener testEnv connection \_ type' payload -> do + when (type' `notElem` ["data", "error"]) $ fail ("Websocket message type " <> T.unpack type' <> " received. Payload: " <> Char8.unpack (encode payload)) + pure payload -- | Post some JSON to graphql-engine, getting back more JSON. -- diff --git a/server/lib/test-harness/src/Harness/Test/Protocol.hs b/server/lib/test-harness/src/Harness/Test/Protocol.hs index 02f6620ad5e..fdf2dcffe64 100644 --- a/server/lib/test-harness/src/Harness/Test/Protocol.hs +++ b/server/lib/test-harness/src/Harness/Test/Protocol.hs @@ -3,11 +3,9 @@ module Harness.Test.Protocol ) where -import GHC.Word (Word16) import Harness.TestEnvironment (GlobalTestEnvironment, Protocol (..)) import Harness.TestEnvironment qualified as TestEnvironment import Hasura.Prelude -import Network.WebSockets qualified as WS import Test.Hspec (ActionWith, SpecWith, aroundAllWith, describe) import Test.Hspec.Core.Spec (Item (..), mapSpecItem_) @@ -25,14 +23,10 @@ withEachProtocol spec = do let connectWS :: ActionWith GlobalTestEnvironment -> ActionWith GlobalTestEnvironment connectWS k globalTestEnvironment = do - let port' :: Word16 - port' = TestEnvironment.port (TestEnvironment.server globalTestEnvironment) - - WS.runClient "127.0.0.1" (fromIntegral port') "/v1/graphql" \connection -> - k - globalTestEnvironment - { TestEnvironment.requestProtocol = WebSocket connection - } + k + globalTestEnvironment + { TestEnvironment.requestProtocol = WebSocket + } aroundAllWith connectWS $ describe "Over WebSockets" $ flip mapSpecItem_ spec \item -> item {itemExample = \params -> itemExample item params} diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 0921635f8e9..2a546fd09fb 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -508,6 +508,7 @@ initialiseAppContext env serveOptions@ServeOptions {..} AppInit {..} = do soDefaultNamingConvention soMetadataDefaults soApolloFederationStatus + soCloseWebsocketsOnMetadataChangeStatus -- Create the schema cache rebuildableSchemaCache <- diff --git a/server/src-lib/Hasura/App/State.hs b/server/src-lib/Hasura/App/State.hs index 443a906e05c..58ef57d021d 100644 --- a/server/src-lib/Hasura/App/State.hs +++ b/server/src-lib/Hasura/App/State.hs @@ -158,7 +158,8 @@ data AppContext = AppContext acEnableTelemetry :: TelemetryStatus, acEventEngineCtx :: EventEngineCtx, acAsyncActionsFetchInterval :: OptionalInterval, - acApolloFederationStatus :: ApolloFederationStatus + acApolloFederationStatus :: ApolloFederationStatus, + acCloseWebsocketsOnMetadataChangeStatus :: CloseWebsocketsOnMetadataChangeStatus } -- | Collection of the LoggerCtx, the regular Logger and the PGLogger @@ -268,7 +269,8 @@ buildAppContextRule = proc (ServeOptions {..}, env, _keys) -> do acEnableTelemetry = soEnableTelemetry, acEventEngineCtx = eventEngineCtx, acAsyncActionsFetchInterval = soAsyncActionsFetchInterval, - acApolloFederationStatus = soApolloFederationStatus + acApolloFederationStatus = soApolloFederationStatus, + acCloseWebsocketsOnMetadataChangeStatus = soCloseWebsocketsOnMetadataChangeStatus } where buildSqlGenCtx = Inc.cache proc (experimentalFeatures, stringifyNum, dangerousBooleanCollapse) -> do @@ -340,5 +342,6 @@ buildCacheDynamicConfig AppContext {..} = do _cdcExperimentalFeatures = acExperimentalFeatures, _cdcDefaultNamingConvention = acDefaultNamingConvention, _cdcMetadataDefaults = acMetadataDefaults, - _cdcApolloFederationStatus = acApolloFederationStatus + _cdcApolloFederationStatus = acApolloFederationStatus, + _cdcCloseWebsocketsOnMetadataChangeStatus = acCloseWebsocketsOnMetadataChangeStatus } diff --git a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs index 3cc802574af..8bb19e2a5f9 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Subscription/State.hs @@ -584,7 +584,7 @@ removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionState (S (GaugeVector.dec numSubscriptionMetric promMetricGranularLabel) (GaugeVector.dec numSubscriptionMetric promMetricLabel) --- | An async action query whose relationships are refered to table in a source. +-- | An async action query whose relationships are referred to table in a source. -- We need to generate an SQL statement with the action response and execute it -- in the source database so as to fetch response joined with relationship rows. -- For more details see Note [Resolving async action query] diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 48bf3c37c7e..277d048d63a 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -17,6 +17,9 @@ module Hasura.GraphQL.Transport.WebSocket onClose, sendMsg, sendCloseWithMsg, + mkCloseWebsocketsOnMetadataChangeAction, + runWebsocketCloseOnMetadataChangeAction, + WebsocketCloseOnMetadataChangeAction, ) where @@ -68,6 +71,7 @@ import Hasura.GraphQL.Transport.Instances () import Hasura.GraphQL.Transport.WebSocket.Protocol import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS import Hasura.GraphQL.Transport.WebSocket.Types +import Hasura.GraphQL.Transport.WebSocket.Types qualified as WS import Hasura.Logging qualified as L import Hasura.Metadata.Class import Hasura.Prelude @@ -1213,3 +1217,18 @@ onClose logger serverMetrics prometheusMetrics subscriptionsState wsConn granula StreamingQuerySubscriber streamSubscriberId -> ES.removeStreamingQuery logger serverMetrics prometheusMetrics subscriptionsState streamSubscriberId granularPrometheusMetricsState operationName where opMap = _wscOpMap $ WS.getData wsConn + +newtype WebsocketCloseOnMetadataChangeAction = WebsocketCloseOnMetadataChangeAction + { runWebsocketCloseOnMetadataChangeAction :: IO () + } + +-- | By default, we close all the websocket connections when the metadata changes. This function is used to create the +-- action that will be run when the metadata changes. +mkCloseWebsocketsOnMetadataChangeAction :: WS.WSServer WS.WSConnData -> WebsocketCloseOnMetadataChangeAction +mkCloseWebsocketsOnMetadataChangeAction wsServer = + WebsocketCloseOnMetadataChangeAction + $ WS.closeAllConnectionsWithReason + wsServer + "Closing all websocket connections as the metadata has changed" + "Server state changed, restarting the server" + id diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs index 417f964a010..6f624939383 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket/Server.hs @@ -24,6 +24,7 @@ module Hasura.GraphQL.Transport.WebSocket.Server sendMsgAndCloseConn, createServerApp, createWSServer, + closeAllConnectionsWithReason, getData, getRawWebSocketConnection, getWSId, @@ -278,6 +279,22 @@ closeAllWith :: closeAllWith closer msg conns = void $ A.mapConcurrently (closer msg . snd) conns +closeAllConnectionsWithReason :: + WSServer a -> + String -> + BL.ByteString -> + (SecuritySensitiveUserConfig -> SecuritySensitiveUserConfig) -> + IO () +closeAllConnectionsWithReason (WSServer (L.Logger writeLog) userConfRef serverStatus) logMsg reason updateConf = do + writeLog + $ WSReaperThreadLog + $ fromString + $ logMsg + conns <- STM.atomically $ do + STM.modifyTVar' userConfRef updateConf + flushConnMap serverStatus + closeAllWith (flip forceConnReconnect) reason conns + -- | Resets the current connections map to an empty one if the server is -- running and returns the list of connections that were in the map -- before flushing it. @@ -354,7 +371,7 @@ data WSHandlers m a = WSHandlers -- stringify big query numeric, experimental features and invalidates/closes all -- connections if there are any changes. websocketConnectionReaper :: IO (AuthMode, AllowListStatus, CorsPolicy, SQLGenCtx, Set.HashSet ExperimentalFeature, NamingCase) -> IO SchemaCache -> WSServer a -> IO Void -websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger writeLog) userConfRef serverStatus) = +websocketConnectionReaper getLatestConfig getSchemaCache ws@(WSServer _ userConfRef _) = forever $ do (currAuthMode, currEnableAllowlist, currCorsPolicy, currSqlGenCtx, currExperimentalFeatures, currDefaultNamingCase) <- getLatestConfig currAllowlist <- scAllowlist <$> getSchemaCache @@ -370,21 +387,6 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri (currDefaultNamingCase, prevDefaultNamingCase) sleep $ seconds 1 where - closeAllConnectionsWithReason :: - String -> - BL.ByteString -> - (SecuritySensitiveUserConfig -> SecuritySensitiveUserConfig) -> - IO () - closeAllConnectionsWithReason logMsg reason updateConf = do - writeLog - $ WSReaperThreadLog - $ fromString - $ logMsg - conns <- STM.atomically $ do - STM.modifyTVar' userConfRef updateConf - flushConnMap serverStatus - closeAllWith (flip forceConnReconnect) reason conns - -- Close all connections based on - -- if CorsPolicy changed -> close -- if AuthMode changed -> close @@ -419,12 +421,14 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri -- if CORS policy has changed, close all connections | hasCorsPolicyChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the cors policy changed" "cors policy changed" (\conf -> conf {ssucCorsPolicy = currCorsPolicy}) -- if any auth config has changed, close all connections | hasAuthModeChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the auth mode changed" "auth mode changed" (\conf -> conf {ssucAuthMode = currAuthMode}) @@ -434,6 +438,7 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri -- connections. | hasAllowlistEnabled -> closeAllConnectionsWithReason + ws "closing all websocket connections as allow list is enabled" "allow list enabled" (\conf -> conf {ssucEnableAllowlist = currEnableAllowlist}) @@ -441,42 +446,49 @@ websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger wri -- allowlist, we need to close all the connections. | hasAllowlistUpdated -> closeAllConnectionsWithReason + ws "closing all websocket connections as the allow list has been updated" "allow list updated" (\conf -> conf {ssucAllowlist = currAllowlist}) -- if HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES has changed, close all connections | hasStringifyNumChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES setting changed" "HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES env var changed" (\conf -> conf {ssucSQLGenCtx = currSqlGenCtx}) -- if HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE has changed, close all connections | hasDangerousBooleanCollapseChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE setting changed" "HASURA_GRAPHQL_V1_BOOLEAN_NULL_COLLAPSE env var changed" (\conf -> conf {ssucSQLGenCtx = currSqlGenCtx}) -- if 'bigquery_string_numeric_input' option added/removed from experimental features, close all connections | hasBigqueryStringNumericInputChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the 'bigquery_string_numeric_input' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES" "'bigquery_string_numeric_input' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var" (\conf -> conf {ssucSQLGenCtx = currSqlGenCtx}) -- if 'hide_aggregation_predicates' option added/removed from experimental features, close all connections | hasHideAggregationPredicatesChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the 'hide-aggregation-predicates' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES" "'hide-aggregation-predicates' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var" (\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures}) -- if 'hide_stream_fields' option added/removed from experimental features, close all connections | hasHideStreamFieldsChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the 'hide-stream-fields' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES" "'hide-stream-fields' removed/added in HASURA_GRAPHQL_EXPERIMENTAL_FEATURES env var" (\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures}) -- if naming convention has been changed, close all connections | hasDefaultNamingCaseChanged -> closeAllConnectionsWithReason + ws "closing all websocket connections as the 'naming_convention' option has been added/removed from HASURA_GRAPHQL_EXPERIMENTAL_FEATURES and the HASURA_GRAPHQL_DEFAULT_NAMING_CONVENTION has changed" "naming convention has been changed" (\conf -> conf {ssucExperimentalFeatures = currExperimentalFeatures, ssucDefaultNamingCase = currDefaultNamingCase}) diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Config.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Config.hs index c7e1f85c926..fc19cae9847 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Config.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Config.hs @@ -72,6 +72,7 @@ data CacheDynamicConfig = CacheDynamicConfig _cdcExperimentalFeatures :: HashSet ExperimentalFeature, _cdcDefaultNamingConvention :: NamingCase, _cdcMetadataDefaults :: MetadataDefaults, - _cdcApolloFederationStatus :: ApolloFederationStatus + _cdcApolloFederationStatus :: ApolloFederationStatus, + _cdcCloseWebsocketsOnMetadataChangeStatus :: CloseWebsocketsOnMetadataChangeStatus } deriving (Eq) diff --git a/server/src-lib/Hasura/Server/API/Metadata.hs b/server/src-lib/Hasura/Server/API/Metadata.hs index 0c4cd43de3e..a1e6cc4d79f 100644 --- a/server/src-lib/Hasura/Server/API/Metadata.hs +++ b/server/src-lib/Hasura/Server/API/Metadata.hs @@ -18,6 +18,7 @@ import Hasura.Base.Error import Hasura.EncJSON import Hasura.Eventing.Backend import Hasura.Function.API qualified as Functions +import Hasura.GraphQL.Transport.WebSocket qualified as WS import Hasura.Logging qualified as L import Hasura.LogicalModel.API qualified as LogicalModel import Hasura.Metadata.Class @@ -105,9 +106,10 @@ runMetadataQuery :: ) => AppContext -> RebuildableSchemaCache -> + WS.WebsocketCloseOnMetadataChangeAction -> RQLMetadata -> m (EncJSON, RebuildableSchemaCache) -runMetadataQuery appContext schemaCache RQLMetadata {..} = do +runMetadataQuery appContext schemaCache closeWebsocketsOnMetadataChange RQLMetadata {..} = do AppEnv {..} <- askAppEnv let logger = _lsLogger appEnvLoggers MetadataWithResourceVersion metadata currentResourceVersion <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata @@ -184,6 +186,13 @@ runMetadataQuery appContext schemaCache RQLMetadata {..} = do $ setMetadataResourceVersionInSchemaCache newResourceVersion & runCacheRWT dynamicConfig modSchemaCache + -- Close all subscriptions with 1012 code (subscribers should reconnect) + -- and close poller threads + when ((_cdcCloseWebsocketsOnMetadataChangeStatus dynamicConfig) == CWMCEnabled) + $ Tracing.newSpan "closeWebsocketsOnMetadataChange" + $ liftIO + $ WS.runWebsocketCloseOnMetadataChangeAction closeWebsocketsOnMetadataChange + pure (r, modSchemaCache') (MaintenanceModeEnabled (), ReadOnlyModeDisabled) -> throw500 "metadata cannot be modified in maintenance mode" diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 3bad7049779..ca81eb31759 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -66,6 +66,7 @@ import Hasura.GraphQL.Logging (MonadExecutionLog, MonadQueryLog) import Hasura.GraphQL.Transport.HTTP qualified as GH import Hasura.GraphQL.Transport.HTTP.Protocol qualified as GH import Hasura.GraphQL.Transport.WSServerApp qualified as WS +import Hasura.GraphQL.Transport.WebSocket qualified as WS import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS import Hasura.GraphQL.Transport.WebSocket.Types qualified as WS import Hasura.HTTP @@ -495,9 +496,10 @@ v1MetadataHandler :: UserInfoM m ) => ((RebuildableSchemaCache -> m (EncJSON, RebuildableSchemaCache)) -> m EncJSON) -> + WS.WebsocketCloseOnMetadataChangeAction -> RQLMetadata -> m (HttpResponse EncJSON) -v1MetadataHandler schemaCacheRefUpdater query = Tracing.newSpan "Metadata" $ do +v1MetadataHandler schemaCacheRefUpdater closeWebsocketsOnMetadataChangeAction query = Tracing.newSpan "Metadata" $ do (liftEitherM . authorizeV1MetadataApi query) =<< ask appContext <- asks hcAppContext r <- @@ -505,6 +507,7 @@ v1MetadataHandler schemaCacheRefUpdater query = Tracing.newSpan "Metadata" $ do runMetadataQuery appContext schemaCache + closeWebsocketsOnMetadataChangeAction query pure $ HttpResponse r [] @@ -805,6 +808,7 @@ mkWaiApp setupHook appStateRef consoleType ekgStore wsServerEnv = do Spock.spockAsApp $ Spock.spockT lowerIO $ httpApp setupHook appStateRef appEnv consoleType ekgStore + $ WS.mkCloseWebsocketsOnMetadataChangeAction (WS._wseServer wsServerEnv) let wsServerApp = WS.createWSServerApp (_lsEnabledLogTypes appEnvLoggingSettings) wsServerEnv appEnvWebSocketConnectionInitTimeout appEnvLicenseKeyCache stopWSServer = WS.stopWSServerApp wsServerEnv @@ -846,8 +850,9 @@ httpApp :: AppEnv -> ConsoleType m -> EKG.Store EKG.EmptyMetrics -> + WS.WebsocketCloseOnMetadataChangeAction -> Spock.SpockT m () -httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore = do +httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore closeWebsocketsOnMetadataChangeAction = do -- Additional spock action to run setupHook appStateRef @@ -970,7 +975,7 @@ httpApp setupHook appStateRef AppEnv {..} consoleType ekgStore = do $ spockAction encodeQErr id $ mkPostHandler $ fmap (emptyHttpLogGraphQLInfo,) - <$> mkAPIRespHandler (v1MetadataHandler schemaCacheUpdater) + <$> mkAPIRespHandler (v1MetadataHandler schemaCacheUpdater closeWebsocketsOnMetadataChangeAction) Spock.post "v2/query" $ do onlyWhenApiEnabled isMetadataEnabled appStateRef diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index 498c37ec4a8..4de009cc48e 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -214,6 +214,8 @@ mkServeOptions sor@ServeOptionsRaw {..} = do soApolloFederationStatus <- do apolloFederationStatusOptionM <- withOptionDefault (pure <$> rsoApolloFederationStatus) apolloFederationStatusOption pure $ getApolloFederationStatus soExperimentalFeatures apolloFederationStatusOptionM + soCloseWebsocketsOnMetadataChangeStatus <- do + withOptionDefault rsoCloseWebsocketsOnMetadataChangeStatus closeWebsocketsOnMetadataChangeOption pure ServeOptions {..} -- | Fetch Postgres 'Query.ConnParams' components from the environment diff --git a/server/src-lib/Hasura/Server/Init/Arg/Command/Serve.hs b/server/src-lib/Hasura/Server/Init/Arg/Command/Serve.hs index 6e0ce4d6d29..d4d5b4cd2e1 100644 --- a/server/src-lib/Hasura/Server/Init/Arg/Command/Serve.hs +++ b/server/src-lib/Hasura/Server/Init/Arg/Command/Serve.hs @@ -60,6 +60,7 @@ module Hasura.Server.Init.Arg.Command.Serve parseMetadataDefaults, metadataDefaultsOption, apolloFederationStatusOption, + closeWebsocketsOnMetadataChangeOption, -- * Pretty Printer serveCmdFooter, @@ -146,6 +147,7 @@ serveCommandParser = <*> parseExtensionsSchema <*> parseMetadataDefaults <*> parseApolloFederationStatus + <*> parseEnableCloseWebsocketsOnMetadataChange -------------------------------------------------------------------------------- -- Serve Options @@ -1158,6 +1160,22 @@ parseApolloFederationStatus = <> Opt.help (Config._helpMessage apolloFederationStatusOption) ) +closeWebsocketsOnMetadataChangeOption :: Config.Option (Types.CloseWebsocketsOnMetadataChangeStatus) +closeWebsocketsOnMetadataChangeOption = + Config.Option + { Config._default = Types.CWMCEnabled, + Config._envVar = "HASURA_GRAPHQL_CLOSE_WEBSOCKETS_ON_METADATA_CHANGE", + Config._helpMessage = "Close all the websocket connections (with error code 1012) on metadata change (default: true)." + } + +parseEnableCloseWebsocketsOnMetadataChange :: Opt.Parser (Maybe Types.CloseWebsocketsOnMetadataChangeStatus) +parseEnableCloseWebsocketsOnMetadataChange = + (bool Nothing (Just Types.CWMCDisabled)) + <$> Opt.switch + ( Opt.long "disable-close-websockets-on-metadata-change" + <> Opt.help (Config._helpMessage closeWebsocketsOnMetadataChangeOption) + ) + -------------------------------------------------------------------------------- -- Pretty Printer @@ -1256,6 +1274,7 @@ serveCmdFooter = Config.optionPP enableMetadataQueryLoggingOption, Config.optionPP defaultNamingConventionOption, Config.optionPP metadataDBExtensionsSchemaOption, - Config.optionPP apolloFederationStatusOption + Config.optionPP apolloFederationStatusOption, + Config.optionPP closeWebsocketsOnMetadataChangeOption ] eventEnvs = [Config.optionPP graphqlEventsHttpPoolSizeOption, Config.optionPP graphqlEventsFetchIntervalOption] diff --git a/server/src-lib/Hasura/Server/Init/Config.hs b/server/src-lib/Hasura/Server/Init/Config.hs index a0e4e7c2a50..645426f0ca1 100644 --- a/server/src-lib/Hasura/Server/Init/Config.hs +++ b/server/src-lib/Hasura/Server/Init/Config.hs @@ -320,7 +320,8 @@ data ServeOptionsRaw impl = ServeOptionsRaw rsoDefaultNamingConvention :: Maybe NamingCase, rsoExtensionsSchema :: Maybe MonadTx.ExtensionsSchema, rsoMetadataDefaults :: Maybe MetadataDefaults, - rsoApolloFederationStatus :: Maybe Server.Types.ApolloFederationStatus + rsoApolloFederationStatus :: Maybe Server.Types.ApolloFederationStatus, + rsoCloseWebsocketsOnMetadataChangeStatus :: Maybe Server.Types.CloseWebsocketsOnMetadataChangeStatus } -- | Whether or not to serve Console assets. @@ -618,7 +619,8 @@ data ServeOptions impl = ServeOptions soDefaultNamingConvention :: NamingCase, soExtensionsSchema :: MonadTx.ExtensionsSchema, soMetadataDefaults :: MetadataDefaults, - soApolloFederationStatus :: Server.Types.ApolloFederationStatus + soApolloFederationStatus :: Server.Types.ApolloFederationStatus, + soCloseWebsocketsOnMetadataChangeStatus :: Server.Types.CloseWebsocketsOnMetadataChangeStatus } -- | 'ResponseInternalErrorsConfig' represents the encoding of the diff --git a/server/src-lib/Hasura/Server/Init/Env.hs b/server/src-lib/Hasura/Server/Init/Env.hs index c9d40e1083d..2cd8cee1285 100644 --- a/server/src-lib/Hasura/Server/Init/Env.hs +++ b/server/src-lib/Hasura/Server/Init/Env.hs @@ -372,3 +372,6 @@ instance FromEnv Server.Types.ApolloFederationStatus where instance FromEnv GranularPrometheusMetricsState where fromEnv = fmap (bool GranularMetricsOff GranularMetricsOn) . fromEnv @Bool + +instance FromEnv Server.Types.CloseWebsocketsOnMetadataChangeStatus where + fromEnv = fmap (bool Server.Types.CWMCDisabled Server.Types.CWMCEnabled) . fromEnv @Bool diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs index 2fcf71e78a4..86376b71f24 100644 --- a/server/src-lib/Hasura/Server/Types.hs +++ b/server/src-lib/Hasura/Server/Types.hs @@ -18,6 +18,8 @@ module Hasura.Server.Types ApolloFederationStatus (..), isApolloFederationEnabled, GranularPrometheusMetricsState (..), + CloseWebsocketsOnMetadataChangeStatus (..), + isCloseWebsocketsOnMetadataChangeStatusEnabled, MonadGetPolicies (..), ) where @@ -180,6 +182,25 @@ instance ToJSON GranularPrometheusMetricsState where GranularMetricsOff -> Bool False GranularMetricsOn -> Bool True +-- | Whether or not to close websocket connections on metadata change. +data CloseWebsocketsOnMetadataChangeStatus = CWMCEnabled | CWMCDisabled + deriving stock (Show, Eq, Ord, Generic) + +instance NFData CloseWebsocketsOnMetadataChangeStatus + +instance Hashable CloseWebsocketsOnMetadataChangeStatus + +instance FromJSON CloseWebsocketsOnMetadataChangeStatus where + parseJSON = fmap (bool CWMCDisabled CWMCEnabled) . parseJSON + +isCloseWebsocketsOnMetadataChangeStatusEnabled :: CloseWebsocketsOnMetadataChangeStatus -> Bool +isCloseWebsocketsOnMetadataChangeStatusEnabled = \case + CWMCEnabled -> True + CWMCDisabled -> False + +instance ToJSON CloseWebsocketsOnMetadataChangeStatus where + toJSON = toJSON . isCloseWebsocketsOnMetadataChangeStatusEnabled + class (Monad m) => MonadGetPolicies m where runGetApiTimeLimit :: m (Maybe MaxTime) diff --git a/server/src-test/Hasura/Server/Init/ArgSpec.hs b/server/src-test/Hasura/Server/Init/ArgSpec.hs index ffce1d98bfa..0ec1b22403c 100644 --- a/server/src-test/Hasura/Server/Init/ArgSpec.hs +++ b/server/src-test/Hasura/Server/Init/ArgSpec.hs @@ -2096,3 +2096,16 @@ serveParserSpec = Opt.Success enableApolloFederation -> enableApolloFederation == (Just Types.ApolloFederationEnabled) Opt.Failure _pf -> False Opt.CompletionInvoked _cr -> False + + Hspec.it "It accepts '--disable-close-websockets-on-metadata-change'" $ do + let -- Given + parserInfo = Opt.info (UUT.serveCommandParser @Logging.Hasura Opt.<**> Opt.helper) Opt.fullDesc + -- When + argInput = ["--disable-close-websockets-on-metadata-change"] + -- Then + result = Opt.execParserPure Opt.defaultPrefs parserInfo argInput + + fmap UUT.rsoCloseWebsocketsOnMetadataChangeStatus result `Hspec.shouldSatisfy` \case + Opt.Success disableCloseWebsocketsOnMetadataChange -> disableCloseWebsocketsOnMetadataChange == (Just Types.CWMCDisabled) + Opt.Failure _pf -> False + Opt.CompletionInvoked _cr -> False diff --git a/server/src-test/Hasura/Server/InitSpec.hs b/server/src-test/Hasura/Server/InitSpec.hs index 1ff9f3f5ffb..74e6ed96174 100644 --- a/server/src-test/Hasura/Server/InitSpec.hs +++ b/server/src-test/Hasura/Server/InitSpec.hs @@ -91,7 +91,8 @@ emptyServeOptionsRaw = rsoDefaultNamingConvention = Nothing, rsoExtensionsSchema = Nothing, rsoMetadataDefaults = Nothing, - rsoApolloFederationStatus = Nothing + rsoApolloFederationStatus = Nothing, + rsoCloseWebsocketsOnMetadataChangeStatus = Nothing } mkServeOptionsSpec :: Hspec.Spec diff --git a/server/test-postgres/Constants.hs b/server/test-postgres/Constants.hs index 99570924fce..a233f387a2f 100644 --- a/server/test-postgres/Constants.hs +++ b/server/test-postgres/Constants.hs @@ -90,7 +90,8 @@ serveOptions = soDefaultNamingConvention = Init._default Init.defaultNamingConventionOption, soExtensionsSchema = ExtensionsSchema "public", soMetadataDefaults = emptyMetadataDefaults, - soApolloFederationStatus = ApolloFederationDisabled + soApolloFederationStatus = ApolloFederationDisabled, + soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption } -- | What log level should be used by the engine; this is not exported, and diff --git a/server/test-postgres/Main.hs b/server/test-postgres/Main.hs index 64324638e45..e535c78bba6 100644 --- a/server/test-postgres/Main.hs +++ b/server/test-postgres/Main.hs @@ -126,6 +126,7 @@ main = do (_default defaultNamingConventionOption) emptyMetadataDefaults ApolloFederationDisabled + (_default closeWebsocketsOnMetadataChangeOption) cacheBuildParams = CacheBuildParams httpManager (mkPgSourceResolver print) mkMSSQLSourceResolver staticConfig (_appInit, appEnv) <- diff --git a/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs b/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs index 17e2073e84d..c1f26dbd778 100644 --- a/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs +++ b/server/test-postgres/Test/Hasura/StreamingSubscriptionSuite.hs @@ -193,7 +193,7 @@ streamingSubscriptionPollingSpec srcConfig = do -- well create a new function with the other variables being set to `mempty` mkCohortVariables' = mkCohortVariables mempty mempty mempty mempty - describe "Streaming subcription poll" $ do + describe "Streaming subscription poll" $ do cohortId1 <- runIO newCohortId (subscriberId1, subscriberId2) <- runIO $ (,) <$> newSubscriberId <*> newSubscriberId let subscriber1 = mkSubscriber subscriberId1 diff --git a/server/tests-py/context.py b/server/tests-py/context.py index 8132fdc28e6..2d00faecdc6 100644 --- a/server/tests-py/context.py +++ b/server/tests-py/context.py @@ -78,13 +78,20 @@ class GQLWsClient(): def get_ws_query_event(self, query_id, timeout): return self.ws_id_query_queues[query_id].get(timeout=timeout) - def send(self, frame): + def send(self, frame, count=0): self.wait_for_connection() if frame.get('type') == 'stop': self.ws_active_query_ids.discard( frame.get('id') ) elif frame.get('type') == 'start' and 'id' in frame: self.ws_id_query_queues[frame['id']] = queue.Queue(maxsize=-1) - self._ws.send(json.dumps(frame)) + try: + self._ws.send(json.dumps(frame)) + except websocket.WebSocketConnectionClosedException: + if count > 2: + raise websocket.WebSocketConnectionClosedException("Connection is already closed and cannot be recreated even after 3 attempts") + # Connection closed, try to recreate the connection and send the frame again + self.recreate_conn() + self.send(frame, count+1) def init_as_admin(self): headers={} diff --git a/server/tests-py/test_logging.py b/server/tests-py/test_logging.py index 34ae7a6e967..74bc1de1795 100644 --- a/server/tests-py/test_logging.py +++ b/server/tests-py/test_logging.py @@ -264,7 +264,7 @@ class TestWebsocketLogging: tests for the `websocket-log` type. currently tests presence of operation_name """ def _get_ws_server_logs(x): - return x['type'] == 'ws-server' and 'metadata' in x['detail'] + return x['type'] == 'ws-server' and 'metadata' in x['detail'] and type(x['detail']) != str ws_logs = list(filter(_get_ws_server_logs, logs_from_requests)) assert len(ws_logs) > 0