diff --git a/ghost/members-importer/lib/bulk-operations.js b/ghost/members-importer/lib/bulk-operations.js deleted file mode 100644 index 68f5411d02..0000000000 --- a/ghost/members-importer/lib/bulk-operations.js +++ /dev/null @@ -1,81 +0,0 @@ -const _ = require('lodash'); -const db = require('../../../data/db'); - -const CHUNK_SIZE = 100; - -async function insertChunkSequential(table, chunk, result) { - for (const record of chunk) { - try { - await db.knex(table).insert(record); - result.successful += 1; - } catch (err) { - result.errors.push(err); - result.unsuccessfulIds.push(record.id); - result.unsuccessful += 1; - } - } -} - -async function insertChunk(table, chunk, result) { - try { - await db.knex(table).insert(chunk); - result.successful += chunk.length; - } catch (err) { - await insertChunkSequential(table, chunk, result); - } -} - -async function insert(table, data) { - const result = { - successful: 0, - unsuccessful: 0, - unsuccessfulIds: [], - errors: [] - }; - - for (const chunk of _.chunk(data, CHUNK_SIZE)) { - await insertChunk(table, chunk, result); - } - - return result; -} - -async function delChunkSequential(table, chunk, result) { - for (const record of chunk) { - try { - await db.knex(table).where('id', record).del(); - result.successful += 1; - } catch (err) { - result.errors.push(err); - result.unsuccessfulIds.push(record); - result.unsuccessful += 1; - } - } -} - -async function delChunk(table, chunk, result) { - try { - await db.knex(table).whereIn('id', chunk).del(); - result.successful += chunk.length; - } catch (err) { - await delChunkSequential(table, chunk, result); - } -} - -async function del(table, ids) { - const result = { - successful: 0, - unsuccessful: 0, - unsuccessfulIds: [], - errors: [] - }; - - for (const chunk of _.chunk(ids, CHUNK_SIZE)) { - await delChunk(table, chunk, result); - } - - return result; -} - -module.exports.insert = insert; -module.exports.del = del; diff --git a/ghost/members-importer/lib/index.js b/ghost/members-importer/lib/index.js index 74d950a2da..9bc69177c8 100644 --- a/ghost/members-importer/lib/index.js +++ b/ghost/members-importer/lib/index.js @@ -3,18 +3,26 @@ const uuid = require('uuid'); const ObjectId = require('bson-objectid'); const moment = require('moment-timezone'); const errors = require('@tryghost/errors'); +const debug = require('ghost-ignition').debug('importer:members'); const membersService = require('../index'); -const bulkOperations = require('./bulk-operations'); const models = require('../../../models'); const {i18n} = require('../../../lib/common'); const logging = require('../../../../shared/logging'); -const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => { - const createInserter = table => data => bulkOperations.insert(table, data); - const createDeleter = table => data => bulkOperations.del(table, data); +const handleUnrecognizedError = (error) => { + if (!errors.utils.isIgnitionError(error)) { + return new errors.DataImportError({ + message: error.message, + context: error.context, + err: error + }); + } else { + return error; + } +}; - const deleteMembers = createDeleter('members'); - const insertLabelAssociations = createInserter('members_labels'); +const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => { + debug(`Importing members: ${members.length}, labels: ${allLabelModels.length}, import lables: ${importSetLabels.length}, createdBy: ${createdBy}`); let { invalidMembers, @@ -26,16 +34,34 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = // NOTE: member insertion has to happen before the rest of insertions to handle validation // errors - remove failed members from label/stripe sets + debug(`Starting insert of ${membersToInsert.length} members`); const insertedMembers = await models.Member.bulkAdd(membersToInsert).then((insertResult) => { - if (insertResult.unsuccessfulIds.length) { + if (insertResult.unsuccessfulRecords.length) { + const unsuccessfulIds = insertResult.unsuccessfulRecords.map(r => r.id); + labelAssociationsToInsert = labelAssociationsToInsert - .filter(la => !insertResult.unsuccessfulIds.includes(la.member_id)); + .filter(la => !unsuccessfulIds.includes(la.member_id)); stripeCustomersToFetch = stripeCustomersToFetch - .filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id)); + .filter(sc => !unsuccessfulIds.includes(sc.member_id)); stripeCustomersToCreate = stripeCustomersToCreate - .filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id)); + .filter(sc => !unsuccessfulIds.includes(sc.member_id)); + } + + debug(`Finished inserting members with ${insertResult.errors.length} errors`); + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.member.memberAlreadyExists.message'), + context: i18n.t('errors.models.member.memberAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); } return insertResult; @@ -43,16 +69,41 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = const fetchedStripeCustomersPromise = fetchStripeCustomers(stripeCustomersToFetch); const createdStripeCustomersPromise = createStripeCustomers(stripeCustomersToCreate); - const insertedLabelsPromise = insertLabelAssociations(labelAssociationsToInsert); + + debug(`Starting insert of ${labelAssociationsToInsert.length} label associations`); + const insertedLabelsPromise = models.Base.Model.bulkAdd(labelAssociationsToInsert, 'members_labels') + .then((insertResult) => { + debug(`Finished inserting member label associations with ${insertResult.errors.length} errors`); + return insertResult; + }); const insertedCustomersPromise = Promise.all([ fetchedStripeCustomersPromise, createdStripeCustomersPromise ]).then( ([fetchedStripeCustomers, createdStripeCustomers]) => { - return models.MemberStripeCustomer.bulkAdd( - fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert) - ); + const stripeCustomersToInsert = fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert); + + debug(`Starting insert of ${stripeCustomersToInsert.length} stripe customers`); + return models.MemberStripeCustomer.bulkAdd(stripeCustomersToInsert).then((insertResult) => { + debug(`Finished inserting stripe customers with ${insertResult.errors.length} errors`); + + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.message'), + context: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); + } + + return insertResult; + }); } ); @@ -61,18 +112,54 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = createdStripeCustomersPromise, insertedCustomersPromise ]).then( - ([fetchedStripeCustomers, createdStripeCustomers]) => models.StripeCustomerSubscription.bulkAdd( - fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert) - ) + ([fetchedStripeCustomers, createdStripeCustomers, insertedCustomersResult]) => { + let subscriptionsToInsert = fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert); + + if (insertedCustomersResult.unsuccessfulRecords.length) { + const unsuccessfulCustomerIds = insertedCustomersResult.unsuccessfulRecords.map(r => r.customer_id); + subscriptionsToInsert = subscriptionsToInsert.filter(s => !unsuccessfulCustomerIds.includes(s.customer_id)); + } + + debug(`Starting insert of ${subscriptionsToInsert.length} stripe customer subscriptions`); + return models.StripeCustomerSubscription.bulkAdd(subscriptionsToInsert) + .then((insertResult) => { + debug(`Finished inserting stripe customer subscriptions with ${insertResult.errors.length} errors`); + + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.message'), + context: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); + } + + return insertResult; + }); + } ); const deletedMembersPromise = Promise.all([ fetchedStripeCustomersPromise, - createdStripeCustomersPromise + createdStripeCustomersPromise, + insertedCustomersPromise, + insertedSubscriptionsPromise ]).then( - ([fetchedStripeCustomers, createdStripeCustomers]) => deleteMembers( - fetchedStripeCustomers.membersToDelete.concat(createdStripeCustomers.membersToDelete) - ) + ([fetchedStripeCustomers, createdStripeCustomers, insertedStripeCustomers, insertedStripeSubscriptions]) => { + const memberIds = [ + ...fetchedStripeCustomers.membersToDelete, + ...createdStripeCustomers.membersToDelete, + ...insertedStripeCustomers.unsuccessfulRecords.map(r => r.member_id), + ...insertedStripeSubscriptions.unsuccessfulRecords.map(r => r.member_id) + ]; + + return models.Member.bulkDestroy(memberIds); + } ); // This looks sequential, but at the point insertedCustomersPromise has resolved so have all the others @@ -82,26 +169,33 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = const fetchedCustomers = await fetchedStripeCustomersPromise; const insertedLabels = await insertedLabelsPromise; + const allErrors = [ + ...insertedMembers.errors, + ...insertedCustomers.errors, + ...insertedSubscriptions.errors, + ...insertedLabels.errors, + ...fetchedCustomers.errors + ]; + const importedCount = insertedMembers.successful - deletedMembers.successful; + const invalidCount = insertedMembers.unsuccessful + invalidMembers.length + deletedMembers.successful + deletedMembers.unsuccessful; + + debug(`Finished members import with ${importedCount} imported, ${invalidCount} invalid and ${allErrors.length} errors`); + const result = { imported: { - count: insertedMembers.successful - deletedMembers.successful + count: importedCount }, invalid: { - count: insertedMembers.unsuccessful + deletedMembers.unsuccessful + invalidMembers.length, - errors: [ - ...insertedMembers.errors, - ...insertedCustomers.errors, - ...insertedSubscriptions.errors, - ...insertedLabels.errors, - ...fetchedCustomers.errors - ] + count: invalidCount, + errors: allErrors } }; // Allow logging to happen outside of the request cycle process.nextTick(() => { - // @TODO wrap errors with validation errors (or whatever is reasonable) result.invalid.errors.forEach(err => logging.error(err)); + deletedMembers.errors.forEach(err => logging.error(err)); + insertedLabels.errors.forEach(err => logging.error(err)); }); return result; @@ -231,6 +325,7 @@ async function createStripeCustomers(stripeCustomersToCreate) { membersToDelete: [] }; + debug(`Creating Stripe customers for ${stripeCustomersToCreate.length} records`); await Promise.all(stripeCustomersToCreate.map(async function createStripeCustomer(customerToCreate) { try { const customer = await membersService.api.members.createStripeCustomer({ @@ -274,15 +369,22 @@ async function createStripeCustomers(stripeCustomersToCreate) { }); } catch (error) { if (error.message.indexOf('customer') && error.code === 'resource_missing') { - error.message = `Member not imported. ${error.message}`; - error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); - error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); + result.errors.push(new errors.NotFoundError({ + message: `Member not imported. ${error.message}`, + context: i18n.t('errors.api.members.stripeCustomerNotFound.context'), + help: i18n.t('errors.api.members.stripeCustomerNotFound.help'), + err: error, + errorDetails: JSON.stringify(customerToCreate) + })); + } else { + result.errors.push(handleUnrecognizedError(error)); } - result.errors.push(error); + result.membersToDelete.push(customerToCreate.member_id); } })); + debug(`Finished creating Stripe customers with ${result.errors.length} errors`); return result; } @@ -294,6 +396,8 @@ async function fetchStripeCustomers(stripeCustomersToInsert) { membersToDelete: [] }; + debug(`Fetching Stripe customers for ${stripeCustomersToInsert.length} records`); + await Promise.all(stripeCustomersToInsert.map(async function fetchStripeCustomer(customer) { try { const fetchedCustomer = await membersService.api.members.getStripeCustomer(customer.customer_id, { @@ -336,15 +440,22 @@ async function fetchStripeCustomers(stripeCustomersToInsert) { }); } catch (error) { if (error.message.indexOf('customer') && error.code === 'resource_missing') { - error.message = `Member not imported. ${error.message}`; - error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); - error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); + result.errors.push(new errors.NotFoundError({ + message: `Member not imported. ${error.message}`, + context: i18n.t('errors.api.members.stripeCustomerNotFound.context'), + help: i18n.t('errors.api.members.stripeCustomerNotFound.help'), + err: error, + errorDetails: JSON.stringify(customer) + })); + } else { + result.errors.push(handleUnrecognizedError(error)); } - result.errors.push(error); + result.membersToDelete.push(customer.member_id); } })); + debug(`Finished fetching Stripe customers with ${result.errors.length} errors`); return result; }