mirror of
https://github.com/facebook/Haxl.git
synced 2025-01-08 19:00:25 +03:00
Remove FutureFetch from Haxl (#109)
Summary: Pull Request resolved: https://github.com/facebook/Haxl/pull/109 FutureFetch is unused (except for one test) and overall has not proven itself to be a useful fetch type. It adds a new waiting point (the others being BackgroundFetch and Async/Sync fetches) which can add latency. For example, if all three are dispatched in one round, how would the scheduler know ahead of time which one to wait on in order to make forward progress? Reviewed By: simonmar Differential Revision: D19410093 fbshipit-source-id: 40c900fbff9e06098acb2a21fc59b49adefadc5b
This commit is contained in:
parent
5b33204c10
commit
c49ba39bd9
@ -143,20 +143,6 @@ data PerformFetch req
|
||||
-- ^ Fetches the data in the background, calling 'putResult' at
|
||||
-- any time in the future. This is the best kind of fetch,
|
||||
-- because it provides the most concurrency.
|
||||
| FutureFetch ([BlockedFetch req] -> IO (IO ()))
|
||||
-- ^ Returns an IO action that, when performed, waits for the data
|
||||
-- to be received. This is the second-best type of fetch, because
|
||||
-- the scheduler still has to perform the blocking wait at some
|
||||
-- point in the future, and when it has multiple blocking waits to
|
||||
-- perform, it can't know which one will return first.
|
||||
--
|
||||
-- Why not just forkIO the IO action to make a FutureFetch into a
|
||||
-- BackgroundFetch? The blocking wait will probably do a safe FFI
|
||||
-- call, which means it needs its own OS thread. If we don't want
|
||||
-- to create an arbitrary number of OS threads, then FutureFetch
|
||||
-- enables all the blocking waits to be done on a single thread.
|
||||
-- Also, you might have a data source that requires all calls to
|
||||
-- be made in the same OS thread.
|
||||
|
||||
|
||||
-- | A 'BlockedFetch' is a pair of
|
||||
|
@ -205,11 +205,8 @@ dataFetchWithInsert showFn insertFn req =
|
||||
-- eagerly, or batch them up.
|
||||
--
|
||||
case schedulerHint userEnv :: SchedulerHint r of
|
||||
SubmitImmediately -> do
|
||||
(_,ios) <- performFetches 0 env
|
||||
[BlockedFetches [BlockedFetch req rvar]]
|
||||
when (not (null ios)) $
|
||||
error "bad data source:SubmitImmediately but returns FutureFetch"
|
||||
SubmitImmediately ->
|
||||
performFetches env [BlockedFetches [BlockedFetch req rvar]]
|
||||
TryToBatch ->
|
||||
-- add the request to the RequestStore and continue
|
||||
modifyIORef' reqStoreRef $ \bs ->
|
||||
@ -341,18 +338,16 @@ dupableCacheRequest request result = GenHaxl $ \Env{..} -> do
|
||||
return (Done ())
|
||||
|
||||
performRequestStore
|
||||
:: forall u w. Int -> Env u w -> RequestStore u -> IO (Int, [IO ()])
|
||||
performRequestStore n env reqStore =
|
||||
performFetches n env (contents reqStore)
|
||||
:: forall u w. Env u w -> RequestStore u -> IO ()
|
||||
performRequestStore env reqStore =
|
||||
performFetches env (contents reqStore)
|
||||
|
||||
-- | Issues a batch of fetches in a 'RequestStore'. After
|
||||
-- 'performFetches', all the requests in the 'RequestStore' are
|
||||
-- complete, and all of the 'ResultVar's are full.
|
||||
performFetches
|
||||
:: forall u w. Int -> Env u w -> [BlockedFetches u] -> IO (Int, [IO ()])
|
||||
performFetches n env@Env{flags=f, statsRef=sref} jobs = do
|
||||
let !n' = n + length jobs
|
||||
|
||||
:: forall u w. Env u w -> [BlockedFetches u] -> IO ()
|
||||
performFetches env@Env{flags=f, statsRef=sref} jobs = do
|
||||
t0 <- getTimestamp
|
||||
|
||||
let
|
||||
@ -391,9 +386,9 @@ performFetches n env@Env{flags=f, statsRef=sref} jobs = do
|
||||
where
|
||||
dsName = dataSourceName (Proxy :: Proxy r)
|
||||
|
||||
fetches <- zipWithM applyFetch [n..] jobs
|
||||
fetches <- zipWithM applyFetch [0..] jobs
|
||||
|
||||
waits <- scheduleFetches fetches (submittedReqsRef env) (flags env)
|
||||
scheduleFetches fetches (submittedReqsRef env) (flags env)
|
||||
|
||||
t1 <- getTimestamp
|
||||
let roundtime = fromIntegral (t1 - t0) / 1000000 :: Double
|
||||
@ -401,8 +396,6 @@ performFetches n env@Env{flags=f, statsRef=sref} jobs = do
|
||||
ifTrace f 1 $
|
||||
printf "Batch data fetch done (%.2fs)\n" (realToFrac roundtime :: Double)
|
||||
|
||||
return (n', waits)
|
||||
|
||||
data FetchToDo where
|
||||
FetchToDo
|
||||
:: forall (req :: * -> *). (DataSourceName req, Typeable req)
|
||||
@ -430,9 +423,6 @@ wrapFetchInCatch reqs fetch =
|
||||
-- sources themselves to catch (synchronous) exceptions. Async
|
||||
-- exceptions aren't a problem because we're going to rethrow
|
||||
-- them all the way to runHaxl anyway.
|
||||
FutureFetch f ->
|
||||
FutureFetch $ \reqs -> f reqs `Exception.catch` (
|
||||
\e -> handler e >> return (return ()))
|
||||
BackgroundFetch f ->
|
||||
BackgroundFetch $ \reqs -> f reqs `Exception.catch` handler
|
||||
where
|
||||
@ -475,16 +465,6 @@ wrapFetchInStats !statsRef dataSource batchSize perform = do
|
||||
failures <- readIORef fail_ref
|
||||
updateFetchStats t0 (totalTime - innerTime) (totalAlloc - innerAlloc)
|
||||
batchSize failures
|
||||
FutureFetch submit ->
|
||||
FutureFetch $ \reqs -> do
|
||||
fail_ref <- newIORef 0
|
||||
let reqs' = map (addFailureCount fail_ref) reqs
|
||||
(t0, submitTime, submitAlloc, wait) <- statsForIO (submit reqs')
|
||||
return $ do
|
||||
(_, waitTime, waitAlloc, _) <- statsForIO wait
|
||||
failures <- readIORef fail_ref
|
||||
updateFetchStats t0 (submitTime + waitTime) (submitAlloc + waitAlloc)
|
||||
batchSize failures
|
||||
BackgroundFetch io -> do
|
||||
BackgroundFetch $ \reqs -> do
|
||||
startTime <- getTimestamp
|
||||
@ -561,7 +541,7 @@ time io = do
|
||||
|
||||
-- | Start all the async fetches first, then perform the sync fetches before
|
||||
-- getting the results of the async fetches.
|
||||
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO [IO ()]
|
||||
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
|
||||
scheduleFetches fetches ref flags = do
|
||||
-- update ReqCountmap for these fetches
|
||||
ifReport flags 1 $ sequence_
|
||||
@ -570,18 +550,12 @@ scheduleFetches fetches ref flags = do
|
||||
| FetchToDo (reqs :: [BlockedFetch r]) _f <- fetches
|
||||
]
|
||||
fully_async_fetches
|
||||
waits <- future_fetches
|
||||
async_fetches sync_fetches
|
||||
return waits
|
||||
where
|
||||
fully_async_fetches :: IO ()
|
||||
fully_async_fetches = sequence_
|
||||
[f reqs | FetchToDo reqs (BackgroundFetch f) <- fetches]
|
||||
|
||||
future_fetches :: IO [IO ()]
|
||||
future_fetches = sequence
|
||||
[f reqs | FetchToDo reqs (FutureFetch f) <- fetches]
|
||||
|
||||
async_fetches :: IO () -> IO ()
|
||||
async_fetches = compose
|
||||
[f reqs | FetchToDo reqs (AsyncFetch f) <- fetches]
|
||||
|
@ -195,11 +195,6 @@ data Env u w = Env
|
||||
-- become non-empty is how the scheduler blocks waiting for
|
||||
-- data fetches to return.
|
||||
|
||||
, pendingWaits :: [IO ()]
|
||||
-- ^ this is a list of IO actions returned by 'FutureFetch'
|
||||
-- data sources. These do a blocking wait for the results of
|
||||
-- some data fetch.
|
||||
|
||||
, speculative :: {-# UNPACK #-} !Int
|
||||
|
||||
, writeLogsRef :: {-# UNPACK #-} !(IORef (WriteTree w))
|
||||
@ -251,7 +246,6 @@ initEnvWithData states e (dcache, mcache) = do
|
||||
, runQueueRef = rq
|
||||
, submittedReqsRef = sr
|
||||
, completions = comps
|
||||
, pendingWaits = []
|
||||
, speculative = 0
|
||||
, writeLogsRef = wl
|
||||
, writeLogsRefNoMemo = wlnm
|
||||
|
@ -137,13 +137,7 @@ runHaxlWithWrites env@Env{..} haxl = do
|
||||
ifTrace flags 3 $ printf "emptyRunQueue\n"
|
||||
haxls <- checkCompletions env
|
||||
case haxls of
|
||||
JobNil -> do
|
||||
case pendingWaits of
|
||||
[] -> checkRequestStore env
|
||||
wait:waits -> do
|
||||
ifTrace flags 3 $ printf "invoking wait\n"
|
||||
wait
|
||||
emptyRunQueue env { pendingWaits = waits } -- check completions
|
||||
JobNil -> checkRequestStore env
|
||||
_ -> reschedule env haxls
|
||||
|
||||
checkRequestStore :: Env u w -> IO ()
|
||||
@ -153,15 +147,15 @@ runHaxlWithWrites env@Env{..} haxl = do
|
||||
then waitCompletions env
|
||||
else do
|
||||
writeIORef reqStoreRef noRequests
|
||||
(_, waits) <- performRequestStore 0 env reqStore
|
||||
ifTrace flags 3 $ printf "performFetches: %d waits\n" (length waits)
|
||||
performRequestStore env reqStore
|
||||
ifTrace flags 3 $ printf "performFetches\n"
|
||||
-- empty the cache if we're not caching. Is this the best
|
||||
-- place to do it? We do get to de-duplicate requests that
|
||||
-- happen simultaneously.
|
||||
when (caching flags == 0) $ do
|
||||
let DataCache dc = dataCache
|
||||
H.foldM (\_ (k, _) -> H.delete dc k) () dc
|
||||
emptyRunQueue env{ pendingWaits = waits ++ pendingWaits }
|
||||
emptyRunQueue env
|
||||
|
||||
checkCompletions :: Env u w -> IO (JobList u w)
|
||||
checkCompletions Env{..} = do
|
||||
|
@ -1,7 +1,11 @@
|
||||
# Changes in version 2.3.0.0
|
||||
* Removed `FutureFetch`
|
||||
|
||||
# Changes in version 2.2.0.0
|
||||
|
||||
* Use BasicHashTable for the Haxl DataCache instead of HashMap
|
||||
* API Changes in: Haxl.Core.DataCache, Haxl.Core.Fetch
|
||||
* Removed support for GHC < 8.2
|
||||
|
||||
# Changes in version 2.1.2.0
|
||||
|
||||
|
@ -161,11 +161,10 @@ data PerformFetch
|
||||
= SyncFetch ([BlockedFetch req] -> IO ())
|
||||
| AsyncFetch ([BlockedFetch req] -> IO () -> IO ())
|
||||
| BackgroundFetch ([BlockedFetch req] -> IO ())
|
||||
| FutureFetch ([BlockedFetch req] -> IO (IO ()))
|
||||
```
|
||||
|
||||
A data source can fetch either synchronously (`SyncFetch`),
|
||||
asynchronously (`AsyncFetch` or `FutureFetch`), or in the background
|
||||
asynchronously (`AsyncFetch`), or in the background
|
||||
(`BackgroundFetch`). The `BackgroundFetch` option is the most
|
||||
flexible because it allows fetching to proceed concurrently with
|
||||
computation.
|
||||
|
@ -1,5 +1,5 @@
|
||||
name: haxl
|
||||
version: 2.2.0.0
|
||||
version: 2.3.0.0
|
||||
synopsis: A Haskell library for efficient, concurrent,
|
||||
and concise data access.
|
||||
homepage: https://github.com/facebook/Haxl
|
||||
|
@ -28,6 +28,10 @@ import Data.Typeable
|
||||
import Prelude ()
|
||||
import qualified Data.Map as Map
|
||||
import qualified Data.Text as Text
|
||||
import Control.Concurrent
|
||||
import Control.Exception
|
||||
import Control.Monad (void)
|
||||
|
||||
|
||||
import Haxl.Prelude
|
||||
import Haxl.Core
|
||||
@ -57,17 +61,20 @@ instance DataSourceName TAOReq where
|
||||
|
||||
instance DataSource UserEnv TAOReq where
|
||||
fetch TAOState{..} _flags _user
|
||||
| future = FutureFetch $ return . mapM_ doFetch
|
||||
| otherwise = SyncFetch $ mapM_ doFetch
|
||||
| future = BackgroundFetch $ \f -> do
|
||||
mask_ $ void . forkIO $ mapM_ (doFetch True) f
|
||||
| otherwise = SyncFetch $ mapM_ (doFetch False)
|
||||
|
||||
initGlobalState :: Bool -> IO (State TAOReq)
|
||||
initGlobalState future = return TAOState { future=future }
|
||||
|
||||
doFetch :: BlockedFetch TAOReq -> IO ()
|
||||
doFetch (BlockedFetch req@(AssocRangeId2s a b) r) =
|
||||
case Map.lookup (a, b) assocs of
|
||||
Nothing -> putFailure r . NotFound . Text.pack $ show req
|
||||
Just result -> putSuccess r result
|
||||
doFetch :: Bool -> BlockedFetch TAOReq -> IO ()
|
||||
doFetch bg (BlockedFetch req@(AssocRangeId2s a b) r) = put result
|
||||
where put = if bg then putResultFromChildThread r else putResult r
|
||||
result = case Map.lookup (a, b) assocs of
|
||||
Nothing -> except . NotFound . Text.pack $ show req
|
||||
Just result -> Right result
|
||||
|
||||
|
||||
assocs :: Map (Id,Id) [Id]
|
||||
assocs = Map.fromList [
|
||||
|
Loading…
Reference in New Issue
Block a user