respect retry-after header on event trigger response (#525)

This commit is contained in:
Tirumarai Selvan 2018-10-26 21:58:03 +05:30 committed by Shahidh K Muhammed
parent 8b0082eac1
commit baf7c493bc
10 changed files with 398 additions and 210 deletions

View File

@ -7,6 +7,7 @@ import 'brace/mode/json';
import 'react-table/react-table.css';
import { deleteItem, vExpandRow, vCollapseRow } from './ViewActions'; // eslint-disable-line no-unused-vars
import FilterQuery from './FilterQuery';
import parseRowData from '../StreamingLogs/util';
import {
setOrderCol,
setOrderType,
@ -271,6 +272,8 @@ const ViewRows = ({
const currentIndex = row.index;
const currentRow = curRows[0].events[currentIndex];
const invocationRowsData = [];
const requestData = [];
const responseData = [];
currentRow.logs.map((r, rowIndex) => {
const newRow = {};
const status =
@ -280,6 +283,8 @@ const ViewRows = ({
<i className={styles.invocationFailure + ' fa fa-times'} />
);
requestData.push(parseRowData(r, 'request'));
responseData.push(parseRowData(r, 'response'));
// Insert cells corresponding to all rows
invocationColumns.forEach(col => {
const getCellContent = () => {
@ -324,20 +329,8 @@ const ViewRows = ({
showPagination={false}
SubComponent={logRow => {
const finalIndex = logRow.index;
const finalRow = currentRow.logs[finalIndex];
const currentPayload = JSON.stringify(
finalRow.request,
null,
4
);
// check if response is type JSON
let finalResponse = finalRow.response;
try {
finalResponse = JSON.parse(finalRow.response);
finalResponse = JSON.stringify(finalResponse, null, 4);
} catch (e) {
console.error(e);
}
const finalRequest = requestData[finalIndex];
const finalResponse = responseData[finalIndex];
return (
<div style={{ padding: '20px' }}>
<Tabs
@ -346,15 +339,41 @@ const ViewRows = ({
id="requestResponseTab"
>
<Tab eventKey={1} title="Request">
{finalRequest.headers ? (
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Headers
</div>
<AceEditor
mode="json"
theme="github"
name="payload"
value={JSON.stringify(
finalRequest.headers,
null,
4
)}
minLines={4}
maxLines={20}
width="100%"
showPrintMargin={false}
showGutter={false}
/>
</div>
) : null}
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Request
Payload
</div>
<AceEditor
mode="json"
theme="github"
name="payload"
value={currentPayload}
value={JSON.stringify(
finalRequest.data,
null,
4
)}
minLines={4}
maxLines={100}
width="100%"
@ -364,15 +383,41 @@ const ViewRows = ({
</div>
</Tab>
<Tab eventKey={2} title="Response">
{finalResponse.headers ? (
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Headers
</div>
<AceEditor
mode="json"
theme="github"
name="response"
value={JSON.stringify(
finalResponse.headers,
null,
4
)}
minLines={4}
maxLines={20}
width="100%"
showPrintMargin={false}
showGutter={false}
/>
</div>
) : null}
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Response
Payload
</div>
<AceEditor
mode="json"
theme="github"
name="response"
value={finalResponse}
value={JSON.stringify(
finalResponse.data,
null,
4
)}
minLines={4}
maxLines={100}
width="100%"

View File

@ -8,6 +8,7 @@ import Tab from 'react-bootstrap/lib/Tab';
import RedeliverEvent from '../TableCommon/RedeliverEvent';
import TableHeader from '../TableCommon/TableHeader';
import semverCheck from '../../../../helpers/semver';
import parseRowData from './util';
import {
loadEventLogs,
setTrigger,
@ -187,7 +188,9 @@ class StreamingLogs extends Component {
filterAll: true,
});
const invocationRowsData = [];
log.rows.map(r => {
const requestData = [];
const responseData = [];
log.rows.map((r, i) => {
const newRow = {};
const status =
r.status === 200 ? (
@ -196,6 +199,8 @@ class StreamingLogs extends Component {
<i className={styles.invocationFailure + ' fa fa-times'} />
);
requestData.push(parseRowData(r, 'request'));
responseData.push(parseRowData(r, 'response'));
// Insert cells corresponding to all rows
invocationColumns.forEach(col => {
const getCellContent = () => {
@ -220,20 +225,20 @@ class StreamingLogs extends Component {
if (col === 'operation') {
return (
<div className={conditionalClassname}>
{r.request.event.op.toLowerCase()}
{requestData[i].data.event.op.toLowerCase()}
</div>
);
}
if (col === 'primary_key') {
const tableName = r.request.table.name;
const tableName = requestData[i].data.table.name;
const tableData = allSchemas.filter(
row => row.table_name === tableName
);
const primaryKey = tableData[0].primary_key.columns; // handle all primary keys
const pkHtml = [];
primaryKey.map(pk => {
const newPrimaryKeyData = r.request.event.data.new
? r.request.event.data.new[pk]
const newPrimaryKeyData = requestData[i].data.event.data.new
? requestData[i].data.event.data.new[pk]
: '';
pkHtml.push(
<div>
@ -333,20 +338,8 @@ class StreamingLogs extends Component {
}
SubComponent={logRow => {
const finalIndex = logRow.index;
const finalRow = log.rows[finalIndex];
const currentPayload = JSON.stringify(
finalRow.request,
null,
4
);
// check if response is type JSON
let finalResponse = finalRow.response;
try {
finalResponse = JSON.parse(finalRow.response);
finalResponse = JSON.stringify(finalResponse, null, 4);
} catch (e) {
console.error(e);
}
const finalRequest = requestData[finalIndex];
const finalResponse = responseData[finalIndex];
return (
<div style={{ padding: '20px' }}>
<Tabs
@ -355,13 +348,35 @@ class StreamingLogs extends Component {
id="requestResponseTab"
>
<Tab eventKey={1} title="Request">
{finalRequest.headers ? (
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Headers
</div>
<AceEditor
mode="json"
theme="github"
name="headers"
value={JSON.stringify(
finalRequest.headers,
null,
4
)}
minLines={4}
maxLines={20}
width="100%"
showPrintMargin={false}
showGutter={false}
/>
</div>
) : null}
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>Request</div>
<div className={styles.subheading_text}>Payload</div>
<AceEditor
mode="json"
theme="github"
name="payload"
value={currentPayload}
value={JSON.stringify(finalRequest.data, null, 4)}
minLines={4}
maxLines={100}
width="100%"
@ -371,13 +386,35 @@ class StreamingLogs extends Component {
</div>
</Tab>
<Tab eventKey={2} title="Response">
{finalResponse.headers ? (
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>
Headers
</div>
<AceEditor
mode="json"
theme="github"
name="response"
value={JSON.stringify(
finalResponse.headers,
null,
4
)}
minLines={4}
maxLines={20}
width="100%"
showPrintMargin={false}
showGutter={false}
/>
</div>
) : null}
<div className={styles.add_mar_top}>
<div className={styles.subheading_text}>Response</div>
<AceEditor
mode="json"
theme="github"
name="response"
value={finalResponse}
value={JSON.stringify(finalResponse.data, null, 4)}
minLines={4}
maxLines={100}
width="100%"

View File

@ -0,0 +1,46 @@
const parseRowData = (row, dataType) => {
switch (dataType) {
case 'request':
switch (row.request.version) {
case '2':
const data = row.request.payload;
return {
data: data,
headers: row.request.headers,
};
default:
return {
data: row.request,
};
}
case 'response':
let data;
switch (row.response.version) {
case '2':
try {
data = JSON.parse(row.response.data.body);
} catch (e) {
console.log(e);
data = row.response.data.body;
}
return {
data: data,
headers: row.response.data.headers,
};
default:
try {
data = JSON.parse(row.response);
} catch (e) {
console.log(e);
data = row.response;
}
return {
data: data,
};
}
default:
return false;
}
};
export default parseRowData;

View File

@ -170,11 +170,12 @@ main = do
maxEvThrds <- getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
evPollSec <- getFromEnv defaultPollingIntervalSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
logEnvHeaders <- getFromEnv False "LOG_HEADERS_FROM_ENV"
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evPollSec
httpSession <- WrqS.newSessionControl Nothing TLS.tlsManagerSettings
void $ C.forkIO $ processEventQueue hloggerCtx httpSession pool cacheRef eventEngineCtx
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx
Warp.runSettings warpSettings app
@ -213,7 +214,7 @@ main = do
let mRes = case mEnv of
Nothing -> Just defaults
Just val -> readMaybe val
eRes = maybe (Left "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE is not an integer") Right mRes
eRes = maybe (Left $ "Wrong expected type for environment variable: " <> env) Right mRes
either ((>> exitFailure) . putStrLn) return eRes
cleanSuccess = putStrLn "successfully cleaned graphql-engine related data"

View File

@ -9,17 +9,12 @@
module Hasura.Events.HTTP
( HTTP(..)
, mkHTTP
, mkAnyHTTPPost
, mkHTTPMaybe
, HTTPErr(..)
, HTTPResp(..)
, runHTTP
, default2xxParser
, noBody2xxParser
, defaultRetryPolicy
, defaultRetryFn
, defaultParser
, defaultParserMaybe
, isNetworkError
, isNetworkErrorHC
, HLogger
@ -37,8 +32,6 @@ import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Encoding as TLE
import qualified Data.Time.Clock as Time
import qualified Network.HTTP.Client as H
import qualified Network.HTTP.Types as N
@ -70,10 +63,44 @@ data ExtraContext
$(J.deriveJSON (J.aesonDrop 2 J.snakeCase){J.omitNothingFields=True} ''ExtraContext)
data HTTPResp
= HTTPResp
{ hrsStatus :: !Int
, hrsHeaders :: ![HeaderConf]
, hrsBody :: !TBS.TByteString
} deriving (Show, Eq)
$(J.deriveToJSON (J.aesonDrop 3 J.snakeCase){J.omitNothingFields=True} ''HTTPResp)
instance ToEngineLog HTTPResp where
toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp )
mkHTTPResp :: W.Response B.ByteString -> HTTPResp
mkHTTPResp resp =
HTTPResp
(resp ^. W.responseStatus.W.statusCode)
(map decodeHeader $ resp ^. W.responseHeaders)
(TBS.fromLBS $ resp ^. W.responseBody)
where
decodeBS = TE.decodeUtf8With TE.lenientDecode
decodeHeader (hdrName, hdrVal)
= HeaderConf (decodeBS $ CI.original hdrName) (HVValue (decodeBS hdrVal))
data HTTPRespExtra
= HTTPRespExtra
{ _hreResponse :: HTTPResp
, _hreContext :: Maybe ExtraContext
}
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPRespExtra)
instance ToEngineLog HTTPRespExtra where
toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp )
data HTTPErr
= HClient !H.HttpException
| HParse !N.Status !String
| HStatus !N.Status TBS.TByteString
| HStatus !HTTPResp
| HOther !String
deriving (Show)
@ -84,18 +111,19 @@ instance J.ToJSON HTTPErr where
( "parse"
, J.toJSON (N.statusCode st, show e)
)
(HStatus st resp) ->
("status", J.toJSON (N.statusCode st, resp))
(HStatus resp) ->
("status", J.toJSON resp)
(HOther e) -> ("internal", J.toJSON $ show e)
where
toObj :: (T.Text, J.Value) -> J.Value
toObj (k, v) = J.object [ "type" J..= k
, "detail" J..= v]
-- encapsulates a http operation
instance ToEngineLog HTTPErr where
toEngineLog err = (LevelError, "event-trigger", J.toJSON err )
data HTTP a
= HTTP
{ _hMethod :: !String
@ -138,79 +166,20 @@ defaultRetryPolicy =
R.capDelay (120 * 1000 * 1000) (R.fullJitterBackoff (2 * 1000 * 1000))
<> R.limitRetries 15
-- a helper function
respJson :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a
respJson resp =
either (Left . HParse respCode) return $
J.eitherDecode respBody
where
respCode = resp ^. W.responseStatus
respBody = resp ^. W.responseBody
defaultParser :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a
defaultParser resp = if
| respCode == N.status200 -> respJson resp
| otherwise -> do
let val = TBS.fromLBS $ resp ^. W.responseBody
throwError $ HStatus respCode val
anyBodyParser :: W.Response B.ByteString -> Either HTTPErr HTTPResp
anyBodyParser resp = do
let httpResp = mkHTTPResp resp
if respCode >= N.status200 && respCode < N.status300
then return httpResp
else throwError $ HStatus httpResp
where
respCode = resp ^. W.responseStatus
-- like default parser but turns 404 into maybe
defaultParserMaybe
:: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr (Maybe a)
defaultParserMaybe resp = if
| respCode == N.status200 -> Just <$> respJson resp
| respCode == N.status404 -> return Nothing
| otherwise -> do
let val = TBS.fromLBS $ resp ^. W.responseBody
throwError $ HStatus respCode val
where
respCode = resp ^. W.responseStatus
-- default parser which allows all 2xx responses
default2xxParser :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a
default2xxParser resp = if
| respCode >= N.status200 && respCode < N.status300 -> respJson resp
| otherwise -> do
let val = TBS.fromLBS $ resp ^. W.responseBody
throwError $ HStatus respCode val
where
respCode = resp ^. W.responseStatus
noBody2xxParser :: W.Response B.ByteString -> Either HTTPErr ()
noBody2xxParser resp = if
| respCode >= N.status200 && respCode < N.status300 -> return ()
| otherwise -> do
let val = TBS.fromLBS $ resp ^. W.responseBody
throwError $ HStatus respCode val
where
respCode = resp ^. W.responseStatus
anyBodyParser :: W.Response B.ByteString -> Either HTTPErr B.ByteString
anyBodyParser resp = if
| respCode >= N.status200 && respCode < N.status300 -> return $ resp ^. W.responseBody
| otherwise -> do
let val = TBS.fromLBS $ resp ^. W.responseBody
throwError $ HStatus respCode val
where
respCode = resp ^. W.responseStatus
mkHTTP :: (J.FromJSON a) => String -> String -> HTTP a
mkHTTP method url =
HTTP method url Nothing Nothing id defaultParser
defaultRetryFn defaultRetryPolicy
mkAnyHTTPPost :: String -> Maybe J.Value -> HTTP B.ByteString
mkAnyHTTPPost :: String -> Maybe J.Value -> HTTP HTTPResp
mkAnyHTTPPost url payload =
HTTP "POST" url payload Nothing id anyBodyParser
defaultRetryFn defaultRetryPolicy
mkHTTPMaybe :: (J.FromJSON a) => String -> String -> HTTP (Maybe a)
mkHTTPMaybe method url =
HTTP method url Nothing Nothing id defaultParserMaybe
defaultRetryFn defaultRetryPolicy
-- internal logging related types
data HTTPReq
= HTTPReq
@ -226,43 +195,6 @@ $(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPReq)
instance ToEngineLog HTTPReq where
toEngineLog req = (LevelInfo, "event-trigger", J.toJSON req )
instance ToEngineLog HTTPResp where
toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp )
data HTTPResp
= HTTPResp
{ _hrsStatus :: !Int
, _hrsHeaders :: ![T.Text]
, _hrsBody :: !TL.Text
} deriving (Show, Eq)
$(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPResp)
data HTTPRespExtra
= HTTPRespExtra
{ _hreResponse :: HTTPResp
, _hreContext :: Maybe ExtraContext
}
$(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPRespExtra)
instance ToEngineLog HTTPRespExtra where
toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp )
mkHTTPResp :: W.Response B.ByteString -> HTTPResp
mkHTTPResp resp =
HTTPResp
(resp ^. W.responseStatus.W.statusCode)
(map decodeHeader $ resp ^. W.responseHeaders)
(decodeLBS $ resp ^. W.responseBody)
where
decodeBS = TE.decodeUtf8With TE.lenientDecode
decodeLBS = TLE.decodeUtf8With TE.lenientDecode
decodeHeader (hdrName, hdrVal)
= decodeBS (CI.original hdrName) <> " : " <> decodeBS hdrVal
runHTTP
:: ( MonadReader r m
, MonadError HTTPErr m

View File

@ -32,12 +32,13 @@ import Hasura.SQL.Types
import qualified Control.Concurrent.STM.TQueue as TQ
import qualified Control.Lens as CL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as B
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.Time.Clock as Time
import qualified Database.PG.Query as Q
import qualified Hasura.GraphQL.Schema as GS
@ -46,6 +47,12 @@ import qualified Network.HTTP.Types as N
import qualified Network.Wreq as W
import qualified Network.Wreq.Session as WS
type Version = T.Text
invocationVersion :: Version
invocationVersion = "2"
type LogEnvHeaders = Bool
type CacheRef = IORef (SchemaCache, GS.GCtxMap)
@ -70,8 +77,6 @@ data Event
, eTable :: QualifiedTable
, eTrigger :: TriggerMeta
, eEvent :: Value
-- , eDelivered :: Bool
-- , eError :: Bool
, eTries :: Int
, eCreatedAt :: Time.UTCTime
} deriving (Show, Eq)
@ -89,12 +94,37 @@ instance ToJSON Event where
$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event)
data Request
= Request
{ _rqPayload :: Value
, _rqHeaders :: Maybe [HeaderConf]
, _rqVersion :: T.Text
}
$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''Request)
data WebhookResponse
= WebhookResponse
{ _wrsBody :: TBS.TByteString
, _wrsHeaders :: Maybe [HeaderConf]
, _wrsStatus :: Int
}
$(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''WebhookResponse)
data InitError = InitError { _ieMessage :: TBS.TByteString}
$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''InitError)
data Response = ResponseType1 WebhookResponse | ResponseType2 InitError
instance ToJSON Response where
toJSON (ResponseType1 resp) = object ["type" .= String "webhook_response", "data" .= toJSON resp, "version" .= invocationVersion]
toJSON (ResponseType2 err) = object ["type" .= String "init_error", "data" .= toJSON err, "version" .= invocationVersion]
data Invocation
= Invocation
{ iEventId :: EventId
, iStatus :: Int64
, iRequest :: Value
, iResponse :: TBS.TByteString
, iStatus :: Int
, iRequest :: Request
, iResponse :: Response
}
data EventEngineCtx
@ -111,20 +141,23 @@ defaultMaxEventThreads = 100
defaultPollingIntervalSec :: Int
defaultPollingIntervalSec = 1
retryAfterHeader :: CI.CI T.Text
retryAfterHeader = "Retry-After"
initEventEngineCtx :: Int -> Int -> STM EventEngineCtx
initEventEngineCtx maxT pollI = do
q <- TQ.newTQueue
c <- newTVar 0
return $ EventEngineCtx q c maxT pollI
processEventQueue :: L.LoggerCtx -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
processEventQueue logctx httpSess pool cacheRef eectx = do
processEventQueue :: L.LoggerCtx -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
processEventQueue logctx logenv httpSess pool cacheRef eectx = do
putStrLn "event_trigger: starting workers"
threads <- mapM async [pollThread , consumeThread]
void $ waitAny threads
where
pollThread = pollEvents (mkHLogger logctx) pool eectx
consumeThread = consumeEvents (mkHLogger logctx) httpSess pool cacheRef eectx
consumeThread = consumeEvents (mkHLogger logctx) logenv httpSess pool cacheRef eectx
pollEvents
:: HLogger -> Q.PGPool -> EventEngineCtx -> IO ()
@ -137,12 +170,12 @@ pollEvents logger pool eectx = forever $ do
threadDelay (pollI * 1000 * 1000)
consumeEvents
:: HLogger -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
consumeEvents logger httpSess pool cacheRef eectx = forever $ do
:: HLogger -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
consumeEvents logger logenv httpSess pool cacheRef eectx = forever $ do
event <- atomically $ do
let EventEngineCtx q _ _ _ = eectx
TQ.readTQueue q
async $ runReaderT (processEvent pool event) (logger, httpSess, cacheRef, eectx)
async $ runReaderT (processEvent logenv pool event) (logger, httpSess, cacheRef, eectx)
processEvent
:: ( MonadReader r m
@ -152,10 +185,10 @@ processEvent
, Has CacheRef r
, Has EventEngineCtx r
)
=> Q.PGPool -> Event -> m ()
processEvent pool e = do
=> LogEnvHeaders -> Q.PGPool -> Event -> m ()
processEvent logenv pool e = do
(logger:: HLogger) <- asks getter
res <- tryWebhook pool e
res <- tryWebhook logenv pool e
finally <- either errorFn successFn res
liftIO $ either (logQErr logger) (void.return) finally
where
@ -171,7 +204,7 @@ processEvent pool e = do
errorFn err = do
(logger:: HLogger) <- asks getter
liftIO $ logger $ L.toEngineLog err
checkError
checkError err
successFn
:: ( MonadReader r m
@ -181,7 +214,7 @@ processEvent pool e = do
, Has CacheRef r
, Has EventEngineCtx r
)
=> B.ByteString -> m (Either QErr ())
=> HTTPResp -> m (Either QErr ())
successFn _ = liftIO $ runExceptT $ runUnlockQ pool e
logQErr :: HLogger -> QErr -> IO ()
@ -195,17 +228,45 @@ processEvent pool e = do
, Has CacheRef r
, Has EventEngineCtx r
)
=> m (Either QErr ())
checkError = do
=> HTTPErr -> m (Either QErr ())
checkError err = do
let mretryHeader = getRetryAfterHeaderFromError err
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
retryConfM = etiRetryConf <$> eti
retryConf = fromMaybe (RetryConf 0 10) retryConfM
tries = eTries e
if tries >= rcNumRetries retryConf -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
retryHeaderSeconds = parseRetryHeader mretryHeader
triesExhausted = tries >= rcNumRetries retryConf
noRetryHeader = isNothing retryHeaderSeconds
if triesExhausted && noRetryHeader -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
then liftIO $ runExceptT $ runErrorAndUnlockQ pool e
else liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e retryConf
else do
let delay = chooseDelay triesExhausted (rcIntervalSec retryConf) retryHeaderSeconds
liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e delay
getRetryAfterHeaderFromError (HStatus resp) = getRetryAfterHeaderFromResp resp
getRetryAfterHeaderFromError _ = Nothing
getRetryAfterHeaderFromResp resp
= let mHeader = find (\(HeaderConf name _) -> CI.mk name == retryAfterHeader) (hrsHeaders resp)
in case mHeader of
Just (HeaderConf _ (HVValue value)) -> Just value
_ -> Nothing
parseRetryHeader Nothing = Nothing
parseRetryHeader (Just hValue)
= let seconds = readMaybe $ T.unpack hValue
in case seconds of
Nothing -> Nothing
Just sec -> if sec > 0 then Just sec else Nothing
-- we need to choose delay between the retry conf and the retry-after header
chooseDelay _ retryConfSec Nothing = retryConfSec
chooseDelay triesExhausted retryConfSec (Just retryHeaderSec) = if triesExhausted
then retryHeaderSec
else min retryHeaderSec retryConfSec
tryWebhook
:: ( MonadReader r m
@ -215,8 +276,8 @@ tryWebhook
, Has CacheRef r
, Has EventEngineCtx r
)
=> Q.PGPool -> Event -> m (Either HTTPErr B.ByteString)
tryWebhook pool e = do
=> LogEnvHeaders -> Q.PGPool -> Event -> m (Either HTTPErr HTTPResp)
tryWebhook logenv pool e = do
logger:: HLogger <- asks getter
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
@ -227,8 +288,8 @@ tryWebhook pool e = do
let webhook = etiWebhook eti
createdAt = eCreatedAt e
eventId = eId e
headersRaw = etiHeaders eti
headers = map encodeHeader headersRaw
headerInfos = etiHeaders eti
headers = map encodeHeader headerInfos
eeCtx <- asks getter
-- wait for counter and then increment beforing making http
liftIO $ atomically $ do
@ -237,7 +298,9 @@ tryWebhook pool e = do
if countThreads >= maxT
then retry
else modifyTVar' c (+1)
eitherResp <- runExceptT $ runHTTP (addHeaders headers W.defaults) (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
let options = addHeaders headers W.defaults
decodedHeaders = map (decodeHeader headerInfos) $ options CL.^. W.headers
eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
--decrement counter once http is done
liftIO $ atomically $ do
@ -247,24 +310,78 @@ tryWebhook pool e = do
finally <- liftIO $ runExceptT $ case eitherResp of
Left err ->
case err of
HClient excp -> runFailureQ pool $ Invocation (eId e) 1000 (toJSON e) (TBS.fromLBS $ encode $ show excp)
HParse _ detail -> runFailureQ pool $ Invocation (eId e) 1001 (toJSON e) (TBS.fromLBS $ encode detail)
HStatus status detail -> runFailureQ pool $ Invocation (eId e) (fromIntegral $ N.statusCode status) (toJSON e) detail
HOther detail -> runFailureQ pool $ Invocation (eId e) 500 (toJSON e) (TBS.fromLBS $ encode detail)
Right resp -> runSuccessQ pool e $ Invocation (eId e) 200 (toJSON e) (TBS.fromLBS resp)
HClient excp -> let errMsg = TBS.fromLBS $ encode $ show excp
in runFailureQ pool $ mkInvo e 1000 decodedHeaders errMsg []
HParse _ detail -> let errMsg = TBS.fromLBS $ encode detail
in runFailureQ pool $ mkInvo e 1001 decodedHeaders errMsg []
HStatus errResp -> let respPayload = hrsBody errResp
respHeaders = hrsHeaders errResp
respStatus = hrsStatus errResp
in runFailureQ pool $ mkInvo e respStatus decodedHeaders respPayload respHeaders
HOther detail -> let errMsg = (TBS.fromLBS $ encode detail)
in runFailureQ pool $ mkInvo e 500 decodedHeaders errMsg []
Right resp -> let respPayload = hrsBody resp
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
in runSuccessQ pool e $ mkInvo e respStatus decodedHeaders respPayload respHeaders
case finally of
Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err
Right _ -> return ()
return eitherResp
where
mkInvo :: Event -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation
mkInvo e' status reqHeaders respBody respHeaders
= let resp = if isInitError status then mkErr respBody else mkResp status respBody respHeaders
in
Invocation
(eId e')
status
(mkWebhookReq (toJSON e) reqHeaders)
resp
addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options
addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers
encodeHeader :: (HeaderName, T.Text)-> (N.HeaderName, BS.ByteString)
encodeHeader header =
let name = CI.mk $ T.encodeUtf8 $ fst header
value = T.encodeUtf8 $ snd header
in (name, value)
encodeHeader :: EventHeaderInfo -> (N.HeaderName, BS.ByteString)
encodeHeader (EventHeaderInfo hconf cache) =
let (HeaderConf name _) = hconf
ciname = CI.mk $ T.encodeUtf8 name
value = T.encodeUtf8 cache
in (ciname, value)
decodeHeader :: [EventHeaderInfo] -> (N.HeaderName, BS.ByteString) -> HeaderConf
decodeHeader headerInfos (hdrName, hdrVal)
= let name = decodeBS $ CI.original hdrName
getName ehi = let (HeaderConf name' _) = ehiHeaderConf ehi
in name'
mehi = find (\hi -> getName hi == name) headerInfos
in case mehi of
Nothing -> HeaderConf name (HVValue (decodeBS hdrVal))
Just ehi -> if logenv
then HeaderConf name (HVValue (ehiCachedValue ehi))
else ehiHeaderConf ehi
where
decodeBS = TE.decodeUtf8With TE.lenientDecode
mkWebhookReq :: Value -> [HeaderConf] -> Request
mkWebhookReq payload headers = Request payload (mkMaybe headers) invocationVersion
mkResp :: Int -> TBS.TByteString -> [HeaderConf] -> Response
mkResp status payload headers =
let wr = WebhookResponse payload (mkMaybe headers) status
in ResponseType1 wr
mkErr :: TBS.TByteString -> Response
mkErr message =
let ir = InitError message
in ResponseType2 ir
mkMaybe :: [a] -> Maybe [a]
mkMaybe [] = Nothing
mkMaybe x = Just x
isInitError :: Int -> Bool
isInitError status = status >= 1000
getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo
getEventTriggerInfoFromEvent sc e = let table = eTable e
@ -291,7 +408,7 @@ insertInvocation invo = do
Q.unitQE defaultTxErrorHandler [Q.sql|
INSERT INTO hdb_catalog.event_invocation_logs (event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|] (iEventId invo, iStatus invo, Q.AltJ $ toJSON $ iRequest invo, Q.AltJ $ toJSON $ iResponse invo) True
|] (iEventId invo, toInt64 $ iStatus invo, Q.AltJ $ toJSON $ iRequest invo, Q.AltJ $ toJSON $ iResponse invo) True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET tries = tries + 1
@ -360,13 +477,16 @@ runErrorAndUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $
clearNextRetry e
unlockEvent e
runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO ()
runRetryAfterAndUnlockQ pool e rconf = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> Int -> ExceptT QErr IO ()
runRetryAfterAndUnlockQ pool e delay = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
currentTime <- liftIO getCurrentTime
let diff = fromIntegral $ rcIntervalSec rconf
let diff = fromIntegral delay
retryTime = addUTCTime diff currentTime
setNextRetry e retryTime
unlockEvent e
runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ unlockEvent e
toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral

View File

@ -380,12 +380,10 @@ buildSchemaCache = flip execStateT emptySchemaCache $ do
let headerConfs = fromMaybe [] mheaders
qt = QualifiedTable sn tn
allCols <- getCols . tiFieldInfoMap <$> askTabInfo qt
headers <- getHeadersFromConf headerConfs
headers <- getHeaderInfosFromConf headerConfs
tDef <- decodeValue tDefVal
addEventTriggerToCache (QualifiedTable sn tn) trid trn tDef (RetryConf nr rint) webhook headers
liftTx $ mkTriggerQ trid trn qt allCols tDef
where
permHelper sn tn rn pDef pa = do
qCtx <- mkAdminQCtx <$> get

View File

@ -256,8 +256,8 @@ subTableP2 qt replace q@(EventTriggerDef name def webhook rconf mheaders) = do
else
liftTx $ addEventTriggerToCatalog qt allCols q
let headerConfs = fromMaybe [] mheaders
headers <- getHeadersFromConf headerConfs
addEventTriggerToCache qt trid name def rconf webhook headers
headerInfos <- getHeaderInfosFromConf headerConfs
addEventTriggerToCache qt trid name def rconf webhook headerInfos
subTableP2shim :: (P2C m) => (QualifiedTable, Bool, EventTriggerDef) -> m RespBody
subTableP2shim (qt, replace, etdef) = do
@ -300,17 +300,17 @@ instance HDBQuery DeliverEventQuery where
phaseTwo q _ = deliverEvent q
schemaCachePolicy = SCPNoChange
getHeadersFromConf :: (P2C m) => [HeaderConf] -> m [(HeaderName, T.Text)]
getHeadersFromConf = mapM getHeader
getHeaderInfosFromConf :: (P2C m) => [HeaderConf] -> m [EventHeaderInfo]
getHeaderInfosFromConf = mapM getHeader
where
getHeader :: (P2C m) => HeaderConf -> m (HeaderName, T.Text)
getHeader :: (P2C m) => HeaderConf -> m EventHeaderInfo
getHeader hconf = case hconf of
(HeaderConf name (HVValue val)) -> return (name, val)
(HeaderConf name (HVEnv val)) -> do
(HeaderConf _ (HVValue val)) -> return $ EventHeaderInfo hconf val
(HeaderConf _ (HVEnv val)) -> do
mEnv <- liftIO $ lookupEnv (T.unpack val)
case mEnv of
Nothing -> throw400 NotFound $ "environment variable '" <> val <> "' not set"
Just val' -> return (name, T.pack val')
Just envval -> return $ EventHeaderInfo hconf (T.pack envval)
toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral

View File

@ -376,7 +376,7 @@ data EventTriggerInfo
, etiDelete :: !(Maybe OpTriggerInfo)
, etiRetryConf :: !RetryConf
, etiWebhook :: !T.Text
, etiHeaders :: ![(HeaderName, T.Text)]
, etiHeaders :: ![EventHeaderInfo]
} deriving (Show, Eq)
$(deriveToJSON (aesonDrop 3 snakeCase) ''EventTriggerInfo)
@ -637,7 +637,7 @@ addEventTriggerToCache
-> TriggerOpsDef
-> RetryConf
-> T.Text
-> [(HeaderName, T.Text)]
-> [EventHeaderInfo]
-> m ()
addEventTriggerToCache qt trid trn tdef rconf webhook headers =
modTableInCache modEventTriggerInfo qt

View File

@ -19,6 +19,7 @@ module Hasura.RQL.Types.Subscribe
, HeaderConf(..)
, HeaderValue(..)
, HeaderName
, EventHeaderInfo(..)
) where
import Data.Aeson
@ -89,6 +90,14 @@ instance ToJSON HeaderConf where
toJSON (HeaderConf name (HVValue val)) = object ["name" .= name, "value" .= val]
toJSON (HeaderConf name (HVEnv val)) = object ["name" .= name, "value_from_env" .= val]
data EventHeaderInfo
= EventHeaderInfo
{ ehiHeaderConf :: !HeaderConf
, ehiCachedValue :: !T.Text
} deriving (Show, Eq, Lift)
$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''EventHeaderInfo)
data CreateEventTriggerQuery
= CreateEventTriggerQuery
{ cetqName :: !T.Text