server: add parameterized query hash for websocket logs

https://github.com/hasura/graphql-engine-mono/pull/2061

Co-authored-by: Naveen Naidu <30195193+Naveenaidu@users.noreply.github.com>
GitOrigin-RevId: bf26b804d93b19ef7fc15c71fec80fb6d6632e64
This commit is contained in:
Anon Ray 2021-09-06 17:56:45 +05:30 committed by hasura-bot
parent 94f3ad041c
commit dc1ac69dac
4 changed files with 169 additions and 60 deletions

View File

@ -772,6 +772,8 @@ case "$SERVER_TEST_TO_RUN" in
export HASURA_GRAPHQL_ADMIN_SECRET="HGE$RANDOM$RANDOM"
export HASURA_GRAPHQL_ENABLED_LOG_TYPES=" startup,http-log,webhook-log,websocket-log,query-log"
export HASURA_GRAPHQL_LOG_LEVEL="debug"
#run_hge_with_args serve
# we are doing this instead of calling run_hge_with_args, because we want to save in a custom log file

View File

@ -3,13 +3,13 @@
{-# LANGUAGE CPP #-}
module Hasura.GraphQL.Transport.WebSocket
( onConn
( -- | the main handlers for the websocket server
onConn
, onMessage
, onClose
-- ^ the main handlers for the websocket server
-- | helpers for sending messages to the client
, sendMsg
, sendCloseWithMsg
-- ^ helpers for sending messages to the client
) where
-- NOTE!:
@ -64,6 +64,7 @@ import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTran
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Logging
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Parser.Directives (cached)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..),
@ -110,11 +111,12 @@ $(J.deriveToJSON
data OperationDetails
= OperationDetails
{ _odOperationId :: !OperationId
, _odRequestId :: !(Maybe RequestId)
, _odOperationName :: !(Maybe OperationName)
, _odOperationType :: !OpDetail
, _odQuery :: !(Maybe GQLReqUnparsed)
{ _odOperationId :: !OperationId
, _odRequestId :: !(Maybe RequestId)
, _odOperationName :: !(Maybe OperationName)
, _odOperationType :: !OpDetail
, _odQuery :: !(Maybe GQLReqUnparsed)
, _odParameterizedQueryHash :: !(Maybe ParameterizedQueryHash)
} deriving (Show, Eq)
$(J.deriveToJSON hasuraJSON ''OperationDetails)
@ -215,8 +217,15 @@ sendCloseWithMsg logger wsConn errCode mErrServerMsg = do
wsc = WS.getRawWebSocketConnection wsConn
errMsg = encodeServerErrorMsg errCode
sendMsgWithMetadata :: (MonadIO m) => WSConn -> ServerMsg -> LQ.LiveQueryMetadata -> m ()
sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) =
sendMsgWithMetadata
:: (MonadIO m)
=> WSConn
-> ServerMsg
-> Maybe OperationName
-> Maybe ParameterizedQueryHash
-> LQ.LiveQueryMetadata
-> m ()
sendMsgWithMetadata wsConn msg opName paramQueryHash (LQ.LiveQueryMetadata execTime) =
liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo
where
bs = encodeServerMsg msg
@ -227,8 +236,10 @@ sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) =
wsInfo = Just $! WS.WSEventInfo
{ WS._wseiEventType = msgType
, WS._wseiOperationId = operationId
, WS._wseiOperationName = opName
, WS._wseiQueryExecutionTime = Just $! realToFrac execTime
, WS._wseiResponseSize = Just $! LBS.length bs
, WS._wseiParameterizedQueryHash = paramQueryHash
}
onConn :: (MonadIO m, MonadReader WSServerEnv m)
@ -352,11 +363,12 @@ onStart
-> m ()
onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions = catchAndIgnore $ do
timerTot <- startTimer
opM <- liftIO $ STM.atomically $ STMMap.lookup opId opMap
op <- liftIO $ STM.atomically $ STMMap.lookup opId opMap
let opName = _grOperationName q
-- NOTE: it should be safe to rely on this check later on in this function, since we expect that
-- we process all operations on a websocket connection serially:
when (isJust opM) $ withComplete $ sendStartErr $
when (isJust op) $ withComplete $ sendStartErr $
"an operation already exists with this id: " <> unOperationId opId
userInfoM <- liftIO $ STM.readTVarIO userInfoR
@ -401,7 +413,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
case cachedValue of
Just cachedResponseData -> do
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindCached
sendSuccResp cachedResponseData $ LQ.LiveQueryMetadata 0
sendSuccResp cachedResponseData opName parameterizedQueryHash $ LQ.LiveQueryMetadata 0
Nothing -> do
conclusion <- runExceptT $ forWithKey queryPlan $ \fieldName -> \case
E.ExecStepDB _headers exists remoteJoins -> doQErr $ do
@ -434,7 +446,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
E.ExecStepRaw json -> do
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindIntrospection
buildRaw json
buildResultFromFragments Telem.Query timerTot requestId conclusion
buildResultFromFragments Telem.Query timerTot requestId conclusion opName parameterizedQueryHash
case conclusion of
Left _ -> pure ()
Right results -> do
@ -443,7 +455,8 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
void $ Tracing.interpTraceT (withExceptT mempty) $
cacheStore cacheKey cachedDirective $ encJFromInsOrdHashMap $
rfResponse <$> OMap.mapKeys G.unName results
liftIO $ sendCompleted (Just requestId)
liftIO $ sendCompleted (Just requestId) (Just parameterizedQueryHash)
E.MutationExecutionPlan mutationPlan -> do
-- See Note [Backwards-compatible transaction optimisation]
@ -458,7 +471,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
telemLocality = Telem.Local
telemTimeIO = convertDuration telemTimeIO_DT
telemTimeTot <- Seconds <$> timerTot
sendSuccResp (encJFromInsOrdHashMap $ OMap.mapKeys G.unName results) $
sendSuccResp (encJFromInsOrdHashMap $ OMap.mapKeys G.unName results) opName parameterizedQueryHash $
LQ.LiveQueryMetadata telemTimeIO_DT
-- Telemetry. NOTE: don't time network IO:
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
@ -497,8 +510,8 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
E.ExecStepRaw json -> do
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindIntrospection
buildRaw json
buildResultFromFragments Telem.Query timerTot requestId conclusion
liftIO $ sendCompleted (Just requestId)
buildResultFromFragments Telem.Query timerTot requestId conclusion opName parameterizedQueryHash
liftIO $ sendCompleted (Just requestId) (Just parameterizedQueryHash)
E.SubscriptionExecutionPlan subExec -> do
case subExec of
@ -507,7 +520,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
liftIO do
let allActionIds = map fst $ OMap.elems actions
case NE.nonEmpty allActionIds of
Nothing -> sendCompleted $ Just requestId
Nothing -> sendCompleted (Just requestId) (Just parameterizedQueryHash)
Just actionIds -> do
let sendResponseIO actionLogMap = do
(dTime, resultsE) <- withElapsedTime $ runExceptT $
@ -521,10 +534,10 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
let dataMsg = sendDataMsg $
DataMsg opId $ pure $ encJToLBS $
encJFromInsOrdHashMap $ OMap.mapKeys G.unName results
sendMsgWithMetadata wsConn dataMsg $ LQ.LiveQueryMetadata dTime
sendMsgWithMetadata wsConn dataMsg opName (Just parameterizedQueryHash) $ LQ.LiveQueryMetadata dTime
asyncActionQueryLive = LQ.LAAQNoRelationships $
LQ.LiveAsyncActionQueryWithNoRelationships sendResponseIO (sendCompleted (Just requestId))
LQ.LiveAsyncActionQueryWithNoRelationships sendResponseIO (sendCompleted (Just requestId) (Just parameterizedQueryHash))
LQ.addAsyncActionLiveQuery (LQ._lqsAsyncActions lqMap) opId actionIds
(sendError requestId) asyncActionQueryLive
@ -556,7 +569,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
nonEmptyActionIds onUnexpectedException
asyncActionQueryLive
liftIO $ logOpEv ODStarted (Just requestId)
liftIO $ logOpEv ODStarted (Just requestId) (Just parameterizedQueryHash)
where
sendDataMsg = WS._wsaGetDataMessageType onMessageActions
closeConnAction = WS._wsaConnectionCloseAction onMessageActions
@ -583,12 +596,12 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
Left (Right err) -> postExecErr requestId err
Right results -> f results
buildResultFromFragments telemQueryType timerTot requestId r =
buildResultFromFragments telemQueryType timerTot requestId r opName pqh =
buildResult requestId r \results -> do
let telemLocality = foldMap rfLocality results
telemTimeIO = convertDuration $ sum $ fmap rfTimeIO results
telemTimeTot <- Seconds <$> timerTot
sendSuccResp (encJFromInsOrdHashMap (fmap rfResponse (OMap.mapKeys G.unName results))) $
sendSuccResp (encJFromInsOrdHashMap (fmap rfResponse (OMap.mapKeys G.unName results))) opName pqh $
LQ.LiveQueryMetadata $ sum $ fmap rfTimeIO results
-- Telemetry. NOTE: don't time network IO:
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}
@ -612,11 +625,11 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
WSConnData userInfoR opMap errRespTy queryType = WS.getData wsConn
logOpEv opTy reqId =
logOpEv opTy reqId parameterizedQueryHash =
-- See Note [Disable query printing when query-log is disabled]
let queryToLog = bool Nothing (Just q) (Set.member L.ELTQueryLog enabledLogTypes)
in logWSEvent logger wsConn $ EOperation $
OperationDetails opId reqId (_grOperationName q) opTy queryToLog
OperationDetails opId reqId (_grOperationName q) opTy queryToLog parameterizedQueryHash
getErrFn ERTLegacy = encodeQErr
getErrFn ERTGraphqlCompliant = encodeGQLErr
@ -625,17 +638,17 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
let errFn = getErrFn errRespTy
sendMsg wsConn $
SMErr $ ErrorMsg opId $ errFn False $ err400 StartFailed e
liftIO $ logOpEv (ODProtoErr e) Nothing
liftIO $ logOpEv (ODProtoErr e) Nothing Nothing
liftIO $ closeConnAction wsConn opId (T.unpack e)
sendCompleted reqId = do
sendCompleted reqId paramQueryHash = do
sendMsg wsConn (SMComplete . CompletionMsg $ opId)
logOpEv ODCompleted reqId
logOpEv ODCompleted reqId paramQueryHash
postExecErr :: RequestId -> QErr -> ExceptT () m ()
postExecErr reqId qErr = do
let errFn = getErrFn errRespTy False
liftIO $ logOpEv (ODQueryErr qErr) (Just reqId)
liftIO $ logOpEv (ODQueryErr qErr) (Just reqId) Nothing
postExecErr' $ GQExecError $ pure $ errFn qErr
postExecErr' :: GQExecError -> ExceptT () m ()
@ -646,21 +659,27 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
sendError reqId qErr = do
let errFn = getErrFn errRespTy
logOpEv (ODQueryErr qErr) (Just reqId)
logOpEv (ODQueryErr qErr) (Just reqId) Nothing
let err = case errRespTy of
ERTLegacy -> errFn False qErr
ERTGraphqlCompliant -> J.object ["errors" J..= [errFn False qErr]]
sendMsg wsConn (SMErr $ ErrorMsg opId err)
sendSuccResp :: EncJSON -> LQ.LiveQueryMetadata -> ExceptT () m ()
sendSuccResp encJson =
sendMsgWithMetadata wsConn $
sendDataMsg $ DataMsg opId $ pure $ encJToLBS encJson
sendSuccResp
:: EncJSON
-> Maybe OperationName
-> ParameterizedQueryHash
-> LQ.LiveQueryMetadata
-> ExceptT () m ()
sendSuccResp encJson opName queryHash =
sendMsgWithMetadata wsConn
(sendDataMsg $ DataMsg opId $ pure $ encJToLBS encJson)
opName (Just queryHash)
withComplete :: ExceptT () m () -> ExceptT () m a
withComplete action = do
action
liftIO $ sendCompleted Nothing
liftIO $ sendCompleted Nothing Nothing
throwError ()
restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do
@ -676,7 +695,16 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
-- crucial we don't lose lqId after addLiveQuery returns successfully.
!lqId <- liftIO $ AB.dispatchAnyBackend @BackendTransport exists
\(E.MultiplexedLiveQueryPlan liveQueryPlan) ->
LQ.addLiveQuery logger (_wseServerMetrics serverEnv) subscriberMetadata lqMap sourceName parameterizedQueryHash opName requestId liveQueryPlan liveQOnChange
LQ.addLiveQuery logger
(_wseServerMetrics serverEnv)
subscriberMetadata
lqMap
sourceName
parameterizedQueryHash
opName
requestId
liveQueryPlan
(liveQOnChange opName parameterizedQueryHash)
#ifndef PROFILING
liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars
#endif
@ -687,12 +715,13 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
pure lqId
-- on change, send message on the websocket
liveQOnChange :: LQ.OnChange
liveQOnChange = \case
liveQOnChange :: Maybe OperationName -> ParameterizedQueryHash -> LQ.OnChange
liveQOnChange opName queryHash = \case
Right (LQ.LiveQueryResponse bs dTime) ->
sendMsgWithMetadata wsConn
(sendDataMsg $ DataMsg opId $ pure $ LBS.fromStrict bs)
(LQ.LiveQueryMetadata dTime)
(sendDataMsg $ DataMsg opId $ pure $ LBS.fromStrict bs)
opName (Just queryHash)
(LQ.LiveQueryMetadata dTime)
resp -> sendMsg wsConn $
sendDataMsg $ DataMsg opId $ LBS.fromStrict . LQ._lqrPayload <$> resp
@ -785,7 +814,7 @@ stopOperation serverEnv wsConn opId logWhenOpNotExist = do
logger = _wseLogger serverEnv
lqMap = _wseLiveQMap serverEnv
opMap = _wscOpMap $ WS.getData wsConn
opDet n = OperationDetails opId Nothing n ODStopped Nothing
opDet n = OperationDetails opId Nothing n ODStopped Nothing Nothing
onConnInit
:: (HasVersion, MonadIO m, UserAuthentication (Tracing.TraceT m))

