change type of fetch interval to milliseconds (#939)

This commit is contained in:
Tirumarai Selvan 2018-10-30 20:50:18 +05:30 committed by Shahidh K Muhammed
parent 4890434477
commit b40807c9ec
2 changed files with 17 additions and 17 deletions

View File

@ -169,10 +169,10 @@ main = do
void $ C.forkIO $ checkForUpdates loggerCtx httpManager
maxEvThrds <- getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
evPollSec <- getFromEnv defaultPollingIntervalSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
evFetchMilliSec <- getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
logEnvHeaders <- getFromEnv False "LOG_HEADERS_FROM_ENV"
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evPollSec
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
httpSession <- WrqS.newSessionControl Nothing TLS.tlsManagerSettings
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx

View File

@ -9,7 +9,7 @@ module Hasura.Events.Lib
, processEventQueue
, unlockAllEvents
, defaultMaxEventThreads
, defaultPollingIntervalSec
, defaultFetchIntervalMilliSec
, Event(..)
) where
@ -129,45 +129,45 @@ data Invocation
data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxPollingIntervalSec :: Int
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxFetchIntervalMilliSec :: Int
}
defaultMaxEventThreads :: Int
defaultMaxEventThreads = 100
defaultPollingIntervalSec :: Int
defaultPollingIntervalSec = 1
defaultFetchIntervalMilliSec :: Int
defaultFetchIntervalMilliSec = 1000
retryAfterHeader :: CI.CI T.Text
retryAfterHeader = "Retry-After"
initEventEngineCtx :: Int -> Int -> STM EventEngineCtx
initEventEngineCtx maxT pollI = do
initEventEngineCtx maxT fetchI = do
q <- TQ.newTQueue
c <- newTVar 0
return $ EventEngineCtx q c maxT pollI
return $ EventEngineCtx q c maxT fetchI
processEventQueue :: L.LoggerCtx -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
processEventQueue logctx logenv httpSess pool cacheRef eectx = do
putStrLn "event_trigger: starting workers"
threads <- mapM async [pollThread , consumeThread]
threads <- mapM async [fetchThread , consumeThread]
void $ waitAny threads
where
pollThread = pollEvents (mkHLogger logctx) pool eectx
fetchThread = pushEvents (mkHLogger logctx) pool eectx
consumeThread = consumeEvents (mkHLogger logctx) logenv httpSess pool cacheRef eectx
pollEvents
pushEvents
:: HLogger -> Q.PGPool -> EventEngineCtx -> IO ()
pollEvents logger pool eectx = forever $ do
let EventEngineCtx q _ _ pollI = eectx
pushEvents logger pool eectx = forever $ do
let EventEngineCtx q _ _ fetchI = eectx
eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents
case eventsOrError of
Left err -> logger $ L.toEngineLog $ EventInternalErr err
Right events -> atomically $ mapM_ (TQ.writeTQueue q) events
threadDelay (pollI * 1000 * 1000)
threadDelay (fetchI * 1000)
consumeEvents
:: HLogger -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()