[add] effectful pipes.

This commit is contained in:
Yamada Ryo 2024-08-12 00:22:40 +09:00
parent f7875c734d
commit 02cd13b934
No known key found for this signature in database
GPG Key ID: AAE3C7A542B02DBF
6 changed files with 250 additions and 0 deletions

View File

@ -1,3 +1,11 @@
packages:
heftia/
heftia-effects/
source-repository-package
type: git
location: https://github.com/sayo-hs/data-effects
tag: 5bb33aa9186a4d7de19341a490d8aef9e5798dfd
subdir: data-effects-core
subdir: data-effects-th
subdir: data-effects

View File

@ -0,0 +1,39 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at https://mozilla.org/MPL/2.0/.
module Main where
import Control.Effect.Handler.Heftia.Concurrent.Pipe.Async (runAsyncPipe)
import Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar (runMVarPipeLine)
import Control.Effect.Handler.Heftia.Unlift (runUnliftIO)
import Control.Monad.IO.Class (liftIO)
import Data.Effect.Concurrent.Pipe (
consume,
feed,
passthrough,
pipeLoop,
unmaskPipe,
(*|>),
(|*>),
(|>),
)
main :: IO ()
main = runUnliftIO
. runAsyncPipe
. runMVarPipeLine @String
$ do
unmaskPipe @String do
_ <- feed "direct pipe test" |> (liftIO . putStrLn =<< consume)
_ <- feed "passthrough test" *|> passthrough |*> (liftIO . putStrLn =<< consume)
_ <- pipeLoop @String do
feed "loop test"
liftIO . putStrLn =<< consume
pure ()
{- result:
direct pipe test
passthrough test
loop test
-}

View File

@ -57,6 +57,9 @@ common common-base
ghc-typelits-knownnat ^>= 0.7,
data-effects ^>= 0.1,
heftia ^>= 0.2,
lens,
async,
data-default,
ghc-options: -Wall -fplugin GHC.TypeLits.KnownNat.Solver
@ -80,6 +83,8 @@ library
Control.Effect.Handler.Heftia.KVStore
Control.Effect.Handler.Heftia.Fresh
Control.Effect.Handler.Heftia.Fail
Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar
Control.Effect.Handler.Heftia.Concurrent.Pipe.Async
reexported-modules:
Control.Effect.Hefty,
@ -126,6 +131,10 @@ library
Data.Effect.KVStore,
Data.Effect.Fresh,
Data.Effect.Fail,
Data.Effect.Concurrent.Pipe,
Data.Effect.Concurrent.Timer,
Data.Effect.Shell,
Data.Effect.Foldl,
-- Modules included in this executable, other than Main.
-- other-modules:
@ -218,3 +227,11 @@ executable SemanticsZoo
hs-source-dirs: Example/SemanticsZoo
build-depends:
heftia-effects,
executable Pipe
import: common-base
main-is: Main.hs
hs-source-dirs: Example/Pipe
build-depends:
heftia-effects,

View File

@ -0,0 +1,44 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at https://mozilla.org/MPL/2.0/.
module Control.Effect.Handler.Heftia.Concurrent.Pipe.Async where
import Control.Applicative (empty)
import Control.Arrow ((>>>))
import Control.Effect (type (~>))
import Control.Effect.ExtensibleFinal (type (:!!))
import Control.Effect.Hefty (interpretRec, interpretRecH)
import Control.Lens ((^?), _Just, _Right)
import Data.Effect.Concurrent.Pipe (LPipeF, LYield, PipeF (Passthrough), PipeH (..), Yield (Yield))
import Data.Effect.Unlift (UnliftIO)
import Data.Hefty.Extensible (ForallHFunctor, type (<<|), type (<|))
import Data.Tuple (swap)
import UnliftIO (MonadUnliftIO, atomically)
import UnliftIO.Async (withAsync)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent qualified as Conc
runAsyncPipe ::
forall eh ef.
(UnliftIO <<| eh, IO <| ef, ForallHFunctor eh) =>
PipeH ': eh :!! LPipeF ': LYield ': ef ~> eh :!! ef
runAsyncPipe =
interpretRecH \case
PipeTo a b -> a `Async.concurrently` b
FstWaitPipeTo a b -> a `thenStopAsync` b
SndWaitPipeTo a b -> swap <$> b `thenStopAsync` a
RacePipeTo a b -> a `Async.race` b
WaitBoth _ a b -> a `Async.concurrently` b
ThenStop _ a b -> a `thenStopAsync` b
Race _ a b -> a `Async.race` b
>>> interpretRec (\Passthrough -> atomically empty)
>>> interpretRec (\Yield -> Conc.yield)
thenStopAsync :: MonadUnliftIO m => m a -> m b -> m (a, Maybe b)
thenStopAsync m1 m2 =
withAsync m1 \a1 ->
withAsync m2 \a2 -> do
r1 <- Async.wait a1
r2 <- Async.poll a2
pure (r1, r2 ^? _Just . _Right)

View File

