Add multiplexed subscription testing

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6637
GitOrigin-RevId: a92b333b88c548ba514e69598a86098d8f6243cc
This commit is contained in:
Tom Harding 2022-11-02 09:38:26 +00:00 committed by hasura-bot
parent 7b4a7917ce
commit 7df6198b68
2 changed files with 144 additions and 42 deletions

View File

@ -60,7 +60,7 @@ tests opts = withSubscriptions $ do
shouldBe = shouldReturnYaml opts
describe "Websockets-based live queries" do
it "Sanity check" \(mkSubscription, testEnvironment) -> do
it "Hasura sends updated query results after insert" \(mkSubscription, testEnvironment) -> do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
query <-
@ -122,7 +122,86 @@ tests opts = withSubscriptions $ do
- id: 2
name: "B"
|]
getNextResponse query `shouldBe` expected
it "Multiplexes" \(mkSubscription, testEnvironment) -> do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
subIdEq3 <-
mkSubscription
[graphql|
subscription {
#{schemaName}_example(where: { id: { _eq: 3 } }) {
id
name
}
}
|]
subIdEq4 <-
mkSubscription
[graphql|
subscription {
#{schemaName}_example(where: { id: { _eq: 4 } }) {
id
name
}
}
|]
getNextResponse subIdEq3
`shouldBe` [yaml|
data:
hasura_example: []
|]
getNextResponse subIdEq4
`shouldBe` [yaml|
data:
hasura_example: []
|]
let expected :: Value
expected =
[yaml|
data:
insert_hasura_example:
affected_rows: 2
|]
actual :: IO Value
actual = getNextResponse query
actual =
postGraphql
testEnvironment
[graphql|
mutation {
insert_#{schemaName}_example(
objects:
[ {id: 3, name: "A"},
{id: 4, name: "B"}
]
) {
affected_rows
}
}
|]
actual `shouldBe` expected
getNextResponse subIdEq3
`shouldBe` [yaml|
data:
hasura_example:
- id: 3
name: "A"
|]
getNextResponse subIdEq4
`shouldBe` [yaml|
data:
hasura_example:
- id: 4
name: "B"
|]

View File

