Updated data generator to support >2M members (#19484)

no issue

The data generator went out of memory when trying to generate fake data
for > 2M members. This adds some improvements to make sure it doesn't go
out of memory.

---------

Co-authored-by: Fabien "egg" O'Carroll <fabien@allou.is>
This commit is contained in:
Simon Backx 2024-01-15 16:23:49 +01:00 committed by GitHub
parent 709a0cf3c4
commit 285a684ef6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 243 additions and 182 deletions

View File

@ -32,7 +32,8 @@ class DataGenerator {
printDependencies,
withDefault,
seed,
quantities = {}
quantities = {},
useTransaction = true
}) {
this.knex = knex;
this.tableList = tables || [];
@ -46,6 +47,7 @@ class DataGenerator {
this.printDependencies = printDependencies;
this.seed = seed;
this.quantities = quantities;
this.useTransaction = useTransaction;
}
sortTableList() {
@ -185,13 +187,26 @@ class DataGenerator {
process.exit(0);
}
if (this.useTransaction) {
await this.knex.transaction(async (transaction) => {
// Performance improvements
if (!DatabaseInfo.isSQLite(this.knex)) {
await transaction.raw('SET autocommit=0;');
}
await this.#run(transaction);
}, {isolationLevel: 'read committed'});
} else {
await this.#run(this.knex);
}
this.logger.info(`Completed data import in ${((Date.now() - start) / 1000).toFixed(1)}s`);
}
async #run(transaction) {
if (!DatabaseInfo.isSQLite(this.knex)) {
await transaction.raw('ALTER INSTANCE DISABLE INNODB REDO_LOG;');
await transaction.raw('SET FOREIGN_KEY_CHECKS=0;');
await transaction.raw('SET unique_checks=0;');
await transaction.raw('SET autocommit=0;');
await transaction.raw('SET GLOBAL local_infile=1;');
}
@ -255,9 +270,6 @@ class DataGenerator {
});
await tableImporter.finalise();
}
}, {isolationLevel: 'read committed'});
this.logger.info(`Completed data import in ${((Date.now() - start) / 1000).toFixed(1)}s`);
}
}

View File

