diff --git a/Haxl/Core.hs b/Haxl/Core.hs
index f59ea84..c16ff2c 100644
--- a/Haxl/Core.hs
+++ b/Haxl/Core.hs
@@ -50,6 +50,7 @@ module Haxl.Core (
   , numFetches
   , ppStats
   , ppFetchStats
+  , aggregateFetchBatches
   , Profile
   , emptyProfile
   , profile
diff --git a/Haxl/Core/Fetch.hs b/Haxl/Core/Fetch.hs
index c780b50..7627d7b 100644
--- a/Haxl/Core/Fetch.hs
+++ b/Haxl/Core/Fetch.hs
@@ -347,7 +347,7 @@ performRequestStore env reqStore =
 -- complete, and all of the 'ResultVar's are full.
 performFetches
   :: forall u w. Env u w -> [BlockedFetches u] -> IO ()
-performFetches env@Env{flags=f, statsRef=sref} jobs = do
+performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
   t0 <- getTimestamp
 
   let
@@ -378,7 +378,7 @@ performFetches env@Env{flags=f, statsRef=sref} jobs = do
       return
        $ FetchToDo reqs
        $ (if report f >= 2
-          then wrapFetchInStats sref dsName (length reqs)
+          then wrapFetchInStats sref sbref dsName (length reqs)
           else id)
        $ wrapFetchInTrace i (length reqs) dsName
        $ wrapFetchInCatch reqs
@@ -439,21 +439,24 @@ wrapFetchInCatch reqs fetch =
 
 wrapFetchInStats
   :: IORef Stats
+  -> IORef Int
   -> Text
   -> Int
   -> PerformFetch req
   -> PerformFetch req
 
-wrapFetchInStats !statsRef dataSource batchSize perform = do
+wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
   case perform of
     SyncFetch f ->
      SyncFetch $ \reqs -> do
+        bid <- newBatchId
        fail_ref <- newIORef 0
        (t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs))
        failures <- readIORef fail_ref
-        updateFetchStats t0 t alloc batchSize failures
+        updateFetchStats bid t0 t alloc batchSize failures
    AsyncFetch f -> do
      AsyncFetch $ \reqs inner -> do
+        bid <- newBatchId
        inner_r <- newIORef (0, 0)
        fail_ref <- newIORef 0
        let inner' = do
@@ -463,20 +466,22 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
        (t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
        (innerTime, innerAlloc) <- readIORef inner_r
        failures <- readIORef fail_ref
-        updateFetchStats t0 (totalTime - innerTime) (totalAlloc - innerAlloc)
-          batchSize failures
+        updateFetchStats bid t0 (totalTime - innerTime)
+          (totalAlloc - innerAlloc) batchSize failures
    BackgroundFetch io -> do
      BackgroundFetch $ \reqs -> do
+        bid <- newBatchId
        startTime <- getTimestamp
-        io (map (addTimer startTime) reqs)
+        io (map (addTimer bid startTime) reqs)
  where
+    newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1)
    statsForIO io = do
      prevAlloc <- getAllocationCounter
      (t0,t,a) <- time io
      postAlloc <- getAllocationCounter
      return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
 
-    addTimer t0 (BlockedFetch req (ResultVar fn)) =
+    addTimer bid t0 (BlockedFetch req (ResultVar fn)) =
      BlockedFetch req $ ResultVar $ \result isChildThread -> do
        t1 <- getTimestamp
        -- We cannot measure allocation easily for BackgroundFetch. Here we
@@ -486,21 +491,22 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
        -- meaningful.
        -- see Note [tracking allocation in child threads]
        allocs <- if isChildThread then getAllocationCounter else return 0
-        updateFetchStats t0 (t1 - t0)
+        updateFetchStats bid t0 (t1 - t0)
          (negate allocs)
          1 -- batch size: we don't know if this is a batch or not
          (if isLeft result then 1 else 0) -- failures
        fn result isChildThread
 
    updateFetchStats
-      :: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
-    updateFetchStats start time space batch failures = do
+      :: Int -> Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
+    updateFetchStats bid start time space batch failures = do
      let this = FetchStats { fetchDataSource = dataSource
                            , fetchBatchSize = batch
                            , fetchStart = start
                            , fetchDuration = time
                            , fetchSpace = space
-                            , fetchFailures = failures }
+                            , fetchFailures = failures
+                            , fetchBatchId = bid }
      atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())
 
    addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
diff --git a/Haxl/Core/Monad.hs b/Haxl/Core/Monad.hs
index 3864a62..0682556 100644
--- a/Haxl/Core/Monad.hs
+++ b/Haxl/Core/Monad.hs
@@ -163,6 +163,10 @@ data Env u w = Env
   , statsRef :: {-# UNPACK #-} !(IORef Stats)
        -- ^ statistics, collected according to the 'report' level in 'flags'.
 
+  , statsBatchIdRef :: {-# UNPACK #-} !(IORef Int)
+       -- ^ keeps track of a unique ID for each batch dispatched with stats
+       -- enabled, so that batches can be aggregated afterwards.
+
   , profLabel :: ProfileLabel
        -- ^ current profiling label, see 'withLabel'
 
@@ -226,6 +230,7 @@ caches env = (dataCache env, memoCache env)
 initEnvWithData :: StateStore -> u -> Caches u w -> IO (Env u w)
 initEnvWithData states e (dcache, mcache) = do
   sref <- newIORef emptyStats
+  sbref <- newIORef 0
   pref <- newIORef emptyProfile
   rs <- newIORef noRequests -- RequestStore
   rq <- newIORef JobNil
@@ -240,6 +245,7 @@ initEnvWithData states e (dcache, mcache) = do
     , userEnv = e
     , states = states
     , statsRef = sref
+    , statsBatchIdRef = sbref
     , profLabel = "MAIN"
     , profRef = pref
     , reqStoreRef = rs
@@ -865,11 +871,11 @@ unsafeLiftIO m = GenHaxl $ \_env -> Done <$> m
 -- Note: this function does not catch async exceptions. This is a flaw in Haxl
 -- where it can sometimes leave the environment in a bad state when async
 -- exceptions are thrown (for example the cache may think a fetch is happening
--- but the exception has stopped it). TODO would be to make Haxl async excpetion
+-- but the exception has stopped it). TODO would be to make Haxl async exception
 -- safe and then remove the rethrowAsyncExceptions below, but for now this is
 -- safer to avoid bugs. Additionally this would not protect you from async
 -- exceptions thrown while executing code in the scheduler, and so relying on
--- this function to catch all async excpetions would be ambitious at best.
+-- this function to catch all async exceptions would be ambitious at best.
 unsafeToHaxlException :: GenHaxl u w a -> GenHaxl u w a
 unsafeToHaxlException (GenHaxl m) = GenHaxl $ \env -> do
   r <- m env `Exception.catch` \e -> do
diff --git a/Haxl/Core/Stats.hs b/Haxl/Core/Stats.hs
index 9b909dd..b85358c 100644
--- a/Haxl/Core/Stats.hs
+++ b/Haxl/Core/Stats.hs
@@ -7,6 +7,7 @@
 {-# LANGUAGE OverloadedStrings #-}
 {-# LANGUAGE RecordWildCards #-}
 {-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE RankNTypes #-}
 
 -- |
 -- Types and operations for statistics and profiling.  Most users
@@ -25,6 +26,7 @@ module Haxl.Core.Stats
   , numFetches
   , ppStats
   , ppFetchStats
+  , aggregateFetchBatches
 
   -- * Profiling
   , Profile
@@ -43,12 +45,13 @@
   ) where
 
 import Data.Aeson
+import Data.Function (on)
 import Data.HashMap.Strict (HashMap)
 import Data.HashSet (HashSet)
 import Data.Int
-import Data.List (intercalate, maximumBy, minimumBy)
+import Data.List (intercalate, maximumBy, minimumBy, sortOn, groupBy)
 import Data.Semigroup (Semigroup)
-import Data.Ord (comparing)
+import Data.Ord (comparing, Down(..))
 import Data.Text (Text)
 import Data.Time.Clock.POSIX
 import Text.Printf
@@ -118,6 +121,7 @@ data FetchStats
     , fetchDuration :: {-# UNPACK #-} !Microseconds
     , fetchSpace :: {-# UNPACK #-} !Int64
     , fetchFailures :: {-# UNPACK #-} !Int
+    , fetchBatchId :: {-# UNPACK #-} !Int
     }
 
     -- | The stack trace of a call to 'dataFetch'.  These are collected
@@ -126,7 +130,7 @@ data FetchStats
     { fetchReq :: String
     , fetchStack :: [String]
     }
-  deriving (Show)
+  deriving (Eq, Show)
 
 -- | Pretty-print RoundStats.
 ppFetchStats :: FetchStats -> String
@@ -136,6 +140,14 @@ ppFetchStats FetchStats{..} =
     (fromIntegral fetchDuration / 1000 :: Double) fetchSpace fetchFailures
 ppFetchStats (FetchCall r ss) = show r ++ '\n':show ss
 
+-- | Aggregate stats, merging the 'FetchStats' of each dispatched batch into one.
+aggregateFetchBatches :: ([FetchStats] -> a) -> Stats -> [a]
+aggregateFetchBatches agg (Stats fetches) =
+  map agg $
+  groupBy ((==) `on` fetchBatchId) $
+  sortOn (Down . fetchBatchId)
+  [f | f@FetchStats{} <- fetches]
+
 instance ToJSON FetchStats where
   toJSON FetchStats{..} = object
     [ "datasource" .= fetchDataSource
@@ -144,6 +156,7 @@ instance ToJSON FetchStats where
     , "duration" .= fetchDuration
     , "allocation" .= fetchSpace
     , "failures" .= fetchFailures
+    , "batchid" .= fetchBatchId
     ]
   toJSON (FetchCall req strs) = object
     [ "request" .= req
diff --git a/changelog.md b/changelog.md
index fb17406..26c2ca6 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,3 +1,6 @@
+# Changes in version
+  * Added `fetchBatchId` to `FetchStats`
+
 # Changes in version 2.3.0.0
 
   * Removed `FutureFetch`
diff --git a/haxl.cabal b/haxl.cabal
index 7d818a1..5383968 100644
--- a/haxl.cabal
+++ b/haxl.cabal
@@ -137,6 +137,7 @@ test-suite test
     ParallelTests
     ProfileTests
     SleepDataSource
+    StatsTests
     TestBadDataSource
     TestExampleDataSource
     TestTypes
diff --git a/tests/AllTests.hs b/tests/AllTests.hs
index 4d76b88..5628d7a 100644
--- a/tests/AllTests.hs
+++ b/tests/AllTests.hs
@@ -19,6 +19,7 @@ import TestBadDataSource
 import FullyAsyncTest
 import WriteTests
 import ParallelTests
+import StatsTests
 
 import Test.HUnit
 
@@ -37,4 +38,5 @@ allTests = TestList
   , TestLabel "FullyAsyncTest" FullyAsyncTest.tests
   , TestLabel "WriteTest" WriteTests.tests
   , TestLabel "ParallelTest" ParallelTests.tests
+  , TestLabel "StatsTests" StatsTests.tests
   ]
diff --git a/tests/StatsTests.hs b/tests/StatsTests.hs
new file mode 100644
index 0000000..59a38e5
--- /dev/null
+++ b/tests/StatsTests.hs
@@ -0,0 +1,53 @@
+-- Copyright (c) 2014-present, Facebook, Inc.
+-- All rights reserved.
+--
+-- This source code is distributed under the terms of a BSD license,
+-- found in the LICENSE file.
+
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+
+module StatsTests (tests) where
+
+import Test.HUnit
+import Data.List
+
+import Haxl.Core
+
+aggregateBatches :: Test
+aggregateBatches = TestCase $ do
+  let
+    statsNoBatches = [ FetchStats { fetchDataSource = "foo"
+                                  , fetchBatchSize = 7
+                                  , fetchStart = 0
+                                  , fetchDuration = 10
+                                  , fetchSpace = 1
+                                  , fetchFailures = 2
+                                  , fetchBatchId = n }
+                     | n <- reverse [1..10] ++ [11..20] ]
+                     ++ [ FetchCall "A" ["B"], FetchCall "C" ["D"] ]
+    fetchBatch = [ FetchStats { fetchDataSource = "batch"
+                              , fetchBatchSize = 1
+                              , fetchStart = 100
+                              , fetchDuration = 1000 * n
+                              , fetchSpace = 3
+                              , fetchFailures = if n <= 3 then 1 else 0
+                              , fetchBatchId = 123 } | n <- [1..50] ]
+    agg (sz,bids) FetchStats{..} = (sz + fetchBatchSize, fetchBatchId:bids)
+    agg _ _ = error "unexpected"
+    agg' = foldl' agg (0,[])
+    aggNoBatch = aggregateFetchBatches agg' (Stats statsNoBatches)
+    expectedNoBatch = [(7, [n]) | n <- reverse [1..20] :: [Int]]
+    aggBatch = aggregateFetchBatches agg' (Stats fetchBatch)
+    expectedResultBatch = (50, [123 | _ <- [1..50] :: [Int]])
+    aggInterspersedBatch =
+      aggregateFetchBatches agg'
+        (Stats $ intersperse (head fetchBatch) statsNoBatches)
+    expectedResultInterspersed =
+      (21, [123 | _ <- [1..21] :: [Int]]) : expectedNoBatch
+  assertEqual "No batch has no change" expectedNoBatch aggNoBatch
+  assertEqual "Batch is combined" [expectedResultBatch] aggBatch
+  assertEqual
+    "Grouping works as expected" expectedResultInterspersed aggInterspersedBatch
+
+tests = TestList [TestLabel "Aggregate Batches" aggregateBatches]
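
Usage sketch (not part of the patch): once stats are being collected (report
level 2 or higher, so that wrapFetchInStats runs), every dispatched batch gets
a fresh fetchBatchId, and aggregateFetchBatches can collapse the per-request
FetchStats of each batch into a single summary. The helper batchSummary and
the main wrapper below are illustrative assumptions only; initEnv, stateEmpty,
defaultFlags, runHaxl, statsRef, the FetchStats fields and the new
aggregateFetchBatches are the existing Haxl.Core surface plus this patch.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Main where

import Data.IORef (readIORef)
import Data.List (foldl')
import Data.Text (Text)
import Haxl.Core

-- One row per dispatched batch: data source, total requests, total failures.
-- Only FetchStats entries reach this function; FetchCall entries are filtered
-- out by aggregateFetchBatches itself.
batchSummary :: [FetchStats] -> (Text, Int, Int)
batchSummary = foldl' step ("", 0, 0)
  where
    step (_, sz, fl) FetchStats{..} =
      (fetchDataSource, sz + fetchBatchSize, fl + fetchFailures)
    step acc _ = acc

main :: IO ()
main = do
  -- Hypothetical setup: no data-source states registered, unit user env.
  env0 <- initEnv stateEmpty () :: IO (Env () ())
  let env = env0 { flags = defaultFlags { report = 2 } }  -- enable stats
  _ <- runHaxl env (return ())  -- substitute a real computation with fetches
  stats <- readIORef (statsRef env)
  mapM_ print (aggregateFetchBatches batchSummary stats)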