Add command run API to Stream.

This commit is contained in:
Dillon Kearns 2024-04-04 19:20:30 -07:00
parent 094f5e8147
commit f78f7ae973
5 changed files with 379 additions and 132 deletions

File diff suppressed because one or more lines are too long

View File

@ -1,119 +0,0 @@
module StreamDemo exposing (run)
import BackendTask exposing (BackendTask)
import FatalError exposing (FatalError)
import Json.Decode as Decode
import Pages.Script as Script exposing (Script)
import Stream exposing (Stream)
run : Script
run =
Script.withoutCliOptions
--Stream.fileRead "elm.json"
--Stream.command "ls" [ "-l" ]
-- |> Stream.pipe Stream.stdout
-- |> Stream.run
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe
--Stream.fileRead "script/src/StreamDemo.elm"
--Stream.stdin
-- |> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
-- --|> Stream.pipe (Stream.fileWrite "my-formatted-example.elm")
-- |> Stream.pipe Stream.stdout
-- |> Stream.run
--unzip
(zip
|> BackendTask.andThen
(\_ ->
readType
|> BackendTask.andThen
(\type_ ->
Script.log ("Found type: " ++ type_)
)
--Stream.fileRead zipFile
-- |> Stream.pipe Stream.unzip
-- |> Stream.pipe (Stream.command "jq" [ ".type" ])
-- |> Stream.pipe Stream.stdout
-- |> Stream.run
)
)
--unzip
readType : BackendTask FatalError String
readType =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.readJson (Decode.field "type" Decode.string)
zip =
--Stream.command "elm-review" [ "--report=json" ]
--|> Stream.pipe Stream.stdout
--Stream.command "ls" [ "-l" ]
Stream.fileRead "elm.json"
|> Stream.pipe Stream.gzip
--|> Stream.pipe Stream.stdout
|> Stream.pipe (Stream.fileWrite zipFile)
|> Stream.run
unzip : BackendTask FatalError ()
unzip =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.pipe Stream.stdout
|> Stream.run
zipFile : String.String
zipFile =
"elm-review-report.gz.json"
example1 : BackendTask FatalError ()
example1 =
formatFile
(Stream.fromString
"""module Foo
a = 1
b = 2
"""
)
(Stream.fileWrite "my-formatted-example.elm")
example2 : BackendTask FatalError ()
example2 =
formatFile
(Stream.fileRead "script/src/StreamDemo.elm")
Stream.stdout
formatFile : Stream { read : (), write : fromWriteable } -> Stream { read : anything, write : () } -> BackendTask FatalError ()
formatFile source destination =
source
|> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
|> Stream.pipe destination
|> Stream.run
--Kind of a cool thing with the phantom record type there, you can annotate things in a more limited way if you choose to if you know that a given command doesn't accept `stdin` (and therefore can't be piped to), or doesn't give meaningful output (and therefore you don't want things to pipe from it).
--command : String -> List String -> Stream { read : read, write : write }
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe (Stream.command "elm-format" "--stdin")
--
--
--chmodX : String -> Stream { read : Never, write : Never }

View File

@ -0,0 +1,85 @@
module StreamTests exposing (run)
import BackendTask exposing (BackendTask)
import BackendTaskTest exposing (testScript)
import Expect
import FatalError exposing (FatalError)
import Json.Decode as Decode
import Pages.Script as Script exposing (Script)
import Stream exposing (Stream)
import Test
run : Script
run =
testScript "Stream"
[ Stream.fromString "asdf\nqwer\n"
|> Stream.captureCommandWithInput "wc" [ "-l" ]
|> test "capture stdin"
(\output ->
output.stdout
|> String.trim
|> Expect.equal
"2"
)
, Stream.fromString "asdf\nqwer\n"
|> Stream.runCommandWithInput "wc" [ "-l" ]
|> try
|> test "run stdin"
(\() ->
Expect.pass
)
, Stream.fileRead "elm.json"
|> Stream.pipe Stream.gzip
|> Stream.pipe (Stream.fileWrite zipFile)
|> Stream.run
|> BackendTask.andThen
(\() ->
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.readJson (Decode.field "type" Decode.string)
)
|> test "zip and unzip" (Expect.equal "application")
, Stream.fromString
"""module Foo
a = 1
b = 2
"""
|> Stream.captureCommandWithInput "elm-format" [ "--stdin" ]
|> test "elm-format --stdin"
(\{ stdout } ->
stdout
|> Expect.equal
"""module Foo exposing (a, b)
a =
1
b =
2
"""
)
]
test : String -> (a -> Expect.Expectation) -> BackendTask FatalError a -> BackendTask FatalError Test.Test
test name toExpectation task =
task
|> BackendTask.map
(\data ->
Test.test name <|
\() -> toExpectation data
)
try : BackendTask { error | fatal : FatalError } data -> BackendTask FatalError data
try =
BackendTask.allowFatal
zipFile : String.String
zipFile =
"elm-review-report.gz.json"

View File