View File

@ -31,6 +31,7 @@ import qualified Network.WebSockets as WS
import qualified StmContainers.Map as STMMap
import qualified System.IO.Error as E
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol
import qualified Hasura.Logging as L
@ -72,10 +73,12 @@ $(J.deriveToJSON
-- extra websocket event info
data WSEventInfo
= WSEventInfo
{ _wseiEventType :: !(Maybe ServerMsgType)
, _wseiOperationId :: !(Maybe OperationId)
, _wseiQueryExecutionTime :: !(Maybe Double)
, _wseiResponseSize :: !(Maybe Int64)
{ _wseiEventType :: !(Maybe ServerMsgType)
, _wseiOperationId :: !(Maybe OperationId)
, _wseiOperationName :: !(Maybe OperationName)
, _wseiQueryExecutionTime :: !(Maybe Double)
, _wseiResponseSize :: !(Maybe Int64)
, _wseiParameterizedQueryHash :: !(Maybe ParameterizedQueryHash)
} deriving (Show, Eq)
$(J.deriveToJSON
J.defaultOptions { J.fieldLabelModifier = J.snakeCase . drop 5

View File

@ -10,6 +10,19 @@ from context import PytestConf
if not PytestConf.config.getoption("--test-logging"):
pytest.skip("--test-logging missing, skipping tests", allow_module_level=True)
def parse_logs():
    """Parse the server log file into a list of decoded JSON values.

    The file path is read from the ``LOGGING_TEST_LOGFILE_PATH`` environment
    variable. Each non-blank line of the file is decoded with ``json.loads``.

    Returns:
        list: one decoded JSON value per non-blank line, in file order.

    Raises:
        AssertionError: if the environment variable is unset or empty, or if
            the file contains no log entries.
    """
    log_file = os.getenv('LOGGING_TEST_LOGFILE_PATH', None)
    if not log_file:
        print('Could not determine log file path to test logging!')
        raise AssertionError('LOGGING_TEST_LOGFILE_PATH is not set')

    logs = []
    with open(log_file, 'r') as f:
        for line in f:
            entry = line.strip()
            # A blank line (e.g. a stray newline in the file) is not a log
            # entry; json.loads('') would raise instead of skipping it.
            if entry:
                logs.append(json.loads(entry))

    assert len(logs) > 0, 'no log entries found in ' + log_file
    return logs
class TestLogging():
dir = 'queries/logging'
success_query = {'query': 'query { hello {code name} }'}
@ -70,26 +83,13 @@ class TestLogging():
assert resp.status_code == 401 and 'error' in resp.json()
# gather and parse the logs now
self.logs = self._parse_logs(hge_ctx)
self.logs = parse_logs()
# sometimes the log might take time to buffer
time.sleep(2)
yield
finally:
self._teardown(hge_ctx)
def _parse_logs(self, hge_ctx):
# parse the log file into a json list
log_file = os.getenv('LOGGING_TEST_LOGFILE_PATH', None)
if not log_file:
print('Could not determine log file path to test logging!')
assert False
loglines = []
with open(log_file, 'r') as f:
loglines = f.readlines()
logs = list(map(lambda x: json.loads(x.strip()), loglines))
assert len(logs) > 0
return logs
def test_startup_logs(self, hge_ctx):
def _get_server_config(x):
return x['type'] == 'startup' and \
@ -205,3 +205,78 @@ class TestLogging():
assert http_logs[0]['detail']['operation']['error']['code'] == 'access-denied'
assert http_logs[0]['detail']['operation'].get('query') is None
assert http_logs[0]['detail']['operation']['raw_query'] is not None
class TestWebsocketLogging():
    """
    Test logs emitted on websocket transport
    1. websocket-log
    2. ws-server
    """
    dir = 'queries/logging'
    query = {
        'query': 'query GetHello { hello {code name} }',
        'operationName': 'GetHello'
    }
    query_id = 'successful-ws-log-test'

    def _teardown(self, hge_ctx):
        """Run the teardown file from `dir` and require a 200 response."""
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp

    @pytest.fixture(autouse=True)
    def transact(self, hge_ctx):
        """
        Run the setup file, send one query over the websocket client, then
        parse the log file into `self.logs` before handing over to the test.
        Teardown runs whether or not the steps above succeed.
        """
        # setup some tables
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp

        try:
            # make a successful websocket query
            headers = {'x-request-id': self.query_id}
            if hge_ctx.hge_key:
                headers['x-hasura-admin-secret'] = hge_ctx.hge_key

            resp = hge_ctx.ws_client.send_query(self.query, headers=headers,
                                                query_id=self.query_id,
                                                timeout=5)
            try:
                ev = next(resp)
                assert ev['type'] == 'data' and ev['id'] == self.query_id, ev
            finally:
                # always stop the operation, even when the response check fails
                hge_ctx.ws_client.stop(self.query_id)

            # sometimes the log might take time to buffer
            time.sleep(2)
            # gather and parse the logs now
            self.logs = parse_logs()
            yield
        finally:
            self._teardown(hge_ctx)

    def test_websocket_log(self, hge_ctx):
        """
        tests for the `websocket-log` type. currently tests presence of
        request_id, operation_name and the query in the first operation event
        """
        def _get_websocket_operation_logs(x):
            return x['type'] == 'websocket-log' and x['detail']['event']['type'] == 'operation'

        ws_logs = list(filter(_get_websocket_operation_logs, self.logs))
        assert len(ws_logs) > 0, 'no websocket-log operation events found'
        onelog = ws_logs[0]['detail']['event']['detail']
        assert 'request_id' in onelog, onelog
        assert 'operation_name' in onelog, onelog
        assert 'query' in onelog, onelog
        assert 'query' in onelog['query'], onelog

    def test_ws_server_log(self, hge_ctx):
        """
        tests for the `ws-server` type. currently tests presence of
        operation_id and operation_name in the log metadata
        """
        def _get_ws_server_logs(x):
            return x['type'] == 'ws-server' and 'metadata' in x['detail']

        ws_logs = list(filter(_get_ws_server_logs, self.logs))
        assert len(ws_logs) > 0, 'no ws-server logs with metadata found'
        onelog = ws_logs[0]['detail']
        assert 'operation_id' in onelog['metadata'], onelog
        assert 'operation_name' in onelog['metadata'], onelog