diff --git a/console/src/components/Services/EventTrigger/ProcessedEvents/ViewRows.js b/console/src/components/Services/EventTrigger/ProcessedEvents/ViewRows.js index efba033e461..677ff9ed3c3 100644 --- a/console/src/components/Services/EventTrigger/ProcessedEvents/ViewRows.js +++ b/console/src/components/Services/EventTrigger/ProcessedEvents/ViewRows.js @@ -7,6 +7,7 @@ import 'brace/mode/json'; import 'react-table/react-table.css'; import { deleteItem, vExpandRow, vCollapseRow } from './ViewActions'; // eslint-disable-line no-unused-vars import FilterQuery from './FilterQuery'; +import parseRowData from '../StreamingLogs/util'; import { setOrderCol, setOrderType, @@ -271,6 +272,8 @@ const ViewRows = ({ const currentIndex = row.index; const currentRow = curRows[0].events[currentIndex]; const invocationRowsData = []; + const requestData = []; + const responseData = []; currentRow.logs.map((r, rowIndex) => { const newRow = {}; const status = @@ -280,6 +283,8 @@ const ViewRows = ({ ); + requestData.push(parseRowData(r, 'request')); + responseData.push(parseRowData(r, 'response')); // Insert cells corresponding to all rows invocationColumns.forEach(col => { const getCellContent = () => { @@ -324,20 +329,8 @@ const ViewRows = ({ showPagination={false} SubComponent={logRow => { const finalIndex = logRow.index; - const finalRow = currentRow.logs[finalIndex]; - const currentPayload = JSON.stringify( - finalRow.request, - null, - 4 - ); - // check if response is type JSON - let finalResponse = finalRow.response; - try { - finalResponse = JSON.parse(finalRow.response); - finalResponse = JSON.stringify(finalResponse, null, 4); - } catch (e) { - console.error(e); - } + const finalRequest = requestData[finalIndex]; + const finalResponse = responseData[finalIndex]; return (
+ {finalRequest.headers ? ( +
+
+ Headers +
+ +
+ ) : null}
- Request + Payload
+ {finalResponse.headers ? ( +
+
+ Headers +
+ +
+ ) : null}
- Response + Payload
{ + const requestData = []; + const responseData = []; + log.rows.map((r, i) => { const newRow = {}; const status = r.status === 200 ? ( @@ -196,6 +199,8 @@ class StreamingLogs extends Component { ); + requestData.push(parseRowData(r, 'request')); + responseData.push(parseRowData(r, 'response')); // Insert cells corresponding to all rows invocationColumns.forEach(col => { const getCellContent = () => { @@ -220,20 +225,20 @@ class StreamingLogs extends Component { if (col === 'operation') { return (
- {r.request.event.op.toLowerCase()} + {requestData[i].data.event.op.toLowerCase()}
); } if (col === 'primary_key') { - const tableName = r.request.table.name; + const tableName = requestData[i].data.table.name; const tableData = allSchemas.filter( row => row.table_name === tableName ); const primaryKey = tableData[0].primary_key.columns; // handle all primary keys const pkHtml = []; primaryKey.map(pk => { - const newPrimaryKeyData = r.request.event.data.new - ? r.request.event.data.new[pk] + const newPrimaryKeyData = requestData[i].data.event.data.new + ? requestData[i].data.event.data.new[pk] : ''; pkHtml.push(
@@ -333,20 +338,8 @@ class StreamingLogs extends Component { } SubComponent={logRow => { const finalIndex = logRow.index; - const finalRow = log.rows[finalIndex]; - const currentPayload = JSON.stringify( - finalRow.request, - null, - 4 - ); - // check if response is type JSON - let finalResponse = finalRow.response; - try { - finalResponse = JSON.parse(finalRow.response); - finalResponse = JSON.stringify(finalResponse, null, 4); - } catch (e) { - console.error(e); - } + const finalRequest = requestData[finalIndex]; + const finalResponse = responseData[finalIndex]; return (
+ {finalRequest.headers ? ( +
+
+ Headers +
+ +
+ ) : null}
-
Request
+
Payload
+ {finalResponse.headers ? ( +
+
+ Headers +
+ +
+ ) : null}
Response
{ + switch (dataType) { + case 'request': + switch (row.request.version) { + case '2': + const data = row.request.payload; + return { + data: data, + headers: row.request.headers, + }; + default: + return { + data: row.request, + }; + } + case 'response': + let data; + switch (row.response.version) { + case '2': + try { + data = JSON.parse(row.response.data.body); + } catch (e) { + console.log(e); + data = row.response.data.body; + } + return { + data: data, + headers: row.response.data.headers, + }; + default: + try { + data = JSON.parse(row.response); + } catch (e) { + console.log(e); + data = row.response; + } + return { + data: data, + }; + } + default: + return false; + } +}; + +export default parseRowData; diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index 92ee2f7496c..3ba8054e7ea 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -170,11 +170,12 @@ main = do maxEvThrds <- getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE" evPollSec <- getFromEnv defaultPollingIntervalSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL" + logEnvHeaders <- getFromEnv False "LOG_HEADERS_FROM_ENV" eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evPollSec httpSession <- WrqS.newSessionControl Nothing TLS.tlsManagerSettings - void $ C.forkIO $ processEventQueue hloggerCtx httpSession pool cacheRef eventEngineCtx + void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx Warp.runSettings warpSettings app @@ -213,7 +214,7 @@ main = do let mRes = case mEnv of Nothing -> Just defaults Just val -> readMaybe val - eRes = maybe (Left "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE is not an integer") Right mRes + eRes = maybe (Left $ "Wrong expected type for environment variable: " <> env) Right mRes either ((>> exitFailure) . putStrLn) return eRes cleanSuccess = putStrLn "successfully cleaned graphql-engine related data" diff --git a/server/src-lib/Hasura/Events/HTTP.hs b/server/src-lib/Hasura/Events/HTTP.hs index 57ba38fd530..a013dbe6202 100644 --- a/server/src-lib/Hasura/Events/HTTP.hs +++ b/server/src-lib/Hasura/Events/HTTP.hs @@ -9,17 +9,12 @@ module Hasura.Events.HTTP ( HTTP(..) - , mkHTTP , mkAnyHTTPPost - , mkHTTPMaybe , HTTPErr(..) + , HTTPResp(..) , runHTTP - , default2xxParser - , noBody2xxParser , defaultRetryPolicy , defaultRetryFn - , defaultParser - , defaultParserMaybe , isNetworkError , isNetworkErrorHC , HLogger @@ -37,8 +32,6 @@ import qualified Data.TByteString as TBS import qualified Data.Text as T import qualified Data.Text.Encoding as TE import qualified Data.Text.Encoding.Error as TE -import qualified Data.Text.Lazy as TL -import qualified Data.Text.Lazy.Encoding as TLE import qualified Data.Time.Clock as Time import qualified Network.HTTP.Client as H import qualified Network.HTTP.Types as N @@ -70,10 +63,44 @@ data ExtraContext $(J.deriveJSON (J.aesonDrop 2 J.snakeCase){J.omitNothingFields=True} ''ExtraContext) +data HTTPResp + = HTTPResp + { hrsStatus :: !Int + , hrsHeaders :: ![HeaderConf] + , hrsBody :: !TBS.TByteString + } deriving (Show, Eq) + +$(J.deriveToJSON (J.aesonDrop 3 J.snakeCase){J.omitNothingFields=True} ''HTTPResp) + +instance ToEngineLog HTTPResp where + toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp ) + +mkHTTPResp :: W.Response B.ByteString -> HTTPResp +mkHTTPResp resp = + HTTPResp + (resp ^. W.responseStatus.W.statusCode) + (map decodeHeader $ resp ^. W.responseHeaders) + (TBS.fromLBS $ resp ^. W.responseBody) + where + decodeBS = TE.decodeUtf8With TE.lenientDecode + decodeHeader (hdrName, hdrVal) + = HeaderConf (decodeBS $ CI.original hdrName) (HVValue (decodeBS hdrVal)) + +data HTTPRespExtra + = HTTPRespExtra + { _hreResponse :: HTTPResp + , _hreContext :: Maybe ExtraContext + } + +$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPRespExtra) + +instance ToEngineLog HTTPRespExtra where + toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp ) + data HTTPErr = HClient !H.HttpException | HParse !N.Status !String - | HStatus !N.Status TBS.TByteString + | HStatus !HTTPResp | HOther !String deriving (Show) @@ -84,18 +111,19 @@ instance J.ToJSON HTTPErr where ( "parse" , J.toJSON (N.statusCode st, show e) ) - (HStatus st resp) -> - ("status", J.toJSON (N.statusCode st, resp)) + (HStatus resp) -> + ("status", J.toJSON resp) (HOther e) -> ("internal", J.toJSON $ show e) where toObj :: (T.Text, J.Value) -> J.Value toObj (k, v) = J.object [ "type" J..= k , "detail" J..= v] - -- encapsulates a http operation instance ToEngineLog HTTPErr where toEngineLog err = (LevelError, "event-trigger", J.toJSON err ) + + data HTTP a = HTTP { _hMethod :: !String @@ -138,79 +166,20 @@ defaultRetryPolicy = R.capDelay (120 * 1000 * 1000) (R.fullJitterBackoff (2 * 1000 * 1000)) <> R.limitRetries 15 --- a helper function -respJson :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a -respJson resp = - either (Left . HParse respCode) return $ - J.eitherDecode respBody - where - respCode = resp ^. W.responseStatus - respBody = resp ^. W.responseBody - -defaultParser :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a -defaultParser resp = if - | respCode == N.status200 -> respJson resp - | otherwise -> do - let val = TBS.fromLBS $ resp ^. W.responseBody - throwError $ HStatus respCode val +anyBodyParser :: W.Response B.ByteString -> Either HTTPErr HTTPResp +anyBodyParser resp = do + let httpResp = mkHTTPResp resp + if respCode >= N.status200 && respCode < N.status300 + then return httpResp + else throwError $ HStatus httpResp where respCode = resp ^. W.responseStatus --- like default parser but turns 404 into maybe -defaultParserMaybe - :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr (Maybe a) -defaultParserMaybe resp = if - | respCode == N.status200 -> Just <$> respJson resp - | respCode == N.status404 -> return Nothing - | otherwise -> do - let val = TBS.fromLBS $ resp ^. W.responseBody - throwError $ HStatus respCode val - where - respCode = resp ^. W.responseStatus - --- default parser which allows all 2xx responses -default2xxParser :: (J.FromJSON a) => W.Response B.ByteString -> Either HTTPErr a -default2xxParser resp = if - | respCode >= N.status200 && respCode < N.status300 -> respJson resp - | otherwise -> do - let val = TBS.fromLBS $ resp ^. W.responseBody - throwError $ HStatus respCode val - where - respCode = resp ^. W.responseStatus - -noBody2xxParser :: W.Response B.ByteString -> Either HTTPErr () -noBody2xxParser resp = if - | respCode >= N.status200 && respCode < N.status300 -> return () - | otherwise -> do - let val = TBS.fromLBS $ resp ^. W.responseBody - throwError $ HStatus respCode val - where - respCode = resp ^. W.responseStatus - -anyBodyParser :: W.Response B.ByteString -> Either HTTPErr B.ByteString -anyBodyParser resp = if - | respCode >= N.status200 && respCode < N.status300 -> return $ resp ^. W.responseBody - | otherwise -> do - let val = TBS.fromLBS $ resp ^. W.responseBody - throwError $ HStatus respCode val - where - respCode = resp ^. W.responseStatus - -mkHTTP :: (J.FromJSON a) => String -> String -> HTTP a -mkHTTP method url = - HTTP method url Nothing Nothing id defaultParser - defaultRetryFn defaultRetryPolicy - -mkAnyHTTPPost :: String -> Maybe J.Value -> HTTP B.ByteString +mkAnyHTTPPost :: String -> Maybe J.Value -> HTTP HTTPResp mkAnyHTTPPost url payload = HTTP "POST" url payload Nothing id anyBodyParser defaultRetryFn defaultRetryPolicy -mkHTTPMaybe :: (J.FromJSON a) => String -> String -> HTTP (Maybe a) -mkHTTPMaybe method url = - HTTP method url Nothing Nothing id defaultParserMaybe - defaultRetryFn defaultRetryPolicy - -- internal logging related types data HTTPReq = HTTPReq @@ -226,43 +195,6 @@ $(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPReq) instance ToEngineLog HTTPReq where toEngineLog req = (LevelInfo, "event-trigger", J.toJSON req ) -instance ToEngineLog HTTPResp where - toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp ) - -data HTTPResp - = HTTPResp - { _hrsStatus :: !Int - , _hrsHeaders :: ![T.Text] - , _hrsBody :: !TL.Text - } deriving (Show, Eq) - -$(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPResp) - - -data HTTPRespExtra - = HTTPRespExtra - { _hreResponse :: HTTPResp - , _hreContext :: Maybe ExtraContext - } - -$(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPRespExtra) - -instance ToEngineLog HTTPRespExtra where - toEngineLog resp = (LevelInfo, "event-trigger", J.toJSON resp ) - -mkHTTPResp :: W.Response B.ByteString -> HTTPResp -mkHTTPResp resp = - HTTPResp - (resp ^. W.responseStatus.W.statusCode) - (map decodeHeader $ resp ^. W.responseHeaders) - (decodeLBS $ resp ^. W.responseBody) - where - decodeBS = TE.decodeUtf8With TE.lenientDecode - decodeLBS = TLE.decodeUtf8With TE.lenientDecode - decodeHeader (hdrName, hdrVal) - = decodeBS (CI.original hdrName) <> " : " <> decodeBS hdrVal - - runHTTP :: ( MonadReader r m , MonadError HTTPErr m diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index d3badc13a9a..47cf19b6b91 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -32,12 +32,13 @@ import Hasura.SQL.Types import qualified Control.Concurrent.STM.TQueue as TQ import qualified Control.Lens as CL import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as B import qualified Data.CaseInsensitive as CI import qualified Data.HashMap.Strict as M import qualified Data.TByteString as TBS import qualified Data.Text as T import qualified Data.Text.Encoding as T +import qualified Data.Text.Encoding as TE +import qualified Data.Text.Encoding.Error as TE import qualified Data.Time.Clock as Time import qualified Database.PG.Query as Q import qualified Hasura.GraphQL.Schema as GS @@ -46,6 +47,12 @@ import qualified Network.HTTP.Types as N import qualified Network.Wreq as W import qualified Network.Wreq.Session as WS +type Version = T.Text + +invocationVersion :: Version +invocationVersion = "2" + +type LogEnvHeaders = Bool type CacheRef = IORef (SchemaCache, GS.GCtxMap) @@ -70,8 +77,6 @@ data Event , eTable :: QualifiedTable , eTrigger :: TriggerMeta , eEvent :: Value - -- , eDelivered :: Bool - -- , eError :: Bool , eTries :: Int , eCreatedAt :: Time.UTCTime } deriving (Show, Eq) @@ -89,12 +94,37 @@ instance ToJSON Event where $(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event) +data Request + = Request + { _rqPayload :: Value + , _rqHeaders :: Maybe [HeaderConf] + , _rqVersion :: T.Text + } +$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''Request) + +data WebhookResponse + = WebhookResponse + { _wrsBody :: TBS.TByteString + , _wrsHeaders :: Maybe [HeaderConf] + , _wrsStatus :: Int + } +$(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''WebhookResponse) + +data InitError = InitError { _ieMessage :: TBS.TByteString} +$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''InitError) + +data Response = ResponseType1 WebhookResponse | ResponseType2 InitError + +instance ToJSON Response where + toJSON (ResponseType1 resp) = object ["type" .= String "webhook_response", "data" .= toJSON resp, "version" .= invocationVersion] + toJSON (ResponseType2 err) = object ["type" .= String "init_error", "data" .= toJSON err, "version" .= invocationVersion] + data Invocation = Invocation { iEventId :: EventId - , iStatus :: Int64 - , iRequest :: Value - , iResponse :: TBS.TByteString + , iStatus :: Int + , iRequest :: Request + , iResponse :: Response } data EventEngineCtx @@ -111,20 +141,23 @@ defaultMaxEventThreads = 100 defaultPollingIntervalSec :: Int defaultPollingIntervalSec = 1 +retryAfterHeader :: CI.CI T.Text +retryAfterHeader = "Retry-After" + initEventEngineCtx :: Int -> Int -> STM EventEngineCtx initEventEngineCtx maxT pollI = do q <- TQ.newTQueue c <- newTVar 0 return $ EventEngineCtx q c maxT pollI -processEventQueue :: L.LoggerCtx -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO () -processEventQueue logctx httpSess pool cacheRef eectx = do +processEventQueue :: L.LoggerCtx -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO () +processEventQueue logctx logenv httpSess pool cacheRef eectx = do putStrLn "event_trigger: starting workers" threads <- mapM async [pollThread , consumeThread] void $ waitAny threads where pollThread = pollEvents (mkHLogger logctx) pool eectx - consumeThread = consumeEvents (mkHLogger logctx) httpSess pool cacheRef eectx + consumeThread = consumeEvents (mkHLogger logctx) logenv httpSess pool cacheRef eectx pollEvents :: HLogger -> Q.PGPool -> EventEngineCtx -> IO () @@ -137,12 +170,12 @@ pollEvents logger pool eectx = forever $ do threadDelay (pollI * 1000 * 1000) consumeEvents - :: HLogger -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO () -consumeEvents logger httpSess pool cacheRef eectx = forever $ do + :: HLogger -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO () +consumeEvents logger logenv httpSess pool cacheRef eectx = forever $ do event <- atomically $ do let EventEngineCtx q _ _ _ = eectx TQ.readTQueue q - async $ runReaderT (processEvent pool event) (logger, httpSess, cacheRef, eectx) + async $ runReaderT (processEvent logenv pool event) (logger, httpSess, cacheRef, eectx) processEvent :: ( MonadReader r m @@ -152,10 +185,10 @@ processEvent , Has CacheRef r , Has EventEngineCtx r ) - => Q.PGPool -> Event -> m () -processEvent pool e = do + => LogEnvHeaders -> Q.PGPool -> Event -> m () +processEvent logenv pool e = do (logger:: HLogger) <- asks getter - res <- tryWebhook pool e + res <- tryWebhook logenv pool e finally <- either errorFn successFn res liftIO $ either (logQErr logger) (void.return) finally where @@ -171,7 +204,7 @@ processEvent pool e = do errorFn err = do (logger:: HLogger) <- asks getter liftIO $ logger $ L.toEngineLog err - checkError + checkError err successFn :: ( MonadReader r m @@ -181,7 +214,7 @@ processEvent pool e = do , Has CacheRef r , Has EventEngineCtx r ) - => B.ByteString -> m (Either QErr ()) + => HTTPResp -> m (Either QErr ()) successFn _ = liftIO $ runExceptT $ runUnlockQ pool e logQErr :: HLogger -> QErr -> IO () @@ -195,17 +228,45 @@ processEvent pool e = do , Has CacheRef r , Has EventEngineCtx r ) - => m (Either QErr ()) - checkError = do + => HTTPErr -> m (Either QErr ()) + checkError err = do + let mretryHeader = getRetryAfterHeaderFromError err cacheRef::CacheRef <- asks getter (cache, _) <- liftIO $ readIORef cacheRef let eti = getEventTriggerInfoFromEvent cache e retryConfM = etiRetryConf <$> eti retryConf = fromMaybe (RetryConf 0 10) retryConfM tries = eTries e - if tries >= rcNumRetries retryConf -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1 + retryHeaderSeconds = parseRetryHeader mretryHeader + triesExhausted = tries >= rcNumRetries retryConf + noRetryHeader = isNothing retryHeaderSeconds + if triesExhausted && noRetryHeader -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1 then liftIO $ runExceptT $ runErrorAndUnlockQ pool e - else liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e retryConf + else do + let delay = chooseDelay triesExhausted (rcIntervalSec retryConf) retryHeaderSeconds + liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e delay + + getRetryAfterHeaderFromError (HStatus resp) = getRetryAfterHeaderFromResp resp + getRetryAfterHeaderFromError _ = Nothing + + getRetryAfterHeaderFromResp resp + = let mHeader = find (\(HeaderConf name _) -> CI.mk name == retryAfterHeader) (hrsHeaders resp) + in case mHeader of + Just (HeaderConf _ (HVValue value)) -> Just value + _ -> Nothing + + parseRetryHeader Nothing = Nothing + parseRetryHeader (Just hValue) + = let seconds = readMaybe $ T.unpack hValue + in case seconds of + Nothing -> Nothing + Just sec -> if sec > 0 then Just sec else Nothing + + -- we need to choose delay between the retry conf and the retry-after header + chooseDelay _ retryConfSec Nothing = retryConfSec + chooseDelay triesExhausted retryConfSec (Just retryHeaderSec) = if triesExhausted + then retryHeaderSec + else min retryHeaderSec retryConfSec tryWebhook :: ( MonadReader r m @@ -215,8 +276,8 @@ tryWebhook , Has CacheRef r , Has EventEngineCtx r ) - => Q.PGPool -> Event -> m (Either HTTPErr B.ByteString) -tryWebhook pool e = do + => LogEnvHeaders -> Q.PGPool -> Event -> m (Either HTTPErr HTTPResp) +tryWebhook logenv pool e = do logger:: HLogger <- asks getter cacheRef::CacheRef <- asks getter (cache, _) <- liftIO $ readIORef cacheRef @@ -227,8 +288,8 @@ tryWebhook pool e = do let webhook = etiWebhook eti createdAt = eCreatedAt e eventId = eId e - headersRaw = etiHeaders eti - headers = map encodeHeader headersRaw + headerInfos = etiHeaders eti + headers = map encodeHeader headerInfos eeCtx <- asks getter -- wait for counter and then increment beforing making http liftIO $ atomically $ do @@ -237,7 +298,9 @@ tryWebhook pool e = do if countThreads >= maxT then retry else modifyTVar' c (+1) - eitherResp <- runExceptT $ runHTTP (addHeaders headers W.defaults) (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId)) + let options = addHeaders headers W.defaults + decodedHeaders = map (decodeHeader headerInfos) $ options CL.^. W.headers + eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId)) --decrement counter once http is done liftIO $ atomically $ do @@ -247,24 +310,78 @@ tryWebhook pool e = do finally <- liftIO $ runExceptT $ case eitherResp of Left err -> case err of - HClient excp -> runFailureQ pool $ Invocation (eId e) 1000 (toJSON e) (TBS.fromLBS $ encode $ show excp) - HParse _ detail -> runFailureQ pool $ Invocation (eId e) 1001 (toJSON e) (TBS.fromLBS $ encode detail) - HStatus status detail -> runFailureQ pool $ Invocation (eId e) (fromIntegral $ N.statusCode status) (toJSON e) detail - HOther detail -> runFailureQ pool $ Invocation (eId e) 500 (toJSON e) (TBS.fromLBS $ encode detail) - Right resp -> runSuccessQ pool e $ Invocation (eId e) 200 (toJSON e) (TBS.fromLBS resp) + HClient excp -> let errMsg = TBS.fromLBS $ encode $ show excp + in runFailureQ pool $ mkInvo e 1000 decodedHeaders errMsg [] + HParse _ detail -> let errMsg = TBS.fromLBS $ encode detail + in runFailureQ pool $ mkInvo e 1001 decodedHeaders errMsg [] + HStatus errResp -> let respPayload = hrsBody errResp + respHeaders = hrsHeaders errResp + respStatus = hrsStatus errResp + in runFailureQ pool $ mkInvo e respStatus decodedHeaders respPayload respHeaders + HOther detail -> let errMsg = (TBS.fromLBS $ encode detail) + in runFailureQ pool $ mkInvo e 500 decodedHeaders errMsg [] + Right resp -> let respPayload = hrsBody resp + respHeaders = hrsHeaders resp + respStatus = hrsStatus resp + in runSuccessQ pool e $ mkInvo e respStatus decodedHeaders respPayload respHeaders case finally of Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err Right _ -> return () return eitherResp where + mkInvo :: Event -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation + mkInvo e' status reqHeaders respBody respHeaders + = let resp = if isInitError status then mkErr respBody else mkResp status respBody respHeaders + in + Invocation + (eId e') + status + (mkWebhookReq (toJSON e) reqHeaders) + resp addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers - encodeHeader :: (HeaderName, T.Text)-> (N.HeaderName, BS.ByteString) - encodeHeader header = - let name = CI.mk $ T.encodeUtf8 $ fst header - value = T.encodeUtf8 $ snd header - in (name, value) + encodeHeader :: EventHeaderInfo -> (N.HeaderName, BS.ByteString) + encodeHeader (EventHeaderInfo hconf cache) = + let (HeaderConf name _) = hconf + ciname = CI.mk $ T.encodeUtf8 name + value = T.encodeUtf8 cache + in (ciname, value) + + decodeHeader :: [EventHeaderInfo] -> (N.HeaderName, BS.ByteString) -> HeaderConf + decodeHeader headerInfos (hdrName, hdrVal) + = let name = decodeBS $ CI.original hdrName + getName ehi = let (HeaderConf name' _) = ehiHeaderConf ehi + in name' + mehi = find (\hi -> getName hi == name) headerInfos + in case mehi of + Nothing -> HeaderConf name (HVValue (decodeBS hdrVal)) + Just ehi -> if logenv + then HeaderConf name (HVValue (ehiCachedValue ehi)) + else ehiHeaderConf ehi + where + decodeBS = TE.decodeUtf8With TE.lenientDecode + + mkWebhookReq :: Value -> [HeaderConf] -> Request + mkWebhookReq payload headers = Request payload (mkMaybe headers) invocationVersion + + mkResp :: Int -> TBS.TByteString -> [HeaderConf] -> Response + mkResp status payload headers = + let wr = WebhookResponse payload (mkMaybe headers) status + in ResponseType1 wr + + mkErr :: TBS.TByteString -> Response + mkErr message = + let ir = InitError message + in ResponseType2 ir + + mkMaybe :: [a] -> Maybe [a] + mkMaybe [] = Nothing + mkMaybe x = Just x + + isInitError :: Int -> Bool + isInitError status = status >= 1000 + getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo getEventTriggerInfoFromEvent sc e = let table = eTable e @@ -291,7 +408,7 @@ insertInvocation invo = do Q.unitQE defaultTxErrorHandler [Q.sql| INSERT INTO hdb_catalog.event_invocation_logs (event_id, status, request, response) VALUES ($1, $2, $3, $4) - |] (iEventId invo, iStatus invo, Q.AltJ $ toJSON $ iRequest invo, Q.AltJ $ toJSON $ iResponse invo) True + |] (iEventId invo, toInt64 $ iStatus invo, Q.AltJ $ toJSON $ iRequest invo, Q.AltJ $ toJSON $ iResponse invo) True Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log SET tries = tries + 1 @@ -360,13 +477,16 @@ runErrorAndUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ clearNextRetry e unlockEvent e -runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO () -runRetryAfterAndUnlockQ pool e rconf = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do +runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> Int -> ExceptT QErr IO () +runRetryAfterAndUnlockQ pool e delay = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do currentTime <- liftIO getCurrentTime - let diff = fromIntegral $ rcIntervalSec rconf + let diff = fromIntegral delay retryTime = addUTCTime diff currentTime setNextRetry e retryTime unlockEvent e runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO () runUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ unlockEvent e + +toInt64 :: (Integral a) => a -> Int64 +toInt64 = fromIntegral diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs index eaca89560f0..048ebb74f6d 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Table.hs @@ -380,12 +380,10 @@ buildSchemaCache = flip execStateT emptySchemaCache $ do let headerConfs = fromMaybe [] mheaders qt = QualifiedTable sn tn allCols <- getCols . tiFieldInfoMap <$> askTabInfo qt - headers <- getHeadersFromConf headerConfs + headers <- getHeaderInfosFromConf headerConfs tDef <- decodeValue tDefVal addEventTriggerToCache (QualifiedTable sn tn) trid trn tDef (RetryConf nr rint) webhook headers liftTx $ mkTriggerQ trid trn qt allCols tDef - - where permHelper sn tn rn pDef pa = do qCtx <- mkAdminQCtx <$> get diff --git a/server/src-lib/Hasura/RQL/DDL/Subscribe.hs b/server/src-lib/Hasura/RQL/DDL/Subscribe.hs index d0e4c71c16a..6f70ffecd19 100644 --- a/server/src-lib/Hasura/RQL/DDL/Subscribe.hs +++ b/server/src-lib/Hasura/RQL/DDL/Subscribe.hs @@ -256,8 +256,8 @@ subTableP2 qt replace q@(EventTriggerDef name def webhook rconf mheaders) = do else liftTx $ addEventTriggerToCatalog qt allCols q let headerConfs = fromMaybe [] mheaders - headers <- getHeadersFromConf headerConfs - addEventTriggerToCache qt trid name def rconf webhook headers + headerInfos <- getHeaderInfosFromConf headerConfs + addEventTriggerToCache qt trid name def rconf webhook headerInfos subTableP2shim :: (P2C m) => (QualifiedTable, Bool, EventTriggerDef) -> m RespBody subTableP2shim (qt, replace, etdef) = do @@ -300,17 +300,17 @@ instance HDBQuery DeliverEventQuery where phaseTwo q _ = deliverEvent q schemaCachePolicy = SCPNoChange -getHeadersFromConf :: (P2C m) => [HeaderConf] -> m [(HeaderName, T.Text)] -getHeadersFromConf = mapM getHeader +getHeaderInfosFromConf :: (P2C m) => [HeaderConf] -> m [EventHeaderInfo] +getHeaderInfosFromConf = mapM getHeader where - getHeader :: (P2C m) => HeaderConf -> m (HeaderName, T.Text) + getHeader :: (P2C m) => HeaderConf -> m EventHeaderInfo getHeader hconf = case hconf of - (HeaderConf name (HVValue val)) -> return (name, val) - (HeaderConf name (HVEnv val)) -> do + (HeaderConf _ (HVValue val)) -> return $ EventHeaderInfo hconf val + (HeaderConf _ (HVEnv val)) -> do mEnv <- liftIO $ lookupEnv (T.unpack val) case mEnv of Nothing -> throw400 NotFound $ "environment variable '" <> val <> "' not set" - Just val' -> return (name, T.pack val') + Just envval -> return $ EventHeaderInfo hconf (T.pack envval) toInt64 :: (Integral a) => a -> Int64 toInt64 = fromIntegral diff --git a/server/src-lib/Hasura/RQL/Types/SchemaCache.hs b/server/src-lib/Hasura/RQL/Types/SchemaCache.hs index cf0dabe4d1e..acb94a88c35 100644 --- a/server/src-lib/Hasura/RQL/Types/SchemaCache.hs +++ b/server/src-lib/Hasura/RQL/Types/SchemaCache.hs @@ -376,7 +376,7 @@ data EventTriggerInfo , etiDelete :: !(Maybe OpTriggerInfo) , etiRetryConf :: !RetryConf , etiWebhook :: !T.Text - , etiHeaders :: ![(HeaderName, T.Text)] + , etiHeaders :: ![EventHeaderInfo] } deriving (Show, Eq) $(deriveToJSON (aesonDrop 3 snakeCase) ''EventTriggerInfo) @@ -637,7 +637,7 @@ addEventTriggerToCache -> TriggerOpsDef -> RetryConf -> T.Text - -> [(HeaderName, T.Text)] + -> [EventHeaderInfo] -> m () addEventTriggerToCache qt trid trn tdef rconf webhook headers = modTableInCache modEventTriggerInfo qt diff --git a/server/src-lib/Hasura/RQL/Types/Subscribe.hs b/server/src-lib/Hasura/RQL/Types/Subscribe.hs index c5436264a97..703d684122b 100644 --- a/server/src-lib/Hasura/RQL/Types/Subscribe.hs +++ b/server/src-lib/Hasura/RQL/Types/Subscribe.hs @@ -19,6 +19,7 @@ module Hasura.RQL.Types.Subscribe , HeaderConf(..) , HeaderValue(..) , HeaderName + , EventHeaderInfo(..) ) where import Data.Aeson @@ -89,6 +90,14 @@ instance ToJSON HeaderConf where toJSON (HeaderConf name (HVValue val)) = object ["name" .= name, "value" .= val] toJSON (HeaderConf name (HVEnv val)) = object ["name" .= name, "value_from_env" .= val] +data EventHeaderInfo + = EventHeaderInfo + { ehiHeaderConf :: !HeaderConf + , ehiCachedValue :: !T.Text + } deriving (Show, Eq, Lift) + +$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''EventHeaderInfo) + data CreateEventTriggerQuery = CreateEventTriggerQuery { cetqName :: !T.Text