Added error handling for failed member imports

no issue

- When bulk insert fails there is no transactional logic to revert
related records form being inserted. Also, previously there were no
attempts to "retry" the insert.
- To avoid complex retry logic, an iterative one-by-one insert retry
approach was taken. If this becomes a bottleneck in the future, the
retry algorithm could be improved.
- To avoid a lot of code duplication refactored model's `bulkAdd` & `bulkDestroy`
methods to use 'bulk-operations' module.
- Updated error handling and logging for bulk delete operations. It's very
unlikely for error to happen here,  but still need to make sure there is
a proper logging in place to trace back the failure.
- Added debug logs. This should improve debugging experience and
performance measurements.
- Added handling for unrecognized errors. Handling inspired by current unrecognized
error handling by ghost importer -10e5d5f3d4/core/server/data/importer/importers/data/base.js (L148-L154)
This commit is contained in:
Nazar Gargol 2020-08-20 20:24:05 +12:00 committed by naz
parent 4feaf49ca7
commit 4fe9d1536c
2 changed files with 150 additions and 120 deletions

View File

@ -1,81 +0,0 @@
const _ = require('lodash');
const db = require('../../../data/db');
const CHUNK_SIZE = 100;
async function insertChunkSequential(table, chunk, result) {
for (const record of chunk) {
try {
await db.knex(table).insert(record);
result.successful += 1;
} catch (err) {
result.errors.push(err);
result.unsuccessfulIds.push(record.id);
result.unsuccessful += 1;
}
}
}
async function insertChunk(table, chunk, result) {
try {
await db.knex(table).insert(chunk);
result.successful += chunk.length;
} catch (err) {
await insertChunkSequential(table, chunk, result);
}
}
async function insert(table, data) {
const result = {
successful: 0,
unsuccessful: 0,
unsuccessfulIds: [],
errors: []
};
for (const chunk of _.chunk(data, CHUNK_SIZE)) {
await insertChunk(table, chunk, result);
}
return result;
}
async function delChunkSequential(table, chunk, result) {
for (const record of chunk) {
try {
await db.knex(table).where('id', record).del();
result.successful += 1;
} catch (err) {
result.errors.push(err);
result.unsuccessfulIds.push(record);
result.unsuccessful += 1;
}
}
}
async function delChunk(table, chunk, result) {
try {
await db.knex(table).whereIn('id', chunk).del();
result.successful += chunk.length;
} catch (err) {
await delChunkSequential(table, chunk, result);
}
}
async function del(table, ids) {
const result = {
successful: 0,
unsuccessful: 0,
unsuccessfulIds: [],
errors: []
};
for (const chunk of _.chunk(ids, CHUNK_SIZE)) {
await delChunk(table, chunk, result);
}
return result;
}
module.exports.insert = insert;
module.exports.del = del;

View File