@ -54,14 +54,16 @@ import Control.Lens (preview)
import Control.Monad.Trans.Managed (ManagedT (..), lowerManagedT)
import Data.Aeson
import Data.Aeson.Encode.Pretty as AP
import Data.Aeson.Lens (key)
import Data.Aeson.Lens (key, _String)
import Data.Aeson.QQ.Simple
import Data.Aeson.Types (Pair)
import Data.Environment qualified as Env
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Map.Strict qualified as Map
import Data.Text qualified as T
import Data.Time (getCurrentTime)
import Harness.Constants qualified as Constants
import Harness.Exceptions (SomeException (SomeException), bracket, catch, displayException, withFrozenCallStack)
import Harness.Exceptions (bracket, throw, withFrozenCallStack)
import Harness.Http qualified as Http
import Harness.Quoter.Yaml (yaml)
import Harness.TestEnvironment (BackendSettings (..), Server (..), TestEnvironment (..), getServer, serverUrl, testLog, testLogBytestring)
@ -76,7 +78,6 @@ import Hasura.Server.Prometheus (makeDummyPrometheusMetrics)
import Network.Socket qualified as Socket
import Network.Wai.Handler.Warp qualified as Warp
import Network.WebSockets qualified as WS
import System.Log.FastLogger
import System.Metrics qualified as EKG
import System.Timeout (timeout)
import Test.Hspec
@ -417,13 +418,26 @@ withSubscriptions = aroundAllWith \actionWithSubAndTest testEnvironment -> do
-- send initialization message
WS.sendTextData conn (encode initMessage)
-- open communication channel with responses
messageMVar <- newEmptyMVar
-- Open communication channel with responses.
--
-- TODO: this currently doesn't capture any notion of ordering across
-- subscriptions (only within a single subscription). This might be
-- something we want to change in future. For now, we can tell that
-- response @S@ comes before response @T@ with a single identifier.
handlers <- newIORef Map.empty
-- counter for subscriptions
subNextId <- newTVarIO 1
let -- Is this an actual message or client/server busywork?
let -- Shorthand for an 'atomicModifyIORef'' call that returns no information.
atomicModify :: IORef x -> (x -> x) -> IO ()
atomicModify ref f = atomicModifyIORef' ref \x -> (f x, ())
-- Convenience function for converting JSON values to strings.
jsonToString :: Value -> String
jsonToString = T.unpack . WS.fromLazyByteString . encode
-- Is this an actual message or client/server busywork?
isInteresting :: Value -> Bool
isInteresting res =
preview (key "type") res
@ -431,62 +445,71 @@ withSubscriptions = aroundAllWith \actionWithSubAndTest testEnvironment -> do
Just "connection_ack" -- connection acknowledged
]
-- listens for server responses and populates @messageMVar@ with the new response.
-- It will only read one message at a time because it is blocked by reading/writing
-- to the MVar.
-- Will throw an exception to the other thread if it encounters an error.
-- listens for server responses and populates @handlers@ with the new
-- response. It will only read one message at a time because it is
-- blocked by reading/writing to the MVar. Will throw an exception to
-- the other thread if it encounters an error.
responseListener :: IO ()
responseListener = do
res <- eitherDecode <$> WS.receiveData conn
case res of
Left err -> do
logger testEnvironment $ toLogStr $ "subscription decode failed: " ++ err
putMVar messageMVar (Left err)
testLog testEnvironment $ "subscription decode failed: " ++ err
throw $ userError err
Right msg -> do
when (isInteresting msg) do
logger testEnvironment (toLogStr $ "subscriptions message: " ++ show msg)
case preview (key "payload") msg of
testLog testEnvironment $ "subscriptions message: " ++ jsonToString msg
let maybePayload :: Maybe Value
maybePayload = preview (key "payload") msg
maybeIdentifier :: Maybe Text
maybeIdentifier = preview (key "id" . _String) msg
case liftA2 (,) maybePayload maybeIdentifier of
Nothing -> do
logger testEnvironment (toLogStr ("Unable to parse message" :: Text))
putMVar messageMVar (Left $ "Unable to parse message: " ++ show msg)
Just payload -> putMVar messageMVar (Right payload)
testLog testEnvironment "Unable to parse message"
throw $ userError ("Unable to parse message: " ++ show msg)
Just (payload, identifier) ->
readIORef handlers >>= \mvars ->
case Map.lookup identifier mvars of
Just mvar -> putMVar mvar (Right payload)
Nothing -> throw (userError "Unexpected handler identifier")
responseListener
-- Create a subscription over this websocket connection.
-- Will be used by the user to create new subscriptions.
-- The handled can be used to request the next response.
-- Create a subscription over this websocket connection. Will be used
-- by the user to create new subscriptions. The handled can be used to
-- request the next response.
mkSub :: Value -> IO SubscriptionHandle
mkSub query = do
-- each subscription has an id, this manages a new id for each subscription.
subId <- atomically do
currSubId <- readTVar subNextId
let !next = currSubId + 1
writeTVar subNextId next
pure currSubId
messageBox <- newEmptyMVar
atomicModify handlers (Map.insert (tshow subId) messageBox)
-- initialize a connection.
WS.sendTextData conn (encode $ startQueryMessage subId query)
pure $ SubscriptionHandle messageMVar
pure $ SubscriptionHandle messageBox
handleExceptionsAndTimeout action = do
let time = seconds subscriptionsTimeoutTime
catch
( do
res <-
timeout
(fromIntegral $ diffTimeToMicroSeconds time)
action
case res of
Nothing ->
error ("Subscription exceeded the allotted time of: " <> show time)
Just () ->
pure ()
)
( \(SomeException err) ->
putMVar messageMVar (Left $ displayException err)
)
_ <- Async.async (handleExceptionsAndTimeout responseListener)
timeout (fromIntegral $ diffTimeToMicroSeconds time) action >>= \case
Nothing -> throw $ userError do
"Subscription exceeded the allotted time of: " <> show time
Just _ -> pure ()
-- @withAsync@ will take care of cancelling the 'responseListener' thread
-- for us once the test has been executed.
Async.withAsync (handleExceptionsAndTimeout responseListener) \_ -> do
actionWithSubAndTest (mkSub, testEnvironment)
-- | Get the next response received on a subscription.