diff --git a/server/lib/api-tests/test/Test/Subscriptions/LiveQueriesSpec.hs b/server/lib/api-tests/test/Test/Subscriptions/LiveQueriesSpec.hs index 175f1cf7da9..dffd340842a 100644 --- a/server/lib/api-tests/test/Test/Subscriptions/LiveQueriesSpec.hs +++ b/server/lib/api-tests/test/Test/Subscriptions/LiveQueriesSpec.hs @@ -60,7 +60,7 @@ tests opts = withSubscriptions $ do shouldBe = shouldReturnYaml opts describe "Websockets-based live queries" do - it "Sanity check" \(mkSubscription, testEnvironment) -> do + it "Hasura sends updated query results after insert" \(mkSubscription, testEnvironment) -> do let schemaName :: Schema.SchemaName schemaName = Schema.getSchemaName testEnvironment query <- @@ -122,7 +122,86 @@ tests opts = withSubscriptions $ do - id: 2 name: "B" |] - actual :: IO Value - actual = getNextResponse query - actual `shouldBe` expected + getNextResponse query `shouldBe` expected + + it "Multiplexes" \(mkSubscription, testEnvironment) -> do + let schemaName :: Schema.SchemaName + schemaName = Schema.getSchemaName testEnvironment + + subIdEq3 <- + mkSubscription + [graphql| + subscription { + #{schemaName}_example(where: { id: { _eq: 3 } }) { + id + name + } + } + |] + + subIdEq4 <- + mkSubscription + [graphql| + subscription { + #{schemaName}_example(where: { id: { _eq: 4 } }) { + id + name + } + } + |] + + getNextResponse subIdEq3 + `shouldBe` [yaml| + data: + hasura_example: [] + |] + + getNextResponse subIdEq4 + `shouldBe` [yaml| + data: + hasura_example: [] + |] + + let expected :: Value + expected = + [yaml| + data: + insert_hasura_example: + affected_rows: 2 + |] + + actual :: IO Value + actual = + postGraphql + testEnvironment + [graphql| + mutation { + insert_#{schemaName}_example( + objects: + [ {id: 3, name: "A"}, + {id: 4, name: "B"} + ] + ) { + affected_rows + } + } + |] + + actual `shouldBe` expected + + getNextResponse subIdEq3 + `shouldBe` [yaml| + data: + hasura_example: + - id: 3 + name: "A" + |] + + getNextResponse subIdEq4 + `shouldBe` [yaml| + data: + hasura_example: + - id: 4 + name: "B" + |] diff --git a/server/lib/test-harness/src/Harness/GraphqlEngine.hs b/server/lib/test-harness/src/Harness/GraphqlEngine.hs index bd210df23be..9fe3a66a41e 100644 --- a/server/lib/test-harness/src/Harness/GraphqlEngine.hs +++ b/server/lib/test-harness/src/Harness/GraphqlEngine.hs @@ -54,14 +54,16 @@ import Control.Lens (preview) import Control.Monad.Trans.Managed (ManagedT (..), lowerManagedT) import Data.Aeson import Data.Aeson.Encode.Pretty as AP -import Data.Aeson.Lens (key) +import Data.Aeson.Lens (key, _String) import Data.Aeson.QQ.Simple import Data.Aeson.Types (Pair) import Data.Environment qualified as Env +import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) +import Data.Map.Strict qualified as Map import Data.Text qualified as T import Data.Time (getCurrentTime) import Harness.Constants qualified as Constants -import Harness.Exceptions (SomeException (SomeException), bracket, catch, displayException, withFrozenCallStack) +import Harness.Exceptions (bracket, throw, withFrozenCallStack) import Harness.Http qualified as Http import Harness.Quoter.Yaml (yaml) import Harness.TestEnvironment (BackendSettings (..), Server (..), TestEnvironment (..), getServer, serverUrl, testLog, testLogBytestring) @@ -76,7 +78,6 @@ import Hasura.Server.Prometheus (makeDummyPrometheusMetrics) import Network.Socket qualified as Socket import Network.Wai.Handler.Warp qualified as Warp import Network.WebSockets qualified as WS -import System.Log.FastLogger import System.Metrics qualified as EKG import System.Timeout (timeout) import Test.Hspec @@ -417,13 +418,26 @@ withSubscriptions = aroundAllWith \actionWithSubAndTest testEnvironment -> do -- send initialization message WS.sendTextData conn (encode initMessage) - -- open communication channel with responses - messageMVar <- newEmptyMVar + -- Open communication channel with responses. + -- + -- TODO: this currently doesn't capture any notion of ordering across + -- subscriptions (only within a single subscription). This might be + -- something we want to change in future. For now, we can tell that + -- response @S@ comes before response @T@ with a single identifier. + handlers <- newIORef Map.empty -- counter for subscriptions subNextId <- newTVarIO 1 - let -- Is this an actual message or client/server busywork? + let -- Shorthand for an 'atomicModifyIORef'' call that returns no information. + atomicModify :: IORef x -> (x -> x) -> IO () + atomicModify ref f = atomicModifyIORef' ref \x -> (f x, ()) + + -- Convenience function for converting JSON values to strings. + jsonToString :: Value -> String + jsonToString = T.unpack . WS.fromLazyByteString . encode + + -- Is this an actual message or client/server busywork? isInteresting :: Value -> Bool isInteresting res = preview (key "type") res @@ -431,63 +445,72 @@ withSubscriptions = aroundAllWith \actionWithSubAndTest testEnvironment -> do Just "connection_ack" -- connection acknowledged ] - -- listens for server responses and populates @messageMVar@ with the new response. - -- It will only read one message at a time because it is blocked by reading/writing - -- to the MVar. - -- Will throw an exception to the other thread if it encounters an error. + -- listens for server responses and populates @handlers@ with the new + -- response. It will only read one message at a time because it is + -- blocked by reading/writing to the MVar. Will throw an exception to + -- the other thread if it encounters an error. responseListener :: IO () responseListener = do res <- eitherDecode <$> WS.receiveData conn + case res of Left err -> do - logger testEnvironment $ toLogStr $ "subscription decode failed: " ++ err - putMVar messageMVar (Left err) + testLog testEnvironment $ "subscription decode failed: " ++ err + throw $ userError err Right msg -> do when (isInteresting msg) do - logger testEnvironment (toLogStr $ "subscriptions message: " ++ show msg) - case preview (key "payload") msg of + testLog testEnvironment $ "subscriptions message: " ++ jsonToString msg + + let maybePayload :: Maybe Value + maybePayload = preview (key "payload") msg + + maybeIdentifier :: Maybe Text + maybeIdentifier = preview (key "id" . _String) msg + + case liftA2 (,) maybePayload maybeIdentifier of Nothing -> do - logger testEnvironment (toLogStr ("Unable to parse message" :: Text)) - putMVar messageMVar (Left $ "Unable to parse message: " ++ show msg) - Just payload -> putMVar messageMVar (Right payload) + testLog testEnvironment "Unable to parse message" + throw $ userError ("Unable to parse message: " ++ show msg) + Just (payload, identifier) -> + readIORef handlers >>= \mvars -> + case Map.lookup identifier mvars of + Just mvar -> putMVar mvar (Right payload) + Nothing -> throw (userError "Unexpected handler identifier") + responseListener - -- Create a subscription over this websocket connection. - -- Will be used by the user to create new subscriptions. - -- The handled can be used to request the next response. + -- Create a subscription over this websocket connection. Will be used + -- by the user to create new subscriptions. The handled can be used to + -- request the next response. mkSub :: Value -> IO SubscriptionHandle mkSub query = do -- each subscription has an id, this manages a new id for each subscription. subId <- atomically do currSubId <- readTVar subNextId let !next = currSubId + 1 + writeTVar subNextId next pure currSubId + + messageBox <- newEmptyMVar + atomicModify handlers (Map.insert (tshow subId) messageBox) + -- initialize a connection. WS.sendTextData conn (encode $ startQueryMessage subId query) - pure $ SubscriptionHandle messageMVar + pure $ SubscriptionHandle messageBox handleExceptionsAndTimeout action = do let time = seconds subscriptionsTimeoutTime - catch - ( do - res <- - timeout - (fromIntegral $ diffTimeToMicroSeconds time) - action - case res of - Nothing -> - error ("Subscription exceeded the allotted time of: " <> show time) - Just () -> - pure () - ) - ( \(SomeException err) -> - putMVar messageMVar (Left $ displayException err) - ) - _ <- Async.async (handleExceptionsAndTimeout responseListener) + timeout (fromIntegral $ diffTimeToMicroSeconds time) action >>= \case + Nothing -> throw $ userError do + "Subscription exceeded the allotted time of: " <> show time + Just _ -> pure () - actionWithSubAndTest (mkSub, testEnvironment) + -- @withAsync@ will take care of cancelling the 'responseListener' thread + -- for us once the test has been executed. + Async.withAsync (handleExceptionsAndTimeout responseListener) \_ -> do + actionWithSubAndTest (mkSub, testEnvironment) -- | Get the next response received on a subscription. -- Blocks until data is available.