uploads: finally streams work

This commit is contained in:
jimmylee 2020-08-16 21:22:35 -07:00
parent b82e01d8da
commit 9e4ca3d2ae
3 changed files with 52 additions and 83 deletions

View File

@ -2,101 +2,70 @@ import * as LibraryManager from "~/node_common/managers/library";
import * as Utilities from "~/node_common/utilities"; import * as Utilities from "~/node_common/utilities";
import B from "busboy"; import B from "busboy";
import util from "util";
import { Readable } from "stream";
const HIGH_WATER_MARK = 1024 * 1024 * 3; const HIGH_WATER_MARK = 1024 * 1024 * 3;
const ReadStream = function (object) { export const formMultipart = async (req, res, { user }) => {
if (object instanceof Buffer || typeof object === "string") { let data = null;
Readable.call(this, {
highWaterMark: HIGH_WATER_MARK,
encoding: "utf8",
});
} else {
Readable.call(this, { objectMode: true });
}
this._object = object;
};
util.inherits(ReadStream, Readable); const upload = () =>
new Promise(async (resolve, reject) => {
ReadStream.prototype._read = function () { let form = new B({
this.push(this._object); headers: req.headers,
this.push(null); highWaterMark: HIGH_WATER_MARK,
this._object = null;
};
const createReadStream = (object) => {
return new ReadStream(object);
};
export const formMultipart = (req, res, { user }) =>
new Promise(async (resolve, reject) => {
let form = new B({
headers: req.headers,
highWaterMark: HIGH_WATER_MARK,
});
let fields = [];
let buffer = null;
let target = null;
form.on("file", function (fieldname, file, filename, encoding, mime) {
file.on("data", (data) => {
fields.push(data);
}); });
file.on("end", () => { form.on("file", async function (fieldname, stream, filename, encoding, mime) {
target = { data = LibraryManager.createLocalDataIncomplete({
type: mime,
name: filename, name: filename,
}; type: mime,
});
buffer = Buffer.concat(fields); let push;
try {
const token = user.data.tokens.api;
const { buckets, bucketKey } = await Utilities.getBucketAPIFromUserToken(token);
push = await buckets.pushPath(bucketKey, data.id, stream);
} catch (e) {
return reject({
decorator: "SERVER_BUCKETS_PUSH_ISSUE",
error: true,
message: e,
});
}
return resolve({ decorator: "SERVER_BUCKET_STREAM_SUCCESS", data: push.path.path });
}); });
});
form.on("error", (e) => { form.on("error", (e) => {
return reject({
decorator: "SERVER_UPLOAD_PARSE_FAILURE",
error: true,
message: e,
});
});
form.on("finish", async () => {
const data = LibraryManager.createLocalDataIncomplete(target);
const stream = createReadStream(buffer);
let push;
try {
const { buckets, bucketKey } = await Utilities.getBucketAPIFromUserToken(user.data.tokens.api);
push = await buckets.pushPath(bucketKey, data.name, stream);
} catch (e) {
return reject({ return reject({
decorator: "SERVER_BUCKETS_PUSH_ISSUE", decorator: "SERVER_BUCKET_STREAM_FAILURE",
error: true, error: true,
message: e, message: e,
}); });
} });
let ipfs = push.path.path; req.pipe(form);
try {
const { buckets, bucketKey } = await Utilities.getBucketAPIFromUserToken(user.data.tokens.api);
const newUpload = await buckets.listIpfsPath(ipfs);
data.size = newUpload.size;
} catch (e) {
return reject({
decorator: "SERVER_BUCKETS_VERIFY_ISSUE",
error: true,
message: e,
});
}
return resolve({ decorator: "SERVER_UPLOAD_SUCCESS", data, ipfs });
}); });
return req.pipe(form); const response = await upload();
});
if (response.error) {
return response;
}
try {
const token = user.data.tokens.api;
const { buckets } = await Utilities.getBucketAPIFromUserToken(token);
const newUpload = await buckets.listIpfsPath(response.data);
data.size = newUpload.size;
} catch (e) {
return {
decorator: "SERVER_BUCKETS_VERIFY_ISSUE",
error: true,
message: e,
};
}
return { decorator: "SERVER_UPLOAD_SUCCESS", data, ipfs: response.data };
};

View File

@ -1,5 +1,5 @@
import * as MW from "~/node_common/middleware"; import * as MW from "~/node_common/middleware";
import * as Upload from "~/node_common/upload-fs"; import * as Upload from "~/node_common/upload";
import * as Utilities from "~/node_common/utilities"; import * as Utilities from "~/node_common/utilities";
import * as Data from "~/node_common/data"; import * as Data from "~/node_common/data";
import * as LibraryManager from "~/node_common/managers/library"; import * as LibraryManager from "~/node_common/managers/library";

View File

@ -2,7 +2,7 @@ import * as MW from "~/node_common/middleware";
import * as Data from "~/node_common/data"; import * as Data from "~/node_common/data";
import * as LibraryManager from "~/node_common/managers/library"; import * as LibraryManager from "~/node_common/managers/library";
import * as Strings from "~/common/strings"; import * as Strings from "~/common/strings";
import * as Upload from "~/node_common/upload-fs"; import * as Upload from "~/node_common/upload";
const initCORS = MW.init(MW.CORS); const initCORS = MW.init(MW.CORS);