Add initial Stream API.

This commit is contained in:
Dillon Kearns 2024-04-02 06:35:12 -07:00
parent 5249115507
commit ec464f43a4
5 changed files with 487 additions and 0 deletions

View File

@ -22,6 +22,7 @@

View File

@ -0,0 +1,119 @@
module StreamDemo exposing (run)
import BackendTask exposing (BackendTask)
import FatalError exposing (FatalError)
import Json.Decode as Decode
import Pages.Script as Script exposing (Script)
import Stream exposing (Stream)
run : Script
run =
--Stream.fileRead "elm.json"
--Stream.command "ls" [ "-l" ]
-- |> Stream.pipe Stream.stdout
-- |>
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe
--Stream.fileRead "script/src/StreamDemo.elm"
-- |> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
-- --|> Stream.pipe (Stream.fileWrite "my-formatted-example.elm")
-- |> Stream.pipe Stream.stdout
-- |>
|> BackendTask.andThen
(\_ ->
|> BackendTask.andThen
(\type_ ->
Script.log ("Found type: " ++ type_)
--Stream.fileRead zipFile
-- |> Stream.pipe Stream.unzip
-- |> Stream.pipe (Stream.command "jq" [ ".type" ])
-- |> Stream.pipe Stream.stdout
-- |>
readType : BackendTask FatalError String
readType =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.readJson (Decode.field "type" Decode.string)
zip =
--Stream.command "elm-review" [ "--report=json" ]
--|> Stream.pipe Stream.stdout
--Stream.command "ls" [ "-l" ]
Stream.fileRead "elm.json"
|> Stream.pipe Stream.gzip
--|> Stream.pipe Stream.stdout
|> Stream.pipe (Stream.fileWrite zipFile)
unzip : BackendTask FatalError ()
unzip =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.pipe Stream.stdout
zipFile : String.String
zipFile =
example1 : BackendTask FatalError ()
example1 =
"""module Foo
a = 1
b = 2
(Stream.fileWrite "my-formatted-example.elm")
example2 : BackendTask FatalError ()
example2 =
(Stream.fileRead "script/src/StreamDemo.elm")
formatFile : Stream { read : (), write : fromWriteable } -> Stream { read : anything, write : () } -> BackendTask FatalError ()
formatFile source destination =
|> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
|> Stream.pipe destination
--Kind of a cool thing with the phantom record type there, you can annotate things in a more limited way if you choose to if you know that a given command doesn't accept `stdin` (and therefore can't be piped to), or doesn't give meaningful output (and therefore you don't want things to pipe from it).
--command : String -> List String -> Stream { read : read, write : write }
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe (Stream.command "elm-format" "--stdin")
--chmodX : String -> Stream { read : Never, write : Never }

View File

@ -0,0 +1,119 @@
module StreamDemo exposing (run)
import BackendTask exposing (BackendTask)
import FatalError exposing (FatalError)
import Json.Decode as Decode
import Pages.Script as Script exposing (Script)
import Stream exposing (Stream)
run : Script
run =
--Stream.fileRead "elm.json"
--Stream.command "ls" [ "-l" ]
-- |> Stream.pipe Stream.stdout
-- |>
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe
--Stream.fileRead "script/src/StreamDemo.elm"
-- |> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
-- --|> Stream.pipe (Stream.fileWrite "my-formatted-example.elm")
-- |> Stream.pipe Stream.stdout
-- |>
|> BackendTask.andThen
(\_ ->
|> BackendTask.andThen
(\type_ ->
Script.log ("Found type: " ++ type_)
--Stream.fileRead zipFile
-- |> Stream.pipe Stream.unzip
-- |> Stream.pipe (Stream.command "jq" [ ".type" ])
-- |> Stream.pipe Stream.stdout
-- |>
readType : BackendTask FatalError String
readType =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.readJson (Decode.field "type" Decode.string)
zip =
--Stream.command "elm-review" [ "--report=json" ]
--|> Stream.pipe Stream.stdout
--Stream.command "ls" [ "-l" ]
Stream.fileRead "elm.json"
|> Stream.pipe Stream.gzip
--|> Stream.pipe Stream.stdout
|> Stream.pipe (Stream.fileWrite zipFile)
unzip : BackendTask FatalError ()
unzip =
Stream.fileRead zipFile
|> Stream.pipe Stream.unzip
|> Stream.pipe Stream.stdout
zipFile : String.String
zipFile =
example1 : BackendTask FatalError ()
example1 =
"""module Foo
a = 1
b = 2
(Stream.fileWrite "my-formatted-example.elm")
example2 : BackendTask FatalError ()
example2 =
(Stream.fileRead "script/src/StreamDemo.elm")
formatFile : Stream { read : (), write : fromWriteable } -> Stream { read : anything, write : () } -> BackendTask FatalError ()
formatFile source destination =
|> Stream.pipe (Stream.command "elm-format" [ "--stdin" ])
|> Stream.pipe destination
--Kind of a cool thing with the phantom record type there, you can annotate things in a more limited way if you choose to if you know that a given command doesn't accept `stdin` (and therefore can't be piped to), or doesn't give meaningful output (and therefore you don't want things to pipe from it).
--command : String -> List String -> Stream { read : read, write : write }
--elmFormatString : Stream { read : (), write : Never }
--elmFormatString string =
-- string
-- |> Stream.fromString
-- |> Stream.pipe (Stream.command "elm-format" "--stdin")
--chmodX : String -> Stream { read : Never, write : Never }

