const _ = require('lodash'); const Promise = require('bluebird'); const debug = require('@tryghost/debug')('mega'); const tpl = require('@tryghost/tpl'); const url = require('url'); const moment = require('moment'); const ObjectID = require('bson-objectid'); const errors = require('@tryghost/errors'); const i18n = require('../../../shared/i18n'); const logging = require('@tryghost/logging'); const settingsCache = require('../../../shared/settings-cache'); const membersService = require('../members'); const limitService = require('../limits'); const bulkEmailService = require('../bulk-email'); const jobsService = require('../jobs'); const db = require('../../data/db'); const models = require('../../models'); const postEmailSerializer = require('./post-email-serializer'); const {getSegmentsFromHtml} = require('./segment-parser'); const labs = require('../labs'); // Used to listen to email.added and email.edited model events originally, I think to offload this - ideally would just use jobs now if possible const events = require('../../lib/common/events'); const messages = { invalidSegment: 'Invalid segment value. Use one of the valid:"status:free" or "status:-free" values.' }; const getFromAddress = () => { let fromAddress = membersService.config.getEmailFromAddress(); if (/@localhost$/.test(fromAddress) || /@ghost.local$/.test(fromAddress)) { const localAddress = 'localhost@example.com'; logging.warn(`Rewriting bulk email from address ${fromAddress} to ${localAddress}`); fromAddress = localAddress; } const siteTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : ''; return siteTitle ? `"${siteTitle}"<${fromAddress}>` : fromAddress; }; const getReplyToAddress = () => { const fromAddress = membersService.config.getEmailFromAddress(); const supportAddress = membersService.config.getEmailSupportAddress(); const replyAddressOption = settingsCache.get('members_reply_address'); return (replyAddressOption === 'support') ? supportAddress : fromAddress; }; /** * * @param {Object} postModel - post model instance * @param {Object} options * @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data */ const getEmailData = async (postModel, options) => { const {subject, html, plaintext} = await postEmailSerializer.serialize(postModel, options); return { subject, html, plaintext, from: getFromAddress(), replyTo: getReplyToAddress() }; }; /** * * @param {Object} postModel - post model instance * @param {[string]} toEmails - member email addresses to send email to * @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data */ const sendTestEmail = async (postModel, toEmails, apiVersion) => { const emailData = await getEmailData(postModel, {apiVersion}); emailData.subject = `[Test] ${emailData.subject}`; // fetch any matching members so that replacements use expected values const recipients = await Promise.all(toEmails.map(async (email) => { const member = await membersService.api.members.get({email}); if (member) { return { member_uuid: member.get('uuid'), member_email: member.get('email'), member_name: member.get('name') }; } return { member_email: email }; })); // enable tracking for previews to match real-world behaviour emailData.track_opens = !!settingsCache.get('email_track_opens'); const response = await bulkEmailService.send(emailData, recipients); if (response instanceof bulkEmailService.FailedBatch) { return Promise.reject(response.error); } return response; }; /** * addEmail * * Accepts a post model and creates an email record based on it. Only creates one * record per post * * @param {object} postModel Post Model Object * @param {object} options * @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data */ const addEmail = async (postModel, options) => { if (limitService.isLimited('emails')) { await limitService.errorIfWouldGoOverLimit('emails'); } const knexOptions = _.pick(options, ['transacting', 'forUpdate']); const filterOptions = Object.assign({}, knexOptions, {limit: 1}); const emailRecipientFilter = postModel.get('email_recipient_filter'); switch (emailRecipientFilter) { // `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now case 'paid': case 'free': throw new Error(`Unexpected email_recipient_filter value "${emailRecipientFilter}", expected an NQL equivalent`); case 'all': filterOptions.filter = 'subscribed:true'; break; case 'none': throw new Error('Cannot sent email to "none" email_recipient_filter'); default: filterOptions.filter = `subscribed:true+${emailRecipientFilter}`; } const startRetrieve = Date.now(); debug('addEmail: retrieving members count'); const {meta: {pagination: {total: membersCount}}} = await membersService.api.members.list(Object.assign({}, knexOptions, filterOptions)); debug(`addEmail: retrieved members count - ${membersCount} members (${Date.now() - startRetrieve}ms)`); // NOTE: don't create email object when there's nobody to send the email to if (membersCount === 0) { return null; } if (limitService.isLimited('emails')) { await limitService.errorIfWouldGoOverLimit('emails', {addedCount: membersCount}); } const postId = postModel.get('id'); const existing = await models.Email.findOne({post_id: postId}, knexOptions); if (!existing) { // get email contents and perform replacements using no member data so // we have a decent snapshot of email content for later display const emailData = await getEmailData(postModel, options); return models.Email.add({ post_id: postId, status: 'pending', email_count: membersCount, subject: emailData.subject, from: emailData.from, reply_to: emailData.replyTo, html: emailData.html, plaintext: emailData.plaintext, submitted_at: moment().toDate(), track_opens: !!settingsCache.get('email_track_opens'), recipient_filter: emailRecipientFilter }, knexOptions); } else { return existing; } }; /** * retryFailedEmail * * Accepts an Email model and resets it's fields to trigger retry listeners * * @param {Email} emailModel Email model */ const retryFailedEmail = async (emailModel) => { return await models.Email.edit({ status: 'pending' }, { id: emailModel.get('id') }); }; /** * handleUnsubscribeRequest * * Takes a request/response pair and reads the `unsubscribe` query parameter, * using the content to update the members service to set the `subscribed` flag * to false on the member * * If any operation fails, or the request is invalid the function will error - so using * as middleware should consider wrapping with `try/catch` * * @param {Request} req * @returns {Promise} */ async function handleUnsubscribeRequest(req) { if (!req.url) { throw new errors.BadRequestError({ message: 'Unsubscribe failed! Could not find member' }); } const {query} = url.parse(req.url, true); if (!query || !query.uuid) { throw new errors.BadRequestError({ message: (query.preview ? 'Unsubscribe preview' : 'Unsubscribe failed! Could not find member') }); } const member = await membersService.api.members.get({ uuid: query.uuid }); if (!member) { throw new errors.BadRequestError({ message: 'Unsubscribe failed! Could not find member' }); } try { const memberModel = await membersService.api.members.update({subscribed: false}, {id: member.id}); return memberModel.toJSON(); } catch (err) { throw new errors.InternalServerError({ err, message: 'Failed to unsubscribe member' }); } } async function pendingEmailHandler(emailModel, options) { // CASE: do not send email if we import a database // TODO: refactor post.published events to never fire on importing if (options && options.importing) { return; } if (emailModel.get('status') !== 'pending') { return; } // make sure recurring background analytics jobs are running once we have emails const emailAnalyticsJobs = require('../email-analytics/jobs'); emailAnalyticsJobs.scheduleRecurringJobs(); return jobsService.addJob({ job: sendEmailJob, data: {emailModel}, offloaded: false }); } async function sendEmailJob({emailModel, options}) { let startEmailSend = null; try { // Check host limit for allowed member count and throw error if over limit // - do this even if it's a retry so that there's no way around the limit if (limitService.isLimited('members')) { await limitService.errorIfIsOverLimit('members'); } // Check host limit for disabled emails or going over emails limit if (limitService.isLimited('emails')) { await limitService.errorIfWouldGoOverLimit('emails'); } // Create email batch and recipient rows unless this is a retry and they already exist const existingBatchCount = await emailModel.related('emailBatches').count('id'); if (existingBatchCount === 0) { let newBatchCount; await models.Base.transaction(async (transacting) => { newBatchCount = await createSegmentedEmailBatches({emailModel, options: {transacting}}); }); if (newBatchCount === 0) { return; } } debug('sendEmailJob: sending email'); startEmailSend = Date.now(); await bulkEmailService.processEmail({emailId: emailModel.get('id'), options}); debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`); } catch (error) { if (startEmailSend) { debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`); } let errorMessage = error.message; if (errorMessage.length > 2000) { errorMessage = errorMessage.substring(0, 2000); } await emailModel.save({ status: 'failed', error: errorMessage }, {patch: true}); throw new errors.GhostError({ err: error, context: i18n.t('errors.services.mega.requestFailed.error') }); } } /** * Fetch rows of members that should receive an email. * Uses knex directly rather than bookshelf to avoid thousands of bookshelf model * instantiations and associated processing and event loop blocking * * @param {Object} options * @param {Object} options.emailModel - instance of Email model * @param {string} [options.memberSegment] - NQL filter to apply in addition to the one defined in emailModel * @param {Object} options.options - knex options * * @returns {Promise} instances of filtered knex member rows */ async function getEmailMemberRows({emailModel, memberSegment, options}) { const knexOptions = _.pick(options, ['transacting', 'forUpdate']); const filterOptions = Object.assign({}, knexOptions); const recipientFilter = emailModel.get('recipient_filter'); switch (recipientFilter) { // `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now case 'paid': case 'free': throw new Error(`Unexpected recipient_filter value "${recipientFilter}", expected an NQL equivalent`); case 'all': filterOptions.filter = 'subscribed:true'; break; case 'none': throw new Error('Cannot sent email to "none" recipient_filter'); default: filterOptions.filter = `subscribed:true+${recipientFilter}`; } if (memberSegment) { filterOptions.filter = `${filterOptions.filter}+${memberSegment}`; } const startRetrieve = Date.now(); debug('getEmailMemberRows: retrieving members list'); // select('members.*') is necessary here to avoid duplicate `email` columns in the result set // without it we do `select *` which pulls in the Stripe customer email too which overrides the member email const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions).select('members.*').distinct(); debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`); return memberRows; } /** * Partitions array of member records according to the segment they belong to * * @param {Object[]} memberRows raw member rows to partition * @param {string[]} segments segment filters to partition batches by * * @returns {Object} partitioned memberRows with keys that correspond segment names */ function partitionMembersBySegment(memberRows, segments) { const partitions = {}; for (const memberSegment of segments) { let segmentedMemberRows; // NOTE: because we only support two types of segments at the moment the logic was kept dead simple // in the future this segmentation should probably be substituted with NQL: // memberRows.filter(member => nql(memberSegment).queryJSON(member)); if (memberSegment === 'status:free') { segmentedMemberRows = memberRows.filter(member => member.status === 'free'); memberRows = memberRows.filter(member => member.status !== 'free'); } else if (memberSegment === 'status:-free') { segmentedMemberRows = memberRows.filter(member => member.status !== 'free'); memberRows = memberRows.filter(member => member.status === 'free'); } else { throw new errors.ValidationError(tpl(messages.invalidSegment)); } partitions[memberSegment] = segmentedMemberRows; } if (memberRows.length) { partitions.unsegmented = memberRows; } return partitions; } /** * Detects segment filters in emailModel's html and creates separate batches per segment * * @param {Object} options * @param {Object} options.emailModel - instance of Email model * @param {Object} options.options - knex options */ async function createSegmentedEmailBatches({emailModel, options}) { let memberRows = await getEmailMemberRows({emailModel, options}); if (!memberRows.length) { return []; } if (labs.isSet('emailCardSegments')) { const segments = getSegmentsFromHtml(emailModel.get('html')); const batchIds = []; if (segments.length) { const partitionedMembers = partitionMembersBySegment(memberRows, segments); for (const partition in partitionedMembers) { const emailBatchIds = await createEmailBatches({ emailModel, memberRows: partitionedMembers[partition], memberSegment: partition === 'unsegmented' ? null : partition, options }); batchIds.push(emailBatchIds); } return batchIds; } } const emailBatchIds = await createEmailBatches({emailModel, memberRows, options}); const batchIds = [emailBatchIds]; return batchIds; } /** * Store email_batch and email_recipient records for an email. * Uses knex directly rather than bookshelf to avoid thousands of bookshelf model * instantiations and associated processing and event loop blocking. * * @param {Object} options * @param {Object} options.emailModel - instance of Email model * @param {string} [options.memberSegment] - NQL filter to apply in addition to the one defined in emailModel * @param {Object[]} [options.memberRows] - member rows to be batched * @param {Object} options.options - knex options * @returns {Promise} - created batch ids */ async function createEmailBatches({emailModel, memberRows, memberSegment, options}) { const storeRecipientBatch = async function (recipients) { const knexOptions = _.pick(options, ['transacting', 'forUpdate']); const batchModel = await models.EmailBatch.add({ email_id: emailModel.id, member_segment: memberSegment }, knexOptions); const recipientData = []; recipients.forEach((memberRow) => { if (!memberRow.id || !memberRow.uuid || !memberRow.email) { logging.warn(`Member row not included as email recipient due to missing data - id: ${memberRow.id}, uuid: ${memberRow.uuid}, email: ${memberRow.email}`); return; } recipientData.push({ id: ObjectID().toHexString(), email_id: emailModel.id, member_id: memberRow.id, batch_id: batchModel.id, member_uuid: memberRow.uuid, member_email: memberRow.email, member_name: memberRow.name }); }); const insertQuery = db.knex('email_recipients').insert(recipientData); if (knexOptions.transacting) { insertQuery.transacting(knexOptions.transacting); } await insertQuery; return batchModel.id; }; debug('createEmailBatches: storing recipient list'); const startOfRecipientStorage = Date.now(); const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE); const batchIds = await Promise.mapSeries(batches, storeRecipientBatch); debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); return batchIds; } const statusChangedHandler = (emailModel, options) => { const emailRetried = emailModel.wasChanged() && emailModel.get('status') === 'pending' && emailModel.previous('status') === 'failed'; if (emailRetried) { pendingEmailHandler(emailModel, options); } }; function listen() { events.on('email.added', pendingEmailHandler); events.on('email.edited', statusChangedHandler); } // Public API module.exports = { listen, addEmail, retryFailedEmail, sendTestEmail, handleUnsubscribeRequest, partitionMembersBySegment // NOTE: only exposed for testing }; /** * @typedef {'v2' | 'v3' | 'v4' | 'canary' } ValidAPIVersion */