Enqueue dev server response work when thread pool is all busy.

This commit is contained in:
Dillon Kearns 2021-07-20 14:14:56 -07:00
parent 4ede380fab
commit 951d57a12a

View File

@ -17,6 +17,7 @@ const baseMiddleware = require("./basepath-middleware.js");
* @param {{ port: string; base: string }} options * @param {{ port: string; base: string }} options
*/ */
async function start(options) { async function start(options) {
let threadReadyQueue = [];
let pool = []; let pool = [];
ensureDirSync(path.join(process.cwd(), ".elm-pages", "http-response-cache")); ensureDirSync(path.join(process.cwd(), ".elm-pages", "http-response-cache"));
const cpuCount = os.cpus().length; const cpuCount = os.cpus().length;
@ -65,6 +66,7 @@ async function start(options) {
for (let index = 0; index < poolSize; index++) { for (let index = 0; index < poolSize; index++) {
pool.push(initWorker(options.base)); pool.push(initWorker(options.base));
} }
runPendingWork();
} }
setup(); setup();
@ -115,7 +117,7 @@ async function start(options) {
if (request.url && request.url.startsWith("/stream")) { if (request.url && request.url.startsWith("/stream")) {
handleStream(request, response); handleStream(request, response);
} else { } else {
handleNavigationRequest(request, response, next); await handleNavigationRequest(request, response, next);
} }
} }
@ -226,39 +228,40 @@ async function start(options) {
); );
} }
/**
* @param {string} pathname
* @param {((value: any) => any) | null | undefined} onOk
* @param {((reason: any) => PromiseLike<never>) | null | undefined} onErr
*/
function runRenderThread(pathname, onOk, onErr) { function runRenderThread(pathname, onOk, onErr) {
const readyThread = pool.find((thread) => thread.ready); let cleanUpThread = () => {};
if (!readyThread) { return new Promise(async (resolve, reject) => {
readyThread = pool[0]; const readyThread = await waitForThread();
} console.log(`Rendering ${pathname}`, readyThread.worker.threadId);
const cleanUpThread = () => { cleanUpThread = () => {
cleanUp(readyThread); cleanUp(readyThread);
}; };
return new Promise((resolve, reject) => {
if (readyThread) { readyThread.ready = false;
readyThread.ready = false; readyThread.worker.postMessage({
readyThread.worker.postMessage({ mode: "dev-server",
mode: "dev-server", pathname,
pathname, });
}); readyThread.worker.on("message", (message) => {
readyThread.worker.on("message", (message) => { if (message.tag === "done") {
if (message.tag === "done") { resolve(message.data);
resolve(message.data); } else if (message.tag === "watch") {
} else if (message.tag === "watch") { // console.log("@@@ WATCH", message.data);
// console.log("@@@ WATCH", message.data); message.data.forEach((pattern) => watcher.add(pattern));
message.data.forEach((pattern) => watcher.add(pattern)); } else if (message.tag === "error") {
} else if (message.tag === "error") { reject(message.data);
reject(message.data); } else {
} else { throw `Unhandled message: ${message}`;
throw `Unhandled message: ${message}`; }
} });
}); readyThread.worker.on("error", (error) => {
readyThread.worker.on("error", (error) => { reject(error.context);
reject(error.context); });
});
} else {
console.error("TODO - running out of ready threads not yet handled");
}
}) })
.then(onOk) .then(onOk)
.catch(onErr) .catch(onErr)
@ -271,6 +274,7 @@ async function start(options) {
thread.worker.removeAllListeners("message"); thread.worker.removeAllListeners("message");
thread.worker.removeAllListeners("error"); thread.worker.removeAllListeners("error");
thread.ready = true; thread.ready = true;
runPendingWork();
} }
/** /**
@ -283,7 +287,7 @@ async function start(options) {
const pathname = urlParts.pathname || ""; const pathname = urlParts.pathname || "";
try { try {
await pendingCliCompile; await pendingCliCompile;
runRenderThread( await runRenderThread(
pathname, pathname,
function (renderResult) { function (renderResult) {
const is404 = renderResult.is404; const is404 = renderResult.is404;
@ -357,22 +361,43 @@ async function start(options) {
next(); next();
} }
} }
}
/** /**
* @param {string} basePath * @returns {Promise<{ ready:boolean; worker: Worker }>}
*/ * */
function initWorker(basePath) { function waitForThread() {
let newWorker = { return new Promise((resolve, reject) => {
worker: new Worker(path.join(__dirname, "./render-worker.js"), { threadReadyQueue.push(resolve);
env: SHARE_ENV, runPendingWork();
workerData: { basePath }, });
}), }
ready: false,
}; function runPendingWork() {
newWorker.worker.once("online", () => { const readyThreads = pool.filter((thread) => thread.ready);
newWorker.ready = true; readyThreads.forEach((readyThread) => {
}); const startTask = threadReadyQueue.shift();
return newWorker; if (startTask) {
startTask(readyThread);
}
});
}
/**
* @param {string} basePath
*/
function initWorker(basePath) {
let newWorker = {
worker: new Worker(path.join(__dirname, "./render-worker.js"), {
env: SHARE_ENV,
workerData: { basePath },
}),
ready: false,
};
newWorker.worker.once("online", () => {
newWorker.ready = true;
});
return newWorker;
}
} }
function timeMiddleware() { function timeMiddleware() {