mirror of
https://github.com/facebook/Haxl.git
synced 2024-10-04 06:07:32 +03:00
Correctly account for allocation done in child threads
Summary: This isn't pretty, but it's the least intrusive and most efficient way I could find to do it. The tricky part is that when doing multiple putResults in the same child thread, we have to ensure the *last* one (and only the last one) is putResultFromChildThread. Reviewed By: xich Differential Revision: D6519631 fbshipit-source-id: 1c3c40f311031ac4cc8ed82daefcb7740b91541e
This commit is contained in:
parent
550d1a5e1d
commit
6b75496a29
@ -79,6 +79,7 @@ module Haxl.Core (
|
||||
, putFailure
|
||||
, putResult
|
||||
, putSuccess
|
||||
, putResultFromChildThread
|
||||
|
||||
-- ** Default fetch implementations
|
||||
, asyncFetch, asyncFetchWithDispatch, asyncFetchAcquireRelease
|
||||
|
@ -34,6 +34,7 @@ module Haxl.Core.DataSource
|
||||
, mkResultVar
|
||||
, putFailure
|
||||
, putResult
|
||||
, putResultFromChildThread
|
||||
, putSuccess
|
||||
|
||||
-- * Default fetch implementations
|
||||
@ -182,9 +183,12 @@ data BlockedFetch r = forall a. BlockedFetch (r a) (ResultVar a)
|
||||
-- ResultVar
|
||||
|
||||
-- | A sink for the result of a data fetch in 'BlockedFetch'
|
||||
newtype ResultVar a = ResultVar (Either SomeException a -> IO ())
|
||||
newtype ResultVar a = ResultVar (Either SomeException a -> Bool -> IO ())
|
||||
-- The Bool here is True if result was returned by a child thread,
|
||||
-- rather than the main runHaxl thread. see Note [tracking allocation in
|
||||
-- child threads]
|
||||
|
||||
mkResultVar :: (Either SomeException a -> IO ()) -> ResultVar a
|
||||
mkResultVar :: (Either SomeException a -> Bool -> IO ()) -> ResultVar a
|
||||
mkResultVar = ResultVar
|
||||
|
||||
putFailure :: (Exception e) => ResultVar a -> e -> IO ()
|
||||
@ -194,7 +198,26 @@ putSuccess :: ResultVar a -> a -> IO ()
|
||||
putSuccess r = putResult r . Right
|
||||
|
||||
putResult :: ResultVar a -> Either SomeException a -> IO ()
|
||||
putResult (ResultVar io) res = io res
|
||||
putResult (ResultVar io) res = io res False
|
||||
|
||||
-- | Like `putResult`, but used to get correct accounting when work is
|
||||
-- being done in child threads. This is particularly important for
|
||||
-- data sources that are using 'BackgroundFetch'. The allocation performed
|
||||
-- in the child thread up to this point will be propagated back to the
|
||||
-- thread that called 'runHaxl'.
|
||||
--
|
||||
-- Note: if you're doing multiple 'putResult' calls in the same thread
|
||||
-- ensure that only the /last/ one is 'putResultFromChildThread'. If you
|
||||
-- make multiple 'putResultFromChildThread' calls, the allocation will be
|
||||
-- counted multiple times.
|
||||
--
|
||||
-- If you are reusing a thread for multiple fetches, you should call
|
||||
-- @System.Mem.setAllocationCounter 0@ after
|
||||
-- 'putResultFromChildThread', so that allocation is not counted
|
||||
-- multiple times.
|
||||
putResultFromChildThread :: ResultVar a -> Either SomeException a -> IO ()
|
||||
putResultFromChildThread (ResultVar io) res = io res True
|
||||
-- see Note [tracking allocation in child threads]
|
||||
|
||||
-- | Function for easily setting a fetch to a particular exception
|
||||
setError :: (Exception e) => (forall a. r a -> e) -> BlockedFetch r -> IO ()
|
||||
|
@ -101,11 +101,9 @@ cachedWithInsert showFn insertFn Env{..} req = do
|
||||
let
|
||||
doFetch = do
|
||||
ivar <- newIVar
|
||||
let done r = atomically $ do
|
||||
cs <- readTVar completions
|
||||
writeTVar completions (CompleteReq r ivar : cs)
|
||||
let !rvar = stdResultVar ivar completions
|
||||
writeIORef cacheRef $! insertFn req ivar cache
|
||||
return (Uncached (mkResultVar done) ivar)
|
||||
return (Uncached rvar ivar)
|
||||
case DataCache.lookup req cache of
|
||||
Nothing -> doFetch
|
||||
Just (IVar cr) -> do
|
||||
@ -119,6 +117,24 @@ cachedWithInsert showFn insertFn Env{..} req = do
|
||||
Ok _ -> "Cached request: " ++ showFn req
|
||||
return (Cached r)
|
||||
|
||||
|
||||
-- | Make a ResultVar with the standard function for sending a CompletionReq
|
||||
-- to the scheduler.
|
||||
stdResultVar :: IVar u a -> TVar [CompleteReq u] -> ResultVar a
|
||||
stdResultVar ivar completions = mkResultVar $ \r isChildThread -> do
|
||||
allocs <- if isChildThread
|
||||
then
|
||||
-- In a child thread, return the current allocation counter too,
|
||||
-- for correct tracking of allocation.
|
||||
getAllocationCounter
|
||||
else
|
||||
return 0
|
||||
atomically $ do
|
||||
cs <- readTVar completions
|
||||
writeTVar completions (CompleteReq r ivar allocs : cs)
|
||||
{-# INLINE stdResultVar #-}
|
||||
|
||||
|
||||
-- | Record the call stack for a data fetch in the Stats. Only useful
|
||||
-- when profiling.
|
||||
logFetch :: Env u -> (r a -> String) -> r a -> IO ()
|
||||
@ -208,13 +224,11 @@ uncachedRequest req = do
|
||||
if isRecordingFlag /= 0
|
||||
then dataFetch req
|
||||
else GenHaxl $ \Env{..} -> do
|
||||
cr <- newIVar
|
||||
let done r = atomically $ do
|
||||
cs <- readTVar completions
|
||||
writeTVar completions (CompleteReq r cr : cs)
|
||||
ivar <- newIVar
|
||||
let !rvar = stdResultVar ivar completions
|
||||
modifyIORef' reqStoreRef $ \bs ->
|
||||
addRequest (BlockedFetch req (mkResultVar done)) bs
|
||||
return $ Blocked cr (Cont (getIVar cr))
|
||||
addRequest (BlockedFetch req rvar) bs
|
||||
return $ Blocked ivar (Cont (getIVar ivar))
|
||||
|
||||
|
||||
-- | Transparently provides caching. Useful for datasources that can
|
||||
@ -450,13 +464,13 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
|
||||
return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
|
||||
|
||||
addTimer t0 (BlockedFetch req (ResultVar fn)) =
|
||||
BlockedFetch req $ ResultVar $ \result -> do
|
||||
BlockedFetch req $ ResultVar $ \result isChildThread -> do
|
||||
t1 <- getTimestamp
|
||||
updateFetchStats t0 (t1 - t0)
|
||||
0 -- allocs: we can't measure this easily for BackgroundFetch
|
||||
1 -- batch size: we don't know if this is a batch or not
|
||||
(if isLeft result then 1 else 0) -- failures
|
||||
fn result
|
||||
fn result isChildThread
|
||||
|
||||
updateFetchStats
|
||||
:: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
|
||||
@ -471,9 +485,9 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
|
||||
|
||||
addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
|
||||
addFailureCount ref (BlockedFetch req (ResultVar fn)) =
|
||||
BlockedFetch req $ ResultVar $ \result -> do
|
||||
BlockedFetch req $ ResultVar $ \result isChildThread -> do
|
||||
when (isLeft result) $ atomicModifyIORef' ref (\r -> (r+1,()))
|
||||
fn result
|
||||
fn result isChildThread
|
||||
|
||||
wrapFetchInTrace
|
||||
:: Int
|
||||
|
@ -116,6 +116,7 @@ import Control.Applicative hiding (Const)
|
||||
import Prelude hiding (catch)
|
||||
#endif
|
||||
import Data.IORef
|
||||
import Data.Int
|
||||
import GHC.Exts (IsString(..))
|
||||
import Text.PrettyPrint hiding ((<>))
|
||||
import Text.Printf
|
||||
@ -399,11 +400,52 @@ eitherToResult (Left e) = ThrowHaxl e
|
||||
-- data source is just to add these to a queue ('completions') using
|
||||
-- 'putResult'; the scheduler collects them from the queue and unblocks
|
||||
-- the relevant computations.
|
||||
data CompleteReq u =
|
||||
forall a . CompleteReq (Either SomeException a)
|
||||
!(IVar u a) -- IVar because the result is cached
|
||||
data CompleteReq u
|
||||
= forall a . CompleteReq
|
||||
(Either SomeException a)
|
||||
!(IVar u a) -- IVar because the result is cached
|
||||
{-# UNPACK #-} !Int64 -- see Note [tracking allocation in child threads]
|
||||
|
||||
|
||||
{- Note [tracking allocation in child threads]
|
||||
|
||||
For a BackgroundFetch, we might be doing some of the work in a
|
||||
separate thread, but we want to make sure that the parent thread gets
|
||||
charged for the allocation, so that allocation limits still work.
|
||||
|
||||
The design is a bit tricky here. We want to track the allocation
|
||||
accurately but without adding much overhead.
|
||||
|
||||
The best way to propagate the allocation back from the child thread is
|
||||
through putResult. If we had some other method, we would also need a
|
||||
way to synchronise it with the main runHaxl loop; the advantage of
|
||||
putResult is that this is already a synchronisation method, because
|
||||
runHaxl is waiting for the result of the dataFetch.
|
||||
|
||||
(slight wrinkle here: runHaxl might not wait for the result of the
|
||||
dataFetch in the case where we do some speculative execution in
|
||||
pAnd/pOr)
|
||||
|
||||
We need a special version of putResult for child threads
|
||||
(putResultFromChildThread), because we don't want to propagate any
|
||||
allocation from the runHaxl thread back to itself and count it twice.
|
||||
|
||||
We also want to capture the allocation as late as possible, so that we
|
||||
count everything. For that reason, we pass a Bool down from putResult
|
||||
into the function in the ResultVar, and it reads the allocation
|
||||
counter as the last thing before adding the result to the completions
|
||||
TVar.
|
||||
|
||||
The other problem to consider is how to capture the allocation when
|
||||
the child thread is doing multiple putResults. Our solution here is
|
||||
to ensure that the *last* one is a putResultFromChildThread, so it
|
||||
captures all the allocation from everything leading up to it.
|
||||
|
||||
Why not reset the counter each time, so we could do multiple
|
||||
putResultFromChildThread calls? Because the child thread might be using an
|
||||
allocation limit itself, and changing the counter would mess it up.
|
||||
-}
|
||||
|
||||
-- -----------------------------------------------------------------------------
|
||||
-- Result
|
||||
|
||||
|
@ -29,6 +29,7 @@ import Haxl.Core.Monad
|
||||
import Haxl.Core.Fetch
|
||||
import Haxl.Core.Profile
|
||||
import Haxl.Core.RequestStore as RequestStore
|
||||
import Haxl.Core.Stats
|
||||
|
||||
|
||||
-- -----------------------------------------------------------------------------
|
||||
@ -148,7 +149,11 @@ runHaxl env@Env{..} haxl = do
|
||||
[] -> return JobNil
|
||||
_ -> do
|
||||
ifTrace flags 3 $ printf "%d complete\n" (length comps)
|
||||
let getComplete (CompleteReq a (IVar cr)) = do
|
||||
let
|
||||
getComplete (CompleteReq a (IVar cr) allocs) = do
|
||||
when (allocs < 0) $ do
|
||||
cur <- getAllocationCounter
|
||||
setAllocationCounter (cur + allocs)
|
||||
r <- readIORef cr
|
||||
case r of
|
||||
IVarFull _ -> do
|
||||
|
@ -80,4 +80,4 @@ instance
|
||||
fetch _state _flags _u = BackgroundFetch $ \bfs -> do
|
||||
forM_ bfs $ \(BlockedFetch req rv) ->
|
||||
mask $ \unmask ->
|
||||
forkFinally (unmask (performIO req)) (putResult rv)
|
||||
forkFinally (unmask (performIO req)) (putResultFromChildThread rv)
|
||||
|
@ -145,6 +145,7 @@ test-suite test
|
||||
TestExampleDataSource
|
||||
TestTypes
|
||||
TestUtils
|
||||
WorkDataSource
|
||||
|
||||
type:
|
||||
exitcode-stdio-1.0
|
||||
|
@ -13,6 +13,8 @@ import Haxl.Prelude
|
||||
|
||||
import Haxl.Core
|
||||
import Haxl.Core.Monad
|
||||
import Haxl.Core.Stats
|
||||
import Haxl.DataSource.ConcurrentIO
|
||||
|
||||
import Test.HUnit
|
||||
|
||||
@ -24,6 +26,7 @@ import qualified Data.HashMap.Strict as HashMap
|
||||
import qualified Data.HashSet as HashSet
|
||||
|
||||
import TestUtils
|
||||
import WorkDataSource
|
||||
|
||||
mkProfilingEnv = do
|
||||
env <- makeTestEnv False
|
||||
@ -73,7 +76,24 @@ exceptions = do
|
||||
assertBool "inner label added" $
|
||||
HashMap.member "inner" profData
|
||||
|
||||
|
||||
-- Test that we correctly attribute work done in child threads when
|
||||
-- using BackgroundFetch to the caller of runHaxl. This is important
|
||||
-- for correct accounting when relying on allocation limits.
|
||||
threadAlloc :: Assertion
|
||||
threadAlloc = do
|
||||
st <- mkConcurrentIOState
|
||||
env <- initEnv (stateSet st stateEmpty) ()
|
||||
a0 <- getAllocationCounter
|
||||
_x <- runHaxl env $ work 100000
|
||||
a1 <- getAllocationCounter
|
||||
assertBool "threadAlloc" $ (a0 - a1) > 1000000
|
||||
-- the result was 16MB on 64-bit, or around 25KB if we miss the allocs
|
||||
-- in the child thread.
|
||||
|
||||
|
||||
tests = TestList
|
||||
[ TestLabel "collectsdata" $ TestCase collectsdata
|
||||
, TestLabel "exceptions" $ TestCase exceptions
|
||||
, TestLabel "threads" $ TestCase threadAlloc
|
||||
]
|
||||
|
44
tests/WorkDataSource.hs
Normal file
44
tests/WorkDataSource.hs
Normal file
@ -0,0 +1,44 @@
|
||||
-- Copyright (c) 2014-present, Facebook, Inc.
|
||||
-- All rights reserved.
|
||||
--
|
||||
-- This source code is distributed under the terms of a BSD license,
|
||||
-- found in the LICENSE file.
|
||||
|
||||
{-# LANGUAGE DeriveDataTypeable #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
|
||||
module WorkDataSource (
|
||||
work,
|
||||
) where
|
||||
|
||||
import Haxl.Prelude
|
||||
import Prelude ()
|
||||
|
||||
import Haxl.Core
|
||||
import Haxl.DataSource.ConcurrentIO
|
||||
|
||||
import Control.Exception
|
||||
import Data.Hashable
|
||||
import Data.Typeable
|
||||
|
||||
work :: Int -> GenHaxl u Int
|
||||
work n = dataFetch (Work n)
|
||||
|
||||
data Work deriving Typeable
|
||||
instance ConcurrentIO Work where
|
||||
data ConcurrentIOReq Work a where
|
||||
Work :: Int -> ConcurrentIOReq Work Int
|
||||
|
||||
performIO (Work n) = evaluate (sum [1..n]) >> return n
|
||||
|
||||
deriving instance Eq (ConcurrentIOReq Work a)
|
||||
deriving instance Show (ConcurrentIOReq Work a)
|
||||
|
||||
instance ShowP (ConcurrentIOReq Work) where showp = show
|
||||
|
||||
instance Hashable (ConcurrentIOReq Work a) where
|
||||
hashWithSalt s (Work n) = hashWithSalt s n
|
Loading…
Reference in New Issue
Block a user