[add] Subprocess effects.

This commit is contained in:
Yamada Ryo 2024-11-03 18:41:51 +09:00
parent 77a78dd206
commit 134d6bf155
No known key found for this signature in database
GPG Key ID: AAE3C7A542B02DBF
8 changed files with 392 additions and 37 deletions

View File

@ -13,7 +13,7 @@ allow-newer: eff:primitive
source-repository-package source-repository-package
type: git type: git
location: https://github.com/sayo-hs/data-effects location: https://github.com/sayo-hs/data-effects
tag: f4f78c801ac061e45f5305e1ade43580dc6c8357 tag: 48425e364068b5321c8afeee072cdc4d339c7c81
subdir: data-effects-core subdir: data-effects-core
subdir: data-effects-th subdir: data-effects-th
subdir: data-effects subdir: data-effects

View File

@ -52,20 +52,20 @@ main =
scope_ @"fs1" "/fs1" \_ -> do scope_ @"fs1" "/fs1" \_ -> do
scope_ @"fs2" "/fs2" \outer -> do scope_ @"fs2" "/fs2" \outer -> do
outer do outer do
s1 <- readFS' @"fs1" "/a/b/c" s1 <- readFS'' @"fs1" "/a/b/c"
liftIO $ putStrLn $ "content: " <> show s1 liftIO $ putStrLn $ "content: " <> show s1
writeFS' @"fs1" "/d/e/f" "foobar" writeFS'' @"fs1" "/d/e/f" "foobar"
liftIO $ putStrLn "-----" liftIO $ putStrLn "-----"
s2 <- readFS' @"fs2" "/a/b/c" s2 <- readFS'' @"fs2" "/a/b/c"
liftIO $ putStrLn $ "content: " <> show s2 liftIO $ putStrLn $ "content: " <> show s2
writeFS' @"fs2" "/d/e/f" "foobar" writeFS'' @"fs2" "/d/e/f" "foobar"
liftIO $ putStrLn "-----" liftIO $ putStrLn "-----"
transactFS' @"fs2" do transactFS'' @"fs2" do
outer $ transactFS' @"fs1" do outer $ transactFS'' @"fs1" do
liftIO $ print "hello" liftIO $ print "hello"
{- {-

View File

@ -0,0 +1,23 @@
-- SPDX-License-Identifier: MPL-2.0
{-# LANGUAGE PartialTypeSignatures #-}
module Main where
import Control.Monad.Hefty (liftIO, (&))
import Control.Monad.Hefty.Concurrent.Subprocess (
CreateProcess (stdout),
StdStream (CreatePipe),
SubprocResult,
readStdout'',
runSubprocIO,
scope,
shell,
)
import Control.Monad.Hefty.Unlift (runUnliftIO)
main :: IO ()
main = runUnliftIO . runSubprocIO $ do
r :: SubprocResult p a <-
scope @"echo" (shell "echo a b c") {stdout = CreatePipe} \_ -> do
readStdout'' @"echo"
print r & liftIO

View File

@ -65,6 +65,9 @@ common common-base
these, these,
co-log-core, co-log-core,
random, random,
process,
bytestring,
text,
ghc-options: -Wall ghc-options: -Wall
@ -90,6 +93,7 @@ library
Control.Monad.Hefty.Concurrent.Parallel Control.Monad.Hefty.Concurrent.Parallel
Control.Monad.Hefty.Concurrent.Stream Control.Monad.Hefty.Concurrent.Stream
Control.Monad.Hefty.Concurrent.Timer Control.Monad.Hefty.Concurrent.Timer
Control.Monad.Hefty.Concurrent.Subprocess
Control.Monad.Hefty.Log Control.Monad.Hefty.Log
reexported-modules: reexported-modules:
@ -227,6 +231,14 @@ executable FileSystemProvider
build-depends: build-depends:
heftia-effects, heftia-effects,
executable Subprocess
import: common-base
main-is: Main.hs
hs-source-dirs: Example/Subprocess
build-depends:
heftia-effects,
executable UnliftIO executable UnliftIO
import: common-base import: common-base

View File

@ -0,0 +1,274 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin GHC.TypeLits.KnownNat.Solver #-}
-- SPDX-License-Identifier: MPL-2.0 AND BSD-3-Clause
-- (c) The University of Glasgow 2004-2008
-- (c) Sayo Koyoneda 2024
module Control.Monad.Hefty.Concurrent.Subprocess (
module Control.Monad.Hefty.Concurrent.Subprocess,
module Control.Monad.Hefty.Provider,
module System.Process,
module System.Posix.Types,
module System.IO,
module System.Exit,
module Data.ByteString,
)
where
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO)
import Control.Monad (liftM2)
import Control.Monad.Hefty (
Eff,
LNop,
interpret,
interpretH,
liftIO,
makeEffectF,
(&),
type (<<|),
type (<|),
type (~>),
)
import Control.Monad.Hefty.Provider
import Control.Monad.Hefty.Unlift (UnliftIO, withRunInIO)
import Data.ByteString (ByteString, hGet, hGetNonBlocking, hPut)
import Data.ByteString qualified as BS
import Data.Function (fix)
import Data.Maybe (fromJust, isNothing)
import System.Exit (ExitCode)
import System.IO (Handle)
import System.Posix.Types (GroupID, UserID)
import System.Process (CmdSpec (RawCommand, ShellCommand))
import System.Process qualified as Raw
import UnliftIO (TMVar, atomically, finally, mask, newEmptyTMVarIO, putTMVar, readTMVar, tryReadTMVar, uninterruptibleMask_)
import UnliftIO.Concurrent (ThreadId, killThread)
import UnliftIO.Process (terminateProcess, waitForProcess)
data Subprocess p a where
WriteStdin :: ByteString -> Subprocess ('SubprocMode 'Piped o e lp 'Kill) ()
TryWriteStdin :: ByteString -> Subprocess ('SubprocMode 'Piped o e lp ls) Bool
ReadStdout :: Subprocess ('SubprocMode i 'Piped e lp ls) ByteString
ReadStderr :: Subprocess ('SubprocMode i o 'Piped lp ls) ByteString
PollSubproc :: Subprocess ('SubprocMode i o e lp 'Wait) (Maybe ExitCode)
data SubprocMode = SubprocMode StreamMode StreamMode StreamMode Lifecycle Lifecycle
data StreamMode = Piped | NoPipe
data Lifecycle = Kill | Wait
makeEffectF [''Subprocess]
type SubprocProvider eh ef = Provider SubprocResult CreateProcess (Const2 LNop) Subprocess eh ef
data SubprocResult p a where
RaceResult :: Either ExitCode a -> SubprocResult ('SubprocMode i o e 'Kill 'Kill) a
SubprocResult :: ExitCode -> Maybe a -> SubprocResult ('SubprocMode i o e 'Wait 'Kill) a
ScopeResult :: Maybe ExitCode -> a -> SubprocResult ('SubprocMode i o e 'Kill 'Wait) a
SubprocScopeResult :: ExitCode -> a -> SubprocResult ('SubprocMode i o e 'Wait 'Wait) a
deriving stock instance (Show a) => Show (SubprocResult p a)
deriving stock instance (Eq a) => Eq (SubprocResult p a)
runSubprocIO :: (UnliftIO <<| eh, IO <| ef) => Eff (SubprocProvider eh ef ': eh) ef ~> Eff eh ef
runSubprocIO =
runProvider \cp@CreateProcess {subprocLifecycle, scopeLifecycle} m -> withRunInIO \run -> do
(hi, ho, he, ph) <- Raw.createProcess (toRawCreateProcess cp) & liftIO
procStatus <- newEmptyTMVarIO
scopeStatus <- newEmptyTMVarIO
mask \restore -> do
let
runThread :: TMVar a -> IO a -> IO ThreadId
runThread var a = forkIO $ atomically . putTMVar var =<< a
tScope <- runThread scopeStatus $ restore . run $ do
m
& interpretH \case {}
& interpret \case
WriteStdin s -> hPut (fromJust hi) s & liftIO
TryWriteStdin s -> do
stat <- atomically $ tryReadTMVar procStatus
if isNothing stat
then do
hPut (fromJust hi) s & liftIO
pure True
else pure False
ReadStdout -> hRead (fromJust ho) & liftIO
ReadStderr -> hRead (fromJust he) & liftIO
PollSubproc -> atomically $ tryReadTMVar procStatus
_ <- runThread procStatus $ waitForProcess ph
finally
case (subprocLifecycle, scopeLifecycle) of
(WaitMode, WaitMode) ->
liftM2
SubprocScopeResult
(atomically $ readTMVar procStatus)
(atomically $ readTMVar scopeStatus)
(WaitMode, KillMode) -> do
exitCode <- atomically $ readTMVar procStatus
scopeResult <- atomically $ tryReadTMVar scopeStatus
pure $ SubprocResult exitCode scopeResult
(KillMode, WaitMode) -> do
scopeResult <- atomically $ readTMVar scopeStatus
exitCode <- atomically $ tryReadTMVar procStatus
pure $ ScopeResult exitCode scopeResult
(KillMode, KillMode) ->
RaceResult
<$> atomically
( (Left <$> readTMVar procStatus)
<|> (Right <$> readTMVar scopeStatus)
)
do
uninterruptibleMask_ do
terminateProcess ph
killThread tScope
atomically $ readTMVar procStatus
hRead :: Handle -> IO ByteString
hRead h = flip fix BS.empty \next acc -> do
s <- hGet h chunkSize
if BS.null s
then pure acc
else next $ acc <> s
chunkSize :: Int
chunkSize = 4096
data CreateProcess p where
CreateProcess
:: { cmdspec :: CmdSpec
-- ^ Executable & arguments, or shell command. If 'cwd' is 'Nothing', relative paths are resolved with respect to the current working directory. If 'cwd' is provided, it is implementation-dependent whether relative paths are resolved with respect to 'cwd' or the current working directory, so absolute paths should be used to ensure portability.
, stdin :: StdStream i
-- ^ How to determine stdin
, stdout :: StdStream o
-- ^ How to determine stdout
, stderr :: StdStream e
-- ^ How to determine stderr
, subprocLifecycle :: LifecycleMode lp
-- ^ Whether to kill the subprocess or wait when the scope's computation finishes first.
, scopeLifecycle :: LifecycleMode ls
-- ^ Whether to cancel the scope's computation or wait when the subprocess finishes first.
, cwd :: Maybe FilePath
-- ^ Optional path to the working directory for the new process
, env :: Maybe [(String, String)]
-- ^ Optional environment (otherwise inherit from the current process)
, closeFds :: Bool
-- ^ Close all file descriptors except stdin, stdout and stderr in the new process (on Windows, only works if std_in, std_out, and std_err are all Inherit). This implementation will call close on every fd from 3 to the maximum of open files, which can be slow for high maximum of open files. XXX verify what happens with fds in nodejs child processes
, createGroup :: Bool
-- ^ Create a new process group. On JavaScript this also creates a new session.
, delegateCtlc :: Bool
-- ^ Delegate control-C handling. Use this for interactive console processes to let them handle control-C themselves (see below for details).
, detachConsole :: Bool
-- ^ Use the windows DETACHED_PROCESS flag when creating the process; does nothing on other platforms.
, createNewConsole :: Bool
-- ^ Use the windows CREATE_NEW_CONSOLE flag when creating the process; does nothing on other platforms.
, newSession :: Bool
-- ^ Use posix setsid to start the new process in a new session; starts process in a new session on JavaScript; does nothing on other platforms.
, childGroup :: Maybe GroupID
-- ^ Use posix setgid to set child process's group id; works for JavaScript when system running nodejs is posix. does nothing on other platforms.
, childUser :: Maybe UserID
-- ^ Use posix setuid to set child process's user id; works for JavaScript when system running nodejs is posix. does nothing on other platforms.
, useProcessJobs :: Bool
-- ^ On Windows systems this flag indicates that we should wait for the entire process tree
-- to finish before unblocking. On POSIX systems this flag is ignored. See $exec-on-windows for details.
}
-> CreateProcess ('SubprocMode i o e lp ls)
data StdStream s where
CreatePipe :: StdStream 'Piped
-- ^ Create a new pipe. The returned
-- @Handle@ will use the default encoding
-- and newline translation mode (just
-- like @Handle@s created by @openFile@).
Inherit :: StdStream 'NoPipe
-- ^ Inherit Handle from parent
UseHandle
:: Handle
-> StdStream 'NoPipe
-- ^ Use the supplied Handle
NoStream :: StdStream 'NoPipe
-- ^ Close the stream's file descriptor without
-- passing a Handle. On POSIX systems this may
-- lead to strange behavior in the child process
-- because attempting to read or write after the
-- file has been closed throws an error. This
-- should only be used with child processes that
-- don't use the file descriptor at all. If you
-- wish to ignore the child process's output you
-- should either create a pipe and drain it
-- manually or pass a @Handle@ that writes to
-- @\/dev\/null@.
data LifecycleMode t where
KillMode :: LifecycleMode 'Kill
WaitMode :: LifecycleMode 'Wait
process :: FilePath -> [String] -> CreateProcess ('SubprocMode 'NoPipe 'NoPipe 'NoPipe 'Wait 'Wait)
process cmd args = command $ RawCommand cmd args
shell :: String -> CreateProcess ('SubprocMode 'NoPipe 'NoPipe 'NoPipe 'Wait 'Wait)
shell = command . ShellCommand
command :: CmdSpec -> CreateProcess ('SubprocMode 'NoPipe 'NoPipe 'NoPipe 'Wait 'Wait)
command cmdspec = commandWith cmdspec Inherit Inherit Inherit WaitMode WaitMode
commandWith
:: CmdSpec
-> StdStream i
-> StdStream o
-> StdStream e
-> LifecycleMode lp
-> LifecycleMode ls
-> CreateProcess ('SubprocMode i o e lp ls)
commandWith cmdspec stdin stdout stderr subprocLifecycle scopeLifecycle =
CreateProcess
{ cmdspec = cmdspec
, stdin = stdin
, stdout = stdout
, stderr = stderr
, subprocLifecycle
, scopeLifecycle
, cwd = Nothing
, env = Nothing
, closeFds = False
, createGroup = False
, delegateCtlc = False
, detachConsole = False
, createNewConsole = False
, newSession = False
, childGroup = Nothing
, childUser = Nothing
, useProcessJobs = False
}
toRawCreateProcess :: CreateProcess stdio -> Raw.CreateProcess
toRawCreateProcess (CreateProcess {..}) =
Raw.CreateProcess
{ cmdspec = cmdspec
, cwd = cwd
, env = env
, std_in = toRawStdStream stdin
, std_out = toRawStdStream stdout
, std_err = toRawStdStream stderr
, close_fds = closeFds
, create_group = createGroup
, delegate_ctlc = delegateCtlc
, detach_console = detachConsole
, create_new_console = createNewConsole
, new_session = newSession
, child_group = childGroup
, child_user = childUser
, use_process_jobs = useProcessJobs
}
toRawStdStream :: StdStream pipe -> Raw.StdStream
toRawStdStream = \case
CreatePipe -> Raw.CreatePipe
Inherit -> Raw.Inherit
UseHandle h -> Raw.UseHandle h
NoStream -> Raw.NoStream

View File

@ -1,5 +1,3 @@
{-# LANGUAGE UndecidableInstances #-}
-- SPDX-License-Identifier: MPL-2.0 -- SPDX-License-Identifier: MPL-2.0
{- | {- |
@ -18,34 +16,46 @@ import Control.Monad.Hefty (
HFunctor, HFunctor,
KeyH (KeyH), KeyH (KeyH),
MemberHBy, MemberHBy,
Type,
interpretH, interpretH,
tag, key,
tagH, keyH,
transEffHF, transEffHF,
untag, unkey,
untagH, unkeyH,
weaken, weaken,
weakenNH, weakenNH,
type (#), type (##>),
type (##), type (#>),
type (~>), type (~>),
) )
import Data.Effect.HFunctor (hfmap)
import Data.Effect.Provider hiding (Provider, Provider_) import Data.Effect.Provider hiding (Provider, Provider_)
import Data.Effect.Provider qualified as D import Data.Effect.Provider qualified as D
import Data.Functor.Const (Const (Const))
import Data.Functor.Identity (Identity (Identity)) import Data.Functor.Identity (Identity (Identity))
type Provider ctx i sh sf eh ef = D.Provider ctx i (ProviderEff ctx i sh sf eh ef) type Provider ctx i sh sf eh ef = D.Provider ctx i (ProviderEff ctx i sh sf eh ef)
type Provider_ i sh sf eh ef = Provider Identity i sh sf eh ef
newtype ProviderEff ctx i sh sf eh ef a newtype ProviderEff ctx i sh sf eh ef p a
= ProviderEff {unProviderEff :: Eff (sh ': Provider ctx i sh sf eh ef ': eh) (sf ': ef) a} = ProviderEff {unProviderEff :: Eff (sh p ': Provider ctx i sh sf eh ef ': eh) (sf p ': ef) a}
type Provider_ i sh sf eh ef =
D.Provider (Const1 Identity) (Const i :: () -> Type) (Const1 (ProviderEff_ i sh sf eh ef))
newtype ProviderEff_ i sh sf eh ef a
= ProviderEff_ {unProviderEff_ :: Eff (sh ': Provider_ i sh sf eh ef ': eh) (sf ': ef) a}
newtype Const2 ff x f a = Const2 {getConst2 :: ff f a}
instance (HFunctor ff) => HFunctor (Const2 ff x) where
hfmap phi (Const2 ff) = Const2 $ hfmap phi ff
runProvider runProvider
:: forall ctx i sh sf eh ef :: forall ctx i sh sf eh ef
. ( forall x . ( forall p x
. i . i p
-> Eff (sh ': Provider ctx i sh sf eh ef ': eh) (sf ': ef) x -> Eff (sh p ': Provider ctx i sh sf eh ef ': eh) (sf p ': ef) x
-> Eff (Provider ctx i sh sf eh ef ': eh) ef (ctx x) -> Eff (Provider ctx i sh sf eh ef ': eh) ef (ctx p x)
) )
-> Eff (Provider ctx i sh sf eh ef ': eh) ef ~> Eff eh ef -> Eff (Provider ctx i sh sf eh ef ': eh) ef ~> Eff eh ef
runProvider run = runProvider run =
@ -55,44 +65,61 @@ runProvider run =
runProvider_ runProvider_
:: forall i sh sf eh ef :: forall i sh sf eh ef
. ( forall x . (HFunctor sh)
=> ( forall x
. i . i
-> Eff (sh ': Provider_ i sh sf eh ef ': eh) (sf ': ef) x -> Eff (sh ': Provider_ i sh sf eh ef ': eh) (sf ': ef) x
-> Eff (Provider_ i sh sf eh ef ': eh) ef x -> Eff (Provider_ i sh sf eh ef ': eh) ef x
) )
-> Eff (Provider_ i sh sf eh ef ': eh) ef ~> Eff eh ef -> Eff (Provider_ i sh sf eh ef ': eh) ef ~> Eff eh ef
runProvider_ run = runProvider \i a -> run i (Identity <$> a) runProvider_ run =
interpretH \(KeyH (Provide (Const i) f)) ->
runProvider_ run $
run
i
( fmap (Const1 . Identity)
. unProviderEff_
. getConst1
$ f
$ Const1
. ProviderEff_
. transEffHF (weakenNH @2) weaken
)
scope scope
:: forall tag ctx i eh ef a sh sf bh bf :: forall key ctx i p eh ef a sh sf bh bf
. ( MemberHBy . ( MemberHBy
(ProviderKey ctx i) (ProviderKey ctx i)
(Provider' ctx i (ProviderEff ctx i sh sf bh bf)) (Provider' ctx i (ProviderEff ctx i sh sf bh bf))
eh eh
, HFunctor sh , HFunctor (sh p)
) )
=> i => i p
-> ( Eff eh ef ~> Eff (sh ## tag ': Provider ctx i sh sf bh bf ': bh) (sf # tag ': bf) -> ( Eff eh ef ~> Eff (key ##> sh p ': Provider ctx i sh sf bh bf ': bh) (key #> sf p ': bf)
-> Eff (sh ## tag ': Provider ctx i sh sf bh bf ': bh) (sf # tag ': bf) a -> Eff (key ##> sh p ': Provider ctx i sh sf bh bf ': bh) (key #> sf p ': bf) a
) )
-> Eff eh ef (ctx a) -> Eff eh ef (ctx p a)
scope i f = scope i f =
i ..! \runInScope -> i ..! \runInScope ->
ProviderEff $ untagH . untag $ f (tagH . tag . unProviderEff . runInScope) ProviderEff $ unkeyH . unkey $ f (keyH . key . unProviderEff . runInScope)
scope_ scope_
:: forall tag i eh ef a sh sf bh bf :: forall key i eh ef a sh sf bh bf
. ( MemberHBy . ( MemberHBy
(ProviderKey Identity i) (ProviderKey (Const1 Identity :: () -> Type -> Type) (Const i :: () -> Type))
(Provider' Identity i (ProviderEff Identity i sh sf bh bf)) ( Provider'
(Const1 Identity)
(Const i)
(Const1 (ProviderEff_ i sh sf bh bf))
)
eh eh
, HFunctor sh , HFunctor sh
) )
=> i => i
-> ( Eff eh ef ~> Eff (sh ## tag ': Provider_ i sh sf bh bf ': bh) (sf # tag ': bf) -> ( Eff eh ef ~> Eff (key ##> sh ': Provider_ i sh sf bh bf ': bh) (key #> sf ': bf)
-> Eff (sh ## tag ': Provider_ i sh sf bh bf ': bh) (sf # tag ': bf) a -> Eff (key ##> sh ': Provider_ i sh sf bh bf ': bh) (key #> sf ': bf) a
) )
-> Eff eh ef a -> Eff eh ef a
scope_ i f = scope_ i f =
i .! \runInScope -> i .! \runInScope ->
ProviderEff $ untagH . untag $ f (tagH . tag . unProviderEff . runInScope) ProviderEff_ $ unkeyH . unkey $ f (keyH . key . unProviderEff_ . runInScope)

View File

@ -638,8 +638,10 @@ module Control.Monad.Hefty (
tagH, tagH,
untagH, untagH,
retagH, retagH,
key,
unkey, unkey,
rekey, rekey,
keyH,
unkeyH, unkeyH,
rekeyH, rekeyH,
@ -725,6 +727,8 @@ import Control.Monad.Hefty.Transform (
bundleN, bundleN,
bundleUnder, bundleUnder,
bundleUnderH, bundleUnderH,
key,
keyH,
raise, raise,
raiseAll, raiseAll,
raiseAllH, raiseAllH,

View File

@ -593,6 +593,13 @@ retagH
retagH = transformH $ TagH . unTagH retagH = transformH $ TagH . unTagH
{-# INLINE retagH #-} {-# INLINE retagH #-}
-- | Attaches the @key@ to the first-order effect at the head of the list.
key
:: forall key e ef eh
. Eff eh (e ': ef) ~> Eff eh (key #> e ': ef)
key = transform Key
{-# INLINE key #-}
-- | Removes the @key@ from the keyed first-order effect at the head of the list. -- | Removes the @key@ from the keyed first-order effect at the head of the list.
unkey unkey
:: forall key e ef eh :: forall key e ef eh
@ -609,6 +616,14 @@ rekey
rekey = transform $ Key . unKey rekey = transform $ Key . unKey
{-# INLINE rekey #-} {-# INLINE rekey #-}
-- | Attaches the @key@ to the higher-order effect at the head of the list.
keyH
:: forall key e ef eh
. (HFunctor e)
=> Eff (e ': eh) ef ~> Eff (key ##> e ': eh) ef
keyH = transformH KeyH
{-# INLINE keyH #-}
-- | Removes the @key@ from the keyed higher-order effect at the head of the list. -- | Removes the @key@ from the keyed higher-order effect at the head of the list.
unkeyH unkeyH
:: forall key e eh ef :: forall key e eh ef