2020-08-01 03:17:07 +03:00
|
|
|
import * as LibraryManager from "~/node_common/managers/library";
|
|
|
|
import * as Utilities from "~/node_common/utilities";
|
2020-09-22 03:36:45 +03:00
|
|
|
import * as Social from "~/node_common/social";
|
2020-10-09 21:46:42 +03:00
|
|
|
import * as Strings from "~/common/strings";
|
|
|
|
import * as Logs from "~/node_common/script-logging";
|
2020-08-01 03:17:07 +03:00
|
|
|
|
2020-10-09 03:40:26 +03:00
|
|
|
import AbortController from "abort-controller";
|
2020-10-10 04:22:03 +03:00
|
|
|
import BusBoyConstructor from "busboy";
|
2020-10-09 21:46:42 +03:00
|
|
|
import Queue from "p-queue";
|
2020-08-16 02:42:26 +03:00
|
|
|
|
2020-10-10 04:22:03 +03:00
|
|
|
// Tag passed as the second argument to the Logs.task() / Logs.taskTimeless()
// calls in this file.
const WORKER_NAME = "BROWSER->RENDER->TEXTILE";
|
2020-08-16 12:23:02 +03:00
|
|
|
// 3 MiB. Handed to busboy as its `highWaterMark` option; upload progress is
// also only logged when the byte count is an exact multiple of ten times
// this value.
const HIGH_WATER_MARK = 3 * 1024 * 1024;
|
2020-08-16 02:42:26 +03:00
|
|
|
|
2020-10-10 04:22:03 +03:00
|
|
|
/**
 * Streams a multipart form upload from `req` into the user's Textile bucket.
 *
 * Flow, as implemented below:
 *   1. Resolve the bucket API for `user` / `bucketName`.
 *   2. Pipe `req` into busboy and push each "file" stream with
 *      `buckets.pushPath()`, one queued action at a time.
 *   3. Resolve the bucket API again and call `listIpfsPath()` on the pushed
 *      path to record the uploaded size on the local data record.
 *
 * Every failure path returns an object carrying `error: true` and a
 * `decorator`. This function never writes a response body; the only thing it
 * does to `res` is set the "Connection: close" header when the streamed
 * upload fails.
 *
 * @param {object} req - incoming request; `headers`, `params.b`, `pipe()` and
 *   `unpipe()` are used here.
 * @param {object} res - outgoing response; only `res.set()` is used here.
 * @param {object} options
 * @param {object} options.user - `username` is read here; the whole object is
 *   forwarded to the bucket and Slack helpers.
 * @param {*} options.bucketName - forwarded untouched to
 *   `Utilities.getBucketAPIFromUserToken()`.
 * @returns {Promise<object>} `{ decorator: "UPLOAD_SUCCESS", data, ipfs }` on
 *   success, otherwise `{ decorator, error: true, message? }`.
 */
