diff --git a/cabal.project b/cabal.project index ed6aa71..d281fbc 100644 --- a/cabal.project +++ b/cabal.project @@ -1,3 +1,11 @@ packages: heftia/ heftia-effects/ + +source-repository-package + type: git + location: https://github.com/sayo-hs/data-effects + tag: 5bb33aa9186a4d7de19341a490d8aef9e5798dfd + subdir: data-effects-core + subdir: data-effects-th + subdir: data-effects diff --git a/heftia-effects/Example/Pipe/Main.hs b/heftia-effects/Example/Pipe/Main.hs new file mode 100644 index 0000000..ea2e6fa --- /dev/null +++ b/heftia-effects/Example/Pipe/Main.hs @@ -0,0 +1,39 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at https://mozilla.org/MPL/2.0/. + +module Main where + +import Control.Effect.Handler.Heftia.Concurrent.Pipe.Async (runAsyncPipe) +import Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar (runMVarPipeLine) +import Control.Effect.Handler.Heftia.Unlift (runUnliftIO) +import Control.Monad.IO.Class (liftIO) +import Data.Effect.Concurrent.Pipe ( + consume, + feed, + passthrough, + pipeLoop, + unmaskPipe, + (*|>), + (|*>), + (|>), + ) + +main :: IO () +main = runUnliftIO + . runAsyncPipe + . runMVarPipeLine @String + $ do + unmaskPipe @String do + _ <- feed "direct pipe test" |> (liftIO . putStrLn =<< consume) + _ <- feed "passthrough test" *|> passthrough |*> (liftIO . putStrLn =<< consume) + _ <- pipeLoop @String do + feed "loop test" + liftIO . putStrLn =<< consume + pure () + +{- result: +direct pipe test +passthrough test +loop test +-} diff --git a/heftia-effects/heftia-effects.cabal b/heftia-effects/heftia-effects.cabal index f72734a..b9068c0 100644 --- a/heftia-effects/heftia-effects.cabal +++ b/heftia-effects/heftia-effects.cabal @@ -57,6 +57,9 @@ common common-base ghc-typelits-knownnat ^>= 0.7, data-effects ^>= 0.1, heftia ^>= 0.2, + lens, + async, + data-default, ghc-options: -Wall -fplugin GHC.TypeLits.KnownNat.Solver @@ -80,6 +83,8 @@ library Control.Effect.Handler.Heftia.KVStore Control.Effect.Handler.Heftia.Fresh Control.Effect.Handler.Heftia.Fail + Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar + Control.Effect.Handler.Heftia.Concurrent.Pipe.Async reexported-modules: Control.Effect.Hefty, @@ -126,6 +131,10 @@ library Data.Effect.KVStore, Data.Effect.Fresh, Data.Effect.Fail, + Data.Effect.Concurrent.Pipe, + Data.Effect.Concurrent.Timer, + Data.Effect.Shell, + Data.Effect.Foldl, -- Modules included in this executable, other than Main. -- other-modules: @@ -218,3 +227,11 @@ executable SemanticsZoo hs-source-dirs: Example/SemanticsZoo build-depends: heftia-effects, + +executable Pipe + import: common-base + + main-is: Main.hs + hs-source-dirs: Example/Pipe + build-depends: + heftia-effects, diff --git a/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/Async.hs b/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/Async.hs new file mode 100644 index 0000000..6d7c50b --- /dev/null +++ b/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/Async.hs @@ -0,0 +1,44 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at https://mozilla.org/MPL/2.0/. + +module Control.Effect.Handler.Heftia.Concurrent.Pipe.Async where + +import Control.Applicative (empty) +import Control.Arrow ((>>>)) +import Control.Effect (type (~>)) +import Control.Effect.ExtensibleFinal (type (:!!)) +import Control.Effect.Hefty (interpretRec, interpretRecH) +import Control.Lens ((^?), _Just, _Right) +import Data.Effect.Concurrent.Pipe (LPipeF, LYield, PipeF (Passthrough), PipeH (..), Yield (Yield)) +import Data.Effect.Unlift (UnliftIO) +import Data.Hefty.Extensible (ForallHFunctor, type (<<|), type (<|)) +import Data.Tuple (swap) +import UnliftIO (MonadUnliftIO, atomically) +import UnliftIO.Async (withAsync) +import UnliftIO.Async qualified as Async +import UnliftIO.Concurrent qualified as Conc + +runAsyncPipe :: + forall eh ef. + (UnliftIO <<| eh, IO <| ef, ForallHFunctor eh) => + PipeH ': eh :!! LPipeF ': LYield ': ef ~> eh :!! ef +runAsyncPipe = + interpretRecH \case + PipeTo a b -> a `Async.concurrently` b + FstWaitPipeTo a b -> a `thenStopAsync` b + SndWaitPipeTo a b -> swap <$> b `thenStopAsync` a + RacePipeTo a b -> a `Async.race` b + WaitBoth _ a b -> a `Async.concurrently` b + ThenStop _ a b -> a `thenStopAsync` b + Race _ a b -> a `Async.race` b + >>> interpretRec (\Passthrough -> atomically empty) + >>> interpretRec (\Yield -> Conc.yield) + +thenStopAsync :: MonadUnliftIO m => m a -> m b -> m (a, Maybe b) +thenStopAsync m1 m2 = + withAsync m1 \a1 -> + withAsync m2 \a2 -> do + r1 <- Async.wait a1 + r2 <- Async.poll a2 + pure (r1, r2 ^? _Just . _Right) diff --git a/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/MVar.hs b/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/MVar.hs new file mode 100644 index 0000000..7d05106 --- /dev/null +++ b/heftia-effects/src/Control/Effect/Handler/Heftia/Concurrent/Pipe/MVar.hs @@ -0,0 +1,131 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE TemplateHaskell #-} + +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at https://mozilla.org/MPL/2.0/. + +module Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar where + +import Control.Applicative (empty) +import Control.Arrow ((>>>)) +import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, tryPutMVar, tryReadMVar) +import Control.Effect (type (<<:), type (~>)) +import Control.Effect.ExtensibleFinal ((:!!)) +import Control.Effect.Handler.Heftia.Reader (runReader) +import Control.Effect.Hefty ( + interposeRec, + interposeRecH, + interpretRec, + interpretRecH, + raiseUnder2, + raiseUnderH, + type ($), + ) +import Control.Lens (makeLenses, (.~), (?~), (^.)) +import Control.Monad (forever, liftM2) +import Control.Monad.IO.Class (MonadIO, liftIO) +import Data.Effect.Concurrent.Pipe +import Data.Effect.Reader (Ask (..), Local (..), asks, local) +import Data.Function ((&)) +import Data.Hefty.Extensible (ForallHFunctor, type (<<|), type (<|)) +import GHC.Conc (atomically) +import GHC.Generics (Generic) + +data MVarPipeEnv p = MVarPipeEnv + { _inputMVar :: Maybe (MVar p) + , _outputMVar :: Maybe (MVar p) + , _pipeMasked :: Bool + } + deriving stock (Generic) +makeLenses ''MVarPipeEnv + +runMVarPipeLine :: + forall p eh ef. + (PipeH <<| eh, PipeF <| ef, Yield <| ef, IO <| ef, ForallHFunctor eh) => + PipeLine p ': eh :!! LFeed p ': LConsume p ': ef ~> eh :!! ef +runMVarPipeLine = + (raiseUnderH >>> raiseUnder2) + >>> interpretRecH \case + UnmaskPipe a -> local (pipeMasked @p .~ False) a + MaskPipe a -> local (pipeMasked @p .~ True) a + PipeLoop a -> do + v <- newEmptyMVar @p & liftIO + local ((inputMVar ?~ v) . (outputMVar ?~ v)) a + >>> applyPipeMVar @p + >>> interpretRec \case + TryFeed x -> maybe (pure True) (liftIO . (`tryPutMVar` x)) =<< asks (^. outputMVar) + Feed x -> mapM_ (liftIO . (`putMVar` x)) =<< asks (^. outputMVar) + >>> interpretRec \case + TryConsume -> maybe (pure Nothing) (liftIO . tryReadMVar) =<< asks (^. inputMVar @p) + Consume -> liftIO . maybe (atomically empty) readMVar =<< asks (^. inputMVar @p) + >>> runReader @(MVarPipeEnv p) (MVarPipeEnv Nothing Nothing True) + +applyPipeMVar :: + forall p eh ef. + ( PipeH <<| eh + , PipeF <| ef + , Yield <| ef + , Feed p <| ef + , Consume p <| ef + , Local (MVarPipeEnv p) <<| eh + , Ask (MVarPipeEnv p) <| ef + , IO <| ef + , ForallHFunctor eh + ) => + eh :!! ef ~> eh :!! ef +applyPipeMVar = + interposeRecH \case + PipeTo a b -> pipe pipeTo a b + FstWaitPipeTo a b -> pipe fstWaitPipeTo a b + SndWaitPipeTo a b -> pipe sndWaitPipeTo a b + RacePipeTo a b -> pipe racePipeTo a b + WaitBoth d a b -> branch d (waitBoth d) a b + ThenStop d a b -> branch d (thenStop d) a b + Race d a b -> branch d (race d) a b + >>> interposeRec \Passthrough -> defaultPassthrough @p + where + pipe :: + (eh :!! ef $ a -> eh :!! ef $ b -> eh :!! ef $ x) -> + eh :!! ef $ a -> + eh :!! ef $ b -> + eh :!! ef $ x + pipe f upstream downstream = do + mask <- asks (^. pipeMasked @p) + if mask + then f upstream downstream + else do + v <- newEmptyMVar @p & liftIO + f + (local (outputMVar ?~ v) upstream) + (local (inputMVar ?~ v) downstream) + + branch :: + Bool -> + (eh :!! ef $ a -> eh :!! ef $ b -> eh :!! ef $ x) -> + eh :!! ef $ a -> + eh :!! ef $ b -> + eh :!! ef $ x + branch doesDistribute f a b = do + mask <- asks (^. pipeMasked @p) + if doesDistribute && not mask + then do + iv <- asks (^. inputMVar @p) + ov <- asks (^. outputMVar @p) + dupMVarMay iv \ivs -> + dupMVarMay ov \ovs -> + f + (local ((inputMVar .~ (fst <$> ivs)) . (outputMVar .~ (fst <$> ovs))) a) + (local ((inputMVar .~ (snd <$> ivs)) . (outputMVar .~ (snd <$> ovs))) b) + else f a b + + dupMVarMay :: (MonadIO m, PipeH <<: m) => Maybe (MVar a) -> (Maybe (MVar a, MVar a) -> m b) -> m b + dupMVarMay = maybe ($ Nothing) ((. (. Just)) . dupMVar) + + dupMVar :: (MonadIO m, PipeH <<: m) => MVar a -> ((MVar a, MVar a) -> m b) -> m b + dupMVar v f = do + (v1, v2) <- liftIO $ liftM2 (,) newEmptyMVar newEmptyMVar + fmap fst $ + f (v1, v2) *|| forever do + ev <- liftIO $ readMVar v + liftIO (putMVar v1 ev) *|* liftIO (putMVar v2 ev) diff --git a/heftia-effects/src/Control/Effect/Handler/Heftia/Shell.hs b/heftia-effects/src/Control/Effect/Handler/Heftia/Shell.hs new file mode 100644 index 0000000..4dfb38c --- /dev/null +++ b/heftia-effects/src/Control/Effect/Handler/Heftia/Shell.hs @@ -0,0 +1,11 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at https://mozilla.org/MPL/2.0/. + +{- | +Copyright : (c) 2024 Yamada Ryo +License : MPL-2.0 (see the file LICENSE) +Maintainer : ymdfield@outlook.jp +Stability : experimental +Portability : portable +-}