Fixed jobs to terminate if interrupted by exception.

This commit is contained in:
Martin Sosic 2020-10-29 18:11:39 +01:00
parent 4889fefd0f
commit 0c4ae72495

View File

@ -1,3 +1,5 @@
{-# LANGUAGE ScopedTypeVariables #-}
module Generator.Job.Process module Generator.Job.Process
( runProcessAsJob ( runProcessAsJob
, runNodeCommandAsJob , runNodeCommandAsJob
@ -5,44 +7,54 @@ module Generator.Job.Process
import Control.Concurrent (writeChan) import Control.Concurrent (writeChan)
import Control.Concurrent.Async (Concurrently (..)) import Control.Concurrent.Async (Concurrently (..))
import qualified Data.ByteString.Char8 as BS import Control.Exception (bracket)
import Data.Conduit (runConduit, (.|)) import qualified Data.ByteString.Char8 as BS
import qualified Data.Conduit.List as CL import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.Process as CP import qualified Data.Conduit.List as CL
import System.Exit (ExitCode (..)) import qualified Data.Conduit.Process as CP
import qualified System.Process as P import System.Exit (ExitCode (..))
import Text.Read (readMaybe) import qualified System.Process as P
import qualified Text.Regex.TDFA as R import Text.Read (readMaybe)
import qualified Text.Regex.TDFA as R
import qualified Generator.Common as C import qualified Generator.Common as C
import qualified Generator.Job as J import qualified Generator.Job as J
import StrongPath (Abs, Dir, Path) import StrongPath (Abs, Dir, Path)
import qualified StrongPath as SP import qualified StrongPath as SP
-- | Runs a given process while streaming its stderr and stdout to provided channel. -- | Runs a given process while streaming its stderr and stdout to provided channel.
-- Returns exit code of the process once it finishes, and also sends it to he channel. -- Returns exit code of the process once it finishes, and also sends it to he channel.
-- Makes sure to terminate the process if exception occurs.
runProcessAsJob :: P.CreateProcess -> J.JobType -> J.Job runProcessAsJob :: P.CreateProcess -> J.JobType -> J.Job
runProcessAsJob process jobType chan = do runProcessAsJob process jobType chan = bracket
(CP.ClosedStream, stdoutStream, stderrStream, processHandle) <- CP.streamingProcess process (CP.streamingProcess process)
(\(_, _, _, sph) -> terminateStreamingProcess sph)
runStreamingProcessAsJob
where
runStreamingProcessAsJob (CP.ClosedStream, stdoutStream, stderrStream, processHandle) = do
let forwardStdoutToChan = runConduit $ stdoutStream .| CL.mapM_
(\bs -> writeChan chan $ J.JobMessage { J._data = J.JobOutput (BS.unpack bs) J.Stdout
, J._jobType = jobType })
let forwardStdoutToChan = runConduit $ stdoutStream .| CL.mapM_ let forwardStderrToChan = runConduit $ stderrStream .| CL.mapM_
(\bs -> writeChan chan $ J.JobMessage { J._data = J.JobOutput (BS.unpack bs) J.Stdout (\bs -> writeChan chan $ J.JobMessage { J._data = J.JobOutput (BS.unpack bs) J.Stderr
, J._jobType = jobType }) , J._jobType = jobType })
let forwardStderrToChan = runConduit $ stderrStream .| CL.mapM_ exitCode <- runConcurrently $
(\bs -> writeChan chan $ J.JobMessage { J._data = J.JobOutput (BS.unpack bs) J.Stderr Concurrently forwardStdoutToChan *>
, J._jobType = jobType }) Concurrently forwardStderrToChan *>
Concurrently (CP.waitForStreamingProcess processHandle)
exitCode <- runConcurrently $ writeChan chan $ J.JobMessage { J._data = J.JobExit exitCode
Concurrently forwardStdoutToChan *> , J._jobType = jobType }
Concurrently forwardStderrToChan *>
Concurrently (CP.waitForStreamingProcess processHandle)
writeChan chan $ J.JobMessage { J._data = J.JobExit exitCode return exitCode
, J._jobType = jobType }
return exitCode terminateStreamingProcess streamingProcessHandle = do
let processHandle = CP.streamingProcessHandleRaw streamingProcessHandle
P.terminateProcess processHandle
return $ ExitFailure 1
runNodeCommandAsJob :: Path Abs (Dir a) -> String -> [String] -> J.JobType -> J.Job runNodeCommandAsJob :: Path Abs (Dir a) -> String -> [String] -> J.JobType -> J.Job
runNodeCommandAsJob fromDir command args jobType chan = do runNodeCommandAsJob fromDir command args jobType chan = do