mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 20:41:49 +03:00
2b0c990d50
GitOrigin-RevId: 65267a7b6343a12b690f7710785fdfa30873e479
605 lines
27 KiB
Haskell
605 lines
27 KiB
Haskell
module Hasura.GraphQL.Execute.Action
|
|
( runActionExecution
|
|
, asyncActionsProcessor
|
|
, resolveActionExecution
|
|
, resolveActionMutationAsync
|
|
, resolveAsyncActionQuery
|
|
, insertActionTx
|
|
, fetchUndeliveredActionEventsTx
|
|
, setActionStatusTx
|
|
, fetchActionResponseTx
|
|
, clearActionDataTx
|
|
, module Types
|
|
) where
|
|
|
|
import Hasura.Prelude
|
|
|
|
import qualified Control.Concurrent.Async.Lifted.Safe as LA
|
|
import qualified Data.Aeson as J
|
|
import qualified Data.Aeson.Ordered as AO
|
|
import qualified Data.Aeson.TH as J
|
|
import qualified Data.ByteString.Lazy as BL
|
|
import qualified Data.CaseInsensitive as CI
|
|
import qualified Data.Environment as Env
|
|
import qualified Data.HashMap.Strict as Map
|
|
import qualified Data.HashSet as Set
|
|
import qualified Data.IntMap as IntMap
|
|
import qualified Data.Text as T
|
|
import qualified Database.PG.Query as Q
|
|
import qualified Language.GraphQL.Draft.Syntax as G
|
|
import qualified Network.HTTP.Client as HTTP
|
|
import qualified Network.HTTP.Types as HTTP
|
|
import qualified Network.Wreq as Wreq
|
|
|
|
import Control.Concurrent (threadDelay)
|
|
import Control.Exception (try)
|
|
import Control.Lens
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Data.Has
|
|
import Data.IORef
|
|
import Data.Int (Int64)
|
|
import Data.Text.Extended
|
|
|
|
import qualified Hasura.Backends.Postgres.Execute.RemoteJoin as RJ
|
|
import qualified Hasura.Backends.Postgres.SQL.DML as S
|
|
import qualified Hasura.Backends.Postgres.Translate.Select as RS
|
|
import qualified Hasura.GraphQL.Execute.RemoteJoin as RJ
|
|
import qualified Hasura.Logging as L
|
|
import qualified Hasura.RQL.IR.Select as RS
|
|
import qualified Hasura.Tracing as Tracing
|
|
|
|
import Hasura.Backends.Postgres.SQL.Types
|
|
import Hasura.Backends.Postgres.SQL.Value (PGScalarValue (..))
|
|
import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
|
|
import Hasura.Backends.Postgres.Translate.Select (asSingleRowJsonResp)
|
|
import Hasura.EncJSON
|
|
import Hasura.GraphQL.Execute.Action.Types as Types
|
|
import Hasura.GraphQL.Execute.Prepare
|
|
import Hasura.GraphQL.Parser
|
|
import Hasura.HTTP
|
|
import Hasura.Metadata.Class
|
|
import Hasura.RQL.DDL.Headers
|
|
import Hasura.RQL.DDL.Schema.Cache
|
|
import Hasura.RQL.Types
|
|
import Hasura.SQL.Types
|
|
import Hasura.Server.Utils (mkClientHeadersForward,
|
|
mkSetCookieHeaders)
|
|
import Hasura.Server.Version (HasVersion)
|
|
import Hasura.Session
|
|
|
|
runActionExecution
|
|
:: ( MonadIO m, MonadBaseControl IO m
|
|
, MonadError QErr m, Tracing.MonadTrace m
|
|
, MonadMetadataStorage (MetadataStorageT m)
|
|
)
|
|
=> ActionExecutionPlan -> m (DiffTime, EncJSON)
|
|
runActionExecution aep = do
|
|
(time, resp) <- withElapsedTime $ case aep of
|
|
AEPSync e -> unActionExecution e
|
|
AEPAsyncQuery actionId f -> do
|
|
actionLogResponse <- liftEitherM $ runMetadataStorageT $ fetchActionResponse actionId
|
|
unActionExecution $ f actionLogResponse
|
|
AEPAsyncMutation m -> pure m
|
|
pure (time, resp)
|
|
|
|
newtype ActionContext
|
|
= ActionContext {_acName :: ActionName}
|
|
deriving (Show, Eq)
|
|
$(J.deriveJSON hasuraJSON ''ActionContext)
|
|
|
|
data ActionWebhookPayload
|
|
= ActionWebhookPayload
|
|
{ _awpAction :: !ActionContext
|
|
, _awpSessionVariables :: !SessionVariables
|
|
, _awpInput :: !J.Value
|
|
} deriving (Show, Eq)
|
|
$(J.deriveJSON hasuraJSON ''ActionWebhookPayload)
|
|
|
|
data ActionWebhookErrorResponse
|
|
= ActionWebhookErrorResponse
|
|
{ _awerMessage :: !Text
|
|
, _awerCode :: !(Maybe Text)
|
|
} deriving (Show, Eq)
|
|
$(J.deriveJSON hasuraJSON ''ActionWebhookErrorResponse)
|
|
|
|
data ActionWebhookResponse
|
|
= AWRArray ![Map.HashMap G.Name J.Value]
|
|
| AWRObject !(Map.HashMap G.Name J.Value)
|
|
deriving (Show, Eq)
|
|
|
|
instance J.FromJSON ActionWebhookResponse where
|
|
parseJSON v = case v of
|
|
J.Array{} -> AWRArray <$> J.parseJSON v
|
|
J.Object{} -> AWRObject <$> J.parseJSON v
|
|
_ -> fail "expecting object or array of objects for action webhook response"
|
|
|
|
instance J.ToJSON ActionWebhookResponse where
|
|
toJSON (AWRArray objects) = J.toJSON objects
|
|
toJSON (AWRObject obj) = J.toJSON obj
|
|
|
|
data ActionRequestInfo
|
|
= ActionRequestInfo
|
|
{ _areqiUrl :: !Text
|
|
, _areqiBody :: !J.Value
|
|
, _areqiHeaders :: ![HeaderConf]
|
|
} deriving (Show, Eq)
|
|
$(J.deriveToJSON hasuraJSON ''ActionRequestInfo)
|
|
|
|
data ActionResponseInfo
|
|
= ActionResponseInfo
|
|
{ _aresiStatus :: !Int
|
|
, _aresiBody :: !J.Value
|
|
, _aresiHeaders :: ![HeaderConf]
|
|
} deriving (Show, Eq)
|
|
$(J.deriveToJSON hasuraJSON ''ActionResponseInfo)
|
|
|
|
data ActionInternalError
|
|
= ActionInternalError
|
|
{ _aieError :: !J.Value
|
|
, _aieRequest :: !ActionRequestInfo
|
|
, _aieResponse :: !(Maybe ActionResponseInfo)
|
|
} deriving (Show, Eq)
|
|
$(J.deriveToJSON hasuraJSON ''ActionInternalError)
|
|
|
|
-- * Action handler logging related
|
|
data ActionHandlerLog
|
|
= ActionHandlerLog
|
|
{ _ahlRequestSize :: !Int64
|
|
, _ahlResponseSize :: !Int64
|
|
} deriving (Show)
|
|
$(J.deriveJSON hasuraJSON{J.omitNothingFields=True} ''ActionHandlerLog)
|
|
|
|
instance L.ToEngineLog ActionHandlerLog L.Hasura where
|
|
toEngineLog ahl = (L.LevelInfo, L.ELTActionHandler, J.toJSON ahl)
|
|
|
|
|
|
-- | Synchronously execute webhook handler and resolve response to action "output"
|
|
resolveActionExecution
|
|
:: ( HasVersion
|
|
, MonadError QErr m
|
|
, MonadIO m
|
|
, Tracing.MonadTrace m
|
|
)
|
|
=> Env.Environment
|
|
-> L.Logger L.Hasura
|
|
-> UserInfo
|
|
-> AnnActionExecution 'Postgres (UnpreparedValue 'Postgres)
|
|
-> ActionExecContext
|
|
-> m ActionExecuteResult
|
|
resolveActionExecution env logger userInfo annAction execContext = do
|
|
let actionContext = ActionContext actionName
|
|
handlerPayload = ActionWebhookPayload actionContext sessionVariables inputPayload
|
|
(webhookRes, respHeaders) <- flip runReaderT logger $
|
|
callWebhook env manager outputType outputFields reqHeaders confHeaders
|
|
forwardClientHeaders resolvedWebhook handlerPayload timeout
|
|
|
|
flip ActionExecuteResult respHeaders <$> case actionSource of
|
|
-- Build client response
|
|
ASINoSource -> pure $ ActionExecution $ pure $ AO.toEncJSON $ makeActionResponseNoRelations annFields webhookRes
|
|
ASISource sourceConfig -> do
|
|
let webhookResponseExpression = RS.AEInput $ UVLiteral $
|
|
toTxtValue $ ColumnValue (ColumnScalar PGJSONB) $ PGValJSONB $ Q.JSONB $ J.toJSON webhookRes
|
|
selectAstUnresolved = processOutputSelectionSet webhookResponseExpression
|
|
outputType definitionList annFields stringifyNum
|
|
(astResolved, finalPlanningSt) <- flip runStateT initPlanningSt $ RS.traverseAnnSimpleSelect prepareWithPlan selectAstUnresolved
|
|
let prepArgs = fmap fst $ IntMap.elems $ withUserVars (_uiSession userInfo) $ _psPrepped finalPlanningSt
|
|
pure $ executeActionInDb sourceConfig astResolved prepArgs
|
|
where
|
|
AnnActionExecution actionName outputType annFields inputPayload
|
|
outputFields definitionList resolvedWebhook confHeaders
|
|
forwardClientHeaders stringifyNum timeout actionSource = annAction
|
|
ActionExecContext manager reqHeaders sessionVariables = execContext
|
|
|
|
|
|
executeActionInDb :: SourceConfig 'Postgres -> RS.AnnSimpleSel 'Postgres -> [Q.PrepArg] -> ActionExecution
|
|
executeActionInDb sourceConfig astResolved prepArgs = ActionExecution do
|
|
let (astResolvedWithoutRemoteJoins, maybeRemoteJoins) = RJ.getRemoteJoinsSelect astResolved
|
|
jsonAggType = mkJsonAggSelect outputType
|
|
liftEitherM $ runExceptT $ runLazyTx (_pscExecCtx sourceConfig) Q.ReadOnly $
|
|
case maybeRemoteJoins of
|
|
Just remoteJoins ->
|
|
let query = Q.fromBuilder $ toSQL $
|
|
RS.mkSQLSelect jsonAggType astResolvedWithoutRemoteJoins
|
|
in RJ.executeQueryWithRemoteJoins env manager reqHeaders userInfo query prepArgs remoteJoins
|
|
Nothing ->
|
|
liftTx $ asSingleRowJsonResp (Q.fromBuilder $ toSQL $ RS.mkSQLSelect jsonAggType astResolved) prepArgs
|
|
|
|
|
|
-- | Build action response from the Webhook JSON response when there are no relationships defined
|
|
makeActionResponseNoRelations :: RS.AnnFieldsG b v -> ActionWebhookResponse -> AO.Value
|
|
makeActionResponseNoRelations annFields webhookResponse =
|
|
let mkResponseObject obj =
|
|
AO.object $ flip mapMaybe annFields $ \(fieldName, annField) ->
|
|
let fieldText = getFieldNameTxt fieldName
|
|
in (fieldText,) <$> case annField of
|
|
RS.AFExpression t -> Just $ AO.String t
|
|
_ -> AO.toOrdered <$> Map.lookup fieldText (mapKeys G.unName obj)
|
|
in case webhookResponse of
|
|
AWRArray objs -> AO.array $ map mkResponseObject objs
|
|
AWRObject obj -> mkResponseObject obj
|
|
|
|
{- Note: [Async action architecture]
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
In async actions, acquiring the action result is deferred. The async action mutation is made to
|
|
initiate the action which returns an UUID. The UUID is used to query/subsribe for actions response.
|
|
|
|
On mutation, the server makes an action log record in hdb_catalog.hdb_action_log table with request headers
|
|
and input arguments. The `asyncActionsProcessor` background thread processes the async actions by executing
|
|
the webhook handler and writing back the response payload or errors if any in the database.
|
|
|
|
When an async action query/subscription is made, the server fetches the relavent data from the hdb_action_log
|
|
table provides the action response. See Note [Resolving async action query/subscription] below.
|
|
-}
|
|
|
|
-- | Resolve asynchronous action mutation which returns only the action uuid
|
|
resolveActionMutationAsync
|
|
:: (MonadMetadataStorage m)
|
|
=> AnnActionMutationAsync
|
|
-> [HTTP.Header]
|
|
-> SessionVariables
|
|
-> m EncJSON
|
|
resolveActionMutationAsync annAction reqHeaders sessionVariables = do
|
|
actionId <- insertAction actionName sessionVariables reqHeaders inputArgs
|
|
pure $ encJFromJValue $ actionIdToText actionId
|
|
where
|
|
AnnActionMutationAsync actionName inputArgs = annAction
|
|
|
|
{- Note: [Resolving async action query/subscription]
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
Resolving async action query involves in selecting fields from hdb_catalog.hdb_action_log table.
|
|
See Note [Async action architecture] above. See the table's Postgres schema in src-rsr/initialise.sql.
|
|
The webhook's response JSON stored in "response_payload" column has to be fetched as "output"
|
|
along with relationships (if any) to other tables. The in-built pg_catalog function `jsonb_to_record`
|
|
helps in converting any JSON object to Postgres record type. Thus generated record is used to resolve
|
|
action's type. Here, we treat the "output" field as a computed field to hdb_action_log table with
|
|
`jsonb_to_record` as custom SQL function.
|
|
-}
|
|
|
|
-- TODO: Add tracing here? Avoided now because currently the function is pure
|
|
resolveAsyncActionQuery
|
|
:: UserInfo
|
|
-> AnnActionAsyncQuery 'Postgres (UnpreparedValue 'Postgres)
|
|
-> ActionLogResponse
|
|
-> ActionExecution
|
|
resolveAsyncActionQuery userInfo annAction actionLogResponse = ActionExecution
|
|
case actionSource of
|
|
ASINoSource -> do
|
|
let ActionLogResponse{..} = actionLogResponse
|
|
resolvedFields <- for asyncFields $ \(fieldName, fld) -> do
|
|
let fieldText = getFieldNameTxt fieldName
|
|
(fieldText,) <$> case fld of
|
|
AsyncTypename t -> pure $ AO.String t
|
|
AsyncOutput annFields ->
|
|
fromMaybe AO.Null <$> forM _alrResponsePayload
|
|
\response -> makeActionResponseNoRelations annFields <$> decodeValue response
|
|
AsyncId -> pure $ AO.String $ actionIdToText actionId
|
|
AsyncCreatedAt -> pure $ AO.toOrdered $ J.toJSON _alrCreatedAt
|
|
AsyncErrors -> pure $ AO.toOrdered $ J.toJSON _alrErrors
|
|
pure $ AO.toEncJSON $ AO.object resolvedFields
|
|
|
|
ASISource sourceConfig -> do
|
|
let jsonAggSelect = mkJsonAggSelect outputType
|
|
annotatedFields = asyncFields <&> second \case
|
|
AsyncTypename t -> RS.AFExpression t
|
|
AsyncOutput annFields ->
|
|
-- See Note [Resolving async action query/subscription]
|
|
let inputTableArgument = RS.AETableRow $ Just $ Identifier "response_payload"
|
|
in RS.AFComputedField () $ RS.CFSTable jsonAggSelect $
|
|
processOutputSelectionSet inputTableArgument outputType
|
|
definitionList annFields stringifyNumerics
|
|
|
|
AsyncId -> mkAnnFldFromPGCol idColumn
|
|
AsyncCreatedAt -> mkAnnFldFromPGCol createdAtColumn
|
|
AsyncErrors -> mkAnnFldFromPGCol errorsColumn
|
|
|
|
jsonbToRecordSet = QualifiedObject "pg_catalog" $ FunctionName "jsonb_to_recordset"
|
|
actionLogInput = UVLiteral $ S.SELit $ lbsToTxt $ J.encode [actionLogResponse]
|
|
functionArgs = RS.FunctionArgsExp [RS.AEInput actionLogInput] mempty
|
|
tableFromExp = RS.FromFunction jsonbToRecordSet functionArgs $ Just
|
|
[idColumn, createdAtColumn, responsePayloadColumn, errorsColumn, sessionVarsColumn]
|
|
tableArguments = RS.noSelectArgs
|
|
{ RS._saWhere = Just tableBoolExpression}
|
|
tablePermissions = RS.TablePerm annBoolExpTrue Nothing
|
|
annSelect = RS.AnnSelectG annotatedFields tableFromExp tablePermissions
|
|
tableArguments stringifyNumerics
|
|
|
|
(selectResolved, _) <- flip runStateT Set.empty $ RS.traverseAnnSimpleSelect prepareWithoutPlan annSelect
|
|
liftEitherM $ liftIO $ runPgSourceReadTx sourceConfig $
|
|
asSingleRowJsonResp (Q.fromBuilder $ toSQL $ RS.mkSQLSelect jsonAggSelect selectResolved) []
|
|
where
|
|
AnnActionAsyncQuery _ actionId outputType asyncFields definitionList stringifyNumerics actionSource = annAction
|
|
|
|
idColumn = (unsafePGCol "id", PGUUID)
|
|
responsePayloadColumn = (unsafePGCol "response_payload", PGJSONB)
|
|
createdAtColumn = (unsafePGCol "created_at", PGTimeStampTZ)
|
|
errorsColumn = (unsafePGCol "errors", PGJSONB)
|
|
sessionVarsColumn = (unsafePGCol "session_variables", PGJSONB)
|
|
|
|
-- TODO (from master):- Avoid using ColumnInfo
|
|
mkAnnFldFromPGCol columnInfoArgs =
|
|
RS.mkAnnColumnField (mkPGColumnInfo columnInfoArgs) Nothing Nothing
|
|
|
|
mkPGColumnInfo (column', columnType) =
|
|
ColumnInfo column' (G.unsafeMkName $ getPGColTxt column') 0 (ColumnScalar columnType) True Nothing
|
|
|
|
tableBoolExpression =
|
|
let actionIdColumnInfo = ColumnInfo (unsafePGCol "id") $$(G.litName "id")
|
|
0 (ColumnScalar PGUUID) False Nothing
|
|
actionIdColumnEq = BoolFld $ AVCol actionIdColumnInfo [AEQ True $ UVLiteral $ S.SELit $ actionIdToText actionId]
|
|
sessionVarsColumnInfo = mkPGColumnInfo sessionVarsColumn
|
|
sessionVarValue = UVParameter Nothing $ ColumnValue (ColumnScalar PGJSONB) $
|
|
PGValJSONB $ Q.JSONB $ J.toJSON $ _uiSession userInfo
|
|
sessionVarsColumnEq = BoolFld $ AVCol sessionVarsColumnInfo [AEQ True sessionVarValue]
|
|
|
|
-- For non-admin roles, accessing an async action's response should be allowed only for the user
|
|
-- who initiated the action through mutation. The action's response is accessible for a query/subscription
|
|
-- only when it's session variables are equal to that of action's.
|
|
in if (adminRoleName == (_uiRole userInfo)) then actionIdColumnEq
|
|
else BoolAnd [actionIdColumnEq, sessionVarsColumnEq]
|
|
|
|
|
|
-- | Process async actions from hdb_catalog.hdb_action_log table. This functions is executed in a background thread.
|
|
-- See Note [Async action architecture] above
|
|
asyncActionsProcessor
|
|
:: forall m void
|
|
. ( HasVersion
|
|
, MonadIO m
|
|
, MonadBaseControl IO m
|
|
, LA.Forall (LA.Pure m)
|
|
, Tracing.HasReporter m
|
|
, MonadMetadataStorage (MetadataStorageT m)
|
|
)
|
|
=> Env.Environment
|
|
-> L.Logger L.Hasura
|
|
-> IORef (RebuildableSchemaCache, SchemaCacheVer)
|
|
-> HTTP.Manager
|
|
-> m void
|
|
asyncActionsProcessor env logger cacheRef httpManager = forever $ do
|
|
asyncInvocationsE <- runMetadataStorageT fetchUndeliveredActionEvents
|
|
asyncInvocations <- liftIO $ onLeft asyncInvocationsE mempty
|
|
actionCache <- scActions . lastBuiltSchemaCache . fst <$> liftIO (readIORef cacheRef)
|
|
LA.mapConcurrently_ (callHandler actionCache) asyncInvocations
|
|
liftIO $ threadDelay (1 * 1000 * 1000)
|
|
where
|
|
callHandler :: ActionCache -> ActionLogItem -> m ()
|
|
callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do
|
|
let ActionLogItem actionId actionName reqHeaders
|
|
sessionVariables inputPayload = actionLogItem
|
|
case Map.lookup actionName actionCache of
|
|
Nothing -> return ()
|
|
Just actionInfo -> do
|
|
let definition = _aiDefinition actionInfo
|
|
outputFields = getActionOutputFields $ _aiOutputObject actionInfo
|
|
webhookUrl = _adHandler definition
|
|
forwardClientHeaders = _adForwardClientHeaders definition
|
|
confHeaders = _adHeaders definition
|
|
timeout = _adTimeout definition
|
|
outputType = _adOutputType definition
|
|
actionContext = ActionContext actionName
|
|
eitherRes <- runExceptT $ flip runReaderT logger $
|
|
callWebhook env httpManager outputType outputFields reqHeaders confHeaders
|
|
forwardClientHeaders webhookUrl
|
|
(ActionWebhookPayload actionContext sessionVariables inputPayload)
|
|
timeout
|
|
resE <- runMetadataStorageT $ setActionStatus actionId $ case eitherRes of
|
|
Left e -> AASError e
|
|
Right (responsePayload, _) -> AASCompleted $ J.toJSON responsePayload
|
|
|
|
liftIO $ onLeft resE mempty
|
|
|
|
callWebhook
|
|
:: forall m r.
|
|
( HasVersion
|
|
, MonadIO m
|
|
, MonadError QErr m
|
|
, Tracing.MonadTrace m
|
|
, MonadReader r m
|
|
, Has (L.Logger L.Hasura) r
|
|
)
|
|
=> Env.Environment
|
|
-> HTTP.Manager
|
|
-> GraphQLType
|
|
-> ActionOutputFields
|
|
-> [HTTP.Header]
|
|
-> [HeaderConf]
|
|
-> Bool
|
|
-> ResolvedWebhook
|
|
-> ActionWebhookPayload
|
|
-> Timeout
|
|
-> m (ActionWebhookResponse, HTTP.ResponseHeaders)
|
|
callWebhook env manager outputType outputFields reqHeaders confHeaders
|
|
forwardClientHeaders resolvedWebhook actionWebhookPayload timeoutSeconds = do
|
|
resolvedConfHeaders <- makeHeadersFromConf env confHeaders
|
|
let clientHeaders = if forwardClientHeaders then mkClientHeadersForward reqHeaders else []
|
|
contentType = ("Content-Type", "application/json")
|
|
-- Using HashMap to avoid duplicate headers between configuration headers
|
|
-- and client headers where configuration headers are preferred
|
|
hdrs = contentType : (Map.toList . Map.fromList) (resolvedConfHeaders <> clientHeaders)
|
|
postPayload = J.toJSON actionWebhookPayload
|
|
requestBody = J.encode postPayload
|
|
requestBodySize = BL.length requestBody
|
|
url = unResolvedWebhook resolvedWebhook
|
|
responseTimeout = HTTP.responseTimeoutMicro $ (unTimeout timeoutSeconds) * 1000000
|
|
httpResponse <- do
|
|
initReq <- liftIO $ HTTP.parseRequest (T.unpack url)
|
|
let req = initReq { HTTP.method = "POST"
|
|
, HTTP.requestHeaders = addDefaultHeaders hdrs
|
|
, HTTP.requestBody = HTTP.RequestBodyLBS requestBody
|
|
, HTTP.responseTimeout = responseTimeout
|
|
}
|
|
Tracing.tracedHttpRequest req \req' ->
|
|
liftIO . try $ HTTP.httpLbs req' manager
|
|
let requestInfo = ActionRequestInfo url postPayload $
|
|
confHeaders <> toHeadersConf clientHeaders
|
|
case httpResponse of
|
|
Left e ->
|
|
throw500WithDetail "http exception when calling webhook" $
|
|
J.toJSON $ ActionInternalError (J.toJSON $ HttpException e) requestInfo Nothing
|
|
|
|
Right responseWreq -> do
|
|
let responseBody = responseWreq ^. Wreq.responseBody
|
|
responseBodySize = BL.length responseBody
|
|
responseStatus = responseWreq ^. Wreq.responseStatus
|
|
mkResponseInfo respBody =
|
|
ActionResponseInfo (HTTP.statusCode responseStatus) respBody $
|
|
toHeadersConf $ responseWreq ^. Wreq.responseHeaders
|
|
|
|
-- log the request and response to/from the action handler
|
|
logger :: (L.Logger L.Hasura) <- asks getter
|
|
L.unLogger logger $ ActionHandlerLog requestBodySize responseBodySize
|
|
|
|
case J.eitherDecode responseBody of
|
|
Left e -> do
|
|
let responseInfo = mkResponseInfo $ J.String $ bsToTxt $ BL.toStrict responseBody
|
|
throw500WithDetail "not a valid json response from webhook" $ J.toJSON $
|
|
ActionInternalError (J.toJSON $ "invalid json: " <> e) requestInfo $ Just responseInfo
|
|
|
|
Right responseValue -> do
|
|
let responseInfo = mkResponseInfo responseValue
|
|
addInternalToErr e =
|
|
let actionInternalError = J.toJSON $
|
|
ActionInternalError (J.String "unexpected response") requestInfo $ Just responseInfo
|
|
in e{qeInternal = Just actionInternalError}
|
|
|
|
if | HTTP.statusIsSuccessful responseStatus -> do
|
|
let expectingArray = isListType outputType
|
|
modifyQErr addInternalToErr $ do
|
|
webhookResponse <- decodeValue responseValue
|
|
case webhookResponse of
|
|
AWRArray objs -> do
|
|
unless expectingArray $
|
|
throwUnexpected "expecting object for action webhook response but got array"
|
|
mapM_ validateResponseObject objs
|
|
AWRObject obj -> do
|
|
when expectingArray $
|
|
throwUnexpected "expecting array for action webhook response but got object"
|
|
validateResponseObject obj
|
|
pure (webhookResponse, mkSetCookieHeaders responseWreq)
|
|
|
|
| HTTP.statusIsClientError responseStatus -> do
|
|
ActionWebhookErrorResponse message maybeCode <-
|
|
modifyQErr addInternalToErr $ decodeValue responseValue
|
|
let code = maybe Unexpected ActionWebhookCode maybeCode
|
|
qErr = QErr [] responseStatus message code Nothing
|
|
throwError qErr
|
|
|
|
| otherwise -> do
|
|
let err = J.toJSON $ "expecting 2xx or 4xx status code, but found "
|
|
++ show (HTTP.statusCode responseStatus)
|
|
throw500WithDetail "internal error" $ J.toJSON $
|
|
ActionInternalError err requestInfo $ Just responseInfo
|
|
where
|
|
throwUnexpected = throw400 Unexpected
|
|
|
|
-- Webhook response object should conform to action output fields
|
|
validateResponseObject obj = do
|
|
-- Fields not specified in the output type shouldn't be present in the response
|
|
let extraFields = filter (not . flip Map.member outputFields) $ Map.keys obj
|
|
unless (null extraFields) $ throwUnexpected $
|
|
"unexpected fields in webhook response: " <> commaSeparated extraFields
|
|
|
|
void $ flip Map.traverseWithKey outputFields $ \fieldName fieldTy ->
|
|
-- When field is non-nullable, it has to present in the response with no null value
|
|
unless (G.isNullable fieldTy) $ case Map.lookup fieldName obj of
|
|
Nothing -> throwUnexpected $
|
|
"field " <> fieldName <<> " expected in webhook response, but not found"
|
|
Just v -> when (v == J.Null) $ throwUnexpected $
|
|
"expecting not null value for field " <>> fieldName
|
|
|
|
processOutputSelectionSet
|
|
:: RS.ArgumentExp 'Postgres v
|
|
-> GraphQLType
|
|
-> [(PGCol, PGScalarType)]
|
|
-> RS.AnnFieldsG 'Postgres v
|
|
-> Bool
|
|
-> RS.AnnSimpleSelG 'Postgres v
|
|
processOutputSelectionSet tableRowInput actionOutputType definitionList annotatedFields =
|
|
RS.AnnSelectG annotatedFields selectFrom RS.noTablePermissions RS.noSelectArgs
|
|
where
|
|
jsonbToPostgresRecordFunction =
|
|
QualifiedObject "pg_catalog" $ FunctionName $
|
|
if isListType actionOutputType then
|
|
"jsonb_to_recordset" -- Multirow array response
|
|
else "jsonb_to_record" -- Single object response
|
|
|
|
functionArgs = RS.FunctionArgsExp [tableRowInput] mempty
|
|
selectFrom = RS.FromFunction jsonbToPostgresRecordFunction functionArgs $ Just definitionList
|
|
|
|
mkJsonAggSelect :: GraphQLType -> JsonAggSelect
|
|
mkJsonAggSelect =
|
|
bool JASSingleObject JASMultipleRows . isListType
|
|
|
|
insertActionTx
|
|
:: ActionName -> SessionVariables -> [HTTP.Header] -> J.Value
|
|
-> Q.TxE QErr ActionId
|
|
insertActionTx actionName sessionVariables httpHeaders inputArgsPayload =
|
|
runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql|
|
|
INSERT INTO
|
|
"hdb_catalog"."hdb_action_log"
|
|
("action_name", "session_variables", "request_headers", "input_payload", "status")
|
|
VALUES
|
|
($1, $2, $3, $4, $5)
|
|
RETURNING "id"
|
|
|]
|
|
( actionName
|
|
, Q.AltJ sessionVariables
|
|
, Q.AltJ $ toHeadersMap httpHeaders
|
|
, Q.AltJ inputArgsPayload
|
|
, "created"::Text
|
|
) False
|
|
where
|
|
toHeadersMap = Map.fromList . map ((bsToTxt . CI.original) *** bsToTxt)
|
|
|
|
fetchUndeliveredActionEventsTx :: Q.TxE QErr [ActionLogItem]
|
|
fetchUndeliveredActionEventsTx =
|
|
map mapEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
|
|
update hdb_catalog.hdb_action_log set status = 'processing'
|
|
where
|
|
id in (
|
|
select id from hdb_catalog.hdb_action_log
|
|
where status = 'created'
|
|
for update skip locked limit 10
|
|
)
|
|
returning
|
|
id, action_name, request_headers::json, session_variables::json, input_payload::json
|
|
|] () False
|
|
where
|
|
mapEvent (actionId, actionName, Q.AltJ headersMap,
|
|
Q.AltJ sessionVariables, Q.AltJ inputPayload) =
|
|
ActionLogItem actionId actionName (fromHeadersMap headersMap) sessionVariables inputPayload
|
|
|
|
fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . Map.toList
|
|
|
|
setActionStatusTx :: ActionId -> AsyncActionStatus -> Q.TxE QErr ()
|
|
setActionStatusTx actionId = \case
|
|
AASCompleted responsePayload ->
|
|
Q.unitQE defaultTxErrorHandler [Q.sql|
|
|
update hdb_catalog.hdb_action_log
|
|
set response_payload = $1, status = 'completed'
|
|
where id = $2
|
|
|] (Q.AltJ responsePayload, actionId) False
|
|
|
|
AASError qerr ->
|
|
Q.unitQE defaultTxErrorHandler [Q.sql|
|
|
update hdb_catalog.hdb_action_log
|
|
set errors = $1, status = 'error'
|
|
where id = $2
|
|
|] (Q.AltJ qerr, actionId) False
|
|
|
|
fetchActionResponseTx :: ActionId -> Q.TxE QErr ActionLogResponse
|
|
fetchActionResponseTx actionId = do
|
|
(ca, rp, errs, Q.AltJ sessVars) <-
|
|
Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql|
|
|
SELECT created_at, response_payload::json, errors::json, session_variables::json
|
|
FROM hdb_catalog.hdb_action_log
|
|
WHERE id = $1
|
|
|] (Identity actionId) True
|
|
pure $ ActionLogResponse actionId ca (Q.getAltJ <$> rp) (Q.getAltJ <$> errs) sessVars
|
|
|
|
clearActionDataTx :: ActionName -> Q.TxE QErr ()
|
|
clearActionDataTx actionName =
|
|
Q.unitQE defaultTxErrorHandler [Q.sql|
|
|
DELETE FROM hdb_catalog.hdb_action_log
|
|
WHERE action_name = $1
|
|
|] (Identity actionName) True
|