add backgroundFetchAcquireRelease (#107)

Summary:
Pull Request resolved: https://github.com/facebook/Haxl/pull/107

Add a simple mechanism (similar to asyncFetchAcquireRelease) to allow simple converting of data sources from AsyncFetch to BackgroundFetch

Reviewed By: simonmar

Differential Revision: D19272624

fbshipit-source-id: 3aec107de26fb59a4be3b2818b4f769f3404b15f
This commit is contained in:
Dylan Yudaken 2020-01-15 08:11:35 -08:00 committed by Facebook Github Bot
parent 65b9ab8595
commit f084f913b1
4 changed files with 132 additions and 28 deletions

View File

@ -87,6 +87,7 @@ module Haxl.Core (
-- ** Default fetch implementations
, asyncFetch, asyncFetchWithDispatch, asyncFetchAcquireRelease
, backgroundFetchAcquireRelease
, stubFetch
, syncFetch

View File

@ -39,6 +39,7 @@ module Haxl.Core.DataSource
-- * Default fetch implementations
, asyncFetch, asyncFetchWithDispatch
, asyncFetchAcquireRelease
, backgroundFetchAcquireRelease
, stubFetch
, syncFetch
@ -57,6 +58,13 @@ import Haxl.Core.Flags
import Haxl.Core.ShowP
import Haxl.Core.StateStore
import GHC.Conc ( newStablePtrPrimMVar
, PrimMVar)
import Control.Concurrent ( threadCapability
, forkOn
, myThreadId )
import Control.Concurrent.MVar
import Foreign.StablePtr
-- ---------------------------------------------------------------------------
-- DataSource class
@ -190,7 +198,7 @@ putSuccess :: ResultVar a -> a -> IO ()
putSuccess r = putResult r . Right
putResult :: ResultVar a -> Either SomeException a -> IO ()
putResult (ResultVar io) res = io res False
putResult (ResultVar io) res = io res False
-- | Like `putResult`, but used to get correct accounting when work is
-- being done in child threads. This is particularly important for
@ -380,3 +388,67 @@ submitFetch
-> IO (IO ())
submitFetch service fetchFn (BlockedFetch request result)
= (putResult result =<<) <$> fetchFn service request
backgroundFetchAcquireRelease
:: IO service
-- ^ Resource acquisition for this datasource
-> (service -> IO ())
-- ^ Resource release
-> (service -> Int -> StablePtr PrimMVar -> IO ())
-- ^ Dispatch all the pending requests and when ready trigger the given mvar
-> (service -> IO ())
-- ^ Process all requests
-> (forall a. service -> request a -> IO (IO (Either SomeException a)))
-- ^ Submits an individual request to the service.
-> State request
-- ^ Currently unused.
-> Flags
-- ^ Currently unused.
-> u
-- ^ Currently unused.
-> PerformFetch request
backgroundFetchAcquireRelease
acquire release dispatch process enqueue _state _flags _si =
BackgroundFetch $ \requests -> do
mvar <- newEmptyMVar
mask $ \restore -> do
(cap, _) <- threadCapability =<< myThreadId
service <- acquire
getResults <- (do
results <- restore $ mapM (submit service) requests
-- dispatch takes ownership of sp, so we call it under `mask` to
-- ensure that it can safely manage that resource.
sp <- newStablePtrPrimMVar mvar
dispatch service cap sp
return (sequence_ results)) `onException` release service
-- now spawn off a background thread to wait on the dispatch to finish
_tid <- forkOn cap $ do
takeMVar mvar
-- todo: it is possible that we would want to do
-- this processResults on the main scheduler thread for performance
-- which might reduce thread switching, especially for large batches
-- but for now this seems to work just fine
let rethrow = rethrowFromBg requests
_ <- finally
(restore $ (process service >> getResults) `catch` rethrow)
(release service `catch` rethrow)
return ()
return ()
where
rethrowFromBg requests (e :: SomeException) = do
mapM_ (rethrow1 e) requests
rethrowAsyncExceptions e
rethrow1 e (BlockedFetch _ result) =
putResultFromChildThread result (Left e)
-- similar to submitFetch but uses putResultFromChildThread
submit service (BlockedFetch request result) =
(putResultFromChildThread result =<<) <$> enqueue service request

View File

@ -34,6 +34,13 @@ import Data.Typeable
import Data.Hashable
import Control.Concurrent
import GHC.Conc ( PrimMVar )
import Foreign.StablePtr
import Foreign.C.Types ( CInt(..) )
foreign import ccall safe
hs_try_putmvar :: CInt -> StablePtr PrimMVar -> IO ()
data FailAfter a where
FailAfter :: Int -> FailAfter Int
deriving Typeable
@ -55,32 +62,42 @@ instance StateKey FailAfter where
, failDispatch :: IO ()
, failWaitDelay :: Int
, failWait :: IO ()
, failUseBackground :: Bool
}
instance DataSourceName FailAfter where
dataSourceName _ = "BadDataSource"
instance DataSource u FailAfter where
-- I'll define exampleFetch below
fetch state@FailAfterState{..} = asyncFetchAcquireRelease
(do threadDelay failAcquireDelay; failAcquire)
(\_ -> do threadDelay failReleaseDelay; failRelease)
(\_ -> do threadDelay failDispatchDelay; failDispatch)
(\_ -> do threadDelay failWaitDelay; failWait)
submit state
fetch state@FailAfterState{..}
| failUseBackground = bgFetch
acquire release dispatchbg wait
submit state
| otherwise = asyncFetchAcquireRelease
acquire release dispatch wait
submit state
where
bgFetch = backgroundFetchAcquireRelease
acquire = do threadDelay failAcquireDelay; failAcquire
release _ = do threadDelay failReleaseDelay; failRelease
dispatch _ = do threadDelay failDispatchDelay; failDispatch
dispatchbg _ i c = (do
failDispatch
_ <- mask_ $ forkIO $ finally
(threadDelay failDispatchDelay)
putmvar
return ()) `onException` putmvar
where
putmvar = hs_try_putmvar (fromIntegral i) c
wait _ = do threadDelay failWaitDelay; failWait
submit :: () -> FailAfter a -> IO (IO (Either SomeException a))
submit _ (FailAfter t) = do
threadDelay t
return (return (Left (toException (FetchError "failed request"))))
-- Every data source should define a function 'initGlobalState' that
-- initialises the state for that data source. The arguments to this
-- function might vary depending on the data source - we might need to
-- pass in resources from the environment, or parameters to set up the
-- data source.
initGlobalState :: IO (State FailAfter)
initGlobalState = do
initGlobalState :: Bool -> IO (State FailAfter)
initGlobalState useBackground = do
return FailAfterState
{ failAcquireDelay = 0
, failAcquire = return ()
@ -90,4 +107,5 @@ initGlobalState = do
, failDispatch = return ()
, failWaitDelay = 0
, failWait = return ()
, failUseBackground = useBackground
}

View File

@ -19,25 +19,28 @@ import Control.Exception
import ExampleDataSource
import BadDataSource
testEnv fn = do
testEnv bg fn = do
exstate <- ExampleDataSource.initGlobalState
badstate <- BadDataSource.initGlobalState
badstate <- BadDataSource.initGlobalState bg
let st = stateSet exstate $ stateSet (fn badstate) stateEmpty
initEnv st ()
wombats :: GenHaxl () () Int
wombats = length <$> listWombats 3
badDataSourceTest :: Test
badDataSourceTest = TestCase $ do
wombatsMany :: GenHaxl () () Int
wombatsMany = length <$> listWombats 7
badDataSourceTest :: Bool -> Test
badDataSourceTest bg = TestCase $ do
-- test that a failed acquire doesn't fail the other requests
ref <- newIORef False
env <- testEnv $ \st ->
env <- testEnv bg $ \st ->
st { failAcquire = throwIO (DataSourceError "acquire")
, failRelease = writeIORef ref True }
x <- runHaxl env $
(dataFetch (FailAfter 0) + wombats)
(dataFetch (FailAfter 0) + wombatsMany)
`Haxl.catch` \DataSourceError{} -> wombats
assertEqual "badDataSourceTest1" 3 x
@ -47,12 +50,12 @@ badDataSourceTest = TestCase $ do
-- test that a failed dispatch doesn't fail the other requests
ref <- newIORef False
env <- testEnv $ \st ->
env <- testEnv bg $ \st ->
st { failDispatch = throwIO (DataSourceError "dispatch")
, failRelease = writeIORef ref True }
x <- runHaxl env $
(dataFetch (FailAfter 0) + wombats)
(dataFetch (FailAfter 0) + wombatsMany)
`Haxl.catch` \DataSourceError{} -> wombats
assertEqual "badDataSourceTest3" x 3
@ -61,11 +64,11 @@ badDataSourceTest = TestCase $ do
assertEqual "badDataSourceTest4" True =<< readIORef ref
-- test that a failed wait is a DataSourceError
env <- testEnv $ \st ->
env <- testEnv bg $ \st ->
st { failWait = throwIO (DataSourceError "wait") }
x <- runHaxl env $
(dataFetch (FailAfter 0) + wombats)
(dataFetch (FailAfter 0) + wombatsMany)
`Haxl.catch` \DataSourceError{} -> wombats
assertEqual "badDataSourceTest5" x 3
@ -75,18 +78,28 @@ badDataSourceTest = TestCase $ do
-- test that a failed release is still a DataSourceError, even
-- though the request will have completed successfully
env <- testEnv $ \st ->
env <- testEnv bg $ \st ->
st { failRelease = throwIO (DataSourceError "release") }
x <- runHaxl env $
(dataFetch (FailAfter 0) + wombats)
(dataFetch (FailAfter 0) + wombatsMany)
`Haxl.catch` \DataSourceError{} -> wombats
assertEqual "badDataSourceTest7" x 3
-- test that if we don't throw anything we get the result
-- (which is a fetch error for this source)
env <- testEnv bg id
x <- runHaxl env $
(dataFetch (FailAfter 0) + wombatsMany)
`Haxl.catch` \FetchError{} -> wombats
assertEqual "badDataSourceTest8" x 3
tests = TestList
[ TestLabel "badDataSourceTest" badDataSourceTest
[ TestLabel "badDataSourceTest async" (badDataSourceTest False)
, TestLabel "badDataSourceTest background" (badDataSourceTest True)
]