add custom headers for webhooks, refactor retry logic (#419)

Tirumarai Selvan 2018-09-24 17:20:11 +05:30 committed by Shahidh K Muhammed
parent 65f29610f0
commit 2cd2b23b2d
15 changed files with 258 additions and 87 deletions

View File

@ -79,6 +79,7 @@ refs:
background: true
environment:
HASURA_GRAPHQL_DATABASE_URL: postgres://gql_test:@localhost:5432/gql_test
EVENT_WEBHOOK_HEADER: MyEnvValue
- run:
name: create test output dir
command: |

View File

@ -28,7 +28,7 @@ import qualified Database.PG.Query as Q
import qualified Database.PG.Query.Connection as Q
curCatalogVer :: T.Text
curCatalogVer = "2"
curCatalogVer = "3"
initCatalogSafe :: UTCTime -> Q.TxE QErr String
initCatalogSafe initTime = do
@ -180,6 +180,13 @@ migrateFrom1 = do
-- set as system defined
setAsSystemDefined
migrateFrom2 :: Q.TxE QErr ()
migrateFrom2 = Q.catchE defaultTxErrorHandler $ do
Q.unitQ "ALTER TABLE hdb_catalog.event_triggers ADD COLUMN headers JSON" () False
Q.unitQ "ALTER TABLE hdb_catalog.event_log ADD COLUMN next_retry_at TIMESTAMP" () False
Q.unitQ "CREATE INDEX ON hdb_catalog.event_log (trigger_id)" () False
Q.unitQ "CREATE INDEX ON hdb_catalog.event_invocation_logs (event_id)" () False
migrateCatalog :: UTCTime -> Q.TxE QErr String
migrateCatalog migrationTime = do
preVer <- getCatalogVersion
@ -188,9 +195,14 @@ migrateCatalog migrationTime = do
| preVer == "0.8" -> do
migrateFrom08
migrateFrom1
migrateFrom2
afterMigrate
| preVer == "1" -> do
migrateFrom1
migrateFrom2
afterMigrate
| preVer == "2" -> do
migrateFrom2
afterMigrate
| otherwise -> throw400 NotSupported $
"migrate: unsupported version : " <> preVer

View File

@ -20,21 +20,24 @@ import Control.Monad.STM (STM, atomically, retry)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Either (isLeft)
import Data.Has
import Data.Int (Int64)
import Data.IORef (IORef, readIORef)
import Data.Time.Clock
import Hasura.Events.HTTP
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.SQL.Types
import qualified Control.Concurrent.STM.TQueue as TQ
import qualified Control.Retry as R
import qualified Control.Lens as CL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as B
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Time.Clock as Time
import qualified Database.PG.Query as Q
import qualified Hasura.GraphQL.Schema as GS
@ -152,49 +155,57 @@ processEvent
=> Q.PGPool -> Event -> m ()
processEvent pool e = do
(logger:: HLogger) <- asks getter
retryPolicy <- getRetryPolicy e
res <- R.retrying retryPolicy shouldRetry $ tryWebhook pool e
liftIO $ either (errorFn logger) (void.return) res
unlockRes <- liftIO $ runExceptT $ runUnlockQ pool e
liftIO $ either (logQErr logger) (void.return ) unlockRes
res <- tryWebhook pool e
finally <- either errorFn successFn res
liftIO $ either (logQErr logger) (void.return) finally
where
shouldRetry :: (Monad m ) => R.RetryStatus -> Either HTTPErr a -> m Bool
shouldRetry _ eitherResp = return $ isLeft eitherResp
errorFn
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
)
=> HTTPErr -> m (Either QErr ())
errorFn err = do
(logger:: HLogger) <- asks getter
liftIO $ logger $ L.toEngineLog err
checkError
errorFn :: HLogger -> HTTPErr -> IO ()
errorFn logger err = do
logger $ L.toEngineLog err
errorRes <- runExceptT $ runErrorQ pool e
case errorRes of
Left err' -> logQErr logger err'
Right _ -> return ()
successFn
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
)
=> B.ByteString -> m (Either QErr ())
successFn _ = liftIO $ runExceptT $ runUnlockQ pool e
logQErr :: HLogger -> QErr -> IO ()
logQErr logger err = logger $ L.toEngineLog $ EventInternalErr err
getRetryPolicy
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
)
=> Event -> m (R.RetryPolicyM m)
getRetryPolicy e = do
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
retryConfM = etiRetryConf <$> eti
retryConf = fromMaybe (RetryConf 0 10) retryConfM
let remainingRetries = max 0 $ fromIntegral (rcNumRetries retryConf) - getTries
delay = fromIntegral (rcIntervalSec retryConf) * 1000000
policy = R.constantDelay delay <> R.limitRetries remainingRetries
return policy
where
getTries :: Int
getTries = fromIntegral $ eTries e
checkError
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
)
=> m (Either QErr ())
checkError = do
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
retryConfM = etiRetryConf <$> eti
retryConf = fromMaybe (RetryConf 0 10) retryConfM
tries = eTries e
if tries >= rcNumRetries retryConf -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
then liftIO $ runExceptT $ runErrorAndUnlockQ pool e
else liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e retryConf
tryWebhook
:: ( MonadReader r m
@ -204,20 +215,21 @@ tryWebhook
, Has CacheRef r
, Has EventEngineCtx r
)
=> Q.PGPool -> Event -> R.RetryStatus -> m (Either HTTPErr B.ByteString)
tryWebhook pool e _ = do
=> Q.PGPool -> Event -> m (Either HTTPErr B.ByteString)
tryWebhook pool e = do
logger:: HLogger <- asks getter
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
case eti of
let meti = getEventTriggerInfoFromEvent cache e
case meti of
Nothing -> return $ Left $ HOther "table or event-trigger not found"
Just et -> do
let webhook = etiWebhook et
Just eti -> do
let webhook = etiWebhook eti
createdAt = eCreatedAt e
eventId = eId e
headersRaw = etiHeaders eti
headers = map encodeHeader headersRaw
eeCtx <- asks getter
-- wait for counter and then increment before making http
liftIO $ atomically $ do
let EventEngineCtx _ c maxT _ = eeCtx
@ -225,7 +237,7 @@ tryWebhook pool e _ = do
if countThreads >= maxT
then retry
else modifyTVar' c (+1)
eitherResp <- runExceptT $ runHTTP W.defaults (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
eitherResp <- runExceptT $ runHTTP (addHeaders headers W.defaults) (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
--decrement counter once http is done
liftIO $ atomically $ do
@ -244,6 +256,15 @@ tryWebhook pool e _ = do
Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err
Right _ -> return ()
return eitherResp
where
addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options
addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers
encodeHeader :: (HeaderName, T.Text)-> (N.HeaderName, BS.ByteString)
encodeHeader header =
let name = CI.mk $ T.encodeUtf8 $ fst header
value = T.encodeUtf8 $ snd header
in (name, value)
getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo
getEventTriggerInfoFromEvent sc e = let table = eTable e
@ -259,7 +280,7 @@ fetchEvents =
FROM hdb_catalog.event_log l
JOIN hdb_catalog.event_triggers e
ON (l.trigger_id = e.id)
WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f'
WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f' and (l.next_retry_at is NULL or l.next_retry_at <= now())
LIMIT 100 )
RETURNING id, schema_name, table_name, trigger_id, trigger_name, payload::json, tries, created_at
|] () True
@ -293,14 +314,6 @@ markError e =
WHERE id = $1
|] (Identity $ eId e) True
-- lockEvent :: Event -> Q.TxE QErr ()
-- lockEvent e =
-- Q.unitQE defaultTxErrorHandler [Q.sql|
-- UPDATE hdb_catalog.event_log
-- SET locked = 't'
-- WHERE id = $1
-- |] (Identity $ eId e) True
unlockEvent :: Event -> Q.TxE QErr ()
unlockEvent e =
Q.unitQE defaultTxErrorHandler [Q.sql|
@ -316,19 +329,44 @@ unlockAllEvents =
SET locked = 'f'
|] () False
setNextRetry :: Event -> UTCTime -> Q.TxE QErr ()
setNextRetry e time =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1
WHERE id = $2
|] (time, eId e) True
clearNextRetry :: Event -> Q.TxE QErr ()
clearNextRetry e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = NULL
WHERE id = $1
|] (Identity $ eId e) True
runFailureQ :: Q.PGPool -> Invocation -> ExceptT QErr IO ()
runFailureQ pool invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ insertInvocation invo
runSuccessQ :: Q.PGPool -> Event -> Invocation -> ExceptT QErr IO ()
runSuccessQ pool e invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
insertInvocation invo
clearNextRetry e
markDelivered e
runErrorQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runErrorQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ markError e
runErrorAndUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runErrorAndUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
markError e
clearNextRetry e
unlockEvent e
-- runLockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
-- runLockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ lockEvent e
runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO ()
runRetryAfterAndUnlockQ pool e rconf = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
currentTime <- liftIO getCurrentTime
let diff = fromIntegral $ rcIntervalSec rconf
retryTime = addUTCTime diff currentTime
setNextRetry e retryTime
unlockEvent e
runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ unlockEvent e
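
The refactor above replaces the in-process Control.Retry loop with database-backed scheduling: a failed delivery either sets next_retry_at so fetchEvents picks the event up again once the interval has passed, or gives up and marks the event as errored once the configured attempts are exhausted. Below is a minimal standalone sketch of that decision rule, assuming the RetryConf semantics used here (rcNumRetries extra attempts after the first, rcIntervalSec seconds between attempts); RetryDecision and decideRetry are illustrative names, not part of this patch.

import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime)

-- either schedule the event for another attempt or give up for good
data RetryDecision = ScheduleRetryAt UTCTime | GiveUp
  deriving (Show, Eq)

-- tries = attempts already made; with rcNumRetries = 2 and rcIntervalSec = 10:
--   tries = 0 -> ScheduleRetryAt (now + 10s)
--   tries = 1 -> ScheduleRetryAt (now + 10s)
--   tries = 2 -> GiveUp   (3 total attempts, matching the comment in checkError)
decideRetry :: Int -> Int -> NominalDiffTime -> UTCTime -> RetryDecision
decideRetry tries numRetries intervalSec now
  | tries >= numRetries = GiveUp
  | otherwise           = ScheduleRetryAt (addUTCTime intervalSec now)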

View File

@ -325,9 +325,9 @@ fetchMetadata = do
mkTriggerMetaDefs = mapM trigRowToDef
trigRowToDef (sn, tn, trn, Q.AltJ tDefVal, webhook, nr, rint) = do
trigRowToDef (sn, tn, trn, Q.AltJ tDefVal, webhook, nr, rint, Q.AltJ mheaders) = do
tDef <- decodeValue tDefVal
return (QualifiedTable sn tn, DTS.EventTriggerDef trn tDef webhook (RetryConf nr rint))
return (QualifiedTable sn tn, DTS.EventTriggerDef trn tDef webhook (RetryConf nr rint) mheaders)
fetchTables =
Q.listQ [Q.sql|
@ -357,7 +357,7 @@ fetchMetadata = do
|] () False
fetchEventTriggers =
Q.listQ [Q.sql|
SELECT e.schema_name, e.table_name, e.name, e.definition::json, e.webhook, e.num_retries, e.retry_interval
SELECT e.schema_name, e.table_name, e.name, e.definition::json, e.webhook, e.num_retries, e.retry_interval, e.headers::json
FROM hdb_catalog.event_triggers e
|] () False

View File

@ -344,9 +344,11 @@ buildSchemaCache = flip execStateT emptySchemaCache $ do
addQTemplateToCache qti
eventTriggers <- lift $ Q.catchE defaultTxErrorHandler fetchEventTriggers
forM_ eventTriggers $ \(sn, tn, trid, trn, Q.AltJ tDefVal, webhook, nr, rint) -> do
forM_ eventTriggers $ \(sn, tn, trid, trn, Q.AltJ tDefVal, webhook, nr, rint, Q.AltJ mheaders) -> do
let headerConfs = fromMaybe [] mheaders
headers <- getHeadersFromConf headerConfs
tDef <- decodeValue tDefVal
addEventTriggerToCache (QualifiedTable sn tn) trid trn tDef (RetryConf nr rint) webhook
addEventTriggerToCache (QualifiedTable sn tn) trid trn tDef (RetryConf nr rint) webhook headers
liftTx $ mkTriggerQ trid trn (QualifiedTable sn tn) tDef
@ -387,7 +389,7 @@ buildSchemaCache = flip execStateT emptySchemaCache $ do
fetchEventTriggers =
Q.listQ [Q.sql|
SELECT e.schema_name, e.table_name, e.id, e.name, e.definition::json, e.webhook, e.num_retries, e.retry_interval
SELECT e.schema_name, e.table_name, e.id, e.name, e.definition::json, e.webhook, e.num_retries, e.retry_interval, e.headers::json
FROM hdb_catalog.event_triggers e
|] () False

View File

@ -12,6 +12,7 @@ import Data.Int (Int64)
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.SQL.Types
import System.Environment (lookupEnv)
import qualified Data.FileEmbed as FE
import qualified Data.HashMap.Strict as HashMap
@ -106,12 +107,12 @@ mkTriggerQ trid trn (QualifiedTable sn tn) (TriggerOpsDef insert update delete)
addEventTriggerToCatalog :: QualifiedTable -> EventTriggerDef
-> Q.TxE QErr TriggerId
addEventTriggerToCatalog qt@(QualifiedTable sn tn) (EventTriggerDef name def webhook rconf) = do
addEventTriggerToCatalog qt@(QualifiedTable sn tn) (EventTriggerDef name def webhook rconf mheaders) = do
ids <- map runIdentity <$> Q.listQE defaultTxErrorHandler [Q.sql|
INSERT into hdb_catalog.event_triggers (name, type, schema_name, table_name, definition, webhook, num_retries, retry_interval)
VALUES ($1, 'table', $2, $3, $4, $5, $6, $7)
INSERT into hdb_catalog.event_triggers (name, type, schema_name, table_name, definition, webhook, num_retries, retry_interval, headers)
VALUES ($1, 'table', $2, $3, $4, $5, $6, $7, $8)
RETURNING id
|] (name, sn, tn, Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf) True
|] (name, sn, tn, Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf, Q.AltJ $ toJSON mheaders) True
trid <- getTrid ids
mkTriggerQ trid name qt def
@ -136,17 +137,18 @@ updateEventTriggerToCatalog
:: QualifiedTable
-> EventTriggerDef
-> Q.TxE QErr TriggerId
updateEventTriggerToCatalog qt (EventTriggerDef name def webhook rconf) = do
updateEventTriggerToCatalog qt (EventTriggerDef name def webhook rconf mheaders) = do
ids <- map runIdentity <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_triggers
SET
definition = $1,
webhook = $2,
num_retries = $3,
retry_interval = $4
WHERE name = $5
retry_interval = $4,
headers = $5
WHERE name = $6
RETURNING id
|] (Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf, name) True
|] (Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf, Q.AltJ $ toJSON mheaders, name) True
trid <- getTrid ids
mkTriggerQ trid name qt def
return trid
@ -199,7 +201,7 @@ markForDelivery eid =
|] (Identity eid) True
subTableP1 :: (P1C m) => CreateEventTriggerQuery -> m (QualifiedTable, Bool, EventTriggerDef)
subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webhook replace) = do
subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webhook mheaders replace) = do
adminOnly
ti <- askTabInfo qt
-- can only replace for same table
@ -212,7 +214,7 @@ subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webho
assertCols ti delete
let rconf = fromMaybe (RetryConf defaultNumRetries defaultRetryInterval) retryConf
return (qt, replace, EventTriggerDef name (TriggerOpsDef insert update delete) webhook rconf)
return (qt, replace, EventTriggerDef name (TriggerOpsDef insert update delete) webhook rconf mheaders)
where
assertCols _ Nothing = return ()
assertCols ti (Just sos) = do
@ -222,14 +224,16 @@ subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webho
SubCArray pgcols -> forM_ pgcols (assertPGCol (tiFieldInfoMap ti) "")
subTableP2 :: (P2C m) => QualifiedTable -> Bool -> EventTriggerDef -> m ()
subTableP2 qt replace q@(EventTriggerDef name def webhook rconf) = do
subTableP2 qt replace q@(EventTriggerDef name def webhook rconf mheaders) = do
trid <- if replace
then do
delEventTriggerFromCache qt name
liftTx $ updateEventTriggerToCatalog qt q
else
liftTx $ addEventTriggerToCatalog qt q
addEventTriggerToCache qt trid name def rconf webhook
let headerConfs = fromMaybe [] mheaders
headers <- getHeadersFromConf headerConfs
addEventTriggerToCache qt trid name def rconf webhook headers
subTableP2shim :: (P2C m) => (QualifiedTable, Bool, EventTriggerDef) -> m RespBody
subTableP2shim (qt, replace, etdef) = do
@ -272,5 +276,17 @@ instance HDBQuery DeliverEventQuery where
phaseTwo q _ = deliverEvent q
schemaCachePolicy = SCPNoChange
getHeadersFromConf :: (P2C m) => [HeaderConf] -> m [(HeaderName, T.Text)]
getHeadersFromConf = mapM getHeader
where
getHeader :: (P2C m) => HeaderConf -> m (HeaderName, T.Text)
getHeader hconf = case hconf of
(HeaderConf name (HVValue val)) -> return (name, val)
(HeaderConf name (HVEnv val)) -> do
mEnv <- liftIO $ lookupEnv (T.unpack val)
case mEnv of
Nothing -> throw400 NotFound $ "environment variable '" <> val <> "' not set"
Just val' -> return (name, T.pack val')
toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral
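
getHeadersFromConf resolves each header when the trigger is created or the schema cache is rebuilt: literal values pass through, while value_from_env names are looked up in the server's environment and a missing variable is rejected with a 400. A small sketch of the same lookup, assuming the HeaderConf, HeaderValue and HeaderName types from Hasura.RQL.Types.Subscribe (shown further below) are in scope; resolveHeader is an illustrative name, not a helper from this patch.

import System.Environment (lookupEnv)
import qualified Data.Text as T

resolveHeader :: HeaderConf -> IO (Either String (HeaderName, T.Text))
resolveHeader (HeaderConf name (HVValue val)) =
  -- literal header values pass through unchanged
  return $ Right (name, val)
resolveHeader (HeaderConf name (HVEnv var)) = do
  -- env-backed values are read once, at trigger creation / cache build,
  -- not on every delivery
  mVal <- lookupEnv (T.unpack var)
  return $ case mVal of
    Nothing  -> Left $ "environment variable '" <> T.unpack var <> "' not set"
    Just val -> Right (name, T.pack val)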

View File

@ -182,7 +182,7 @@ instance CacheRM P1 where
instance UserInfoM P2 where
askUserInfo = ask
type P2C m = (QErrM m, CacheRWM m, MonadTx m)
type P2C m = (QErrM m, CacheRWM m, MonadTx m, MonadIO m)
class (Monad m) => MonadTx m where
liftTx :: Q.TxE QErr a -> m a

View File

@ -357,6 +357,7 @@ data EventTriggerInfo
, etiDelete :: !(Maybe OpTriggerInfo)
, etiRetryConf :: !RetryConf
, etiWebhook :: !T.Text
, etiHeaders :: ![(HeaderName, T.Text)]
} deriving (Show, Eq)
$(deriveToJSON (aesonDrop 3 snakeCase) ''EventTriggerInfo)
@ -594,8 +595,9 @@ addEventTriggerToCache
-> TriggerOpsDef
-> RetryConf
-> T.Text
-> [(HeaderName, T.Text)]
-> m ()
addEventTriggerToCache qt trid trn tdef rconf webhook =
addEventTriggerToCache qt trid trn tdef rconf webhook headers =
modTableInCache modEventTriggerInfo qt
where
modEventTriggerInfo ti = do
@ -607,6 +609,7 @@ addEventTriggerToCache qt trid trn tdef rconf webhook =
(getOpInfo trn ti $ tdDelete tdef)
rconf
webhook
headers
etim = tiEventTriggerInfoMap ti
-- fail $ show (toJSON eti)
return $ ti { tiEventTriggerInfoMap = M.insert trn eti etim}

View File

@ -15,6 +15,9 @@ module Hasura.RQL.Types.Subscribe
, RetryConf(..)
, DeleteEventTriggerQuery(..)
, DeliverEventQuery(..)
, HeaderConf(..)
, HeaderValue(..)
, HeaderName
) where
import Data.Aeson
@ -30,6 +33,7 @@ import qualified Data.Text as T
type TriggerName = T.Text
type TriggerId = T.Text
type EventId = T.Text
type HeaderName = T.Text
data SubscribeColumns = SubCStar | SubCArray [PGCol] deriving (Show, Eq, Lift)
@ -59,6 +63,28 @@ data RetryConf
$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''RetryConf)
data HeaderConf = HeaderConf HeaderName HeaderValue
deriving (Show, Eq, Lift)
data HeaderValue = HVValue T.Text | HVEnv T.Text
deriving (Show, Eq, Lift)
instance FromJSON HeaderConf where
parseJSON (Object o) = do
name <- o .: "name"
value <- o .:? "value"
valueFromEnv <- o .:? "value_from_env"
case (value, valueFromEnv ) of
(Nothing, Nothing) -> fail "expecting value or value_from_env keys"
(Just val, Nothing) -> return $ HeaderConf name (HVValue val)
(Nothing, Just val) -> return $ HeaderConf name (HVEnv val)
(Just _, Just _) -> fail "expecting only one of value or value_from_env keys"
parseJSON _ = fail "expecting object for headers"
instance ToJSON HeaderConf where
toJSON (HeaderConf name (HVValue val)) = object ["name" .= name, "value" .= val]
toJSON (HeaderConf name (HVEnv val)) = object ["name" .= name, "value_from_env" .= val]
data CreateEventTriggerQuery
= CreateEventTriggerQuery
{ cetqName :: !T.Text
@ -68,6 +94,7 @@ data CreateEventTriggerQuery
, cetqDelete :: !(Maybe SubscribeOpSpec)
, cetqRetryConf :: !(Maybe RetryConf)
, cetqWebhook :: !T.Text
, cetqHeaders :: !(Maybe [HeaderConf])
, cetqReplace :: !Bool
} deriving (Show, Eq, Lift)
@ -79,7 +106,8 @@ instance FromJSON CreateEventTriggerQuery where
update <- o .:? "update"
delete <- o .:? "delete"
retryConf <- o .:? "retry_conf"
webhook <- o .: "webhook"
headers <- o .:? "headers"
replace <- o .:? "replace" .!= False
let regex = mkRegex "^\\w+$"
mName = matchRegex regex (T.unpack name)
@ -89,7 +117,7 @@ instance FromJSON CreateEventTriggerQuery where
case insert <|> update <|> delete of
Just _ -> return ()
Nothing -> fail "must provide operation spec(s)"
return $ CreateEventTriggerQuery name table insert update delete retryConf webhook replace
return $ CreateEventTriggerQuery name table insert update delete retryConf webhook headers replace
parseJSON _ = fail "expecting an object"
$(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''CreateEventTriggerQuery)
@ -127,6 +155,7 @@ data EventTriggerDef
, etdDefinition :: !TriggerOpsDef
, etdWebhook :: !T.Text
, etdRetryConf :: !RetryConf
, etdHeaders :: !(Maybe [HeaderConf])
} deriving (Show, Eq, Lift)
$(deriveJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''EventTriggerDef)
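
The FromJSON instance above accepts exactly one of value or value_from_env per header. A short sketch of the shapes it accepts and rejects, assuming HeaderConf is in scope (the imports and main here are illustrative):

{-# LANGUAGE OverloadedStrings #-}
import Data.Aeson (decode)

main :: IO ()
main = do
  -- a literal header value
  print (decode "{\"name\":\"X-Header-From-Value\",\"value\":\"MyValue\"}" :: Maybe HeaderConf)
  -- a value taken from an environment variable when the trigger is created
  print (decode "{\"name\":\"X-Header-From-Env\",\"value_from_env\":\"EVENT_WEBHOOK_HEADER\"}" :: Maybe HeaderConf)
  -- both keys at once (or neither) fails to parse, so decode returns Nothing
  print (decode "{\"name\":\"X\",\"value\":\"a\",\"value_from_env\":\"B\"}" :: Maybe HeaderConf)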

View File

@ -193,7 +193,8 @@ CREATE TABLE hdb_catalog.event_triggers
webhook TEXT NOT NULL,
num_retries INTEGER DEFAULT 0,
retry_interval INTEGER DEFAULT 10,
comment TEXT
comment TEXT,
headers JSON
);
CREATE TABLE hdb_catalog.event_log
@ -208,9 +209,12 @@ CREATE TABLE hdb_catalog.event_log
error BOOLEAN NOT NULL DEFAULT FALSE,
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
locked BOOLEAN NOT NULL DEFAULT FALSE
locked BOOLEAN NOT NULL DEFAULT FALSE,
next_retry_at TIMESTAMP
);
CREATE INDEX ON hdb_catalog.event_log (trigger_id);
CREATE TABLE hdb_catalog.event_invocation_logs
(
id TEXT DEFAULT gen_random_uuid() PRIMARY KEY,
@ -222,3 +226,5 @@ CREATE TABLE hdb_catalog.event_invocation_logs
FOREIGN KEY (event_id) REFERENCES hdb_catalog.event_log (id)
);
CREATE INDEX ON hdb_catalog.event_invocation_logs (event_id);

View File

@ -16,7 +16,7 @@ extra-deps:
# use https URLs so that build systems can clone these repos
# - graphql-api-0.3.0
- git: https://github.com/hasura/pg-client-hs.git
commit: e61bc37794b4d9e281bad44b2d7c8d35f2dbc770
commit: 7978e04f24790f18f06a67fe6065f470abc1c764
- git: https://github.com/hasura/graphql-parser-hs.git
commit: eae59812ec537b3756c3ddb5f59a7cc59508869b
- git: https://github.com/tdammers/ginger.git

View File

@ -0,0 +1,31 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_all
table:
schema: hge_tests
name: test_t1
insert:
columns: "*"
update:
columns: "*"
delete:
columns: "*"
webhook: http://127.0.0.1:5592
headers:
- name: "X-Header-From-Value"
value: "MyValue"
- name: "X-Header-From-Env"
value_from_env: "EVENT_WEBHOOK_HEADER"
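
The webhook URL above points at the test suite's local receiver; check_event later asserts that the delivery carried X-Header-From-Value: MyValue and X-Header-From-Env: MyEnvValue (the latter resolved from EVENT_WEBHOOK_HEADER set in the CI config). A minimal stand-in for such a receiver, as a sketch only (the real tests use their own webhook server, which is not part of this patch):

{-# LANGUAGE OverloadedStrings #-}
import Network.HTTP.Types (status200)
import Network.Wai (Application, requestHeaders, responseLBS)
import Network.Wai.Handler.Warp (run)

main :: IO ()
main = run 5592 app

-- log every header on the incoming event delivery and acknowledge it
app :: Application
app req respond = do
  mapM_ print (requestHeaders req)
  respond $ responseLBS status200 [] "{}"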

View File

@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_all
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1

View File

@ -135,6 +135,30 @@ class TestRetryConf(object):
tries = hge_ctx.get_error_queue_size()
assert tries == 5, tries
class TestEvtHeaders(object):
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/headers/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/headers/teardown.yaml')
assert st_code == 200, resp
def test_basic(self, hge_ctx):
table = {"schema" : "hge_tests", "name": "test_t1"}
init_row = {"c1" : 1, "c2" : "hello"}
exp_ev_data = {
"old": None,
"new": init_row
}
headers = {"X-Header-From-Value": "MyValue", "X-Header-From-Env": "MyEnvValue"}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_all", table, "INSERT", exp_ev_data, headers, "/")
class TestUpdateEvtQuery(object):
@pytest.fixture(autouse=True)

View File

@ -25,7 +25,7 @@ def validate_event_payload(ev_payload, trig_name, table):
def validate_event_headers(ev_headers, headers):
for key, value in headers.items():
v = ev_headers.get(key)
assert v == value
assert v == value, (key, v)
def validate_event_webhook(ev_webhook_path, webhook_path):
assert ev_webhook_path == webhook_path