mirror of
https://github.com/github/semantic.git
synced 2024-11-28 01:47:01 +03:00
Continue to fill out stats functionality
This commit is contained in:
parent
f80445945c
commit
3d5cf4347e
@ -103,6 +103,7 @@ library
|
||||
, MonadRandom
|
||||
, mtl
|
||||
, network
|
||||
, network-uri
|
||||
, optparse-applicative
|
||||
, parallel
|
||||
, parsers
|
||||
|
@ -11,8 +11,8 @@ data AsyncQ a b
|
||||
, extra :: b
|
||||
}
|
||||
|
||||
newQueue :: b -> (b -> TMQueue a -> IO ()) -> IO (AsyncQ a b)
|
||||
newQueue b f = do
|
||||
newQueue :: (b -> TMQueue a -> IO ()) -> b-> IO (AsyncQ a b)
|
||||
newQueue f b = do
|
||||
q <- newTMQueueIO
|
||||
sink <- Async.async (f b q)
|
||||
pure (AsyncQ q sink b)
|
||||
|
@ -1,26 +1,40 @@
|
||||
module Semantic.Stat where
|
||||
module Semantic.Stat
|
||||
(
|
||||
-- Primary API for creating stats.
|
||||
increment
|
||||
, decrement
|
||||
, count
|
||||
, gauge
|
||||
, timing
|
||||
, histogram
|
||||
, set
|
||||
, Stat
|
||||
|
||||
, queueStat -- Queue a Stat to be sent
|
||||
, statSink -- Sink the queue to the network
|
||||
|
||||
-- Client
|
||||
, defaultStatsClient
|
||||
, StatClient
|
||||
) where
|
||||
|
||||
|
||||
import Control.Arrow ((&&&))
|
||||
import Control.Concurrent.STM.TMQueue
|
||||
import GHC.Conc
|
||||
import Data.Functor
|
||||
import Data.List (intercalate)
|
||||
import Data.Maybe
|
||||
import Data.Monoid
|
||||
import Semantic.Queue
|
||||
|
||||
import System.IO
|
||||
import System.IO.Error
|
||||
import GHC.Conc
|
||||
import Network.Socket (Socket(..), SocketType(..), socket, connect, getAddrInfo, addrFamily, addrAddress, defaultProtocol)
|
||||
import Network.Socket.ByteString
|
||||
import Network.URI
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import System.Environment
|
||||
import System.IO.Error
|
||||
import System.IO
|
||||
|
||||
data Metric
|
||||
= Counter Int
|
||||
| Gauge Double
|
||||
| Histogram Double
|
||||
| Set Double
|
||||
| Timer Double
|
||||
|
||||
type Tag = (String, String)
|
||||
type Tags = [Tag]
|
||||
import Semantic.Queue
|
||||
|
||||
data Stat
|
||||
= Stat { name :: String
|
||||
@ -28,50 +42,101 @@ data Stat
|
||||
, tags :: Tags
|
||||
}
|
||||
|
||||
count :: String -> Int -> Tags -> Stat
|
||||
count n v = Stat n (Counter v)
|
||||
data Metric
|
||||
= Counter Int -- Counters track how many times something happens per second.
|
||||
| Gauge Double -- Gauges track the ebb and flow of a particular metric value over time.
|
||||
| Histogram Double -- Histograms calculate the statistical distribution of any kind of value.
|
||||
| Set Double -- Sets count the number of unique elements in a group
|
||||
| Timer Double -- Timers measure the amount of time a section of code takes to execute.
|
||||
|
||||
type Tag = (String, String)
|
||||
|
||||
type Tags = [Tag]
|
||||
|
||||
|
||||
-- Increment a counter.
|
||||
increment :: String -> Tags -> Stat
|
||||
increment n = count n 1
|
||||
|
||||
-- Decrement a counter.
|
||||
decrement :: String -> Tags -> Stat
|
||||
decrement n = count n (-1)
|
||||
|
||||
-- Arbitrary count.
|
||||
count :: String -> Int -> Tags -> Stat
|
||||
count n v = Stat n (Counter v)
|
||||
|
||||
data UdpClient
|
||||
= UdpClient
|
||||
-- Arbitrary gauge value.
|
||||
gauge :: String -> Double -> Tags -> Stat
|
||||
gauge n v = Stat n (Gauge v)
|
||||
|
||||
-- Timing in milliseconds.
|
||||
timing :: String -> Double -> Tags -> Stat
|
||||
timing n v = Stat n (Timer v)
|
||||
|
||||
-- Histogram measurement.
|
||||
histogram :: String -> Double -> Tags -> Stat
|
||||
histogram n v = Stat n (Histogram v)
|
||||
|
||||
-- Set counter.
|
||||
set :: String -> Double -> Tags -> Stat
|
||||
set n v = Stat n (Set v)
|
||||
|
||||
data StatClient
|
||||
= StatClient
|
||||
{ udpSocket :: Socket
|
||||
, udpNamespace :: String
|
||||
, namespace :: String
|
||||
}
|
||||
|
||||
statsClient :: String -> Int -> IO UdpClient
|
||||
statsClient host port = do
|
||||
(addr:_) <- getAddrInfo Nothing (Just host) (Just (show port))
|
||||
-- Create a default stats client. This function consults two optional
|
||||
-- environment variables for the stats URI (default: 127.0.0.1:28125).
|
||||
-- * STATS_ADDR - String URI to send stats to in the form of `host:port`.
|
||||
-- * DOGSTATSD_HOST - String hostname which will override the above host.
|
||||
-- Generally used on kubes pods.
|
||||
defaultStatsClient :: IO StatClient
|
||||
defaultStatsClient = do
|
||||
addr <- lookupEnv "STATS_ADDR"
|
||||
let (host', port) = maybe defaultHostPort parseAddr addr
|
||||
|
||||
-- When running in Kubes, DOGSTATSD_HOST is set with the dogstatsd host.
|
||||
kubesHost <- lookupEnv "DOGSTATSD_HOST"
|
||||
let host = fromMaybe host' kubesHost
|
||||
|
||||
hPutStrLn stderr (host <> ":" <> port)
|
||||
statsClient host port "semantic"
|
||||
where
|
||||
defaultHostPort = ("127.0.0.1", "28125")
|
||||
parseAddr = maybe defaultHostPort parseAuthority . parseURI
|
||||
parseAuthority = maybe defaultHostPort (uriRegName &&& uriPort) . uriAuthority
|
||||
|
||||
|
||||
-- Create a StatsClient at the specified host and port with a namespace prefix.
|
||||
statsClient :: String -> String -> String -> IO StatClient
|
||||
statsClient host port namespace = do
|
||||
(addr:_) <- getAddrInfo Nothing (Just host) (Just port)
|
||||
sock <- socket (addrFamily addr) Datagram defaultProtocol
|
||||
connect sock (addrAddress addr)
|
||||
pure (UdpClient sock "")
|
||||
pure (StatClient sock namespace)
|
||||
|
||||
sendStats :: UdpClient -> String -> IO ()
|
||||
sendStats UdpClient{..} d = do
|
||||
res <- tryIOError (sendAll udpSocket msg)
|
||||
pure $ either (const ()) id res
|
||||
where msg = B.pack (prefix <> d)
|
||||
prefix | null udpNamespace = ""
|
||||
| otherwise = udpNamespace <> "."
|
||||
-- Send a stat over the StatClient's socket.
|
||||
sendStats :: StatClient -> Stat -> IO ()
|
||||
sendStats StatClient{..} = void . tryIOError . sendAll udpSocket . B.pack . datagram
|
||||
where datagram stat = renderString prefix (renders stat "")
|
||||
prefix | null namespace = ""
|
||||
| otherwise = namespace <> "."
|
||||
|
||||
|
||||
queueStat :: AsyncQ Stat UdpClient -> Stat -> IO ()
|
||||
-- Queue a stat to be sent.
|
||||
queueStat :: AsyncQ Stat StatClient -> Stat -> IO ()
|
||||
queueStat AsyncQ{..} = atomically . writeTMQueue queue
|
||||
|
||||
statSink :: UdpClient -> TMQueue Stat -> IO ()
|
||||
-- Drains stat messages from the queue and sends those stats over the configured
|
||||
-- UDP socket. Intended to be run in a dedicated thread.
|
||||
statSink :: StatClient -> TMQueue Stat -> IO ()
|
||||
statSink client q = do
|
||||
stat <- atomically (readTMQueue q)
|
||||
case stat of
|
||||
Just stat -> do
|
||||
hPutStrLn stderr $ renders stat ""
|
||||
_ <- sendStats client (renders stat "")
|
||||
statSink client q
|
||||
_ -> pure ()
|
||||
maybe (pure ()) send stat
|
||||
where send stat = sendStats client stat >> statSink client q
|
||||
|
||||
|
||||
-- Datagram Rendering
|
||||
|
@ -104,11 +104,11 @@ readBlobPairs from = ReadBlobPairs from `Then` return
|
||||
writeToOutput :: Either Handle FilePath -> B.ByteString -> Task ()
|
||||
writeToOutput path contents = WriteToOutput path contents `Then` return
|
||||
|
||||
|
||||
-- | A 'Task' which logs a message at a specific log level to stderr.
|
||||
writeLog :: Level -> String -> [(String, String)] -> Task ()
|
||||
writeLog level message pairs = WriteLog level message pairs `Then` return
|
||||
|
||||
-- | A 'Task' which writes a stat.
|
||||
writeStat :: Stat -> Task ()
|
||||
writeStat stat = WriteStat stat `Then` return
|
||||
|
||||
@ -160,18 +160,15 @@ runTask = runTaskWithOptions defaultOptions
|
||||
runTaskWithOptions :: Options -> Task a -> IO a
|
||||
runTaskWithOptions options task = do
|
||||
options <- configureOptionsForHandle stderr options
|
||||
-- client <- dogStatsdClient "statsd://127.0.0.1:8125/semantic"
|
||||
client <- statsClient "127.0.0.1" 8125
|
||||
statter <- newQueue client statSink
|
||||
logger <- newQueue options logSink
|
||||
statter <- defaultStatsClient >>= newQueue statSink
|
||||
logger <- newQueue logSink options
|
||||
|
||||
result <- run options logger statter task
|
||||
|
||||
closeQueue statter
|
||||
closeQueue logger
|
||||
either (die . displayException) pure result
|
||||
|
||||
where run :: Options -> AsyncQ Message Options -> AsyncQ Stat UdpClient -> Task a -> IO (Either SomeException a)
|
||||
where run :: Options -> AsyncQ Message Options -> AsyncQ Stat StatClient -> Task a -> IO (Either SomeException a)
|
||||
run options logger statter = go
|
||||
where go :: Task a -> IO (Either SomeException a)
|
||||
go = iterFreerA (\ task yield -> case task of
|
||||
|
Loading…
Reference in New Issue
Block a user