1
1
mirror of https://github.com/github/semantic.git synced 2024-12-26 16:33:03 +03:00

Merge pull request #1957 from github/bounded-queues

Bounded queues
This commit is contained in:
Rob Rix 2018-06-13 08:44:25 -04:00 committed by GitHub
commit 3e7d06f0a1
3 changed files with 25 additions and 23 deletions

View File

@ -5,11 +5,11 @@ import Prologue
import System.Environment
import Text.Read (readMaybe)
envLookupHost :: MonadIO io => String -> String -> io String
envLookupHost defaultHost k = liftIO $ fromMaybe defaultHost <$> lookupEnv k
envLookupString :: MonadIO io => String -> String -> io String
envLookupString defaultVal k = liftIO $ fromMaybe defaultVal <$> lookupEnv k
envLookupPort :: MonadIO io => Int -> String -> io Int
envLookupPort defaultPort k = liftIO $ parsePort <$> lookupEnv k
where parsePort x | Just s <- x
envLookupInt :: MonadIO io => Int -> String -> io Int
envLookupInt defaultVal k = liftIO $ parse <$> lookupEnv k
where parse x | Just s <- x
, Just p <- readMaybe s = p
| otherwise = defaultPort
| otherwise = defaultVal

View File

@ -9,46 +9,47 @@ module Semantic.Queue
where
import Control.Concurrent.Async as Async
import Control.Concurrent.STM.TMQueue
import Control.Concurrent.STM.TBMQueue
import Control.Monad
import GHC.Conc
-- | 'AsyncQueue' represents a 'TMQueue' that's drained from a separate thread.
-- | 'AsyncQueue' represents a 'TBMQueue' that's drained from a separate thread.
-- It is intended to be used to queue data from a pure function and then process
-- that data in IO on a separate thread. 'AsyncQueue' is parameterized by:
-- * 'a' - the type of message stored on the queue.
-- * 'extra' - any other type needed to process messages on the queue.
data AsyncQueue a extra
= AsyncQueue
{ asyncQueue :: TMQueue a -- ^ The underlying 'TMQueue'.
{ asyncQueue :: TBMQueue a -- ^ The underlying 'TBMQueue'.
, asyncQueueSink :: Async () -- ^ A sink that will drain the queue.
, asyncQueueExtra :: extra -- ^ Any exta data the queue needs to use.
}
-- | Create a new AsyncQueue using the default sink.
newQueue :: (extra -> a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue = newQueue' . sink
-- | Create a new AsyncQueue with the given capacity using the default sink.
newQueue :: Int -> (extra -> a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue i = newQueue' i . sink
-- | Create a new AsyncQueue, specifying a custom sink.
newQueue' :: (extra -> TMQueue a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue' f extra = do
q <- newTMQueueIO
-- | Create a new AsyncQueue with the given capacity, specifying a custom sink.
newQueue' :: Int -> (extra -> TBMQueue a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue' i f extra = do
q <- newTBMQueueIO i
s <- Async.async (f extra q)
pure (AsyncQueue q s extra)
-- | Queue a message.
queue :: AsyncQueue a extra -> a -> IO ()
queue AsyncQueue{..} = atomically . writeTMQueue asyncQueue
queue AsyncQueue{..} = void . atomically . tryWriteTBMQueue asyncQueue
-- | Drain messages from the queue, calling the specified function for each message.
sink :: (extra -> a -> IO ()) -> extra -> TMQueue a -> IO ()
sink :: (extra -> a -> IO ()) -> extra -> TBMQueue a -> IO ()
sink f extra q = do
msg <- atomically (readTMQueue q)
msg <- atomically (readTBMQueue q)
maybe (pure ()) go msg
where go msg = f extra msg >> sink f extra q
-- | Close the queue.
closeQueue :: AsyncQueue a extra -> IO ()
closeQueue AsyncQueue{..} = do
atomically (closeTMQueue asyncQueue)
atomically (closeTBMQueue asyncQueue)
Async.wait asyncQueueSink

View File

@ -132,9 +132,10 @@ runTask = runTaskWithOptions defaultOptions
-- | Execute a 'TaskEff' with the passed 'Options', yielding its result value in 'IO'.
runTaskWithOptions :: Options -> TaskEff a -> IO a
runTaskWithOptions options task = do
let size = 100 -- Max size of telemetry queues, less important for the CLI.
options <- configureOptionsForHandle stderr options
statter <- defaultStatsClient >>= newQueue sendStat
logger <- newQueue logMessage options
statter <- defaultStatsClient >>= newQueue size sendStat
logger <- newQueue size logMessage options
result <- runTaskWithOptions' options logger statter task