@ -17,6 +17,7 @@ import { Spinnies } from './spinnies/index.js'
import { default as which } from "which";
import * as readline from "readline";
import { spawn as spawnCallback } from "cross-spawn";
import {ChildProcess} from 'node:child_process';
import * as consumers from 'stream/consumers'
import * as zlib from 'node:zlib'
import { Readable } from "node:stream";
@ -589,24 +590,68 @@ function runStream(req) {
parts.forEach((part, index) => {
let isLastProcess = index === parts.length - 1;
let thisStream = pipePartToStream(lastStream, part, { cwd, quiet, env });
let thisStream;
if (isLastProcess && (kind === "command" || kind === "commandCode")) {
const {command, args} = part;
let stdio;
if (kind === "command") {
stdio = ["pipe", "pipe", "pipe"];
} else if (kind === "commandCode") {
stdio = quiet ? ['pipe', 'ignore', 'ignore'] : ['pipe', 'inherit', 'inherit'];
} else {
throw new Error(`Unknown kind: ${kind}`);
}
const newProcess = spawnCallback(command, args, {
stdio,
cwd: cwd,
env: env,
});
lastStream && lastStream.pipe(newProcess.stdin);
if (kind === "command") {
let stdoutOutput = "";
let stderrOutput = "";
let combinedOutput = "";
newProcess.stderr.on("data", function (data) {
stderrOutput += data;
combinedOutput += data;
});
newProcess.stdout.on("data", function (data) {
stdoutOutput += data;
combinedOutput += data;
});
newProcess.on("close", async (exitCode) => {
resolve(jsonResponse(req, { stdoutOutput, stderrOutput, combinedOutput, exitCode }));
});
} else {
newProcess.on("close", async (exitCode) => {
resolve(jsonResponse(req, { exitCode }));
});
}
} else {
thisStream = pipePartToStream(lastStream, part, { cwd, quiet, env });
}
lastStream = thisStream;
});
if (kind === "json") {
resolve(jsonResponse(req, await consumers.json(lastStream)));
} else if (kind === "text") {
resolve(jsonResponse(req, await consumers.text(lastStream)));
} else {
lastStream.once("finish", async () => {
} else if (kind === "none") {
// lastStream.once("finish", async () => {
// resolve(jsonResponse(req, null));
// });
lastStream.once("close", () => {
resolve(jsonResponse(req, null));
});
} else if (kind === "command") {
// already handled in parts.forEach
}
lastStream.once("error", (error) => {
console.log('Stream error!');
console.error(error);
reject(jsonResponse(req, null));
});
// lastStream.once("error", (error) => {
// console.log('Stream error!');
// console.error(error);
// reject(jsonResponse(req, null));
// });
} catch (error) {
console.trace(error);
@ -638,12 +683,29 @@ function pipePartToStream(lastStream, part, { cwd, quiet, env }) {
} else if (part.name === "fileWrite") {
return lastStream.pipe(fs.createWriteStream(path.resolve(part.path)));
} else if (part.name === "command") {
const {command, args} = part;
const {command, args, allowNon0Status} = part;
/**
* @type {import('node:child_process').ChildProcess}
*/
const newProcess = spawnCallback(command, args, {
stdio: ["pipe", "pipe", "pipe"],
cwd: cwd,
env: env,
});
newProcess.on("error", (error) => {
console.error("ERROR in pipeline!", error);
process.exit(1);
});
newProcess.on("exit", (code) => {
if (code !== 0) {
if (allowNon0Status) {
} else {
console.error("ERROR in exit code!", code);
process.exit(1);
}
}
});
lastStream && lastStream.pipe(newProcess.stdin);
return newProcess.stdout;
} else if (part.name === "fromString") {

View File

@ -1,9 +1,32 @@
module Stream exposing (Stream, command, fileRead, fileWrite, fromString, httpRead, httpWrite, pipe, read, run, stdin, stdout, gzip, readJson, unzip)
module Stream exposing
( Stream, command, fileRead, fileWrite, fromString, httpRead, httpWrite, pipe, read, run, stdin, stdout, gzip, readJson, unzip
, CommandOutput, captureCommandWithInput, runCommandWithInput
, captureCommand, runCommand
, commandWithOptions
, CommandOptions, defaultCommandOptions, allowNon0Status, inheritUnused, withOutput, withTimeout
, OutputChannel(..)
)
{-|
@docs Stream, command, fileRead, fileWrite, fromString, httpRead, httpWrite, pipe, read, run, stdin, stdout, gzip, readJson, unzip
@docs CommandOutput, captureCommandWithInput, runCommandWithInput
## Running Commands
@docs captureCommand, runCommand
## Command Options
@docs commandWithOptions
@docs CommandOptions, defaultCommandOptions, allowNon0Status, inheritUnused, withOutput, withTimeout
@docs OutputChannel
-}
import BackendTask exposing (BackendTask)
@ -94,9 +117,7 @@ httpWrite string =
{-| -}
pipe :
-- to
Stream { read : toReadable, write : toWriteable }
-- from
-> Stream { read : (), write : fromWriteable }
-> Stream { read : toReadable, write : toWriteable }
pipe (Stream to) (Stream from) =
@ -162,7 +183,205 @@ readBytes stream =
{-| -}
command : String -> List String -> Stream { read : read, write : write }
command command_ args_ =
commandWithOptions defaultCommandOptions command_ args_
{-| -}
commandWithOptions : CommandOptions -> String -> List String -> Stream { read : read, write : write }
commandWithOptions (CommandOptions options) command_ args_ =
single "command"
[ ( "command", Encode.string command_ )
, ( "args", Encode.list Encode.string args_ )
, ( "allowNon0Status", Encode.bool options.allowNon0Status )
, ( "output", encodeChannel options.output )
, ( "timeoutInMs", nullable Encode.int options.timeoutInMs )
]
nullable : (a -> Encode.Value) -> Maybe a -> Encode.Value
nullable encoder maybeValue =
case maybeValue of
Just value ->
encoder value
Nothing ->
Encode.null
{-| -}
type OutputChannel
= Stdout
| Stderr
| Both
{-| -}
type CommandOptions
= CommandOptions CommandOptions_
type alias CommandOptions_ =
{ output : OutputChannel
, inheritUnused : Bool
, allowNon0Status : Bool
, timeoutInMs : Maybe Int
}
{-| -}
defaultCommandOptions : CommandOptions
defaultCommandOptions =
CommandOptions
{ output = Stdout
, inheritUnused = False
, allowNon0Status = False
, timeoutInMs = Nothing
}
{-| -}
withOutput : OutputChannel -> CommandOptions -> CommandOptions
withOutput output (CommandOptions cmd) =
CommandOptions { cmd | output = output }
{-| -}
allowNon0Status : CommandOptions -> CommandOptions
allowNon0Status (CommandOptions cmd) =
CommandOptions { cmd | allowNon0Status = True }
{-| -}
withTimeout : Int -> CommandOptions -> CommandOptions
withTimeout timeoutMs (CommandOptions cmd) =
CommandOptions { cmd | timeoutInMs = Just timeoutMs }
{-| -}
inheritUnused : CommandOptions -> CommandOptions
inheritUnused (CommandOptions cmd) =
CommandOptions { cmd | inheritUnused = True }
encodeChannel : OutputChannel -> Encode.Value
encodeChannel output =
Encode.string
(case output of
Stdout ->
"stdout"
Stderr ->
"stderr"
Both ->
"both"
)
{-| -}
type alias CommandOutput =
{ stdout : String
, stderr : String
, combined : String
, exitCode : Int
}
commandOutputDecoder : Decoder CommandOutput
commandOutputDecoder =
Decode.map4 CommandOutput
(Decode.field "stdoutOutput" Decode.string)
(Decode.field "stderrOutput" Decode.string)
(Decode.field "combinedOutput" Decode.string)
(Decode.field "exitCode" Decode.int)
{-| -}
captureCommandWithInput : String -> List String -> Stream { read : (), write : write } -> BackendTask FatalError CommandOutput
captureCommandWithInput command_ args_ stream =
captureCommand_ command_ args_ (Just stream)
{-| -}
captureCommand_ : String -> List String -> Maybe (Stream { read : (), write : write }) -> BackendTask FatalError CommandOutput
captureCommand_ command_ args_ maybeStream =
BackendTask.Internal.Request.request
{ name = "stream"
, body =
BackendTask.Http.jsonBody
(pipelineEncoder
(case maybeStream of
Just stream ->
stream
|> pipe (command command_ args_)
Nothing ->
command command_ args_
)
"command"
)
, expect = BackendTask.Http.expectJson commandOutputDecoder
}
{-| -}
runCommandWithInput : String -> List String -> Stream { read : (), write : write } -> BackendTask { fatal : FatalError, recoverable : Int } ()
runCommandWithInput command_ args_ maybeStream =
runCommand_ command_ args_ (Just maybeStream)
{-| -}
runCommand_ :
String
-> List String
-> Maybe (Stream { read : (), write : write })
-> BackendTask { fatal : FatalError, recoverable : Int } ()
runCommand_ command_ args_ maybeStream =
BackendTask.Internal.Request.request
{ name = "stream"
, body =
BackendTask.Http.jsonBody
(pipelineEncoder
(case maybeStream of
Just stream ->
stream
|> pipe (command command_ args_)
Nothing ->
command command_ args_
)
"commandCode"
)
, expect = BackendTask.Http.expectJson (Decode.field "exitCode" Decode.int)
}
|> BackendTask.andThen
(\exitCode ->
if exitCode == 0 then
BackendTask.succeed ()
else
BackendTask.fail
(FatalError.recoverable
{ title = "Command Failed"
, body = "Command `" ++ commandToString command_ args_ ++ "` failed with exit code " ++ String.fromInt exitCode
}
exitCode
)
)
{-| -}
captureCommand : String -> List String -> BackendTask FatalError CommandOutput
captureCommand command_ args_ =
captureCommand_ command_ args_ Nothing
{-| -}
runCommand : String -> List String -> BackendTask { fatal : FatalError, recoverable : Int } ()
runCommand command_ args_ =
runCommand_ command_ args_ Nothing
commandToString : String -> List String -> String
commandToString command_ args_ =
command_ ++ " " ++ String.join " " args_