Improve stream error handling.

This commit is contained in:
Dillon Kearns 2024-04-05 09:46:17 -07:00
parent 5169843215
commit 1d4d80d902
5 changed files with 159 additions and 64 deletions

View File

@ -1,4 +1,4 @@
import { Transform } from "node:stream";
import { Transform, Readable } from "node:stream";
export async function hello(input, { cwd, env }) {
return `Hello!`;
@ -12,6 +12,11 @@ export async function upperCaseStream() {
});
}
export async function readStreamNotADuplex() {
return process.stdin;
export async function customReadStream() {
return new Readable({
read(size) {
this.push("hello");
this.push(null);
},
});
}

View File

@ -44,6 +44,11 @@ run =
|> Stream.read
|> test "custom duplex"
(Expect.equal "ASDF\nQWER\n")
, Stream.fromString "qwer\n"
|> Stream.pipe (Stream.customDuplex "customReadStream" Encode.null)
|> Stream.read
|> expectError "invalid stream"
"Expected 'customReadStream' to be a duplex stream!"
, Stream.fileRead "elm.json"
|> Stream.pipe Stream.gzip
|> Stream.pipe (Stream.fileWrite zipFile)
@ -91,6 +96,25 @@ test name toExpectation task =
)
expectError : String -> String -> BackendTask FatalError a -> BackendTask FatalError Test.Test
expectError name message task =
task
|> BackendTask.toResult
|> BackendTask.map
(\result ->
Test.test name <|
\() ->
case result of
Ok data ->
Expect.fail "Expected a failure, but got success!"
Err error ->
error
|> Expect.equal
(FatalError.fromString message)
)
try : BackendTask { error | fatal : FatalError } data -> BackendTask FatalError data
try =
BackendTask.allowFatal

View File