View File

@ -17,6 +17,10 @@ import { Spinnies } from './spinnies/index.js'
import { default as which } from "which";
import * as readline from "readline";
import { spawn as spawnCallback } from "cross-spawn";
import * as consumers from 'stream/consumers'
import * as zlib from 'node:zlib'
import { Readable } from "node:stream";
let verbosity = 2;
@ -515,6 +519,8 @@ async function runInternalJob(
return [requestHash, await runQuestion(requestToPerform)];
} else if (requestToPerform.url === "elm-pages-internal://shell") {
return [requestHash, await runShell(requestToPerform)];
} else if (requestToPerform.url === "elm-pages-internal://stream") {
return [requestHash, await runStream(requestToPerform)];
} else if (requestToPerform.url === "elm-pages-internal://start-spinner") {
return [requestHash, runStartSpinner(requestToPerform)];
} else if (requestToPerform.url === "elm-pages-internal://stop-spinner") {
@ -571,6 +577,80 @@ async function runWhich(req) {
async function runQuestion(req) {
return jsonResponse(req, await question(req.body.args[0]));
function runStream(req) {
return new Promise(async (resolve, reject) => {
try {
const cwd = path.resolve(...req.dir);
const quiet = req.quiet;
const env = { ...process.env, ...req.env };
const kind = req.body.args[0].kind;
const parts = req.body.args[0].parts;
let lastStream = null;
parts.forEach((part, index) => {
let isLastProcess = index === parts.length - 1;
let thisStream = pipePartToStream(lastStream, part, { cwd, quiet, env });
lastStream = thisStream;
if (kind === "json") {
resolve(jsonResponse(req, await consumers.json(lastStream)));
} else if (kind === "text") {
resolve(jsonResponse(req, await consumers.text(lastStream)));
} else {
lastStream.once("finish", async () => {
resolve(jsonResponse(req, null));
lastStream.once("error", (error) => {
console.log('Stream error!');
reject(jsonResponse(req, null));
} catch (error) {
* @param {import('node:stream').Stream} lastStream
* @param {{ name: string }} part
* @param {{cwd: string, quiet: boolean, env: object}} param2
* @returns
function pipePartToStream(lastStream, part, { cwd, quiet, env }) {
if (verbosity > 1 && !quiet) {
if ( === "stdout") {
return lastStream.pipe(process.stdout);
} else if ( === "stdin") {
return process.stdin;
} else if ( === "fileRead") {
return fs.createReadStream(part.path);
} else if ( === "gzip") {
return lastStream.pipe(zlib.createGzip());
} else if ( === "unzip") {
return lastStream.pipe(zlib.createUnzip());
} else if ( === "fileWrite") {
return lastStream.pipe(fs.createWriteStream(part.path));
} else if ( === "command") {
const {command, args} = part;
const newProcess = spawnCallback(command, args, {
stdio: ["pipe", "pipe", "pipe"],
lastStream && lastStream.pipe(newProcess.stdin);
return newProcess.stdout;
} else if ( === "fromString") {
return Readable.from([part.string]);
} else {
console.error(`Unknown stream part: ${}!`);
async function runShell(req) {
const cwd = path.resolve(...req.dir);

src/Stream.elm Normal file
View File

@ -0,0 +1,168 @@
module Stream exposing (Stream, command, fileRead, fileWrite, fromString, httpRead, httpWrite, pipe, read, run, stdin, stdout, gzip, readJson, unzip)
@docs Stream, command, fileRead, fileWrite, fromString, httpRead, httpWrite, pipe, read, run, stdin, stdout, gzip, readJson, unzip
import BackendTask exposing (BackendTask)
import BackendTask.Http exposing (Body)
import BackendTask.Internal.Request
import Bytes exposing (Bytes)
import FatalError exposing (FatalError)
import Json.Decode as Decode exposing (Decoder)
import Json.Encode as Encode
{-| -}
type Stream kind
= Stream (List StreamPart)
type StreamPart
= StreamPart String (List ( String, Encode.Value ))
single : String -> List ( String, Encode.Value ) -> Stream kind
single inner1 inner2 =
Stream [ StreamPart inner1 inner2 ]
{-| -}
stdin : Stream { read : (), write : Never }
stdin =
single "stdin" []
{-| -}
stdout : Stream { read : Never, write : () }
stdout =
single "stdout" []
{-| -}
fileRead : String -> Stream { read : (), write : Never }
fileRead path =
single "fileRead" [ ( "path", Encode.string path ) ]
{-| -}
fileWrite : String -> Stream { read : Never, write : () }
fileWrite path =
single "fileWrite" [ ( "path", Encode.string path ) ]
{-| -}
gzip : Stream { read : (), write : () }
gzip =
single "gzip" []
{-| -}
unzip : Stream { read : (), write : () }
unzip =
single "unzip" []
{-| -}
httpRead :
{ url : String
, method : String
, headers : List ( String, String )
, body : Body
, retries : Maybe Int
, timeoutInMs : Maybe Int
-> Stream { read : (), write : Never }
httpRead string =
single "httpRead" []
{-| -}
httpWrite :
{ url : String
, method : String
, headers : List ( String, String )
, retries : Maybe Int
, timeoutInMs : Maybe Int
-> Stream { read : Never, write : () }
httpWrite string =
single "httpWrite" []
{-| -}
pipe :
-- to
Stream { read : toReadable, write : toWriteable }
-- from
-> Stream { read : (), write : fromWriteable }
-> Stream { read : toReadable, write : toWriteable }
pipe (Stream to) (Stream from) =
Stream (from ++ to)
{-| -}
run : Stream { read : read, write : () } -> BackendTask FatalError ()
run stream =
{ name = "stream"
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "none")
, expect = BackendTask.Http.expectJson (Decode.succeed ())
pipelineEncoder : Stream a -> String -> Encode.Value
pipelineEncoder (Stream parts) kind =
[ ( "kind", Encode.string kind )
, ( "parts"
, Encode.list
(\(StreamPart name data) ->
Encode.object (( "name", Encode.string name ) :: data)
{-| -}
fromString : String -> Stream { read : (), write : Never }
fromString string =
single "fromString" [ ( "string", Encode.string string ) ]
{-| -}
read : Stream { read : (), write : write } -> BackendTask FatalError String
read stream =
{ name = "stream"
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "text")
, expect = BackendTask.Http.expectJson Decode.string
{-| -}
readJson : Decoder value -> Stream { read : (), write : write } -> BackendTask FatalError value
readJson decoder stream =
{ name = "stream"
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "json")
, expect = BackendTask.Http.expectJson decoder
{-| -}
readBytes : Stream { read : (), write : write } -> BackendTask FatalError Bytes
readBytes stream = (FatalError.fromString "Not implemented")
{-| -}
command : String -> List String -> Stream { read : read, write : write }
command command_ args_ =
single "command"
[ ( "command", Encode.string command_ )
, ( "args", Encode.list Encode.string args_ )