mirror of
https://github.com/github/semantic.git
synced 2024-12-29 01:42:43 +03:00
Telemetry interface handles details of creating/closing async queues
This commit is contained in:
parent
148961ac4e
commit
033a748ed7
@ -43,7 +43,7 @@ data Options
|
||||
, optionsFailOnWarning :: Bool -- ^ Should semantic fail fast on assignment warnings (for testing)
|
||||
}
|
||||
|
||||
data StatsAddr = StatsAddr { addrHost :: String, addrPort :: String }
|
||||
data StatsAddr = StatsAddr { addrHost :: Stat.Host, addrPort :: Stat.Port }
|
||||
|
||||
defaultOptions :: Options
|
||||
defaultOptions = Options (Just Warning) Nothing False
|
||||
@ -76,40 +76,34 @@ defaultConfig' options@Options{..} = do
|
||||
, configOptions = options
|
||||
}
|
||||
|
||||
withTelemetry :: Config -> (TelemetryQueues -> IO c) -> IO c
|
||||
withTelemetry config action =
|
||||
withLoggerFromConfig config $ \logger ->
|
||||
withHaystackFromConfig config (queueLogMessage logger Error) $ \haystack ->
|
||||
withStatterFromConfig config $ \statter ->
|
||||
action (TelemetryQueues logger statter haystack)
|
||||
|
||||
defaultHaystackFromConfig :: Config -> Haystack.ErrorLogger -> IO HaystackQueue
|
||||
defaultHaystackFromConfig c@Config{..} logError = haystackClientFromConfig c >>= newAsyncQueue configMaxTelemetyQueueSize (Haystack.reportError logError)
|
||||
withLoggerFromConfig :: Config -> (LogQueue -> IO c) -> IO c
|
||||
withLoggerFromConfig Config{..} = withLogger opts configMaxTelemetyQueueSize
|
||||
where opts = LogOptions {
|
||||
logOptionsLevel = optionsLogLevel configOptions
|
||||
, logOptionsFormatter = configLogFormatter
|
||||
, logOptionsContext =
|
||||
[ ("app", configAppName)
|
||||
, ("pid", show configProcessID)
|
||||
, ("hostname", configHostName)
|
||||
, ("sha", buildSHA)
|
||||
] <> [("request_id", x) | x <- toList (optionsRequestID configOptions) ]
|
||||
}
|
||||
|
||||
haystackClientFromConfig :: Config -> IO HaystackClient
|
||||
haystackClientFromConfig Config{..} = haystackClient configHaystackURL tlsManagerSettings configAppName
|
||||
withHaystackFromConfig :: Config -> Haystack.ErrorLogger -> (HaystackQueue -> IO c) -> IO c
|
||||
withHaystackFromConfig Config{..} errorLogger =
|
||||
withHaystack configHaystackURL tlsManagerSettings configAppName errorLogger configMaxTelemetyQueueSize
|
||||
|
||||
|
||||
withLogger :: Config -> (LogQueue -> IO c) -> IO c
|
||||
withLogger c = bracket (defaultLoggerFromConfig c) closeAsyncQueue
|
||||
|
||||
defaultLoggerFromConfig :: Config -> IO LogQueue
|
||||
defaultLoggerFromConfig Config{..} =
|
||||
newAsyncQueue configMaxTelemetyQueueSize Log.writeLogMessage LogOptions {
|
||||
logOptionsLevel = optionsLogLevel configOptions
|
||||
, logOptionsFormatter = configLogFormatter
|
||||
, logOptionsContext =
|
||||
[ ("app", configAppName)
|
||||
, ("pid", show configProcessID)
|
||||
, ("hostname", configHostName)
|
||||
, ("sha", buildSHA)
|
||||
] <> [("request_id", x) | x <- toList (optionsRequestID configOptions) ]
|
||||
}
|
||||
|
||||
withStatter :: Config -> (StatQueue -> IO c) -> IO c
|
||||
withStatter c = bracket (defaultStatterFromConfig c) $ \statter -> do
|
||||
closeAsyncQueue statter
|
||||
Stat.closeStatClient (asyncQueueExtra statter)
|
||||
|
||||
defaultStatterFromConfig :: Config -> IO StatQueue
|
||||
defaultStatterFromConfig c@Config{..} = statsClientFromConfig c >>= newAsyncQueue configMaxTelemetyQueueSize Stat.sendStat
|
||||
|
||||
statsClientFromConfig :: Config -> IO StatsClient
|
||||
statsClientFromConfig Config{..} = statsClient (addrHost configStatsAddr) (addrPort configStatsAddr) configAppName
|
||||
withStatterFromConfig :: Config -> (StatQueue -> IO c) -> IO c
|
||||
withStatterFromConfig Config{..} = withStatter host port configAppName configMaxTelemetyQueueSize
|
||||
where host = addrHost configStatsAddr
|
||||
port = addrPort configStatsAddr
|
||||
|
||||
lookupStatsAddr :: IO StatsAddr
|
||||
lookupStatsAddr = do
|
||||
|
@ -75,7 +75,6 @@ import Semantic.Distribute
|
||||
import qualified Semantic.IO as IO
|
||||
import Semantic.Resolution
|
||||
import Semantic.Telemetry
|
||||
import Semantic.Telemetry.Stat as Stat
|
||||
import Serializing.Format hiding (Options)
|
||||
import System.Exit (die)
|
||||
|
||||
@ -130,9 +129,8 @@ runTask = runTaskWithOptions defaultOptions
|
||||
runTaskWithOptions :: Options -> TaskEff a -> IO a
|
||||
runTaskWithOptions opts task = do
|
||||
config <- defaultConfig' opts
|
||||
result <- withLogger config $ \logger ->
|
||||
withStatter config $ \statter ->
|
||||
runTaskWithConfig config logger statter task
|
||||
result <- withTelemetry config $ \(TelemetryQueues logger statter _) ->
|
||||
runTaskWithConfig config logger statter task
|
||||
either (die . displayException) pure result
|
||||
|
||||
-- | Execute a 'TaskEff' yielding its result value in 'IO'.
|
||||
@ -197,26 +195,26 @@ runParser blob@Blob{..} parser = case parser of
|
||||
|
||||
AssignmentParser parser assignment -> do
|
||||
ast <- runParser blob parser `catchError` \ (SomeException err) -> do
|
||||
writeStat (Stat.increment "parse.parse_failures" languageTag)
|
||||
writeStat (increment "parse.parse_failures" languageTag)
|
||||
writeLog Error "failed parsing" (("task", "parse") : blobFields)
|
||||
throwError (toException err)
|
||||
config <- ask
|
||||
time "parse.assign" languageTag $
|
||||
case Assignment.assign blobSource assignment ast of
|
||||
Left err -> do
|
||||
writeStat (Stat.increment "parse.assign_errors" languageTag)
|
||||
writeStat (increment "parse.assign_errors" languageTag)
|
||||
logError config Error blob err (("task", "assign") : blobFields)
|
||||
throwError (toException err)
|
||||
Right term -> do
|
||||
for_ (errors term) $ \ err -> case Error.errorActual err of
|
||||
Just "ParseError" -> do
|
||||
writeStat (Stat.increment "parse.parse_errors" languageTag)
|
||||
writeStat (increment "parse.parse_errors" languageTag)
|
||||
logError config Warning blob err (("task", "parse") : blobFields)
|
||||
_ -> do
|
||||
writeStat (Stat.increment "parse.assign_warnings" languageTag)
|
||||
writeStat (increment "parse.assign_warnings" languageTag)
|
||||
logError config Warning blob err (("task", "assign") : blobFields)
|
||||
when (optionsFailOnWarning (configOptions config)) $ throwError (toException err)
|
||||
writeStat (Stat.count "parse.nodes" (length term) languageTag)
|
||||
writeStat (count "parse.nodes" (length term) languageTag)
|
||||
pure term
|
||||
MarkdownParser ->
|
||||
time "parse.cmark_parse" languageTag $
|
||||
|
@ -2,9 +2,13 @@
|
||||
module Semantic.Telemetry
|
||||
(
|
||||
-- Async telemetry interface
|
||||
LogQueue
|
||||
withLogger
|
||||
, withHaystack
|
||||
, withStatter
|
||||
, LogQueue
|
||||
, StatQueue
|
||||
, HaystackQueue
|
||||
, TelemetryQueues(..)
|
||||
, queueLogMessage
|
||||
, queueErrorReport
|
||||
, queueStat
|
||||
@ -43,20 +47,54 @@ module Semantic.Telemetry
|
||||
, ignoreTelemetry
|
||||
) where
|
||||
|
||||
import Control.Exception
|
||||
import Control.Monad.Effect
|
||||
import Control.Monad.IO.Class
|
||||
import Semantic.Telemetry.AsyncQueue
|
||||
import Semantic.Telemetry.Haystack
|
||||
import Semantic.Telemetry.Log
|
||||
import Semantic.Telemetry.Stat as Stat
|
||||
import Control.Exception
|
||||
import Control.Monad.Effect
|
||||
import Control.Monad.IO.Class
|
||||
import qualified Data.Time.Clock.POSIX as Time (getCurrentTime)
|
||||
import qualified Data.Time.LocalTime as LocalTime
|
||||
import Network.HTTP.Client
|
||||
import Semantic.Telemetry.AsyncQueue
|
||||
import Semantic.Telemetry.Haystack
|
||||
import Semantic.Telemetry.Log
|
||||
import Semantic.Telemetry.Stat as Stat
|
||||
|
||||
type LogQueue = AsyncQueue Message LogOptions
|
||||
type StatQueue = AsyncQueue Stat StatsClient
|
||||
type HaystackQueue = AsyncQueue ErrorReport HaystackClient
|
||||
|
||||
data TelemetryQueues
|
||||
= TelemetryQueues
|
||||
{ telemetryLogger :: LogQueue
|
||||
, telemetryStatter :: StatQueue
|
||||
, telemetryHaystack :: HaystackQueue
|
||||
}
|
||||
|
||||
-- | Execute an action in IO with access to a logger (async log queue).
|
||||
withLogger :: LogOptions -- ^ Log options
|
||||
-> Int -- ^ Max stats queue size before dropping stats
|
||||
-> (LogQueue -> IO c) -- ^ Action in IO
|
||||
-> IO c
|
||||
withLogger options size = bracket setup closeAsyncQueue
|
||||
where setup = newAsyncQueue size writeLogMessage options
|
||||
|
||||
-- | Execute an action in IO with access to haystack (async error reporting queue).
|
||||
withHaystack :: Maybe String -> ManagerSettings -> String -> ErrorLogger -> Int -> (HaystackQueue -> IO c) -> IO c
|
||||
withHaystack url settings appName errorLogger size = bracket setup closeAsyncQueue
|
||||
where setup = haystackClient url settings appName >>= newAsyncQueue size (reportError errorLogger)
|
||||
|
||||
-- | Execute an action in IO with access to a statter (async stat queue).
|
||||
-- Handles the bracketed setup and teardown of the underlying 'AsyncQueue' and
|
||||
-- 'StatsClient'.
|
||||
withStatter :: Host -- ^ Statsd host
|
||||
-> Port -- ^ Statsd port
|
||||
-> Namespace -- ^ Namespace prefix for stats
|
||||
-> Int -- ^ Max stats queue size before dropping stats
|
||||
-> (StatQueue -> IO c) -- ^ Action in IO
|
||||
-> IO c
|
||||
withStatter host port ns size = bracket setup teardown
|
||||
where setup = statsClient host port ns >>= newAsyncQueue size sendStat
|
||||
teardown statter = closeAsyncQueue statter >> Stat.closeStatClient (asyncQueueExtra statter)
|
||||
|
||||
-- | Queue a message to be logged.
|
||||
queueLogMessage :: MonadIO io => LogQueue -> Level -> String -> [(String, String)] -> io ()
|
||||
queueLogMessage q@AsyncQueue{..} level message pairs
|
||||
|
@ -11,6 +11,9 @@ module Semantic.Telemetry.Stat
|
||||
, set
|
||||
, Stat
|
||||
, Tags
|
||||
, Host
|
||||
, Port
|
||||
, Namespace
|
||||
|
||||
-- Client
|
||||
, statsClient
|
||||
@ -99,17 +102,21 @@ data StatsClient
|
||||
= StatsClient
|
||||
{ statsClientUDPSocket :: Socket
|
||||
, statsClientNamespace :: String
|
||||
, statsClientUDPHost :: String
|
||||
, statsClientUDPPort :: String
|
||||
, statsClientUDPHost :: Host
|
||||
, statsClientUDPPort :: Port
|
||||
}
|
||||
|
||||
type Host = String
|
||||
type Port = String
|
||||
type Namespace = String
|
||||
|
||||
-- | Create a StatsClient at the specified host and port with a namespace prefix.
|
||||
statsClient :: MonadIO io => String -> String -> String -> io StatsClient
|
||||
statsClient host port statsClientNamespace = liftIO $ do
|
||||
statsClient :: MonadIO io => Host -> Port -> Namespace -> io StatsClient
|
||||
statsClient host port ns = liftIO $ do
|
||||
(addr:_) <- getAddrInfo Nothing (Just host) (Just port)
|
||||
sock <- socket (addrFamily addr) Datagram defaultProtocol
|
||||
connect sock (addrAddress addr)
|
||||
pure (StatsClient sock statsClientNamespace host port)
|
||||
pure (StatsClient sock ns host port)
|
||||
|
||||
-- | Close the client's underlying socket.
|
||||
closeStatClient :: MonadIO io => StatsClient -> io ()
|
||||
|
Loading…
Reference in New Issue
Block a user