Added email analytics throughput metrics (#21694)

ref
https://linear.app/ghost/issue/ENG-1508/add-custom-metrics-for-email-analytics-jobs

- With the experimental job queue, we're using email analytics as our
initial validation test case. We're hoping to see an improvement in
Ghost's throughput for ingesting email events. However, we don't
currently collect this data point, so it's kind of impossible to tell
right now if the job queue is making things better or not.
- This PR fixes that by adding two new prometheus metrics:
- `email_analytics_events_processed` — a counter incremented each time
an event is processed. Sometimes the event has already been processed in
the past, so this doens't always result in a new event being stored in
the DB.
- `email_analytics_events_stored` — a counter incremented each time an
event is stored in the DB. For example, if an email is opened 3 times by
the same recipient, this counter will only be incremented once.
- The metrics also have a label for the event type, so we can split out
opened events from delivered events. We can use the `rate()` function in
grafana to then get an `events ingested per second` metric, and compare
sites with/without the job queue enabled.
This commit is contained in:
Chris Raible 2024-11-21 17:30:43 -08:00 committed by GitHub
parent 48839c80ae
commit 829a0fc7e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 152 additions and 13 deletions

View File

@ -32,7 +32,8 @@ class EmailAnalyticsServiceWrapper {
EmailRecipientFailure, EmailRecipientFailure,
EmailSpamComplaintEvent EmailSpamComplaintEvent
}, },
emailSuppressionList emailSuppressionList,
prometheusClient
}); });
// Since this is running in a worker thread, we cant dispatch directly // Since this is running in a worker thread, we cant dispatch directly
@ -40,7 +41,8 @@ class EmailAnalyticsServiceWrapper {
const eventProcessor = new EmailEventProcessor({ const eventProcessor = new EmailEventProcessor({
domainEvents, domainEvents,
db, db,
eventStorage: this.eventStorage eventStorage: this.eventStorage,
prometheusClient
}); });
this.service = new EmailAnalyticsService({ this.service = new EmailAnalyticsService({

View File

@ -1,4 +1,5 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent, EmailTemporaryBouncedEvent} = require('@tryghost/email-events'); const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent, EmailTemporaryBouncedEvent} = require('@tryghost/email-events');
const logging = require('@tryghost/logging');
async function waitForEvent() { async function waitForEvent() {
return new Promise((resolve) => { return new Promise((resolve) => {
@ -37,14 +38,22 @@ class EmailEventProcessor {
#domainEvents; #domainEvents;
#db; #db;
#eventStorage; #eventStorage;
#prometheusClient;
constructor({domainEvents, db, eventStorage}) { constructor({domainEvents, db, eventStorage, prometheusClient}) {
this.#domainEvents = domainEvents; this.#domainEvents = domainEvents;
this.#db = db; this.#db = db;
this.#eventStorage = eventStorage; this.#eventStorage = eventStorage;
this.#prometheusClient = prometheusClient;
// Avoid having to query email_batch by provider_id for every event // Avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {}; this.providerIdEmailIdMap = {};
if (this.#prometheusClient) {
this.#prometheusClient.registerCounter({
name: 'email_analytics_events_processed',
help: 'Number of email analytics events processed',
labelNames: ['event']
});
}
} }
/** /**
@ -64,6 +73,7 @@ class EmailEventProcessor {
await this.#eventStorage.handleDelivered(event); await this.#eventStorage.handleDelivered(event);
this.#domainEvents.dispatch(event); this.#domainEvents.dispatch(event);
this.recordEventProcessed('delivered');
} }
return recipient; return recipient;
} }
@ -84,6 +94,7 @@ class EmailEventProcessor {
}); });
this.#domainEvents.dispatch(event); this.#domainEvents.dispatch(event);
await this.#eventStorage.handleOpened(event); await this.#eventStorage.handleOpened(event);
this.recordEventProcessed('opened');
} }
return recipient; return recipient;
} }
@ -209,6 +220,20 @@ class EmailEventProcessor {
} }
} }
/**
* Record event processed
* @param {string} event
*/
recordEventProcessed(event) {
try {
if (this.#prometheusClient) {
this.#prometheusClient.getMetric('email_analytics_events_processed')?.inc({event});
}
} catch (err) {
logging.error('Error recording email analytics event processed', err);
}
}
/** /**
* @private * @private
* @param {string} providerId * @param {string} providerId

View File

@ -6,34 +6,46 @@ class EmailEventStorage {
#membersRepository; #membersRepository;
#models; #models;
#emailSuppressionList; #emailSuppressionList;
#prometheusClient;
constructor({db, models, membersRepository, emailSuppressionList}) { constructor({db, models, membersRepository, emailSuppressionList, prometheusClient}) {
this.#db = db; this.#db = db;
this.#models = models; this.#models = models;
this.#membersRepository = membersRepository; this.#membersRepository = membersRepository;
this.#emailSuppressionList = emailSuppressionList; this.#emailSuppressionList = emailSuppressionList;
this.#prometheusClient = prometheusClient;
if (this.#prometheusClient) {
this.#prometheusClient.registerCounter({
name: 'email_analytics_events_stored',
help: 'Number of email analytics events stored',
labelNames: ['event']
});
}
} }
async handleDelivered(event) { async handleDelivered(event) {
// To properly handle events that are received out of order (this happens because of polling) // To properly handle events that are received out of order (this happens because of polling)
// only set if delivered_at is null // only set if delivered_at is null
await this.#db.knex('email_recipients') const rowCount = await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId) .where('id', '=', event.emailRecipientId)
.whereNull('delivered_at') .whereNull('delivered_at')
.update({ .update({
delivered_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss') delivered_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')
}); });
this.recordEventStored('delivered', rowCount);
} }
async handleOpened(event) { async handleOpened(event) {
// To properly handle events that are received out of order (this happens because of polling) // To properly handle events that are received out of order (this happens because of polling)
// only set if opened_at is null // only set if opened_at is null
await this.#db.knex('email_recipients') const rowCount = await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId) .where('id', '=', event.emailRecipientId)
.whereNull('opened_at') .whereNull('opened_at')
.update({ .update({
opened_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss') opened_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')
}); });
this.recordEventStored('opened', rowCount);
} }
async handlePermanentFailed(event) { async handlePermanentFailed(event) {
@ -157,6 +169,19 @@ class EmailEventStorage {
return []; return [];
} }
} }
/**
* Record event stored
* @param {string} event
* @param {number} count
*/
recordEventStored(event, count = 1) {
try {
this.#prometheusClient?.getMetric('email_analytics_events_stored')?.inc({event}, count);
} catch (err) {
logging.error('Error recording email analytics event stored', err);
}
}
} }
module.exports = EmailEventStorage; module.exports = EmailEventStorage;

View File

@ -1,6 +1,6 @@
const assert = require('assert/strict'); const assert = require('assert/strict');
const EmailEventProcessor = require('../lib/EmailEventProcessor'); const EmailEventProcessor = require('../lib/EmailEventProcessor');
const {createDb} = require('./utils'); const {createDb, createPrometheusClient} = require('./utils');
const sinon = require('sinon'); const sinon = require('sinon');
describe('Email Event Processor', function () { describe('Email Event Processor', function () {
@ -8,14 +8,14 @@ describe('Email Event Processor', function () {
let eventStorage; let eventStorage;
let db; let db;
let domainEvents; let domainEvents;
let prometheusClient;
beforeEach(function () { beforeEach(function () {
db = createDb({first: { db = createDb({first: {
emailId: 'fetched-email-id', emailId: 'fetched-email-id',
member_id: 'member-id', member_id: 'member-id',
id: 'email-recipient-id' id: 'email-recipient-id'
}}); }});
prometheusClient = createPrometheusClient();
domainEvents = { domainEvents = {
dispatch: sinon.stub() dispatch: sinon.stub()
}; };
@ -32,7 +32,8 @@ describe('Email Event Processor', function () {
eventProcessor = new EmailEventProcessor({ eventProcessor = new EmailEventProcessor({
db, db,
domainEvents, domainEvents,
eventStorage eventStorage,
prometheusClient
}); });
}); });
@ -171,4 +172,30 @@ describe('Email Event Processor', function () {
assert.equal(event.constructor.name, 'SpamComplaintEvent'); assert.equal(event.constructor.name, 'SpamComplaintEvent');
}); });
}); });
describe('recordEventProcessed', function () {
it('records the event processed metric', function () {
const incStub = sinon.stub();
prometheusClient = createPrometheusClient({incStub});
eventProcessor = new EmailEventProcessor({
db,
domainEvents,
eventStorage,
prometheusClient
});
eventProcessor.recordEventProcessed('delivered');
assert(incStub.calledOnce);
});
it('does not throw if recording the event metric fails', function () {
prometheusClient = createPrometheusClient({incStub: sinon.stub().throws()});
eventProcessor = new EmailEventProcessor({
db,
domainEvents,
eventStorage,
prometheusClient
});
assert.doesNotThrow(() => eventProcessor.recordEventProcessed('delivered'));
});
});
}); });

View File

@ -3,7 +3,7 @@ const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailTemporaryB
const sinon = require('sinon'); const sinon = require('sinon');
const assert = require('assert/strict'); const assert = require('assert/strict');
const logging = require('@tryghost/logging'); const logging = require('@tryghost/logging');
const {createDb} = require('./utils'); const {createDb, createPrometheusClient} = require('./utils');
describe('Email Event Storage', function () { describe('Email Event Storage', function () {
let logError; let logError;
@ -21,6 +21,12 @@ describe('Email Event Storage', function () {
it('doesn\'t throw', function () { it('doesn\'t throw', function () {
new EmailEventStorage({}); new EmailEventStorage({});
}); });
it('sets up metrics if prometheusClient is provided', function () {
const prometheusClient = createPrometheusClient();
new EmailEventStorage({prometheusClient});
sinon.assert.calledOnce(prometheusClient.registerCounter);
});
}); });
it('Handles email delivered events', async function () { it('Handles email delivered events', async function () {
@ -39,6 +45,16 @@ describe('Email Event Storage', function () {
assert(!!db.update.firstCall.args[0].delivered_at); assert(!!db.update.firstCall.args[0].delivered_at);
}); });
it('Records the event stored metric when handling email delivered events', async function () {
const event = EmailDeliveredEvent.create({});
const db = createDb();
const prometheusClient = createPrometheusClient();
const eventHandler = new EmailEventStorage({db, prometheusClient});
sinon.stub(eventHandler, 'recordEventStored').resolves();
await eventHandler.handleDelivered(event);
assert(eventHandler.recordEventStored.calledOnce);
});
it('Handles email opened events', async function () { it('Handles email opened events', async function () {
const event = EmailOpenedEvent.create({ const event = EmailOpenedEvent.create({
email: 'example@example.com', email: 'example@example.com',
@ -55,6 +71,16 @@ describe('Email Event Storage', function () {
assert(!!db.update.firstCall.args[0].opened_at); assert(!!db.update.firstCall.args[0].opened_at);
}); });
it('Records the event stored metric when handling email opened events', async function () {
const event = EmailOpenedEvent.create({});
const db = createDb();
const prometheusClient = createPrometheusClient();
const eventHandler = new EmailEventStorage({db, prometheusClient});
sinon.stub(eventHandler, 'recordEventStored').resolves();
await eventHandler.handleOpened(event);
assert(eventHandler.recordEventStored.calledOnce);
});
it('Handles email permanent bounce events with update', async function () { it('Handles email permanent bounce events with update', async function () {
const event = EmailBouncedEvent.create({ const event = EmailBouncedEvent.create({
email: 'example@example.com', email: 'example@example.com',
@ -595,4 +621,28 @@ describe('Email Event Storage', function () {
assert(EmailSpamComplaintEvent.add.calledOnce); assert(EmailSpamComplaintEvent.add.calledOnce);
assert(logError.calledOnce); assert(logError.calledOnce);
}); });
describe('recordEventStored', function () {
it('increments the counter', function () {
const incStub = sinon.stub();
const prometheusClient = {
registerCounter: sinon.stub(),
getMetric: sinon.stub().returns({
inc: incStub
})
};
const eventHandler = new EmailEventStorage({prometheusClient});
eventHandler.recordEventStored('delivered');
sinon.assert.calledOnce(incStub);
});
it('does not throw if recording the event metric fails', function () {
const prometheusClient = {
registerCounter: sinon.stub(),
getMetric: sinon.stub().throws(new Error('Metric not found'))
};
const eventHandler = new EmailEventStorage({prometheusClient});
assert.doesNotThrow(() => eventHandler.recordEventStored('delivered'));
});
});
}); });

View File

@ -163,6 +163,15 @@ const createDb = ({first, all} = {}) => {
return db; return db;
}; };
const createPrometheusClient = ({registerCounterStub, getMetricStub, incStub} = {}) => {
return {
registerCounter: registerCounterStub ?? sinon.stub(),
getMetric: getMetricStub ?? sinon.stub().returns({
inc: incStub ?? sinon.stub()
})
};
};
const sleep = (ms) => { const sleep = (ms) => {
return new Promise((resolve) => { return new Promise((resolve) => {
setTimeout(resolve, ms); setTimeout(resolve, ms);
@ -173,5 +182,6 @@ module.exports = {
createModel, createModel,
createModelClass, createModelClass,
createDb, createDb,
createPrometheusClient,
sleep sleep
}; };