Add tracking of Fetch Stats for the same batch (#111)
Summary:
Pull Request resolved: https://github.com/facebook/Haxl/pull/111

Right now BackgroundFetches produce multiple FetchStats for the same batch, but it is not possible to link these together to get an idea of how big the batch was. This introduces a field to FetchStats that can be used to link batches together, as well as a utility method to do this in a default manner.

Reviewed By: watashi

Differential Revision: D19469048

fbshipit-source-id: fce687c49ac4cbdc7cbd6804f37b6f120d7efad3
commit 14ebbe2c85
parent f18121aaeb
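
For context, here is a minimal sketch (not part of this commit) of how the new fetchBatchId field and the aggregateFetchBatches helper might be consumed. The module and function names below are hypothetical; the sketch relies only on aggregateFetchBatches, fetchBatchId and fetchBatchSize from this diff, and assumes stats were collected with report level >= 2, since that is the condition under which wrapFetchInStats assigns batch ids below:

module StatsReport (batchSummary) where

import Haxl.Core

-- Summarise each dispatched batch recorded in a Stats value as
-- (batch id, number of FetchStats entries, total requests across them).
-- aggregateFetchBatches only groups FetchStats entries (FetchCall is
-- filtered out), and groupBy never yields an empty group, so head is safe.
batchSummary :: Stats -> [(Int, Int, Int)]
batchSummary = aggregateFetchBatches summarize
  where
    summarize fss =
      ( fetchBatchId (head fss)
      , length fss
      , sum (map fetchBatchSize fss)
      )

The new StatsTests module at the end of this diff aggregates batches in essentially the same way with its own folding function, and is a reasonable template to copy.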
@@ -50,6 +50,7 @@ module Haxl.Core (
  , numFetches
  , ppStats
  , ppFetchStats
  , aggregateFetchBatches
  , Profile
  , emptyProfile
  , profile
@@ -347,7 +347,7 @@ performRequestStore env reqStore =
-- complete, and all of the 'ResultVar's are full.
performFetches
  :: forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches env@Env{flags=f, statsRef=sref} jobs = do
performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
  t0 <- getTimestamp

  let
@@ -378,7 +378,7 @@ performFetches env@Env{flags=f, statsRef=sref} jobs = do
      return
        $ FetchToDo reqs
        $ (if report f >= 2
            then wrapFetchInStats sref dsName (length reqs)
            then wrapFetchInStats sref sbref dsName (length reqs)
            else id)
        $ wrapFetchInTrace i (length reqs) dsName
        $ wrapFetchInCatch reqs
@@ -439,21 +439,24 @@ wrapFetchInCatch reqs fetch =

wrapFetchInStats
  :: IORef Stats
  -> IORef Int
  -> Text
  -> Int
  -> PerformFetch req
  -> PerformFetch req

wrapFetchInStats !statsRef dataSource batchSize perform = do
wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
  case perform of
    SyncFetch f ->
      SyncFetch $ \reqs -> do
        bid <- newBatchId
        fail_ref <- newIORef 0
        (t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs))
        failures <- readIORef fail_ref
        updateFetchStats t0 t alloc batchSize failures
        updateFetchStats bid t0 t alloc batchSize failures
    AsyncFetch f -> do
      AsyncFetch $ \reqs inner -> do
        bid <- newBatchId
        inner_r <- newIORef (0, 0)
        fail_ref <- newIORef 0
        let inner' = do
@@ -463,20 +466,22 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
        (t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
        (innerTime, innerAlloc) <- readIORef inner_r
        failures <- readIORef fail_ref
        updateFetchStats t0 (totalTime - innerTime) (totalAlloc - innerAlloc)
          batchSize failures
        updateFetchStats bid t0 (totalTime - innerTime)
          (totalAlloc - innerAlloc) batchSize failures
    BackgroundFetch io -> do
      BackgroundFetch $ \reqs -> do
        bid <- newBatchId
        startTime <- getTimestamp
        io (map (addTimer startTime) reqs)
        io (map (addTimer bid startTime) reqs)
  where
    newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1)
    statsForIO io = do
      prevAlloc <- getAllocationCounter
      (t0,t,a) <- time io
      postAlloc <- getAllocationCounter
      return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)

    addTimer t0 (BlockedFetch req (ResultVar fn)) =
    addTimer bid t0 (BlockedFetch req (ResultVar fn)) =
      BlockedFetch req $ ResultVar $ \result isChildThread -> do
        t1 <- getTimestamp
        -- We cannot measure allocation easily for BackgroundFetch. Here we
@@ -486,21 +491,22 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
        -- meaningful.
        -- see Note [tracking allocation in child threads]
        allocs <- if isChildThread then getAllocationCounter else return 0
        updateFetchStats t0 (t1 - t0)
        updateFetchStats bid t0 (t1 - t0)
          (negate allocs)
          1 -- batch size: we don't know if this is a batch or not
          (if isLeft result then 1 else 0) -- failures
        fn result isChildThread

    updateFetchStats
      :: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
    updateFetchStats start time space batch failures = do
      :: Int -> Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
    updateFetchStats bid start time space batch failures = do
      let this = FetchStats { fetchDataSource = dataSource
                            , fetchBatchSize = batch
                            , fetchStart = start
                            , fetchDuration = time
                            , fetchSpace = space
                            , fetchFailures = failures }
                            , fetchFailures = failures
                            , fetchBatchId = bid }
      atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())

    addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
@@ -163,6 +163,10 @@ data Env u w = Env
  , statsRef :: {-# UNPACK #-} !(IORef Stats)
       -- ^ statistics, collected according to the 'report' level in 'flags'.

  , statsBatchIdRef :: {-# UNPACK #-} !(IORef Int)
       -- ^ keeps track of a Unique ID for each batch dispatched with stats
       -- enabled, for aggregating after.

  , profLabel :: ProfileLabel
       -- ^ current profiling label, see 'withLabel'

@@ -226,6 +230,7 @@ caches env = (dataCache env, memoCache env)
initEnvWithData :: StateStore -> u -> Caches u w -> IO (Env u w)
initEnvWithData states e (dcache, mcache) = do
  sref <- newIORef emptyStats
  sbref <- newIORef 0
  pref <- newIORef emptyProfile
  rs <- newIORef noRequests -- RequestStore
  rq <- newIORef JobNil
@@ -240,6 +245,7 @@ initEnvWithData states e (dcache, mcache) = do
    , userEnv = e
    , states = states
    , statsRef = sref
    , statsBatchIdRef = sbref
    , profLabel = "MAIN"
    , profRef = pref
    , reqStoreRef = rs
@@ -865,11 +871,11 @@ unsafeLiftIO m = GenHaxl $ \_env -> Done <$> m
-- Note: this function does not catch async exceptions. This is a flaw in Haxl
-- where it can sometimes leave the environment in a bad state when async
-- exceptions are thrown (for example the cache may think a fetch is happening
-- but the exception has stopped it). TODO would be to make Haxl async excpetion
-- but the exception has stopped it). TODO would be to make Haxl async exception
-- safe and then remove the rethrowAsyncExceptions below, but for now this is
-- safer to avoid bugs. Additionally this would not protect you from async
-- exceptions thrown while executing code in the scheduler, and so relying on
-- this function to catch all async excpetions would be ambitious at best.
-- this function to catch all async exceptions would be ambitious at best.
unsafeToHaxlException :: GenHaxl u w a -> GenHaxl u w a
unsafeToHaxlException (GenHaxl m) = GenHaxl $ \env -> do
  r <- m env `Exception.catch` \e -> do
@@ -7,6 +7,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RankNTypes #-}

-- |
-- Types and operations for statistics and profiling. Most users
@@ -25,6 +26,7 @@ module Haxl.Core.Stats
  , numFetches
  , ppStats
  , ppFetchStats
  , aggregateFetchBatches

    -- * Profiling
  , Profile
@@ -43,12 +45,13 @@ module Haxl.Core.Stats
  ) where

import Data.Aeson
import Data.Function (on)
import Data.HashMap.Strict (HashMap)
import Data.HashSet (HashSet)
import Data.Int
import Data.List (intercalate, maximumBy, minimumBy)
import Data.List (intercalate, maximumBy, minimumBy, sortOn, groupBy)
import Data.Semigroup (Semigroup)
import Data.Ord (comparing)
import Data.Ord (comparing, Down(..))
import Data.Text (Text)
import Data.Time.Clock.POSIX
import Text.Printf
@@ -118,6 +121,7 @@ data FetchStats
    , fetchDuration :: {-# UNPACK #-} !Microseconds
    , fetchSpace :: {-# UNPACK #-} !Int64
    , fetchFailures :: {-# UNPACK #-} !Int
    , fetchBatchId :: {-# UNPACK #-} !Int
    }

  -- | The stack trace of a call to 'dataFetch'. These are collected
@@ -126,7 +130,7 @@ data FetchStats
    { fetchReq :: String
    , fetchStack :: [String]
    }
  deriving (Show)
  deriving (Eq, Show)

-- | Pretty-print RoundStats.
ppFetchStats :: FetchStats -> String
@@ -136,6 +140,14 @@ ppFetchStats FetchStats{..} =
    (fromIntegral fetchDuration / 1000 :: Double) fetchSpace fetchFailures
ppFetchStats (FetchCall r ss) = show r ++ '\n':show ss

-- | Aggregate stats merging FetchStats from the same dispatched batch into one.
aggregateFetchBatches :: ([FetchStats] -> a) -> Stats -> [a]
aggregateFetchBatches agg (Stats fetches) =
  map agg $
    groupBy ((==) `on` fetchBatchId) $
    sortOn (Down . fetchBatchId)
    [f | f@FetchStats{} <- fetches]

instance ToJSON FetchStats where
  toJSON FetchStats{..} = object
    [ "datasource" .= fetchDataSource
@@ -144,6 +156,7 @@ instance ToJSON FetchStats where
    , "duration" .= fetchDuration
    , "allocation" .= fetchSpace
    , "failures" .= fetchFailures
    , "bachid" .= fetchBatchId
    ]
  toJSON (FetchCall req strs) = object
    [ "request" .= req
@@ -1,3 +1,6 @@
# Changes in version <next>
* Added fetchBatchId to FetchStats

# Changes in version 2.3.0.0
* Removed `FutureFetch`

@@ -137,6 +137,7 @@ test-suite test
        ParallelTests
        ProfileTests
        SleepDataSource
        StatsTests
        TestBadDataSource
        TestExampleDataSource
        TestTypes
@@ -19,6 +19,7 @@ import TestBadDataSource
import FullyAsyncTest
import WriteTests
import ParallelTests
import StatsTests

import Test.HUnit

@@ -37,4 +38,5 @@ allTests = TestList
  , TestLabel "FullyAsyncTest" FullyAsyncTest.tests
  , TestLabel "WriteTest" WriteTests.tests
  , TestLabel "ParallelTest" ParallelTests.tests
  , TestLabel "StatsTests" StatsTests.tests
  ]
tests/StatsTests.hs (new file, 53 lines)
@@ -0,0 +1,53 @@
-- Copyright (c) 2014-present, Facebook, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a BSD license,
-- found in the LICENSE file.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module StatsTests (tests) where

import Test.HUnit
import Data.List

import Haxl.Core

aggregateBatches :: Test
aggregateBatches = TestCase $ do
  let
    statsNoBatches = [ FetchStats { fetchDataSource = "foo"
                                  , fetchBatchSize = 7
                                  , fetchStart = 0
                                  , fetchDuration = 10
                                  , fetchSpace = 1
                                  , fetchFailures = 2
                                  , fetchBatchId = n }
                     | n <- reverse [1..10] ++ [11..20] ]
                     ++ [ FetchCall "A" ["B"], FetchCall "C" ["D"] ]
    fetchBatch = [ FetchStats { fetchDataSource = "batch"
                              , fetchBatchSize = 1
                              , fetchStart = 100
                              , fetchDuration = 1000 * n
                              , fetchSpace = 3
                              , fetchFailures = if n <= 3 then 1 else 0
                              , fetchBatchId = 123 } | n <- [1..50] ]
    agg (sz,bids) FetchStats{..} = (sz + fetchBatchSize, fetchBatchId:bids)
    agg _ _ = error "unexpected"
    agg' = foldl' agg (0,[])
    aggNoBatch = aggregateFetchBatches agg' (Stats statsNoBatches)
    expectedNoBatch = [(7, [n]) | n <- reverse [1..20] :: [Int]]
    aggBatch = aggregateFetchBatches agg' (Stats fetchBatch)
    expectedResultBatch = (50, [123 | _ <- [1..50] :: [Int]])
    aggInterspersedBatch =
      aggregateFetchBatches agg'
        (Stats $ intersperse (head fetchBatch) statsNoBatches)
    expectedResultInterspersed =
      (21, [123 | _ <- [1..21] :: [Int]]) : expectedNoBatch
  assertEqual "No batch has no change" expectedNoBatch aggNoBatch
  assertEqual "Batch is combined" [expectedResultBatch] aggBatch
  assertEqual
    "Grouping works as expected" expectedResultInterspersed aggInterspersedBatch

tests = TestList [TestLabel "Aggregate Batches" aggregateBatches]