mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-01-05 09:50:34 +03:00
61e1b19d46
- This is a precursor to trying to split apart into:
- model events + webhooks system which makes sense
- frontend events which should be independent or removed
- maybe some concept of a settings manager that we can use in various places to bind logic 🤔
- other usages of events that should be refactored to not use events
532 lines
18 KiB
JavaScript
532 lines
18 KiB
JavaScript
const _ = require('lodash');
|
|
const Promise = require('bluebird');
|
|
const debug = require('@tryghost/debug')('mega');
|
|
const tpl = require('@tryghost/tpl');
|
|
const url = require('url');
|
|
const moment = require('moment');
|
|
const ObjectID = require('bson-objectid');
|
|
const errors = require('@tryghost/errors');
|
|
const i18n = require('../../../shared/i18n');
|
|
const logging = require('@tryghost/logging');
|
|
const settingsCache = require('../../../shared/settings-cache');
|
|
const membersService = require('../members');
|
|
const limitService = require('../limits');
|
|
const bulkEmailService = require('../bulk-email');
|
|
const jobsService = require('../jobs');
|
|
const db = require('../../data/db');
|
|
const models = require('../../models');
|
|
const postEmailSerializer = require('./post-email-serializer');
|
|
const {getSegmentsFromHtml} = require('./segment-parser');
|
|
const labs = require('../labs');
|
|
|
|
// Used to listen to email.added and email.edited model events originally, I think to offload this - ideally would just use jobs now if possible
|
|
const events = require('../../lib/common/events');
|
|
|
|
const messages = {
|
|
invalidSegment: 'Invalid segment value. Use one of the valid:"status:free" or "status:-free" values.'
|
|
};
|
|
|
|
const getFromAddress = () => {
|
|
let fromAddress = membersService.config.getEmailFromAddress();
|
|
|
|
if (/@localhost$/.test(fromAddress) || /@ghost.local$/.test(fromAddress)) {
|
|
const localAddress = 'localhost@example.com';
|
|
logging.warn(`Rewriting bulk email from address ${fromAddress} to ${localAddress}`);
|
|
fromAddress = localAddress;
|
|
}
|
|
|
|
const siteTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : '';
|
|
|
|
return siteTitle ? `"${siteTitle}"<${fromAddress}>` : fromAddress;
|
|
};
|
|
|
|
const getReplyToAddress = () => {
|
|
const fromAddress = membersService.config.getEmailFromAddress();
|
|
const supportAddress = membersService.config.getEmailSupportAddress();
|
|
const replyAddressOption = settingsCache.get('members_reply_address');
|
|
|
|
return (replyAddressOption === 'support') ? supportAddress : fromAddress;
|
|
};
|
|
|
|
/**
|
|
*
|
|
* @param {Object} postModel - post model instance
|
|
* @param {Object} options
|
|
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
|
|
*/
|
|
const getEmailData = async (postModel, options) => {
|
|
const {subject, html, plaintext} = await postEmailSerializer.serialize(postModel, options);
|
|
|
|
return {
|
|
subject,
|
|
html,
|
|
plaintext,
|
|
from: getFromAddress(),
|
|
replyTo: getReplyToAddress()
|
|
};
|
|
};
|
|
|
|
/**
|
|
*
|
|
* @param {Object} postModel - post model instance
|
|
* @param {[string]} toEmails - member email addresses to send email to
|
|
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
|
|
*/
|
|
const sendTestEmail = async (postModel, toEmails, apiVersion) => {
|
|
const emailData = await getEmailData(postModel, {apiVersion});
|
|
emailData.subject = `[Test] ${emailData.subject}`;
|
|
|
|
// fetch any matching members so that replacements use expected values
|
|
const recipients = await Promise.all(toEmails.map(async (email) => {
|
|
const member = await membersService.api.members.get({email});
|
|
if (member) {
|
|
return {
|
|
member_uuid: member.get('uuid'),
|
|
member_email: member.get('email'),
|
|
member_name: member.get('name')
|
|
};
|
|
}
|
|
|
|
return {
|
|
member_email: email
|
|
};
|
|
}));
|
|
|
|
// enable tracking for previews to match real-world behaviour
|
|
emailData.track_opens = !!settingsCache.get('email_track_opens');
|
|
|
|
const response = await bulkEmailService.send(emailData, recipients);
|
|
|
|
if (response instanceof bulkEmailService.FailedBatch) {
|
|
return Promise.reject(response.error);
|
|
}
|
|
|
|
return response;
|
|
};
|
|
|
|
/**
|
|
* addEmail
|
|
*
|
|
* Accepts a post model and creates an email record based on it. Only creates one
|
|
* record per post
|
|
*
|
|
* @param {object} postModel Post Model Object
|
|
* @param {object} options
|
|
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
|
|
*/
|
|
|
|
const addEmail = async (postModel, options) => {
|
|
if (limitService.isLimited('emails')) {
|
|
await limitService.errorIfWouldGoOverLimit('emails');
|
|
}
|
|
|
|
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
|
|
const filterOptions = Object.assign({}, knexOptions, {limit: 1});
|
|
|
|
const emailRecipientFilter = postModel.get('email_recipient_filter');
|
|
|
|
switch (emailRecipientFilter) {
|
|
// `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now
|
|
case 'paid':
|
|
case 'free':
|
|
throw new Error(`Unexpected email_recipient_filter value "${emailRecipientFilter}", expected an NQL equivalent`);
|
|
case 'all':
|
|
filterOptions.filter = 'subscribed:true';
|
|
break;
|
|
case 'none':
|
|
throw new Error('Cannot sent email to "none" email_recipient_filter');
|
|
default:
|
|
filterOptions.filter = `subscribed:true+${emailRecipientFilter}`;
|
|
}
|
|
|
|
const startRetrieve = Date.now();
|
|
debug('addEmail: retrieving members count');
|
|
const {meta: {pagination: {total: membersCount}}} = await membersService.api.members.list(Object.assign({}, knexOptions, filterOptions));
|
|
debug(`addEmail: retrieved members count - ${membersCount} members (${Date.now() - startRetrieve}ms)`);
|
|
|
|
// NOTE: don't create email object when there's nobody to send the email to
|
|
if (membersCount === 0) {
|
|
return null;
|
|
}
|
|
|
|
if (limitService.isLimited('emails')) {
|
|
await limitService.errorIfWouldGoOverLimit('emails', {addedCount: membersCount});
|
|
}
|
|
|
|
const postId = postModel.get('id');
|
|
const existing = await models.Email.findOne({post_id: postId}, knexOptions);
|
|
|
|
if (!existing) {
|
|
// get email contents and perform replacements using no member data so
|
|
// we have a decent snapshot of email content for later display
|
|
const emailData = await getEmailData(postModel, options);
|
|
|
|
return models.Email.add({
|
|
post_id: postId,
|
|
status: 'pending',
|
|
email_count: membersCount,
|
|
subject: emailData.subject,
|
|
from: emailData.from,
|
|
reply_to: emailData.replyTo,
|
|
html: emailData.html,
|
|
plaintext: emailData.plaintext,
|
|
submitted_at: moment().toDate(),
|
|
track_opens: !!settingsCache.get('email_track_opens'),
|
|
recipient_filter: emailRecipientFilter
|
|
}, knexOptions);
|
|
} else {
|
|
return existing;
|
|
}
|
|
};
|
|
|
|
/**
|
|
* retryFailedEmail
|
|
*
|
|
* Accepts an Email model and resets it's fields to trigger retry listeners
|
|
*
|
|
* @param {Email} emailModel Email model
|
|
*/
|
|
const retryFailedEmail = async (emailModel) => {
|
|
return await models.Email.edit({
|
|
status: 'pending'
|
|
}, {
|
|
id: emailModel.get('id')
|
|
});
|
|
};
|
|
|
|
/**
|
|
* handleUnsubscribeRequest
|
|
*
|
|
* Takes a request/response pair and reads the `unsubscribe` query parameter,
|
|
* using the content to update the members service to set the `subscribed` flag
|
|
* to false on the member
|
|
*
|
|
* If any operation fails, or the request is invalid the function will error - so using
|
|
* as middleware should consider wrapping with `try/catch`
|
|
*
|
|
* @param {Request} req
|
|
* @returns {Promise<void>}
|
|
*/
|
|
async function handleUnsubscribeRequest(req) {
|
|
if (!req.url) {
|
|
throw new errors.BadRequestError({
|
|
message: 'Unsubscribe failed! Could not find member'
|
|
});
|
|
}
|
|
|
|
const {query} = url.parse(req.url, true);
|
|
if (!query || !query.uuid) {
|
|
throw new errors.BadRequestError({
|
|
message: (query.preview ? 'Unsubscribe preview' : 'Unsubscribe failed! Could not find member')
|
|
});
|
|
}
|
|
|
|
const member = await membersService.api.members.get({
|
|
uuid: query.uuid
|
|
});
|
|
|
|
if (!member) {
|
|
throw new errors.BadRequestError({
|
|
message: 'Unsubscribe failed! Could not find member'
|
|
});
|
|
}
|
|
|
|
try {
|
|
const memberModel = await membersService.api.members.update({subscribed: false}, {id: member.id});
|
|
return memberModel.toJSON();
|
|
} catch (err) {
|
|
throw new errors.InternalServerError({
|
|
err,
|
|
message: 'Failed to unsubscribe member'
|
|
});
|
|
}
|
|
}
|
|
|
|
async function pendingEmailHandler(emailModel, options) {
|
|
// CASE: do not send email if we import a database
|
|
// TODO: refactor post.published events to never fire on importing
|
|
if (options && options.importing) {
|
|
return;
|
|
}
|
|
|
|
if (emailModel.get('status') !== 'pending') {
|
|
return;
|
|
}
|
|
|
|
// make sure recurring background analytics jobs are running once we have emails
|
|
const emailAnalyticsJobs = require('../email-analytics/jobs');
|
|
emailAnalyticsJobs.scheduleRecurringJobs();
|
|
|
|
return jobsService.addJob({
|
|
job: sendEmailJob,
|
|
data: {emailModel},
|
|
offloaded: false
|
|
});
|
|
}
|
|
|
|
async function sendEmailJob({emailModel, options}) {
|
|
let startEmailSend = null;
|
|
|
|
try {
|
|
// Check host limit for allowed member count and throw error if over limit
|
|
// - do this even if it's a retry so that there's no way around the limit
|
|
if (limitService.isLimited('members')) {
|
|
await limitService.errorIfIsOverLimit('members');
|
|
}
|
|
|
|
// Check host limit for disabled emails or going over emails limit
|
|
if (limitService.isLimited('emails')) {
|
|
await limitService.errorIfWouldGoOverLimit('emails');
|
|
}
|
|
|
|
// Create email batch and recipient rows unless this is a retry and they already exist
|
|
const existingBatchCount = await emailModel.related('emailBatches').count('id');
|
|
|
|
if (existingBatchCount === 0) {
|
|
let newBatchCount;
|
|
|
|
await models.Base.transaction(async (transacting) => {
|
|
newBatchCount = await createSegmentedEmailBatches({emailModel, options: {transacting}});
|
|
});
|
|
|
|
if (newBatchCount === 0) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
debug('sendEmailJob: sending email');
|
|
startEmailSend = Date.now();
|
|
await bulkEmailService.processEmail({emailId: emailModel.get('id'), options});
|
|
debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`);
|
|
} catch (error) {
|
|
if (startEmailSend) {
|
|
debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`);
|
|
}
|
|
|
|
let errorMessage = error.message;
|
|
if (errorMessage.length > 2000) {
|
|
errorMessage = errorMessage.substring(0, 2000);
|
|
}
|
|
|
|
await emailModel.save({
|
|
status: 'failed',
|
|
error: errorMessage
|
|
}, {patch: true});
|
|
|
|
throw new errors.GhostError({
|
|
err: error,
|
|
context: i18n.t('errors.services.mega.requestFailed.error')
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Fetch rows of members that should receive an email.
|
|
* Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
|
|
* instantiations and associated processing and event loop blocking
|
|
*
|
|
* @param {Object} options
|
|
* @param {Object} options.emailModel - instance of Email model
|
|
* @param {string} [options.memberSegment] - NQL filter to apply in addition to the one defined in emailModel
|
|
* @param {Object} options.options - knex options
|
|
*
|
|
* @returns {Promise<Object[]>} instances of filtered knex member rows
|
|
*/
|
|
async function getEmailMemberRows({emailModel, memberSegment, options}) {
|
|
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
|
|
const filterOptions = Object.assign({}, knexOptions);
|
|
|
|
const recipientFilter = emailModel.get('recipient_filter');
|
|
|
|
switch (recipientFilter) {
|
|
// `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now
|
|
case 'paid':
|
|
case 'free':
|
|
throw new Error(`Unexpected recipient_filter value "${recipientFilter}", expected an NQL equivalent`);
|
|
case 'all':
|
|
filterOptions.filter = 'subscribed:true';
|
|
break;
|
|
case 'none':
|
|
throw new Error('Cannot sent email to "none" recipient_filter');
|
|
default:
|
|
filterOptions.filter = `subscribed:true+${recipientFilter}`;
|
|
}
|
|
|
|
if (memberSegment) {
|
|
filterOptions.filter = `${filterOptions.filter}+${memberSegment}`;
|
|
}
|
|
|
|
const startRetrieve = Date.now();
|
|
debug('getEmailMemberRows: retrieving members list');
|
|
// select('members.*') is necessary here to avoid duplicate `email` columns in the result set
|
|
// without it we do `select *` which pulls in the Stripe customer email too which overrides the member email
|
|
const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions).select('members.*').distinct();
|
|
debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`);
|
|
|
|
return memberRows;
|
|
}
|
|
|
|
/**
|
|
* Partitions array of member records according to the segment they belong to
|
|
*
|
|
* @param {Object[]} memberRows raw member rows to partition
|
|
* @param {string[]} segments segment filters to partition batches by
|
|
*
|
|
* @returns {Object} partitioned memberRows with keys that correspond segment names
|
|
*/
|
|
function partitionMembersBySegment(memberRows, segments) {
|
|
const partitions = {};
|
|
|
|
for (const memberSegment of segments) {
|
|
let segmentedMemberRows;
|
|
|
|
// NOTE: because we only support two types of segments at the moment the logic was kept dead simple
|
|
// in the future this segmentation should probably be substituted with NQL:
|
|
// memberRows.filter(member => nql(memberSegment).queryJSON(member));
|
|
if (memberSegment === 'status:free') {
|
|
segmentedMemberRows = memberRows.filter(member => member.status === 'free');
|
|
memberRows = memberRows.filter(member => member.status !== 'free');
|
|
} else if (memberSegment === 'status:-free') {
|
|
segmentedMemberRows = memberRows.filter(member => member.status !== 'free');
|
|
memberRows = memberRows.filter(member => member.status === 'free');
|
|
} else {
|
|
throw new errors.ValidationError(tpl(messages.invalidSegment));
|
|
}
|
|
|
|
partitions[memberSegment] = segmentedMemberRows;
|
|
}
|
|
|
|
if (memberRows.length) {
|
|
partitions.unsegmented = memberRows;
|
|
}
|
|
|
|
return partitions;
|
|
}
|
|
|
|
/**
|
|
* Detects segment filters in emailModel's html and creates separate batches per segment
|
|
*
|
|
* @param {Object} options
|
|
* @param {Object} options.emailModel - instance of Email model
|
|
* @param {Object} options.options - knex options
|
|
*/
|
|
async function createSegmentedEmailBatches({emailModel, options}) {
|
|
let memberRows = await getEmailMemberRows({emailModel, options});
|
|
|
|
if (!memberRows.length) {
|
|
return [];
|
|
}
|
|
|
|
if (labs.isSet('emailCardSegments')) {
|
|
const segments = getSegmentsFromHtml(emailModel.get('html'));
|
|
const batchIds = [];
|
|
|
|
if (segments.length) {
|
|
const partitionedMembers = partitionMembersBySegment(memberRows, segments);
|
|
|
|
for (const partition in partitionedMembers) {
|
|
const emailBatchIds = await createEmailBatches({
|
|
emailModel,
|
|
memberRows: partitionedMembers[partition],
|
|
memberSegment: partition === 'unsegmented' ? null : partition,
|
|
options
|
|
});
|
|
batchIds.push(emailBatchIds);
|
|
}
|
|
return batchIds;
|
|
}
|
|
}
|
|
|
|
const emailBatchIds = await createEmailBatches({emailModel, memberRows, options});
|
|
const batchIds = [emailBatchIds];
|
|
return batchIds;
|
|
}
|
|
|
|
/**
|
|
* Store email_batch and email_recipient records for an email.
|
|
* Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
|
|
* instantiations and associated processing and event loop blocking.
|
|
*
|
|
* @param {Object} options
|
|
* @param {Object} options.emailModel - instance of Email model
|
|
* @param {string} [options.memberSegment] - NQL filter to apply in addition to the one defined in emailModel
|
|
* @param {Object[]} [options.memberRows] - member rows to be batched
|
|
* @param {Object} options.options - knex options
|
|
* @returns {Promise<string[]>} - created batch ids
|
|
*/
|
|
async function createEmailBatches({emailModel, memberRows, memberSegment, options}) {
|
|
const storeRecipientBatch = async function (recipients) {
|
|
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
|
|
const batchModel = await models.EmailBatch.add({
|
|
email_id: emailModel.id,
|
|
member_segment: memberSegment
|
|
}, knexOptions);
|
|
|
|
const recipientData = [];
|
|
|
|
recipients.forEach((memberRow) => {
|
|
if (!memberRow.id || !memberRow.uuid || !memberRow.email) {
|
|
logging.warn(`Member row not included as email recipient due to missing data - id: ${memberRow.id}, uuid: ${memberRow.uuid}, email: ${memberRow.email}`);
|
|
return;
|
|
}
|
|
|
|
recipientData.push({
|
|
id: ObjectID().toHexString(),
|
|
email_id: emailModel.id,
|
|
member_id: memberRow.id,
|
|
batch_id: batchModel.id,
|
|
member_uuid: memberRow.uuid,
|
|
member_email: memberRow.email,
|
|
member_name: memberRow.name
|
|
});
|
|
});
|
|
|
|
const insertQuery = db.knex('email_recipients').insert(recipientData);
|
|
|
|
if (knexOptions.transacting) {
|
|
insertQuery.transacting(knexOptions.transacting);
|
|
}
|
|
|
|
await insertQuery;
|
|
|
|
return batchModel.id;
|
|
};
|
|
|
|
debug('createEmailBatches: storing recipient list');
|
|
const startOfRecipientStorage = Date.now();
|
|
const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE);
|
|
const batchIds = await Promise.mapSeries(batches, storeRecipientBatch);
|
|
debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);
|
|
|
|
return batchIds;
|
|
}
|
|
|
|
const statusChangedHandler = (emailModel, options) => {
|
|
const emailRetried = emailModel.wasChanged()
|
|
&& emailModel.get('status') === 'pending'
|
|
&& emailModel.previous('status') === 'failed';
|
|
|
|
if (emailRetried) {
|
|
pendingEmailHandler(emailModel, options);
|
|
}
|
|
};
|
|
|
|
function listen() {
|
|
events.on('email.added', pendingEmailHandler);
|
|
events.on('email.edited', statusChangedHandler);
|
|
}
|
|
|
|
// Public API
|
|
module.exports = {
|
|
listen,
|
|
addEmail,
|
|
retryFailedEmail,
|
|
sendTestEmail,
|
|
handleUnsubscribeRequest,
|
|
partitionMembersBySegment // NOTE: only exposed for testing
|
|
};
|
|
|
|
/**
|
|
* @typedef {'v2' | 'v3' | 'v4' | 'canary' } ValidAPIVersion
|
|
*/
|