export async function formMultipart(req, res, { user, bucketName }) {
  console.log("\n\n\n");

  const singleConcurrencyQueue = new Queue({ concurrency: 1 });
  const controller = new AbortController();
  const heapUsed = Strings.bytesToSize(process.memoryUsage().heapUsed);
  const { signal } = controller;

  // Both are written by the busboy "file" handler and read after the stream
  // has finished.
  let data = null;
  let dataPath = null;

  Logs.taskTimeless(`${user.username} is pushing ${req.params.b}`, WORKER_NAME);
  Logs.taskTimeless(`heap size is ${heapUsed}`, WORKER_NAME);

  const {
    buckets,
    bucketKey,
    bucketRoot,
  } = await Utilities.getBucketAPIFromUserToken({
    user,
    bucketName,
  });

  if (!buckets) {
    Logs.error("Utilities.getBucketAPIFromUserToken()");
    return {
      decorator: "UPLOAD_NO_BUCKETS",
      error: true,
      message: `No buckets for ${user.username}.`,
    };
  }

  const busboy = new BusBoyConstructor({
    headers: req.headers,
    highWaterMark: HIGH_WATER_MARK,
  });

  const _createStreamAndUploadToTextile = async (writableStream) => {
    return new Promise(function(resolvePromiseFn, rejectPromiseFn) {
      // Runs `actionFn` on the shared queue. If it throws, the upload is torn
      // down (queue paused, push aborted, request unpiped), Slack is notified
      // and the outer promise is rejected through `rejectFn`.
      function _safeForcedSingleConcurrencyFn(actionFn, rejectFn, timeoutId) {
        singleConcurrencyQueue.add(async function() {
          try {
            await actionFn();
          } catch (e) {
            Logs.error(`${timeoutId} : queue.pause()`);
            singleConcurrencyQueue.pause();

            Logs.error(`${timeoutId} : controller.abort()`);
            controller.abort();

            Logs.error(`${timeoutId} : sendTextileSlackMessage()`);
            Social.sendTextileSlackMessage({
              file: "/node_common/upload.js",
              user,
              message: e.message,
              code: e.code,
              functionName: `${timeoutId} : _safeForcedSingleConcurrencyFn()`,
            });

            Logs.error(`${timeoutId} : req.unpipe()`);
            req.unpipe();

            Logs.error(
              `${timeoutId} : rejectFn() of safeForcedSingleConcurrencyFn()`
            );

            return rejectFn({
              decorator: "UPLOAD_FAILURE",
              error: true,
              message: e.message,
              id: timeoutId,
            });
          }
        });
      }

      // NOTE(jim)
      //
      // stream   - ReadableStream constructor
      // mime     - */* file type
      // filename - filename reference for extension later.
      //
      writableStream.on("file", function(
        fieldname,
        stream,
        filename,
        encoding,
        mime
      ) {
        const timeoutId = `${user.username}-${filename}`;

        data = LibraryManager.createLocalDataIncomplete({
          name: filename,
          type: mime,
        });

        return _safeForcedSingleConcurrencyFn(
          async () => {
            const push = await buckets
              .pushPath(bucketKey, data.id, stream, {
                root: bucketRoot,
                signal,
                progress: function(num) {
                  // Only log when the byte count lands exactly on a multiple
                  // of ten high-water marks.
                  if (num % (HIGH_WATER_MARK * 10) !== 0) {
                    return;
                  }

                  Logs.note(`${timeoutId} : ${Strings.bytesToSize(num)}`);
                },
              })
              .catch(function(e) {
                throw new Error(e.message);
              });

            dataPath = push.path.path;

            req.unpipe();
            Logs.task(`${timeoutId} : req.unpipe()`, WORKER_NAME);
          },
          rejectPromiseFn,
          timeoutId
        );
      });

      writableStream.on("finish", function() {
        // Queued behind any pending "file" action, so `dataPath` has been
        // set by the time this runs if a push succeeded.
        return _safeForcedSingleConcurrencyFn(() => {
          Logs.task("busboy finished", WORKER_NAME);

          if (Strings.isEmpty(dataPath)) {
            return rejectPromiseFn({
              decorator: "UPLOAD_FAILURE",
              error: true,
              message: "Missing Textile URL data.",
            });
          }

          Logs.task(dataPath, WORKER_NAME);

          return resolvePromiseFn({
            decorator: "UPLOAD_STREAM_SUCCESS",
            data: dataPath,
          });
        }, rejectPromiseFn);
      });

      writableStream.on("error", function(e) {
        // Re-throw inside the queued action so the shared teardown path in
        // _safeForcedSingleConcurrencyFn handles it.
        return _safeForcedSingleConcurrencyFn(() => {
          throw new Error(e.message);
        }, rejectPromiseFn);
      });

      Logs.task("req.pipe(writableStream)", WORKER_NAME);
      req.pipe(writableStream);
    });
  };

  let response = null;
  try {
    response = await _createStreamAndUploadToTextile(busboy);
  } catch (e) {
    Logs.error(e.message);
    res.set("Connection", "close");

    // `response` is still null at this point because the await rejected.
    // Return a failure object shaped like every other error path of this
    // function instead of handing `null` back to the caller.
    return {
      decorator: e.decorator || "UPLOAD_FAILURE",
      error: true,
      message: e.message,
    };
  }

  Logs.task("response", WORKER_NAME);
  console.log(response);

  if (response && response.error) {
    res.set("Connection", "close");

    return response;
  }

  Logs.note("non-essential Utilities.getBucketAPIFromuserToken()");
  const refreshed = await Utilities.getBucketAPIFromUserToken({
    user,
    bucketName,
  });

  if (!refreshed.buckets) {
    Logs.error("Utilities.getBucketAPIFromuserToken() failed");
    return {
      decorator: "UPLOAD_FAILURE",
      error: true,
    };
  }

  try {
    const newUpload = await refreshed.buckets.listIpfsPath(response.data);
    data.size = newUpload.size;

    Logs.task(
      `${data.name} : ${Strings.bytesToSize(data.size)} uploaded`,
      WORKER_NAME
    );
  } catch (e) {
    Social.sendTextileSlackMessage({
      file: "/node_common/upload.js",
      user,
      message: e.message,
      code: e.code,
      functionName: `refreshed.buckets.listIpfsPath`,
    });

    return {
      decorator: "UPLOAD_VERIFY_FAILURE",
      error: true,
      message: e.message,
    };
  }

  Logs.task(`SUCCESS !!!`, WORKER_NAME);

  console.log(`\n\n\n`);
  return { decorator: "UPLOAD_SUCCESS", data, ipfs: response.data };
}
|