mirror of
https://github.com/github/semantic.git
synced 2024-11-24 08:54:07 +03:00
Write some UDP packets
This commit is contained in:
parent
6abc878243
commit
f80445945c
@ -102,6 +102,7 @@ library
|
||||
, mersenne-random-pure64
|
||||
, MonadRandom
|
||||
, mtl
|
||||
, network
|
||||
, optparse-applicative
|
||||
, parallel
|
||||
, parsers
|
||||
|
@ -1,14 +1,16 @@
|
||||
module Semantic.Stat where
|
||||
|
||||
|
||||
import Control.Concurrent.STM.TMQueue
|
||||
import qualified Control.Concurrent.Async as Async
|
||||
import GHC.Conc
|
||||
import Data.List (intercalate)
|
||||
import Data.Monoid
|
||||
import Semantic.Queue
|
||||
|
||||
import System.IO
|
||||
import System.IO.Error
|
||||
import Network.Socket (Socket(..), SocketType(..), socket, connect, getAddrInfo, addrFamily, addrAddress, defaultProtocol)
|
||||
import Network.Socket.ByteString
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
|
||||
data Metric
|
||||
= Counter Int
|
||||
@ -35,26 +37,54 @@ increment n = count n 1
|
||||
decrement :: String -> Tags -> Stat
|
||||
decrement n = count n (-1)
|
||||
|
||||
queueStat :: AsyncQ Stat () -> Stat -> IO ()
|
||||
|
||||
data UdpClient
|
||||
= UdpClient
|
||||
{ udpSocket :: Socket
|
||||
, udpNamespace :: String
|
||||
}
|
||||
|
||||
statsClient :: String -> Int -> IO UdpClient
|
||||
statsClient host port = do
|
||||
(addr:_) <- getAddrInfo Nothing (Just host) (Just (show port))
|
||||
sock <- socket (addrFamily addr) Datagram defaultProtocol
|
||||
connect sock (addrAddress addr)
|
||||
pure (UdpClient sock "")
|
||||
|
||||
sendStats :: UdpClient -> String -> IO ()
|
||||
sendStats UdpClient{..} d = do
|
||||
res <- tryIOError (sendAll udpSocket msg)
|
||||
pure $ either (const ()) id res
|
||||
where msg = B.pack (prefix <> d)
|
||||
prefix | null udpNamespace = ""
|
||||
| otherwise = udpNamespace <> "."
|
||||
|
||||
|
||||
queueStat :: AsyncQ Stat UdpClient -> Stat -> IO ()
|
||||
queueStat AsyncQ{..} = atomically . writeTMQueue queue
|
||||
|
||||
statSink :: () -> TMQueue Stat -> IO ()
|
||||
statSink x q = do
|
||||
statSink :: UdpClient -> TMQueue Stat -> IO ()
|
||||
statSink client q = do
|
||||
stat <- atomically (readTMQueue q)
|
||||
case stat of
|
||||
Just stat -> do
|
||||
hPutStrLn stderr $ renders stat ""
|
||||
statSink x q
|
||||
_ <- sendStats client (renders stat "")
|
||||
statSink client q
|
||||
_ -> pure ()
|
||||
|
||||
|
||||
-- Rendering
|
||||
-- Datagram Rendering
|
||||
|
||||
type RenderS = String -> String
|
||||
|
||||
class Render a where
|
||||
renders :: a -> RenderS
|
||||
|
||||
renderString :: String -> RenderS
|
||||
renderString = (<>)
|
||||
|
||||
|
||||
-- Instances
|
||||
|
||||
instance Render Stat where
|
||||
@ -65,11 +95,11 @@ instance Render Stat where
|
||||
. renders tags
|
||||
|
||||
instance Render Metric where
|
||||
renders (Counter x) = renderString "c|" . renders x
|
||||
renders (Gauge x) = renderString "g|" . renders x
|
||||
renders (Histogram x) = renderString "h|" . renders x
|
||||
renders (Set x) = renderString "s|" . renders x
|
||||
renders (Timer x) = renderString "ms|" . renders x
|
||||
renders (Counter x) = renderString "c|" . renders x
|
||||
renders (Gauge x) = renderString "g|" . renders x
|
||||
renders (Histogram x) = renderString "h|" . renders x
|
||||
renders (Set x) = renderString "s|" . renders x
|
||||
renders (Timer x) = renderString "ms|" . renders x
|
||||
|
||||
instance Render Tags where
|
||||
renders [] = renderString ""
|
||||
@ -83,6 +113,3 @@ instance Render Int where
|
||||
|
||||
instance Render Double where
|
||||
renders = shows
|
||||
|
||||
renderString :: String -> RenderS
|
||||
renderString = (<>)
|
||||
|
@ -62,6 +62,7 @@ import Semantic.Log
|
||||
import Semantic.Stat as Stat
|
||||
import Semantic.Queue
|
||||
|
||||
|
||||
data TaskF output where
|
||||
ReadBlobs :: Either Handle [(FilePath, Maybe Language)] -> TaskF [Blob]
|
||||
ReadBlobPairs :: Either Handle [Both (FilePath, Maybe Language)] -> TaskF [Both Blob]
|
||||
@ -159,15 +160,18 @@ runTask = runTaskWithOptions defaultOptions
|
||||
runTaskWithOptions :: Options -> Task a -> IO a
|
||||
runTaskWithOptions options task = do
|
||||
options <- configureOptionsForHandle stderr options
|
||||
-- client <- dogStatsdClient "statsd://127.0.0.1:8125/semantic"
|
||||
client <- statsClient "127.0.0.1" 8125
|
||||
statter <- newQueue client statSink
|
||||
logger <- newQueue options logSink
|
||||
statter <- newQueue () statSink
|
||||
|
||||
result <- run options logger statter task
|
||||
|
||||
closeQueue logger
|
||||
closeQueue statter
|
||||
closeQueue logger
|
||||
either (die . displayException) pure result
|
||||
where run :: Options -> AsyncQ Message Options -> AsyncQ Stat () -> Task a -> IO (Either SomeException a)
|
||||
|
||||
where run :: Options -> AsyncQ Message Options -> AsyncQ Stat UdpClient -> Task a -> IO (Either SomeException a)
|
||||
run options logger statter = go
|
||||
where go :: Task a -> IO (Either SomeException a)
|
||||
go = iterFreerA (\ task yield -> case task of
|
||||
@ -204,10 +208,11 @@ runParser Options{..} blob@Blob{..} = go
|
||||
liftIO ((Right <$> parseToAST language blob) `catchError` (pure . Left . toException)) >>= either throwError pure
|
||||
AssignmentParser parser assignment -> do
|
||||
ast <- go parser `catchError` \ err -> do
|
||||
writeStat (Stat.increment "semantic.parse.errors" languageTag)
|
||||
writeStat (Stat.increment "parse.errors" languageTag)
|
||||
writeLog Error "failed parsing" (("tag", "parse") : blobFields) >> throwError err
|
||||
logTiming "assign" $ case Assignment.assign blobSource assignment ast of
|
||||
Left err -> do
|
||||
writeStat (Stat.increment "assign.errors" languageTag)
|
||||
let formatted = Error.formatError optionsPrintSource (optionsIsTerminal && optionsEnableColour) blob err
|
||||
writeLog Error formatted (("tag", "assign") : blobFields)
|
||||
throwError (toException err)
|
||||
|
Loading…
Reference in New Issue
Block a user