@ -21,8 +21,7 @@ import {ChildProcess} from 'node:child_process';
import * as consumers from 'stream/consumers'
import * as zlib from 'node:zlib'
import { Readable } from "node:stream";
import * as validateStream from "./validate-stream.js";
let verbosity = 2;
const spinnies = new Spinnies();
@ -582,57 +581,72 @@ async function runQuestion(req) {
return jsonResponse(req, await question(req.body.args[0]));
}
function runStream(req, portsFile) {
return new Promise(async (resolve, reject) => {
try {
const cwd = path.resolve(...req.dir);
const quiet = req.quiet;
const env = { ...process.env, ...req.env };
const kind = req.body.args[0].kind;
const parts = req.body.args[0].parts;
return new Promise(async (resolve) => {
let lastStream = null;
let index = 0;
try {
const cwd = path.resolve(...req.dir);
const quiet = req.quiet;
const env = { ...process.env, ...req.env };
const kind = req.body.args[0].kind;
const parts = req.body.args[0].parts;
let index = 0;
for (const part of parts) {
let isLastProcess = index === parts.length - 1;
let thisStream;
if (isLastProcess && (kind === "command" || kind === "commandCode")) {
const {command, args} = part;
let stdio;
if (kind === "command") {
stdio = ["pipe", "pipe", "pipe"];
} else if (kind === "commandCode") {
stdio = quiet ? ['pipe', 'ignore', 'ignore'] : ['pipe', 'inherit', 'inherit'];
} else {
throw new Error(`Unknown kind: ${kind}`);
}
const newProcess = spawnCallback(command, args, {
stdio,
cwd: cwd,
env: env,
const { command, args } = part;
let stdio;
if (kind === "command") {
stdio = ["pipe", "pipe", "pipe"];
} else if (kind === "commandCode") {
stdio = quiet
? ["pipe", "ignore", "ignore"]
: ["pipe", "inherit", "inherit"];
} else {
throw new Error(`Unknown kind: ${kind}`);
}
const newProcess = spawnCallback(command, args, {
stdio,
cwd: cwd,
env: env,
});
lastStream && lastStream.pipe(newProcess.stdin);
if (kind === "command") {
let stdoutOutput = "";
let stderrOutput = "";
let combinedOutput = "";
newProcess.stderr.on("data", function (data) {
stderrOutput += data;
combinedOutput += data;
});
lastStream && lastStream.pipe(newProcess.stdin);
if (kind === "command") {
let stdoutOutput = "";
let stderrOutput = "";
let combinedOutput = "";
newProcess.stderr.on("data", function (data) {
stderrOutput += data;
combinedOutput += data;
});
newProcess.stdout.on("data", function (data) {
stdoutOutput += data;
combinedOutput += data;
});
newProcess.on("close", async (exitCode) => {
resolve(jsonResponse(req, { stdoutOutput, stderrOutput, combinedOutput, exitCode }));
});
} else {
newProcess.on("close", async (exitCode) => {
resolve(jsonResponse(req, { exitCode }));
});
}
newProcess.stdout.on("data", function (data) {
stdoutOutput += data;
combinedOutput += data;
});
newProcess.on("close", async (exitCode) => {
resolve(
jsonResponse(req, {
stdoutOutput,
stderrOutput,
combinedOutput,
exitCode,
})
);
});
} else {
newProcess.on("close", async (exitCode) => {
resolve(jsonResponse(req, { exitCode }));
});
}
} else {
thisStream = await pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile);
thisStream = await pipePartToStream(
lastStream,
part,
{ cwd, quiet, env },
portsFile,
(value) => resolve(jsonResponse(req, value))
);
}
lastStream = thisStream;
index += 1;
@ -657,23 +671,30 @@ function runStream(req, portsFile) {
// console.error(error);
// reject(jsonResponse(req, null));
// });
} catch (error) {
console.trace(error);
process.exit(1);
if (lastStream) {
lastStream.destroy();
}
resolve(jsonResponse(req, { error: error.toString() }));
}
});
}
/**
*
* @param {import('node:stream').Stream} lastStream
* @param {{ name: string }} part
* @param {{cwd: string, quiet: boolean, env: object}} param2
* @returns
*
* @param {import('node:stream').Stream} lastStream
* @param {{ name: string }} part
* @param {{cwd: string, quiet: boolean, env: object}} param2
* @returns
*/
async function pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile) {
async function pipePartToStream(
lastStream,
part,
{ cwd, quiet, env },
portsFile,
resolve
) {
if (verbosity > 1 && !quiet) {
}
if (part.name === "stdout") {
@ -683,7 +704,19 @@ async function pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile
} else if (part.name === "fileRead") {
return fs.createReadStream(path.resolve(cwd, part.path));
} else if (part.name === "customDuplex") {
return portsFile[part.portName](part.input, { cwd, quiet, env });
// console.log(part);
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (validateStream.isDuplexStream(newLocal)) {
console.log("Piping to duplex stream!", part.portName);
lastStream.pipe(newLocal);
return newLocal;
} else {
throw `Expected '${part.portName}' to be a duplex stream!`;
}
} else if (part.name === "customRead") {
return portsFile[part.portName](part.input, { cwd, quiet, env });
} else if (part.name === "customWrite") {
@ -695,7 +728,7 @@ async function pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile
} else if (part.name === "fileWrite") {
return lastStream.pipe(fs.createWriteStream(path.resolve(part.path)));
} else if (part.name === "command") {
const {command, args, allowNon0Status} = part;
const { command, args, allowNon0Status } = part;
/**
* @type {import('node:child_process').ChildProcess}
*/
@ -711,7 +744,6 @@ async function pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile
newProcess.on("exit", (code) => {
if (code !== 0) {
if (allowNon0Status) {
} else {
console.error("ERROR in exit code!", code);
process.exit(1);
@ -723,8 +755,9 @@ async function pipePartToStream(lastStream, part, { cwd, quiet, env }, portsFile
} else if (part.name === "fromString") {
return Readable.from([part.string]);
} else {
console.error(`Unknown stream part: ${part.name}!`);
process.exit(1);
// console.error(`Unknown stream part: ${part.name}!`);
// process.exit(1);
throw `Unknown stream part: ${part.name}!`;
}
}

View File

@ -0,0 +1,25 @@
// source: https://www.30secondsofcode.org/js/s/typecheck-nodejs-streams/
export function isReadableStream(val) {
return (
val !== null &&
typeof val === "object" &&
typeof val.pipe === "function" &&
typeof val._read === "function" &&
typeof val._readableState === "object"
);
}
export function isWritableStream(val) {
return (
val !== null &&
typeof val === "object" &&
typeof val.pipe === "function" &&
typeof val._write === "function" &&
typeof val._writableState === "object"
);
}
export function isDuplexStream(val) {
return isReadableStream(val) && isWritableStream(val);
}

View File

@ -193,8 +193,16 @@ read stream =
BackendTask.Internal.Request.request
{ name = "stream"
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "text")
, expect = BackendTask.Http.expectJson Decode.string
, expect =
BackendTask.Http.expectJson
(Decode.oneOf
[ Decode.field "error" Decode.string
|> Decode.map (FatalError.fromString >> Err)
, Decode.string |> Decode.map Ok
]
)
}
|> BackendTask.andThen BackendTask.fromResult
{-| -}