{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
module Hasura.Events.Lib
( initEventEngineCtx
, processEventQueue
, unlockAllEvents
, defaultMaxEventThreads
, defaultPollingIntervalSec
, Event(..)
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, waitAny)
import Control.Concurrent.STM.TVar
import Control.Monad.STM (STM, atomically, retry)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Has
import Data.Int (Int64)
import Data.IORef (IORef, readIORef)
import Data.Time.Clock
import Hasura.Events.HTTP
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.SQL.Types
import qualified Control.Concurrent.STM.TQueue as TQ
import qualified Control.Lens as CL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as B
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Time.Clock as Time
import qualified Database.PG.Query as Q
import qualified Hasura.GraphQL.Schema as GS
import qualified Hasura.Logging as L
import qualified Network.HTTP.Types as N
import qualified Network.Wreq as W
import qualified Network.Wreq.Session as WS
type CacheRef = IORef (SchemaCache, GS.GCtxMap)
newtype EventInternalErr
= EventInternalErr QErr
deriving (Show, Eq)
instance L.ToEngineLog EventInternalErr where
toEngineLog (EventInternalErr qerr) = (L.LevelError, "event-trigger", toJSON qerr )
data TriggerMeta
= TriggerMeta
{ tmId :: TriggerId
, tmName :: TriggerName
} deriving (Show, Eq)
$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''TriggerMeta)
data Event
= Event
{ eId :: EventId
, eTable :: QualifiedTable
, eTrigger :: TriggerMeta
, eEvent :: Value
-- , eDelivered :: Bool
-- , eError :: Bool
, eTries :: Int
, eCreatedAt :: Time.UTCTime
} deriving (Show, Eq)
instance ToJSON Event where
toJSON (Event eid (QualifiedTable sn tn) trigger event _ created)=
object [ "id" .= eid
, "table" .= object [ "schema" .= sn
, "name" .= tn
, "trigger" .= trigger
, "event" .= event
, "created_at" .= created
$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event)
data Invocation
= Invocation
{ iEventId :: EventId
, iStatus :: Int64
, iRequest :: Value
, iResponse :: TBS.TByteString
data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxPollingIntervalSec :: Int
defaultMaxEventThreads :: Int
defaultMaxEventThreads = 100
defaultPollingIntervalSec :: Int
defaultPollingIntervalSec = 1
initEventEngineCtx :: Int -> Int -> STM EventEngineCtx
initEventEngineCtx maxT pollI = do
q <- TQ.newTQueue
c <- newTVar 0
return $ EventEngineCtx q c maxT pollI
processEventQueue :: L.LoggerCtx -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
processEventQueue logctx httpSess pool cacheRef eectx = do
putStrLn "event_trigger: starting workers"
threads <- mapM async [pollThread , consumeThread]
void $ waitAny threads
pollThread = pollEvents (mkHLogger logctx) pool eectx
consumeThread = consumeEvents (mkHLogger logctx) httpSess pool cacheRef eectx
:: HLogger -> Q.PGPool -> EventEngineCtx -> IO ()
pollEvents logger pool eectx = forever $ do
let EventEngineCtx q _ _ pollI = eectx
eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents
case eventsOrError of
Left err -> logger $ L.toEngineLog $ EventInternalErr err
Right events -> atomically $ mapM_ (TQ.writeTQueue q) events
threadDelay (pollI * 1000 * 1000)
:: HLogger -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
consumeEvents logger httpSess pool cacheRef eectx = forever $ do
event <- atomically $ do
let EventEngineCtx q _ _ _ = eectx
TQ.readTQueue q
async $ runReaderT (processEvent pool event) (logger, httpSess, cacheRef, eectx)
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
=> Q.PGPool -> Event -> m ()
processEvent pool e = do
(logger:: HLogger) <- asks getter
res <- tryWebhook pool e
finally <- either errorFn successFn res
liftIO $ either (logQErr logger) (void.return) finally
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
=> HTTPErr -> m (Either QErr ())
errorFn err = do
(logger:: HLogger) <- asks getter
liftIO $ logger $ L.toEngineLog err
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
=> B.ByteString -> m (Either QErr ())
successFn _ = liftIO $ runExceptT $ runUnlockQ pool e
logQErr :: HLogger -> QErr -> IO ()
logQErr logger err = logger $ L.toEngineLog $ EventInternalErr err
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
=> m (Either QErr ())
checkError = do
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
retryConfM = etiRetryConf <$> eti
retryConf = fromMaybe (RetryConf 0 10) retryConfM
tries = eTries e
if tries >= rcNumRetries retryConf -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
then liftIO $ runExceptT $ runErrorAndUnlockQ pool e
else liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e retryConf
:: ( MonadReader r m
, MonadIO m
, Has WS.Session r
, Has HLogger r
, Has CacheRef r
, Has EventEngineCtx r
=> Q.PGPool -> Event -> m (Either HTTPErr B.ByteString)
tryWebhook pool e = do
logger:: HLogger <- asks getter
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let meti = getEventTriggerInfoFromEvent cache e
case meti of
Nothing -> return $ Left $ HOther "table or event-trigger not found"
Just eti -> do
let webhook = etiWebhook eti
createdAt = eCreatedAt e
eventId = eId e
headersRaw = etiHeaders eti
headers = map encodeHeader headersRaw
eeCtx <- asks getter
-- wait for counter and then increment beforing making http
liftIO $ atomically $ do
let EventEngineCtx _ c maxT _ = eeCtx
countThreads <- readTVar c
if countThreads >= maxT
then retry
else modifyTVar' c (+1)
eitherResp <- runExceptT $ runHTTP (addHeaders headers W.defaults) (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
--decrement counter once http is done
liftIO $ atomically $ do
let EventEngineCtx _ c _ _ = eeCtx
modifyTVar' c (\v -> v - 1)
finally <- liftIO $ runExceptT $ case eitherResp of
Left err ->
case err of
HClient excp -> runFailureQ pool $ Invocation (eId e) 1000 (toJSON e) (TBS.fromLBS $ encode $ show excp)
HParse _ detail -> runFailureQ pool $ Invocation (eId e) 1001 (toJSON e) (TBS.fromLBS $ encode detail)
HStatus status detail -> runFailureQ pool $ Invocation (eId e) (fromIntegral $ N.statusCode status) (toJSON e) detail
HOther detail -> runFailureQ pool $ Invocation (eId e) 500 (toJSON e) (TBS.fromLBS $ encode detail)
Right resp -> runSuccessQ pool e $ Invocation (eId e) 200 (toJSON e) (TBS.fromLBS resp)
case finally of
Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err
Right _ -> return ()
return eitherResp
addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options
addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers
encodeHeader :: (HeaderName, T.Text)-> (N.HeaderName, BS.ByteString)
encodeHeader header =
let name = CI.mk $ T.encodeUtf8 $ fst header
value = T.encodeUtf8 $ snd header
in (name, value)
getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo
getEventTriggerInfoFromEvent sc e = let table = eTable e
tableInfo = M.lookup table $ scTables sc
in M.lookup ( tmName $ eTrigger e) =<< (tiEventTriggerInfoMap <$> tableInfo)
fetchEvents :: Q.TxE QErr [Event]
fetchEvents =
map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
FROM hdb_catalog.event_log l
JOIN hdb_catalog.event_triggers e
ON (l.trigger_id = e.id)
WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f' and (l.next_retry_at is NULL or l.next_retry_at <= now())
LIMIT 100 )
RETURNING id, schema_name, table_name, trigger_id, trigger_name, payload::json, tries, created_at
|] () True
where uncurryEvent (id', sn, tn, trid, trn, Q.AltJ payload, tries, created) = Event id' (QualifiedTable sn tn) (TriggerMeta trid trn) payload tries created
insertInvocation :: Invocation -> Q.TxE QErr ()
insertInvocation invo = do
Q.unitQE defaultTxErrorHandler [Q.sql|
INSERT INTO hdb_catalog.event_invocation_logs (event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|] (iEventId invo, iStatus invo, Q.AltJ $ toJSON $ iRequest invo, Q.AltJ $ toJSON $ iResponse invo) True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET tries = tries + 1
WHERE id = $1
|] (Identity $ iEventId invo) True
markDelivered :: Event -> Q.TxE QErr ()
markDelivered e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', error = 'f'
WHERE id = $1
|] (Identity $ eId e) True
markError :: Event -> Q.TxE QErr ()
markError e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET error = 't'
WHERE id = $1
|] (Identity $ eId e) True
unlockEvent :: Event -> Q.TxE QErr ()
unlockEvent e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 'f'
WHERE id = $1
|] (Identity $ eId e) True
unlockAllEvents :: Q.TxE QErr ()
unlockAllEvents =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 'f'
|] () False
setNextRetry :: Event -> UTCTime -> Q.TxE QErr ()
setNextRetry e time =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1
WHERE id = $2
|] (time, eId e) True
clearNextRetry :: Event -> Q.TxE QErr ()
clearNextRetry e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = NULL
WHERE id = $1
|] (Identity $ eId e) True
runFailureQ :: Q.PGPool -> Invocation -> ExceptT QErr IO ()
runFailureQ pool invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ insertInvocation invo
runSuccessQ :: Q.PGPool -> Event -> Invocation -> ExceptT QErr IO ()
runSuccessQ pool e invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
insertInvocation invo
clearNextRetry e
markDelivered e
runErrorAndUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runErrorAndUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
markError e
clearNextRetry e
unlockEvent e
runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO ()
runRetryAfterAndUnlockQ pool e rconf = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
currentTime <- liftIO getCurrentTime
let diff = fromIntegral $ rcIntervalSec rconf
retryTime = addUTCTime diff currentTime
setNextRetry e retryTime
unlockEvent e
runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ unlockEvent e