ref https://linear.app/tryghost/issue/ONC-217/implement-the-deliverytime-option-in-mailgun-api-calls Ghost experiences its highest peak load immediately after sending out a newsletter, as it receives an influx of traffic from users clicking on the links in the email, a burst of email analytics events to process from Mailgun, and an increase in organic traffic to the site's frontend as well as the admin analytics pages. The `BatchSendingService` currently sends all the batches to Mailgun as quickly as possible, which may contribute to higher peak loads. This commit adds a `deliverytime` parameter to our API calls to Mailgun, which allows us to specify a time in the future when we want the email to be delivered. This will allow us to moderate the rate at which emails are delivered, and in turn that should moderate the peak traffic volume that Ghost receives in the first 2-3 minutes after sending an email. The `deliverytime` is calculated based on a configurable parameter: `bulkEmail.targetDeliveryWindow`, which specifies the maximum allowable time (in milliseconds) after the email is first sent for Ghost to instruct Mailgun to deliver the emails. Ghost will attempt to space out all the batches as evenly as possible throughout the specified window. For example, if the `targetDeliveryWindow` is set to `300000` (5 minutes) and there are 100 batches, Ghost will set the `deliverytime` for each batch ~3 seconds apart.
const logging = require('@tryghost/logging');
const ObjectID = require('bson-objectid').default;
const errors = require('@tryghost/errors');
const tpl = require('@tryghost/tpl');
const EmailBodyCache = require('./EmailBodyCache');
// User-facing error messages; they are rendered through tpl() when an EmailError is thrown.
const messages = {
    emailErrorPartialFailure: 'An error occurred, and your newsletter was only partially sent. Please retry sending the remaining emails.',
    emailError: 'An unexpected error occurred, please retry sending your newsletter.'
};
/**
 * @typedef {import('./SendingService')} SendingService
 * @typedef {import('./EmailSegmenter')} EmailSegmenter
 * @typedef {import('./EmailRenderer')} EmailRenderer
 * @typedef {import('./EmailRenderer').MemberLike} MemberLike
 * @typedef {object} JobsService
 * @typedef {object} Email
 * @typedef {object} Newsletter
 * @typedef {object} Post
 * @typedef {object} EmailBatch
 */
