Handle createReadStream errors.

This commit is contained in:
Dillon Kearns 2024-04-17 15:26:46 -07:00
parent 5a5e952a40
commit e5e1798459
3 changed files with 215 additions and 156 deletions

View File

@ -1,4 +1,4 @@
import { Transform, Readable } from "node:stream";
import { Writable, Transform, Readable } from "node:stream";
export async function hello(input, { cwd, env }) {
return `Hello!`;
@ -26,9 +26,17 @@ export async function customReadStream() {
export async function customWrite(input) {
return {
stream: process.stdout,
stream: stdout(),
metadata: () => {
return "Hi! I'm metadata from customWriteStream!";
},
};
}
function stdout() {
return new Writable({
write(chunk, encoding, callback) {
process.stdout.write(chunk, callback);
},
});
}

View File

@ -186,6 +186,10 @@ b =
|> try
|> test "writeStream meta"
(Expect.equal "Hi! I'm metadata from customWriteStream!")
, Stream.fileRead "does-not-exist"
|> Stream.run
|> expectError "file not found error"
"Error: ENOENT: no such file or directory, open '/Users/dillonkearns/src/github.com/dillonkearns/elm-pages/examples/end-to-end/does-not-exist'"
]

View File

@ -19,7 +19,7 @@ import * as readline from "readline";
import { spawn as spawnCallback } from "cross-spawn";
import * as consumers from "stream/consumers";
import * as zlib from "node:zlib";
import { Readable } from "node:stream";
import { Readable, Writable } from "node:stream";
import * as validateStream from "./validate-stream.js";
import { default as makeFetchHappenOriginal } from "make-fetch-happen";
import mergeStreams from "@sindresorhus/merge-streams";
@ -627,15 +627,194 @@ function runStream(req, portsFile) {
})
);
} else if (kind === "none") {
resolve(
jsonResponse(req, {
body: null,
metadata: await tryCallingFunction(metadataResponse),
})
);
if (!lastStream) {
resolve(jsonResponse(req, { body: null }));
} else {
let resolvedMeta = await tryCallingFunction(metadataResponse);
lastStream.once("finish", async () => {
resolve(
jsonResponse(req, {
body: null,
metadata: resolvedMeta,
})
);
});
lastStream.once("end", async () => {
resolve(
jsonResponse(req, {
body: null,
metadata: resolvedMeta,
})
);
});
}
} else if (kind === "command") {
// already handled in parts.forEach
}
/**
*
* @param {import('node:stream').Stream} lastStream
* @param {{ name: string }} part
* @param {{cwd: string, quiet: boolean, env: object}} param2
* @returns {Promise<{stream: import('node:stream').Stream, metadata?: any}>}
*/
async function pipePartToStream(
lastStream,
part,
{ cwd, quiet, env },
portsFile,
resolve,
isLastProcess,
kind
) {
if (verbosity > 1 && !quiet) {
}
if (part.name === "stdout") {
return { stream: lastStream.pipe(stdout()) };
} else if (part.name === "stderr") {
return { stream: lastStream.pipe(stderr()) };
} else if (part.name === "stdin") {
return { stream: process.stdin };
} else if (part.name === "fileRead") {
const newLocal = fs.createReadStream(path.resolve(cwd, part.path));
newLocal.once("error", (error) => {
newLocal.close();
resolve({ error: error.toString() });
});
return { stream: newLocal };
} else if (part.name === "customDuplex") {
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (validateStream.isDuplexStream(newLocal.stream)) {
lastStream.pipe(newLocal.stream);
return newLocal;
} else {
throw `Expected '${part.portName}' to be a duplex stream!`;
}
} else if (part.name === "customRead") {
return {
metadata: null,
stream: await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
}),
};
} else if (part.name === "customWrite") {
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (!validateStream.isWritableStream(newLocal.stream)) {
console.error("Expected a writable stream!");
resolve({ error: "Expected a writable stream!" });
} else {
lastStream && lastStream.pipe(newLocal.stream);
}
return newLocal;
} else if (part.name === "gzip") {
return { metadata: null, stream: lastStream.pipe(zlib.createGzip()) };
} else if (part.name === "unzip") {
return {
metadata: null,
stream: lastStream.pipe(zlib.createUnzip()),
};
} else if (part.name === "fileWrite") {
return {
metadata: null,
stream: lastStream.pipe(
fs.createWriteStream(path.resolve(part.path))
),
};
} else if (part.name === "httpWrite") {
const makeFetchHappen = makeFetchHappenOriginal.defaults({
// cache: mode === "build" ? "no-cache" : "default",
cache: "default",
});
const response = await makeFetchHappen(part.url, {
body: lastStream,
duplex: "half",
redirect: "follow",
method: part.method,
headers: part.headers,
retry: part.retries,
timeout: part.timeoutInMs,
});
let metadata = () => {
return {
headers: Object.fromEntries(response.headers.entries()),
statusCode: response.status,
// bodyKind,
url: response.url,
statusText: response.statusText,
};
};
return { metadata, stream: response.body };
} else if (part.name === "command") {
const { command, args, allowNon0Status, output } = part;
/** @type {'ignore' | 'inherit'} } */
let letPrint = quiet ? "ignore" : "inherit";
let stderrKind = kind === "none" ? letPrint : "pipe";
if (output === "Ignore") {
stderrKind = "ignore";
} else if (output === "Print") {
stderrKind = letPrint;
}
/**
* @type {import('node:child_process').ChildProcess}
*/
const newProcess = spawnCallback(command, args, {
stdio: [
"pipe",
// if we are capturing stderr instead of stdout, print out stdout with `inherit`
output === "InsteadOfStdout" || kind === "none"
? letPrint
: "pipe",
stderrKind,
],
cwd: cwd,
env: env,
});
newProcess.once("error", (error) => {
resolve({ error: error.toString() });
});
lastStream && lastStream.pipe(newProcess.stdin);
let newStream;
if (output === "MergeWithStdout") {
newStream = mergeStreams([newProcess.stdout, newProcess.stderr]);
} else if (output === "InsteadOfStdout") {
newStream = newProcess.stderr;
} else {
newStream = newProcess.stdout;
}
if (isLastProcess) {
return {
stream: newStream,
metadata: new Promise((resolve) => {
newProcess.once("exit", (code) => {
resolve({
exitCode: code,
});
});
}),
};
} else {
return { metadata: null, stream: newStream };
}
} else if (part.name === "fromString") {
return { stream: Readable.from([part.string]), metadata: null };
} else {
// console.error(`Unknown stream part: ${part.name}!`);
// process.exit(1);
throw `Unknown stream part: ${part.name}!`;
}
}
} catch (error) {
if (lastStream) {
lastStream.destroy();
@ -646,6 +825,21 @@ function runStream(req, portsFile) {
});
}
function stdout() {
return new Writable({
write(chunk, encoding, callback) {
process.stdout.write(chunk, callback);
},
});
}
function stderr() {
return new Writable({
write(chunk, encoding, callback) {
process.stderr.write(chunk, callback);
},
});
}
async function tryCallingFunction(func) {
if (func) {
// if is promise
@ -661,154 +855,7 @@ async function tryCallingFunction(func) {
}
}
/**
*
* @param {import('node:stream').Stream} lastStream
* @param {{ name: string }} part
* @param {{cwd: string, quiet: boolean, env: object}} param2
* @returns {Promise<{stream: import('node:stream').Stream, metadata: any}>}
*/
async function pipePartToStream(
lastStream,
part,
{ cwd, quiet, env },
portsFile,
resolve,
isLastProcess,
kind
) {
if (verbosity > 1 && !quiet) {
}
if (part.name === "stdout") {
return { stream: lastStream.pipe(process.stdout) };
} else if (part.name === "stderr") {
return { stream: lastStream.pipe(process.stderr) };
} else if (part.name === "stdin") {
return { stream: process.stdin };
} else if (part.name === "fileRead") {
return { stream: fs.createReadStream(path.resolve(cwd, part.path)) };
} else if (part.name === "customDuplex") {
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (validateStream.isDuplexStream(newLocal.stream)) {
lastStream.pipe(newLocal.stream);
return newLocal;
} else {
throw `Expected '${part.portName}' to be a duplex stream!`;
}
} else if (part.name === "customRead") {
return {
metadata: null,
stream: await portsFile[part.portName](part.input, { cwd, quiet, env }),
};
} else if (part.name === "customWrite") {
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (!validateStream.isWritableStream(newLocal.stream)) {
console.error("Expected a writable stream!");
resolve({ error: "Expected a writable stream!" });
} else {
lastStream && lastStream.pipe(newLocal.stream);
}
return newLocal;
} else if (part.name === "gzip") {
return { metadata: null, stream: lastStream.pipe(zlib.createGzip()) };
} else if (part.name === "unzip") {
return { metadata: null, stream: lastStream.pipe(zlib.createUnzip()) };
} else if (part.name === "fileWrite") {
return {
metadata: null,
stream: lastStream.pipe(fs.createWriteStream(path.resolve(part.path))),
};
} else if (part.name === "httpWrite") {
const makeFetchHappen = makeFetchHappenOriginal.defaults({
// cache: mode === "build" ? "no-cache" : "default",
cache: "default",
});
const response = await makeFetchHappen(part.url, {
body: lastStream,
duplex: "half",
redirect: "follow",
method: part.method,
headers: part.headers,
retry: part.retries,
timeout: part.timeoutInMs,
});
let metadata = () => {
return {
headers: Object.fromEntries(response.headers.entries()),
statusCode: response.status,
// bodyKind,
url: response.url,
statusText: response.statusText,
};
};
return { metadata, stream: response.body };
} else if (part.name === "command") {
const { command, args, allowNon0Status, output } = part;
/** @type {'ignore' | 'inherit'} } */
let letPrint = quiet ? "ignore" : "inherit";
let stderrKind = kind === "none" ? letPrint : "pipe";
if (output === "Ignore") {
stderrKind = "ignore";
} else if (output === "Print") {
stderrKind = letPrint;
}
/**
* @type {import('node:child_process').ChildProcess}
*/
const newProcess = spawnCallback(command, args, {
stdio: [
"pipe",
// if we are capturing stderr instead of stdout, print out stdout with `inherit`
output === "InsteadOfStdout" || kind === "none" ? letPrint : "pipe",
stderrKind,
],
cwd: cwd,
env: env,
});
newProcess.once("error", (error) => {
resolve({ error: error.toString() });
});
lastStream && lastStream.pipe(newProcess.stdin);
let newStream;
if (output === "MergeWithStdout") {
newStream = mergeStreams([newProcess.stdout, newProcess.stderr]);
} else if (output === "InsteadOfStdout") {
newStream = newProcess.stderr;
} else {
newStream = newProcess.stdout;
}
if (isLastProcess) {
return {
stream: newStream,
metadata: new Promise((resolve) => {
newProcess.once("exit", (code) => {
resolve({
exitCode: code,
});
});
}),
};
} else {
return { metadata: null, stream: newStream };
}
} else if (part.name === "fromString") {
return { stream: Readable.from([part.string]), metadata: null };
} else {
// console.error(`Unknown stream part: ${part.name}!`);
// process.exit(1);
throw `Unknown stream part: ${part.name}!`;
}
}
async function runShell(req) {
const cwd = path.resolve(...req.dir);