diff --git a/src/Semantic/Distribute.hs b/src/Semantic/Distribute.hs index d453fc11c..055d80af0 100644 --- a/src/Semantic/Distribute.hs +++ b/src/Semantic/Distribute.hs @@ -1,9 +1,10 @@ -{-# LANGUAGE GADTs, RankNTypes, TypeOperators #-} +{-# LANGUAGE GADTs, RankNTypes, TypeOperators, UndecidableInstances #-} module Semantic.Distribute where import qualified Control.Concurrent.Async as Async -import Control.Monad.Effect +import Control.Monad.Effect hiding (run) import Control.Monad.Effect.Exception +import Control.Monad.Effect.Run import Control.Monad.IO.Class import Control.Parallel.Strategies import Prologue hiding (MonadError (..)) @@ -44,10 +45,16 @@ data Distribute task output where Bidistribute :: Bitraversable t => t (task output1) (task output2) -> Distribute task (t output1 output2) -runDistribute :: Members '[Exc SomeException, IO] effs => (forall output . task output -> IO (Either SomeException output)) -> Eff (Distribute task ': effs) a -> Eff effs a -runDistribute run = interpret (\ task -> case task of - Distribute tasks -> liftIO (Async.mapConcurrently run tasks) >>= either throwError pure . sequenceA . withStrategy (parTraversable (parTraversable rseq)) - Bidistribute tasks -> liftIO (Async.runConcurrently (bitraverse (Async.Concurrently . run) (Async.Concurrently . run) tasks)) >>= either throwError pure . bisequenceA . withStrategy (parBitraversable (parTraversable rseq) (parTraversable rseq))) +runDistribute :: Members '[Exc SomeException, IO] effs => Eff (Distribute task ': effs) a -> Action task -> Eff effs a +runDistribute m action = interpret (\ task -> case task of + Distribute tasks -> liftIO (Async.mapConcurrently (runAction action) tasks) >>= either throwError pure . sequenceA . withStrategy (parTraversable (parTraversable rseq)) + Bidistribute tasks -> liftIO (Async.runConcurrently (bitraverse (Async.Concurrently . runAction action) (Async.Concurrently . runAction action) tasks)) >>= either throwError pure . bisequenceA . withStrategy (parBitraversable (parTraversable rseq) (parTraversable rseq))) m parBitraversable :: Bitraversable t => Strategy a -> Strategy b -> Strategy (t a b) parBitraversable strat1 strat2 = bitraverse (rparWith strat1) (rparWith strat2) + + +newtype Action task = Action { runAction :: forall output . task output -> IO (Either SomeException output) } + +instance (Members '[Exc SomeException, IO] effects, Run effects result rest) => Run (Distribute task ': effects) result (Action task -> rest) where + run = fmap run . runDistribute diff --git a/src/Semantic/Task.hs b/src/Semantic/Task.hs index acff031bb..ae544d6db 100644 --- a/src/Semantic/Task.hs +++ b/src/Semantic/Task.hs @@ -128,7 +128,7 @@ runTaskWithOptions options task = do (result, stat) <- withTiming "run" [] $ do let run :: Task a -> IO (Either SomeException a) - run = runM . runError . flip runReader (Queues logger statter) . runTelemetry . flip runReader options . IO.runFiles . runTaskF . runDistribute (run . unwrapTask) + run = runM . runError . flip runReader (Queues logger statter) . runTelemetry . flip runReader options . IO.runFiles . runTaskF . flip runDistribute (Action (run . unwrapTask)) run task queue statter stat