mirror of
https://github.com/facebook/Haxl.git
synced 2024-12-24 01:04:21 +03:00
Track fetches/memos accurately in profiling (#120)
Summary: Pull Request resolved: https://github.com/facebook/Haxl/pull/120 This adds tracking of memo/fetches per label by a unique id for each. Using this we can track exactly where time was spent, and where it was shared Reviewed By: simonmar Differential Revision: D20792435 fbshipit-source-id: 55c1e778d313d103a910c6dd5be512f95125acce
This commit is contained in:
parent
fdfb86379b
commit
15a8c2cc84
@ -44,6 +44,7 @@ module Haxl.Core (
|
||||
-- ** Statistics
|
||||
, Stats(..)
|
||||
, FetchStats(..)
|
||||
, CallId
|
||||
, Microseconds
|
||||
, Timestamp
|
||||
, emptyStats
|
||||
@ -52,6 +53,8 @@ module Haxl.Core (
|
||||
, ppFetchStats
|
||||
, aggregateFetchBatches
|
||||
, Profile(..)
|
||||
, ProfileMemo(..)
|
||||
, ProfileFetch(..)
|
||||
, emptyProfile
|
||||
, ProfileLabel
|
||||
, ProfileKey
|
||||
@ -59,7 +62,6 @@ module Haxl.Core (
|
||||
, emptyProfileData
|
||||
, AllocCount
|
||||
, LabelHitCount
|
||||
, MemoHitCount
|
||||
|
||||
-- ** Tracing flags
|
||||
, Flags(..)
|
||||
|
@ -22,6 +22,7 @@ module Haxl.Core.DataCache
|
||||
, insertWithShow
|
||||
, lookup
|
||||
, showCache
|
||||
, readCache
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
@ -166,3 +167,23 @@ showCache (DataCache cache) readRes = H.foldM goSubCache [] cache
|
||||
Just (Left e) -> (showReq request, Left e) : res
|
||||
Just (Right result) ->
|
||||
(showReq request, Right (showRes result)) : res
|
||||
|
||||
-- | Dumps the contents of the cache responses to list
|
||||
readCache
|
||||
:: forall res ret
|
||||
. DataCache res
|
||||
-> (forall a . res a -> IO ret)
|
||||
-> IO [(TypeRep, [Either SomeException ret])]
|
||||
readCache (DataCache cache) readRes = H.foldM goSubCache [] cache
|
||||
where
|
||||
goSubCache
|
||||
:: [(TypeRep, [Either SomeException ret])]
|
||||
-> (TypeRep, SubCache res)
|
||||
-> IO [(TypeRep, [Either SomeException ret])]
|
||||
goSubCache res (ty, SubCache _showReq _showRes hm) = do
|
||||
subCacheResult <- H.foldM go [] hm
|
||||
return $ (ty, subCacheResult):res
|
||||
where
|
||||
go res (_request, rvar) = do
|
||||
r <- try $ readRes rvar
|
||||
return $ r : res
|
||||
|
@ -74,15 +74,18 @@ data CacheResult u w a
|
||||
= Uncached
|
||||
(ResultVar a)
|
||||
{-# UNPACK #-} !(IVar u w a)
|
||||
{-# UNPACK #-} !CallId
|
||||
|
||||
-- | The request has been seen before, but its result has not yet been
|
||||
-- fetched.
|
||||
| CachedNotFetched
|
||||
{-# UNPACK #-} !(IVar u w a)
|
||||
{-# UNPACK #-} !CallId
|
||||
|
||||
-- | The request has been seen before, and its result has already been
|
||||
-- fetched.
|
||||
| Cached (ResultVal a w)
|
||||
{-# UNPACK #-} !CallId
|
||||
|
||||
|
||||
-- | Show functions for request and its result.
|
||||
@ -101,31 +104,32 @@ cachedWithInsert
|
||||
:: forall r a u w.
|
||||
(DataSource u r, Typeable (r a))
|
||||
=> (r a -> String) -- See Note [showFn]
|
||||
-> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ())
|
||||
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
|
||||
-> Env u w -> r a -> IO (CacheResult u w a)
|
||||
cachedWithInsert showFn insertFn Env{..} req = do
|
||||
cachedWithInsert showFn insertFn env@Env{..} req = do
|
||||
let
|
||||
doFetch = do
|
||||
ivar <- newIVar
|
||||
k <- nextCallId env
|
||||
let !rvar = stdResultVar ivar completions submittedReqsRef flags
|
||||
(Proxy :: Proxy r)
|
||||
insertFn req ivar dataCache
|
||||
return (Uncached rvar ivar)
|
||||
insertFn req (DataCacheItem ivar k) dataCache
|
||||
return (Uncached rvar ivar k)
|
||||
mbRes <- DataCache.lookup req dataCache
|
||||
case mbRes of
|
||||
Nothing -> doFetch
|
||||
Just i@IVar{ivarRef = cr} -> do
|
||||
Just (DataCacheItem i@IVar{ivarRef = cr} k) -> do
|
||||
e <- readIORef cr
|
||||
case e of
|
||||
IVarEmpty _ -> do
|
||||
ivar <- withCurrentCCS i
|
||||
return (CachedNotFetched ivar)
|
||||
return (CachedNotFetched ivar k)
|
||||
IVarFull r -> do
|
||||
ifTrace flags 3 $ putStrLn $ case r of
|
||||
ThrowIO{} -> "Cached error: " ++ showFn req
|
||||
ThrowHaxl{} -> "Cached error: " ++ showFn req
|
||||
Ok{} -> "Cached request: " ++ showFn req
|
||||
return (Cached r)
|
||||
return (Cached r k)
|
||||
|
||||
|
||||
-- | Make a ResultVar with the standard function for sending a CompletionReq
|
||||
@ -160,15 +164,15 @@ stdResultVar ivar completions ref flags p =
|
||||
|
||||
-- | Record the call stack for a data fetch in the Stats. Only useful
|
||||
-- when profiling.
|
||||
logFetch :: Env u w -> (r a -> String) -> r a -> IO ()
|
||||
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
|
||||
#ifdef PROFILING
|
||||
logFetch env showFn req = do
|
||||
logFetch env showFn req fid = do
|
||||
ifReport (flags env) 5 $ do
|
||||
stack <- currentCallStack
|
||||
modifyIORef' (statsRef env) $ \(Stats s) ->
|
||||
Stats (FetchCall (showFn req) stack : s)
|
||||
Stats (FetchCall (showFn req) stack fid : s)
|
||||
#else
|
||||
logFetch _ _ _ = return ()
|
||||
logFetch _ _ _ _ = return ()
|
||||
#endif
|
||||
|
||||
-- | Performs actual fetching of data for a 'Request' from a 'DataSource'.
|
||||
@ -190,38 +194,44 @@ dataFetchWithInsert
|
||||
:: forall u w r a
|
||||
. (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
|
||||
=> (r a -> String) -- See Note [showFn]
|
||||
-> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ())
|
||||
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
|
||||
-> r a
|
||||
-> GenHaxl u w a
|
||||
dataFetchWithInsert showFn insertFn req =
|
||||
GenHaxl $ \env@Env{..} -> do
|
||||
-- First, check the cache
|
||||
res <- cachedWithInsert showFn insertFn env req
|
||||
ifProfiling flags $ addProfileFetch env req
|
||||
case res of
|
||||
-- This request has not been seen before
|
||||
Uncached rvar ivar -> do
|
||||
logFetch env showFn req
|
||||
Uncached rvar ivar fid -> do
|
||||
logFetch env showFn req fid
|
||||
ifProfiling flags $ addProfileFetch env req fid False
|
||||
--
|
||||
-- Check whether the data source wants to submit requests
|
||||
-- eagerly, or batch them up.
|
||||
--
|
||||
let blockedFetch = BlockedFetch req rvar
|
||||
let blockedFetchI = BlockedFetchInternal fid
|
||||
case schedulerHint userEnv :: SchedulerHint r of
|
||||
SubmitImmediately ->
|
||||
performFetches env [BlockedFetches [BlockedFetch req rvar]]
|
||||
performFetches env [BlockedFetches [blockedFetch] [blockedFetchI]]
|
||||
TryToBatch ->
|
||||
-- add the request to the RequestStore and continue
|
||||
modifyIORef' reqStoreRef $ \bs ->
|
||||
addRequest (BlockedFetch req rvar) bs
|
||||
addRequest blockedFetch blockedFetchI bs
|
||||
--
|
||||
return $ Blocked ivar (Return ivar)
|
||||
|
||||
-- Seen before but not fetched yet. We're blocked, but we don't have
|
||||
-- to add the request to the RequestStore.
|
||||
CachedNotFetched ivar -> return $ Blocked ivar (Return ivar)
|
||||
CachedNotFetched ivar fid -> do
|
||||
ifProfiling flags $ addProfileFetch env req fid True
|
||||
return $ Blocked ivar (Return ivar)
|
||||
|
||||
-- Cached: either a result, or an exception
|
||||
Cached r -> done r
|
||||
Cached r fid -> do
|
||||
ifProfiling flags $ addProfileFetch env req fid True
|
||||
done r
|
||||
|
||||
-- | A data request that is not cached. This is not what you want for
|
||||
-- normal read requests, because then multiple identical requests may
|
||||
@ -246,11 +256,12 @@ uncachedRequest req = do
|
||||
subRef <- env submittedReqsRef
|
||||
if recording flg /= 0
|
||||
then dataFetch req
|
||||
else GenHaxl $ \Env{..} -> do
|
||||
else GenHaxl $ \e@Env{..} -> do
|
||||
ivar <- newIVar
|
||||
k <- nextCallId e
|
||||
let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r)
|
||||
modifyIORef' reqStoreRef $ \bs ->
|
||||
addRequest (BlockedFetch req rvar) bs
|
||||
addRequest (BlockedFetch req rvar) (BlockedFetchInternal k) bs
|
||||
return $ Blocked ivar (Return ivar)
|
||||
|
||||
|
||||
@ -275,9 +286,11 @@ cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq
|
||||
cacheResultWithInsert
|
||||
:: Typeable (r a)
|
||||
=> (r a -> String) -- See Note [showFn]
|
||||
-> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ()) -> r a
|
||||
-> IO a -> GenHaxl u w a
|
||||
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do
|
||||
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
|
||||
-> r a
|
||||
-> IO a
|
||||
-> GenHaxl u w a
|
||||
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \e@Env{..} -> do
|
||||
mbRes <- DataCache.lookup req dataCache
|
||||
case mbRes of
|
||||
Nothing -> do
|
||||
@ -287,9 +300,10 @@ cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do
|
||||
_ -> return ()
|
||||
let result = eitherToResultThrowIO eitherResult
|
||||
ivar <- newFullIVar result
|
||||
insertFn req ivar dataCache
|
||||
k <- nextCallId e
|
||||
insertFn req (DataCacheItem ivar k) dataCache
|
||||
done result
|
||||
Just IVar{ivarRef = cr} -> do
|
||||
Just (DataCacheItem IVar{ivarRef = cr} _) -> do
|
||||
e <- readIORef cr
|
||||
case e of
|
||||
IVarEmpty _ -> corruptCache
|
||||
@ -313,12 +327,13 @@ cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do
|
||||
--
|
||||
cacheRequest
|
||||
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
|
||||
cacheRequest request result = GenHaxl $ \Env{..} -> do
|
||||
cacheRequest request result = GenHaxl $ \e@Env{..} -> do
|
||||
mbRes <- DataCache.lookup request dataCache
|
||||
case mbRes of
|
||||
Nothing -> do
|
||||
cr <- newFullIVar (eitherToResult result)
|
||||
DataCache.insert request cr dataCache
|
||||
k <- nextCallId e
|
||||
DataCache.insert request (DataCacheItem cr k) dataCache
|
||||
return (Done ())
|
||||
|
||||
-- It is an error if the request is already in the cache.
|
||||
@ -334,9 +349,10 @@ cacheRequest request result = GenHaxl $ \Env{..} -> do
|
||||
-- Useful e.g. for unit tests
|
||||
dupableCacheRequest
|
||||
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
|
||||
dupableCacheRequest request result = GenHaxl $ \Env{..} -> do
|
||||
dupableCacheRequest request result = GenHaxl $ \e@Env{..} -> do
|
||||
cr <- newFullIVar (eitherToResult result)
|
||||
DataCache.insert request cr dataCache
|
||||
k <- nextCallId e
|
||||
DataCache.insert request (DataCacheItem cr k) dataCache
|
||||
return (Done ())
|
||||
|
||||
performRequestStore
|
||||
@ -353,11 +369,11 @@ performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
|
||||
t0 <- getTimestamp
|
||||
|
||||
ifTrace f 3 $
|
||||
forM_ jobs $ \(BlockedFetches reqs) ->
|
||||
forM_ jobs $ \(BlockedFetches reqs _) ->
|
||||
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r)
|
||||
|
||||
let
|
||||
applyFetch i (BlockedFetches (reqs :: [BlockedFetch r])) =
|
||||
applyFetch i bfs@(BlockedFetches (reqs :: [BlockedFetch r]) _) =
|
||||
case stateGet (states env) of
|
||||
Nothing ->
|
||||
return (FetchToDo reqs (SyncFetch (mapM_ (setError e))))
|
||||
@ -367,10 +383,9 @@ performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
|
||||
<> ": "
|
||||
<> Text.pack (showp req)
|
||||
Just state ->
|
||||
return
|
||||
$ FetchToDo reqs
|
||||
return $ FetchToDo reqs
|
||||
$ (if report f >= 2
|
||||
then wrapFetchInStats sref sbref dsName (length reqs)
|
||||
then wrapFetchInStats sref sbref dsName (length reqs) bfs
|
||||
else id)
|
||||
$ wrapFetchInTrace i (length reqs) dsName
|
||||
$ wrapFetchInCatch reqs
|
||||
@ -430,14 +445,21 @@ wrapFetchInCatch reqs fetch =
|
||||
|
||||
|
||||
wrapFetchInStats
|
||||
:: IORef Stats
|
||||
:: forall u req .
|
||||
IORef Stats
|
||||
-> IORef Int
|
||||
-> Text
|
||||
-> Int
|
||||
-> BlockedFetches u
|
||||
-> PerformFetch req
|
||||
-> PerformFetch req
|
||||
|
||||
wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
|
||||
wrapFetchInStats
|
||||
!statsRef
|
||||
!batchIdRef
|
||||
dataSource
|
||||
batchSize
|
||||
(BlockedFetches _reqs reqsI)
|
||||
perform = do
|
||||
case perform of
|
||||
SyncFetch f ->
|
||||
SyncFetch $ \reqs -> do
|
||||
@ -445,7 +467,7 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
|
||||
fail_ref <- newIORef 0
|
||||
(t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs))
|
||||
failures <- readIORef fail_ref
|
||||
updateFetchStats bid t0 t alloc batchSize failures
|
||||
updateFetchStats bid allFids t0 t alloc batchSize failures
|
||||
AsyncFetch f -> do
|
||||
AsyncFetch $ \reqs inner -> do
|
||||
bid <- newBatchId
|
||||
@ -458,14 +480,15 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
|
||||
(t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
|
||||
(innerTime, innerAlloc) <- readIORef inner_r
|
||||
failures <- readIORef fail_ref
|
||||
updateFetchStats bid t0 (totalTime - innerTime)
|
||||
updateFetchStats bid allFids t0 (totalTime - innerTime)
|
||||
(totalAlloc - innerAlloc) batchSize failures
|
||||
BackgroundFetch io -> do
|
||||
BackgroundFetch $ \reqs -> do
|
||||
bid <- newBatchId
|
||||
startTime <- getTimestamp
|
||||
io (map (addTimer bid startTime) reqs)
|
||||
io (zipWith (addTimer bid startTime) reqs reqsI)
|
||||
where
|
||||
allFids = map (\(BlockedFetchInternal k) -> k) reqsI
|
||||
newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1)
|
||||
statsForIO io = do
|
||||
prevAlloc <- getAllocationCounter
|
||||
@ -473,32 +496,44 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do
|
||||
postAlloc <- getAllocationCounter
|
||||
return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
|
||||
|
||||
addTimer bid t0 (BlockedFetch req (ResultVar fn)) =
|
||||
BlockedFetch req $ ResultVar $ \result isChildThread -> do
|
||||
t1 <- getTimestamp
|
||||
-- We cannot measure allocation easily for BackgroundFetch. Here we
|
||||
-- just attribute all allocation to the last `putResultFromChildThread`
|
||||
-- and use 0 for the others. While the individual allocations may not
|
||||
-- be correct, the total sum and amortized allocation are still
|
||||
-- meaningful.
|
||||
-- see Note [tracking allocation in child threads]
|
||||
allocs <- if isChildThread then getAllocationCounter else return 0
|
||||
updateFetchStats bid t0 (t1 - t0)
|
||||
(negate allocs)
|
||||
1 -- batch size: we don't know if this is a batch or not
|
||||
(if isLeft result then 1 else 0) -- failures
|
||||
fn result isChildThread
|
||||
addTimer
|
||||
bid
|
||||
t0
|
||||
(BlockedFetch req (ResultVar fn))
|
||||
(BlockedFetchInternal fid) =
|
||||
BlockedFetch req $ ResultVar $ \result isChildThread -> do
|
||||
t1 <- getTimestamp
|
||||
-- We cannot measure allocation easily for BackgroundFetch. Here we
|
||||
-- just attribute all allocation to the last
|
||||
-- `putResultFromChildThread` and use 0 for the others.
|
||||
-- While the individual allocations may not be correct,
|
||||
-- the total sum and amortized allocation are still meaningful.
|
||||
-- see Note [tracking allocation in child threads]
|
||||
allocs <- if isChildThread then getAllocationCounter else return 0
|
||||
updateFetchStats bid [fid] t0 (t1 - t0)
|
||||
(negate allocs)
|
||||
1 -- batch size: we don't know if this is a batch or not
|
||||
(if isLeft result then 1 else 0) -- failures
|
||||
fn result isChildThread
|
||||
|
||||
updateFetchStats
|
||||
:: Int -> Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
|
||||
updateFetchStats bid start time space batch failures = do
|
||||
:: Int
|
||||
-> [CallId]
|
||||
-> Timestamp
|
||||
-> Microseconds
|
||||
-> Int64
|
||||
-> Int
|
||||
-> Int
|
||||
-> IO ()
|
||||
updateFetchStats bid fids start time space batch failures = do
|
||||
let this = FetchStats { fetchDataSource = dataSource
|
||||
, fetchBatchSize = batch
|
||||
, fetchStart = start
|
||||
, fetchDuration = time
|
||||
, fetchSpace = space
|
||||
, fetchFailures = failures
|
||||
, fetchBatchId = bid }
|
||||
, fetchBatchId = bid
|
||||
, fetchIds = fids }
|
||||
atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())
|
||||
|
||||
addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
|
||||
|
@ -45,6 +45,7 @@ import qualified Data.HashMap.Strict as HashMap
|
||||
import Data.Text (Text)
|
||||
import Data.Typeable
|
||||
import Data.Hashable
|
||||
import Data.Int
|
||||
import Data.Word
|
||||
|
||||
import GHC.Prim (Addr#)
|
||||
@ -53,6 +54,7 @@ import Haxl.Core.Exception
|
||||
import Haxl.Core.DataCache as DataCache
|
||||
import Haxl.Core.Flags
|
||||
import Haxl.Core.Monad
|
||||
import Haxl.Core.Stats
|
||||
import Haxl.Core.Profile
|
||||
import Haxl.Core.Util (trace_)
|
||||
|
||||
@ -73,17 +75,18 @@ cachedComputation
|
||||
=> req a -> GenHaxl u w a -> GenHaxl u w a
|
||||
|
||||
cachedComputation req haxl = GenHaxl $ \env@Env{..} -> do
|
||||
ifProfiling flags $
|
||||
modifyIORef'
|
||||
profRef
|
||||
(incrementMemoHitCounterFor (profCurrentKey profCurrent))
|
||||
mbRes <- DataCache.lookup req memoCache
|
||||
case mbRes of
|
||||
Just ivar -> unHaxl (getIVarWithWrites ivar) env
|
||||
Just (DataCacheItem ivar k) -> do
|
||||
ifProfiling flags $ do
|
||||
incrementMemoHitCounterFor env k True
|
||||
unHaxl (getIVarWithWrites ivar) env
|
||||
Nothing -> do
|
||||
ivar <- newIVar
|
||||
DataCache.insertNotShowable req ivar memoCache
|
||||
unHaxl (execMemoNow haxl ivar) env
|
||||
k <- nextCallId env
|
||||
-- no need to incremenetMemoHitCounter as execMemo will do it
|
||||
DataCache.insertNotShowable req (DataCacheItem ivar k) memoCache
|
||||
execMemoNowProfiled env haxl ivar k
|
||||
|
||||
|
||||
-- | Like 'cachedComputation', but fails if the cache is already
|
||||
@ -101,18 +104,15 @@ preCacheComputation
|
||||
, Typeable (req a))
|
||||
=> req a -> GenHaxl u w a -> GenHaxl u w a
|
||||
preCacheComputation req haxl = GenHaxl $ \env@Env{..} -> do
|
||||
ifProfiling flags $
|
||||
modifyIORef'
|
||||
profRef
|
||||
(incrementMemoHitCounterFor (profCurrentKey profCurrent))
|
||||
mbRes <- DataCache.lookup req memoCache
|
||||
case mbRes of
|
||||
Just _ -> return $ Throw $ toException $ InvalidParameter
|
||||
"preCacheComputation: key is already cached"
|
||||
Nothing -> do
|
||||
ivar <- newIVar
|
||||
DataCache.insertNotShowable req ivar memoCache
|
||||
unHaxl (execMemoNow haxl ivar) env
|
||||
k <- nextCallId env
|
||||
DataCache.insertNotShowable req (DataCacheItem ivar k) memoCache
|
||||
execMemoNowProfiled env haxl ivar k
|
||||
|
||||
-- -----------------------------------------------------------------------------
|
||||
-- Memoization
|
||||
@ -204,13 +204,43 @@ runMemo (MemoVar memoRef) = GenHaxl $ \env -> do
|
||||
MemoReady cont -> trace_ "MemoReady" $ do
|
||||
ivar <- newIVar
|
||||
writeIORef memoRef (MemoRun ivar)
|
||||
unHaxl (execMemoNow cont ivar) env
|
||||
execMemoNow env cont ivar
|
||||
-- The memo has already been run, get (or wait for) for the result
|
||||
MemoRun ivar -> trace_ "MemoRun" $ unHaxl (getIVarWithWrites ivar) env
|
||||
|
||||
execMemoNowProfiled
|
||||
:: Env u w
|
||||
-> GenHaxl u w a
|
||||
-> IVar u w a
|
||||
-> CallId
|
||||
-> IO (Result u w a)
|
||||
execMemoNowProfiled envOuter cont ivar cid = if report (flags envOuter) < 4
|
||||
then execMemoNow envOuter cont ivar
|
||||
else do
|
||||
incrementMemoHitCounterFor envOuter cid False
|
||||
unHaxl
|
||||
(collectMemoData 0 $ GenHaxl $ \e -> execMemoNow e cont ivar)
|
||||
envOuter
|
||||
where
|
||||
addStats :: Env u w -> Int64 -> IO ()
|
||||
addStats env acc = modifyIORef' (statsRef env) $ \(Stats s) ->
|
||||
Stats (MemoCall cid acc : s)
|
||||
collectMemoData :: Int64 -> GenHaxl u w a -> GenHaxl u w a
|
||||
collectMemoData acc f = GenHaxl $ \env -> do
|
||||
a0 <- getAllocationCounter
|
||||
r <- unHaxl f env{memoKey=cid}
|
||||
a1 <- getAllocationCounter
|
||||
let newTotal = acc + (a0 - a1)
|
||||
ret <- case r of
|
||||
Done a -> do addStats env newTotal; return (Done a)
|
||||
Throw e -> do addStats env newTotal; return (Throw e)
|
||||
Blocked ivar k ->
|
||||
return (Blocked ivar (Cont (collectMemoData newTotal (toHaxl k))))
|
||||
setAllocationCounter a1
|
||||
return ret
|
||||
|
||||
execMemoNow :: GenHaxl u w a -> IVar u w a -> GenHaxl u w a
|
||||
execMemoNow cont ivar = GenHaxl $ \env -> do
|
||||
execMemoNow :: Env u w -> GenHaxl u w a -> IVar u w a -> IO (Result u w a)
|
||||
execMemoNow env cont ivar = do
|
||||
wlogs <- newIORef NilWrites
|
||||
let
|
||||
!ienv = imperative env { writeLogsRef = wlogs }
|
||||
|
@ -74,12 +74,14 @@ module Haxl.Core.Monad
|
||||
|
||||
-- * Env
|
||||
, Env(..)
|
||||
, DataCacheItem(..)
|
||||
, Caches
|
||||
, caches
|
||||
, initEnvWithData
|
||||
, initEnv
|
||||
, emptyEnv
|
||||
, env, withEnv
|
||||
, nextCallId
|
||||
, speculate
|
||||
, imperative
|
||||
|
||||
@ -130,6 +132,7 @@ import Control.Monad
|
||||
import qualified Control.Exception as Exception
|
||||
import Data.IORef
|
||||
import Data.Int
|
||||
import Data.Either (rights)
|
||||
import GHC.Exts (IsString(..))
|
||||
import Text.PrettyPrint hiding ((<>))
|
||||
import Text.Printf
|
||||
@ -151,13 +154,19 @@ import Haxl.Core.CallGraph
|
||||
-- The environment
|
||||
|
||||
-- | The data we carry around in the Haxl monad.
|
||||
|
||||
data DataCacheItem u w a = DataCacheItem (IVar u w a) {-# UNPACK #-} !CallId
|
||||
|
||||
data Env u w = Env
|
||||
{ dataCache :: {-# UNPACK #-} !(DataCache (IVar u w))
|
||||
{ dataCache :: {-# UNPACK #-} !(DataCache (DataCacheItem u w))
|
||||
-- ^ cached data fetches
|
||||
|
||||
, memoCache :: {-# UNPACK #-} !(DataCache (IVar u w))
|
||||
, memoCache :: {-# UNPACK #-} !(DataCache (DataCacheItem u w))
|
||||
-- ^ memoized computations
|
||||
|
||||
, memoKey :: {-# UNPACK #-} !CallId
|
||||
-- ^ current running memo key
|
||||
|
||||
, flags :: !Flags
|
||||
-- conservatively not unpacking, because this is passed
|
||||
-- to 'fetch' and would need to be rebuilt.
|
||||
@ -172,8 +181,11 @@ data Env u w = Env
|
||||
-- ^ keeps track of a Unique ID for each batch dispatched with stats
|
||||
-- enabled, for aggregating after.
|
||||
|
||||
, callIdRef :: {-# UNPACK #-} !(IORef CallId)
|
||||
-- ^ keeps track of a Unique ID for each fetch/memo.
|
||||
|
||||
, profCurrent :: ProfileCurrent
|
||||
-- ^ current profiling label, see 'withLabel'
|
||||
-- ^ current profiling label, see 'withLabel'
|
||||
|
||||
, profRef :: {-# UNPACK #-} !(IORef Profile)
|
||||
-- ^ profiling data, collected according to the 'report' level in 'flags'.
|
||||
@ -230,15 +242,28 @@ data ProfileCurrent = ProfileCurrent
|
||||
, profCurrentLabel :: {-# UNPACK #-} !ProfileLabel
|
||||
}
|
||||
|
||||
type Caches u w = (DataCache (IVar u w), DataCache (IVar u w))
|
||||
type Caches u w = (DataCache (DataCacheItem u w), DataCache (DataCacheItem u w))
|
||||
|
||||
caches :: Env u w -> Caches u w
|
||||
caches env = (dataCache env, memoCache env)
|
||||
|
||||
getMaxCallId :: DataCache (DataCacheItem u w) -> IO (Maybe Int)
|
||||
getMaxCallId c = do
|
||||
callIds <- rights . concatMap snd <$>
|
||||
DataCache.readCache c (\(DataCacheItem _ i) -> return i)
|
||||
case callIds of
|
||||
[] -> return Nothing
|
||||
vals -> return $ Just (maximum vals)
|
||||
|
||||
|
||||
-- | Initialize an environment with a 'StateStore', an input map, a
|
||||
-- preexisting 'DataCache', and a seed for the random number generator.
|
||||
initEnvWithData :: StateStore -> u -> Caches u w -> IO (Env u w)
|
||||
initEnvWithData states e (dcache, mcache) = do
|
||||
newCid <- max <$>
|
||||
(maybe 0 ((+) 1) <$> getMaxCallId dcache) <*>
|
||||
(maybe 0 ((+) 1) <$> getMaxCallId mcache)
|
||||
ciref<- newIORef newCid
|
||||
sref <- newIORef emptyStats
|
||||
sbref <- newIORef 0
|
||||
pref <- newIORef emptyProfile
|
||||
@ -251,12 +276,14 @@ initEnvWithData states e (dcache, mcache) = do
|
||||
return Env
|
||||
{ dataCache = dcache
|
||||
, memoCache = mcache
|
||||
, memoKey = (-1)
|
||||
, flags = defaultFlags
|
||||
, userEnv = e
|
||||
, states = states
|
||||
, statsRef = sref
|
||||
, statsBatchIdRef = sbref
|
||||
, profCurrent = ProfileCurrent 0 "MAIN"
|
||||
, callIdRef = ciref
|
||||
, profRef = pref
|
||||
, reqStoreRef = rs
|
||||
, runQueueRef = rq
|
||||
@ -814,6 +841,9 @@ withEnv newEnv (GenHaxl m) = GenHaxl $ \_env -> do
|
||||
Blocked ivar k ->
|
||||
return (Blocked ivar (Cont (withEnv newEnv (toHaxl k))))
|
||||
|
||||
nextCallId :: Env u w -> IO CallId
|
||||
nextCallId env = atomicModifyIORef' (callIdRef env) $ \x -> (x+1,x+1)
|
||||
|
||||
#ifdef PROFILING
|
||||
-- -----------------------------------------------------------------------------
|
||||
-- CallGraph recording
|
||||
@ -972,7 +1002,7 @@ dumpCacheAsHaskellFn fnName fnType cacheFn = do
|
||||
cache <- env dataCache -- NB. dataCache, not memoCache. We ignore memoized
|
||||
-- results when dumping the cache.
|
||||
let
|
||||
readIVar IVar{ivarRef = !ref} = do
|
||||
readIVar (DataCacheItem IVar{ivarRef = !ref} _) = do
|
||||
r <- readIORef ref
|
||||
case r of
|
||||
IVarFull (Ok a _) -> return (Just (Right a))
|
||||
|
@ -27,7 +27,6 @@ import Data.Hashable
|
||||
#if __GLASGOW_HASKELL__ < 804
|
||||
import Data.Monoid
|
||||
#endif
|
||||
import Data.Text (Text)
|
||||
import Data.Typeable
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
import GHC.Exts
|
||||
@ -179,45 +178,36 @@ profileCont m env = do
|
||||
return r
|
||||
{-# INLINE profileCont #-}
|
||||
|
||||
|
||||
incrementMemoHitCounterFor :: ProfileKey -> Profile -> Profile
|
||||
incrementMemoHitCounterFor key p =
|
||||
p { profile =
|
||||
HashMap.insertWith
|
||||
incrementMemoHitCounter
|
||||
key
|
||||
(emptyProfileData { profileMemoHits = 1 })
|
||||
(profile p)
|
||||
incrementMemoHitCounterFor :: Env u w -> CallId -> Bool -> IO ()
|
||||
incrementMemoHitCounterFor env callId wasCached = do
|
||||
modifyIORef' (profRef env) $ \p -> p {
|
||||
profile = HashMap.insertWith
|
||||
upd
|
||||
(profCurrentKey $ profCurrent env)
|
||||
(emptyProfileData { profileMemos = [val] })
|
||||
(profile p)
|
||||
}
|
||||
|
||||
incrementMemoHitCounter :: ProfileData -> ProfileData -> ProfileData
|
||||
incrementMemoHitCounter _ pd = pd { profileMemoHits =
|
||||
succ (profileMemoHits pd)
|
||||
}
|
||||
where
|
||||
val = ProfileMemo callId wasCached
|
||||
upd _ old = old { profileMemos = val : profileMemos old }
|
||||
|
||||
{-# NOINLINE addProfileFetch #-}
|
||||
addProfileFetch
|
||||
:: forall r u w a . (DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a))
|
||||
=> Env u w -> r a -> IO ()
|
||||
addProfileFetch env _req = do
|
||||
=> Env u w -> r a -> CallId -> Bool -> IO ()
|
||||
addProfileFetch env _req cid wasCached = do
|
||||
c <- getAllocationCounter
|
||||
let (ProfileCurrent profKey _) = profCurrent env
|
||||
modifyIORef' (profRef env) $ \ p ->
|
||||
let
|
||||
dsName :: Text
|
||||
dsName = dataSourceName (Proxy :: Proxy r)
|
||||
|
||||
upd _ old = old { profileFetches =
|
||||
HashMap.insertWith (+) dsName 1 (profileFetches old) }
|
||||
val = ProfileFetch cid (memoKey env) wasCached
|
||||
upd _ old = old { profileFetches = val : profileFetches old }
|
||||
|
||||
in p { profile =
|
||||
HashMap.insertWith
|
||||
upd
|
||||
profKey
|
||||
(emptyProfileData { profileFetches =
|
||||
HashMap.singleton dsName 1
|
||||
}
|
||||
)
|
||||
(emptyProfileData { profileFetches = [val] })
|
||||
(profile p)
|
||||
}
|
||||
-- So we do not count the allocation overhead of addProfileFetch
|
||||
|
@ -21,6 +21,7 @@
|
||||
--
|
||||
module Haxl.Core.RequestStore
|
||||
( BlockedFetches(..)
|
||||
, BlockedFetchInternal(..)
|
||||
, RequestStore
|
||||
, isEmpty
|
||||
, noRequests
|
||||
@ -35,6 +36,7 @@ module Haxl.Core.RequestStore
|
||||
) where
|
||||
|
||||
import Haxl.Core.DataSource
|
||||
import Haxl.Core.Stats
|
||||
import Data.Map (Map)
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Proxy
|
||||
@ -48,9 +50,12 @@ newtype RequestStore u = RequestStore (Map TypeRep (BlockedFetches u))
|
||||
-- is dynamically-typed. It maps the TypeRep of the request to the
|
||||
-- 'BlockedFetches' for that 'DataSource'.
|
||||
|
||||
newtype BlockedFetchInternal = BlockedFetchInternal CallId
|
||||
|
||||
-- | A batch of 'BlockedFetch' objects for a single 'DataSource'
|
||||
data BlockedFetches u =
|
||||
forall r. (DataSource u r) => BlockedFetches [BlockedFetch r]
|
||||
forall r. (DataSource u r) =>
|
||||
BlockedFetches [BlockedFetch r] [BlockedFetchInternal]
|
||||
|
||||
isEmpty :: RequestStore u -> Bool
|
||||
isEmpty (RequestStore m) = Map.null m
|
||||
@ -62,13 +67,13 @@ noRequests = RequestStore Map.empty
|
||||
-- | Adds a 'BlockedFetch' to a 'RequestStore'.
|
||||
addRequest
|
||||
:: forall u r. (DataSource u r)
|
||||
=> BlockedFetch r -> RequestStore u -> RequestStore u
|
||||
addRequest bf (RequestStore m) =
|
||||
RequestStore $ Map.insertWith combine ty (BlockedFetches [bf]) m
|
||||
=> BlockedFetch r -> BlockedFetchInternal -> RequestStore u -> RequestStore u
|
||||
addRequest bf bfi (RequestStore m) =
|
||||
RequestStore $ Map.insertWith combine ty (BlockedFetches [bf] [bfi]) m
|
||||
where
|
||||
combine :: BlockedFetches u -> BlockedFetches u -> BlockedFetches u
|
||||
combine _ (BlockedFetches bfs)
|
||||
| typeOf1 (getR bfs) == ty = BlockedFetches (unsafeCoerce bf:bfs)
|
||||
combine _ (BlockedFetches bfs bfis)
|
||||
| typeOf1 (getR bfs) == ty = BlockedFetches (unsafeCoerce bf:bfs) (bfi:bfis)
|
||||
| otherwise = error "RequestStore.insert"
|
||||
-- the dynamic type check here should be unnecessary, but if
|
||||
-- there are bugs in `Typeable` or `Map` then we'll get an
|
||||
|
@ -18,6 +18,7 @@ module Haxl.Core.Stats
|
||||
(
|
||||
-- * Data-source stats
|
||||
Stats(..)
|
||||
, CallId
|
||||
, FetchStats(..)
|
||||
, Microseconds
|
||||
, Timestamp
|
||||
@ -30,6 +31,8 @@ module Haxl.Core.Stats
|
||||
|
||||
-- * Profiling
|
||||
, Profile(..)
|
||||
, ProfileMemo(..)
|
||||
, ProfileFetch(..)
|
||||
, emptyProfile
|
||||
, ProfileKey
|
||||
, ProfileLabel
|
||||
@ -37,7 +40,6 @@ module Haxl.Core.Stats
|
||||
, emptyProfileData
|
||||
, AllocCount
|
||||
, LabelHitCount
|
||||
, MemoHitCount
|
||||
|
||||
-- * Allocation
|
||||
, getAllocationCounter
|
||||
@ -106,6 +108,7 @@ ppStats (Stats rss) =
|
||||
fetchWasRunning fs t1 t2 =
|
||||
(fetchStart fs + fetchDuration fs) >= t1 && fetchStart fs < t2
|
||||
|
||||
type CallId = Int
|
||||
|
||||
-- | Maps data source name to the number of requests made in that round.
|
||||
-- The map only contains entries for sources that made requests in that
|
||||
@ -115,11 +118,12 @@ data FetchStats
|
||||
= FetchStats
|
||||
{ fetchDataSource :: Text
|
||||
, fetchBatchSize :: {-# UNPACK #-} !Int
|
||||
, fetchStart :: !Timestamp -- TODO should be something else
|
||||
, fetchStart :: {-# UNPACK #-} !Timestamp
|
||||
, fetchDuration :: {-# UNPACK #-} !Microseconds
|
||||
, fetchSpace :: {-# UNPACK #-} !Int64
|
||||
, fetchFailures :: {-# UNPACK #-} !Int
|
||||
, fetchBatchId :: {-# UNPACK #-} !Int
|
||||
, fetchIds :: [CallId]
|
||||
}
|
||||
|
||||
-- | The stack trace of a call to 'dataFetch'. These are collected
|
||||
@ -127,6 +131,11 @@ data FetchStats
|
||||
| FetchCall
|
||||
{ fetchReq :: String
|
||||
, fetchStack :: [String]
|
||||
, fetchStatId :: {-# UNPACK #-} !CallId
|
||||
}
|
||||
| MemoCall
|
||||
{ memoStatId :: {-# UNPACK #-} !CallId
|
||||
, memoSpace :: {-# UNPACK #-} !Int64
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@ -136,7 +145,8 @@ ppFetchStats FetchStats{..} =
|
||||
printf "%s: %d fetches (%.2fms, %d bytes, %d failures)"
|
||||
(Text.unpack fetchDataSource) fetchBatchSize
|
||||
(fromIntegral fetchDuration / 1000 :: Double) fetchSpace fetchFailures
|
||||
ppFetchStats (FetchCall r ss) = show r ++ '\n':show ss
|
||||
ppFetchStats (FetchCall r ss _) = show r ++ '\n':show ss
|
||||
ppFetchStats (MemoCall _r _ss) = ""
|
||||
|
||||
-- | Aggregate stats merging FetchStats from the same dispatched batch into one.
|
||||
aggregateFetchBatches :: ([FetchStats] -> a) -> Stats -> [a]
|
||||
@ -154,11 +164,17 @@ instance ToJSON FetchStats where
|
||||
, "duration" .= fetchDuration
|
||||
, "allocation" .= fetchSpace
|
||||
, "failures" .= fetchFailures
|
||||
, "bachid" .= fetchBatchId
|
||||
, "batchid" .= fetchBatchId
|
||||
, "fetchids" .= fetchIds
|
||||
]
|
||||
toJSON (FetchCall req strs) = object
|
||||
toJSON (FetchCall req strs fid) = object
|
||||
[ "request" .= req
|
||||
, "stack" .= strs
|
||||
, "fetchid" .= fid
|
||||
]
|
||||
toJSON (MemoCall cid allocs) = object
|
||||
[ "callid" .= cid
|
||||
, "allocation" .= allocs
|
||||
]
|
||||
|
||||
emptyStats :: Stats
|
||||
@ -174,9 +190,21 @@ numFetches (Stats rs) = sum [ fetchBatchSize | FetchStats{..} <- rs ]
|
||||
type ProfileLabel = Text
|
||||
type AllocCount = Int64
|
||||
type LabelHitCount = Int64
|
||||
type MemoHitCount = Int64
|
||||
type ProfileKey = Int64
|
||||
|
||||
data ProfileFetch = ProfileFetch
|
||||
{ profileFetchFetchId :: {-# UNPACK #-} !CallId
|
||||
, profileFetchMemoId :: {-# UNPACK #-} !CallId
|
||||
, profileFetchWasCached :: !Bool
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
data ProfileMemo = ProfileMemo
|
||||
{ profileMemoId :: {-# UNPACK #-} !CallId
|
||||
, profileMemoWasCached :: !Bool
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
data Profile = Profile
|
||||
{ profile :: HashMap ProfileKey ProfileData
|
||||
-- ^ Data per key (essentially per call stack)
|
||||
@ -193,14 +221,14 @@ emptyProfile = Profile HashMap.empty (HashMap.singleton ("MAIN", 0) 0) 1
|
||||
data ProfileData = ProfileData
|
||||
{ profileAllocs :: {-# UNPACK #-} !AllocCount
|
||||
-- ^ allocations made by this label
|
||||
, profileFetches :: HashMap Text Int
|
||||
-- ^ map from datasource name => fetch count
|
||||
, profileFetches :: [ProfileFetch]
|
||||
-- ^ fetches made in this label
|
||||
, profileLabelHits :: {-# UNPACK #-} !LabelHitCount
|
||||
-- ^ number of hits at this label
|
||||
, profileMemoHits :: {-# UNPACK #-} !MemoHitCount
|
||||
-- ^ number of hits to memoized computation at this label
|
||||
, profileMemos :: [ProfileMemo]
|
||||
-- ^ memo and a boolean representing if it was cached at the time
|
||||
}
|
||||
deriving Show
|
||||
|
||||
emptyProfileData :: ProfileData
|
||||
emptyProfileData = ProfileData 0 HashMap.empty 0 0
|
||||
emptyProfileData = ProfileData 0 [] 0 []
|
||||
|
@ -1,6 +1,6 @@
|
||||
# Changes in version <next>
|
||||
* Added fetchBatchId to FetchStats
|
||||
* Profiling now tracks full stacks
|
||||
* Profiling now tracks full stacks and links each label to memos/fetches
|
||||
|
||||
# Changes in version 2.3.0.0
|
||||
* Removed `FutureFetch`
|
||||
|
@ -141,6 +141,8 @@ cacheReuse future = do
|
||||
tao <- MockTAO.initGlobalState future
|
||||
let st = stateSet tao stateEmpty
|
||||
env2 <- initEnvWithData st testinput (caches env)
|
||||
cid <- readIORef (callIdRef env2)
|
||||
assertBool "callId is unique" (cid > 0)
|
||||
|
||||
-- ensure no more data fetching rounds needed
|
||||
expectResultWithEnv 12 batching7_ env2
|
||||
|
@ -23,6 +23,7 @@ import Control.Exception (evaluate)
|
||||
import Data.Aeson
|
||||
import Data.IORef
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
import Data.Int
|
||||
|
||||
import TestUtils
|
||||
import WorkDataSource
|
||||
@ -131,9 +132,43 @@ threadAlloc batches = do
|
||||
-- if we actually do more than 1 batch then the above test is not useful
|
||||
|
||||
|
||||
-- Test that we correctly attribute memo work
|
||||
memos:: Assertion
|
||||
memos = do
|
||||
env <- mkProfilingEnv
|
||||
let
|
||||
memoAllocs = 10000000 :: Int64
|
||||
doWorkMemo = memo (1 :: Int) $ unsafeLiftIO $ do
|
||||
a0 <- getAllocationCounter
|
||||
setAllocationCounter $ a0 - memoAllocs
|
||||
return (5 :: Int)
|
||||
_ <- runHaxl env $ andThen
|
||||
(withLabel "do" doWorkMemo)
|
||||
(withLabel "cached" doWorkMemo)
|
||||
profData <- labelToDataMap <$> readIORef (profRef env)
|
||||
case HashMap.lookup "do" profData of
|
||||
Nothing -> assertFailure "do not in data"
|
||||
Just ProfileData{..} -> do
|
||||
assertEqual "has correct memo id" profileMemos [ProfileMemo 1 False]
|
||||
assertBool "allocs are included in 'do'" (profileAllocs > memoAllocs)
|
||||
case HashMap.lookup "cached" profData of
|
||||
Nothing -> assertFailure "cached not in data"
|
||||
Just ProfileData{..} -> do
|
||||
assertEqual "has correct memo id" profileMemos [ProfileMemo 1 True]
|
||||
assertBool "allocs are *not* included in 'cached'" (profileAllocs < 50000)
|
||||
(Stats memoStats) <- readIORef (statsRef env)
|
||||
assertEqual "exactly 1 memo/fetch" 1 (length memoStats)
|
||||
let memoStat = head memoStats
|
||||
putStrLn $ "memoStat=" ++ show memoStat
|
||||
assertEqual "correct call id" 1 (memoStatId memoStat)
|
||||
assertBool "allocs are big enough" $ memoSpace memoStat >= memoAllocs
|
||||
assertBool "allocs are not too big" $ memoSpace memoStat < memoAllocs + 100000
|
||||
|
||||
|
||||
tests = TestList
|
||||
[ TestLabel "collectsdata" $ TestCase collectsdata
|
||||
, TestLabel "exceptions" $ TestCase exceptions
|
||||
, TestLabel "threads" $ TestCase (threadAlloc 1)
|
||||
, TestLabel "threads with batch" $ TestCase (threadAlloc 50)
|
||||
, TestLabel "memos" $ TestCase memos
|
||||
]
|
||||
|
@ -11,8 +11,20 @@ module StatsTests (tests) where
|
||||
|
||||
import Test.HUnit
|
||||
import Data.List
|
||||
import Data.Maybe
|
||||
|
||||
import Haxl.Prelude
|
||||
import Haxl.Core
|
||||
import Prelude()
|
||||
|
||||
|
||||
import ExampleDataSource
|
||||
import SleepDataSource
|
||||
import Haxl.DataSource.ConcurrentIO
|
||||
|
||||
import Control.Monad (void)
|
||||
import Data.IORef
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
|
||||
aggregateBatches :: Test
|
||||
aggregateBatches = TestCase $ do
|
||||
@ -23,16 +35,18 @@ aggregateBatches = TestCase $ do
|
||||
, fetchDuration = 10
|
||||
, fetchSpace = 1
|
||||
, fetchFailures = 2
|
||||
, fetchBatchId = n }
|
||||
, fetchBatchId = n
|
||||
, fetchIds = [1,2] }
|
||||
| n <- reverse [1..10] ++ [11..20] ]
|
||||
++ [ FetchCall "A" ["B"], FetchCall "C" ["D"] ]
|
||||
++ [ FetchCall "A" ["B"] 1, FetchCall "C" ["D"] 2 ]
|
||||
fetchBatch = [ FetchStats { fetchDataSource = "batch"
|
||||
, fetchBatchSize = 1
|
||||
, fetchStart = 100
|
||||
, fetchDuration = 1000 * n
|
||||
, fetchSpace = 3
|
||||
, fetchFailures = if n <= 3 then 1 else 0
|
||||
, fetchBatchId = 123 } | n <- [1..50] ]
|
||||
, fetchBatchId = 123
|
||||
, fetchIds = [fromIntegral n] } | n <- [1..50] ]
|
||||
agg (sz,bids) FetchStats{..} = (sz + fetchBatchSize, fetchBatchId:bids)
|
||||
agg _ _ = error "unexpected"
|
||||
agg' = foldl' agg (0,[])
|
||||
@ -50,4 +64,104 @@ aggregateBatches = TestCase $ do
|
||||
assertEqual
|
||||
"Grouping works as expected" expectedResultInterspersed aggInterspersedBatch
|
||||
|
||||
tests = TestList [TestLabel "Aggregate Batches" aggregateBatches]
|
||||
|
||||
testEnv = do
|
||||
-- To use a data source, we need to initialize its state:
|
||||
exstate <- ExampleDataSource.initGlobalState
|
||||
sleepState <- mkConcurrentIOState
|
||||
|
||||
-- And create a StateStore object containing the states we need:
|
||||
let st = stateSet exstate (stateSet sleepState stateEmpty)
|
||||
|
||||
-- Create the Env:
|
||||
env <- initEnv st ()
|
||||
return env{ flags = (flags env){ report = 5 } }
|
||||
|
||||
|
||||
fetchIdsSync :: Test
|
||||
fetchIdsSync = TestCase $ do
|
||||
env <- testEnv
|
||||
_ <- runHaxl env $
|
||||
sequence_
|
||||
[ void $ countAardvarks "abcabc" + (length <$> listWombats 3)
|
||||
, void $ listWombats 100
|
||||
, void $ listWombats 99 ]
|
||||
-- expect a single DS stat
|
||||
(Stats stats) <- readIORef (statsRef env)
|
||||
let
|
||||
fetchStats = [x | x@FetchStats{} <- stats]
|
||||
assertEqual "Only 1 batch" 1 (length fetchStats)
|
||||
|
||||
fetchIdsBackground :: Test
|
||||
fetchIdsBackground = TestCase $ do
|
||||
env <- testEnv
|
||||
_ <- runHaxl env $
|
||||
sequence_
|
||||
[ withLabel "short" $ sleep 1
|
||||
, withLabel "long" $ sleep 500 ]
|
||||
|
||||
-- make sure that with memo'ing we still preserve the stack
|
||||
_ <- runHaxl env $ withLabel "base"
|
||||
(memo (1 :: Int) $ withLabel "child" $ sleep 102)
|
||||
|
||||
_ <- runHaxl env $ withLabel "short_cached" $ sleep 1
|
||||
|
||||
-- expect a single DS stat
|
||||
(Stats stats) <- readIORef (statsRef env)
|
||||
(Profile p pt _) <- readIORef (profRef env)
|
||||
let
|
||||
keyMap =
|
||||
HashMap.fromList [ (label, k) | ((label,_), k) <- HashMap.toList pt]
|
||||
revMap = HashMap.fromList [(v,k) | (k,v) <- HashMap.toList pt]
|
||||
parentMap =
|
||||
HashMap.fromList $
|
||||
catMaybes
|
||||
[ case HashMap.lookup kp revMap of
|
||||
Just (lp,_) -> Just (label, lp)
|
||||
Nothing -> Nothing
|
||||
| ((label,kp), _) <- HashMap.toList pt]
|
||||
fetchMap = HashMap.fromList [ (fid, x) | x@FetchStats{} <- stats
|
||||
, fid <- fetchIds x]
|
||||
get l = [ (prof, wasCached, fetchStat)
|
||||
| Just key <- [HashMap.lookup l keyMap]
|
||||
, Just prof <- [HashMap.lookup key p]
|
||||
, ProfileFetch fid _ wasCached <- profileFetches prof
|
||||
, Just fetchStat <- [HashMap.lookup fid fetchMap]]
|
||||
[(short, shortWC, shortFetch)] = get "short"
|
||||
[(long, longWC, longFetch)] = get "long"
|
||||
[(shortCached, shortCachedWC, shortCachedFetch)] = get "short_cached"
|
||||
|
||||
assertEqual "3 batches" 3 (HashMap.size fetchMap)
|
||||
assertEqual "6 labels (inc MAIN)" 6 (HashMap.size keyMap)
|
||||
|
||||
assertEqual "child parent is base"
|
||||
(Just "base")
|
||||
(HashMap.lookup "child" parentMap)
|
||||
|
||||
assertEqual "base parent is MAIN"
|
||||
(Just "MAIN")
|
||||
(HashMap.lookup "base" parentMap)
|
||||
|
||||
assertEqual "long parent is MAIN"
|
||||
(Just "MAIN")
|
||||
(HashMap.lookup "long" parentMap)
|
||||
|
||||
assertBool "original fetches not cached (short)" (not shortWC)
|
||||
assertBool "original fetches not cached (long)" (not longWC)
|
||||
assertBool "was cached short" shortCachedWC
|
||||
|
||||
assertEqual "one fetch short" 1 (length $ profileFetches short)
|
||||
assertEqual "one fetch long" 1 (length $ profileFetches long)
|
||||
assertEqual "one fetch short_cached" 1 (length $ profileFetches shortCached)
|
||||
|
||||
assertBool "short fetch mapped properly" (fetchDuration shortFetch < 100000)
|
||||
assertEqual
|
||||
"short cached fetch mapped properly"
|
||||
(fetchDuration shortFetch)
|
||||
(fetchDuration shortCachedFetch)
|
||||
assertBool "long fetch was mapped properly" (fetchDuration longFetch > 100000)
|
||||
|
||||
|
||||
tests = TestList [ TestLabel "Aggregate Batches" aggregateBatches
|
||||
, TestLabel "Fetch IDs Sync" fetchIdsSync
|
||||
, TestLabel "Fetch IDs Background" fetchIdsBackground ]
|
||||
|
Loading…
Reference in New Issue
Block a user