Moved email event fetching to main thread (#16231)

This commit is contained in:
Simon Backx 2023-02-09 09:36:39 +01:00 committed by GitHub
parent fd79ca3f5a
commit ea2c69565f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 336 additions and 496 deletions

View File

@ -292,6 +292,7 @@ async function initServices({config}) {
const audienceFeedback = require('./server/services/audience-feedback');
const emailSuppressionList = require('./server/services/email-suppression-list');
const emailService = require('./server/services/email-service');
const emailAnalytics = require('./server/services/email-analytics');
const mentionsService = require('./server/services/mentions');
const urlUtils = require('./shared/url-utils');
@ -316,6 +317,7 @@ async function initServices({config}) {
slack.listen(),
audienceFeedback.init(),
emailService.init(),
emailAnalytics.init(),
mega.listen(),
webhooks.listen(),
appService.init(),

View File

@ -0,0 +1,22 @@
/**
* This is an event that is used to circumvent the job manager that currently isn't able to run scheduled jobs on the main thread (not offloaded).
* We simply emit this event in the job manager and listen for it on the main thread.
*/
module.exports = class StartEmailAnalyticsJobEvent {
/**
* @param {any} data
* @param {Date} timestamp
*/
constructor(data, timestamp) {
this.data = data;
this.timestamp = timestamp;
}
/**
* @param {any} [data]
* @param {Date} [timestamp]
*/
static create(data, timestamp) {
return new StartEmailAnalyticsJobEvent(data, timestamp ?? new Date);
}
};

View File

@ -1,23 +1,3 @@
const config = require('../../../shared/config');
const db = require('../../data/db');
const settings = require('../../../shared/settings-cache');
const {EmailAnalyticsService} = require('@tryghost/email-analytics-service');
const {EmailEventProcessor} = require('@tryghost/email-service');
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const queries = require('./lib/queries');
const DomainEvents = require('@tryghost/domain-events');
const EmailAnalyticsServiceWrapper = require('./wrapper');
const eventProcessor = new EmailEventProcessor({
domainEvents: DomainEvents,
db
});
module.exports = new EmailAnalyticsService({
config,
settings,
eventProcessor,
providers: [
new MailgunProvider({config, settings})
],
queries
});
module.exports = new EmailAnalyticsServiceWrapper();

View File

@ -1,4 +1,5 @@
const {parentPort} = require('worker_threads');
const StartEmailAnalyticsJobEvent = require('../../events/StartEmailAnalyticsJobEvent');
// recurring job to fetch analytics since the most recently seen event timestamp
@ -24,22 +25,15 @@ if (parentPort) {
}
(async () => {
const {run} = require('./run');
const {eventStats, aggregateEndDate, fetchStartDate} = await run({
domainEvents: {
dispatch(event) {
parentPort.postMessage({
event: {
type: event.constructor.name,
data: event
}
});
}
// We send an evnet message, so that it is emitted on the main thread by the job manager
// This will start the email analytics job on the main thread (the wrapper service is listening for this event)
parentPort.postMessage({
event: {
type: StartEmailAnalyticsJobEvent.name
}
});
if (parentPort) {
parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
parentPort.postMessage('done');
} else {
// give the logging pipes time finish writing before exit

View File

@ -1,57 +0,0 @@
const debug = require('@tryghost/debug')('jobs:email-analytics:fetch-latest');
async function run({domainEvents}) {
const config = require('../../../../../shared/config');
const db = require('../../../../data/db');
const settingsRows = await db.knex('settings')
.whereIn('key', ['mailgun_api_key', 'mailgun_domain', 'mailgun_base_url']);
const settingsCache = {};
settingsRows.forEach((row) => {
settingsCache[row.key] = row.value;
});
const settings = {
get(key) {
return settingsCache[key];
}
};
const {EmailAnalyticsService} = require('@tryghost/email-analytics-service');
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const queries = require('../../lib/queries');
const {EmailEventProcessor} = require('@tryghost/email-service');
// Since this is running in a worker thread, we cant dispatch directly
// So we post the events as a message to the job manager
const eventProcessor = new EmailEventProcessor({
domainEvents,
db
});
const emailAnalyticsService = new EmailAnalyticsService({
config,
settings,
eventProcessor,
providers: [
new MailgunProvider({config, settings})
],
queries
});
const fetchStartDate = new Date();
debug('Starting email analytics fetch of latest events');
const eventStats = await emailAnalyticsService.fetchLatest();
const fetchEndDate = new Date();
debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);
const aggregateStartDate = new Date();
debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
await emailAnalyticsService.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
return {eventStats, fetchStartDate, fetchEndDate, aggregateStartDate, aggregateEndDate};
}
module.exports.run = run;

View File

@ -0,0 +1,90 @@
const logging = require('@tryghost/logging');
const debug = require('@tryghost/debug')('jobs:email-analytics:fetch-latest');
class EmailAnalyticsServiceWrapper {
init() {
if (this.service) {
return;
}
const {EmailAnalyticsService} = require('@tryghost/email-analytics-service');
const {EmailEventStorage, EmailEventProcessor} = require('@tryghost/email-service');
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
const db = require('../../data/db');
const queries = require('./lib/queries');
const membersService = require('../members');
const membersRepository = membersService.api.members;
this.eventStorage = new EmailEventStorage({
db,
membersRepository,
models: {
EmailRecipientFailure,
EmailSpamComplaintEvent
}
});
// Since this is running in a worker thread, we cant dispatch directly
// So we post the events as a message to the job manager
const eventProcessor = new EmailEventProcessor({
domainEvents,
db,
eventStorage: this.eventStorage
});
this.service = new EmailAnalyticsService({
config,
settings,
eventProcessor,
providers: [
new MailgunProvider({config, settings})
],
queries
});
// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});
}
async startFetch() {
if (this.fetching) {
logging.info('Email analytics fetch already running, skipping');
return;
}
this.fetching = true;
logging.info('Email analytics fetch started');
try {
const fetchStartDate = new Date();
debug('Starting email analytics fetch of latest events');
const eventStats = await this.service.fetchLatest();
const fetchEndDate = new Date();
debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms`);
const aggregateStartDate = new Date();
debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
await this.service.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate.getTime() - aggregateStartDate.getTime()}ms`);
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate.getTime() - fetchStartDate.getTime()}ms`);
this.fetching = false;
return eventStats;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
}
this.fetching = false;
}
}
module.exports = EmailAnalyticsServiceWrapper;

View File

@ -13,8 +13,8 @@ class EmailServiceWrapper {
return;
}
const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter, EmailEventStorage, MailgunEmailProvider} = require('@tryghost/email-service');
const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member, EmailRecipientFailure, EmailSpamComplaintEvent} = require('../../models');
const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter, MailgunEmailProvider} = require('@tryghost/email-service');
const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member} = require('../../models');
const MailgunClient = require('@tryghost/mailgun-client');
const configService = require('../../../shared/config');
const settingsCache = require('../../../shared/settings-cache');
@ -25,7 +25,6 @@ class EmailServiceWrapper {
const sentry = require('../../../shared/sentry');
const membersRepository = membersService.api.members;
const limitService = require('../limits');
const domainEvents = require('@tryghost/domain-events');
const mobiledocLib = require('../../lib/mobiledoc');
const lexicalLib = require('../../lib/lexical');
@ -116,16 +115,6 @@ class EmailServiceWrapper {
Email
}
});
this.eventStorage = new EmailEventStorage({
db,
membersRepository,
models: {
EmailRecipientFailure,
EmailSpamComplaintEvent
}
});
this.eventStorage.listen(domainEvents);
}
}

View File

@ -3,8 +3,8 @@ const {agentProvider, fixtureManager} = require('../../../utils/e2e-framework');
const assert = require('assert');
const domainEvents = require('@tryghost/domain-events');
const MailgunClient = require('@tryghost/mailgun-client');
const {EmailDeliveredEvent} = require('@tryghost/email-events');
const DomainEvents = require('@tryghost/domain-events');
const emailAnalytics = require('../../../../core/server/services/email-analytics');
async function resetFailures(models, emailId) {
await models.EmailRecipientFailure.destroy({
@ -31,7 +31,6 @@ describe('EmailEventStorage', function () {
// Only reference services after Ghost boot
models = require('../../../../core/server/models');
run = require('../../../../core/server/services/email-analytics/jobs/fetch-latest/run.js').run;
membersService = require('../../../../core/server/services/members');
jobsService = require('../../../../core/server/services/jobs');
@ -78,9 +77,7 @@ describe('EmailEventStorage', function () {
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -131,10 +128,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('delivered_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -182,10 +176,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('opened_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.opened, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -267,10 +258,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -367,10 +355,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -462,10 +447,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -583,10 +565,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -690,10 +669,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -797,10 +773,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -866,10 +839,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.complained, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -920,10 +890,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.unsubscribed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
@ -962,10 +929,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.unhandled, 1);
assert.deepEqual(result.emailIds, []);
assert.deepEqual(result.memberIds, []);
@ -981,10 +945,7 @@ describe('EmailEventStorage', function () {
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
const result = await emailAnalytics.startFetch();
assert.equal(result.unhandled, 0);
assert.deepEqual(result.emailIds, []);
assert.deepEqual(result.memberIds, []);

View File

@ -3,20 +3,17 @@ const {agentProvider, fixtureManager} = require('../../utils/e2e-framework');
const assert = require('assert');
const MailgunClient = require('@tryghost/mailgun-client');
const DomainEvents = require('@tryghost/domain-events');
const emailAnalytics = require('../../../core/server/services/email-analytics');
describe('MailgunEmailSuppressionList', function () {
let agent;
let events = [];
let run;
before(async function () {
agent = await agentProvider.getAdminAPIAgent();
await fixtureManager.init('newsletters', 'members:newsletters', 'members:emails');
await agent.loginAsOwner();
// Only reference services after Ghost boot
run = require('../../../core/server/services/email-analytics/jobs/fetch-latest/run.js').run;
sinon.stub(MailgunClient.prototype, 'fetchEvents').callsFake(async function (_, batchHandler) {
const normalizedEvents = (events.map(this.normalizeEvent) || []).filter(e => !!e);
return [await batchHandler(normalizedEvents)];
@ -47,10 +44,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await run({
domainEvents: DomainEvents
});
await emailAnalytics.startFetch();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -78,10 +72,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await run({
domainEvents: DomainEvents
});
await emailAnalytics.startFetch();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -109,10 +100,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await run({
domainEvents: DomainEvents
});
await emailAnalytics.startFetch();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -140,10 +128,7 @@ describe('MailgunEmailSuppressionList', function () {
recipient
})];
await run({
domainEvents: DomainEvents
});
await emailAnalytics.startFetch();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
@ -178,10 +163,7 @@ describe('MailgunEmailSuppressionList', function () {
timestamp: Math.round(timestamp.getTime() / 1000)
}];
await run({
domainEvents: DomainEvents
});
await emailAnalytics.startFetch();
await DomainEvents.allSettled();
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);

View File

@ -642,7 +642,7 @@ const fixtures = {
memberIds: DataGenerator.forKnex.members.map(member => member.id)
};
return emailAnalyticsService.aggregateStats(toAggregate);
return emailAnalyticsService.service.aggregateStats(toAggregate);
});
},

View File

@ -2,7 +2,7 @@ const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEv
async function waitForEvent() {
return new Promise((resolve) => {
setTimeout(resolve, 200);
setTimeout(resolve, 70);
});
}
@ -20,16 +20,28 @@ async function waitForEvent() {
* @property {string} emailId
*/
/**
* @typedef EmailEventStorage
* @property {(event: EmailDeliveredEvent) => Promise<void>} handleDelivered
* @property {(event: EmailOpenedEvent) => Promise<void>} handleOpened
* @property {(event: EmailBouncedEvent) => Promise<void>} handlePermanentFailed
* @property {(event: EmailTemporaryBouncedEvent) => Promise<void>} handleTemporaryFailed
* @property {(event: EmailUnsubscribedEvent) => Promise<void>} handleUnsubscribed
* @property {(event: SpamComplaintEvent) => Promise<void>} handleComplained
*/
/**
* WARNING: this class is used in a separate thread (an offloaded job). Be careful when working with settings and models.
*/
class EmailEventProcessor {
#domainEvents;
#db;
#eventStorage;
constructor({domainEvents, db}) {
constructor({domainEvents, db, eventStorage}) {
this.#domainEvents = domainEvents;
this.#db = db;
this.#eventStorage = eventStorage;
// Avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {};
@ -42,15 +54,16 @@ class EmailEventProcessor {
async handleDelivered(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailDeliveredEvent.create({
const event = EmailDeliveredEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handleDelivered(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
@ -62,15 +75,17 @@ class EmailEventProcessor {
async handleOpened(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailOpenedEvent.create({
const event = EmailOpenedEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handleOpened(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
}
return recipient;
}
@ -82,7 +97,7 @@ class EmailEventProcessor {
async handleTemporaryFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailTemporaryBouncedEvent.create({
const event = EmailTemporaryBouncedEvent.create({
id,
error,
email: emailIdentification.email,
@ -90,9 +105,10 @@ class EmailEventProcessor {
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handleTemporaryFailed(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
@ -104,7 +120,7 @@ class EmailEventProcessor {
async handlePermanentFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailBouncedEvent.create({
const event = EmailBouncedEvent.create({
id,
error,
email: emailIdentification.email,
@ -112,9 +128,11 @@ class EmailEventProcessor {
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handlePermanentFailed(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
}
return recipient;
}
@ -126,14 +144,15 @@ class EmailEventProcessor {
async handleUnsubscribed(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailUnsubscribedEvent.create({
const event = EmailUnsubscribedEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handleUnsubscribed(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
@ -145,14 +164,16 @@ class EmailEventProcessor {
async handleComplained(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(SpamComplaintEvent.create({
const event = SpamComplaintEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
// We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time
await waitForEvent();
});
await this.#eventStorage.handleComplained(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
}
return recipient;
}

View File

@ -1,4 +1,3 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailTemporaryBouncedEvent, EmailUnsubscribedEvent, SpamComplaintEvent} = require('@tryghost/email-events');
const moment = require('moment-timezone');
const logging = require('@tryghost/logging');
@ -13,35 +12,6 @@ class EmailEventStorage {
this.#membersRepository = membersRepository;
}
/**
* @param {import('@tryghost/domain-events')} domainEvents
*/
listen(domainEvents) {
domainEvents.subscribe(EmailDeliveredEvent, async (event) => {
await this.handleDelivered(event);
});
domainEvents.subscribe(EmailOpenedEvent, async (event) => {
await this.handleOpened(event);
});
domainEvents.subscribe(EmailBouncedEvent, async (event) => {
await this.handlePermanentFailed(event);
});
domainEvents.subscribe(EmailTemporaryBouncedEvent, async (event) => {
await this.handleTemporaryFailed(event);
});
domainEvents.subscribe(EmailUnsubscribedEvent, async (event) => {
await this.handleUnsubscribed(event);
});
domainEvents.subscribe(SpamComplaintEvent, async (event) => {
await this.handleComplained(event);
});
}
async handleDelivered(event) {
// To properly handle events that are received out of order (this happens because of polling)
// only set if delivered_at is null

View File

@ -5,6 +5,7 @@ const sinon = require('sinon');
describe('Email Event Processor', function () {
let eventProcessor;
let eventStorage;
let db;
let domainEvents;
@ -18,9 +19,20 @@ describe('Email Event Processor', function () {
domainEvents = {
dispatch: sinon.stub()
};
eventStorage = {
handleDelivered: sinon.stub(),
handleOpened: sinon.stub(),
handlePermanentFailed: sinon.stub(),
handleTemporaryFailed: sinon.stub(),
handleComplained: sinon.stub(),
handleUnsubscribed: sinon.stub()
};
eventProcessor = new EmailEventProcessor({
db,
domainEvents
domainEvents,
eventStorage
});
});
@ -88,8 +100,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handleDelivered.callCount, 1);
const event = eventStorage.handleDelivered.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'EmailDeliveredEvent');
});
@ -101,8 +113,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handleOpened.callCount, 1);
const event = eventStorage.handleOpened.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'EmailOpenedEvent');
});
@ -114,8 +126,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handleTemporaryFailed.callCount, 1);
const event = eventStorage.handleTemporaryFailed.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'EmailTemporaryBouncedEvent');
});
@ -127,8 +139,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handlePermanentFailed.callCount, 1);
const event = eventStorage.handlePermanentFailed.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'EmailBouncedEvent');
});
@ -140,8 +152,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handleUnsubscribed.callCount, 1);
const event = eventStorage.handleUnsubscribed.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'EmailUnsubscribedEvent');
});
@ -153,8 +165,8 @@ describe('Email Event Processor', function () {
memberId: 'member-id',
emailId: 'email-id'
});
assert.equal(domainEvents.dispatch.callCount, 1);
const event = domainEvents.dispatch.firstCall.args[0];
assert.equal(eventStorage.handleComplained.callCount, 1);
const event = eventStorage.handleComplained.firstCall.args[0];
assert.equal(event.email, 'example@example.com');
assert.equal(event.constructor.name, 'SpamComplaintEvent');
});

View File

@ -10,6 +10,7 @@ describe('Email Event Storage', function () {
beforeEach(function () {
logError = sinon.stub(logging, 'error');
sinon.stub(logging, 'info');
});
afterEach(function () {
@ -23,76 +24,51 @@ describe('Email Event Storage', function () {
});
it('Handles email delivered events', async function () {
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailDeliveredEvent) {
handler(EmailDeliveredEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
timestamp: new Date(0)
}));
}
}
};
const event = EmailDeliveredEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
timestamp: new Date(0)
});
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const eventHandler = new EmailEventStorage({db});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await eventHandler.handleDelivered(event);
sinon.assert.calledOnce(db.update);
assert(!!db.update.firstCall.args[0].delivered_at);
});
it('Handles email opened events', async function () {
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailOpenedEvent) {
handler(EmailOpenedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
timestamp: new Date(0)
}));
}
}
};
const event = EmailOpenedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
timestamp: new Date(0)
});
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const eventHandler = new EmailEventStorage({db});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await eventHandler.handleOpened(event);
sinon.assert.calledOnce(db.update);
assert(!!db.update.firstCall.args[0].opened_at);
});
it('Handles email permanent bounce events with update', async function () {
let waitPromise;
const event = EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailBouncedEvent) {
waitPromise = handler(EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const existing = {
id: 1,
@ -119,37 +95,26 @@ describe('Email Event Storage', function () {
EmailRecipientFailure
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handlePermanentFailed(event);
sinon.assert.calledOnce(db.update);
assert(!!db.update.firstCall.args[0].failed_at);
assert(existing.save.calledOnce);
});
it('Handles email permanent bounce events with insert', async function () {
let waitPromise;
const event = EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailBouncedEvent) {
waitPromise = handler(EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const EmailRecipientFailure = {
transaction: async function (callback) {
@ -165,68 +130,45 @@ describe('Email Event Storage', function () {
EmailRecipientFailure
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handlePermanentFailed(event);
sinon.assert.calledOnce(db.update);
assert(!!db.update.firstCall.args[0].failed_at);
assert(EmailRecipientFailure.add.calledOnce);
});
it('Handles email permanent bounce event without error data', async function () {
let waitPromise;
const event = EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: null,
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailBouncedEvent) {
waitPromise = handler(EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: null,
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const eventHandler = new EmailEventStorage({
db,
models: {}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handlePermanentFailed(event);
sinon.assert.calledOnce(db.update);
});
it('Handles email permanent bounce events with skipped update', async function () {
let waitPromise;
const event = EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailBouncedEvent) {
waitPromise = handler(EmailBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const db = createDb();
const existing = {
id: 1,
@ -253,9 +195,7 @@ describe('Email Event Storage', function () {
EmailRecipientFailure
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handlePermanentFailed(event);
sinon.assert.calledOnce(db.update);
assert(!!db.update.firstCall.args[0].failed_at);
assert(EmailRecipientFailure.findOne.called);
@ -263,28 +203,19 @@ describe('Email Event Storage', function () {
});
it('Handles email temporary bounce events with update', async function () {
let waitPromise;
const event = EmailTemporaryBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: null
},
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailTemporaryBouncedEvent) {
waitPromise = handler(EmailTemporaryBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: null
},
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const existing = {
id: 1,
get: (key) => {
@ -309,35 +240,24 @@ describe('Email Event Storage', function () {
EmailRecipientFailure
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleTemporaryFailed(event);
assert(existing.save.calledOnce);
});
it('Handles email temporary bounce events with skipped update', async function () {
let waitPromise;
const event = EmailTemporaryBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailTemporaryBouncedEvent) {
waitPromise = handler(EmailTemporaryBouncedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
emailRecipientId: '789',
error: {
message: 'test',
code: 500,
enhancedCode: '5.5.5'
},
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const existing = {
id: 1,
get: (key) => {
@ -362,29 +282,18 @@ describe('Email Event Storage', function () {
EmailRecipientFailure
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleTemporaryFailed(event);
assert(existing.save.notCalled);
});
it('Handles unsubscribe', async function () {
let waitPromise;
const event = EmailUnsubscribedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === EmailUnsubscribedEvent) {
waitPromise = handler(EmailUnsubscribedEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const update = sinon.stub().resolves();
const eventHandler = new EmailEventStorage({
@ -392,30 +301,19 @@ describe('Email Event Storage', function () {
update
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleUnsubscribed(event);
assert(update.calledOnce);
assert(update.firstCall.args[0].newsletters.length === 0);
});
it('Handles complaints', async function () {
let waitPromise;
const event = SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === SpamComplaintEvent) {
waitPromise = handler(SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const EmailSpamComplaintEvent = {
add: sinon.stub().resolves()
};
@ -425,29 +323,18 @@ describe('Email Event Storage', function () {
EmailSpamComplaintEvent
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleComplained(event);
assert(EmailSpamComplaintEvent.add.calledOnce);
});
it('Handles duplicate complaints', async function () {
let waitPromise;
const event = SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === SpamComplaintEvent) {
waitPromise = handler(SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const EmailSpamComplaintEvent = {
add: sinon.stub().rejects({code: 'ER_DUP_ENTRY'})
};
@ -457,30 +344,19 @@ describe('Email Event Storage', function () {
EmailSpamComplaintEvent
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleComplained(event);
assert(EmailSpamComplaintEvent.add.calledOnce);
assert(!logError.calledOnce);
});
it('Handles logging failed complaint storage', async function () {
let waitPromise;
const event = SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
});
const DomainEvents = {
subscribe: async (type, handler) => {
if (type === SpamComplaintEvent) {
waitPromise = handler(SpamComplaintEvent.create({
email: 'example@example.com',
memberId: '123',
emailId: '456',
timestamp: new Date(0)
}));
}
}
};
const subscribeSpy = sinon.spy(DomainEvents, 'subscribe');
const EmailSpamComplaintEvent = {
add: sinon.stub().rejects(new Error('Some database error'))
};
@ -490,9 +366,7 @@ describe('Email Event Storage', function () {
EmailSpamComplaintEvent
}
});
eventHandler.listen(DomainEvents);
sinon.assert.callCount(subscribeSpy, 6);
await waitPromise;
await eventHandler.handleComplained(event);
assert(EmailSpamComplaintEvent.add.calledOnce);
assert(logError.calledOnce);
});