Added email analytics service (#12393)

no issue

- added `EmailAnalyticsService` (see the usage sketch after this list)
  - `.fetchAll()` fetches and processes all available events
  - `.fetchLatest()` fetches and processes all events since the last-seen event timestamp
  - `EventProcessor` is passed event objects and updates `email_recipients` or `members` records depending on whether the event is an analytics or list-hygiene event
    - always returns an `EventProcessingResult` instance so that progress can be tracked and merged across individual events, batches (pages of events), and total runs
    - adds `email_id` and `member_id` to the returned result where appropriate so that the stats aggregator can limit processing to data that has changed
    - sets `email_recipients.{delivered_at, opened_at, failed_at}` for analytics events
    - sets `members.subscribed = false` for permanent failure/unsubscribed/complained list-hygiene events
  - `StatsAggregator` takes an `EventProcessingResult`-like object containing arrays of email ids and member ids on which to aggregate statistics
  - jobs for `fetch-latest` and `fetch-all` ready for use with the JobsService
- added `initializeRecurringJobs()` function to the Ghost boot procedure that schedules the email analytics "fetch latest" job to run every minute
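
For orientation, here is a minimal usage sketch of the new service, mirroring what the `fetch-latest` job below does; the require path is an assumption based on this diff's layout, and in a real run `models.init()` and `settingsService.init()` must be called first (as the job files do).

```js
// minimal sketch – mirrors jobs/fetch-latest.js; the path below is assumed
const emailAnalyticsService = require('./core/server/services/email-analytics');

(async () => {
    // fetch + process every event newer than the last-seen event timestamp
    const eventStats = await emailAnalyticsService.fetchLatest();

    // aggregate stats only for the email/member ids the events touched
    await emailAnalyticsService.aggregateStats(eventStats);

    console.log(`${eventStats.totalEvents} events, ${eventStats.emailIds.length} emails affected`);
})();
```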
Kevin Ansfield · 2020-11-26 13:09:38 +00:00 · commit 717543835c (parent c8ec1067c5) · 10 changed files with 716 additions and 0 deletions


@@ -10,6 +10,7 @@
require('./overrides');
const debug = require('ghost-ignition').debug('boot:init');
const path = require('path');
const Promise = require('bluebird');
const config = require('../shared/config');
const {events, i18n} = require('./lib/common');
@@ -69,6 +70,22 @@ function initialiseServices() {
});
}
function initializeRecurringJobs() {
// we don't want to kick off scheduled/recurring jobs that will interfere with tests
if (process.env.NODE_ENV.match(/^testing/)) {
return;
}
const jobsService = require('./services/jobs');
jobsService.scheduleJob(
'every 1 minute',
path.resolve(__dirname, 'services', 'email-analytics', 'jobs', 'fetch-latest.js'),
undefined,
'email-analytics-fetch-latest'
);
}
/**
* - initialise models
* - initialise i18n
@@ -123,6 +140,9 @@ const minimalRequiredSetupToStartGhost = (dbState) => {
events.emit('db.ready');
return initialiseServices()
.then(() => {
initializeRecurringJobs();
})
.then(() => {
return ghostServer;
});
@@ -146,6 +166,9 @@ const minimalRequiredSetupToStartGhost = (dbState) => {
logging.info('Blog is out of maintenance mode.');
return GhostServer.announceServerReadiness();
})
.then(() => {
initializeRecurringJobs();
})
.catch((err) => {
return GhostServer.announceServerReadiness(err)
.finally(() => {


@@ -0,0 +1,103 @@
const _ = require('lodash');
const EventProcessingResult = require('./lib/event-processing-result');
const EventProcessor = require('./lib/event-processor');
const StatsAggregator = require('./lib/stats-aggregator');
const defaultProviders = require('./providers');
const debug = require('ghost-ignition').debug('services:email-analytics');
// when fetching a batch we should keep a record of which emails have associated
// events so we only aggregate those that are affected
class EmailAnalyticsService {
constructor({config, settings, logging, db, providers, eventProcessor, statsAggregator}) {
this.config = config;
this.settings = settings;
this.logging = logging || console;
this.db = db;
this.providers = providers || defaultProviders.init({config, settings, logging});
this.eventProcessor = eventProcessor || new EventProcessor({db, logging});
this.statsAggregator = statsAggregator || new StatsAggregator({db, logging});
}
async fetchAll() {
const result = new EventProcessingResult();
const {count: emailCount} = await this.db.knex('emails').count('id as count').first() || {};
if (!emailCount || emailCount <= 0) {
debug('fetchAll: skipping - no emails to track');
return result;
}
const startFetch = new Date();
debug('fetchAll: starting');
for (const [, provider] of Object.entries(this.providers)) {
const providerResults = await provider.fetchAll(this.processEventBatch.bind(this));
result.merge(providerResults);
}
debug(`fetchAll: finished (${Date.now() - startFetch}ms)`);
return result;
}
async fetchLatest({maxEvents = Infinity} = {}) {
const result = new EventProcessingResult();
const lastTimestamp = await this.getLastSeenEventTimestamp();
const startFetch = new Date();
debug('fetchLatest: starting');
providersLoop:
for (const [, provider] of Object.entries(this.providers)) {
const providerResults = await provider.fetchLatest(lastTimestamp, this.processEventBatch.bind(this), {maxEvents});
result.merge(providerResults);
if (result.totalEvents >= maxEvents) {
break providersLoop;
}
}
debug(`fetchLatest: finished in ${Date.now() - startFetch}ms. Fetched ${result.totalEvents} events`);
return result;
}
async processEventBatch(events) {
const result = new EventProcessingResult();
for (const event of events) {
const batchResult = await this.eventProcessor.process(event);
result.merge(batchResult);
}
return result;
}
async aggregateStats({emailIds = [], memberIds = []}) {
for (const emailId of emailIds) {
await this.aggregateEmailStats(emailId);
}
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
}
}
aggregateEmailStats(emailId) {
return this.statsAggregator.aggregateEmail(emailId);
}
aggregateMemberStats(memberId) {
return this.statsAggregator.aggregateMember(memberId);
}
async getLastSeenEventTimestamp() {
const startDate = new Date();
// three separate queries are much faster than using max/greatest across columns with coalesce to handle nulls
const {maxDeliveredAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
const {maxOpenedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
const {maxFailedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
return lastSeenEventTimestamp;
}
}
module.exports = EmailAnalyticsService;


@@ -0,0 +1,12 @@
const config = require('../../../shared/config');
const logging = require('../../../shared/logging');
const db = require('../../data/db');
const settings = require('../settings/cache');
const EmailAnalyticsService = require('./email-analytics');
module.exports = new EmailAnalyticsService({
config,
logging,
db,
settings
});


@@ -0,0 +1,70 @@
const logging = require('../../../../shared/logging');
const {parentPort} = require('worker_threads');
const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-all');
// one-off job to fetch all available events and re-process them idempotently
// NB. can be a _very_ long job for sites with many members and frequent emails
function cancel() {
logging.info('Email analytics fetch-all job cancelled before completion');
if (parentPort) {
parentPort.postMessage('cancelled');
} else {
setTimeout(() => {
process.exit(0);
}, 1000);
}
}
if (parentPort) {
parentPort.once('message', (message) => {
if (message === 'cancel') {
return cancel();
}
});
}
(async () => {
try {
const models = require('../../../models');
const settingsService = require('../../settings');
// must be initialized before emailAnalyticsService is required, otherwise
// the requires are in the wrong order and settingsCache will always be empty
await models.init();
await settingsService.init();
const emailAnalyticsService = require('../');
const fetchStartDate = new Date();
debug('Starting email analytics fetch of all available events');
const eventStats = await emailAnalyticsService.fetchAll();
const fetchEndDate = new Date();
debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);
const aggregateStartDate = new Date();
debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
await emailAnalyticsService.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
if (parentPort) {
parentPort.postMessage('done');
} else {
// give the logging pipes time to finish writing before exit
setTimeout(() => {
process.exit(0);
}, 1000);
}
} catch (error) {
logging.error(error);
// give the logging pipes time to finish writing before exit
setTimeout(() => {
process.exit(1);
}, 1000);
}
})();
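
For context, the cancel handshake can be exercised from the parent side with Node's `worker_threads` directly, as in the sketch below; in Ghost the JobsService owns the worker lifecycle, so this is illustrative only.

```js
const path = require('path');
const {Worker} = require('worker_threads');

// illustrative only – the JobsService normally spawns and manages this worker
const worker = new Worker(path.resolve(__dirname, 'jobs', 'fetch-all.js'));

worker.on('message', (message) => {
    // the job posts 'done' on success or 'cancelled' after an early exit
    console.log(`fetch-all job: ${message}`);
});

// ask the job to stop early; it logs and replies with 'cancelled'
setTimeout(() => worker.postMessage('cancel'), 60 * 1000);
```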


@@ -0,0 +1,71 @@
const logging = require('../../../../shared/logging');
const {parentPort} = require('worker_threads');
const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-latest');
// recurring job to fetch analytics since the most recently seen event timestamp
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up
// where it left off on next run
function cancel() {
logging.info('Email analytics fetch-latest job cancelled before completion');
if (parentPort) {
parentPort.postMessage('cancelled');
} else {
setTimeout(() => {
process.exit(0);
}, 1000);
}
}
if (parentPort) {
parentPort.once('message', (message) => {
if (message === 'cancel') {
return cancel();
}
});
}
(async () => {
try {
const models = require('../../../models');
const settingsService = require('../../settings');
// must be initialized before emailAnalyticsService is required, otherwise
// the requires are in the wrong order and settingsCache will always be empty
await models.init();
await settingsService.init();
const emailAnalyticsService = require('../');
const fetchStartDate = new Date();
debug('Starting email analytics fetch of latest events');
const eventStats = await emailAnalyticsService.fetchLatest();
const fetchEndDate = new Date();
debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);
const aggregateStartDate = new Date();
debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
await emailAnalyticsService.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
if (parentPort) {
parentPort.postMessage('done');
} else {
// give the logging pipes time to finish writing before exit
setTimeout(() => {
process.exit(0);
}, 1000);
}
} catch (error) {
logging.error(error);
// give the logging pipes time to finish writing before exit
setTimeout(() => {
process.exit(1);
}, 1000);
}
})();


@@ -0,0 +1,45 @@
const _ = require('lodash');
class EventProcessingResult {
constructor(result = {}) {
// counts
this.delivered = 0;
this.opened = 0;
this.failed = 0;
this.unsubscribed = 0;
this.complained = 0;
this.unhandled = 0;
this.unprocessable = 0;
// ids seen whilst processing, ready for passing to the stats aggregator
this.emailIds = [];
this.memberIds = [];
this.merge(result);
}
get totalEvents() {
return this.delivered
+ this.opened
+ this.failed
+ this.unsubscribed
+ this.complained
+ this.unhandled
+ this.unprocessable;
}
merge(other = {}) {
this.delivered += other.delivered || 0;
this.opened += other.opened || 0;
this.failed += other.failed || 0;
this.unsubscribed += other.unsubscribed || 0;
this.complained += other.complained || 0;
this.unhandled += other.unhandled || 0;
this.unprocessable += other.unprocessable || 0;
this.emailIds = _.compact(_.union(this.emailIds, other.emailIds || []));
this.memberIds = _.compact(_.union(this.memberIds, other.memberIds || []));
}
}
module.exports = EventProcessingResult;
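
A quick sketch of the merge semantics: counts add up while the id arrays are unioned and de-duplicated (the relative require path is assumed).

```js
const EventProcessingResult = require('./lib/event-processing-result'); // assumed path

const run = new EventProcessingResult();

// merge results from two batches (pages of events)
run.merge({delivered: 2, opened: 1, emailIds: ['e1'], memberIds: ['m1']});
run.merge({opened: 3, emailIds: ['e1', 'e2'], memberIds: ['m2']});

console.log(run.totalEvents); // 6
console.log(run.emailIds);    // ['e1', 'e2'] – _.union drops the duplicate
console.log(run.memberIds);   // ['m1', 'm2']
```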


@@ -0,0 +1,227 @@
const moment = require('moment');
class EmailAnalyticsEventProcessor {
constructor({db, logging}) {
this.db = db;
this.logging = logging || console;
// avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {};
}
async process(event) {
if (event.type === 'delivered') {
return this.handleDelivered(event);
}
if (event.type === 'opened') {
return this.handleOpened(event);
}
if (event.type === 'failed') {
return this.handleFailed(event);
}
if (event.type === 'unsubscribed') {
return this.handleUnsubscribed(event);
}
if (event.type === 'complained') {
return this.handleComplained(event);
}
return {
unhandled: 1
};
}
async handleDelivered(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
// this doesn't work - the Base model intercepts the attr and tries to convert "COALESCE(...)" to a date
// await this.models.EmailRecipient
// .where({email_id: emailId, member_email: event.recipientEmail})
// .save({delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])}, {patch: true, context: {internal: true}});
const updateResult = await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
if (updateResult !== 0) {
const memberId = await this._getMemberId(event);
return {
delivered: 1,
emailIds: [emailId],
memberIds: [memberId]
};
}
return {delivered: 1};
}
async handleOpened(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const updateResult = await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
opened_at: this.db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
if (updateResult !== 0) {
const memberId = await this._getMemberId(event);
return {
opened: 1,
emailIds: [emailId],
memberIds: [memberId]
};
}
return {opened: 1};
}
async handleFailed(event) {
if (event.severity === 'permanent') {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
failed_at: this.db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
// saving via bookshelf triggers label fetch/update which errors and slows down processing
await this.db.knex('members')
.where('id', '=', this.db.knex('email_recipients')
.select('member_id')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
)
.update({
subscribed: false,
updated_at: moment.utc().toDate()
});
return {
failed: 1,
emailIds: [emailId]
};
}
if (event.severity === 'temporary') {
// we don't care about soft bounces at the moment
return {unhandled: 1};
}
}
async handleUnsubscribed(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
// saving via bookshelf triggers label fetch/update which errors and slows down processing
await this.db.knex('members')
.where('id', '=', this.db.knex('email_recipients')
.select('member_id')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
)
.update({
subscribed: false,
updated_at: moment.utc().toDate()
});
return {
unsubscribed: 1
};
}
async handleComplained(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
// saving via bookshelf triggers label fetch/update which errors and slows down processing
await this.db.knex('members')
.where('id', '=', this.db.knex('email_recipients')
.select('member_id')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
)
.update({
subscribed: false,
updated_at: moment.utc().toDate()
});
return {
complained: 1
};
}
async _getEmailId(event) {
if (event.emailId) {
return event.emailId;
}
if (event.providerId) {
if (this.providerIdEmailIdMap[event.providerId]) {
return this.providerIdEmailIdMap[event.providerId];
}
const {emailId} = await this.db.knex('email_batches')
.select('email_id as emailId')
.where('provider_id', event.providerId)
.first() || {};
if (!emailId) {
return;
}
this.providerIdEmailIdMap[event.providerId] = emailId;
return emailId;
}
}
async _getMemberId(event) {
if (event.memberId) {
return event.memberId;
}
const emailId = await this._getEmailId(event);
if (emailId && event.recipientEmail) {
const {memberId} = await this.db.knex('email_recipients')
.select('member_id as memberId')
.where('member_email', event.recipientEmail)
.where('email_id', emailId)
.first() || {};
return memberId;
}
}
}
module.exports = EmailAnalyticsEventProcessor;
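
For reference, the normalized event objects the processor consumes look roughly like the sketch below; the shape is taken from the Mailgun provider's `normalizeEvent` further down, and the values are invented.

```js
// illustrative event only – field values are made up
const event = {
    type: 'delivered',              // delivered | opened | failed | unsubscribed | complained
    severity: undefined,            // 'permanent' | 'temporary', only set on failed events
    recipientEmail: 'member@example.com',
    emailId: undefined,             // from Mailgun user-variables when the outgoing email set one
    providerId: '<20201126120000.1.ABC@mg.example.com>',
    timestamp: new Date('2020-11-26T12:00:00Z')
};

// eventProcessor.process(event) then resolves to something like:
// {delivered: 1, emailIds: ['<email id>'], memberIds: ['<member id>']}
```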


@@ -0,0 +1,20 @@
class EmailAnalyticsStatsAggregator {
constructor({logging, db}) {
this.logging = logging || console;
this.db = db;
}
async aggregateEmail(emailId) {
await this.db.knex('emails').update({
delivered_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND delivered_at IS NOT NULL)`, [emailId]),
opened_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND opened_at IS NOT NULL)`, [emailId]),
failed_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND failed_at IS NOT NULL)`, [emailId])
}).where('id', emailId);
}
async aggregateMember(/*memberId*/) {
// TODO: decide on aggregation algorithm when only certain emails have open tracking
}
}
module.exports = EmailAnalyticsStatsAggregator;


@@ -0,0 +1,10 @@
module.exports = {
init({config, settings, logging = console}) {
return {
get mailgun() {
const Mailgun = require('./mailgun');
return new Mailgun({config, settings, logging});
}
};
}
};


@@ -0,0 +1,135 @@
const _ = require('lodash');
const mailgunJs = require('mailgun-js');
const moment = require('moment');
const EventProcessingResult = require('../lib/event-processing-result');
const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const PAGE_LIMIT = 300;
const TRUST_THRESHOLD_S = 30 * 60; // 30 minutes
const DEFAULT_TAGS = ['bulk-email'];
class EmailAnalyticsMailgunProvider {
constructor({config, settings, mailgun, logging = console}) {
this.config = config;
this.settings = settings;
this.logging = logging;
this.tags = [...DEFAULT_TAGS];
this._mailgun = mailgun;
if (this.config.get('bulkEmail:mailgun:tag')) {
this.tags.push(this.config.get('bulkEmail:mailgun:tag'));
}
}
// unless an instance is passed in to the constructor, generate a new instance each
// time the getter is called to account for changes in config/settings over time
get mailgun() {
if (this._mailgun) {
return this._mailgun;
}
const bulkEmailConfig = this.config.get('bulkEmail');
const bulkEmailSetting = {
apiKey: this.settings.get('mailgun_api_key'),
domain: this.settings.get('mailgun_domain'),
baseUrl: this.settings.get('mailgun_base_url')
};
const hasMailgunConfig = !!(bulkEmailConfig && bulkEmailConfig.mailgun);
const hasMailgunSetting = !!(bulkEmailSetting && bulkEmailSetting.apiKey && bulkEmailSetting.baseUrl && bulkEmailSetting.domain);
if (!hasMailgunConfig && !hasMailgunSetting) {
this.logging.warn(`Bulk email service is not configured`);
return undefined;
}
const mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting;
const baseUrl = new URL(mailgunConfig.baseUrl);
return mailgunJs({
apiKey: mailgunConfig.apiKey,
domain: mailgunConfig.domain,
protocol: baseUrl.protocol,
host: baseUrl.hostname,
port: baseUrl.port,
endpoint: baseUrl.pathname,
retry: 5
});
}
// do not start from a particular time, grab latest then work back through
// pages until we get a blank response
fetchAll(batchHandler) {
const options = {
event: EVENT_FILTER,
limit: PAGE_LIMIT,
tags: this.tags.join(' AND ')
};
return this._fetchPages(options, batchHandler);
}
// fetch from the last known timestamp-TRUST_THRESHOLD then work forwards
// through pages until we get a blank response. This lets us get events
// quicker than the TRUST_THRESHOLD
fetchLatest(latestTimestamp, batchHandler, options) {
const beginDate = moment(latestTimestamp).subtract(TRUST_THRESHOLD_S, 's').toDate();
const mailgunOptions = {
limit: PAGE_LIMIT,
event: EVENT_FILTER,
tags: this.tags.join(' AND '),
begin: beginDate.toUTCString(),
ascending: 'yes'
};
return this._fetchPages(mailgunOptions, batchHandler, options);
}
async _fetchPages(mailgunOptions, batchHandler, {maxEvents = Infinity} = {}) {
const {mailgun} = this;
if (!mailgun) {
this.logging.warn(`Bulk email service is not configured`);
return new EventProcessingResult();
}
const result = new EventProcessingResult();
let page = await mailgun.events().get(mailgunOptions);
let events = page && page.items && page.items.map(this.normalizeEvent) || [];
pagesLoop:
while (events.length !== 0) {
const batchResult = await batchHandler(events);
result.merge(batchResult);
if (result.totalEvents >= maxEvents) {
break pagesLoop;
}
page = await mailgun.get(page.paging.next.replace('https://api.mailgun.net/v3', ''));
events = page.items.map(this.normalizeEvent);
}
return result;
}
normalizeEvent(event) {
// TODO: clean up the <> surrounding email_batches.provider_id values
let providerId = event.message && event.message.headers && event.message.headers['message-id'];
if (providerId) {
providerId = `<${providerId}>`;
}
return {
type: event.event,
severity: event.severity,
recipientEmail: event.recipient,
emailId: event['user-variables'] && event['user-variables']['email-id'],
providerId: providerId,
timestamp: new Date(event.timestamp * 1000)
};
}
}
module.exports = EmailAnalyticsMailgunProvider;
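
Finally, a hedged sketch of constructing the provider with an injected client, which bypasses the config/settings lookup in the `mailgun` getter (handy for tests); the require path is assumed.

```js
const EmailAnalyticsMailgunProvider = require('./providers/mailgun'); // assumed path

// stub client: a single empty page makes the fetch loop exit immediately
const provider = new EmailAnalyticsMailgunProvider({
    config: {get: () => undefined},
    settings: {get: () => undefined},
    mailgun: {
        events: () => ({get: async () => ({items: []})})
    }
});

provider.fetchAll(async events => ({delivered: events.length}))
    .then(result => console.log(result.totalEvents)); // 0
```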