Implemented email analytics retrying (#16273)

fixes https://github.com/TryGhost/Team/issues/2562

New event fetching loops:
- Reworked the analytics fetching algorithm. Instead of restarting from
where the last fetch stopped minus 30 minutes, we now simply continue
where we stopped, with ms precision (after the first fetch we no longer
depend on database timestamps), and we stop at NOW - 1 minute to reduce
the chance of missing events.
- Apart from that, a new 'missing events' fetching loop is introduced.
It fetches events that are older than 30 minutes and processes all of
them a second time, to make sure we didn't skip any because of storage
delays in the Mailgun API.
- A new scheduled fetching loop that allows us to schedule a fetch
between a given start/end date (currently only persisted in memory, so
it stops after a reboot). A sketch of the fetch windows follows this list.
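
A minimal sketch of how the latest/missing fetch windows described above are derived. The constant values mirror the numbers in this commit, but the function names are illustrative, not the actual service internals:

```js
// Illustrative only — mirrors the windows described above.
const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // Mailgun storage considered stable after 30 min
const END_MARGIN_MS = 60 * 1000; // never fetch the most recent minute

// Latest loop: continue exactly where we stopped (ms precision, kept in
// memory after the first fetch), and stop 1 minute before now.
function latestWindow(lastEventTimestamp) {
    return {
        begin: lastEventTimestamp,
        end: new Date(Date.now() - END_MARGIN_MS)
    };
}

// Missing loop: re-process events older than 30 minutes a second time,
// never past the point where the latest loop last started fetching.
function missingWindow(lastEventTimestamp, latestLastBegin) {
    return {
        begin: lastEventTimestamp ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3),
        end: new Date(Math.min(Date.now() - TRUST_THRESHOLD_MS, latestLastBegin.getTime()))
    };
}
```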

UI and endpoint changes:
- New UI to show the state of the analytics 'loops'
- New endpoint to request the analytics loop status
- New endpoint to schedule an analytics refetch
- New endpoint to cancel a scheduled refetch (see the usage sketch after
this list)
- Some number formatting improvements, and a new 'opened' count in the
debug screen
- Live reload of data in the debug screen
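
A hedged sketch of the three new Admin API endpoints. The routes and methods match the ones added in this commit; the base URL, the email id, and the (omitted) authentication are placeholders:

```js
// Hypothetical Admin API calls; the real Admin API requires authentication,
// which is omitted here for brevity.
const base = 'http://localhost:2368/ghost/api/admin'; // placeholder base URL
const emailId = 'some-email-id'; // placeholder

async function demo() {
    // GET /emails/:id/analytics — request the analytics loop status
    const status = await (await fetch(`${base}/emails/${emailId}/analytics`)).json();
    console.log(status.latest, status.missing, status.scheduled);

    // PUT /emails/:id/analytics — schedule an analytics refetch for this email
    await fetch(`${base}/emails/${emailId}/analytics`, {method: 'PUT'});

    // DELETE /emails/analytics — cancel the scheduled refetch (not per-email)
    await fetch(`${base}/emails/analytics`, {method: 'DELETE'});
}
```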

Other changes:
- This also improves support for maxEvents. We can now stop a fetching
loop after x events without worrying about lost events. This is used to
limit the number of events fetched in the missing and scheduled loops
(e.g. when the main loop is fetching lots of events, we skip the other
loops).
- Prevents fetching the same events over and over when no new events
come in (previously we always restarted at the same begin timestamp).
The code now increases the begin timestamp by 1 second when it is safe
to do so, to stop the API from returning the same events repeatedly (see
the sketch after this list).
- Some optimisations in handling the processing results (fewer merges,
to reduce CPU usage when we have lots of events).
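
The begin-timestamp bump mentioned above, simplified from the committed `#fetchEvents` logic into a standalone sketch:

```js
// Deadlock avoidance: once a run ends without errors and without hitting
// maxEvents, we know we've seen everything up to lastEventTimestamp, so we
// can safely advance it by one second for the next run.
function maybeBumpLastEventTimestamp(fetchData, {error, eventCount, maxEvents}) {
    const reachedEnd = !error && eventCount > 0 && eventCount < maxEvents;
    const safeToBump = fetchData.lastEventTimestamp
        && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000; // 2s safety margin
    if (reachedEnd && safeToBump) {
        fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000);
    }
}
```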

Testing:
- You can test with lots of events using the new Mailgun mocking server
(Toolbox repo `scripts/mailgun-mock-server`). It can also simulate
events that are only returned after x minutes, mimicking Mailgun's
storage delays. A sample config override for pointing Ghost at the mock
server follows below.
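
To point a local Ghost install at the mock server, the Mailgun base URL can be overridden in config. The key structure below matches the `bulkEmail:mailgun` config used in this commit's tests; the port and placeholder values are assumptions:

```json
{
    "bulkEmail": {
        "mailgun": {
            "apiKey": "not-a-real-key",
            "domain": "example.com",
            "baseUrl": "http://localhost:8080/v3"
        }
    }
}
```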
Simon Backx 2023-02-20 16:44:13 +01:00
parent 51cbf13be4
commit b665b1a3cc
21 changed files with 957 additions and 343 deletions

View File

@ -278,15 +278,19 @@
</tr>
<tr>
<td>Emails sent:</td>
<td>{{this.emailSettings.emailsSent}}</td>
<td class="gh-type-number">{{ format-number this.emailSettings.emailsSent}}</td>
</tr>
<tr>
<td>Delivered:</td>
<td>{{this.emailSettings.emailsDelivered}}</td>
<td class="gh-type-number">{{ format-number this.emailSettings.emailsDelivered}}</td>
</tr>
<tr>
<td>Opened:</td>
<td class="gh-type-number">{{ format-number this.emailSettings.emailsOpened}}</td>
</tr>
<tr>
<td>Failed:</td>
<td>{{this.emailSettings.emailsFailed}}</td>
<td class="gh-type-number">{{ format-number this.emailSettings.emailsFailed}}</td>
</tr>
<tr>
<td colspan="2"><hr></td>
@ -321,8 +325,99 @@
{{/if}}
</td>
</tr>
<tr>
<td colspan="2"><hr></td>
</tr>
<tr>
<td>Analytics Latest running:</td>
<td class="gh-email-debug-settings-icon">
{{#if (and this.analyticsStatus this.analyticsStatus.latest this.analyticsStatus.latest.running) }}
<span class="check">{{svg-jar "check-2"}}</span>
{{else}}
<span class="x">{{svg-jar "close"}}</span>
{{/if}}
</td>
</tr>
<tr>
<td>Last started:</td>
<td>{{ this.analyticsStatus.latest.lastStarted }}</td>
</tr>
<tr>
<td>Fetching from:</td>
<td>{{ this.analyticsStatus.latest.lastBegin }}</td>
</tr>
<tr>
<td>Last event time:</td>
<td>{{ this.analyticsStatus.latest.lastEventTimestamp }}</td>
</tr>
<tr>
<td colspan="2"><hr></td>
</tr>
<tr>
<td>Analytics Missing running:</td>
<td class="gh-email-debug-settings-icon">
{{#if (and this.analyticsStatus this.analyticsStatus.missing this.analyticsStatus.missing.running) }}
<span class="check">{{svg-jar "check-2"}}</span>
{{else}}
<span class="x">{{svg-jar "close"}}</span>
{{/if}}
</td>
</tr>
<tr>
<td>Last started:</td>
<td>{{ this.analyticsStatus.missing.lastStarted }}</td>
</tr>
<tr>
<td>Fetching from:</td>
<td>{{ this.analyticsStatus.missing.lastBegin }}</td>
</tr>
<tr>
<td>Last event time:</td>
<td>{{ this.analyticsStatus.missing.lastEventTimestamp }}</td>
</tr>
{{#if (and this.analyticsStatus this.analyticsStatus.scheduled this.analyticsStatus.scheduled.schedule) }}
<tr>
<td colspan="2"><hr></td>
</tr>
<tr>
<td>Analytics Scheduled running:</td>
<td class="gh-email-debug-settings-icon">
{{#if this.analyticsStatus.scheduled.running }}
<span class="check">{{svg-jar "check-2"}}</span>
{{else}}
<span class="x">{{svg-jar "close"}}</span>
{{/if}}
</td>
</tr>
<tr>
<td>Schedule:</td>
<td>{{ this.analyticsStatus.scheduled.schedule.begin }} - {{ this.analyticsStatus.scheduled.schedule.end }}</td>
</tr>
<tr>
<td>Last started:</td>
<td>{{ this.analyticsStatus.scheduled.lastStarted }}</td>
</tr>
<tr>
<td>Last event time:</td>
<td>{{ this.analyticsStatus.scheduled.lastEventTimestamp }}</td>
</tr>
{{#unless this.analyticsStatus.scheduled.canceled}}
<tr>
<td colspan="2">
<button type="button" class="gh-email-debug-schedule-analytics" {{on "click" this.cancelScheduleAnalytics }}>{{svg-jar "trash"}}Cancel scheduled refetch</button>
</td>
</tr>
{{/unless}}
{{else}}
<tr>
<td colspan="2">
<button type="button" class="gh-email-debug-schedule-analytics" {{on "click" this.scheduleAnalytics }}>{{svg-jar "reload"}}Refetch Analytics</button>
</td>
</tr>
{{/if}}
</tbody>
</table>
</tabs.tabPanel>
</Tabs::Tabs>

View File

@ -1,7 +1,7 @@
import Component from '@glimmer/component';
import moment from 'moment-timezone';
import {action} from '@ember/object';
import {didCancel, task} from 'ember-concurrency';
import {didCancel, task, timeout} from 'ember-concurrency';
import {formatNumber} from 'ghost-admin/helpers/format-number';
import {ghPluralize} from 'ghost-admin/helpers/gh-pluralize';
import {inject as service} from '@ember/service';
@ -13,15 +13,30 @@ export default class Debug extends Component {
@service membersUtils;
@service utils;
@service feature;
@service store;
@tracked emailBatches = null;
@tracked recipientFailures = null;
@tracked loading = true;
@tracked analyticsStatus = null;
@tracked latestEmail = null;
get post() {
return this.args.post;
}
get email() {
return this.latestEmail ?? this.post.email;
}
async updateEmail() {
try {
this.latestEmail = await this.store.findRecord('email', this.post.email.id, {reload: true});
} catch (e) {
// Skip
}
}
get emailError() {
// get failed batches count
let failedBatches = this.emailBatchesData?.filter((batch) => {
@ -39,17 +54,18 @@ export default class Debug extends Component {
get emailSettings() {
return {
statusClass: this.post.email?.status,
status: this.getStatusLabel(this.post.email?.status),
recipientFilter: this.post.email?.recipientFilter,
createdAt: this.post.email?.createdAtUTC ? moment(this.post.email.createdAtUTC).format('DD MMM, YYYY, HH:mm:ss') : '',
submittedAt: this.post.email?.submittedAtUTC ? moment(this.post.email.submittedAtUTC).format('DD MMM, YYYY, HH:mm:ss') : '',
emailsSent: this.post.email?.emailCount,
emailsDelivered: this.post.email?.deliveredCount,
emailsFailed: this.post.email?.failedCount,
trackOpens: this.post.email?.trackOpens,
trackClicks: this.post.email?.trackClicks,
feedbackEnabled: this.post.email?.feedbackEnabled
statusClass: this.email?.status,
status: this.getStatusLabel(this.email?.status),
recipientFilter: this.email?.recipientFilter,
createdAt: this.email?.createdAtUTC ? moment(this.email.createdAtUTC).format('DD MMM, YYYY, HH:mm:ss') : '',
submittedAt: this.email?.submittedAtUTC ? moment(this.email.submittedAtUTC).format('DD MMM, YYYY, HH:mm:ss') : '',
emailsSent: this.email?.emailCount,
emailsDelivered: this.email?.deliveredCount,
emailsOpened: this.email?.openedCount,
emailsFailed: this.email?.failedCount,
trackOpens: this.email?.trackOpens,
trackClicks: this.email?.trackClicks,
feedbackEnabled: this.email?.feedbackEnabled
};
}
@ -159,6 +175,8 @@ export default class Debug extends Component {
if (this.post.email) {
this.fetchEmailBatches();
this.fetchRecipientFailures();
this.pollAnalyticsStatus.perform();
this.pollEmail.perform();
}
}
@ -204,6 +222,33 @@ export default class Debug extends Component {
}
}
@task
*pollAnalyticsStatus() {
while (true) {
yield this.fetchAnalyticsStatus();
yield timeout(5 * 1000);
}
}
@task
*pollEmail() {
while (true) {
yield timeout(10 * 1000);
yield this.updateEmail();
}
}
async fetchAnalyticsStatus() {
try {
if (this._fetchAnalyticsStatus.isRunning) {
return this._fetchAnalyticsStatus.last;
}
return this._fetchAnalyticsStatus.perform();
} catch (e) {
// Skip
}
}
@task
*_fetchRecipientFailures() {
const data = {
@ -214,4 +259,81 @@ export default class Debug extends Component {
let result = yield this.ajax.request(statsUrl, {data});
this.recipientFailures = result.failures;
}
@task
*_fetchAnalyticsStatus() {
let statsUrl = this.ghostPaths.url.api(`/emails/${this.post.email.id}/analytics`);
let result = yield this.ajax.request(statsUrl);
this.analyticsStatus = result;
// Parse dates
for (const type of Object.keys(result)) {
if (!result[type]) {
result[type] = {};
}
let object = result[type];
for (const key of ['lastStarted', 'lastBegin', 'lastEventTimestamp']) {
if (object[key]) {
object[key] = moment(object[key]).format('DD MMM, YYYY, HH:mm:ss.SSS');
} else {
object[key] = 'N/A';
}
}
if (object.schedule) {
object = object.schedule;
for (const key of ['begin', 'end']) {
if (object[key]) {
object[key] = moment(object[key]).format('DD MMM, YYYY, HH:mm:ss.SSS');
} else {
object[key] = 'N/A';
}
}
}
}
}
@action
scheduleAnalytics() {
try {
if (this._scheduleAnalytics.isRunning) {
return this._scheduleAnalytics.last;
}
return this._scheduleAnalytics.perform();
} catch (e) {
if (!didCancel(e)) {
// re-throw the non-cancelation error
throw e;
}
}
}
@task
*_scheduleAnalytics() {
let statsUrl = this.ghostPaths.url.api(`/emails/${this.post.email.id}/analytics`);
yield this.ajax.put(statsUrl, {});
yield this.fetchAnalyticsStatus();
}
@action
cancelScheduleAnalytics() {
try {
if (this._cancelScheduleAnalytics.isRunning) {
return this._cancelScheduleAnalytics.last;
}
return this._cancelScheduleAnalytics.perform();
} catch (e) {
if (!didCancel(e)) {
// re-throw the non-cancelation error
throw e;
}
}
}
@task
*_cancelScheduleAnalytics() {
let statsUrl = this.ghostPaths.url.api(`/emails/analytics`);
yield this.ajax.delete(statsUrl, {});
yield this.fetchAnalyticsStatus();
}
}

View File

@ -3585,6 +3585,10 @@ p.theme-validation-details {
margin: 12px 0 20px;
}
.gh-email-debug-settings .gh-type-number {
font-variant-numeric: tabular-nums;
}
.gh-email-debug-settings hr {
margin: 8px 0;
border-top-color: var(--whitegrey);
@ -3596,7 +3600,7 @@ p.theme-validation-details {
}
.gh-email-debug-settings tr td:first-of-type {
width: 20%;
width: 30%;
white-space: nowrap;
color: var(--midgrey);
}
@ -3620,6 +3624,25 @@ p.theme-validation-details {
stroke: var(--midgrey);
}
.gh-email-debug-schedule-analytics {
display: flex;
align-items: center;
width: max-content;
margin: .8rem 0 0;
color: var(--green-d1);
}
.gh-email-debug-schedule-analytics svg {
width: 1rem;
height: 1rem;
margin-right: 6px;
}
.gh-email-debug-schedule-analytics svg g {
stroke: var(--green-d1);
stroke-width: 3px;
}
.gh-email-debug-empty-list {
margin: 120px 40px;
text-align: center;

View File

@ -4,6 +4,7 @@ const errors = require('@tryghost/errors');
const megaService = require('../../services/mega');
const emailService = require('../../services/email-service');
const labs = require('../../../shared/labs');
const emailAnalytics = require('../../services/email-analytics');
const messages = {
emailNotFound: 'Email not found.',
@ -140,5 +141,39 @@ module.exports = {
const filter = `email_id:'${frame.data.id}'` + (frame.options.filter ? `+(${frame.options.filter})` : '');
return await models.EmailRecipientFailure.findPage({...frame.options, filter});
}
},
analyticsStatus: {
permissions: {
method: 'browse'
},
async query() {
return emailAnalytics.service.getStatus();
}
},
scheduleAnalytics: {
permissions: {
method: 'browse'
},
data: [
'id'
],
async query(frame) {
const model = await models.Email.findOne(frame.data, frame.options);
return emailAnalytics.service.schedule({
begin: model.get('created_at'),
end: new Date(Math.min(Date.now() - 60 * 60 * 1000, model.get('created_at').getTime() + 24 * 60 * 60 * 1000 * 7))
});
}
},
cancelScheduledAnalytics: {
permissions: {
method: 'browse'
},
async query() {
return emailAnalytics.service.cancelScheduled();
}
}
};

View File

@ -28,5 +28,9 @@ module.exports = {
if (response.meta) {
frame.response.meta = response.meta;
}
},
analyticsStatus(response, apiConfig, frame) {
frame.response = response;
}
};

View File

@ -15,9 +15,24 @@ module.exports = {
const startDate = new Date();
// three separate queries are much faster than using max/greatest (with coalesce to handle nulls) across columns
const {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
const {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
const {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);
}
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}
if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}
const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

View File

@ -1,5 +1,4 @@
const logging = require('@tryghost/logging');
const debug = require('@tryghost/debug')('jobs:email-analytics:fetch-latest');
class EmailAnalyticsServiceWrapper {
init() {
@ -55,22 +54,40 @@ class EmailAnalyticsServiceWrapper {
});
}
async fetchLatest() {
async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');
const fetchStartDate = new Date();
debug('Starting email analytics fetch of latest events');
const eventStats = await this.service.fetchLatest();
const totalEvents = await this.service.fetchLatest({maxEvents});
const fetchEndDate = new Date();
debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms`);
const aggregateStartDate = new Date();
debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
await this.service.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate.getTime() - aggregateStartDate.getTime()}ms`);
logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate.getTime() - fetchStartDate.getTime()}ms`);
async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing started');
return eventStats;
const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
const fetchEndDate = new Date();
logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (missing)`);
return totalEvents;
}
async fetchScheduled({maxEvents}) {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled started');
const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
const fetchEndDate = new Date();
logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (scheduled)`);
return totalEvents;
}
async startFetch() {
@ -80,12 +97,14 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;
logging.info('Email analytics fetch started');
try {
const eventStats = await this.fetchLatest();
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});
// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});
this.fetching = false;
return eventStats;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');

View File

@ -300,6 +300,9 @@ module.exports = function apiRoutes() {
router.put('/emails/:id/retry', mw.authAdminApi, http(api.emails.retry));
router.get('/emails/:id/batches', mw.authAdminApi, http(api.emails.browseBatches));
router.get('/emails/:id/recipient-failures', mw.authAdminApi, http(api.emails.browseFailures));
router.get('/emails/:id/analytics', mw.authAdminApi, http(api.emails.analyticsStatus));
router.put('/emails/:id/analytics', mw.authAdminApi, http(api.emails.scheduleAnalytics));
router.delete('/emails/analytics', mw.authAdminApi, http(api.emails.cancelScheduledAnalytics));
// ## Snippets
router.get('/snippets', mw.authAdminApi, http(api.snippets.browse));

View File

@ -66,7 +66,7 @@ describe('EmailEventStorage', function () {
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
timestamp: timestamp.getTime() / 1000
}];
const initialModel = await models.EmailRecipient.findOne({
@ -78,9 +78,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const result = await emailAnalytics.fetchLatest();
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -129,9 +127,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -177,9 +173,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.opened, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -259,9 +253,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -357,9 +349,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -454,9 +444,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -546,9 +534,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -664,9 +650,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -768,9 +752,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -872,9 +854,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -976,9 +956,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -1042,9 +1020,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.complained, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -1093,9 +1069,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.unsubscribed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
assert.equal(result, 1);
// Since this is all event based we should wait for all dispatched events to be completed.
await DomainEvents.allSettled();
@ -1132,9 +1106,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.unhandled, 1);
assert.deepEqual(result.emailIds, []);
assert.deepEqual(result.memberIds, []);
assert.equal(result, 1);
});
it('Ignores invalid events', async function () {
@ -1148,8 +1120,6 @@ describe('EmailEventStorage', function () {
// Fire event processing
const result = await emailAnalytics.fetchLatest();
assert.equal(result.unhandled, 0);
assert.deepEqual(result.emailIds, []);
assert.deepEqual(result.memberIds, []);
assert.equal(result, 0);
});
});

View File

@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await emailAnalytics.startFetch();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await emailAnalytics.startFetch();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await emailAnalytics.startFetch();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await emailAnalytics.startFetch();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () {
timestamp: Math.round(timestamp.getTime() / 1000)
}];
await emailAnalytics.startFetch();
await emailAnalytics.fetchLatest();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);

View File

@ -1,10 +1,7 @@
const MailgunClient = require('@tryghost/mailgun-client');
const moment = require('moment');
const {EventProcessingResult} = require('@tryghost/email-analytics-service');
const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
const PAGE_LIMIT = 300;
const TRUST_THRESHOLD_S = 30 * 60; // 30 minutes
const DEFAULT_TAGS = ['bulk-email'];
class EmailAnalyticsProviderMailgun {
@ -19,44 +16,30 @@ class EmailAnalyticsProviderMailgun {
}
}
/**
* Do not start from a particular time, grab latest then work back through
* pages until we get a blank response
*
* @param {Function} batchHandler
* @param {Object} [options]
*/
fetchAll(batchHandler, options) {
const mailgunOptions = {
event: EVENT_FILTER,
limit: PAGE_LIMIT,
tags: this.tags.join(' AND ')
};
return this.#fetchAnalytics(mailgunOptions, batchHandler, options);
}
/**
* Fetch from the last known timestamp-TRUST_THRESHOLD then work forwards
* through pages until we get a blank response. This lets us get events
* quicker than the TRUST_THRESHOLD
*
* @param {Date} latestTimestamp
* @param {Function} batchHandler
* @param {Object} [options]
* @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
* @param {Date} [options.begin]
* @param {Date} [options.end]
*/
fetchLatest(latestTimestamp, batchHandler, options) {
const beginDate = moment(latestTimestamp).subtract(TRUST_THRESHOLD_S, 's').toDate();
fetchLatest(batchHandler, options) {
const mailgunOptions = {
limit: PAGE_LIMIT,
event: EVENT_FILTER,
tags: this.tags.join(' AND '),
begin: beginDate.toUTCString(),
begin: options.begin ? options.begin.getTime() / 1000 : undefined,
end: options.end ? options.end.getTime() / 1000 : undefined,
ascending: 'yes'
};
return this.#fetchAnalytics(mailgunOptions, batchHandler, options);
return this.#fetchAnalytics(mailgunOptions, batchHandler, {
maxEvents: options.maxEvents
});
}
/**
@ -68,17 +51,10 @@ class EmailAnalyticsProviderMailgun {
* @param {String} [mailgunOptions.ascending]
* @param {Function} batchHandler
* @param {Object} [options]
* @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
*/
async #fetchAnalytics(mailgunOptions, batchHandler, options) {
const events = await this.mailgunClient.fetchEvents(mailgunOptions, batchHandler, options);
const processingResult = new EventProcessingResult();
for (const event of events) {
processingResult.merge(event);
}
return processingResult;
return await this.mailgunClient.fetchEvents(mailgunOptions, batchHandler, options);
}
}

View File

@ -7,7 +7,7 @@
"main": "index.js",
"scripts": {
"dev": "echo \"Implement me!\"",
"test:unit": "NODE_ENV=testing c8 --all --reporter text --reporter cobertura --check-coverage mocha './test/**/*.test.js'",
"test:unit": "NODE_ENV=testing c8 --all --reporter text --reporter cobertura --check-coverage --100 mocha './test/**/*.test.js'",
"test": "yarn test:unit",
"lint": "eslint . --ext .js --cache"
},

View File

@ -38,64 +38,15 @@ describe('EmailAnalyticsProviderMailgun', function () {
sinon.restore();
});
describe('fetchAll()', function () {
const MAILGUN_OPTIONS = {
event: 'delivered OR opened OR failed OR unsubscribed OR complained',
limit: 300,
tags: 'bulk-email'
};
it('passes the correct parameters to mailgun-client', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchAll(batchHandler);
sinon.assert.calledWithExactly(mailgunFetchEventsStub, MAILGUN_OPTIONS, batchHandler, undefined);
});
it('uses custom tags when supplied', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
configStub.withArgs('bulkEmail:mailgun:tag').returns('custom-tag');
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchAll(batchHandler);
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {
...MAILGUN_OPTIONS,
tags: 'bulk-email AND custom-tag'
}, batchHandler, undefined);
});
});
describe('fetchLatest()', function () {
const LATEST_TIMESTAMP = new Date('Thu Feb 25 2021 12:00:00 GMT+0000');
const END_EXAMPLE = new Date('Thu Feb 25 2021 14:00:00 GMT+0000');
const MAILGUN_OPTIONS = {
event: 'delivered OR opened OR failed OR unsubscribed OR complained',
limit: 300,
tags: 'bulk-email',
begin: 'Thu, 25 Feb 2021 11:30:00 GMT',
begin: 1614254400,
end: undefined,
ascending: 'yes'
};
@ -114,8 +65,71 @@ describe('EmailAnalyticsProviderMailgun', function () {
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchLatest(LATEST_TIMESTAMP, batchHandler);
sinon.assert.calledWithExactly(mailgunFetchEventsStub, MAILGUN_OPTIONS, batchHandler, undefined);
await mailgunProvider.fetchLatest(batchHandler, {begin: LATEST_TIMESTAMP});
sinon.assert.calledWithExactly(mailgunFetchEventsStub, MAILGUN_OPTIONS, batchHandler, {maxEvents: undefined});
});
it('can use end timestamp', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchLatest(batchHandler, {begin: LATEST_TIMESTAMP, end: END_EXAMPLE});
const END_EXAMPLE_UNIX = END_EXAMPLE.getTime() / 1000;
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {...MAILGUN_OPTIONS, end: END_EXAMPLE_UNIX}, batchHandler, {maxEvents: undefined});
});
it('can use end without begin', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchLatest(batchHandler, {end: END_EXAMPLE});
const END_EXAMPLE_UNIX = END_EXAMPLE.getTime() / 1000;
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {...MAILGUN_OPTIONS, begin: undefined, end: END_EXAMPLE_UNIX}, batchHandler, {maxEvents: undefined});
});
it('can use max events', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchLatest(batchHandler, {begin: LATEST_TIMESTAMP, end: END_EXAMPLE, maxEvents: 1000});
const END_EXAMPLE_UNIX = END_EXAMPLE.getTime() / 1000;
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {...MAILGUN_OPTIONS, end: END_EXAMPLE_UNIX}, batchHandler, {maxEvents: 1000});
});
it('uses custom tags when supplied', async function () {
@ -134,12 +148,12 @@ describe('EmailAnalyticsProviderMailgun', function () {
const batchHandler = sinon.spy();
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
await mailgunProvider.fetchLatest(LATEST_TIMESTAMP, batchHandler);
await mailgunProvider.fetchLatest(batchHandler, {begin: LATEST_TIMESTAMP});
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {
...MAILGUN_OPTIONS,
tags: 'bulk-email AND custom-tag'
}, batchHandler, undefined);
}, batchHandler, {maxEvents: undefined});
});
});
});

View File

@ -1,11 +1,27 @@
const EventProcessingResult = require('./event-processing-result');
const debug = require('@tryghost/debug')('services:email-analytics');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
*/
/**
* @typedef {object} FetchData
* @property {boolean} running
* @property {Date} [lastStarted] Date the last fetch started on
* @property {Date} [lastBegin] The begin time used during the last fetch
* @property {Date} [lastEventTimestamp]
* @property {boolean} [canceled] Set to quit the job early
*/
/**
* @typedef {FetchData & {schedule: {begin: Date, end: Date}}} FetchDataScheduled
*/
const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // 30 minutes
const FETCH_LATEST_END_MARGIN_MS = 1 * 60 * 1000; // Do not fetch events newer than 1 minute (yet). Reduces the chance of having missed events in fetchLatest.
module.exports = class EmailAnalyticsService {
config;
settings;
@ -13,6 +29,21 @@ module.exports = class EmailAnalyticsService {
eventProcessor;
providers;
/**
* @type {FetchData}
*/
#fetchLatestData = null;
/**
* @type {FetchData}
*/
#fetchMissingData = null;
/**
* @type {FetchDataScheduled}
*/
#fetchScheduledData = null;
/**
* @param {object} dependencies
* @param {EmailEventProcessor} dependencies.eventProcessor
@ -25,67 +56,265 @@ module.exports = class EmailAnalyticsService {
this.providers = providers;
}
async fetchAll() {
const result = new EventProcessingResult();
getStatus() {
return {
latest: this.#fetchLatestData,
missing: this.#fetchMissingData,
scheduled: this.#fetchScheduledData
};
}
const shouldFetchStats = await this.queries.shouldFetchStats();
if (!shouldFetchStats) {
debug('fetchAll: skipping - fetch requirements not met');
return result;
}
const startFetch = new Date();
debug('fetchAll: starting');
for (const [, provider] of Object.entries(this.providers)) {
const providerResults = await provider.fetchAll(this.processEventBatch.bind(this));
result.merge(providerResults);
}
debug(`fetchAll: finished (${Date.now() - startFetch}ms)`);
return result;
/**
* Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet.
*/
async getLastEventTimestamp() {
return this.#fetchLatestData?.lastEventTimestamp ?? (await this.queries.getLastSeenEventTimestamp()) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
}
async fetchLatest({maxEvents = Infinity} = {}) {
const result = new EventProcessingResult();
// Start where we left off, or the last stored event in the database, or start 30 minutes ago if we have nothing available
const begin = await this.getLastEventTimestamp();
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage
const shouldFetchStats = await this.queries.shouldFetchStats();
if (!shouldFetchStats) {
debug('fetchLatest: skipping - fetch requirements not met');
return result;
if (end < begin) {
// Skip for now
logging.info('[EmailAnalytics] Skipping fetchLatest because end (' + end + ') is before begin (' + begin + ')');
//return 0;
}
const lastTimestamp = await this.queries.getLastSeenEventTimestamp();
const startFetch = new Date();
debug('fetchLatest: starting');
providersLoop:
for (const [, provider] of Object.entries(this.providers)) {
const providerResults = await provider.fetchLatest(lastTimestamp, this.processEventBatch.bind(this), {maxEvents});
result.merge(providerResults);
if (result.totalEvents >= maxEvents) {
break providersLoop;
}
// Create the fetch data object if it doesn't exist yet
if (!this.#fetchLatestData) {
this.#fetchLatestData = {
running: false
};
}
debug(`fetchLatest: finished in ${Date.now() - startFetch}ms. Fetched ${result.totalEvents} events`);
return result;
return await this.#fetchEvents(this.#fetchLatestData, {begin, end, maxEvents});
}
async processEventBatch(events) {
const result = new EventProcessingResult();
/**
* Fetches events that are older than 30 minutes, because then the 'storage' of the Mailgun API is stable. And we are sure we don't miss any events.
* @param {object} options
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
*/
async fetchMissing({maxEvents = Infinity} = {}) {
// We start where we left off, or 1.5h ago after a server restart
const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3);
for (const event of events) {
const batchResult = await this.processEvent(event);
result.merge(batchResult);
// Always stop at the time the fetchLatest started fetching on, or maximum 30 minutes ago
const end = new Date(
Math.min(
Date.now() - TRUST_THRESHOLD_MS,
this.#fetchLatestData?.lastBegin?.getTime()
)
);
if (end < begin) {
// Skip for now
logging.info('[EmailAnalytics] Skipping fetchMissing because end (' + end + ') is before begin (' + begin + ')');
return 0;
}
return result;
// Create the fetch data object if it doesn't exist yet
if (!this.#fetchMissingData) {
this.#fetchMissingData = {
running: false
};
}
return await this.#fetchEvents(this.#fetchMissingData, {begin, end, maxEvents});
}
/**
* Schedule a new fetch between the given begin and end dates
*/
schedule({begin, end}) {
if (this.#fetchScheduledData && this.#fetchScheduledData.running) {
throw new errors.ValidationError({
message: 'Already fetching scheduled events. Wait for it to finish before scheduling a new one.'
});
}
logging.info('[EmailAnalytics] Scheduling fetch from ' + begin.toISOString() + ' until ' + end.toISOString());
this.#fetchScheduledData = {
running: false,
schedule: {
begin,
end
}
};
}
cancelScheduled() {
if (this.#fetchScheduledData) {
if (this.#fetchScheduledData.running) {
// Cancel the running fetch
this.#fetchScheduledData.canceled = true;
} else {
this.#fetchScheduledData = null;
}
}
}
/**
* Continues fetching the scheduled events (does not start a new schedule). Resets the scheduled fetch when 0 events are received.
*/
async fetchScheduled({maxEvents = Infinity} = {}) {
if (!this.#fetchScheduledData || !this.#fetchScheduledData.schedule) {
// Nothing scheduled
return 0;
}
if (this.#fetchScheduledData.canceled) {
// Skip for now
this.#fetchScheduledData = null;
return 0;
}
let begin = this.#fetchScheduledData.schedule.begin;
const end = this.#fetchScheduledData.schedule.end;
if (this.#fetchScheduledData.lastEventTimestamp && this.#fetchScheduledData.lastEventTimestamp > begin) {
// Continue where we left off
begin = this.#fetchScheduledData.lastEventTimestamp;
}
if (end < begin) {
// Skip for now
logging.info('[EmailAnalytics] Skipping fetchScheduled because end is before begin');
this.#fetchScheduledData = null;
return 0;
}
const count = await this.#fetchEvents(this.#fetchScheduledData, {begin, end, maxEvents});
if (count === 0 || this.#fetchScheduledData.canceled) {
// Reset the scheduled fetch
this.#fetchScheduledData = null;
}
return count;
}
/**
* Start fetching analytics and store the data of the progress inside fetchData
* @param {FetchData} fetchData
* @param {object} options
* @param {Date} options.begin
* @param {Date} options.end
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
*/
async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity}) {
// Start where we left off, or the last stored event in the database, or start 30 minutes ago if we have nothing available
logging.info('[EmailAnalytics] Fetching from ' + begin.toISOString() + ' until ' + end.toISOString() + ' (maxEvents: ' + maxEvents + ')');
// Store that we started fetching
fetchData.running = true;
fetchData.lastStarted = new Date();
fetchData.lastBegin = begin;
let lastAggregation = Date.now();
let eventCount = 0;
// We keep the processing result here, so we also have a result in case of failures
let processingResult = new EventProcessingResult();
let error = null;
const processBatch = async (events) => {
// Even if the fetching is interrupted because of an error, we still store the last event timestamp
await this.processEventBatch(events, processingResult, fetchData);
eventCount += events.length;
// Every 5 minutes or every 5,000 members we aggregate and clear the processingResult
// Otherwise we'd need to loop over a lot of members at the end, which takes too long without updating the stat counts in between
if (Date.now() - lastAggregation > 5 * 60 * 1000 || processingResult.memberIds.length > 5000) {
// Aggregate and clear the processingResult
// We do this here because otherwise it could take a long time before the new events are visible in the stats
try {
await this.aggregateStats(processingResult);
lastAggregation = Date.now();
processingResult = new EventProcessingResult();
} catch (err) {
logging.error('[EmailAnalytics] Error while aggregating stats');
logging.error(err);
}
}
if (fetchData.canceled) {
throw new errors.InternalServerError({
message: 'Fetching canceled'
});
}
};
try {
for (const provider of this.providers) {
await provider.fetchLatest(processBatch, {begin, end, maxEvents});
}
logging.info('[EmailAnalytics] Fetching finished');
} catch (err) {
if (err.message !== 'Fetching canceled') {
logging.error('[EmailAnalytics] Error while fetching');
logging.error(err);
error = err;
} else {
logging.error('[EmailAnalytics] Canceled fetching');
}
}
// Aggregate
try {
await this.aggregateStats(processingResult);
} catch (err) {
logging.error('[EmailAnalytics] Error while aggregating stats');
logging.error(err);
if (!error) {
error = err;
}
}
// Small trick: if we reached the end of the new events, we would keep
// fetching the same events because 'begin' won't change
// So if we didn't have errors while fetching, and total events < maxEvents, increase lastEventTimestamp by one second
if (!error && eventCount > 0 && eventCount < maxEvents && fetchData.lastEventTimestamp && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000) {
logging.info('[EmailAnalytics] Reached end of new events, increasing lastEventTimestamp by one second');
fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000);
}
fetchData.running = false;
if (error) {
throw error;
}
return eventCount;
}
/**
* @param {any[]} events
* @param {FetchData} fetchData
*/
async processEventBatch(events, result, fetchData) {
const processStart = Date.now();
for (const event of events) {
const batchResult = await this.processEvent(event);
// Save last event timestamp
if (!fetchData.lastEventTimestamp || (event.timestamp && event.timestamp > fetchData.lastEventTimestamp)) {
fetchData.lastEventTimestamp = event.timestamp;
}
result.merge(batchResult);
}
const processEnd = Date.now();
const time = processEnd - processStart;
if (time > 1000) {
// This is a means to show in the logs that the analytics job is still alive.
logging.warn(`[EmailAnalytics] Processing event batch took ${(time / 1000).toFixed(1)}s`);
}
}
/**
*
* @param {{id: string, type: any; severity: any; recipientEmail: any; emailId: any; providerId: string; timestamp: Date; error: {code: number; message: string; enhandedCode: string|number} | null}} event
* @param {{id: string, type: any; severity: any; recipientEmail: any; emailId?: string; providerId: string; timestamp: Date; error: {code: number; message: string; enhandedCode: string|number} | null}} event
* @returns {Promise<EventProcessingResult>}
*/
async processEvent(event) {
@ -177,12 +406,12 @@ module.exports = class EmailAnalyticsService {
}
async aggregateStats({emailIds = [], memberIds = []}) {
logging.info(`Aggregating email analytics for ${emailIds.length} emails`);
logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`);
for (const emailId of emailIds) {
await this.aggregateEmailStats(emailId);
}
logging.info(`Aggregating email analytics for ${memberIds.length} members`);
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
}

View File

@ -29,86 +29,6 @@ describe('EmailAnalyticsService', function () {
});
});
describe('fetchAll', function () {
let providers;
let queries;
beforeEach(function () {
providers = {
testing: {
async fetchAll(batchHandler) {
const result = new EventProcessingResult();
// first page
result.merge(await batchHandler([{
type: 'delivered',
emailId: 1,
memberId: 1
}, {
type: 'delivered',
emailId: 1,
memberId: 1
}]));
// second page
result.merge(await batchHandler([{
type: 'opened',
emailId: 1,
memberId: 1
}, {
type: 'opened',
emailId: 1,
memberId: 1
}]));
return result;
}
}
};
queries = {
shouldFetchStats: sinon.fake.resolves(true)
};
});
it('uses passed-in providers', async function () {
const service = new EmailAnalyticsService({
queries,
eventProcessor,
providers
});
const result = await service.fetchAll();
queries.shouldFetchStats.calledOnce.should.be.true();
eventProcessor.handleDelivered.calledTwice.should.be.true();
result.should.deepEqual(new EventProcessingResult({
delivered: 2,
opened: 2,
emailIds: [1],
memberIds: [1]
}));
});
it('skips if queries.shouldFetchStats is falsy', async function () {
queries.shouldFetchStats = sinon.fake.resolves(false);
const service = new EmailAnalyticsService({
queries,
eventProcessor,
providers
});
const result = await service.fetchAll();
queries.shouldFetchStats.calledOnce.should.be.true();
eventProcessor.handleDelivered.called.should.be.false();
result.should.deepEqual(new EventProcessingResult());
});
});
describe('fetchLatest', function () {
});
@ -119,16 +39,23 @@ describe('EmailAnalyticsService', function () {
eventProcessor
});
const result = await service.processEventBatch([{
const result = new EventProcessingResult();
const fetchData = {
};
await service.processEventBatch([{
type: 'delivered',
emailId: 1
emailId: 1,
timestamp: new Date(1)
}, {
type: 'delivered',
emailId: 2
emailId: 2,
timestamp: new Date(2)
}, {
type: 'opened',
emailId: 1
}]);
emailId: 1,
timestamp: new Date(3)
}], result, fetchData);
eventProcessor.handleDelivered.callCount.should.eql(2);
@ -139,6 +66,10 @@ describe('EmailAnalyticsService', function () {
emailIds: [1, 2],
memberIds: [1]
}));
fetchData.should.deepEqual({
lastEventTimestamp: new Date(3)
});
});
});

View File

@ -82,10 +82,8 @@ class EmailEventProcessor {
emailId: recipient.emailId,
timestamp
});
await this.#eventStorage.handleOpened(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
await this.#eventStorage.handleOpened(event);
}
return recipient;
}

View File

@ -2,6 +2,7 @@ const _ = require('lodash');
const debug = require('@tryghost/debug');
const logging = require('@tryghost/logging');
const metrics = require('@tryghost/metrics');
const errors = require('@tryghost/errors');
module.exports = class MailgunClient {
#config;
@ -38,7 +39,9 @@ module.exports = class MailgunClient {
}
if (Object.keys(recipientData).length > MailgunClient.BATCH_SIZE) {
// TODO: what to do here?
throw new errors.IncorrectUsageError({
message: `Mailgun only supports sending to ${MailgunClient.BATCH_SIZE} recipients at a time`
});
}
let messageData = {};
@ -107,13 +110,19 @@ module.exports = class MailgunClient {
}
}
/**
* Fetches events from Mailgun
* @param {Object} mailgunOptions
* @param {Function} batchHandler
* @param {Object} options
* @param {Number} options.maxEvents Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
* @returns {Promise<void>}
*/
async fetchEvents(mailgunOptions, batchHandler, {maxEvents = Infinity} = {}) {
let result = [];
const mailgunInstance = this.getInstance();
if (!mailgunInstance) {
logging.warn(`Mailgun is not configured`);
return result;
return;
}
debug(`fetchEvents: starting fetching first events page`);
@ -132,16 +141,14 @@ module.exports = class MailgunClient {
debug(`fetchEvents: finished fetching first page with ${events.length} events`);
let eventCount = 0;
const beginTimestamp = mailgunOptions.begin ? Math.ceil(mailgunOptions.begin * 1000) : undefined; // ceil here if we have rounding errors
pagesLoop:
while (events.length !== 0) {
const batchResult = await batchHandler(events);
result = result.concat(batchResult);
await batchHandler(events);
eventCount += events.length;
if (eventCount >= maxEvents) {
break pagesLoop;
if (eventCount >= maxEvents && (!beginTimestamp || !events[events.length - 1].timestamp || (events[events.length - 1].timestamp.getTime() > beginTimestamp))) {
break;
}
const nextPageId = page.pages.next.page;
@ -159,8 +166,6 @@ module.exports = class MailgunClient {
events = (page?.items?.map(this.normalizeEvent) || []).filter(e => !!e && e.timestamp <= startDate);
debug(`fetchEvents: finished fetching next page with ${events.length} events`);
}
return result;
} catch (error) {
// Log and re-throw Mailgun errors
logging.error(error);

View File

@ -0,0 +1,64 @@
{
"items": [
{
"event": "delivered",
"recipient": "recipient1@gmail.com",
"user-variables": {
"email-id": "5fbe5d9607bdfa3765dc3819"
},
"message": {
"headers": {
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
},
{
"event": "failed",
"severity": "temporary",
"recipient": "recipient2@gmail.com",
"user-variables": {
"email-id": "5fbe5d9607bdfa3765dc3819"
},
"message": {
"headers": {
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
},
{
"event": "failed",
"severity": "permanent",
"recipient": "recipient3@gmail.com",
"user-variables": {
"email-id": "5fbe5d9607bdfa3765dc3819"
},
"message": {
"headers": {
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
},
{
"event": "unsubscribed",
"recipient": "recipient4@gmail.com",
"user-variables": {
"email-id": "5fbe5d9607bdfa3765dc3819"
},
"message": {
"headers": {
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.366528
}
],
"paging": {
"previous": "https://api.mailgun.net/v3/domain.com/events/all-1-previous",
"first": "https://api.mailgun.net/v3/domain.com/events/all-1-first",
"last": "https://api.mailgun.net/v3/domain.com/events/all-1-last",
"next": "https://api.mailgun.net/v3/domain.com/events/all-1-next"
}
}

View File

@ -11,7 +11,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
},
{
"event": "failed",
@ -25,7 +25,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
},
{
"event": "failed",
@ -39,7 +39,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
},
{
"event": "unsubscribed",
@ -52,7 +52,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
}
],
"paging": {

View File

@ -11,7 +11,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
},
{
"event": "failed",
@ -25,7 +25,7 @@
"message-id": "0201125133533.1.C55897076DBD42F2@domain.com"
}
},
"timestamp": 1606399301.266528
"timestamp": 1606399301.266
}
],
"paging": {

View File

@ -9,7 +9,25 @@ const MailgunClient = require('../');
const MAILGUN_OPTIONS = {
event: 'delivered OR opened OR failed OR unsubscribed OR complained',
limit: 300,
tags: 'bulk-email'
tags: 'bulk-email',
begin: 1606399301.266
};
const createBatchCounter = (customHandler) => {
const batchCounter = {
events: 0,
batches: 0
};
batchCounter.batchHandler = async (events) => {
batchCounter.events += events.length;
batchCounter.batches += 1;
if (customHandler) {
await customHandler(events);
}
};
return batchCounter;
};
describe('MailgunClient', function () {
@ -138,12 +156,11 @@ describe('MailgunClient', function () {
describe('fetchEvents()', function () {
it('does not fetch if not configured', async function () {
const batchHandler = sinon.spy();
const counter = createBatchCounter();
const mailgunClient = new MailgunClient({config, settings});
const events = await mailgunClient.fetchEvents(MAILGUN_OPTIONS, batchHandler);
assert.equal(events.length, 0);
assert.equal(batchHandler.callCount, 0);
await mailgunClient.fetchEvents(MAILGUN_OPTIONS, counter.batchHandler);
assert.equal(counter.events, 0);
assert.equal(counter.batches, 0);
});
it('fetches from now and works backwards', async function () {
@ -178,17 +195,19 @@ describe('MailgunClient', function () {
'Content-Type': 'application/json'
});
const batchHandler = sinon.spy();
const counter = createBatchCounter();
const mailgunClient = new MailgunClient({config, settings});
await mailgunClient.fetchEvents(MAILGUN_OPTIONS, batchHandler);
await mailgunClient.fetchEvents(MAILGUN_OPTIONS, counter.batchHandler);
assert.equal(firstPageMock.isDone(), true);
assert.equal(secondPageMock.isDone(), true);
assert.equal(batchHandler.callCount, 2); // one per page
assert.equal(counter.batches, 2);
assert.equal(counter.events, 6);
});
it('fetches with a limit', async function () {
// This tests the deadlock possibility (if we would stop after x events, we would retry the same events again and again)
it('keeps fetching over the limit if events have the same timestamp as begin', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
@ -207,6 +226,7 @@ describe('MailgunClient', function () {
const secondPageMock = nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-1-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/all-2.json`, {
'Content-Type': 'application/json'
});
@ -214,21 +234,112 @@ describe('MailgunClient', function () {
// requests continue until an empty items set is returned
nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-2-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/empty.json`, {
'Content-Type': 'application/json'
});
const batchHandler = sinon.stub().returnsArg(0);
const counter = createBatchCounter();
const maxEvents = 3;
const mailgunClient = new MailgunClient({config, settings});
const events = await mailgunClient.fetchEvents(MAILGUN_OPTIONS, batchHandler, {maxEvents});
assert.equal(events.length, 4); // `maxEvents` is 3 but the first page contains 4 events
await mailgunClient.fetchEvents(MAILGUN_OPTIONS, counter.batchHandler, {maxEvents});
assert.equal(counter.batches, 2);
assert.equal(counter.events, 6);
assert.equal(firstPageMock.isDone(), true);
assert.equal(secondPageMock.isDone(), true);
});
it('fetches with a limit and stops if timestamp difference reached', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const firstPageMock = nock('https://api.mailgun.net')
.get('/v3/domain.com/events')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/all-1-timestamp.json`, {
'Content-Type': 'application/json'
});
const secondPageMock = nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-1-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/all-2.json`, {
'Content-Type': 'application/json'
});
// requests continue until an empty items set is returned
nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-2-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/empty.json`, {
'Content-Type': 'application/json'
});
const counter = createBatchCounter();
const maxEvents = 3;
const mailgunClient = new MailgunClient({config, settings});
await mailgunClient.fetchEvents(MAILGUN_OPTIONS, counter.batchHandler, {maxEvents});
assert.equal(counter.batches, 1);
assert.equal(counter.events, 4);
assert.equal(firstPageMock.isDone(), true);
assert.equal(secondPageMock.isDone(), false);
});
it('logs errors and rethrows during processing', async function () {
const configStub = sinon.stub(config, 'get');
configStub.withArgs('bulkEmail').returns({
mailgun: {
apiKey: 'apiKey',
domain: 'domain.com',
baseUrl: 'https://api.mailgun.net/v3'
}
});
const firstPageMock = nock('https://api.mailgun.net')
.get('/v3/domain.com/events')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/all-1-timestamp.json`, {
'Content-Type': 'application/json'
});
const secondPageMock = nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-1-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/all-2.json`, {
'Content-Type': 'application/json'
});
// requests continue until an empty items set is returned
nock('https://api.mailgun.net')
.get('/v3/domain.com/events/all-2-next')
.query(MAILGUN_OPTIONS)
.replyWithFile(200, `${__dirname}/fixtures/empty.json`, {
'Content-Type': 'application/json'
});
const counter = createBatchCounter(() => {
throw new Error('test error');
});
const mailgunClient = new MailgunClient({config, settings});
await assert.rejects(mailgunClient.fetchEvents(MAILGUN_OPTIONS, counter.batchHandler), /test error/);
assert.equal(counter.batches, 1);
assert.equal(counter.events, 4);
assert.equal(firstPageMock.isDone(), true);
assert.equal(secondPageMock.isDone(), false);
});
it('supports EU Mailgun domain', async function () {