From b89062fa2c58a349b44d20c327a58c5973819d16 Mon Sep 17 00:00:00 2001 From: "@wwwjim" Date: Sat, 10 Oct 2020 01:41:50 -0700 Subject: [PATCH] uploads: might have solved everything --- node_common/upload.js | 7 +++-- node_common/vendor/throttle.js | 55 ++++++++++++++++++++++++++++++++++ server.js | 4 +++ 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 node_common/vendor/throttle.js diff --git a/node_common/upload.js b/node_common/upload.js index f5d4160a..ba919a93 100644 --- a/node_common/upload.js +++ b/node_common/upload.js @@ -4,6 +4,7 @@ import * as Social from "~/node_common/social"; import * as Strings from "~/common/strings"; import * as Logs from "~/node_common/script-logging"; +import Throttle from "~/node_common/vendor/throttle"; import AbortController from "abort-controller"; import BusBoyConstructor from "busboy"; import Queue from "p-queue"; @@ -47,7 +48,7 @@ export async function formMultipart(req, res, { user, bucketName }) { const busboy = new BusBoyConstructor({ headers: req.headers, - highWaterMark: HIGH_WATER_MARK, + highWaterMark: HIGH_WATER_MARK * 5, }); const _createStreamAndUploadToTextile = async (writableStream) => { @@ -166,7 +167,9 @@ export async function formMultipart(req, res, { user, bucketName }) { }); Logs.task("req.pipe(writableStream)", WORKER_NAME); - req.pipe(writableStream); + req + .pipe(new Throttle({ bytes: HIGH_WATER_MARK, interval: 400 })) + .pipe(writableStream); }); }; diff --git a/node_common/vendor/throttle.js b/node_common/vendor/throttle.js new file mode 100644 index 00000000..2c654476 --- /dev/null +++ b/node_common/vendor/throttle.js @@ -0,0 +1,55 @@ +const { Transform } = require("stream"); + +/** + * Transform stream with throttle functionality + * + * @class + * @extends Transform + * @param {Object} options - Configuration options + * @param {Integer} options.bytes - Number of bytes to send in one chunk + * @param {Integer} options.interval - Interval for sending chunks, in miliseconds + */ +export default class Throttle extends Transform { + constructor(options) { + super(); + Object.assign(this, options); + this.previousPassTime = Date.now(); + this.queue = []; + this.intervalId = this.setPushInterval(); + } + + _transform(chunks, _, cb) { + for (const chunk of chunks) { + this.queue.push(chunk); + } + this.isQueueFull() ? setTimeout(cb, this.interval) : cb(); + } + + _flush(cb) { + clearInterval(this.intervalId); + this.intervalId = this.setPushInterval(cb); + } + + setPushInterval(cb = null) { + return setInterval(() => { + const elapsedTime = Date.now() - this.previousPassTime; + if (elapsedTime < this.interval) return; + + if (this.queue.length > 0) { + this.push(this.getChunk()); + return (this.previousPassTime += elapsedTime); + } + + clearInterval(this.intervalId); + return cb && cb(); + }, this.interval / 10); + } + + getChunk() { + return Buffer.from(this.queue.splice(0, this.bytes)); + } + + isQueueFull() { + return this.queue.length >= 2 * this.bytes; + } +} diff --git a/server.js b/server.js index c020bcf7..72d7dfe8 100644 --- a/server.js +++ b/server.js @@ -268,4 +268,8 @@ app.prepare().then(async () => { console.log(`[ slate ] client: http://localhost:${Environment.PORT}`); }); + + // NOTE(jim): Node 14 quirk where you have to set this 0, you can't actually + // Have a long timeout, If you set it to 0 you just disable it. + listenServer.headersTimeout = 0; });