mirror of
https://github.com/github/semantic.git
synced 2024-12-30 10:27:45 +03:00
Merge branch 'master' into higher-order-effects
This commit is contained in:
commit
e77f9fb9c3
@ -5,11 +5,11 @@ import Prologue
|
||||
import System.Environment
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
envLookupHost :: MonadIO io => String -> String -> io String
|
||||
envLookupHost defaultHost k = liftIO $ fromMaybe defaultHost <$> lookupEnv k
|
||||
envLookupString :: MonadIO io => String -> String -> io String
|
||||
envLookupString defaultVal k = liftIO $ fromMaybe defaultVal <$> lookupEnv k
|
||||
|
||||
envLookupPort :: MonadIO io => Int -> String -> io Int
|
||||
envLookupPort defaultPort k = liftIO $ parsePort <$> lookupEnv k
|
||||
where parsePort x | Just s <- x
|
||||
envLookupInt :: MonadIO io => Int -> String -> io Int
|
||||
envLookupInt defaultVal k = liftIO $ parse <$> lookupEnv k
|
||||
where parse x | Just s <- x
|
||||
, Just p <- readMaybe s = p
|
||||
| otherwise = defaultPort
|
||||
| otherwise = defaultVal
|
||||
|
@ -9,46 +9,47 @@ module Semantic.Queue
|
||||
where
|
||||
|
||||
import Control.Concurrent.Async as Async
|
||||
import Control.Concurrent.STM.TMQueue
|
||||
import Control.Concurrent.STM.TBMQueue
|
||||
import Control.Monad
|
||||
import GHC.Conc
|
||||
|
||||
-- | 'AsyncQueue' represents a 'TMQueue' that's drained from a separate thread.
|
||||
-- | 'AsyncQueue' represents a 'TBMQueue' that's drained from a separate thread.
|
||||
-- It is intended to be used to queue data from a pure function and then process
|
||||
-- that data in IO on a separate thread. 'AsyncQueue' is parameterized by:
|
||||
-- * 'a' - the type of message stored on the queue.
|
||||
-- * 'extra' - any other type needed to process messages on the queue.
|
||||
data AsyncQueue a extra
|
||||
= AsyncQueue
|
||||
{ asyncQueue :: TMQueue a -- ^ The underlying 'TMQueue'.
|
||||
{ asyncQueue :: TBMQueue a -- ^ The underlying 'TBMQueue'.
|
||||
, asyncQueueSink :: Async () -- ^ A sink that will drain the queue.
|
||||
, asyncQueueExtra :: extra -- ^ Any exta data the queue needs to use.
|
||||
}
|
||||
|
||||
|
||||
-- | Create a new AsyncQueue using the default sink.
|
||||
newQueue :: (extra -> a -> IO ()) -> extra -> IO (AsyncQueue a extra)
|
||||
newQueue = newQueue' . sink
|
||||
-- | Create a new AsyncQueue with the given capacity using the default sink.
|
||||
newQueue :: Int -> (extra -> a -> IO ()) -> extra -> IO (AsyncQueue a extra)
|
||||
newQueue i = newQueue' i . sink
|
||||
|
||||
-- | Create a new AsyncQueue, specifying a custom sink.
|
||||
newQueue' :: (extra -> TMQueue a -> IO ()) -> extra -> IO (AsyncQueue a extra)
|
||||
newQueue' f extra = do
|
||||
q <- newTMQueueIO
|
||||
-- | Create a new AsyncQueue with the given capacity, specifying a custom sink.
|
||||
newQueue' :: Int -> (extra -> TBMQueue a -> IO ()) -> extra -> IO (AsyncQueue a extra)
|
||||
newQueue' i f extra = do
|
||||
q <- newTBMQueueIO i
|
||||
s <- Async.async (f extra q)
|
||||
pure (AsyncQueue q s extra)
|
||||
|
||||
-- | Queue a message.
|
||||
queue :: AsyncQueue a extra -> a -> IO ()
|
||||
queue AsyncQueue{..} = atomically . writeTMQueue asyncQueue
|
||||
queue AsyncQueue{..} = void . atomically . tryWriteTBMQueue asyncQueue
|
||||
|
||||
-- | Drain messages from the queue, calling the specified function for each message.
|
||||
sink :: (extra -> a -> IO ()) -> extra -> TMQueue a -> IO ()
|
||||
sink :: (extra -> a -> IO ()) -> extra -> TBMQueue a -> IO ()
|
||||
sink f extra q = do
|
||||
msg <- atomically (readTMQueue q)
|
||||
msg <- atomically (readTBMQueue q)
|
||||
maybe (pure ()) go msg
|
||||
where go msg = f extra msg >> sink f extra q
|
||||
|
||||
-- | Close the queue.
|
||||
closeQueue :: AsyncQueue a extra -> IO ()
|
||||
closeQueue AsyncQueue{..} = do
|
||||
atomically (closeTMQueue asyncQueue)
|
||||
atomically (closeTBMQueue asyncQueue)
|
||||
Async.wait asyncQueueSink
|
||||
|
@ -134,9 +134,10 @@ runTask = runTaskWithOptions defaultOptions
|
||||
-- | Execute a 'TaskEff' with the passed 'Options', yielding its result value in 'IO'.
|
||||
runTaskWithOptions :: Options -> TaskEff a -> IO a
|
||||
runTaskWithOptions options task = do
|
||||
let size = 100 -- Max size of telemetry queues, less important for the CLI.
|
||||
options <- configureOptionsForHandle stderr options
|
||||
statter <- defaultStatsClient >>= newQueue sendStat
|
||||
logger <- newQueue logMessage options
|
||||
statter <- defaultStatsClient >>= newQueue size sendStat
|
||||
logger <- newQueue size logMessage options
|
||||
|
||||
result <- runTaskWithOptions' options logger statter task
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user