@ -3,18 +3,26 @@ const uuid = require('uuid');
const ObjectId = require('bson-objectid');
const moment = require('moment-timezone');
const errors = require('@tryghost/errors');
const debug = require('ghost-ignition').debug('importer:members');
const membersService = require('../index');
const bulkOperations = require('./bulk-operations');
const models = require('../../../models');
const {i18n} = require('../../../lib/common');
const logging = require('../../../../shared/logging');
const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => {
const createInserter = table => data => bulkOperations.insert(table, data);
const createDeleter = table => data => bulkOperations.del(table, data);
const handleUnrecognizedError = (error) => {
if (!errors.utils.isIgnitionError(error)) {
return new errors.DataImportError({
message: error.message,
context: error.context,
err: error
});
} else {
return error;
}
};
const deleteMembers = createDeleter('members');
const insertLabelAssociations = createInserter('members_labels');
const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => {
debug(`Importing members: ${members.length}, labels: ${allLabelModels.length}, import lables: ${importSetLabels.length}, createdBy: ${createdBy}`);
let {
invalidMembers,
@ -26,16 +34,34 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) =
// NOTE: member insertion has to happen before the rest of insertions to handle validation
// errors - remove failed members from label/stripe sets
debug(`Starting insert of ${membersToInsert.length} members`);
const insertedMembers = await models.Member.bulkAdd(membersToInsert).then((insertResult) => {
if (insertResult.unsuccessfulIds.length) {
if (insertResult.unsuccessfulRecords.length) {
const unsuccessfulIds = insertResult.unsuccessfulRecords.map(r => r.id);
labelAssociationsToInsert = labelAssociationsToInsert
.filter(la => !insertResult.unsuccessfulIds.includes(la.member_id));
.filter(la => !unsuccessfulIds.includes(la.member_id));
stripeCustomersToFetch = stripeCustomersToFetch
.filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id));
.filter(sc => !unsuccessfulIds.includes(sc.member_id));
stripeCustomersToCreate = stripeCustomersToCreate
.filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id));
.filter(sc => !unsuccessfulIds.includes(sc.member_id));
}
debug(`Finished inserting members with ${insertResult.errors.length} errors`);
if (insertResult.errors.length) {
insertResult.errors = insertResult.errors.map((error) => {
if (error.code === 'ER_DUP_ENTRY') {
return new errors.ValidationError({
message: i18n.t('errors.models.member.memberAlreadyExists.message'),
context: i18n.t('errors.models.member.memberAlreadyExists.context'),
err: error
});
} else {
return handleUnrecognizedError(error);
}
});
}
return insertResult;
@ -43,16 +69,41 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) =
const fetchedStripeCustomersPromise = fetchStripeCustomers(stripeCustomersToFetch);
const createdStripeCustomersPromise = createStripeCustomers(stripeCustomersToCreate);
const insertedLabelsPromise = insertLabelAssociations(labelAssociationsToInsert);
debug(`Starting insert of ${labelAssociationsToInsert.length} label associations`);
const insertedLabelsPromise = models.Base.Model.bulkAdd(labelAssociationsToInsert, 'members_labels')
.then((insertResult) => {
debug(`Finished inserting member label associations with ${insertResult.errors.length} errors`);
return insertResult;
});
const insertedCustomersPromise = Promise.all([
fetchedStripeCustomersPromise,
createdStripeCustomersPromise
]).then(
([fetchedStripeCustomers, createdStripeCustomers]) => {
return models.MemberStripeCustomer.bulkAdd(
fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert)
);
const stripeCustomersToInsert = fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert);
debug(`Starting insert of ${stripeCustomersToInsert.length} stripe customers`);
return models.MemberStripeCustomer.bulkAdd(stripeCustomersToInsert).then((insertResult) => {
debug(`Finished inserting stripe customers with ${insertResult.errors.length} errors`);
if (insertResult.errors.length) {
insertResult.errors = insertResult.errors.map((error) => {
if (error.code === 'ER_DUP_ENTRY') {
return new errors.ValidationError({
message: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.message'),
context: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.context'),
err: error
});
} else {
return handleUnrecognizedError(error);
}
});
}
return insertResult;
});
}
);
@ -61,18 +112,54 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) =
createdStripeCustomersPromise,
insertedCustomersPromise
]).then(
([fetchedStripeCustomers, createdStripeCustomers]) => models.StripeCustomerSubscription.bulkAdd(
fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert)
)
([fetchedStripeCustomers, createdStripeCustomers, insertedCustomersResult]) => {
let subscriptionsToInsert = fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert);
if (insertedCustomersResult.unsuccessfulRecords.length) {
const unsuccessfulCustomerIds = insertedCustomersResult.unsuccessfulRecords.map(r => r.customer_id);
subscriptionsToInsert = subscriptionsToInsert.filter(s => !unsuccessfulCustomerIds.includes(s.customer_id));
}
debug(`Starting insert of ${subscriptionsToInsert.length} stripe customer subscriptions`);
return models.StripeCustomerSubscription.bulkAdd(subscriptionsToInsert)
.then((insertResult) => {
debug(`Finished inserting stripe customer subscriptions with ${insertResult.errors.length} errors`);
if (insertResult.errors.length) {
insertResult.errors = insertResult.errors.map((error) => {
if (error.code === 'ER_DUP_ENTRY') {
return new errors.ValidationError({
message: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.message'),
context: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.context'),
err: error
});
} else {
return handleUnrecognizedError(error);
}
});
}
return insertResult;
});
}
);
const deletedMembersPromise = Promise.all([
fetchedStripeCustomersPromise,
createdStripeCustomersPromise
createdStripeCustomersPromise,
insertedCustomersPromise,
insertedSubscriptionsPromise
]).then(
([fetchedStripeCustomers, createdStripeCustomers]) => deleteMembers(
fetchedStripeCustomers.membersToDelete.concat(createdStripeCustomers.membersToDelete)
)
([fetchedStripeCustomers, createdStripeCustomers, insertedStripeCustomers, insertedStripeSubscriptions]) => {
const memberIds = [
...fetchedStripeCustomers.membersToDelete,
...createdStripeCustomers.membersToDelete,
...insertedStripeCustomers.unsuccessfulRecords.map(r => r.member_id),
...insertedStripeSubscriptions.unsuccessfulRecords.map(r => r.member_id)
];
return models.Member.bulkDestroy(memberIds);
}
);
// This looks sequential, but at the point insertedCustomersPromise has resolved so have all the others
@ -82,26 +169,33 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) =
const fetchedCustomers = await fetchedStripeCustomersPromise;
const insertedLabels = await insertedLabelsPromise;
const allErrors = [
...insertedMembers.errors,
...insertedCustomers.errors,
...insertedSubscriptions.errors,
...insertedLabels.errors,
...fetchedCustomers.errors
];
const importedCount = insertedMembers.successful - deletedMembers.successful;
const invalidCount = insertedMembers.unsuccessful + invalidMembers.length + deletedMembers.successful + deletedMembers.unsuccessful;
debug(`Finished members import with ${importedCount} imported, ${invalidCount} invalid and ${allErrors.length} errors`);
const result = {
imported: {
count: insertedMembers.successful - deletedMembers.successful
count: importedCount
},
invalid: {
count: insertedMembers.unsuccessful + deletedMembers.unsuccessful + invalidMembers.length,
errors: [
...insertedMembers.errors,
...insertedCustomers.errors,
...insertedSubscriptions.errors,
...insertedLabels.errors,
...fetchedCustomers.errors
]
count: invalidCount,
errors: allErrors
}
};
// Allow logging to happen outside of the request cycle
process.nextTick(() => {
// @TODO wrap errors with validation errors (or whatever is reasonable)
result.invalid.errors.forEach(err => logging.error(err));
deletedMembers.errors.forEach(err => logging.error(err));
insertedLabels.errors.forEach(err => logging.error(err));
});
return result;
@ -231,6 +325,7 @@ async function createStripeCustomers(stripeCustomersToCreate) {
membersToDelete: []
};
debug(`Creating Stripe customers for ${stripeCustomersToCreate.length} records`);
await Promise.all(stripeCustomersToCreate.map(async function createStripeCustomer(customerToCreate) {
try {
const customer = await membersService.api.members.createStripeCustomer({
@ -274,15 +369,22 @@ async function createStripeCustomers(stripeCustomersToCreate) {
});
} catch (error) {
if (error.message.indexOf('customer') && error.code === 'resource_missing') {
error.message = `Member not imported. ${error.message}`;
error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context');
error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help');
result.errors.push(new errors.NotFoundError({
message: `Member not imported. ${error.message}`,
context: i18n.t('errors.api.members.stripeCustomerNotFound.context'),
help: i18n.t('errors.api.members.stripeCustomerNotFound.help'),
err: error,
errorDetails: JSON.stringify(customerToCreate)
}));
} else {
result.errors.push(handleUnrecognizedError(error));
}
result.errors.push(error);
result.membersToDelete.push(customerToCreate.member_id);
}
}));
debug(`Finished creating Stripe customers with ${result.errors.length} errors`);
return result;
}
@ -294,6 +396,8 @@ async function fetchStripeCustomers(stripeCustomersToInsert) {
membersToDelete: []
};
debug(`Fetching Stripe customers for ${stripeCustomersToInsert.length} records`);
await Promise.all(stripeCustomersToInsert.map(async function fetchStripeCustomer(customer) {
try {
const fetchedCustomer = await membersService.api.members.getStripeCustomer(customer.customer_id, {
@ -336,15 +440,22 @@ async function fetchStripeCustomers(stripeCustomersToInsert) {
});
} catch (error) {
if (error.message.indexOf('customer') && error.code === 'resource_missing') {
error.message = `Member not imported. ${error.message}`;
error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context');
error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help');
result.errors.push(new errors.NotFoundError({
message: `Member not imported. ${error.message}`,
context: i18n.t('errors.api.members.stripeCustomerNotFound.context'),
help: i18n.t('errors.api.members.stripeCustomerNotFound.help'),
err: error,
errorDetails: JSON.stringify(customer)
}));
} else {
result.errors.push(handleUnrecognizedError(error));
}
result.errors.push(error);
result.membersToDelete.push(customer.member_id);
}
}));
debug(`Finished fetching Stripe customers with ${result.errors.length} errors`);
return result;
}