mirror of https://github.com/hasura/graphql-engine.git
synced 2024-09-20 15:09:02 +03:00
server: implement transaction workaround
GitOrigin-RevId: 404f8bbdefb8a99b31fac75e4253daad81c8af90
This commit is contained in:
parent 7f420ad1ae
commit 71ae144aa6

25 CHANGELOG.md
@@ -3,6 +3,31 @@
## Next release
(Add entries here in the order of: server, console, cli, docs, others)

### Transactions for Postgres mutations

With v2 came the introduction of heterogeneous execution: in one query or mutation, you can target different sources. It is possible, for instance, in one mutation, to insert a row into a table on Postgres and another row into another table on MSSQL:

```
mutation {
  # goes to Postgres
  insert_author_one(object: {name: "Simon Peyton Jones"}) {
    name
  }

  # goes to MSSQL
  insert_publication_one(object: {name: "Template meta-programming for Haskell"}) {
    name
  }
}
```

However, heterogeneous execution has a cost: we can no longer run mutations as a transaction, given that each part may target a different database. This is a regression compared to v1.

While we want to fix this by offering, in the future, an explicit API that allows our users to *choose* when a series of mutations is executed as a transaction, for now we are introducing the following optimisation: when all the fields in a mutation target the same Postgres source, we will run them as a transaction, like we would have in v1.

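As a rough sketch of the check this relies on (not the code in this commit, which works on the typed execution plan via `coalescePostgresMutations`; the `Step` type and its fields below are invented for illustration), the grouping condition amounts to "every mutation root targets the same source, and that source is Postgres":

```haskell
import Control.Monad (guard)
import Data.List.NonEmpty (NonEmpty ((:|)), nonEmpty)

data Backend = Postgres | MSSQL deriving (Eq, Show)

-- Hypothetical, simplified stand-in for one mutation root: the real code
-- carries a DBStepInfo with a source name and a backend-tagged source config.
data Step = Step { stepSource :: String, stepBackend :: Backend }

-- | Succeeds with the shared source name exactly when every step targets the
-- same Postgres source; only then is the whole mutation run as one
-- transaction, otherwise each step keeps executing separately, as before.
singlePostgresSource :: [Step] -> Maybe String
singlePostgresSource steps = do
  Step source backend :| rest <- nonEmpty steps
  guard (backend == Postgres)
  guard (all (\s -> stepBackend s == Postgres && stepSource s == source) rest)
  pure source
```

In the example above, the two fields target different backends, so such a check fails and the fields run independently; had both targeted the same Postgres source, they would be grouped into a single transaction.
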
### Bug fixes and improvements

- server: add `--async-actions-fetch-interval` command-line flag and `HASURA_GRAPHQL_ASYNC_ACTIONS_FETCH_INTERVAL` environment variable for configuring async actions re-fetch interval from metadata storage (fix #6460)
- server: add 'replace_configuration' option (default: false) in the add source API payload

@@ -57,10 +57,11 @@ msDBQueryPlan
-> [HTTP.Header]
-> UserInfo
-> [G.Directive G.Name]
-> SourceName
-> SourceConfig 'MSSQL
-> QueryDB 'MSSQL (UnpreparedValue 'MSSQL)
-> m ExecutionStep
msDBQueryPlan _env _manager _reqHeaders userInfo _directives sourceConfig qrf = do
msDBQueryPlan _env _manager _reqHeaders userInfo _directives sourceName sourceConfig qrf = do
select <- fromSelect <$> planNoPlan userInfo qrf
let queryString = ODBC.renderQuery $ toQueryPretty select
pool = _mscConnectionPool sourceConfig
@@ -68,7 +69,7 @@ msDBQueryPlan _env _manager _reqHeaders userInfo _directives sourceConfig qrf =
pure
$ ExecStepDB []
. AB.mkAnyBackend
$ DBStepInfo sourceConfig (Just queryString) odbcQuery
$ DBStepInfo sourceName sourceConfig (Just queryString) odbcQuery

-- mutation

@@ -81,10 +82,11 @@ msDBMutationPlan
-> [HTTP.Header]
-> UserInfo
-> Bool
-> SourceName
-> SourceConfig 'MSSQL
-> MutationDB 'MSSQL (UnpreparedValue 'MSSQL)
-> m ExecutionStep
msDBMutationPlan _env _manager _reqHeaders _userInfo _stringifyNum _sourceConfig _mrf =
msDBMutationPlan _env _manager _reqHeaders _userInfo _stringifyNum _sourceName _sourceConfig _mrf =
throw500 "mutations are not supported in MSSQL; this should be unreachable"


@@ -95,10 +97,11 @@ msDBSubscriptionPlan
( MonadError QErr m
)
=> UserInfo
-> SourceName
-> SourceConfig 'MSSQL
-> InsOrdHashMap G.Name (QueryDB 'MSSQL (UnpreparedValue 'MSSQL))
-> m (LiveQueryPlan 'MSSQL (MultiplexedQuery 'MSSQL))
msDBSubscriptionPlan userInfo sourceConfig rootFields = do
msDBSubscriptionPlan userInfo _sourceName sourceConfig rootFields = do
-- WARNING: only keeping the first root field for now!
query <- traverse mkQuery $ head $ OMap.toList rootFields
let roleName = _uiRole userInfo

@@ -58,17 +58,18 @@ pgDBQueryPlan
-> [HTTP.Header]
-> UserInfo
-> [G.Directive G.Name]
-> SourceName
-> SourceConfig 'Postgres
-> QueryDB 'Postgres (UnpreparedValue 'Postgres)
-> m ExecutionStep
pgDBQueryPlan env manager reqHeaders userInfo _directives sourceConfig qrf = do
pgDBQueryPlan env manager reqHeaders userInfo _directives sourceName sourceConfig qrf = do
(preparedQuery, PlanningSt _ _ planVals expectedVariables) <- flip runStateT initPlanningSt $ traverseQueryDB prepareWithPlan qrf
validateSessionVariables expectedVariables $ _uiSession userInfo
let (action, preparedSQL) = mkCurPlanTx env manager reqHeaders userInfo $ irToRootFieldPlan planVals preparedQuery
pure
$ ExecStepDB []
. AB.mkAnyBackend
$ DBStepInfo sourceConfig preparedSQL action
$ DBStepInfo sourceName sourceConfig preparedSQL action


-- mutation
@@ -164,10 +165,11 @@ pgDBMutationPlan
-> [HTTP.Header]
-> UserInfo
-> Bool
-> SourceName
-> SourceConfig 'Postgres
-> MutationDB 'Postgres (UnpreparedValue 'Postgres)
-> m ExecutionStep
pgDBMutationPlan env manager reqHeaders userInfo stringifyNum sourceConfig mrf =
pgDBMutationPlan env manager reqHeaders userInfo stringifyNum sourceName sourceConfig mrf =
go <$> case mrf of
MDBInsert s -> convertInsert env userSession remoteJoinCtx s stringifyNum
MDBUpdate s -> convertUpdate env userSession remoteJoinCtx s stringifyNum
@@ -176,7 +178,8 @@ pgDBMutationPlan env manager reqHeaders userInfo stringifyNum sourceConfig mrf =
where
userSession = _uiSession userInfo
remoteJoinCtx = (manager, reqHeaders, userInfo)
go = ExecStepDB [] . AB.mkAnyBackend . DBStepInfo sourceConfig Nothing
go = ExecStepDB [] . AB.mkAnyBackend . DBStepInfo sourceName sourceConfig Nothing


-- subscription

@@ -186,10 +189,11 @@ pgDBSubscriptionPlan
, MonadIO m
)
=> UserInfo
-> SourceName
-> SourceConfig 'Postgres
-> InsOrdHashMap G.Name (QueryDB 'Postgres (UnpreparedValue 'Postgres))
-> m (LiveQueryPlan 'Postgres (MultiplexedQuery 'Postgres))
pgDBSubscriptionPlan userInfo sourceConfig unpreparedAST = do
pgDBSubscriptionPlan userInfo _sourceName sourceConfig unpreparedAST = do
(preparedAST, PGL.QueryParametersInfo{..}) <- flip runStateT mempty $
for unpreparedAST $ traverseQueryDB PGL.resolveMultiplexedValue
let multiplexedQuery = PGL.mkMultiplexedQuery preparedAST

@@ -1,11 +1,14 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Hasura.Backends.Postgres.Instances.Transport () where
module Hasura.Backends.Postgres.Instances.Transport
( runPGMutationTransaction
) where

import Hasura.Prelude

import qualified Data.Aeson as J
import qualified Data.ByteString as B
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G

@@ -110,3 +113,32 @@ mkQueryLog gqlQuery fieldName preparedSql requestId =
where
generatedQuery = preparedSql <&> \(EQ.PreparedSql query args _) ->
GeneratedQuery (Q.getQueryText query) (J.toJSON $ pgScalarValueToJson . snd <$> args)


-- ad-hoc transaction optimisation
-- see Note [Backwards-compatible transaction optimisation]

runPGMutationTransaction
:: ( MonadIO m
, MonadError QErr m
, MonadQueryLog m
, MonadTrace m
)
=> RequestId
-> GQLReqUnparsed
-> UserInfo
-> L.Logger L.Hasura
-> SourceConfig 'Postgres
-> InsOrdHashMap G.Name (DBStepInfo 'Postgres)
-> m (DiffTime, InsOrdHashMap G.Name EncJSON)
runPGMutationTransaction reqId query userInfo logger sourceConfig mutations = do
logQueryLog logger $ mkQueryLog query $$(G.litName "transaction") Nothing reqId
ctx <- Tracing.currentContext
withElapsedTime $ do
Tracing.interpTraceT (
liftEitherM . liftIO . runExceptT
. runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite
. withTraceContext ctx
. withUserInfo userInfo
) $ flip OMap.traverseWithKey mutations \fieldName dbsi ->
trace ("Postgres Mutation for root field " <>> fieldName) $ dbsiAction dbsi

@@ -239,7 +239,7 @@ buildSubscriptionPlan userInfo rootFields = do
\(C.SourceConfigWith (sourceConfig :: SourceConfig b) _) -> do
qdbs <- traverse (checkField @b sourceName) allFields
LQP . AB.mkAnyBackend . MultiplexedLiveQueryPlan
<$> EB.mkDBSubscriptionPlan userInfo sourceConfig qdbs
<$> EB.mkDBSubscriptionPlan userInfo sourceName sourceConfig qdbs
pure (sourceName, lqp)

checkField

@@ -24,6 +24,7 @@ import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Parser hiding (Type)
import Hasura.RQL.IR.RemoteJoin
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Error
import Hasura.RQL.Types.RemoteSchema
import Hasura.SQL.Backend
@@ -55,6 +56,7 @@ class ( Backend b
-> [HTTP.Header]
-> UserInfo
-> [G.Directive G.Name]
-> SourceName
-> SourceConfig b
-> QueryDB b (UnpreparedValue b)
-> m ExecutionStep
@@ -68,6 +70,7 @@ class ( Backend b
-> [HTTP.Header]
-> UserInfo
-> Bool
-> SourceName
-> SourceConfig b
-> MutationDB b (UnpreparedValue b)
-> m ExecutionStep
@@ -77,15 +80,17 @@ class ( Backend b
, MonadIO m
)
=> UserInfo
-> SourceName
-> SourceConfig b
-> InsOrdHashMap G.Name (QueryDB b (UnpreparedValue b))
-> m (LiveQueryPlan b (MultiplexedQuery b))

data DBStepInfo b =
DBStepInfo
(SourceConfig b)
(Maybe (PreparedQuery b))
(ExecutionMonad b EncJSON)
data DBStepInfo b = DBStepInfo
{ dbsiSourceName :: SourceName
, dbsiSourceConfig :: SourceConfig b
, dbsiPreparedQuery :: Maybe (PreparedQuery b)
, dbsiAction :: ExecutionMonad b EncJSON
}

-- | One execution step to processing a GraphQL query (e.g. one root field).
data ExecutionStep where
@@ -119,5 +124,5 @@ getRemoteSchemaInfo
. BackendExecute b
=> DBStepInfo b
-> [RemoteSchemaInfo]
getRemoteSchemaInfo (DBStepInfo _ genSql _) =
getRemoteSchemaInfo (DBStepInfo _ _ genSql _) =
IR._rjRemoteSchema <$> maybe [] (getRemoteJoins @b) genSql

@@ -81,10 +81,10 @@ convertMutationSelectionSet env logger gqlContext SQLGenCtx{stringifyNum} userIn

-- Transform the RQL AST into a prepared SQL query
txs <- for unpreparedQueries \case
RFDB _ exists ->
RFDB sourceName exists ->
AB.dispatchAnyBackend @BackendExecute exists
\(SourceConfigWith sourceConfig (MDBR db)) ->
mkDBMutationPlan env manager reqHeaders userInfo stringifyNum sourceConfig db
mkDBMutationPlan env manager reqHeaders userInfo stringifyNum sourceName sourceConfig db
RFRemote remoteField -> do
RemoteFieldG remoteSchemaInfo resolvedRemoteField <- resolveRemoteField userInfo remoteField
pure $ buildExecStepRemote remoteSchemaInfo G.OperationTypeMutation $ [G.SelectionField resolvedRemoteField]

@@ -79,10 +79,10 @@ convertQuerySelSet env logger gqlContext userInfo manager reqHeaders directives
-- Transform the query plans into an execution plan
let usrVars = _uiSession userInfo
executionPlan <- for unpreparedQueries \case
RFDB _ exists ->
RFDB sourceName exists ->
AB.dispatchAnyBackend @BackendExecute exists
\(SourceConfigWith sourceConfig (QDBR db)) ->
mkDBQueryPlan env manager reqHeaders userInfo directives sourceConfig db
mkDBQueryPlan env manager reqHeaders userInfo directives sourceName sourceConfig db
RFRemote rf -> do
RemoteFieldG remoteSchemaInfo remoteField <- for rf $ resolveRemoteVariable userInfo
pure $ buildExecStepRemote remoteSchemaInfo G.OperationTypeQuery [G.SelectionField remoteField]

@@ -5,6 +5,7 @@ module Hasura.GraphQL.Transport.HTTP
, MonadExecuteQuery(..)
, runGQ
, runGQBatched
, coalescePostgresMutations
, extractFieldFromResponse
, buildRaw
-- * imported from HTTP.Protocol; required by pro
@@ -45,6 +46,7 @@ import qualified Hasura.SQL.AnyBackend as AB
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing

import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
import Hasura.EncJSON
import Hasura.GraphQL.Context
import Hasura.GraphQL.Logging (MonadQueryLog)
@@ -226,7 +228,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
E.ExecStepDB _headers exists -> doQErr $ do
(telemTimeIO_DT, resp) <-
AB.dispatchAnyBackend @BackendTransport exists
\(EB.DBStepInfo sourceConfig genSql tx) ->
\(EB.DBStepInfo _ sourceConfig genSql tx) ->
runDBQuery
reqId
reqUnparsed
@@ -244,16 +246,41 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
pure $ ResultsFragment time Telem.Empty r []
E.ExecStepRaw json ->
buildRaw json
out@(_, _, _, HttpResponse responseData _) <- buildResult Telem.Query conclusion responseHeaders
out@(_, _, _, HttpResponse responseData _) <- buildResultFromFragments Telem.Query conclusion responseHeaders
Tracing.interpTraceT (liftEitherM . runExceptT) $ cacheStore cacheKey $ snd responseData
pure out

E.MutationExecutionPlan mutationPlans -> do
{- Note [Backwards-compatible transaction optimisation]

For backwards compatibility, we perform the following optimisation: if all mutation steps
are going to the same source, and that source is Postgres, we group all mutations as a
transaction. This is a somewhat dangerous behaviour, and we would prefer, in the future,
to make transactionality explicit rather than implicit and context-dependent.
-}
case coalescePostgresMutations mutationPlans of
-- we are in the aforementioned case; we circumvent the normal process
Just (sourceConfig, pgMutations) -> do
resp <- runExceptT $ doQErr $
runPGMutationTransaction reqId reqUnparsed userInfo logger sourceConfig pgMutations
-- we do not construct result fragments since we have only one result
buildResult Telem.Mutation resp \(telemTimeIO_DT, results) ->
let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ OMap.mapKeys G.unName results
in ( Telem.Mutation
, telemTimeIO_DT
, Telem.Local
, HttpResponse
(Just responseData, encodeGQResp responseData)
[]
)

-- we are not in the transaction case; proceeding normally
Nothing -> do
conclusion <- runExceptT $ forWithKey mutationPlans $ \fieldName -> \case
E.ExecStepDB responseHeaders exists -> doQErr $ do
(telemTimeIO_DT, resp) <-
AB.dispatchAnyBackend @BackendTransport exists
\(EB.DBStepInfo sourceConfig genSql tx) ->
\(EB.DBStepInfo _ sourceConfig genSql tx) ->
runDBMutation
reqId
reqUnparsed
@@ -271,7 +298,7 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
pure $ ResultsFragment time Telem.Empty r $ fromMaybe [] hdrs
E.ExecStepRaw json ->
buildRaw json
buildResult Telem.Mutation conclusion []
buildResultFromFragments Telem.Mutation conclusion []

E.SubscriptionExecutionPlan _sub ->
throw400 UnexpectedPayload "subscriptions are not supported over HTTP, use websockets instead"
@@ -293,18 +320,15 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
let filteredHeaders = filter ((== "Set-Cookie") . fst) remoteResponseHeaders
pure $ ResultsFragment telemTimeIO_DT Telem.Remote (JO.toEncJSON value) filteredHeaders

buildResult
buildResultFromFragments
:: Telem.QueryType
-> Either (Either GQExecError QErr) (InsOrdHashMap G.Name ResultsFragment)
-> HTTP.ResponseHeaders
-> m (Telem.QueryType, DiffTime, Telem.Locality, HttpResponse (Maybe GQResponse, EncJSON))
buildResult telemType (Left (Left err)) _ = pure
( telemType , 0 , Telem.Remote , HttpResponse (Just (Left err), encodeGQResp $ Left err) [])
buildResult _telemType (Left (Right err)) _ = throwError err
buildResult telemType (Right results) cacheHeaders = do
let responseData = pure $ encJToLBS $ encJFromInsOrdHashMap $ rfResponse <$> OMap.mapKeys G.unName results
pure
( telemType
buildResultFromFragments telemType fragments cacheHeaders =
buildResult telemType fragments \results ->
let responseData = Right $ encJToLBS $ encJFromInsOrdHashMap $ rfResponse <$> OMap.mapKeys G.unName results
in ( telemType
, sum (fmap rfTimeIO results)
, foldMap rfLocality results
, HttpResponse
@@ -312,6 +336,45 @@ runGQ env logger reqId userInfo ipAddress reqHeaders queryType reqUnparsed = do
(cacheHeaders <> foldMap rfHeaders results)
)

buildResult
:: Telem.QueryType
-> Either (Either GQExecError QErr) a
-> (a -> (Telem.QueryType, DiffTime, Telem.Locality, HttpResponse (Maybe GQResponse, EncJSON)))
-> m (Telem.QueryType, DiffTime, Telem.Locality, HttpResponse (Maybe GQResponse, EncJSON))
buildResult telemType result f = case result of
Right a -> pure $ f a
Left (Right err) -> throwError err
Left (Left err) -> pure ( telemType
, 0
, Telem.Remote
, HttpResponse
(Just (Left err), encodeGQResp $ Left err)
[]
)

coalescePostgresMutations
:: EB.ExecutionPlan
-> Maybe ( SourceConfig 'Postgres
, InsOrdHashMap G.Name (EB.DBStepInfo 'Postgres)
)
coalescePostgresMutations plan = do
-- we extract the name and config of the first mutation root, if any
(oneSourceName, oneSourceConfig) <- case toList plan of
(E.ExecStepDB _ exists:_) -> AB.unpackAnyBackend @'Postgres exists <&> \dbsi ->
( EB.dbsiSourceName dbsi
, EB.dbsiSourceConfig dbsi
)
_ -> Nothing
-- we then test whether all mutations are going to that same first source
-- and that it is Postgres
mutations <- for plan \case
E.ExecStepDB _ exists -> do
dbStepInfo <- AB.unpackAnyBackend @'Postgres exists
guard $ oneSourceName == EB.dbsiSourceName dbStepInfo
Just dbStepInfo
_ -> Nothing
Just (oneSourceConfig, mutations)

extractFieldFromResponse
:: Monad m => Text -> LBS.ByteString -> ExceptT (Either GQExecError QErr) m JO.Value
extractFieldFromResponse fieldName bs = do
@@ -377,4 +440,3 @@ runGQBatched env logger reqId responseErrorsConfig userInfo ipAddress reqHdrs qu
removeHeaders <$> traverse (try . (fmap . fmap) snd . runGQ env logger reqId userInfo ipAddress reqHdrs queryType) reqs
where
try = flip catchError (pure . Left) . fmap Right

@@ -59,12 +59,14 @@ import qualified Hasura.SQL.AnyBackend as AB
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing

import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
import Hasura.EncJSON
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..),
QueryCacheKey (..),
ResultsFragment (..), buildRaw,
coalescePostgresMutations,
extractFieldFromResponse,
filterVariablesFromQuery,
runSessVarPred)
@@ -81,6 +83,7 @@ import Hasura.Server.Types (RequestId, getRequ
import Hasura.Server.Version (HasVersion)
import Hasura.Session


-- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- this to track a connection's operations so we can remove them from 'LiveQueryState', and
-- log.
@@ -393,7 +396,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
E.ExecStepDB _headers exists -> doQErr $ do
(telemTimeIO_DT, resp) <-
AB.dispatchAnyBackend @BackendTransport exists
\(EB.DBStepInfo sourceConfig genSql tx) ->
\(EB.DBStepInfo _ sourceConfig genSql tx) ->
runDBQuery
requestId
q
@@ -411,7 +414,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
pure $ ResultsFragment time Telem.Empty r []
E.ExecStepRaw json ->
buildRaw json
buildResult Telem.Query telemCacheHit timerTot requestId conclusion
buildResultFromFragments Telem.Query telemCacheHit timerTot requestId conclusion
case conclusion of
Left _ -> pure ()
Right results -> Tracing.interpTraceT (withExceptT mempty) $
@@ -420,12 +423,31 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
liftIO $ sendCompleted (Just requestId)

E.MutationExecutionPlan mutationPlan -> do
-- See Note [Backwards-compatible transaction optimisation]
case coalescePostgresMutations mutationPlan of
-- we are in the aforementioned case; we circumvent the normal process
Just (sourceConfig, pgMutations) -> do
resp <- runExceptT $ doQErr $
runPGMutationTransaction requestId q userInfo logger sourceConfig pgMutations
-- we do not construct result fragments since we have only one result
buildResult requestId resp \(telemTimeIO_DT, results) -> do
let telemQueryType = Telem.Query
telemLocality = Telem.Local
telemTimeIO = convertDuration telemTimeIO_DT
telemTimeTot <- Seconds <$> timerTot
sendSuccResp (encJFromInsOrdHashMap $ OMap.mapKeys G.unName results) $
LQ.LiveQueryMetadata telemTimeIO_DT
-- Telemetry. NOTE: don't time network IO:
Telem.recordTimingMetric Telem.RequestDimensions{..} Telem.RequestTimings{..}

-- we are not in the transaction case; proceeding normally
Nothing -> do
conclusion <- runExceptT $ forWithKey mutationPlan $ \fieldName -> \case
-- Ignoring response headers since we can't send them over WebSocket
E.ExecStepDB _responseHeaders exists -> doQErr $ do
(telemTimeIO_DT, resp) <-
AB.dispatchAnyBackend @BackendTransport exists
\(EB.DBStepInfo sourceConfig genSql tx) ->
\(EB.DBStepInfo _ sourceConfig genSql tx) ->
runDBMutation
requestId
q
@@ -443,7 +465,7 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
runRemoteGQ fieldName userInfo reqHdrs rsi gqlReq
E.ExecStepRaw json ->
buildRaw json
buildResult Telem.Query telemCacheHit timerTot requestId conclusion
buildResultFromFragments Telem.Query telemCacheHit timerTot requestId conclusion
liftIO $ sendCompleted (Just requestId)

E.SubscriptionExecutionPlan subExec -> do
@@ -506,9 +528,19 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do

telemTransport = Telem.WebSocket

buildResult _ _ _ _ (Left (Left err)) = postExecErr' err
buildResult _ _ _ requestId (Left (Right err)) = postExecErr requestId err
buildResult telemQueryType telemCacheHit timerTot _ (Right results) = do
buildResult
:: forall a
. RequestId
-> Either (Either GQExecError QErr) a
-> (a -> ExceptT () m ())
-> ExceptT () m ()
buildResult requestId r f = case r of
Left (Left err) -> postExecErr' err
Left (Right err) -> postExecErr requestId err
Right results -> f results

buildResultFromFragments telemQueryType telemCacheHit timerTot requestId r =
buildResult requestId r \results -> do
let telemLocality = foldMap rfLocality results
telemTimeIO = convertDuration $ sum $ fmap rfTimeIO results
telemTimeTot <- Seconds <$> timerTot

@@ -0,0 +1,15 @@
type: bulk
args:

- type: run_sql
  args:
    sql: |
      create table author(
        id serial primary key,
        name text unique
      );

- type: track_table
  args:
    schema: public
    name: author

@@ -0,0 +1,6 @@
type: bulk
args:
- type: run_sql
  args:
    sql: |
      drop table author

@@ -0,0 +1,52 @@
- description: Insert a value into the table.
  url: /v1/graphql
  status: 200
  response:
    data:
      insert_author_one:
        name: "Rick Astley"
  query:
    query: |
      mutation {
        insert_author_one(object: {id: 1, name: "Rick Astley"}) {
          name
        }
      }

- description: Send one valid mutation followed by an erroneous one.
  url: /v1/graphql
  status: 200
  response:
    errors:
    - extensions:
        path: $.selectionSet.insert_author_one.args.object
        code: constraint-violation
      message: Uniqueness violation. duplicate key value violates unique constraint
        "author_pkey"
  query:
    query: |
      mutation {
        insert_author(objects: {id: 2, name: "Eduard Anatolyevich Khil"}) {
          returning {
            name
          }
        }
        insert_author_one(object: {id: 1, name: "Rick Astley"}) {
          name
        }
      }

- description: Check that the first mutation was reverted.
  url: /v1/graphql
  status: 200
  response:
    data:
      author:
      - name: "Rick Astley"
  query:
    query: |
      query {
        author {
          name
        }
      }

@@ -0,0 +1,53 @@
- description: Insert a value into the table.
  url: /v1/graphql
  status: 200
  response:
    data:
      insert_author_one:
        name: "Rick Astley"
  query:
    query: |
      mutation {
        insert_author_one(object: {id: 1, name: "Rick Astley"}) {
          name
        }
      }

- description: Send one valid mutation followed by an erroneous one.
  url: /v1/graphql
  status: 200
  response:
    data:
    errors:
    - extensions:
        path: $.selectionSet.insert_author_one.args.object
        code: constraint-violation
      message: Uniqueness violation. duplicate key value violates unique constraint
        "author_pkey"
  query:
    query: |
      mutation {
        insert_author(objects: {id: 2, name: "Eduard Anatolyevich Khil"}) {
          returning {
            name
          }
        }
        insert_author_one(object: {id: 1, name: "Rick Astley"}) {
          name
        }
      }

- description: Check that the first mutation was reverted.
  url: /v1/graphql
  status: 200
  response:
    data:
      author:
      - name: "Rick Astley"
  query:
    query: |
      query {
        author {
          name
        }
      }

@@ -0,0 +1,7 @@
type: bulk
args:

- type: run_sql
  args:
    sql: |
      delete from author;

@@ -655,3 +655,13 @@ class TestGraphQLInheritedRoles:
    # should be removed/modified.
    def test_mutations_not_exposed_for_inherited_roles(self, hge_ctx, transport):
        check_query_f(hge_ctx, self.dir() + '/mutation_not_exposed_to_inherited_roles.yaml')

@pytest.mark.parametrize('transport', ['http', 'websocket'])
@use_mutation_fixtures
class TestGraphQLMutationTransactions:
    def test_transaction_revert(self, hge_ctx, transport):
        check_query_f(hge_ctx, self.dir() + '/transaction_revert_' + transport + '.yaml', transport)

    @classmethod
    def dir(cls):
        return 'queries/graphql_mutation/transactions'