mirror of
https://github.com/github/semantic.git
synced 2025-01-04 05:27:08 +03:00
Port over the thoroughly-benighted Distribute effect.
This needs to die.
This commit is contained in:
parent
dda7ebbe6b
commit
3cb89b0aaf
@ -1,4 +1,5 @@
|
||||
{-# LANGUAGE DeriveFunctor, ExistentialQuantification, FlexibleContexts, FlexibleInstances, GeneralizedNewtypeDeriving, MultiParamTypeClasses, StandaloneDeriving, TypeOperators, UndecidableInstances #-}
|
||||
-- TODO: We should kill this entirely, because with fused-effects 1.0 we can unlift the various runConcurrently operations.
|
||||
module Semantic.Distribute
|
||||
( distribute
|
||||
, distributeFor
|
||||
@ -10,8 +11,8 @@ module Semantic.Distribute
|
||||
) where
|
||||
|
||||
import qualified Control.Concurrent.Async as Async
|
||||
import Control.Effect.Carrier
|
||||
import Control.Effect.Reader
|
||||
import Control.Algebra
|
||||
import Control.Carrier.Reader
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Parallel.Strategies
|
||||
import Prologue
|
||||
@ -19,19 +20,19 @@ import Prologue
|
||||
-- | Distribute a 'Traversable' container of tasks over the available cores (i.e. execute them concurrently), collecting their results.
|
||||
--
|
||||
-- This is a concurrent analogue of 'sequenceA'.
|
||||
distribute :: (Member Distribute sig, Traversable t, Carrier sig m) => t (m output) -> m (t output)
|
||||
distribute :: (Has Distribute sig m, Traversable t) => t (m output) -> m (t output)
|
||||
distribute = fmap (withStrategy (parTraversable rseq)) <$> traverse (send . flip Distribute pure)
|
||||
|
||||
-- | Distribute the application of a function to each element of a 'Traversable' container of inputs over the available cores (i.e. perform the function concurrently for each element), collecting the results.
|
||||
--
|
||||
-- This is a concurrent analogue of 'for' or 'traverse' (with the arguments flipped).
|
||||
distributeFor :: (Member Distribute sig, Traversable t, Carrier sig m) => t a -> (a -> m output) -> m (t output)
|
||||
distributeFor :: (Has Distribute sig m, Traversable t) => t a -> (a -> m output) -> m (t output)
|
||||
distributeFor inputs toTask = distribute (fmap toTask inputs)
|
||||
|
||||
-- | Distribute the application of a function to each element of a 'Traversable' container of inputs over the available cores (i.e. perform the function concurrently for each element), combining the results 'Monoid'ally into a final value.
|
||||
--
|
||||
-- This is a concurrent analogue of 'foldMap'.
|
||||
distributeFoldMap :: (Member Distribute sig, Monoid output, Traversable t, Carrier sig m) => (a -> m output) -> t a -> m output
|
||||
distributeFoldMap :: (Has Distribute sig m, Monoid output, Traversable t) => (a -> m output) -> t a -> m output
|
||||
distributeFoldMap toTask inputs = fmap fold (distribute (fmap toTask inputs))
|
||||
|
||||
|
||||
@ -45,7 +46,7 @@ instance HFunctor Distribute where
|
||||
hmap f (Distribute m k) = Distribute (f m) (f . k)
|
||||
|
||||
instance Effect Distribute where
|
||||
handle state handler (Distribute task k) = Distribute (handler (task <$ state)) (handler . fmap k)
|
||||
thread state handler (Distribute task k) = Distribute (handler (task <$ state)) (handler . fmap k)
|
||||
|
||||
|
||||
-- | Evaluate a 'Distribute' effect concurrently.
|
||||
@ -60,11 +61,11 @@ newtype DistributeC m a = DistributeC { runDistributeC :: ReaderC (UnliftIO m) m
|
||||
|
||||
-- This can be simpler if we add an instance to fused-effects that takes
|
||||
-- care of this folderol for us (then we can justt derive the MonadUnliftIO instance)
|
||||
instance (MonadIO m, Carrier sig m) => MonadUnliftIO (DistributeC m) where
|
||||
instance (MonadIO m, Algebra sig m) => MonadUnliftIO (DistributeC m) where
|
||||
askUnliftIO = DistributeC . ReaderC $ \ u -> pure (UnliftIO (runDistribute u))
|
||||
|
||||
instance (Carrier sig m, MonadIO m) => Carrier (Distribute :+: sig) (DistributeC m) where
|
||||
eff (L (Distribute task k)) = do
|
||||
instance (Algebra sig m, MonadIO m) => Algebra (Distribute :+: sig) (DistributeC m) where
|
||||
alg (L (Distribute task k)) = do
|
||||
handler <- DistributeC ask
|
||||
liftIO (Async.runConcurrently (Async.Concurrently (runDistribute handler task))) >>= k
|
||||
eff (R other) = DistributeC (eff (R (handleCoercible other)))
|
||||
alg (R other) = DistributeC (alg (R (handleCoercible other)))
|
||||
|
Loading…
Reference in New Issue
Block a user