1
1
mirror of https://github.com/github/semantic.git synced 2024-11-24 08:54:07 +03:00

Refactor a bit more into Semantic.Queue

Also, docs and record prefixes
This commit is contained in:
Timothy Clem 2017-10-05 10:26:11 -07:00
parent ce2e3e737e
commit 87bd442c96
5 changed files with 120 additions and 103 deletions

View File

@ -1,12 +1,10 @@
module Semantic.Log where
import Control.Concurrent.STM.TMQueue
import Data.Bifunctor (second)
import Data.Error (withSGRCode)
import Data.Foldable (toList)
import Data.List (intersperse)
import Data.Semigroup ((<>))
import GHC.Conc
import qualified Data.Time.Clock.POSIX as Time (getCurrentTime)
import qualified Data.Time.Format as Time
import qualified Data.Time.LocalTime as LocalTime
@ -30,19 +28,15 @@ data Level
deriving (Eq, Ord, Show)
queueLogMessage :: AsyncQ Message Options -> Level -> String -> [(String, String)] -> IO ()
queueLogMessage AsyncQ{..} level message pairs
| Just logLevel <- optionsLevel extra, level <= logLevel = Time.getCurrentTime >>= LocalTime.utcToLocalZonedTime >>= atomically . writeTMQueue queue . Message level message pairs
-- | Queue a message to be logged.
queueLogMessage :: AsyncQueue Message Options -> Level -> String -> [(String, String)] -> IO ()
queueLogMessage q@AsyncQueue{..} level message pairs
| Just logLevel <- optionsLevel asyncQueueExtra, level <= logLevel = Time.getCurrentTime >>= LocalTime.utcToLocalZonedTime >>= queue q . Message level message pairs
| otherwise = pure ()
logSink :: Options -> TMQueue Message -> IO ()
logSink options@Options{..} queue = do
message <- atomically (readTMQueue queue)
case message of
Just message -> do
hPutStr stderr (optionsFormatter options message)
logSink options queue
_ -> pure ()
-- | Log a message to stderr.
logMessage :: Options -> Message -> IO ()
logMessage options@Options{..} message = hPutStr stderr (optionsFormatter options message)
-- | Format log messaging using "logfmt".
--

View File