// Sends a newsletter email in batches: creates the batches and their recipients, submits every batch
// through the sending service and records the resulting status on the email and batch models.
class BatchSendingService {
// Retry database queries happening before sending the email
// (maxTime and sleep are in milliseconds, see retryDb)
#BEFORE_RETRY_CONFIG = {maxRetries: 10, maxTime: 10 * 60 * 1000, sleep: 2000};
// Retry settings for the database writes that record the outcome after a send attempt
// (status saves on the email/batch and the processed_at update on the recipients)
#AFTER_RETRY_CONFIG = {maxRetries: 20, maxTime: 30 * 60 * 1000, sleep: 2000};
// Retry settings for the call to sendingService.send; no maxTime, so only maxRetries limits it
#MAILGUN_API_RETRY_CONFIG = {sleep: 10 * 1000, maxRetries: 6};
* @param {Object} dependencies
* @param {EmailRenderer} dependencies.emailRenderer
* @param {SendingService} dependencies.sendingService
* @param {JobsService} dependencies.jobsService
* @param {EmailSegmenter} dependencies.emailSegmenter
* @param {object} dependencies.models
* @param {object} dependencies.models.EmailRecipient
* @param {EmailBatch} dependencies.models.EmailBatch
* @param {Email} dependencies.models.Email
* @param {object} dependencies.models.Member
* @param {object} dependencies.db
* @param {object} [dependencies.sentry]
* @param {object} [dependencies.BEFORE_RETRY_CONFIG]
* @param {object} [dependencies.AFTER_RETRY_CONFIG]
* @param {object} [dependencies.MAILGUN_API_RETRY_CONFIG]
* @param {string} [dependencies.debugStorageFilePath]
}) {
this.#emailRenderer = emailRenderer;
this.#sendingService = sendingService;
this.#jobsService = jobsService;
this.#emailSegmenter = emailSegmenter;
this.#models = models;
this.#db = db;
this.#sentry = sentry;
this.#debugStorageFilePath = debugStorageFilePath;
} else {
if (process.env.NODE_ENV.startsWith('test') || process.env.NODE_ENV === 'development') {
this.#BEFORE_RETRY_CONFIG = {maxRetries: 0};
} else {
if (process.env.NODE_ENV.startsWith('test') || process.env.NODE_ENV === 'development') {
this.#AFTER_RETRY_CONFIG = {maxRetries: 0};
} else {
if (process.env.NODE_ENV.startsWith('test') || process.env.NODE_ENV === 'development') {
this.#MAILGUN_API_RETRY_CONFIG = {maxRetries: 0};
#getBeforeRetryConfig(email) {
if (email._retryCutOffTime) {
return {...this.#BEFORE_RETRY_CONFIG, stopAfterDate: email._retryCutOffTime};
* Schedules a background job that sends the email in the background if it is pending or failed.
* @param {Email} email
* @returns {void}
scheduleEmail(email) {
return this.#jobsService.addJob({
name: 'batch-sending-service-job',
job: this.emailJob.bind(this),
data: {emailId: email.id},
offloaded: false
* @private
* @param {{emailId: string}} data Data passed from the job service. We only need the emailId because we need to refetch the email anyway to make sure the status is right and 'locked'.
async emailJob({emailId}) {
logging.info(`Starting email job for email ${emailId}`);
const startTime = Date.now();
// Check if email is 'pending' only + change status to submitting in one transaction.
// This allows us to have a lock around the email job that makes sure an email can only have one active job.
let email = await this.retryDb(
async () => {
return await this.updateStatusLock(this.#models.Email, emailId, 'submitting', ['pending', 'failed']);
{...this.#BEFORE_RETRY_CONFIG, description: `updateStatusLock email ${emailId} -> submitting`}
if (!email) {
logging.error(`Tried sending email that is not pending or failed ${emailId}`);
// We'll stop all automatic DB retries after this date
const expectedBatchCount = Math.ceil(email.get('email_count') / 1000);
const minimumSecondsPerBatch = 26; // In case of database issues, we make sure we expand the retry window relative to the amount of batches
const stopAfter = Math.max(expectedBatchCount * minimumSecondsPerBatch * 1000, this.#BEFORE_RETRY_CONFIG.maxTime);
const retryCutOffTime = new Date(startTime + stopAfter);
// Save a strict cutoff time for retries
email._retryCutOffTime = retryCutOffTime;
try {
await this.sendEmail(email);
await this.retryDb(async () => {
await email.save({
status: 'submitted',
submitted_at: new Date(),
error: null
}, {patch: true, autoRefresh: false});
}, {...this.#AFTER_RETRY_CONFIG, description: `email ${emailId} -> submitted`});
} catch (e) {
const ghostError = new errors.EmailError({
err: e,
message: `Error sending email ${email.id}`
if (this.#sentry) {
// Log the original error to Sentry
// Store error and status in email model
await this.retryDb(async () => {
await email.save({
status: 'failed',
error: e.message || 'Something went wrong while sending the email'
}, {patch: true, autoRefresh: false});
}, {...this.#AFTER_RETRY_CONFIG, description: `email ${emailId} -> failed`});
* @private
* @param {Email} email
* @throws {errors.EmailError} If one of the batches fails
async sendEmail(email) {
logging.info(`Sending email ${email.id}`);
// Load required relations
const newsletter = await this.retryDb(async () => {
return await email.getLazyRelation('newsletter', {require: true});
}, {...this.#getBeforeRetryConfig(email), description: `getLazyRelation newsletter for email ${email.id}`});
const post = await this.retryDb(async () => {
return await email.getLazyRelation('post', {require: true, withRelated: ['posts_meta', 'authors']});
}, {...this.#getBeforeRetryConfig(email), description: `getLazyRelation post for email ${email.id}`});
let batches = await this.retryDb(async () => {
return await this.getBatches(email);
}, {...this.#getBeforeRetryConfig(email), description: `getBatches for email ${email.id}`});
if (batches.length === 0) {
batches = await this.createBatches({email, newsletter, post});
await this.sendBatches({email, batches, post, newsletter});
* @private
* @param {Email} email
* @returns {Promise<EmailBatch[]>}
async getBatches(email) {
logging.info(`Getting batches for email ${email.id}`);
// findAll returns a bookshelf collection, we want to return a plain array to align with the createBatches method
const batches = await this.#models.EmailBatch.findAll({filter: 'email_id:\'' + email.id + '\''});
return batches.models;
* @private
* @param {{email: Email, newsletter: Newsletter, post: Post}} data
* @returns {Promise<EmailBatch[]>}
async createBatches({email, post, newsletter}) {
logging.info(`Creating batches for email ${email.id}`);
const segments = await this.#emailRenderer.getSegments(post);
const batches = [];
const BATCH_SIZE = this.#sendingService.getMaximumRecipients();
let totalCount = 0;
for (const segment of segments) {
logging.info(`Creating batches for email ${email.id} segment ${segment}`);
const segmentFilter = this.#emailSegmenter.getMemberFilterForSegment(newsletter, email.get('recipient_filter'), segment);
// Avoiding Bookshelf for performance reasons
let members;
// Start with the id of the email, which is an objectId. We'll only fetch members that are created before the email. This is a special property of ObjectIds.
// Note: we use ID and not created_at, because imported members could set a created_at in the future or past and avoid limit checking.
let lastId = email.id;
while (!members || lastId) {
logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId}`);
const filter = segmentFilter + `+id:<'${lastId}'`;
logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId} ${filter}`);
members = await this.#models.Member.getFilteredCollectionQuery({filter})
.orderByRaw('id DESC')
.select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1);
if (members.length > 0) {
totalCount += Math.min(members.length, BATCH_SIZE);
const batch = await this.retryDb(
async () => {
return await this.createBatch(email, segment, members.slice(0, BATCH_SIZE));
{...this.#getBeforeRetryConfig(email), description: `createBatch email ${email.id} segment ${segment}`}
if (members.length > BATCH_SIZE) {
lastId = members[members.length - 2].id;
} else {
logging.info(`Created ${batches.length} batches for email ${email.id} with ${totalCount} recipients`);
if (email.get('email_count') !== totalCount) {
logging.error(`Email ${email.id} has wrong stored email_count ${email.get('email_count')}, did expect ${totalCount}. Updating the model.`);
// If the error rate is greater than 1%, we log it to Sentry so we can investigate
// Some differences are expected, e.g. if a new member signs up while we are sending the email
const errorRate = Math.abs((totalCount - email.get('email_count')) / email.get('email_count'));
if (this.#sentry && errorRate >= 0.01) {
// we don't have a real exception, so just log a message to Sentry
this.#sentry.captureMessage(`Email ${email.id} has wrong stored email_count ${email.get('email_count')}, did expect ${totalCount}.`);
// We update the email model because this might happen in rare cases where the initial member count changed (e.g. deleted members)
// between creating the email and sending it
await email.save({
email_count: totalCount
}, {patch: true, require: false, autoRefresh: false});
return batches;
* @private
* @param {Email} email
* @param {import('./EmailRenderer').Segment} segment
* @param {object[]} members
* @returns {Promise<EmailBatch>}
async createBatch(email, segment, members, options) {
if (!options || !options.transacting) {
return this.#models.EmailBatch.transaction(async (transacting) => {
return this.createBatch(email, segment, members, {transacting});
logging.info(`Creating batch for email ${email.id} segment ${segment} with ${members.length} members`);
const batch = await this.#models.EmailBatch.add({
email_id: email.id,
member_segment: segment,
status: 'pending'
}, options);
const recipientData = [];
members.forEach((memberRow) => {
if (!memberRow.id || !memberRow.uuid || !memberRow.email) {
logging.warn(`Member row not included as email recipient due to missing data - id: ${memberRow.id}, uuid: ${memberRow.uuid}, email: ${memberRow.email}`);
id: ObjectID().toHexString(),
email_id: email.id,
member_id: memberRow.id,
batch_id: batch.id,
member_uuid: memberRow.uuid,
member_email: memberRow.email,
member_name: memberRow.name
const insertQuery = this.#db.knex('email_recipients').insert(recipientData);
if (options.transacting) {
logging.info(`Inserting ${recipientData.length} recipients for email ${email.id} batch ${batch.id}`);
await insertQuery;
return batch;
// Sends all batches of an email with a limited number of concurrent workers that pull from a shared queue.
// Throws an EmailError when not every batch succeeded: the "partial failure" message when at least one
// batch succeeded, the generic message when none did.
async sendBatches({email, batches, post, newsletter}) {
logging.info(`Sending ${batches.length} batches for email ${email.id}`);
// undefined when no target delivery window applies (see getDeliveryDeadline)
const deadline = this.getDeliveryDeadline(email);
if (deadline) {
logging.info(`Delivery deadline for email ${email.id} is ${deadline}`);
// Reuse same HTML body if we send an email to the same segment
const emailBodyCache = new EmailBodyCache();
// Calculate deliverytimes for the batches
const deliveryTimes = this.calculateDeliveryTimes(email, batches.length);
// Loop batches and send them via the EmailProvider
let succeededCount = 0;
// Work on a copy so the caller's array is not emptied by the workers
const queue = batches.slice();
// Declared before assignment because runNext calls itself; as an arrow function it keeps `this`
let runNext;
runNext = async () => {
const batch = queue.shift();
if (batch) {
const batchData = {email, batch, post, newsletter, emailBodyCache, deliveryTime: undefined};
// Only set a delivery time if we have a deadline and it hasn't past yet
if (deadline && deadline.getTime() > Date.now()) {
// Delivery times are handed out in queue order; one that is already in the past is ignored
const deliveryTime = deliveryTimes.shift();
if (deliveryTime && deliveryTime >= Date.now()) {
batchData.deliveryTime = deliveryTime;
if (await this.sendBatch(batchData)) {
succeededCount += 1;
// Each worker continues with the next queued batch until the queue is empty
await runNext();
// Run maximum MAX_SENDING_CONCURRENCY at the same time
// NOTE(review): MAX_SENDING_CONCURRENCY is not defined in the visible part of this file - presumably a module-level constant; confirm
await Promise.all(new Array(MAX_SENDING_CONCURRENCY).fill(0).map(() => runNext()));
if (succeededCount < batches.length) {
if (succeededCount > 0) {
throw new errors.EmailError({
message: tpl(messages.emailErrorPartialFailure)
throw new errors.EmailError({
message: tpl(messages.emailError)
/**
 * @param {{email: Email, batch: EmailBatch, post: Post, newsletter: Newsletter, emailBodyCache: EmailBodyCache, deliveryTime: (Date|undefined)}} data
 * @returns {Promise<boolean>} True when succeeded, false when failed with an error
 */
// Sends a single batch: locks it by moving its status to 'submitting', loads its recipients, calls
// sendingService.send (with retries) and then stores the outcome on the batch. The recipients of the
// batch get a processed_at timestamp afterwards.
async sendBatch({email, batch: originalBatch, post, newsletter, emailBodyCache, deliveryTime}) {
logging.info(`Sending batch ${originalBatch.id} for email ${email.id}`);
// Check the status of the email batch in a 'for update' transaction
const batch = await this.retryDb(
async () => {
return await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
{...this.#getBeforeRetryConfig(email), description: `updateStatusLock batch ${originalBatch.id} -> submitting`}
if (!batch) {
// The batch was not in 'pending' or 'failed' state; this is reported to the caller as a success
logging.error(`Tried sending email batch that is not pending or failed ${originalBatch.id}`);
return true;
// Becomes true as soon as the sending service accepted the batch
let succeeded = false;
try {
let members = await this.retryDb(
async () => {
const m = await this.getBatchMembers(batch.id);
// If we receive 0 rows, there is a possibility that we switched to a secondary database and have replication lag
// So we throw an error and we retry
if (m.length === 0) {
throw new errors.EmailError({
message: `No members found for batch ${batch.id}, possible replication lag`
return m;
{...this.#getBeforeRetryConfig(email), description: `getBatchMembers batch ${originalBatch.id}`}
// NOTE(review): in this view `members`, `post`, `newsletter`, `emailBodyCache` and `deliveryTime` are never
// handed to send() - confirm against the SendingService API whether lines are missing here
const response = await this.retryDb(async () => {
return await this.#sendingService.send({
emailId: email.id,
segment: batch.get('member_segment'),
}, {
openTrackingEnabled: !!email.get('track_opens'),
clickTrackingEnabled: !!email.get('track_clicks'),
}, {...this.#MAILGUN_API_RETRY_CONFIG, description: `Sending email batch ${originalBatch.id} ${deliveryTime ? `with delivery time ${deliveryTime}` : ''}`});
succeeded = true;
await this.retryDb(
async () => {
await batch.save({
status: 'submitted',
provider_id: response.id,
// reset error fields when sending succeeds
error_status_code: null,
error_message: null,
error_data: null
}, {patch: true, require: false, autoRefresh: false});
{...this.#AFTER_RETRY_CONFIG, description: `save batch ${originalBatch.id} -> submitted`}
} catch (err) {
// Errors that carry the code BULK_EMAIL_SEND_FAILED are handled separately; any other error is wrapped in an EmailError
if (err.code && err.code === 'BULK_EMAIL_SEND_FAILED') {
if (this.#sentry) {
// Log the original error to Sentry
} else {
const ghostError = new errors.EmailError({
message: `Error sending email batch ${batch.id}`,
context: err.message
if (this.#sentry) {
// Log the original error to Sentry
if (!succeeded) {
// We check succeeded because of a rare edge case: the batch was sent, but saving the 'submitted' status failed. In that case we don't want to set it to failed
await this.retryDb(
async () => {
await batch.save({
status: 'failed',
error_status_code: err.statusCode ?? null,
error_message: err.message,
error_data: err.errorDetails ?? null
}, {patch: true, require: false, autoRefresh: false});
{...this.#AFTER_RETRY_CONFIG, description: `save batch ${originalBatch.id} -> failed`}
// Mark as processed, even when failed
await this.retryDb(
async () => {
await this.#models.EmailRecipient
.where({batch_id: batch.id})
.save({processed_at: new Date()}, {patch: true, require: false, autoRefresh: false});
{...this.#AFTER_RETRY_CONFIG, description: `save EmailRecipients ${originalBatch.id} processed_at`}
return succeeded;
* We don't want to pass EmailRecipient models to the sendingService.
* So we transform them into the MemberLike interface.
* That keeps the sending service nicely separated so it isn't dependent on the batch sending data structure.
* @returns {Promise<MemberLike[]>}
async getBatchMembers(batchId) {
let models = await this.#models.EmailRecipient.findAll({filter: `batch_id:'${batchId}'`, withRelated: ['member', 'member.stripeSubscriptions', 'member.products']});
const BATCH_SIZE = this.#sendingService.getMaximumRecipients();
if (models.length > BATCH_SIZE) {
throw new errors.EmailError({
message: `Email batch ${batchId} has ${models.length} members, which exceeds the maximum of ${BATCH_SIZE} members per batch.`
return models.map((model) => {
// Map subscriptions
const subscriptions = model.related('member').related('stripeSubscriptions').toJSON();
const tiers = model.related('member').related('products').toJSON();
return {
id: model.get('member_id'),
uuid: model.get('member_uuid'),
email: model.get('member_email'),
name: model.get('member_name'),
createdAt: model.related('member')?.get('created_at') ?? null,
status: model.related('member')?.get('status') ?? 'free',
* @private
* Update the status of an email or emailBatch to a given status, but first check if their current status is 'pending' or 'failed'.
* @param {object} Model Bookshelf model constructor
* @param {string} id id of the model
* @param {string} status set the status of the model to this value
* @param {string[]} allowedStatuses Check if the models current status is one of these values
* @returns {Promise<object|undefined>} The updated model. Undefined if the model didn't pass the status check.
async updateStatusLock(Model, id, status, allowedStatuses) {
let model;
await Model.transaction(async (transacting) => {
model = await Model.findOne({id}, {require: true, transacting, forUpdate: true});
if (!allowedStatuses.includes(model.get('status'))) {
model = undefined;
await model.save({
}, {patch: true, transacting, autoRefresh: false});
return model;
* @private
* Retry a function until it doesn't throw an error or the max retries / max time are reached.
* @template T
* @param {() => Promise<T>} func
* @param {object} options
* @param {string} options.description Used for logging
* @param {number} options.sleep time between each retry (ms), will get multiplied by the number of retries
* @param {number} options.maxRetries note: retries, not tries. So 0 means maximum 1 try, 1 means maximum 2 tries, etc.
* @param {number} [options.retryCount] (internal) Amount of retries already done. 0 intially.
* @param {number} [options.maxTime] (ms)
* @param {Date} [options.stopAfterDate]
* @returns {Promise<T>}
async retryDb(func, options) {
if (options.maxTime !== undefined) {
const stopAfterDate = new Date(Date.now() + options.maxTime);
if (!options.stopAfterDate || stopAfterDate < options.stopAfterDate) {
options = {...options, stopAfterDate};
const retryCount = (options.retryCount ?? 0);
try {
if (retryCount > 0) {
logging.info(`[BULK_EMAIL_DB_RETRY] ${options.description} - Retrying ${retryCount + 1}th try`);
} else {
logging.info(`[BULK_EMAIL_DB_RETRY] ${options.description} - Started (1st try)`);
const response = await func();
logging.info(`[BULK_EMAIL_DB_RETRY] ${options.description} - Finished (after ${retryCount + 1}${retryCount === 0 ? 'st try' : ' tries'})`);
return response;
} catch (e) {
const sleep = (options.sleep ?? 0);
if (retryCount >= options.maxRetries || (options.stopAfterDate && (new Date(Date.now() + sleep)) > options.stopAfterDate)) {
if (retryCount > 0) {
const ghostError = new errors.EmailError({
err: e,
message: `[BULK_EMAIL_DB_RETRY] ${options.description} - Failed and stopped retrying: ${retryCount >= options.maxRetries ? 'max retries reached' : 'max time reached'}`,
context: e.message
throw e;
const ghostError = new errors.EmailError({
err: e,
message: `[BULK_EMAIL_DB_RETRY] ${options.description} - Failed (${retryCount + 1}${retryCount === 0 ? 'st' : 'th'} try)`,
context: e.message
if (sleep) {
await new Promise((resolve) => {
setTimeout(resolve, sleep);
return await this.retryDb(func, {...options, retryCount: retryCount + 1, sleep: sleep * 2});
* Returns the sending deadline for an email
* Based on the email.created_at timestamp and the configured target delivery window
* @param {*} email
* @returns Date | undefined
getDeliveryDeadline(email) {
// Return undefined if targetDeliveryWindow is 0 (or less)
const targetDeliveryWindow = this.#sendingService.getTargetDeliveryWindow();
if (targetDeliveryWindow === undefined || targetDeliveryWindow <= 0) {
return undefined;
try {
const startTime = email.get('created_at');
const deadline = new Date(startTime.getTime() + targetDeliveryWindow);
return deadline;
} catch (err) {
return undefined;
* Adds deliverytimes to the passed in batches, based on the delivery deadline
* @param {Email} email - the email model to be sent
* @param {number} numBatches - the number of batches to be sent
calculateDeliveryTimes(email, numBatches) {
const deadline = this.getDeliveryDeadline(email);
const now = new Date();
// If there is no deadline (target delivery window is not set) or the deadline is in the past, delivery immediately
if (!deadline || now >= deadline) {
return new Array(numBatches).fill(undefined);
} else {
const timeToDeadline = deadline.getTime() - now.getTime();
const batchDelay = timeToDeadline / numBatches;
const deliveryTimes = [];
for (let i = 0; i < numBatches; i++) {
const delay = batchDelay * i;
const deliveryTime = new Date(now.getTime() + delay);
return deliveryTimes;
module.exports = BatchSendingService;