diff --git a/Haxl/Core.hs b/Haxl/Core.hs index 8a37514..6cba0f3 100644 --- a/Haxl/Core.hs +++ b/Haxl/Core.hs @@ -44,6 +44,7 @@ module Haxl.Core ( -- ** Statistics , Stats(..) , FetchStats(..) + , CallId , Microseconds , Timestamp , emptyStats @@ -52,6 +53,8 @@ module Haxl.Core ( , ppFetchStats , aggregateFetchBatches , Profile(..) + , ProfileMemo(..) + , ProfileFetch(..) , emptyProfile , ProfileLabel , ProfileKey @@ -59,7 +62,6 @@ module Haxl.Core ( , emptyProfileData , AllocCount , LabelHitCount - , MemoHitCount -- ** Tracing flags , Flags(..) diff --git a/Haxl/Core/DataCache.hs b/Haxl/Core/DataCache.hs index 4d02ebe..d9cb4f7 100644 --- a/Haxl/Core/DataCache.hs +++ b/Haxl/Core/DataCache.hs @@ -22,6 +22,7 @@ module Haxl.Core.DataCache , insertWithShow , lookup , showCache + , readCache ) where import Prelude hiding (lookup) @@ -166,3 +167,23 @@ showCache (DataCache cache) readRes = H.foldM goSubCache [] cache Just (Left e) -> (showReq request, Left e) : res Just (Right result) -> (showReq request, Right (showRes result)) : res + +-- | Dumps the contents of the cache responses to list +readCache + :: forall res ret + . DataCache res + -> (forall a . res a -> IO ret) + -> IO [(TypeRep, [Either SomeException ret])] +readCache (DataCache cache) readRes = H.foldM goSubCache [] cache + where + goSubCache + :: [(TypeRep, [Either SomeException ret])] + -> (TypeRep, SubCache res) + -> IO [(TypeRep, [Either SomeException ret])] + goSubCache res (ty, SubCache _showReq _showRes hm) = do + subCacheResult <- H.foldM go [] hm + return $ (ty, subCacheResult):res + where + go res (_request, rvar) = do + r <- try $ readRes rvar + return $ r : res diff --git a/Haxl/Core/Fetch.hs b/Haxl/Core/Fetch.hs index 71aac66..541b20b 100644 --- a/Haxl/Core/Fetch.hs +++ b/Haxl/Core/Fetch.hs @@ -74,15 +74,18 @@ data CacheResult u w a = Uncached (ResultVar a) {-# UNPACK #-} !(IVar u w a) + {-# UNPACK #-} !CallId -- | The request has been seen before, but its result has not yet been -- fetched. 
| CachedNotFetched {-# UNPACK #-} !(IVar u w a) + {-# UNPACK #-} !CallId -- | The request has been seen before, and its result has already been -- fetched. | Cached (ResultVal a w) + {-# UNPACK #-} !CallId -- | Show functions for request and its result. @@ -101,31 +104,32 @@ cachedWithInsert :: forall r a u w. (DataSource u r, Typeable (r a)) => (r a -> String) -- See Note [showFn] - -> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ()) + -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()) -> Env u w -> r a -> IO (CacheResult u w a) -cachedWithInsert showFn insertFn Env{..} req = do +cachedWithInsert showFn insertFn env@Env{..} req = do let doFetch = do ivar <- newIVar + k <- nextCallId env let !rvar = stdResultVar ivar completions submittedReqsRef flags (Proxy :: Proxy r) - insertFn req ivar dataCache - return (Uncached rvar ivar) + insertFn req (DataCacheItem ivar k) dataCache + return (Uncached rvar ivar k) mbRes <- DataCache.lookup req dataCache case mbRes of Nothing -> doFetch - Just i@IVar{ivarRef = cr} -> do + Just (DataCacheItem i@IVar{ivarRef = cr} k) -> do e <- readIORef cr case e of IVarEmpty _ -> do ivar <- withCurrentCCS i - return (CachedNotFetched ivar) + return (CachedNotFetched ivar k) IVarFull r -> do ifTrace flags 3 $ putStrLn $ case r of ThrowIO{} -> "Cached error: " ++ showFn req ThrowHaxl{} -> "Cached error: " ++ showFn req Ok{} -> "Cached request: " ++ showFn req - return (Cached r) + return (Cached r k) -- | Make a ResultVar with the standard function for sending a CompletionReq @@ -160,15 +164,15 @@ stdResultVar ivar completions ref flags p = -- | Record the call stack for a data fetch in the Stats. Only useful -- when profiling. 
-logFetch :: Env u w -> (r a -> String) -> r a -> IO () +logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO () #ifdef PROFILING -logFetch env showFn req = do +logFetch env showFn req fid = do ifReport (flags env) 5 $ do stack <- currentCallStack modifyIORef' (statsRef env) $ \(Stats s) -> - Stats (FetchCall (showFn req) stack : s) + Stats (FetchCall (showFn req) stack fid : s) #else -logFetch _ _ _ = return () +logFetch _ _ _ _ = return () #endif -- | Performs actual fetching of data for a 'Request' from a 'DataSource'. @@ -190,38 +194,44 @@ dataFetchWithInsert :: forall u w r a . (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a)) => (r a -> String) -- See Note [showFn] - -> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ()) + -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()) -> r a -> GenHaxl u w a dataFetchWithInsert showFn insertFn req = GenHaxl $ \env@Env{..} -> do -- First, check the cache res <- cachedWithInsert showFn insertFn env req - ifProfiling flags $ addProfileFetch env req case res of -- This request has not been seen before - Uncached rvar ivar -> do - logFetch env showFn req + Uncached rvar ivar fid -> do + logFetch env showFn req fid + ifProfiling flags $ addProfileFetch env req fid False -- -- Check whether the data source wants to submit requests -- eagerly, or batch them up. -- + let blockedFetch = BlockedFetch req rvar + let blockedFetchI = BlockedFetchInternal fid case schedulerHint userEnv :: SchedulerHint r of SubmitImmediately -> - performFetches env [BlockedFetches [BlockedFetch req rvar]] + performFetches env [BlockedFetches [blockedFetch] [blockedFetchI]] TryToBatch -> -- add the request to the RequestStore and continue modifyIORef' reqStoreRef $ \bs -> - addRequest (BlockedFetch req rvar) bs + addRequest blockedFetch blockedFetchI bs -- return $ Blocked ivar (Return ivar) -- Seen before but not fetched yet. We're blocked, but we don't have -- to add the request to the RequestStore. 
- CachedNotFetched ivar -> return $ Blocked ivar (Return ivar) + CachedNotFetched ivar fid -> do + ifProfiling flags $ addProfileFetch env req fid True + return $ Blocked ivar (Return ivar) -- Cached: either a result, or an exception - Cached r -> done r + Cached r fid -> do + ifProfiling flags $ addProfileFetch env req fid True + done r -- | A data request that is not cached. This is not what you want for -- normal read requests, because then multiple identical requests may @@ -246,11 +256,12 @@ uncachedRequest req = do subRef <- env submittedReqsRef if recording flg /= 0 then dataFetch req - else GenHaxl $ \Env{..} -> do + else GenHaxl $ \e@Env{..} -> do ivar <- newIVar + k <- nextCallId e let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r) modifyIORef' reqStoreRef $ \bs -> - addRequest (BlockedFetch req rvar) bs + addRequest (BlockedFetch req rvar) (BlockedFetchInternal k) bs return $ Blocked ivar (Return ivar) @@ -275,9 +286,11 @@ cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq cacheResultWithInsert :: Typeable (r a) => (r a -> String) -- See Note [showFn] - -> (r a -> IVar u w a -> DataCache (IVar u w) -> IO ()) -> r a - -> IO a -> GenHaxl u w a -cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do + -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()) + -> r a + -> IO a + -> GenHaxl u w a +cacheResultWithInsert showFn insertFn req val = GenHaxl $ \e@Env{..} -> do mbRes <- DataCache.lookup req dataCache case mbRes of Nothing -> do @@ -287,9 +300,10 @@ cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do _ -> return () let result = eitherToResultThrowIO eitherResult ivar <- newFullIVar result - insertFn req ivar dataCache + k <- nextCallId e + insertFn req (DataCacheItem ivar k) dataCache done result - Just IVar{ivarRef = cr} -> do + Just (DataCacheItem IVar{ivarRef = cr} _) -> do e <- readIORef cr case e of IVarEmpty _ -> corruptCache @@ -313,12 +327,13 @@ 
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \Env{..} -> do -- cacheRequest :: Request req a => req a -> Either SomeException a -> GenHaxl u w () -cacheRequest request result = GenHaxl $ \Env{..} -> do +cacheRequest request result = GenHaxl $ \e@Env{..} -> do mbRes <- DataCache.lookup request dataCache case mbRes of Nothing -> do cr <- newFullIVar (eitherToResult result) - DataCache.insert request cr dataCache + k <- nextCallId e + DataCache.insert request (DataCacheItem cr k) dataCache return (Done ()) -- It is an error if the request is already in the cache. @@ -334,9 +349,10 @@ cacheRequest request result = GenHaxl $ \Env{..} -> do -- Useful e.g. for unit tests dupableCacheRequest :: Request req a => req a -> Either SomeException a -> GenHaxl u w () -dupableCacheRequest request result = GenHaxl $ \Env{..} -> do +dupableCacheRequest request result = GenHaxl $ \e@Env{..} -> do cr <- newFullIVar (eitherToResult result) - DataCache.insert request cr dataCache + k <- nextCallId e + DataCache.insert request (DataCacheItem cr k) dataCache return (Done ()) performRequestStore @@ -353,11 +369,11 @@ performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do t0 <- getTimestamp ifTrace f 3 $ - forM_ jobs $ \(BlockedFetches reqs) -> + forM_ jobs $ \(BlockedFetches reqs _) -> forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r) let - applyFetch i (BlockedFetches (reqs :: [BlockedFetch r])) = + applyFetch i bfs@(BlockedFetches (reqs :: [BlockedFetch r]) _) = case stateGet (states env) of Nothing -> return (FetchToDo reqs (SyncFetch (mapM_ (setError e)))) @@ -367,10 +383,9 @@ performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do <> ": " <> Text.pack (showp req) Just state -> - return - $ FetchToDo reqs + return $ FetchToDo reqs $ (if report f >= 2 - then wrapFetchInStats sref sbref dsName (length reqs) + then wrapFetchInStats sref sbref dsName (length reqs) bfs else id) $ wrapFetchInTrace i (length reqs) dsName $ 
wrapFetchInCatch reqs @@ -430,14 +445,21 @@ wrapFetchInCatch reqs fetch = wrapFetchInStats - :: IORef Stats + :: forall u req . + IORef Stats -> IORef Int -> Text -> Int + -> BlockedFetches u -> PerformFetch req -> PerformFetch req - -wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do +wrapFetchInStats + !statsRef + !batchIdRef + dataSource + batchSize + (BlockedFetches _reqs reqsI) + perform = do case perform of SyncFetch f -> SyncFetch $ \reqs -> do @@ -445,7 +467,7 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do fail_ref <- newIORef 0 (t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs)) failures <- readIORef fail_ref - updateFetchStats bid t0 t alloc batchSize failures + updateFetchStats bid allFids t0 t alloc batchSize failures AsyncFetch f -> do AsyncFetch $ \reqs inner -> do bid <- newBatchId @@ -458,14 +480,15 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do (t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner') (innerTime, innerAlloc) <- readIORef inner_r failures <- readIORef fail_ref - updateFetchStats bid t0 (totalTime - innerTime) + updateFetchStats bid allFids t0 (totalTime - innerTime) (totalAlloc - innerAlloc) batchSize failures BackgroundFetch io -> do BackgroundFetch $ \reqs -> do bid <- newBatchId startTime <- getTimestamp - io (map (addTimer bid startTime) reqs) + io (zipWith (addTimer bid startTime) reqs reqsI) where + allFids = map (\(BlockedFetchInternal k) -> k) reqsI newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1) statsForIO io = do prevAlloc <- getAllocationCounter @@ -473,32 +496,44 @@ wrapFetchInStats !statsRef !batchIdRef dataSource batchSize perform = do postAlloc <- getAllocationCounter return (t0,t, fromIntegral $ prevAlloc - postAlloc, a) - addTimer bid t0 (BlockedFetch req (ResultVar fn)) = - BlockedFetch req $ ResultVar $ \result isChildThread -> do - t1 <- getTimestamp - -- We cannot measure allocation easily for 
BackgroundFetch. Here we - -- just attribute all allocation to the last `putResultFromChildThread` - -- and use 0 for the others. While the individual allocations may not - -- be correct, the total sum and amortized allocation are still - -- meaningful. - -- see Note [tracking allocation in child threads] - allocs <- if isChildThread then getAllocationCounter else return 0 - updateFetchStats bid t0 (t1 - t0) - (negate allocs) - 1 -- batch size: we don't know if this is a batch or not - (if isLeft result then 1 else 0) -- failures - fn result isChildThread + addTimer + bid + t0 + (BlockedFetch req (ResultVar fn)) + (BlockedFetchInternal fid) = + BlockedFetch req $ ResultVar $ \result isChildThread -> do + t1 <- getTimestamp + -- We cannot measure allocation easily for BackgroundFetch. Here we + -- just attribute all allocation to the last + -- `putResultFromChildThread` and use 0 for the others. + -- While the individual allocations may not be correct, + -- the total sum and amortized allocation are still meaningful. 
+ -- see Note [tracking allocation in child threads] + allocs <- if isChildThread then getAllocationCounter else return 0 + updateFetchStats bid [fid] t0 (t1 - t0) + (negate allocs) + 1 -- batch size: we don't know if this is a batch or not + (if isLeft result then 1 else 0) -- failures + fn result isChildThread updateFetchStats - :: Int -> Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO () - updateFetchStats bid start time space batch failures = do + :: Int + -> [CallId] + -> Timestamp + -> Microseconds + -> Int64 + -> Int + -> Int + -> IO () + updateFetchStats bid fids start time space batch failures = do let this = FetchStats { fetchDataSource = dataSource , fetchBatchSize = batch , fetchStart = start , fetchDuration = time , fetchSpace = space , fetchFailures = failures - , fetchBatchId = bid } + , fetchBatchId = bid + , fetchIds = fids } atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ()) addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r diff --git a/Haxl/Core/Memo.hs b/Haxl/Core/Memo.hs index 7b07490..db2dd17 100644 --- a/Haxl/Core/Memo.hs +++ b/Haxl/Core/Memo.hs @@ -45,6 +45,7 @@ import qualified Data.HashMap.Strict as HashMap import Data.Text (Text) import Data.Typeable import Data.Hashable +import Data.Int import Data.Word import GHC.Prim (Addr#) @@ -53,6 +54,7 @@ import Haxl.Core.Exception import Haxl.Core.DataCache as DataCache import Haxl.Core.Flags import Haxl.Core.Monad +import Haxl.Core.Stats import Haxl.Core.Profile import Haxl.Core.Util (trace_) @@ -73,17 +75,18 @@ cachedComputation => req a -> GenHaxl u w a -> GenHaxl u w a cachedComputation req haxl = GenHaxl $ \env@Env{..} -> do - ifProfiling flags $ - modifyIORef' - profRef - (incrementMemoHitCounterFor (profCurrentKey profCurrent)) mbRes <- DataCache.lookup req memoCache case mbRes of - Just ivar -> unHaxl (getIVarWithWrites ivar) env + Just (DataCacheItem ivar k) -> do + ifProfiling flags $ do + incrementMemoHitCounterFor env k True + unHaxl 
(getIVarWithWrites ivar) env Nothing -> do ivar <- newIVar - DataCache.insertNotShowable req ivar memoCache - unHaxl (execMemoNow haxl ivar) env + k <- nextCallId env + -- no need to incrementMemoHitCounter as execMemo will do it + DataCache.insertNotShowable req (DataCacheItem ivar k) memoCache + execMemoNowProfiled env haxl ivar k -- | Like 'cachedComputation', but fails if the cache is already @@ -101,18 +104,15 @@ preCacheComputation , Typeable (req a)) => req a -> GenHaxl u w a -> GenHaxl u w a preCacheComputation req haxl = GenHaxl $ \env@Env{..} -> do - ifProfiling flags $ - modifyIORef' - profRef - (incrementMemoHitCounterFor (profCurrentKey profCurrent)) mbRes <- DataCache.lookup req memoCache case mbRes of Just _ -> return $ Throw $ toException $ InvalidParameter "preCacheComputation: key is already cached" Nothing -> do ivar <- newIVar - DataCache.insertNotShowable req ivar memoCache - unHaxl (execMemoNow haxl ivar) env + k <- nextCallId env + DataCache.insertNotShowable req (DataCacheItem ivar k) memoCache + execMemoNowProfiled env haxl ivar k -- ----------------------------------------------------------------------------- -- Memoization @@ -204,13 +204,43 @@ runMemo (MemoVar memoRef) = GenHaxl $ \env -> do MemoReady cont -> trace_ "MemoReady" $ do ivar <- newIVar writeIORef memoRef (MemoRun ivar) - unHaxl (execMemoNow cont ivar) env + execMemoNow env cont ivar -- The memo has already been run, get (or wait for) for the result MemoRun ivar -> trace_ "MemoRun" $ unHaxl (getIVarWithWrites ivar) env +execMemoNowProfiled + :: Env u w + -> GenHaxl u w a + -> IVar u w a + -> CallId + -> IO (Result u w a) +execMemoNowProfiled envOuter cont ivar cid = if report (flags envOuter) < 4 + then execMemoNow envOuter cont ivar + else do + incrementMemoHitCounterFor envOuter cid False + unHaxl + (collectMemoData 0 $ GenHaxl $ \e -> execMemoNow e cont ivar) + envOuter + where + addStats :: Env u w -> Int64 -> IO () + addStats env acc = modifyIORef' (statsRef env) $ 
\(Stats s) -> + Stats (MemoCall cid acc : s) + collectMemoData :: Int64 -> GenHaxl u w a -> GenHaxl u w a + collectMemoData acc f = GenHaxl $ \env -> do + a0 <- getAllocationCounter + r <- unHaxl f env{memoKey=cid} + a1 <- getAllocationCounter + let newTotal = acc + (a0 - a1) + ret <- case r of + Done a -> do addStats env newTotal; return (Done a) + Throw e -> do addStats env newTotal; return (Throw e) + Blocked ivar k -> + return (Blocked ivar (Cont (collectMemoData newTotal (toHaxl k)))) + setAllocationCounter a1 + return ret -execMemoNow :: GenHaxl u w a -> IVar u w a -> GenHaxl u w a -execMemoNow cont ivar = GenHaxl $ \env -> do +execMemoNow :: Env u w -> GenHaxl u w a -> IVar u w a -> IO (Result u w a) +execMemoNow env cont ivar = do wlogs <- newIORef NilWrites let !ienv = imperative env { writeLogsRef = wlogs } diff --git a/Haxl/Core/Monad.hs b/Haxl/Core/Monad.hs index 420f1f8..92242c8 100644 --- a/Haxl/Core/Monad.hs +++ b/Haxl/Core/Monad.hs @@ -74,12 +74,14 @@ module Haxl.Core.Monad -- * Env , Env(..) + , DataCacheItem(..) , Caches , caches , initEnvWithData , initEnv , emptyEnv , env, withEnv + , nextCallId , speculate , imperative @@ -130,6 +132,7 @@ import Control.Monad import qualified Control.Exception as Exception import Data.IORef import Data.Int +import Data.Either (rights) import GHC.Exts (IsString(..)) import Text.PrettyPrint hiding ((<>)) import Text.Printf @@ -151,13 +154,19 @@ import Haxl.Core.CallGraph -- The environment -- | The data we carry around in the Haxl monad. 
+ +data DataCacheItem u w a = DataCacheItem (IVar u w a) {-# UNPACK #-} !CallId + data Env u w = Env - { dataCache :: {-# UNPACK #-} !(DataCache (IVar u w)) + { dataCache :: {-# UNPACK #-} !(DataCache (DataCacheItem u w)) -- ^ cached data fetches - , memoCache :: {-# UNPACK #-} !(DataCache (IVar u w)) + , memoCache :: {-# UNPACK #-} !(DataCache (DataCacheItem u w)) -- ^ memoized computations + , memoKey :: {-# UNPACK #-} !CallId + -- ^ current running memo key + , flags :: !Flags -- conservatively not unpacking, because this is passed -- to 'fetch' and would need to be rebuilt. @@ -172,8 +181,11 @@ data Env u w = Env -- ^ keeps track of a Unique ID for each batch dispatched with stats -- enabled, for aggregating after. + , callIdRef :: {-# UNPACK #-} !(IORef CallId) + -- ^ keeps track of a Unique ID for each fetch/memo. + , profCurrent :: ProfileCurrent - -- ^ current profiling label, see 'withLabel' + -- ^ current profiling label, see 'withLabel' , profRef :: {-# UNPACK #-} !(IORef Profile) -- ^ profiling data, collected according to the 'report' level in 'flags'. @@ -230,15 +242,28 @@ data ProfileCurrent = ProfileCurrent , profCurrentLabel :: {-# UNPACK #-} !ProfileLabel } -type Caches u w = (DataCache (IVar u w), DataCache (IVar u w)) +type Caches u w = (DataCache (DataCacheItem u w), DataCache (DataCacheItem u w)) caches :: Env u w -> Caches u w caches env = (dataCache env, memoCache env) +getMaxCallId :: DataCache (DataCacheItem u w) -> IO (Maybe Int) +getMaxCallId c = do + callIds <- rights . concatMap snd <$> + DataCache.readCache c (\(DataCacheItem _ i) -> return i) + case callIds of + [] -> return Nothing + vals -> return $ Just (maximum vals) + + -- | Initialize an environment with a 'StateStore', an input map, a -- preexisting 'DataCache', and a seed for the random number generator. 
initEnvWithData :: StateStore -> u -> Caches u w -> IO (Env u w) initEnvWithData states e (dcache, mcache) = do + newCid <- max <$> + (maybe 0 ((+) 1) <$> getMaxCallId dcache) <*> + (maybe 0 ((+) 1) <$> getMaxCallId mcache) + ciref <- newIORef newCid sref <- newIORef emptyStats sbref <- newIORef 0 pref <- newIORef emptyProfile @@ -251,12 +276,14 @@ initEnvWithData states e (dcache, mcache) = do return Env { dataCache = dcache , memoCache = mcache + , memoKey = (-1) , flags = defaultFlags , userEnv = e , states = states , statsRef = sref , statsBatchIdRef = sbref , profCurrent = ProfileCurrent 0 "MAIN" + , callIdRef = ciref , profRef = pref , reqStoreRef = rs , runQueueRef = rq @@ -814,6 +841,9 @@ withEnv newEnv (GenHaxl m) = GenHaxl $ \_env -> do Blocked ivar k -> return (Blocked ivar (Cont (withEnv newEnv (toHaxl k)))) +nextCallId :: Env u w -> IO CallId +nextCallId env = atomicModifyIORef' (callIdRef env) $ \x -> (x+1,x+1) + #ifdef PROFILING -- ----------------------------------------------------------------------------- -- CallGraph recording @@ -972,7 +1002,7 @@ dumpCacheAsHaskellFn fnName fnType cacheFn = do cache <- env dataCache -- NB. dataCache, not memoCache. We ignore memoized -- results when dumping the cache. 
let - readIVar IVar{ivarRef = !ref} = do + readIVar (DataCacheItem IVar{ivarRef = !ref} _) = do r <- readIORef ref case r of IVarFull (Ok a _) -> return (Just (Right a)) diff --git a/Haxl/Core/Profile.hs b/Haxl/Core/Profile.hs index f7c1467..dfce648 100644 --- a/Haxl/Core/Profile.hs +++ b/Haxl/Core/Profile.hs @@ -27,7 +27,6 @@ import Data.Hashable #if __GLASGOW_HASKELL__ < 804 import Data.Monoid #endif -import Data.Text (Text) import Data.Typeable import qualified Data.HashMap.Strict as HashMap import GHC.Exts @@ -179,45 +178,36 @@ profileCont m env = do return r {-# INLINE profileCont #-} - -incrementMemoHitCounterFor :: ProfileKey -> Profile -> Profile -incrementMemoHitCounterFor key p = - p { profile = - HashMap.insertWith - incrementMemoHitCounter - key - (emptyProfileData { profileMemoHits = 1 }) - (profile p) +incrementMemoHitCounterFor :: Env u w -> CallId -> Bool -> IO () +incrementMemoHitCounterFor env callId wasCached = do + modifyIORef' (profRef env) $ \p -> p { + profile = HashMap.insertWith + upd + (profCurrentKey $ profCurrent env) + (emptyProfileData { profileMemos = [val] }) + (profile p) } - -incrementMemoHitCounter :: ProfileData -> ProfileData -> ProfileData -incrementMemoHitCounter _ pd = pd { profileMemoHits = - succ (profileMemoHits pd) - } + where + val = ProfileMemo callId wasCached + upd _ old = old { profileMemos = val : profileMemos old } {-# NOINLINE addProfileFetch #-} addProfileFetch :: forall r u w a . 
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) - => Env u w -> r a -> IO () -addProfileFetch env _req = do + => Env u w -> r a -> CallId -> Bool -> IO () +addProfileFetch env _req cid wasCached = do c <- getAllocationCounter let (ProfileCurrent profKey _) = profCurrent env modifyIORef' (profRef env) $ \ p -> let - dsName :: Text - dsName = dataSourceName (Proxy :: Proxy r) - - upd _ old = old { profileFetches = - HashMap.insertWith (+) dsName 1 (profileFetches old) } + val = ProfileFetch cid (memoKey env) wasCached + upd _ old = old { profileFetches = val : profileFetches old } in p { profile = HashMap.insertWith upd profKey - (emptyProfileData { profileFetches = - HashMap.singleton dsName 1 - } - ) + (emptyProfileData { profileFetches = [val] }) (profile p) } -- So we do not count the allocation overhead of addProfileFetch diff --git a/Haxl/Core/RequestStore.hs b/Haxl/Core/RequestStore.hs index 5650a66..059b8cb 100644 --- a/Haxl/Core/RequestStore.hs +++ b/Haxl/Core/RequestStore.hs @@ -21,6 +21,7 @@ -- module Haxl.Core.RequestStore ( BlockedFetches(..) + , BlockedFetchInternal(..) , RequestStore , isEmpty , noRequests @@ -35,6 +36,7 @@ module Haxl.Core.RequestStore ) where import Haxl.Core.DataSource +import Haxl.Core.Stats import Data.Map (Map) import qualified Data.Map.Strict as Map import Data.Proxy @@ -48,9 +50,12 @@ newtype RequestStore u = RequestStore (Map TypeRep (BlockedFetches u)) -- is dynamically-typed. It maps the TypeRep of the request to the -- 'BlockedFetches' for that 'DataSource'. +newtype BlockedFetchInternal = BlockedFetchInternal CallId + -- | A batch of 'BlockedFetch' objects for a single 'DataSource' data BlockedFetches u = - forall r. (DataSource u r) => BlockedFetches [BlockedFetch r] + forall r. 
(DataSource u r) => + BlockedFetches [BlockedFetch r] [BlockedFetchInternal] isEmpty :: RequestStore u -> Bool isEmpty (RequestStore m) = Map.null m @@ -62,13 +67,13 @@ noRequests = RequestStore Map.empty -- | Adds a 'BlockedFetch' to a 'RequestStore'. addRequest :: forall u r. (DataSource u r) - => BlockedFetch r -> RequestStore u -> RequestStore u -addRequest bf (RequestStore m) = - RequestStore $ Map.insertWith combine ty (BlockedFetches [bf]) m + => BlockedFetch r -> BlockedFetchInternal -> RequestStore u -> RequestStore u +addRequest bf bfi (RequestStore m) = + RequestStore $ Map.insertWith combine ty (BlockedFetches [bf] [bfi]) m where combine :: BlockedFetches u -> BlockedFetches u -> BlockedFetches u - combine _ (BlockedFetches bfs) - | typeOf1 (getR bfs) == ty = BlockedFetches (unsafeCoerce bf:bfs) + combine _ (BlockedFetches bfs bfis) + | typeOf1 (getR bfs) == ty = BlockedFetches (unsafeCoerce bf:bfs) (bfi:bfis) | otherwise = error "RequestStore.insert" -- the dynamic type check here should be unnecessary, but if -- there are bugs in `Typeable` or `Map` then we'll get an diff --git a/Haxl/Core/Stats.hs b/Haxl/Core/Stats.hs index a28bc22..5c39f92 100644 --- a/Haxl/Core/Stats.hs +++ b/Haxl/Core/Stats.hs @@ -18,6 +18,7 @@ module Haxl.Core.Stats ( -- * Data-source stats Stats(..) + , CallId , FetchStats(..) , Microseconds , Timestamp @@ -30,6 +31,8 @@ module Haxl.Core.Stats -- * Profiling , Profile(..) + , ProfileMemo(..) + , ProfileFetch(..) , emptyProfile , ProfileKey , ProfileLabel @@ -37,7 +40,6 @@ module Haxl.Core.Stats , emptyProfileData , AllocCount , LabelHitCount - , MemoHitCount -- * Allocation , getAllocationCounter @@ -106,6 +108,7 @@ ppStats (Stats rss) = fetchWasRunning fs t1 t2 = (fetchStart fs + fetchDuration fs) >= t1 && fetchStart fs < t2 +type CallId = Int -- | Maps data source name to the number of requests made in that round. 
-- The map only contains entries for sources that made requests in that @@ -115,11 +118,12 @@ data FetchStats = FetchStats { fetchDataSource :: Text , fetchBatchSize :: {-# UNPACK #-} !Int - , fetchStart :: !Timestamp -- TODO should be something else + , fetchStart :: {-# UNPACK #-} !Timestamp , fetchDuration :: {-# UNPACK #-} !Microseconds , fetchSpace :: {-# UNPACK #-} !Int64 , fetchFailures :: {-# UNPACK #-} !Int , fetchBatchId :: {-# UNPACK #-} !Int + , fetchIds :: [CallId] } -- | The stack trace of a call to 'dataFetch'. These are collected @@ -127,6 +131,11 @@ data FetchStats | FetchCall { fetchReq :: String , fetchStack :: [String] + , fetchStatId :: {-# UNPACK #-} !CallId + } + | MemoCall + { memoStatId :: {-# UNPACK #-} !CallId + , memoSpace :: {-# UNPACK #-} !Int64 } deriving (Eq, Show) @@ -136,7 +145,8 @@ ppFetchStats FetchStats{..} = printf "%s: %d fetches (%.2fms, %d bytes, %d failures)" (Text.unpack fetchDataSource) fetchBatchSize (fromIntegral fetchDuration / 1000 :: Double) fetchSpace fetchFailures -ppFetchStats (FetchCall r ss) = show r ++ '\n':show ss +ppFetchStats (FetchCall r ss _) = show r ++ '\n':show ss +ppFetchStats (MemoCall _r _ss) = "" -- | Aggregate stats merging FetchStats from the same dispatched batch into one. 
aggregateFetchBatches :: ([FetchStats] -> a) -> Stats -> [a] @@ -154,11 +164,17 @@ instance ToJSON FetchStats where , "duration" .= fetchDuration , "allocation" .= fetchSpace , "failures" .= fetchFailures - , "bachid" .= fetchBatchId + , "batchid" .= fetchBatchId + , "fetchids" .= fetchIds ] - toJSON (FetchCall req strs) = object + toJSON (FetchCall req strs fid) = object [ "request" .= req , "stack" .= strs + , "fetchid" .= fid + ] + toJSON (MemoCall cid allocs) = object + [ "callid" .= cid + , "allocation" .= allocs ] emptyStats :: Stats @@ -174,9 +190,21 @@ numFetches (Stats rs) = sum [ fetchBatchSize | FetchStats{..} <- rs ] type ProfileLabel = Text type AllocCount = Int64 type LabelHitCount = Int64 -type MemoHitCount = Int64 type ProfileKey = Int64 +data ProfileFetch = ProfileFetch + { profileFetchFetchId :: {-# UNPACK #-} !CallId + , profileFetchMemoId :: {-# UNPACK #-} !CallId + , profileFetchWasCached :: !Bool + } + deriving (Show, Eq) + +data ProfileMemo = ProfileMemo + { profileMemoId :: {-# UNPACK #-} !CallId + , profileMemoWasCached :: !Bool + } + deriving (Show, Eq) + data Profile = Profile { profile :: HashMap ProfileKey ProfileData -- ^ Data per key (essentially per call stack) @@ -193,14 +221,14 @@ emptyProfile = Profile HashMap.empty (HashMap.singleton ("MAIN", 0) 0) 1 data ProfileData = ProfileData { profileAllocs :: {-# UNPACK #-} !AllocCount -- ^ allocations made by this label - , profileFetches :: HashMap Text Int - -- ^ map from datasource name => fetch count + , profileFetches :: [ProfileFetch] + -- ^ fetches made in this label , profileLabelHits :: {-# UNPACK #-} !LabelHitCount -- ^ number of hits at this label - , profileMemoHits :: {-# UNPACK #-} !MemoHitCount - -- ^ number of hits to memoized computation at this label + , profileMemos :: [ProfileMemo] + -- ^ memo and a boolean representing if it was cached at the time } deriving Show emptyProfileData :: ProfileData -emptyProfileData = ProfileData 0 HashMap.empty 0 0 +emptyProfileData = 
ProfileData 0 [] 0 [] diff --git a/changelog.md b/changelog.md index 60f39c6..5ca04cf 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,6 @@ # Changes in version * Added fetchBatchId to FetchStats - * Profiling now tracks full stacks + * Profiling now tracks full stacks and links each label to memos/fetches # Changes in version 2.3.0.0 * Removed `FutureFetch` diff --git a/tests/BatchTests.hs b/tests/BatchTests.hs index 2efce21..4d1fe42 100644 --- a/tests/BatchTests.hs +++ b/tests/BatchTests.hs @@ -141,6 +141,8 @@ cacheReuse future = do tao <- MockTAO.initGlobalState future let st = stateSet tao stateEmpty env2 <- initEnvWithData st testinput (caches env) + cid <- readIORef (callIdRef env2) + assertBool "callId is unique" (cid > 0) -- ensure no more data fetching rounds needed expectResultWithEnv 12 batching7_ env2 diff --git a/tests/ProfileTests.hs b/tests/ProfileTests.hs index b3388aa..b884b09 100644 --- a/tests/ProfileTests.hs +++ b/tests/ProfileTests.hs @@ -23,6 +23,7 @@ import Control.Exception (evaluate) import Data.Aeson import Data.IORef import qualified Data.HashMap.Strict as HashMap +import Data.Int import TestUtils import WorkDataSource @@ -131,9 +132,43 @@ threadAlloc batches = do -- if we actually do more than 1 batch then the above test is not useful +-- Test that we correctly attribute memo work +memos:: Assertion +memos = do + env <- mkProfilingEnv + let + memoAllocs = 10000000 :: Int64 + doWorkMemo = memo (1 :: Int) $ unsafeLiftIO $ do + a0 <- getAllocationCounter + setAllocationCounter $ a0 - memoAllocs + return (5 :: Int) + _ <- runHaxl env $ andThen + (withLabel "do" doWorkMemo) + (withLabel "cached" doWorkMemo) + profData <- labelToDataMap <$> readIORef (profRef env) + case HashMap.lookup "do" profData of + Nothing -> assertFailure "do not in data" + Just ProfileData{..} -> do + assertEqual "has correct memo id" profileMemos [ProfileMemo 1 False] + assertBool "allocs are included in 'do'" (profileAllocs > memoAllocs) + case HashMap.lookup 
"cached" profData of + Nothing -> assertFailure "cached not in data" + Just ProfileData{..} -> do + assertEqual "has correct memo id" profileMemos [ProfileMemo 1 True] + assertBool "allocs are *not* included in 'cached'" (profileAllocs < 50000) + (Stats memoStats) <- readIORef (statsRef env) + assertEqual "exactly 1 memo/fetch" 1 (length memoStats) + let memoStat = head memoStats + putStrLn $ "memoStat=" ++ show memoStat + assertEqual "correct call id" 1 (memoStatId memoStat) + assertBool "allocs are big enough" $ memoSpace memoStat >= memoAllocs + assertBool "allocs are not too big" $ memoSpace memoStat < memoAllocs + 100000 + + tests = TestList [ TestLabel "collectsdata" $ TestCase collectsdata , TestLabel "exceptions" $ TestCase exceptions , TestLabel "threads" $ TestCase (threadAlloc 1) , TestLabel "threads with batch" $ TestCase (threadAlloc 50) + , TestLabel "memos" $ TestCase memos ] diff --git a/tests/StatsTests.hs b/tests/StatsTests.hs index 59a38e5..40dc42a 100644 --- a/tests/StatsTests.hs +++ b/tests/StatsTests.hs @@ -11,8 +11,20 @@ module StatsTests (tests) where import Test.HUnit import Data.List +import Data.Maybe +import Haxl.Prelude import Haxl.Core +import Prelude() + + +import ExampleDataSource +import SleepDataSource +import Haxl.DataSource.ConcurrentIO + +import Control.Monad (void) +import Data.IORef +import qualified Data.HashMap.Strict as HashMap aggregateBatches :: Test aggregateBatches = TestCase $ do @@ -23,16 +35,18 @@ aggregateBatches = TestCase $ do , fetchDuration = 10 , fetchSpace = 1 , fetchFailures = 2 - , fetchBatchId = n } + , fetchBatchId = n + , fetchIds = [1,2] } | n <- reverse [1..10] ++ [11..20] ] - ++ [ FetchCall "A" ["B"], FetchCall "C" ["D"] ] + ++ [ FetchCall "A" ["B"] 1, FetchCall "C" ["D"] 2 ] fetchBatch = [ FetchStats { fetchDataSource = "batch" , fetchBatchSize = 1 , fetchStart = 100 , fetchDuration = 1000 * n , fetchSpace = 3 , fetchFailures = if n <= 3 then 1 else 0 - , fetchBatchId = 123 } | n <- [1..50] ] + , 
fetchBatchId = 123 + , fetchIds = [fromIntegral n] } | n <- [1..50] ] agg (sz,bids) FetchStats{..} = (sz + fetchBatchSize, fetchBatchId:bids) agg _ _ = error "unexpected" agg' = foldl' agg (0,[]) @@ -50,4 +64,104 @@ aggregateBatches = TestCase $ do assertEqual "Grouping works as expected" expectedResultInterspersed aggInterspersedBatch -tests = TestList [TestLabel "Aggregate Batches" aggregateBatches] + +testEnv = do + -- To use a data source, we need to initialize its state: + exstate <- ExampleDataSource.initGlobalState + sleepState <- mkConcurrentIOState + + -- And create a StateStore object containing the states we need: + let st = stateSet exstate (stateSet sleepState stateEmpty) + + -- Create the Env: + env <- initEnv st () + return env{ flags = (flags env){ report = 5 } } + + +fetchIdsSync :: Test +fetchIdsSync = TestCase $ do + env <- testEnv + _ <- runHaxl env $ + sequence_ + [ void $ countAardvarks "abcabc" + (length <$> listWombats 3) + , void $ listWombats 100 + , void $ listWombats 99 ] + -- expect a single DS stat + (Stats stats) <- readIORef (statsRef env) + let + fetchStats = [x | x@FetchStats{} <- stats] + assertEqual "Only 1 batch" 1 (length fetchStats) + +fetchIdsBackground :: Test +fetchIdsBackground = TestCase $ do + env <- testEnv + _ <- runHaxl env $ + sequence_ + [ withLabel "short" $ sleep 1 + , withLabel "long" $ sleep 500 ] + + -- make sure that with memo'ing we still preserve the stack + _ <- runHaxl env $ withLabel "base" + (memo (1 :: Int) $ withLabel "child" $ sleep 102) + + _ <- runHaxl env $ withLabel "short_cached" $ sleep 1 + + -- expect a single DS stat + (Stats stats) <- readIORef (statsRef env) + (Profile p pt _) <- readIORef (profRef env) + let + keyMap = + HashMap.fromList [ (label, k) | ((label,_), k) <- HashMap.toList pt] + revMap = HashMap.fromList [(v,k) | (k,v) <- HashMap.toList pt] + parentMap = + HashMap.fromList $ + catMaybes + [ case HashMap.lookup kp revMap of + Just (lp,_) -> Just (label, lp) + Nothing -> Nothing + 
| ((label,kp), _) <- HashMap.toList pt] + fetchMap = HashMap.fromList [ (fid, x) | x@FetchStats{} <- stats + , fid <- fetchIds x] + get l = [ (prof, wasCached, fetchStat) + | Just key <- [HashMap.lookup l keyMap] + , Just prof <- [HashMap.lookup key p] + , ProfileFetch fid _ wasCached <- profileFetches prof + , Just fetchStat <- [HashMap.lookup fid fetchMap]] + [(short, shortWC, shortFetch)] = get "short" + [(long, longWC, longFetch)] = get "long" + [(shortCached, shortCachedWC, shortCachedFetch)] = get "short_cached" + + assertEqual "3 batches" 3 (HashMap.size fetchMap) + assertEqual "6 labels (inc MAIN)" 6 (HashMap.size keyMap) + + assertEqual "child parent is base" + (Just "base") + (HashMap.lookup "child" parentMap) + + assertEqual "base parent is MAIN" + (Just "MAIN") + (HashMap.lookup "base" parentMap) + + assertEqual "long parent is MAIN" + (Just "MAIN") + (HashMap.lookup "long" parentMap) + + assertBool "original fetches not cached (short)" (not shortWC) + assertBool "original fetches not cached (long)" (not longWC) + assertBool "was cached short" shortCachedWC + + assertEqual "one fetch short" 1 (length $ profileFetches short) + assertEqual "one fetch long" 1 (length $ profileFetches long) + assertEqual "one fetch short_cached" 1 (length $ profileFetches shortCached) + + assertBool "short fetch mapped properly" (fetchDuration shortFetch < 100000) + assertEqual + "short cached fetch mapped properly" + (fetchDuration shortFetch) + (fetchDuration shortCachedFetch) + assertBool "long fetch was mapped properly" (fetchDuration longFetch > 100000) + + +tests = TestList [ TestLabel "Aggregate Batches" aggregateBatches + , TestLabel "Fetch IDs Sync" fetchIdsSync + , TestLabel "Fetch IDs Background" fetchIdsBackground ]