@ -1,26 +1,54 @@
module Semantic.Queue where
module Semantic.Queue
(
AsyncQueue(..)
, newQueue
, newQueue'
, queue
, closeQueue
)
where
import Control.Concurrent.Async as Async
import Control.Concurrent.STM.TMQueue
import GHC.Conc
-- Represents a TMQueue that's drained from a separate thread.
data AsyncQ a b
= AsyncQ
{ queue :: TMQueue a -- The queue.
, sink :: Async () -- A sink that will drain the queue.
, extra :: b -- Any extra data the queue needs to use.
-- | 'AsyncQueue' represents a 'TMQueue' that's drained from a separate thread.
-- It is intended to be used to queue data from a pure function and then process
-- that data in IO on a separate thread. 'AsyncQueue' is parameterized by:
-- * 'a' - the type of message stored on the queue.
-- * 'extra' - any other type needed to process messages on the queue.
data AsyncQueue a extra
= AsyncQueue
{ asyncQueue :: TMQueue a -- ^ The underlying 'TMQueue'.
, asyncQueueSink :: Async () -- ^ A sink that will drain the queue.
, asyncQueueExtra :: extra -- ^ Any exta data the queue needs to use.
}
-- Create a new AsyncQ.
newQueue :: (b -> TMQueue a -> IO ()) -> b-> IO (AsyncQ a b)
newQueue f b = do
q <- newTMQueueIO
sink <- Async.async (f b q)
pure (AsyncQ q sink b)
-- Close the queue.
closeQueue :: AsyncQ a b -> IO ()
closeQueue AsyncQ{..} = do
atomically (closeTMQueue queue)
Async.wait sink
-- | Create a new AsyncQueue using the default sink.
newQueue :: (extra -> a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue = newQueue' . sink
-- | Create a new AsyncQueue, specifying a custom sink.
newQueue' :: (extra -> TMQueue a -> IO ()) -> extra -> IO (AsyncQueue a extra)
newQueue' f extra = do
q <- newTMQueueIO
s <- Async.async (f extra q)
pure (AsyncQueue q s extra)
-- | Queue a message.
queue :: AsyncQueue a extra -> a -> IO ()
queue AsyncQueue{..} = atomically . writeTMQueue asyncQueue
-- | Drain messages from the queue, calling the specified function for each message.
sink :: (extra -> a -> IO ()) -> extra -> TMQueue a -> IO ()
sink f extra q = do
msg <- atomically (readTMQueue q)
maybe (pure ()) go msg
where go msg = f extra msg >> sink f extra q
-- | Close the queue.
closeQueue :: AsyncQueue a extra -> IO ()
closeQueue AsyncQueue{..} = do
atomically (closeTMQueue asyncQueue)
Async.wait asyncQueueSink

View File

@ -10,26 +10,21 @@ module Semantic.Stat
, set
, Stat
, queueStat -- Queue a Stat to be sent
, statSink -- Sink the queue to the network
-- Client
, defaultStatsClient
, StatsClient(..)
-- Internal, exposed for testing
, renderDatagram
, sendStats
, sendStat
) where
import Control.Arrow ((&&&))
import Control.Concurrent.STM.TMQueue
import Data.Functor
import Data.List (intercalate)
import Data.Maybe
import Data.Monoid
import GHC.Conc
import Network.Socket (Socket(..), SocketType(..), socket, connect, getAddrInfo, addrFamily, addrAddress, defaultProtocol)
import Network.Socket.ByteString
import Network.URI
@ -37,66 +32,68 @@ import qualified Data.ByteString.Char8 as B
import System.Environment
import System.IO.Error
import Semantic.Queue
-- | A named piece of data you wish to record a specific 'Metric' for.
-- See https://docs.datadoghq.com/guides/dogstatsd/ for more details.
data Stat
= Stat { name :: String
, value :: Metric
, tags :: Tags
}
= Stat
{ statName :: String -- ^ Stat name, usually separated by '.' (e.g. "system.metric.name")
, statValue :: Metric -- ^ 'Metric' value.
, statTags :: Tags -- ^ Key/value 'Tags' (optional).
}
-- | The various supported metric types in Datadog.
data Metric
= Counter Int -- Counters track how many times something happens per second.
| Gauge Double -- Gauges track the ebb and flow of a particular metric value over time.
| Histogram Double -- Histograms calculate the statistical distribution of any kind of value.
| Set Double -- Sets count the number of unique elements in a group
| Timer Double -- Timers measure the amount of time a section of code takes to execute.
= Counter Int -- ^ Counters track how many times something happens per second.
| Gauge Double -- ^ Gauges track the ebb and flow of a particular metric value over time.
| Histogram Double -- ^ Histograms calculate the statistical distribution of any kind of value.
| Set Double -- ^ Sets count the number of unique elements in a group
| Timer Double -- ^ Timers measure the amount of time a section of code takes to execute.
-- Tags are just key/value annotations. Values can blank.
-- | Tags are key/value annotations. Values can blank.
type Tags = [(String, String)]
-- Increment a counter.
-- | Increment a counter.
increment :: String -> Tags -> Stat
increment n = count n 1
-- Decrement a counter.
-- | Decrement a counter.
decrement :: String -> Tags -> Stat
decrement n = count n (-1)
-- Arbitrary count.
-- | Arbitrary count.
count :: String -> Int -> Tags -> Stat
count n v = Stat n (Counter v)
-- Arbitrary gauge value.
-- | Arbitrary gauge value.
gauge :: String -> Double -> Tags -> Stat
gauge n v = Stat n (Gauge v)
-- Timing in milliseconds.
-- | Timing in milliseconds.
timing :: String -> Double -> Tags -> Stat
timing n v = Stat n (Timer v)
-- Histogram measurement.
-- | Histogram measurement.
histogram :: String -> Double -> Tags -> Stat
histogram n v = Stat n (Histogram v)
-- Set counter.
-- | Set counter.
set :: String -> Double -> Tags -> Stat
set n v = Stat n (Set v)
data StatsClient
= StatsClient
{ udpSocket :: Socket
, namespace :: String
, udpHost :: String
, udpPort :: String
{ statsClientUDPSocket :: Socket
, statsClientNamespace :: String
, statsClientUDPHost :: String
, statsClientUDPPort :: String
}
-- Create a default stats client. This function consults two optional
-- environment variables for the stats URI (default: 127.0.0.1:28125).
-- * STATS_ADDR - String URI to send stats to in the form of `host:port`.
-- * DOGSTATSD_HOST - String hostname which will override the above host.
-- Generally used on kubes pods.
-- | Create a default stats client. This function consults two optional
-- environment variables for the stats URI (default: 127.0.0.1:28125).
-- * STATS_ADDR - String URI to send stats to in the form of `host:port`.
-- * DOGSTATSD_HOST - String hostname which will override the above host.
-- Generally used on kubes pods.
defaultStatsClient :: IO StatsClient
defaultStatsClient = do
addr <- lookupEnv "STATS_ADDR"
@ -108,34 +105,26 @@ defaultStatsClient = do
statsClient host port "semantic"
where
defaultHostPort = ("127.0.0.1", "28125")
defaultHost = "127.0.0.1"
defaultPort = "28125"
defaultHostPort = (defaultHost, defaultPort)
parseAddr = maybe defaultHostPort parseAuthority . parseURI
parseAuthority = maybe defaultHostPort (uriRegName &&& (drop 1 . uriPort)) . uriAuthority
parseAuthority = maybe defaultHostPort (uriRegName &&& (parsePort . uriPort)) . uriAuthority
parsePort s | null s = defaultPort
| otherwise = dropWhile (':' ==) s
-- Create a StatsClient at the specified host and port with a namespace prefix.
-- | Create a StatsClient at the specified host and port with a namespace prefix.
statsClient :: String -> String -> String -> IO StatsClient
statsClient host port namespace = do
statsClient host port statsClientNamespace = do
(addr:_) <- getAddrInfo Nothing (Just host) (Just port)
sock <- socket (addrFamily addr) Datagram defaultProtocol
connect sock (addrAddress addr)
pure (StatsClient sock namespace host port)
pure (StatsClient sock statsClientNamespace host port)
-- Send a stat over the StatsClient's socket.
sendStats :: StatsClient -> Stat -> IO ()
sendStats StatsClient{..} = void . tryIOError . sendAll udpSocket . B.pack . renderDatagram namespace
-- Queue a stat to be sent.
queueStat :: AsyncQ Stat StatsClient -> Stat -> IO ()
queueStat AsyncQ{..} = atomically . writeTMQueue queue
-- Drains stat messages from the queue and sends those stats over the configured
-- UDP socket. Intended to be run in a dedicated thread.
statSink :: StatsClient -> TMQueue Stat -> IO ()
statSink client q = do
stat <- atomically (readTMQueue q)
maybe (pure ()) send stat
where send stat = sendStats client stat >> statSink client q
-- | Send a stat over the StatsClient's socket.
sendStat :: StatsClient -> Stat -> IO ()
sendStat StatsClient{..} = void . tryIOError . sendAll statsClientUDPSocket . B.pack . renderDatagram statsClientNamespace
-- Datagram Rendering
@ -148,7 +137,7 @@ class Render a where
renderString :: String -> RenderS
renderString = (<>)
-- Render a Stat (with namespace prefix) to a datagram String.
-- | Render a Stat (with namespace prefix) to a datagram String.
renderDatagram :: String -> Stat -> String
renderDatagram namespace stat = renderString prefix (renders stat "")
where prefix | null namespace = ""
@ -158,10 +147,10 @@ renderDatagram namespace stat = renderString prefix (renders stat "")
instance Render Stat where
renders Stat{..}
= renderString name
= renderString statName
. renderString ":"
. renders value
. renders tags
. renders statValue
. renders statTags
instance Render Metric where
renders (Counter x) = renders x . renderString "|c"

View File

@ -157,15 +157,15 @@ runTask = runTaskWithOptions defaultOptions
runTaskWithOptions :: Options -> Task a -> IO a
runTaskWithOptions options task = do
options <- configureOptionsForHandle stderr options
statter <- defaultStatsClient >>= newQueue statSink
logger <- newQueue logSink options
statter <- defaultStatsClient >>= newQueue sendStat
logger <- newQueue logMessage options
result <- run options logger statter task
closeQueue statter
closeQueue logger
either (die . displayException) pure result
where run :: Options -> AsyncQ Message Options -> AsyncQ Stat StatsClient -> Task a -> IO (Either SomeException a)
where run :: Options -> AsyncQueue Message Options -> AsyncQueue Stat StatsClient -> Task a -> IO (Either SomeException a)
run options logger statter = go
where go :: Task a -> IO (Either SomeException a)
go = iterFreerA (\ task yield -> case task of
@ -173,7 +173,7 @@ runTaskWithOptions options task = do
ReadBlobPairs source -> (either Files.readBlobPairsFromHandle (traverse (traverse (uncurry Files.readFile))) source >>= yield) `catchError` (pure . Left . toException)
WriteToOutput destination contents -> either B.hPutStr B.writeFile destination contents >>= yield
WriteLog level message pairs -> queueLogMessage logger level message pairs >>= yield
WriteStat stat -> queueStat statter stat >>= yield
WriteStat stat -> queue statter stat >>= yield
Time message pairs task -> do
start <- Time.getCurrentTime
!res <- go task

View File

@ -24,21 +24,27 @@ spec = do
describe "defaultStatsClient" $ do
it "sets appropriate defaults" $ do
StatsClient{..} <- defaultStatsClient
namespace `shouldBe` "semantic"
udpHost `shouldBe` "127.0.0.1"
udpPort `shouldBe` "28125"
statsClientNamespace `shouldBe` "semantic"
statsClientUDPHost `shouldBe` "127.0.0.1"
statsClientUDPPort `shouldBe` "28125"
around (withEnvironment "STATS_ADDR" "localhost:8125") $
it "takes STATS_ADDR from environment" $ do
StatsClient{..} <- defaultStatsClient
udpHost `shouldBe` "localhost"
udpPort `shouldBe` "8125"
statsClientUDPHost `shouldBe` "localhost"
statsClientUDPPort `shouldBe` "8125"
around (withEnvironment "STATS_ADDR" "localhost") $
it "handles STATS_ADDR with just hostname" $ do
StatsClient{..} <- defaultStatsClient
statsClientUDPHost `shouldBe` "localhost"
statsClientUDPPort `shouldBe` "28125"
around (withEnvironment "DOGSTATSD_HOST" "0.0.0.0") $
it "takes DOGSTATSD_HOST from environment" $ do
StatsClient{..} <- defaultStatsClient
udpHost `shouldBe` "0.0.0.0"
udpPort `shouldBe` "28125"
statsClientUDPHost `shouldBe` "0.0.0.0"
statsClientUDPPort `shouldBe` "28125"
describe "renderDatagram" $ do
let key = "app.metric"
@ -51,7 +57,7 @@ spec = do
it "renders count" $
renderDatagram "" (count key 8 []) `shouldBe` "app.metric:8|c"
it "renders namespace" $
it "renders statsClientNamespace" $
renderDatagram "pre" (increment key []) `shouldBe` "pre.app.metric:1|c"
describe "tags" $ do
@ -68,10 +74,10 @@ spec = do
let inc = increment key [("key", "value"), ("a", "")]
renderDatagram "" inc `shouldBe` "app.metric:1|c|#key:value,a"
describe "sendStats" $
describe "sendStat" $
it "delivers datagram" $ do
client@StatsClient{..} <- defaultStatsClient
withSocketPair $ \(clientSoc, serverSoc) -> do
sendStats client { udpSocket = clientSoc } (increment "app.metric" [])
sendStat client { statsClientUDPSocket = clientSoc } (increment "app.metric" [])
info <- recv serverSoc 1024
info `shouldBe` "semantic.app.metric:1|c"