uploads: might have solved everything

This commit is contained in:
@wwwjim 2020-10-10 01:41:50 -07:00
parent ba294b3043
commit b89062fa2c
3 changed files with 64 additions and 2 deletions

View File

@ -4,6 +4,7 @@ import * as Social from "~/node_common/social";
import * as Strings from "~/common/strings";
import * as Logs from "~/node_common/script-logging";
import Throttle from "~/node_common/vendor/throttle";
import AbortController from "abort-controller";
import BusBoyConstructor from "busboy";
import Queue from "p-queue";
@ -47,7 +48,7 @@ export async function formMultipart(req, res, { user, bucketName }) {
const busboy = new BusBoyConstructor({
headers: req.headers,
highWaterMark: HIGH_WATER_MARK,
highWaterMark: HIGH_WATER_MARK * 5,
});
const _createStreamAndUploadToTextile = async (writableStream) => {
@ -166,7 +167,9 @@ export async function formMultipart(req, res, { user, bucketName }) {
});
Logs.task("req.pipe(writableStream)", WORKER_NAME);
req.pipe(writableStream);
req
.pipe(new Throttle({ bytes: HIGH_WATER_MARK, interval: 400 }))
.pipe(writableStream);
});
};

55
node_common/vendor/throttle.js vendored Normal file
View File

@ -0,0 +1,55 @@
const { Transform } = require("stream");
/**
* Transform stream with throttle functionality
*
* @class
* @extends Transform
* @param {Object} options - Configuration options
* @param {Integer} options.bytes - Number of bytes to send in one chunk
* @param {Integer} options.interval - Interval for sending chunks, in miliseconds
*/
export default class Throttle extends Transform {
constructor(options) {
super();
Object.assign(this, options);
this.previousPassTime = Date.now();
this.queue = [];
this.intervalId = this.setPushInterval();
}
_transform(chunks, _, cb) {
for (const chunk of chunks) {
this.queue.push(chunk);
}
this.isQueueFull() ? setTimeout(cb, this.interval) : cb();
}
_flush(cb) {
clearInterval(this.intervalId);
this.intervalId = this.setPushInterval(cb);
}
setPushInterval(cb = null) {
return setInterval(() => {
const elapsedTime = Date.now() - this.previousPassTime;
if (elapsedTime < this.interval) return;
if (this.queue.length > 0) {
this.push(this.getChunk());
return (this.previousPassTime += elapsedTime);
}
clearInterval(this.intervalId);
return cb && cb();
}, this.interval / 10);
}
getChunk() {
return Buffer.from(this.queue.splice(0, this.bytes));
}
isQueueFull() {
return this.queue.length >= 2 * this.bytes;
}
}

View File

@ -268,4 +268,8 @@ app.prepare().then(async () => {
console.log(`[ slate ] client: http://localhost:${Environment.PORT}`);
});
// NOTE(jim): Node 14 quirk where you have to set this 0, you can't actually
// Have a long timeout, If you set it to 0 you just disable it.
listenServer.headersTimeout = 0;
});