server: API support for fetching events

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7587
GitOrigin-RevId: da9e77abe9e9eb0e414aaddccd021433cd40e604
Krushan Bauva 2023-04-25 16:52:27 +05:30 committed by hasura-bot
parent 7066b52777
commit 2a68a4ac18
9 changed files with 830 additions and 44 deletions

View File

@ -185,6 +185,92 @@ X-Hasura-Role: admin
| payload | true | JSON | Some JSON payload to send to trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
## pg_get_event_logs {#metadata-pg-get-event-logs}
`pg_get_event_logs` is used to fetch the event logs for a given event trigger.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "pg_get_event_logs",
  "args": {
    "name": "sample_trigger",
    "source": "default",
    "status": "processed",
    "limit": 10,
    "offset": 0
  }
}
```
### Args syntax {#metadata-pg-get-event-logs-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| name | true | [TriggerName](/api-reference/syntax-defs.mdx#triggername) | Name of the Event Trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| status | false | \[`pending` \| `processed` \] | Status of the event logs to fetch. If `status` is not provided, event logs of all statuses are returned |
| limit | false | Integer | Maximum number of event logs to be returned in one API call (default: `100`) |
| offset | false | Integer | Number of event logs to skip before results are returned (default: `0`) |
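The response is a list of event log objects. The example below is only an illustrative sketch: the field names are assumed from the `EventLog` record introduced in this commit, and the values (including the truncated `payload`) are made up.
```json
[
  {
    "id": "009335c9-7b01-4ea5-b790-66a112e165f9",
    "schema_name": "public",
    "table_name": "users",
    "trigger_name": "sample_trigger",
    "payload": { "op": "INSERT" },
    "delivered": true,
    "error": false,
    "tries": 1,
    "created_at": "2023-04-25T10:30:00Z",
    "locked": null,
    "next_retry_at": null,
    "archived": false
  }
]
```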
## pg_get_event_invocation_logs {#metadata-pg-get-event-invocation-logs}
`pg_get_event_invocation_logs` is used to fetch the invocation logs for a given event trigger.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "pg_get_event_invocation_logs",
  "args": {
    "name": "sample_trigger",
    "source": "default",
    "limit": 10,
    "offset": 0
  }
}
```
### Args syntax {#metadata-pg-get-event-invocation-logs-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| name | true | [TriggerName](/api-reference/syntax-defs.mdx#triggername) | Name of the Event Trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| limit | false | Integer | Maximum number of invocation logs to be returned in one API call (default: `100`) |
| offset | false | Integer | Number of invocation logs to skip before results are returned (default: `0`) |
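Each invocation log entry describes one delivery attempt for an event. The example below is an illustrative sketch: the field names are assumed from the `EventInvocationLog` record introduced in this commit, and the values (including the truncated `request` and `response`) are made up.
```json
[
  {
    "id": "5de114a1-3a10-4b0c-a3e7-6e0c1b5a4c11",
    "trigger_name": "sample_trigger",
    "event_id": "009335c9-7b01-4ea5-b790-66a112e165f9",
    "http_status": 200,
    "request": { "version": "2" },
    "response": { "message": "ok" },
    "created_at": "2023-04-25T10:30:05Z"
  }
]
```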
## pg_get_event_by_id {#metadata-pg-get-event-by-id}
`pg_get_event_by_id` is used to fetch the event and invocation logs for a given `event_id`.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "pg_get_event_by_id",
  "args": {
    "source": "default",
    "event_id": "009335c9-7b01-4ea5-b790-66a112e165f9",
    "invocation_log_limit": 100,
    "invocation_log_offset": 0
  }
}
```
### Args syntax {#metadata-pg-get-event-by-id-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | --------------------------------------------------------------------------------- |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| event_id | true | String | UUID of the event |
| invocation_log_limit | false | Integer | Maximum number of invocation logs to be returned in one API call (default: `100`) |
| invocation_log_offset | false | Integer | Number of invocation logs to skip before results are returned (default: `0`) |
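On success, the response contains the event log together with its invocation logs; if no event exists for the given `event_id`, the API returns a `400` error stating that the event id does not exist. The example below is an illustrative sketch: the field names are assumed from the `EventLogWithInvocations` record introduced in this commit, and the values are made up.
```json
{
  "event": {
    "id": "009335c9-7b01-4ea5-b790-66a112e165f9",
    "schema_name": "public",
    "table_name": "users",
    "trigger_name": "sample_trigger",
    "payload": { "op": "INSERT" },
    "delivered": true,
    "error": false,
    "tries": 1,
    "created_at": "2023-04-25T10:30:00Z",
    "locked": null,
    "next_retry_at": null,
    "archived": false
  },
  "invocations": [
    {
      "id": "5de114a1-3a10-4b0c-a3e7-6e0c1b5a4c11",
      "trigger_name": "sample_trigger",
      "event_id": "009335c9-7b01-4ea5-b790-66a112e165f9",
      "http_status": 200,
      "request": { "version": "2" },
      "response": { "message": "ok" },
      "created_at": "2023-04-25T10:30:05Z"
    }
  ]
}
```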
---
## mssql_create_event_trigger {#metadata-mssql-create-event-trigger}
@ -347,6 +433,92 @@ X-Hasura-Role: admin
| payload | true | JSON | Some JSON payload to send to trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
## mssql_get_event_logs {#metadata-mssql-get-event-logs}
`mssql_get_event_logs` is used to fetch the event logs for a given event trigger.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "mssql_get_event_logs",
  "args": {
    "name": "sample_trigger",
    "source": "default",
    "status": "processed",
    "limit": 10,
    "offset": 0
  }
}
```
### Args syntax {#metadata-mssql-get-event-logs-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| name | true | [TriggerName](/api-reference/syntax-defs.mdx#triggername) | Name of the Event Trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| status | false | \[`pending` \| `processed` \] | Status of the event logs to fetch. If `status` is not provided, event logs of all statuses are returned |
| limit | false | Integer | Maximum number of event logs to be returned in one API call (default: `100`) |
| offset | false | Integer | Number of event logs to skip before results are returned (default: `0`) |
## mssql_get_event_invocation_logs {#metadata-mssql-get-event-invocation-logs}
`mssql_get_event_invocation_logs` is used to fetch the invocation logs for a given event trigger.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "mssql_get_event_invocation_logs",
  "args": {
    "name": "sample_trigger",
    "source": "default",
    "limit": 10,
    "offset": 0
  }
}
```
### Args syntax {#metadata-mssql-get-event-invocation-logs-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| name | true | [TriggerName](/api-reference/syntax-defs.mdx#triggername) | Name of the Event Trigger |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| limit | false | Integer | Maximum number of invocation logs to be returned in one API call (default: `100`) |
| offset | false | Integer | Number of invocation logs to skip before results are returned (default: `0`) |
## mssql_get_event_by_id {#metadata-mssql-get-event-by-id}
`mssql_get_event_by_id` is used to fetch the event and invocation logs for a given `event_id`.
```http
POST /v1/metadata HTTP/1.1
Content-Type: application/json
X-Hasura-Role: admin

{
  "type": "mssql_get_event_by_id",
  "args": {
    "source": "default",
    "event_id": "81531A4C-AED7-4EFE-964D-D115A77B05C2",
    "invocation_log_limit": 100,
    "invocation_log_offset": 0
  }
}
```
### Args syntax {#metadata-mssql-get-event-by-id-syntax}
| Key | Required | Schema | Description |
| ------- | -------- | --------------------------------------------------------- | --------------------------------------------------------------------------------- |
| source | false | [SourceName](/api-reference/syntax-defs.mdx#sourcename) | Name of the source database of the trigger (default: `default`) |
| event_id | true | String | UUID of the event |
| invocation_log_limit | false | Integer | Maximum number of invocation logs to be returned in one API call (default: `100`) |
| invocation_log_offset | false | Integer | Number of invocation logs to skip before results are returned (default: `0`) |
## cleanup_event_trigger_logs {#metadata-cleanup-event-trigger-logs}
<ProductBadge free standard pro ee self />

View File

@ -25,6 +25,9 @@ module Hasura.Backends.MSSQL.DDL.EventTrigger
updateCleanupEventStatusToPaused,
updateCleanupEventStatusToCompleted,
deleteEventTriggerLogs,
fetchEventLogs,
fetchEventInvocationLogs,
fetchEventById,
)
where
@ -449,59 +452,24 @@ fetchEvents source triggerNames (FetchBatchSize fetchBatchSize) = do
-- 'IN' MSSQL operator.
triggerNamesTxt = "(" <> commaSeparated (map (\t -> "'" <> toTxt t <> "'") triggerNames) <> ")"
uncurryEvent (id', sn, tn, trn, payload' :: Text, tries, created_at :: B.ByteString, next_retry_at :: Maybe B.ByteString) = do
payload <- encodePayload payload'
createdAt <- convertTime created_at
retryAt <- traverse convertTime next_retry_at
uncurryEvent (eventId, sn, tn, trn, payload :: Text, tries, createdAt :: B.ByteString, nextRetryAt :: Maybe B.ByteString) = do
-- see Note [Encode Event Trigger Payload to JSON in SQL Server]
payload' <- encodeJSON payload "payload decode failed while fetching MSSQL events"
createdAt' <- bsToUTCTime createdAt "conversion of created_at to UTCTime failed while fetching MSSQL events"
retryAt <- traverse (`bsToUTCTime` "conversion of next_retry_at to UTCTime failed while fetching MSSQL events") nextRetryAt
pure $
Event
{ eId = EventId (bsToTxt id'),
{ eId = EventId (bsToTxt eventId),
eSource = source,
eTable = (TableName tn (SchemaName sn)),
eTrigger = TriggerMetadata (TriggerName $ mkNonEmptyTextUnsafe trn),
eEvent = payload,
eEvent = payload',
eTries = tries,
eCreatedAt = createdAt,
eCreatedAt = createdAt',
eRetryAt = retryAt
}
-- Note: We do not have JSON datatype in SQL Server. But since in
-- 'mkAllTriggersQ' we ensure that all the values in the payload column of
-- hdb_catalog.event_log is always a JSON. We can directly decode the payload
-- value and not worry that the decoding will fail.
--
-- We ensure that the values in 'hd_catalog.event_log' is always a JSON is by
-- using the 'FOR JSON PATH' MSSQL operand when inserting value into the
-- 'hdb_catalog.event_log' table.
encodePayload :: (J.FromJSON a, QErrM m) => Text -> m a
encodePayload payload =
onLeft
-- The NVARCHAR column has UTF-16 or UCS-2 encoding. Ref: https://learn.microsoft.com/en-us/sql/t-sql/data-types/nchar-and-nvarchar-transact-sql?view=sql-server-ver16#nvarchar---n--max--
-- But JSON strings are expected to have UTF-8 encoding as per spec. Ref: https://www.rfc-editor.org/rfc/rfc8259#section-8.1
-- Hence it's important to encode the payload into UTF-8 else the decoding of
-- text to JSON will fail.
(J.eitherDecode $ fromStrict $ TE.encodeUtf8 payload)
(\_ -> throw500 $ T.pack "payload decode failed while fetching MSSQL events")
-- Note: The ODBC server does not have a FromJSON instance of UTCTime and only
-- supports DateTime2 and SmallDateTime. But the above two data types do not
-- have time offsets and 'Event' stores the time as UTCTime. But during
-- 'mkAllTriggersQ' we do save them as UTC Time format. So we can directly decode
-- the time we get from the DB as UTCTime and not worry about exception being
-- thrown during decoding.
--
-- We ensure that the time stored in 'create_at' column is a UTCTime, by
-- defaulting the 'created_at' column to use 'SYSDATETIMEOFFSET()' MSSQL function
-- in 'init_mssql_source.sql' file. The 'SYSDATETIMEOFFSET()' function returns
-- value that contains the date and time of the computer on which the instance of
-- SQL Server is running. The time zone offset is included.
convertTime :: (QErrM m) => B.ByteString -> m UTCTime
convertTime createdAt =
onLeft
(readEither (T.unpack $ bsToTxt createdAt) :: Either String UTCTime)
(\_ -> throw500 $ T.pack "conversion to UTCTime failed while fetching MSSQL events")
dropTriggerQ :: TriggerName -> SchemaName -> TxE QErr ()
dropTriggerQ triggerName schemaName =
mapM_ (dropTriggerOp triggerName schemaName) [INSERT, UPDATE, DELETE]
@ -1207,3 +1175,225 @@ deleteEventTriggerLogs ::
deleteEventTriggerLogs sourceConfig oldCleanupConfig getLatestCleanupConfig = do
deleteEventTriggerLogsInBatchesWith getLatestCleanupConfig oldCleanupConfig $ \cleanupConfig -> do
runMSSQLSourceWriteTx sourceConfig $ deleteEventTriggerLogsTx cleanupConfig
fetchEventLogs ::
(MonadIO m, MonadError QErr m) =>
MSSQLSourceConfig ->
GetEventLogs b ->
m [EventLog]
fetchEventLogs sourceConfig getEventLogs = do
liftIO (runMSSQLSourceReadTx sourceConfig $ fetchEventLogsTxE getEventLogs)
`onLeftM` (throwError . prefixQErr "unexpected error while fetching event logs: ")
fetchEventLogsTxE :: GetEventLogs b -> TxE QErr [EventLog]
fetchEventLogsTxE GetEventLogs {..} = do
case status of
Pending -> do
events <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
FROM hdb_catalog.event_log
WHERE trigger_name = $triggerName
AND delivered=0 AND error=0 AND archived=0
ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
|]
mapM uncurryEventLog events
Processed -> do
events <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
FROM hdb_catalog.event_log
WHERE trigger_name = $triggerName
AND (delivered=1 OR error=1) AND archived=0
ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
|]
mapM uncurryEventLog events
All -> do
events <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
FROM hdb_catalog.event_log
WHERE trigger_name = $triggerName
ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
|]
mapM uncurryEventLog events
where
triggerName = triggerNameToTxt _gelName
limit = _gelLimit
offset = _gelOffset
status = _gelStatus
fetchEventInvocationLogs ::
(MonadError QErr m, MonadIO m) =>
MSSQLSourceConfig ->
GetEventInvocations b ->
m [EventInvocationLog]
fetchEventInvocationLogs sourceConfig getEventInvocationLogs = do
liftIO (runMSSQLSourceReadTx sourceConfig $ fetchEventInvocationLogsTxE getEventInvocationLogs)
`onLeftM` (throwError . prefixQErr "unexpected error while fetching invocation logs: ")
fetchEventInvocationLogsTxE :: GetEventInvocations b -> TxE QErr [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations {..} = do
invocations <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), trigger_name, CONVERT(varchar(MAX), event_id),
status, request, response, CONVERT(varchar(MAX), created_at)
FROM hdb_catalog.event_invocation_logs
WHERE trigger_name = $triggerName
ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
|]
mapM uncurryEventInvocationLog invocations
where
triggerName = triggerNameToTxt _geiName
limit = _geiLimit
offset = _geiOffset
fetchEventById ::
(MonadError QErr m, MonadIO m) =>
MSSQLSourceConfig ->
GetEventById b ->
m (EventLogWithInvocations)
fetchEventById sourceConfig getEventById = do
fetchEventByIdTxE' <- liftIO $ runMSSQLSourceReadTx sourceConfig $ fetchEventByIdTxE getEventById
case fetchEventByIdTxE' of
Left err ->
throwError $
prefixQErr ("unexpected error while fetching event with id " <> eventId <> ": ") err
Right eventLogWithInvocations -> do
if isNothing (elwiEvent eventLogWithInvocations)
then throw400 NotExists errMsg
else return eventLogWithInvocations
where
eventId = unEventId $ _gebiEventId getEventById
errMsg = "event id " <> eventId <> " does not exist"
fetchEventByIdTxE :: GetEventById b -> TxE QErr (EventLogWithInvocations)
fetchEventByIdTxE GetEventById {..} = do
eventsQuery <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), schema_name, table_name, trigger_name, payload, delivered, error, tries,
CONVERT(varchar(MAX), created_at), CONVERT(varchar(MAX), locked), CONVERT(varchar(MAX), next_retry_at), archived
FROM hdb_catalog.event_log
WHERE id = $eventId;
|]
events <- mapM uncurryEventLog eventsQuery
case events of
[] -> return $ EventLogWithInvocations Nothing []
[event] -> do
invocationsQuery <-
multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CONVERT(varchar(MAX), id), trigger_name, CONVERT(varchar(MAX), event_id),
status, request, response, CONVERT(varchar(MAX), created_at)
FROM hdb_catalog.event_invocation_logs
WHERE event_id = $eventId
ORDER BY created_at DESC OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY;
|]
invocations <- mapM uncurryEventInvocationLog invocationsQuery
pure $ EventLogWithInvocations (Just event) invocations
_ -> throw500 $ "Unexpected error: Multiple events present with event id " <> eventId
where
eventId = unEventId _gebiEventId
limit = _gebiInvocationLogLimit
offset = _gebiInvocationLogOffset
uncurryEventLog ::
(MonadError QErr m) =>
(B.ByteString, Text, Text, Text, Text, Bool, Bool, Int, B.ByteString, Maybe B.ByteString, Maybe B.ByteString, Bool) ->
m EventLog
uncurryEventLog (eventId, schemaName, tableName, triggerName, payload, delivered, isError, tries, createdAt, locked, nextRetryAt, archived) = do
-- see Note [Encode Event Trigger Payload to JSON in SQL Server]
payload' <- encodeJSON payload "payload decode failed while fetching MSSQL events"
createdAt' <- bsToUTCTime createdAt "conversion of created_at to UTCTime failed while fetching MSSQL events"
locked' <- traverse (`bsToUTCTime` "conversion of locked to UTCTime failed while fetching MSSQL events") locked
nextRetryAt' <- traverse (`bsToUTCTime` "conversion of next_retry_at to UTCTime failed while fetching MSSQL events") nextRetryAt
pure
EventLog
{ elId = EventId (bsToTxt eventId),
elSchemaName = schemaName,
elTableName = tableName,
elTriggerName = TriggerName (mkNonEmptyTextUnsafe triggerName),
elPayload = payload',
elDelivered = delivered,
elError = isError,
elTries = tries,
elCreatedAt = createdAt',
elLocked = locked',
elNextRetryAt = nextRetryAt',
elArchived = archived
}
uncurryEventInvocationLog ::
(MonadError QErr m) =>
(B.ByteString, Text, B.ByteString, Maybe Int, Text, Text, B.ByteString) ->
m EventInvocationLog
uncurryEventInvocationLog (invocationId, triggerName, eventId, status, request, response, createdAt) = do
request' <- encodeJSON request "request decode failed while fetching MSSQL event invocations"
response' <- encodeJSON response "response decode failed while fetching MSSQL event invocations"
createdAt' <- bsToUTCTime createdAt "conversion of created_at to UTCTime failed while fetching MSSQL event invocations"
pure
EventInvocationLog
{ eilId = bsToTxt invocationId,
eilTriggerName = TriggerName (mkNonEmptyTextUnsafe triggerName),
eilEventId = EventId (bsToTxt eventId),
eilHttpStatus = status,
eilRequest = request',
eilResponse = response',
eilCreatedAt = createdAt'
}
{- Note [Encode Event Trigger Payload to JSON in SQL Server]
SQL Server has no JSON datatype, but in 'mkAllTriggersQ' we ensure that every
value in the payload column of hdb_catalog.event_log is valid JSON. Hence we can
decode the payload directly, without worrying that the decoding will fail.

The values in 'hdb_catalog.event_log' are guaranteed to be JSON because we use
the 'FOR JSON PATH' MSSQL clause when inserting values into the
'hdb_catalog.event_log' table.
-}
encodeJSON :: (J.FromJSON a, QErrM m) => Text -> String -> m a
encodeJSON json err =
onLeft
-- The NVARCHAR column has UTF-16 or UCS-2 encoding. Ref:
-- https://learn.microsoft.com/en-us/sql/t-sql/data-types/nchar-and-nvarchar-transact-sql?view=sql-server-ver16#nvarchar---n--max--
-- But JSON strings are expected to have UTF-8 encoding as per spec. Ref:
-- https://www.rfc-editor.org/rfc/rfc8259#section-8.1. Hence it is important
-- to encode the text as UTF-8, otherwise decoding it to JSON will fail.
(J.eitherDecode $ fromStrict $ TE.encodeUtf8 json)
(\_ -> throw500 $ T.pack err)
-- | UTCTime is used to store all the time-related information pertaining to
-- event triggers (i.e. `created_at`, `locked` and `next_retry_at`). The ODBC
-- server does not have a FromJSON instance for the UTCTime datatype. This means
-- the time-related data fetched from the ODBC server cannot be converted to
-- UTCTime directly.
--
-- As a workaround, we cast the data from the ODBC server to a ByteString and
-- then use `readEither` to parse that ByteString into a UTCTime.
--
-- We make sure that the parse never fails by ensuring that the values present
-- in the `created_at`, `locked` and `next_retry_at` columns are always in UTC
-- time.
bsToUTCTime :: MonadError QErr m => B.ByteString -> String -> m UTCTime
bsToUTCTime timeInByteString err =
onLeft
(readEither (T.unpack $ bsToTxt timeInByteString) :: Either String UTCTime)
(\_ -> throw500 $ T.pack err)

View File

@ -31,6 +31,9 @@ module Hasura.Backends.Postgres.DDL.EventTrigger
updateCleanupEventStatusToPaused,
updateCleanupEventStatusToCompleted,
deleteEventTriggerLogs,
fetchEventLogs,
fetchEventInvocationLogs,
fetchEventById,
)
where
@ -1149,3 +1152,171 @@ deleteEventTriggerLogs ::
deleteEventTriggerLogs sourceConfig oldCleanupConfig getLatestCleanupConfig = do
deleteEventTriggerLogsInBatchesWith getLatestCleanupConfig oldCleanupConfig $ \cleanupConfig -> do
runPgSourceWriteTx sourceConfig InternalRawQuery $ deleteEventTriggerLogsTx cleanupConfig
fetchEventLogs ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventLogs b ->
m [EventLog]
fetchEventLogs sourceConfig getEventLogs = do
liftIO (runPgSourceReadTx sourceConfig $ fetchEventLogsTxE getEventLogs)
`onLeftM` (throwError . prefixQErr "unexpected error while fetching event logs: ")
fetchEventLogsTxE :: GetEventLogs b -> PG.TxE QErr [EventLog]
fetchEventLogsTxE GetEventLogs {..} = do
case status of
Pending -> do
map uncurryEventLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_log
WHERE trigger_name = $1
AND delivered=false AND error=false AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(triggerName, limit, offset)
True
Processed -> do
map uncurryEventLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_log
WHERE trigger_name = $1
AND (delivered=true OR error=true) AND archived=false ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(triggerName, limit, offset)
True
All -> do
map uncurryEventLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_log
WHERE trigger_name = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(triggerName, limit, offset)
True
where
triggerName = triggerNameToTxt _gelName
status = _gelStatus
limit :: Int64 = fromIntegral $ _gelLimit
offset :: Int64 = fromIntegral $ _gelOffset
fetchEventInvocationLogs ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventInvocations b ->
m [EventInvocationLog]
fetchEventInvocationLogs sourceConfig getEventInvocationLogs = do
liftIO (runPgSourceReadTx sourceConfig $ fetchEventInvocationLogsTxE getEventInvocationLogs)
`onLeftM` (throwError . prefixQErr "unexpected error while fetching invocation logs: ")
fetchEventInvocationLogsTxE :: GetEventInvocations b -> PG.TxE QErr [EventInvocationLog]
fetchEventInvocationLogsTxE GetEventInvocations {..} = do
map uncurryEventInvocationLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_invocation_logs
WHERE trigger_name = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(triggerName, limit, offset)
True
where
triggerName = triggerNameToTxt _geiName
limit :: Int64 = fromIntegral $ _geiLimit
offset :: Int64 = fromIntegral $ _geiOffset
fetchEventById ::
(MonadError QErr m, MonadIO m) =>
PGSourceConfig ->
GetEventById b ->
m (EventLogWithInvocations)
fetchEventById sourceConfig getEventById = do
fetchEventByIdTxE' <- liftIO $ runPgSourceReadTx sourceConfig $ fetchEventByIdTxE getEventById
case fetchEventByIdTxE' of
Left err ->
throwError $
prefixQErr ("unexpected error while fetching event with id " <> eventId <> ": ") err
Right eventLogWithInvocations -> do
if isNothing (elwiEvent eventLogWithInvocations)
then throw400 NotExists errMsg
else return eventLogWithInvocations
where
eventId = unEventId $ _gebiEventId getEventById
errMsg = "event id " <> eventId <> " does not exist"
fetchEventByIdTxE :: GetEventById b -> PG.TxE QErr (EventLogWithInvocations)
fetchEventByIdTxE GetEventById {..} = do
events <-
map uncurryEventLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_log
WHERE id = $1;
|]
(Identity eventId)
True
case events of
[] -> return $ EventLogWithInvocations Nothing []
[event] -> do
invocations <-
map uncurryEventInvocationLog
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT *
FROM hdb_catalog.event_invocation_logs
WHERE event_id = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3;
|]
(eventId, limit, offset)
True
pure $ EventLogWithInvocations (Just event) invocations
_ -> throw500 $ "Unexpected error: Multiple events present with event id " <> eventId
where
eventId = unEventId _gebiEventId
limit :: Int64 = fromIntegral $ _gebiInvocationLogLimit
offset :: Int64 = fromIntegral $ _gebiInvocationLogOffset
uncurryEventLog ::
(EventId, Text, Text, TriggerName, PG.ViaJSON Value, Bool, Bool, Int, Time.UTCTime, Maybe Time.UTCTime, Maybe Time.UTCTime, Bool) ->
EventLog
uncurryEventLog (eventId, schemaName, tableName, triggerName, PG.ViaJSON payload, delivered, isError, tries, createdAt, locked, nextRetryAt, archived) =
EventLog
{ elId = eventId,
elSchemaName = schemaName,
elTableName = tableName,
elTriggerName = triggerName,
elPayload = payload,
elDelivered = delivered,
elError = isError,
elTries = tries,
elCreatedAt = createdAt,
elLocked = locked,
elNextRetryAt = nextRetryAt,
elArchived = archived
}
uncurryEventInvocationLog ::
(Text, TriggerName, EventId, Maybe Int, PG.ViaJSON Value, PG.ViaJSON Value, Time.UTCTime) ->
EventInvocationLog
uncurryEventInvocationLog (invocationId, triggerName, eventId, status, PG.ViaJSON request, PG.ViaJSON response, createdAt) =
EventInvocationLog
{ eilId = invocationId,
eilTriggerName = triggerName,
eilEventId = eventId,
eilHttpStatus = status,
eilRequest = request,
eilResponse = response,
eilCreatedAt = createdAt
}

View File

@ -42,6 +42,9 @@ module Hasura.RQL.DDL.EventTrigger
MonadEventLogCleanup (..),
getAllEventTriggersWithCleanupConfig,
getAllETWithCleanupConfigInTableMetadata,
runGetEventLogs,
runGetEventInvocationLogs,
runGetEventById,
)
where
@ -450,6 +453,21 @@ askEventTriggerInfo sourceName triggerName = do
where
errMsg = "event trigger " <> triggerName <<> " does not exist"
checkIfTriggerNameExists ::
forall b m.
(Backend b, CacheRM m) =>
SourceName ->
TriggerName ->
m (Bool)
checkIfTriggerNameExists sourceName triggerName = do
schemaCache <- askSchemaCache
-- TODO: The name getTabInfoFromSchemaCache is misleading here.
-- There is a JIRA ticket that addresses this (https://hasurahq.atlassian.net/browse/GS-535)
let tableInfoMaybe = getTabInfoFromSchemaCache @b schemaCache sourceName triggerName
case tableInfoMaybe of
Nothing -> pure False
_ -> pure True
-- This change helps us create functions for the event triggers
-- without the function name being truncated by PG, since PG allows
-- for only 63 chars for identifiers.
@ -751,3 +769,44 @@ getAllETWithCleanupConfigInTableMetadata tMetadata =
)
$ OMap.toList
$ _tmEventTriggers tMetadata
runGetEventLogs ::
forall b m.
(MonadIO m, CacheRM m, MonadError QErr m, BackendEventTrigger b, MetadataM m) =>
GetEventLogs b ->
m EncJSON
runGetEventLogs getEventLogs = do
sourceConfig <- askSourceConfig @b sourceName
doesTriggerExists <- checkIfTriggerNameExists @b sourceName triggerName
if not doesTriggerExists
then throw400 NotExists $ "event trigger " <> triggerName <<> " does not exist"
else encJFromJValue <$> fetchEventLogs sourceConfig getEventLogs
where
sourceName = _gelSourceName getEventLogs
triggerName = _gelName getEventLogs
runGetEventInvocationLogs ::
forall b m.
(MonadIO m, CacheRM m, MonadError QErr m, BackendEventTrigger b, MetadataM m) =>
GetEventInvocations b ->
m EncJSON
runGetEventInvocationLogs getEventInvocations = do
sourceConfig <- askSourceConfig @b sourceName
doesTriggerExists <- checkIfTriggerNameExists @b sourceName triggerName
if not doesTriggerExists
then throw400 NotExists $ "event trigger " <> triggerName <<> " does not exist"
else encJFromJValue <$> fetchEventInvocationLogs sourceConfig getEventInvocations
where
sourceName = _geiSourceName getEventInvocations
triggerName = _geiName getEventInvocations
runGetEventById ::
forall b m.
(MonadIO m, CacheRM m, MonadError QErr m, BackendEventTrigger b, MetadataM m) =>
GetEventById b ->
m EncJSON
runGetEventById getEventById = do
sourceConfig <- askSourceConfig @b sourceName
encJFromJValue <$> fetchEventById sourceConfig getEventById
where
sourceName = _gebiSourceName getEventById

View File

@ -35,6 +35,13 @@ module Hasura.RQL.Types.EventTrigger
TriggerLogCleanupToggleConfig (..),
updateCleanupConfig,
isIllegalTriggerName,
EventLogStatus (..),
GetEventLogs (..),
EventLog (..),
GetEventInvocations (..),
EventInvocationLog (..),
GetEventById (..),
EventLogWithInvocations (..),
)
where
@ -551,3 +558,136 @@ data DeletedEventLogStats = DeletedEventLogStats
deletedInvocationLogs :: Int
}
deriving (Show, Eq)
data EventLogStatus
= Processed
| Pending
| All
deriving (Show, Eq)
instance ToJSON EventLogStatus where
toJSON Processed = String "processed"
toJSON Pending = String "pending"
toJSON All = String "all"
instance FromJSON EventLogStatus where
parseJSON (String "processed") = pure Processed
parseJSON (String "pending") = pure Pending
parseJSON _ = fail "event logs status can only be one of the following: processed or pending"
data GetEventLogs (b :: BackendType) = GetEventLogs
{ _gelName :: TriggerName,
_gelSourceName :: SourceName,
_gelLimit :: Int,
_gelOffset :: Int,
_gelStatus :: EventLogStatus
}
deriving (Show)
instance ToJSON (GetEventLogs b) where
toJSON GetEventLogs {..} =
object $
[ "name" .= _gelName,
"source" .= _gelSourceName,
"limit" .= _gelLimit,
"offset" .= _gelOffset,
"status" .= _gelStatus
]
instance FromJSON (GetEventLogs b) where
parseJSON = withObject "GetEventLogs" $ \o ->
GetEventLogs
<$> o .: "name"
<*> o .:? "source" .!= SNDefault
<*> o .:? "limit" .!= 100
<*> o .:? "offset" .!= 0
<*> o .:? "status" .!= All
data EventLog = EventLog
{ elId :: EventId,
elSchemaName :: Text,
elTableName :: Text,
elTriggerName :: TriggerName,
elPayload :: Value,
elDelivered :: Bool,
elError :: Bool,
elTries :: Int,
elCreatedAt :: Time.UTCTime,
elLocked :: Maybe Time.UTCTime,
elNextRetryAt :: Maybe Time.UTCTime,
elArchived :: Bool
}
deriving (Eq, Generic)
$(deriveToJSON hasuraJSON ''EventLog)
data GetEventInvocations (b :: BackendType) = GetEventInvocations
{ _geiName :: TriggerName,
_geiSourceName :: SourceName,
_geiLimit :: Int,
_geiOffset :: Int
}
deriving (Show)
instance ToJSON (GetEventInvocations b) where
toJSON GetEventInvocations {..} =
object $
[ "name" .= _geiName,
"source" .= _geiSourceName,
"limit" .= _geiLimit,
"offset" .= _geiOffset
]
instance FromJSON (GetEventInvocations b) where
parseJSON = withObject "GetEventInvocations" $ \o ->
GetEventInvocations
<$> o .: "name"
<*> o .:? "source" .!= SNDefault
<*> o .:? "limit" .!= 100
<*> o .:? "offset" .!= 0
data EventInvocationLog = EventInvocationLog
{ eilId :: Text,
eilTriggerName :: TriggerName,
eilEventId :: EventId,
eilHttpStatus :: Maybe Int,
eilRequest :: Value,
eilResponse :: Value,
eilCreatedAt :: Time.UTCTime
}
deriving (Generic)
$(deriveToJSON hasuraJSON ''EventInvocationLog)
data GetEventById (b :: BackendType) = GetEventById
{ _gebiSourceName :: SourceName,
_gebiEventId :: EventId,
_gebiInvocationLogLimit :: Int,
_gebiInvocationLogOffset :: Int
}
deriving (Show)
instance ToJSON (GetEventById b) where
toJSON GetEventById {..} =
object $
[ "source" .= _gebiSourceName,
"event_id" .= _gebiEventId,
"invocation_log_limit" .= _gebiInvocationLogLimit,
"invocation_log_offset" .= _gebiInvocationLogOffset
]
instance FromJSON (GetEventById b) where
parseJSON = withObject "GetEventById" $ \o ->
GetEventById
<$> o .:? "source" .!= SNDefault
<*> o .: "event_id"
<*> o .:? "invocation_log_limit" .!= 100
<*> o .:? "invocation_log_offset" .!= 0
data EventLogWithInvocations = EventLogWithInvocations
{ elwiEvent :: Maybe EventLog,
elwiInvocations :: [EventInvocationLog]
}
deriving (Generic)
$(deriveToJSON hasuraJSON ''EventLogWithInvocations)

View File

@ -290,6 +290,27 @@ class Backend b => BackendEventTrigger (b :: BackendType) where
IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
m DeletedEventLogStats
-- | @fetchEventLogs fetches event logs from the source for a given event trigger.
fetchEventLogs ::
(MonadIO m, MonadError QErr m) =>
SourceConfig b ->
GetEventLogs b ->
m [EventLog]
-- | @fetchEventInvocationLogs fetches invocation logs from the source for a given event trigger.
fetchEventInvocationLogs ::
(MonadIO m, MonadError QErr m) =>
SourceConfig b ->
GetEventInvocations b ->
m [EventInvocationLog]
-- | @fetchEventById fetches the event and its invocation logs from the source for a given EventId.
fetchEventById ::
(MonadIO m, MonadError QErr m) =>
SourceConfig b ->
GetEventById b ->
m (EventLogWithInvocations)
--------------------------------------------------------------------------------
-- TODO: move those instances to 'Backend/*/Instances/Eventing' and create a
-- corresponding 'Instances.hs' file in this directory to import them, similarly
@ -319,6 +340,9 @@ instance BackendEventTrigger ('Postgres 'Vanilla) where
updateCleanupEventStatusToPaused = Postgres.updateCleanupEventStatusToPaused
updateCleanupEventStatusToCompleted = Postgres.updateCleanupEventStatusToCompleted
deleteEventTriggerLogs = Postgres.deleteEventTriggerLogs
fetchEventLogs = Postgres.fetchEventLogs
fetchEventInvocationLogs = Postgres.fetchEventInvocationLogs
fetchEventById = Postgres.fetchEventById
instance BackendEventTrigger ('Postgres 'Citus) where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
@ -342,6 +366,9 @@ instance BackendEventTrigger ('Postgres 'Citus) where
updateCleanupEventStatusToPaused _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
updateCleanupEventStatusToCompleted _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
deleteEventTriggerLogs _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
fetchEventLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
fetchEventInvocationLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
fetchEventById _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
instance BackendEventTrigger ('Postgres 'Cockroach) where
insertManualEvent = Postgres.insertManualEvent
@ -365,6 +392,9 @@ instance BackendEventTrigger ('Postgres 'Cockroach) where
updateCleanupEventStatusToPaused = Postgres.updateCleanupEventStatusToPaused
updateCleanupEventStatusToCompleted = Postgres.updateCleanupEventStatusToCompleted
deleteEventTriggerLogs = Postgres.deleteEventTriggerLogs
fetchEventLogs = Postgres.fetchEventLogs
fetchEventInvocationLogs = Postgres.fetchEventInvocationLogs
fetchEventById = Postgres.fetchEventById
instance BackendEventTrigger 'MSSQL where
insertManualEvent = MSSQL.insertManualEvent
@ -388,6 +418,9 @@ instance BackendEventTrigger 'MSSQL where
updateCleanupEventStatusToPaused = MSSQL.updateCleanupEventStatusToPaused
updateCleanupEventStatusToCompleted = MSSQL.updateCleanupEventStatusToCompleted
deleteEventTriggerLogs = MSSQL.deleteEventTriggerLogs
fetchEventInvocationLogs = MSSQL.fetchEventInvocationLogs
fetchEventLogs = MSSQL.fetchEventLogs
fetchEventById = MSSQL.fetchEventById
instance BackendEventTrigger 'BigQuery where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
@ -411,6 +444,9 @@ instance BackendEventTrigger 'BigQuery where
updateCleanupEventStatusToPaused _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
updateCleanupEventStatusToCompleted _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
deleteEventTriggerLogs _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
fetchEventLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
fetchEventInvocationLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
fetchEventById _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
instance BackendEventTrigger 'MySQL where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
@ -434,6 +470,9 @@ instance BackendEventTrigger 'MySQL where
updateCleanupEventStatusToPaused _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
updateCleanupEventStatusToCompleted _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
deleteEventTriggerLogs _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
fetchEventLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
fetchEventInvocationLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
fetchEventById _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
--------------------------------------------------------------------------------
@ -475,3 +514,6 @@ instance BackendEventTrigger 'DataConnector where
updateCleanupEventStatusToPaused _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"
updateCleanupEventStatusToCompleted _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"
deleteEventTriggerLogs _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"
fetchEventLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"
fetchEventInvocationLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"
fetchEventById _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"

View File

@ -155,7 +155,10 @@ eventTriggerCommands =
[ commandParser "invoke_event_trigger" $ RMInvokeEventTrigger . mkAnyBackend @b,
commandParser "create_event_trigger" $ RMCreateEventTrigger . mkAnyBackend @b,
commandParser "delete_event_trigger" $ RMDeleteEventTrigger . mkAnyBackend @b,
commandParser "redeliver_event" $ RMRedeliverEvent . mkAnyBackend @b
commandParser "redeliver_event" $ RMRedeliverEvent . mkAnyBackend @b,
commandParser "get_event_logs" $ RMGetEventLogs . mkAnyBackend @b,
commandParser "get_event_invocation_logs" $ RMGetEventInvocationLogs . mkAnyBackend @b,
commandParser "get_event_by_id" $ RMGetEventById . mkAnyBackend @b
]
computedFieldCommands :: forall (b :: BackendType). Backend b => [CommandParser b]

View File

@ -191,6 +191,9 @@ queryModifiesMetadata = \case
case q of
RMRedeliverEvent _ -> False
RMInvokeEventTrigger _ -> False
RMGetEventLogs _ -> False
RMGetEventInvocationLogs _ -> False
RMGetEventById _ -> False
RMGetInconsistentMetadata _ -> False
RMIntrospectRemoteSchema _ -> False
RMDumpInternalState _ -> False
@ -428,6 +431,9 @@ runMetadataQueryV1M env checkFeatureFlag remoteSchemaPerms currentResourceVersio
RMCleanupEventTriggerLog q -> runCleanupEventTriggerLog q
RMResumeEventTriggerCleanup q -> runEventTriggerResumeCleanup q
RMPauseEventTriggerCleanup q -> runEventTriggerPauseCleanup q
RMGetEventLogs q -> dispatchEventTrigger runGetEventLogs q
RMGetEventInvocationLogs q -> dispatchEventTrigger runGetEventInvocationLogs q
RMGetEventById q -> dispatchEventTrigger runGetEventById q
RMAddRemoteSchema q -> runAddRemoteSchema env q
RMUpdateRemoteSchema q -> runUpdateRemoteSchema env q
RMRemoveRemoteSchema q -> runRemoveRemoteSchema q

View File

@ -110,6 +110,9 @@ data RQLMetadataV1
| RMCleanupEventTriggerLog !TriggerLogCleanupConfig
| RMResumeEventTriggerCleanup !TriggerLogCleanupToggleConfig
| RMPauseEventTriggerCleanup !TriggerLogCleanupToggleConfig
| RMGetEventLogs !(AnyBackend GetEventLogs)
| RMGetEventInvocationLogs !(AnyBackend GetEventInvocations)
| RMGetEventById !(AnyBackend GetEventById)
| -- Remote schemas
RMAddRemoteSchema !AddRemoteSchemaQuery
| RMUpdateRemoteSchema !AddRemoteSchemaQuery