@ -0,0 +1,131 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TemplateHaskell #-}
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at https://mozilla.org/MPL/2.0/.
module Control.Effect.Handler.Heftia.Concurrent.Pipe.MVar where
import Control.Applicative (empty)
import Control.Arrow ((>>>))
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, tryPutMVar, tryReadMVar)
import Control.Effect (type (<<:), type (~>))
import Control.Effect.ExtensibleFinal ((:!!))
import Control.Effect.Handler.Heftia.Reader (runReader)
import Control.Effect.Hefty (
interposeRec,
interposeRecH,
interpretRec,
interpretRecH,
raiseUnder2,
raiseUnderH,
type ($),
)
import Control.Lens (makeLenses, (.~), (?~), (^.))
import Control.Monad (forever, liftM2)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Effect.Concurrent.Pipe
import Data.Effect.Reader (Ask (..), Local (..), asks, local)
import Data.Function ((&))
import Data.Hefty.Extensible (ForallHFunctor, type (<<|), type (<|))
import GHC.Conc (atomically)
import GHC.Generics (Generic)
data MVarPipeEnv p = MVarPipeEnv
{ _inputMVar :: Maybe (MVar p)
, _outputMVar :: Maybe (MVar p)
, _pipeMasked :: Bool
}
deriving stock (Generic)
makeLenses ''MVarPipeEnv
runMVarPipeLine ::
forall p eh ef.
(PipeH <<| eh, PipeF <| ef, Yield <| ef, IO <| ef, ForallHFunctor eh) =>
PipeLine p ': eh :!! LFeed p ': LConsume p ': ef ~> eh :!! ef
runMVarPipeLine =
(raiseUnderH >>> raiseUnder2)
>>> interpretRecH \case
UnmaskPipe a -> local (pipeMasked @p .~ False) a
MaskPipe a -> local (pipeMasked @p .~ True) a
PipeLoop a -> do
v <- newEmptyMVar @p & liftIO
local ((inputMVar ?~ v) . (outputMVar ?~ v)) a
>>> applyPipeMVar @p
>>> interpretRec \case
TryFeed x -> maybe (pure True) (liftIO . (`tryPutMVar` x)) =<< asks (^. outputMVar)
Feed x -> mapM_ (liftIO . (`putMVar` x)) =<< asks (^. outputMVar)
>>> interpretRec \case
TryConsume -> maybe (pure Nothing) (liftIO . tryReadMVar) =<< asks (^. inputMVar @p)
Consume -> liftIO . maybe (atomically empty) readMVar =<< asks (^. inputMVar @p)
>>> runReader @(MVarPipeEnv p) (MVarPipeEnv Nothing Nothing True)
applyPipeMVar ::
forall p eh ef.
( PipeH <<| eh
, PipeF <| ef
, Yield <| ef
, Feed p <| ef
, Consume p <| ef
, Local (MVarPipeEnv p) <<| eh
, Ask (MVarPipeEnv p) <| ef
, IO <| ef
, ForallHFunctor eh
) =>
eh :!! ef ~> eh :!! ef
applyPipeMVar =
interposeRecH \case
PipeTo a b -> pipe pipeTo a b
FstWaitPipeTo a b -> pipe fstWaitPipeTo a b
SndWaitPipeTo a b -> pipe sndWaitPipeTo a b
RacePipeTo a b -> pipe racePipeTo a b
WaitBoth d a b -> branch d (waitBoth d) a b
ThenStop d a b -> branch d (thenStop d) a b
Race d a b -> branch d (race d) a b
>>> interposeRec \Passthrough -> defaultPassthrough @p
where
pipe ::
(eh :!! ef $ a -> eh :!! ef $ b -> eh :!! ef $ x) ->
eh :!! ef $ a ->
eh :!! ef $ b ->
eh :!! ef $ x
pipe f upstream downstream = do
mask <- asks (^. pipeMasked @p)
if mask
then f upstream downstream
else do
v <- newEmptyMVar @p & liftIO
f
(local (outputMVar ?~ v) upstream)
(local (inputMVar ?~ v) downstream)
branch ::
Bool ->
(eh :!! ef $ a -> eh :!! ef $ b -> eh :!! ef $ x) ->
eh :!! ef $ a ->
eh :!! ef $ b ->
eh :!! ef $ x
branch doesDistribute f a b = do
mask <- asks (^. pipeMasked @p)
if doesDistribute && not mask
then do
iv <- asks (^. inputMVar @p)
ov <- asks (^. outputMVar @p)
dupMVarMay iv \ivs ->
dupMVarMay ov \ovs ->
f
(local ((inputMVar .~ (fst <$> ivs)) . (outputMVar .~ (fst <$> ovs))) a)
(local ((inputMVar .~ (snd <$> ivs)) . (outputMVar .~ (snd <$> ovs))) b)
else f a b
dupMVarMay :: (MonadIO m, PipeH <<: m) => Maybe (MVar a) -> (Maybe (MVar a, MVar a) -> m b) -> m b
dupMVarMay = maybe ($ Nothing) ((. (. Just)) . dupMVar)
dupMVar :: (MonadIO m, PipeH <<: m) => MVar a -> ((MVar a, MVar a) -> m b) -> m b
dupMVar v f = do
(v1, v2) <- liftIO $ liftM2 (,) newEmptyMVar newEmptyMVar
fmap fst $
f (v1, v2) *|| forever do
ev <- liftIO $ readMVar v
liftIO (putMVar v1 ev) *|* liftIO (putMVar v2 ev)

View File

@ -0,0 +1,11 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at https://mozilla.org/MPL/2.0/.
{- |
Copyright : (c) 2024 Yamada Ryo
License : MPL-2.0 (see the file LICENSE)
Maintainer : ymdfield@outlook.jp
Stability : experimental
Portability : portable
-}