Ghost/core/server/services/email-analytics/jobs/fetch-latest.js
Kevin Ansfield 717543835c
Added email analytics service (#12393)
no issue

- added `EmailAnalyticsService`
  - `.fetchAll()` grabs and processes all available events
  - `.fetchLatest()` grabs and processes all events since the last seen event timestamp
  - `EventProcessor` is passed event objects and updates `email_recipients` or `members` records depending on whether the event is analytics or list hygiene
    - always returns an `EventProcessingResult` instance so that progress can be tracked and merged across individual events, batches (pages of events), and total runs
    - adds email_id and member_id to the returned result where appropriate so that the stats aggregator can limit processing to data that has changed
    - sets `email_recipients.{delivered_at, opened_at, failed_at}` for analytics events
    - sets `members.subscribed = false` for permanent failure/unsubscribed/complained list hygiene events
  - `StatsAggregator` takes an `EventProcessingResult`-like object containing arrays of email ids and member ids on which to aggregate statistics (a rough sketch of this shape follows below)
  - jobs for `fetch-latest` and `fetch-all` ready for use with the JobsService
- added `initialiseRecurringJobs()` function to Ghost bootup procedure that schedules the email analytics "fetch latest" job to run every minute (see the parent/worker wiring sketch after the file contents below)
2020-11-26 13:09:38 +00:00
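
For context on the `eventStats` value used in the job below: it is the `EventProcessingResult`-like object described in the commit message. A rough sketch of its shape, assuming only the fields this job actually reads (`totalEvents`, `emailIds`) plus the member ids mentioned above; the real class lives elsewhere in the email-analytics service and may differ:

// Rough sketch (not the actual EventProcessingResult class) of the stats object
// this job passes to aggregateStats(); field values are illustrative.
const exampleEventStats = {
    totalEvents: 42, // analytics/list-hygiene events processed in this run
    emailIds: ['email-id-1', 'email-id-2'], // emails whose recipient rows changed and need stats re-aggregated
    memberIds: ['member-id-1'] // members updated by list hygiene events
};

// the aggregator can then limit its work to the data that changed, e.g.
// await emailAnalyticsService.aggregateStats(exampleEventStats);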


const logging = require('../../../../shared/logging');
const {parentPort} = require('worker_threads');
const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-latest');

// recurring job to fetch analytics since the most recently seen event timestamp

// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling
// as everything is idempotent and will pick up where it left off on next run
function cancel() {
    logging.info('Email analytics fetch-latest job cancelled before completion');

    if (parentPort) {
        parentPort.postMessage('cancelled');
    } else {
        setTimeout(() => {
            process.exit(0);
        }, 1000);
    }
}

if (parentPort) {
    parentPort.once('message', (message) => {
        if (message === 'cancel') {
            return cancel();
        }
    });
}

(async () => {
    try {
        const models = require('../../../models');
        const settingsService = require('../../settings');

        // must be initialized before emailAnalyticsService is required otherwise
        // requires are in the wrong order and settingsCache will always be empty
        await models.init();
        await settingsService.init();

        const emailAnalyticsService = require('../');

        const fetchStartDate = new Date();
        debug('Starting email analytics fetch of latest events');
        const eventStats = await emailAnalyticsService.fetchLatest();
        const fetchEndDate = new Date();
        debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);

        const aggregateStartDate = new Date();
        debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
        await emailAnalyticsService.aggregateStats(eventStats);
        const aggregateEndDate = new Date();
        debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);

        logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);

        if (parentPort) {
            parentPort.postMessage('done');
        } else {
            // give the logging pipes time to finish writing before exit
            setTimeout(() => {
                process.exit(0);
            }, 1000);
        }
    } catch (error) {
        logging.error(error);

        // give the logging pipes time to finish writing before exit
        setTimeout(() => {
            process.exit(1);
        }, 1000);
    }
})();
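
The job talks to its parent only via `parentPort` messages: it accepts 'cancel' and reports 'cancelled' or 'done'. Below is a minimal standalone sketch of a parent driving this file as a worker thread on a one-minute schedule; in Ghost this wiring is handled by the JobsService, so the direct `Worker` usage and the names here are illustrative only:

// Illustrative parent-side wiring (not Ghost's JobsService implementation):
// spawn the job as a worker thread, listen for its status messages, and
// stop scheduling new runs on shutdown.
const path = require('path');
const {Worker} = require('worker_threads');

const jobPath = path.join(__dirname, 'fetch-latest.js'); // this file

function runFetchLatest() {
    const worker = new Worker(jobPath);

    worker.on('message', (message) => {
        // the job posts 'done' on success and 'cancelled' when stopped early
        console.log(`fetch-latest reported: ${message}`);
    });

    worker.on('error', (error) => {
        console.error('fetch-latest job errored', error);
    });

    // during shutdown a parent could ask the job to exit early:
    // worker.postMessage('cancel');

    return worker;
}

// the bootup procedure schedules "fetch latest" roughly every minute
const timer = setInterval(runFetchLatest, 60 * 1000);

process.on('SIGTERM', () => {
    clearInterval(timer);
});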