@ -39,6 +39,10 @@ class EmailRecipientsImporter extends TableImporter {
}
async import(quantity) {
if (quantity === 0) {
return;
}
const now = Date.now();
const emails = await this.transaction
.select(

View File

@ -13,6 +13,10 @@ class EmailsImporter extends TableImporter {
}
async import(quantity) {
if (quantity === 0) {
return;
}
const posts = await this.transaction.select('id', 'title', 'published_at').from('posts').where('type', 'post').where('status', 'published').orderBy('published_at', 'desc');
this.newsletters = await this.transaction.select('id').from('newsletters').orderBy('sort_order');
this.membersSubscribeEvents = await this.transaction.select('id', 'newsletter_id', 'created_at').from('members_subscribe_events');

View File

@ -76,7 +76,7 @@ class MembersImporter extends TableImporter {
uuid: faker.datatype.uuid(),
transient_id: faker.datatype.uuid(),
email: `${name.replace(' ', '.').replace(/[^a-zA-Z0-9]/g, '').toLowerCase()}${faker.datatype.number({min: 0, max: 999999})}@example.com`,
status: luck(5) ? 'comped' : luck(25) ? 'paid' : 'free',
status: luck(5) ? 'comped' : luck(15) ? 'paid' : 'free',
name: name,
expertise: luck(30) ? faker.name.jobTitle() : undefined,
geolocation: JSON.stringify({

View File

@ -12,9 +12,25 @@ class MembersLoginEventsImporter extends TableImporter {
}
async import(quantity) {
const members = await this.transaction.select('id', 'created_at').from('members');
if (quantity === 0) {
return;
}
let offset = 0;
let limit = 100000;
// eslint-disable-next-line no-constant-condition
while (true) {
const members = await this.transaction.select('id', 'created_at').from('members').limit(limit).offset(offset);
if (members.length === 0) {
break;
}
await this.importForEach(members, quantity ? quantity / members.length : 5);
offset += limit;
}
}
setReferencedModel(model) {

View File

@ -9,9 +9,21 @@ class MembersNewslettersImporter extends TableImporter {
}
async import(quantity) {
const membersSubscribeEvents = await this.transaction.select('member_id', 'newsletter_id').from('members_subscribe_events');
let offset = 0;
let limit = 100000;
// eslint-disable-next-line no-constant-condition
while (true) {
const membersSubscribeEvents = await this.transaction.select('member_id', 'newsletter_id').from('members_subscribe_events').limit(limit).offset(offset);
if (membersSubscribeEvents.length === 0) {
break;
}
await this.importForEach(membersSubscribeEvents, quantity ? quantity / membersSubscribeEvents.length : 1);
offset += limit;
}
}
generate() {

View File

@ -9,14 +9,26 @@ class MembersPaidSubscriptionEventsImporter extends TableImporter {
}
async import() {
const subscriptions = await this.transaction.select('id', 'customer_id', 'plan_currency', 'plan_amount', 'created_at', 'plan_id', 'status', 'cancel_at_period_end', 'current_period_end').from('members_stripe_customers_subscriptions');
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers');
let offset = 0;
let limit = 1000;
// eslint-disable-next-line no-constant-condition
while (true) {
const subscriptions = await this.transaction.select('id', 'customer_id', 'plan_currency', 'plan_amount', 'created_at', 'plan_id', 'status', 'cancel_at_period_end', 'current_period_end').from('members_stripe_customers_subscriptions').limit(limit).offset(offset);
if (subscriptions.length === 0) {
break;
}
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers').whereIn('customer_id', subscriptions.map(subscription => subscription.customer_id));
this.membersStripeCustomers = new Map();
for (const customer of membersStripeCustomers) {
this.membersStripeCustomers.set(customer.customer_id, customer);
}
await this.importForEach(subscriptions, 2);
offset += limit;
}
}
setReferencedModel(model) {

View File

@ -11,9 +11,20 @@ class MembersStatusEventsImporter extends TableImporter {
}
async import(quantity) {
const members = await this.transaction.select('id', 'created_at', 'status').from('members');
let offset = 0;
let limit = 100000;
// eslint-disable-next-line no-constant-condition
while (true) {
const members = await this.transaction.select('id', 'created_at', 'status').from('members').limit(limit).offset(offset);
if (members.length === 0) {
break;
}
await this.importForEach(members, quantity ? quantity / members.length : 2);
offset += limit;
}
}
setReferencedModel(model) {

View File

@ -10,9 +10,24 @@ class MembersStripeCustomersImporter extends TableImporter {
}
async import(quantity) {
const members = await this.transaction.select('id', 'name', 'email', 'created_at', 'status').from('members');
if (quantity === 0) {
return;
}
let offset = 0;
let limit = 100000;
// eslint-disable-next-line no-constant-condition
while (true) {
const members = await this.transaction.select('id', 'name', 'email', 'created_at', 'status').from('members').limit(limit).offset(offset);
if (members.length === 0) {
break;
}
await this.importForEach(members, quantity ? quantity / members.length : 1);
offset += limit;
}
}
generate() {
@ -21,7 +36,7 @@ class MembersStripeCustomersImporter extends TableImporter {
// The number should increase the older the member is
const daysSinceMemberCreated = Math.floor((new Date() - new Date(this.model.created_at)) / (1000 * 60 * 60 * 24));
const shouldHaveStripeCustomer = faker.datatype.number({min: 0, max: 100}) < Math.max(Math.min(daysSinceMemberCreated / 30, 30), 5);
const shouldHaveStripeCustomer = faker.datatype.number({min: 0, max: 100}) < Math.max(Math.min(daysSinceMemberCreated / 60, 15), 2);
if (!shouldHaveStripeCustomer) {
return;

View File

@ -13,24 +13,44 @@ class MembersStripeCustomersSubscriptionsImporter extends TableImporter {
}
async import() {
const membersProducts = await this.transaction.select('member_id', 'product_id').from('members_products');
this.members = await this.transaction.select('id', 'status', 'created_at').from('members');//.where('status', 'paid');
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers');
let offset = 0;
let limit = 5000;
this.products = await this.transaction.select('id', 'name').from('products').whereNot('type', 'free');
this.stripeProducts = await this.transaction.select('id', 'product_id', 'stripe_product_id').from('stripe_products');
this.stripePrices = await this.transaction.select('id', 'nickname', 'stripe_product_id', 'stripe_price_id', 'amount', 'interval', 'currency').from('stripe_prices');
// eslint-disable-next-line no-constant-condition
while (true) {
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers').limit(limit).offset(offset);
if (membersStripeCustomers.length === 0) {
break;
}
this.members = await this.transaction.select('id', 'status', 'created_at').from('members').whereIn('id', membersStripeCustomers.map(m => m.member_id));
if (this.members.length === 0) {
continue;
}
const membersProducts = await this.transaction.select('member_id', 'product_id').from('members_products').whereIn('member_id', this.members.map(member => member.id));
//const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers').whereIn('member_id', this.members.map(member => member.id));
this.membersStripeCustomers = new Map();
for (const customer of membersStripeCustomers) {
this.membersStripeCustomers.set(customer.member_id, customer);
}
this.membersProducts = new Map();
for (const product of membersProducts) {
this.membersProducts.set(product.member_id, product);
}
await this.importForEach(this.members, 2);
await this.importForEach(this.members, 1.2);
offset += limit;
}
}
setReferencedModel(model) {

View File

@ -12,10 +12,25 @@ class MembersSubscribeEventsImporter extends TableImporter {
}
async import(quantity) {
const members = await this.transaction.select('id', 'created_at', 'status').from('members');
if (quantity === 0) {
return;
}
let offset = 0;
let limit = 100000;
this.newsletters = await this.transaction.select('id').from('newsletters').orderBy('sort_order');
// eslint-disable-next-line no-constant-condition
while (true) {
const members = await this.transaction.select('id', 'created_at', 'status').from('members').limit(limit).offset(offset);
if (members.length === 0) {
break;
}
await this.importForEach(members, quantity ? quantity / members.length : this.newsletters.length);
offset += limit;
}
}
setReferencedModel(model) {

View File

@ -11,16 +11,27 @@ class MembersSubscriptionCreatedEventsImporter extends TableImporter {
}
async import(quantity) {
const membersStripeCustomersSubscriptions = await this.transaction.select('id', 'created_at', 'customer_id').from('members_stripe_customers_subscriptions');
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers');
let offset = 0;
let limit = 1000;
this.posts = await this.transaction.select('id', 'published_at', 'visibility', 'type', 'slug').from('posts').whereNotNull('published_at').where('visibility', 'public').orderBy('published_at', 'desc');
this.incomingRecommendations = await this.transaction.select('id', 'source', 'created_at').from('mentions');
// eslint-disable-next-line no-constant-condition
while (true) {
const membersStripeCustomersSubscriptions = await this.transaction.select('id', 'created_at', 'customer_id').from('members_stripe_customers_subscriptions').limit(limit).offset(offset);
if (membersStripeCustomersSubscriptions.length === 0) {
break;
}
const membersStripeCustomers = await this.transaction.select('id', 'member_id', 'customer_id').from('members_stripe_customers').whereIn('customer_id', membersStripeCustomersSubscriptions.map(subscription => subscription.customer_id));
this.membersStripeCustomers = new Map();
for (const memberStripeCustomer of membersStripeCustomers) {
this.membersStripeCustomers.set(memberStripeCustomer.customer_id, memberStripeCustomer);
}
await this.importForEach(membersStripeCustomersSubscriptions, quantity ? quantity / membersStripeCustomersSubscriptions.length : 1);
offset += limit;
}
}
generate() {

View File

@ -1,73 +0,0 @@
const {faker} = require('@faker-js/faker');
const generateEvents = require('../utils/event-generator');
const TableImporter = require('./TableImporter');
const dateToDatabaseString = require('../utils/database-date');
class SubscriptionsImporter extends TableImporter {
static table = 'subscriptions';
static dependencies = ['members', 'members_products', 'stripe_products', 'stripe_prices'];
constructor(knex, transaction) {
super(SubscriptionsImporter.table, knex, transaction);
}
async import() {
const membersProducts = await this.transaction.select('member_id', 'product_id').from('members_products');
this.members = await this.transaction.select('id', 'status', 'created_at').from('members').where('status', 'paid');
this.stripeProducts = await this.transaction.select('product_id', 'stripe_product_id').from('stripe_products');
this.stripePrices = await this.transaction.select('stripe_product_id', 'currency', 'amount', 'interval').from('stripe_prices');
await this.importForEach(membersProducts, 1);
}
generate() {
const member = this.members.find(m => m.id === this.model.member_id);
if (!member) {
return;
}
const status = member.status;
const billingInfo = {};
const isMonthly = faker.datatype.boolean();
if (status === 'paid') {
const stripeProduct = this.stripeProducts.find(product => product.product_id === this.model.product_id);
const stripePrice = this.stripePrices.find((price) => {
return price.stripe_product_id === stripeProduct.stripe_product_id &&
(isMonthly ? price.interval === 'month' : price.interval === 'year');
});
billingInfo.cadence = isMonthly ? 'month' : 'year';
billingInfo.currency = stripePrice.currency;
billingInfo.amount = stripePrice.amount;
}
const [startDate] = generateEvents({
total: 1,
trend: 'negative',
startTime: new Date(member.created_at),
endTime: new Date(),
shape: 'ease-out'
});
const endDate = new Date(startDate);
if (isMonthly) {
endDate.setMonth(new Date().getMonth());
if (endDate < new Date()) {
endDate.setMonth(endDate.getMonth() + 1);
}
} else {
endDate.setFullYear(new Date().getFullYear());
if (endDate < new Date()) {
endDate.setFullYear(endDate.getFullYear() + 1);
}
}
return Object.assign({}, {
id: this.fastFakeObjectId(),
type: status,
status: 'active',
member_id: this.model.member_id,
tier_id: this.model.product_id,
payment_provider: 'stripe',
expires_at: dateToDatabaseString(endDate),
created_at: dateToDatabaseString(startDate)
}, billingInfo);
}
}
module.exports = SubscriptionsImporter;

View File

@ -5,10 +5,9 @@ const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');
const {luck} = require('../utils/random');
const os = require('os');
const crypto = require('crypto');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
const ObjectID = require('bson-objectid').default;
let idIndex = 0;
class TableImporter {
/**
@ -34,9 +33,9 @@ class TableImporter {
}
fastFakeObjectId() {
// using faker.database.mongodbObjectId() is too slow (slow generation + MySQL is faster for ascending PRIMARY keys)
idIndex += 1;
return ObjectID.createFromTime(idIndex).toHexString();
// It is important that IDs are generated for a timestamp < NOW (for email batch sending) and that
// generating the ids is fast.
return `00000000` + crypto.randomBytes(8).toString('hex');
}
async #generateData(amount = this.defaultQuantity) {
@ -102,7 +101,7 @@ class TableImporter {
const filePath = path.join(rootFolder, `${this.name}.csv`);
let now = Date.now();
if (data.length > 1000) {
if (data.length > 5000) {
try {
await fs.promises.unlink(filePath);
} catch (e) {
@ -146,10 +145,13 @@ class TableImporter {
// Import from CSV file
const [result] = await this.transaction.raw(`LOAD DATA LOCAL INFILE '${filePath}' INTO TABLE \`${this.name}\` FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES (${Object.keys(data[0]).map(d => '`' + d + '`').join(',')});`);
if (result.affectedRows !== data.length) {
if (Math.abs(result.affectedRows - data.length) > 0.01 * data.length) {
throw new errors.InternalServerError({
message: `CSV import failed: expected ${data.length} imported rows, got ${result.affectedRows}`
});
}
logging.warn(`CSV import warning: expected ${data.length} imported rows, got ${result.affectedRows}.`);
}
} else {
await this.knex.batchInsert(this.name, data).transacting(this.transaction);
}

View File

@ -30,7 +30,7 @@
"fix": "yarn cache clean && rimraf -g '**/node_modules' && yarn",
"knex-migrator": "yarn workspace ghost run knex-migrator",
"setup": "yarn && git submodule update --init && NODE_ENV=development node .github/scripts/setup.js",
"reset:data": "cd ghost/core && node index.js generate-data --clear-database --quantities members:100000,posts:500 --seed 123",
"reset:data": "cd ghost/core && node index.js generate-data --clear-database --quantities members:2000000,posts:0,emails:0,members_stripe_customers:0,members_login_events:0,members_status_events:0 --seed 123",
"docker:reset": "docker-compose -f .github/scripts/docker-compose.yml down -v && docker-compose -f .github/scripts/docker-compose.yml up -d --wait",
"lint": "nx run-many -t lint",
"test": "nx run-many -t test",