slate/scripts/worker-storage-deals.js

257 lines
7.1 KiB
JavaScript

import "isomorphic-fetch";
import * as Environment from "~/node_common/environment";
import * as Constants from "~/node_common/constants";
import * as Strings from "~/common/strings";
import * as Logging from "~/common/logging";
import configs from "~/knexfile";
import knex from "knex";
import { Buckets, PrivateKey, Filecoin, Client, ThreadID } from "@textile/hub";
const envConfig = configs["development"];
const DB = knex(envConfig);
const TEXTILE_KEY_INFO = {
key: Environment.TEXTILE_HUB_KEY,
secret: Environment.TEXTILE_HUB_SECRET,
};
const recordTextileBucketInfoProduction = async (props) => {
const users = await DB.select("id", "data").from("users").where({ textileToken: null });
let userUpdates = [];
let i = 0;
for (let user of users) {
if (i % 50 === 0) {
console.log(i);
if (userUpdates.length) {
await pushUserUpdates(userUpdates);
userUpdates = [];
}
}
i += 1;
try {
const textileKey = user.data?.tokens?.api;
if (!textileKey) {
console.log(`ERROR: user ${user.id} does not have textile key`);
return;
}
let buckets = await Buckets.withKeyInfo(TEXTILE_KEY_INFO);
const identity = PrivateKey.fromString(textileKey);
const textileToken = await buckets.getToken(identity);
buckets.context.withToken(textileToken);
const client = new Client(buckets.context);
const res = await client.getThread("buckets");
const textileThreadID =
typeof res.id === "string" ? res.id : ThreadID.fromBytes(res.id).toString();
buckets.context.withThread(textileThreadID);
const roots = await buckets.list();
const existing = roots.find((bucket) => bucket.name === Constants.textile.mainBucket);
if (!existing) {
console.log(`ERROR ${user.id} missing existing bucket. Listed buckets:`);
console.log(roots);
}
let ipfs = existing.path;
const textileBucketCID = Strings.ipfsToCid(ipfs);
if (!textileToken || !textileThreadID || !textileBucketCID) {
console.log(`ERROR ${user.id} missing some value`);
return;
}
// console.log({ textileToken, textileThreadID, textileBucketCID });
userUpdates.push({
id: user.id,
textileKey,
textileToken,
textileThreadID,
textileBucketCID,
});
} catch (e) {
console.log(e);
}
}
if (userUpdates.length) {
await pushUserUpdates(userUpdates);
}
console.log("SCRIPT FINISHED");
};
const pushUserUpdates = async (userUpdates) => {
let query = userUpdates.map((user) => "(?::uuid, ?, ?, ?, ?)").join(", ");
let values = [];
for (let user of userUpdates) {
values.push(
user.id,
user.textileKey,
user.textileToken,
user.textileThreadID,
user.textileBucketCID
);
}
await DB.raw(
`UPDATE ?? as u SET ?? = ??, ?? = ??, ?? = ??, ?? = ?? from (values ${query}) as c(??, ??, ??, ??, ??) WHERE ?? = ??`,
[
"users",
"textileKey",
"c.textileKey",
"textileToken",
"c.textileToken",
"textileThreadID",
"c.textileThreadID",
"textileBucketCID",
"c.textileBucketCID",
...values,
"id",
"textileKey",
"textileToken",
"textileThreadID",
"textileBucketCID",
"c.id",
"u.id",
]
);
};
const recordTextileBucketInfo = async (props) => {
const users = await DB.select("users.id", "users.textileKey")
.from("users")
.where({ "users.textileToken": null });
const userUpdates = [];
for (let user of users) {
try {
let buckets = await Buckets.withKeyInfo(TEXTILE_KEY_INFO);
const identity = PrivateKey.fromString(user.textileKey);
const textileToken = await buckets.getToken(identity);
buckets.context.withToken(textileToken);
const client = new Client(buckets.context);
const res = await client.getThread("buckets");
const textileThreadID =
typeof res.id === "string" ? res.id : ThreadID.fromBytes(res.id).toString();
buckets.context.withThread(textileThreadID);
const roots = await buckets.list();
const existing = roots.find((bucket) => bucket.name === Constants.textile.mainBucket);
let ipfs = existing.path;
const textileBucketCID = Strings.ipfsToCid(ipfs);
if (!textileToken || !textileThreadID || !textileBucketCID) {
console.log("ERROR missing some value");
continue;
}
// console.log({ textileToken, textileThreadID, textileBucketCID });
userUpdates.push({ id: user.id, textileToken, textileThreadID, textileBucketCID });
} catch (e) {
console.log(e);
console.log(user.id);
}
}
let query = userUpdates.map((user) => "(?::uuid, ?, ?, ?)").join(", ");
let values = [];
for (let user of userUpdates) {
values.push(user.id, user.textileToken, user.textileThreadID, user.textileBucketCID);
}
await DB.raw(
`UPDATE ?? as u SET ?? = ??, ?? = ??, ?? = ?? from (values ${query}) as c(??, ??, ??, ??) WHERE ?? = ??`,
[
"users",
"textileToken",
"c.textileToken",
"textileThreadID",
"c.textileThreadID",
"textileBucketCID",
"c.textileBucketCID",
...values,
"id",
"textileToken",
"textileThreadID",
"textileBucketCID",
"c.id",
"u.id",
]
);
console.log("SCRIPT FINISHED");
};
const run = async (props) => {
let successful = [];
let i = 0;
const users = await DB.select("id", "textileBucketCID")
.from("users")
.whereExists(function () {
this.select("id")
.from("files")
.whereRaw('users.id = "files"."ownerId"')
.where("files.isLink", false);
})
.whereNotExists(function () {
this.select("id")
.from("deals")
.whereRaw('"users"."textileBucketCID" = "deals"."textileBucketCID"');
});
for (let user of users) {
if (i % 500 === 0) {
console.log(i);
if (successful.length) {
await DB.insert(successful).into("deals");
successful = [];
}
}
i += 1;
let json = await addToEstuary(user.textileBucketCID);
if (json?.pin?.cid && json?.requestid) {
successful.push({
textileBucketCID: user.textileBucketCID,
pinCID: json.pin.cid,
requestId: json.requestid,
});
} else {
console.log(`ERROR storage deal for ${user.id}`);
}
}
if (successful.length) {
await DB.insert(successful).into("deals");
}
console.log("SCRIPT FINISHED");
};
const addToEstuary = async (cid) => {
try {
let res = await fetch("https://api.estuary.tech/content/add-ipfs", {
method: "POST",
headers: {
Authorization: `Bearer ${Environment.ESTUARY_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
name: "",
root: cid,
}),
});
let json = await res.json();
return json;
} catch (e) {
Logging.error({
error: e,
decorator: "ADD_CID_TO_ESTUARY",
});
}
console.log("SCRIPT FINISHED");
};
// recordTextileBucketInfoProduction();
// recordTextileBucketInfo();
run();
// addToEstuary();