From 717543835c19b32a0d45fc713064287a28c58289 Mon Sep 17 00:00:00 2001
From: Kevin Ansfield
Date: Thu, 26 Nov 2020 13:09:38 +0000
Subject: [PATCH] Added email analytics service (#12393)

no issue

- added `EmailAnalyticsService`
  - `.fetchAll()` grabs and processes all available events
  - `.fetchLatest()` grabs and processes all events since the last seen event timestamp
- `EventProcessor` is passed event objects and updates `email_recipients` or `members` records depending on whether the event is an analytics or list-hygiene event
  - always returns an `EventProcessingResult` instance so that progress can be tracked and merged across individual events, batches (pages of events), and total runs
  - adds email_id and member_id to the returned result where appropriate so that the stats aggregator can limit processing to data that has changed
  - sets `email_recipients.{delivered_at, opened_at, failed_at}` for analytics events
  - sets `members.subscribed = false` for permanent failure/unsubscribed/complained list hygiene events
- `StatsAggregator` takes an `EventProcessingResult`-like object containing arrays of email ids and member ids on which to aggregate statistics
- jobs for `fetch-latest` and `fetch-all` ready for use with the JobsService
- added `initializeRecurringJobs()` function to the Ghost bootup procedure that schedules the email analytics "fetch latest" job to run every minute
---
 core/server/index.js                           |  23 ++
 .../email-analytics/email-analytics.js         | 103 ++++++++
 core/server/services/email-analytics/index.js  |  12 +
 .../email-analytics/jobs/fetch-all.js          |  70 ++++++
 .../email-analytics/jobs/fetch-latest.js       |  71 ++++++
 .../lib/event-processing-result.js             |  45 ++++
 .../email-analytics/lib/event-processor.js     | 227 ++++++++++++++++++
 .../email-analytics/lib/stats-aggregator.js    |  20 ++
 .../email-analytics/providers/index.js         |  10 +
 .../email-analytics/providers/mailgun.js       | 135 +++++++++++
 10 files changed, 716 insertions(+)
 create mode 100644 core/server/services/email-analytics/email-analytics.js
 create mode 100644 core/server/services/email-analytics/index.js
 create mode 100644 core/server/services/email-analytics/jobs/fetch-all.js
 create mode 100644 core/server/services/email-analytics/jobs/fetch-latest.js
 create mode 100644 core/server/services/email-analytics/lib/event-processing-result.js
 create mode 100644 core/server/services/email-analytics/lib/event-processor.js
 create mode 100644 core/server/services/email-analytics/lib/stats-aggregator.js
 create mode 100644 core/server/services/email-analytics/providers/index.js
 create mode 100644 core/server/services/email-analytics/providers/mailgun.js

diff --git a/core/server/index.js b/core/server/index.js
index 6aca48c370..9e3fe6431b 100644
--- a/core/server/index.js
+++ b/core/server/index.js
@@ -10,6 +10,7 @@ require('./overrides');
 const debug = require('ghost-ignition').debug('boot:init');
+const path = require('path');
 const Promise = require('bluebird');
 const config = require('../shared/config');
 const {events, i18n} = require('./lib/common');
@@ -69,6 +70,22 @@ function initialiseServices() {
     });
 }
 
+function initializeRecurringJobs() {
+    // we don't want to kick off scheduled/recurring jobs that will interfere with tests
+    if (process.env.NODE_ENV.match(/^testing/)) {
+        return;
+    }
+
+    const jobsService = require('./services/jobs');
+
+    jobsService.scheduleJob(
+        'every 1 minute',
+        path.resolve(__dirname, 'services', 'email-analytics', 'jobs', 'fetch-latest.js'),
+        undefined,
+        'email-analytics-fetch-latest'
+    );
+}
+
 /**
  * - initialise models
  * - initialise i18n
@@ -123,6 +140,9 @@ const minimalRequiredSetupToStartGhost = (dbState) => {
                 events.emit('db.ready');
 
                 return initialiseServices()
+                    .then(() => {
+                        initializeRecurringJobs();
+                    })
                     .then(() => {
                         return ghostServer;
                     });
@@ -146,6 +166,9 @@ const minimalRequiredSetupToStartGhost = (dbState) => {
                 logging.info('Blog is out of maintenance mode.');
                 return GhostServer.announceServerReadiness();
             })
+            .then(() => {
+                initializeRecurringJobs();
+            })
            .catch((err) => {
                return GhostServer.announceServerReadiness(err)
                    .finally(() => {
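
Taken together, the wiring above means the scheduler effectively runs something equivalent to the following every minute (a simplified sketch only; the real job lives in `jobs/fetch-latest.js` and runs in a worker thread via the JobsService):

```js
const emailAnalyticsService = require('./services/email-analytics');

async function emailAnalyticsFetchLatest() {
    // process any events newer than the last seen event timestamp...
    const eventStats = await emailAnalyticsService.fetchLatest();
    // ...then re-aggregate stats only for the emails/members those events touched
    await emailAnalyticsService.aggregateStats(eventStats);
}
```
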
diff --git a/core/server/services/email-analytics/email-analytics.js b/core/server/services/email-analytics/email-analytics.js
new file mode 100644
index 0000000000..0a323d9d97
--- /dev/null
+++ b/core/server/services/email-analytics/email-analytics.js
@@ -0,0 +1,103 @@
+const _ = require('lodash');
+const EventProcessingResult = require('./lib/event-processing-result');
+const EventProcessor = require('./lib/event-processor');
+const StatsAggregator = require('./lib/stats-aggregator');
+const defaultProviders = require('./providers');
+const debug = require('ghost-ignition').debug('services:email-analytics');
+
+// when fetching a batch we should keep a record of which emails have associated
+// events so we only aggregate those that are affected
+
+class EmailAnalyticsService {
+    constructor({config, settings, logging, db, providers, eventProcessor, statsAggregator}) {
+        this.config = config;
+        this.settings = settings;
+        this.logging = logging || console;
+        this.db = db;
+        this.providers = providers || defaultProviders.init({config, settings, logging});
+        this.eventProcessor = eventProcessor || new EventProcessor({db, logging});
+        this.statsAggregator = statsAggregator || new StatsAggregator({db, logging});
+    }
+
+    async fetchAll() {
+        const result = new EventProcessingResult();
+
+        const {count: emailCount} = await this.db.knex('emails').count('id as count').first() || {};
+        if (!emailCount || emailCount <= 0) {
+            debug('fetchAll: skipping - no emails to track');
+            return result;
+        }
+
+        const startFetch = new Date();
+        debug('fetchAll: starting');
+        for (const [, provider] of Object.entries(this.providers)) {
+            const providerResults = await provider.fetchAll(this.processEventBatch.bind(this));
+            result.merge(providerResults);
+        }
+        debug(`fetchAll: finished (${Date.now() - startFetch}ms)`);
+
+        return result;
+    }
+
+    async fetchLatest({maxEvents = Infinity} = {}) {
+        const result = new EventProcessingResult();
+        const lastTimestamp = await this.getLastSeenEventTimestamp();
+
+        const startFetch = new Date();
+        debug('fetchLatest: starting');
+        providersLoop:
+        for (const [, provider] of Object.entries(this.providers)) {
+            const providerResults = await provider.fetchLatest(lastTimestamp, this.processEventBatch.bind(this), {maxEvents});
+            result.merge(providerResults);
+
+            if (result.totalEvents >= maxEvents) {
+                break providersLoop;
+            }
+        }
+        debug(`fetchLatest: finished in ${Date.now() - startFetch}ms. Fetched ${result.totalEvents} events`);
+
+        return result;
+    }
+
+    async processEventBatch(events) {
+        const result = new EventProcessingResult();
+
+        for (const event of events) {
+            const batchResult = await this.eventProcessor.process(event);
+            result.merge(batchResult);
+        }
+
+        return result;
+    }
+
+    async aggregateStats({emailIds = [], memberIds = []}) {
+        for (const emailId of emailIds) {
+            await this.aggregateEmailStats(emailId);
+        }
+        for (const memberId of memberIds) {
+            await this.aggregateMemberStats(memberId);
+        }
+    }
+
+    aggregateEmailStats(emailId) {
+        return this.statsAggregator.aggregateEmail(emailId);
+    }
+
+    aggregateMemberStats(memberId) {
+        return this.statsAggregator.aggregateMember(memberId);
+    }
+
+    async getLastSeenEventTimestamp() {
+        const startDate = new Date();
+
+        // three separate queries is much faster than using max/greatest across columns with coalesce to handle nulls
+        const {maxDeliveredAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
+        const {maxOpenedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
+        const {maxFailedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
+
+        const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
+        debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
+
+        return lastSeenEventTimestamp;
+    }
+}
+
+module.exports = EmailAnalyticsService;
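
Every collaborator is injectable through the constructor, so the service can be unit tested without touching the network or database. A hedged sketch (the stub names below are hypothetical):

```js
const EmailAnalyticsService = require('./email-analytics');
const EventProcessingResult = require('./lib/event-processing-result');

// hypothetical test wiring: a fake provider that yields one pre-processed result
const service = new EmailAnalyticsService({
    db: fakeDb, // hypothetical knex-like stub
    providers: {
        fake: {
            fetchLatest: async () => new EventProcessingResult({delivered: 1, emailIds: ['email-1']})
        }
    },
    eventProcessor: fakeEventProcessor,  // hypothetical
    statsAggregator: fakeStatsAggregator // hypothetical
});
```
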
diff --git a/core/server/services/email-analytics/index.js b/core/server/services/email-analytics/index.js
new file mode 100644
index 0000000000..fc50e1085d
--- /dev/null
+++ b/core/server/services/email-analytics/index.js
@@ -0,0 +1,12 @@
+const config = require('../../../shared/config');
+const logging = require('../../../shared/logging');
+const db = require('../../data/db');
+const settings = require('../settings/cache');
+const EmailAnalyticsService = require('./email-analytics');
+
+module.exports = new EmailAnalyticsService({
+    config,
+    logging,
+    db,
+    settings
+});
diff --git a/core/server/services/email-analytics/jobs/fetch-all.js b/core/server/services/email-analytics/jobs/fetch-all.js
new file mode 100644
index 0000000000..308c32159c
--- /dev/null
+++ b/core/server/services/email-analytics/jobs/fetch-all.js
@@ -0,0 +1,70 @@
+const logging = require('../../../../shared/logging');
+const {parentPort} = require('worker_threads');
+const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-all');
+
+// one-off job to fetch all available events and re-process them idempotently
+// NB. can be a _very_ long job for sites with many members and frequent emails
+
+function cancel() {
+    logging.info('Email analytics fetch-all job cancelled before completion');
+
+    if (parentPort) {
+        parentPort.postMessage('cancelled');
+    } else {
+        setTimeout(() => {
+            process.exit(0);
+        }, 1000);
+    }
+}
+
+if (parentPort) {
+    parentPort.once('message', (message) => {
+        if (message === 'cancel') {
+            return cancel();
+        }
+    });
+}
+
+(async () => {
+    try {
+        const models = require('../../../models');
+        const settingsService = require('../../settings');
+
+        // must be initialized before emailAnalyticsService is required, otherwise the
+        // requires are resolved in the wrong order and settingsCache will always be empty
+        await models.init();
+        await settingsService.init();
+
+        const emailAnalyticsService = require('../');
+
+        const fetchStartDate = new Date();
+        debug('Starting email analytics fetch of all available events');
+        const eventStats = await emailAnalyticsService.fetchAll();
+        const fetchEndDate = new Date();
+        debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);
+
+        const aggregateStartDate = new Date();
+        debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
+        await emailAnalyticsService.aggregateStats(eventStats);
+        const aggregateEndDate = new Date();
+        debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
+
+        logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
+
+        if (parentPort) {
+            parentPort.postMessage('done');
+        } else {
+            // give the logging pipes time to finish writing before exit
+            setTimeout(() => {
+                process.exit(0);
+            }, 1000);
+        }
+    } catch (error) {
+        logging.error(error);
+
+        // give the logging pipes time to finish writing before exit
+        setTimeout(() => {
+            process.exit(1);
+        }, 1000);
+    }
+})();
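
The cancellation handshake assumes the job runner posts a 'cancel' message into the worker and terminates it once the job acknowledges. A hypothetical parent-side counterpart, using only the standard worker_threads API:

```js
const {Worker} = require('worker_threads');

// hypothetical: how a runner could cancel the fetch-all job during shutdown
const worker = new Worker(require.resolve('./fetch-all'));
worker.postMessage('cancel');
worker.once('message', (message) => {
    if (message === 'cancelled') {
        worker.terminate();
    }
});
```
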
diff --git a/core/server/services/email-analytics/jobs/fetch-latest.js b/core/server/services/email-analytics/jobs/fetch-latest.js
new file mode 100644
index 0000000000..2f9578e9ac
--- /dev/null
+++ b/core/server/services/email-analytics/jobs/fetch-latest.js
@@ -0,0 +1,71 @@
+const logging = require('../../../../shared/logging');
+const {parentPort} = require('worker_threads');
+const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-latest');
+
+// recurring job to fetch analytics since the most recently seen event timestamp
+
+// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when
+// cancelling as everything is idempotent and will pick up where it left off on next run
+function cancel() {
+    logging.info('Email analytics fetch-latest job cancelled before completion');
+
+    if (parentPort) {
+        parentPort.postMessage('cancelled');
+    } else {
+        setTimeout(() => {
+            process.exit(0);
+        }, 1000);
+    }
+}
+
+if (parentPort) {
+    parentPort.once('message', (message) => {
+        if (message === 'cancel') {
+            return cancel();
+        }
+    });
+}
+
+(async () => {
+    try {
+        const models = require('../../../models');
+        const settingsService = require('../../settings');
+
+        // must be initialized before emailAnalyticsService is required, otherwise the
+        // requires are resolved in the wrong order and settingsCache will always be empty
+        await models.init();
+        await settingsService.init();
+
+        const emailAnalyticsService = require('../');
+
+        const fetchStartDate = new Date();
+        debug('Starting email analytics fetch of latest events');
+        const eventStats = await emailAnalyticsService.fetchLatest();
+        const fetchEndDate = new Date();
+        debug(`Finished fetching ${eventStats.totalEvents} analytics events in ${fetchEndDate - fetchStartDate}ms`);
+
+        const aggregateStartDate = new Date();
+        debug(`Starting email analytics aggregation for ${eventStats.emailIds.length} emails`);
+        await emailAnalyticsService.aggregateStats(eventStats);
+        const aggregateEndDate = new Date();
+        debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
+
+        logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
+
+        if (parentPort) {
+            parentPort.postMessage('done');
+        } else {
+            // give the logging pipes time to finish writing before exit
+            setTimeout(() => {
+                process.exit(0);
+            }, 1000);
+        }
+    } catch (error) {
+        logging.error(error);
+
+        // give the logging pipes time to finish writing before exit
+        setTimeout(() => {
+            process.exit(1);
+        }, 1000);
+    }
+})();
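
The recurring job runs unbounded here, but `fetchLatest` accepts a `maxEvents` cap, so a caller could bound a single run. A minimal sketch (inside an async context):

```js
// stop paging once ~10k events have been processed; the next run picks up the rest
const eventStats = await emailAnalyticsService.fetchLatest({maxEvents: 10000});
```
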
diff --git a/core/server/services/email-analytics/lib/event-processing-result.js b/core/server/services/email-analytics/lib/event-processing-result.js
new file mode 100644
index 0000000000..496d253546
--- /dev/null
+++ b/core/server/services/email-analytics/lib/event-processing-result.js
@@ -0,0 +1,45 @@
+const _ = require('lodash');
+
+class EventProcessingResult {
+    constructor(result = {}) {
+        // counts
+        this.delivered = 0;
+        this.opened = 0;
+        this.failed = 0;
+        this.unsubscribed = 0;
+        this.complained = 0;
+        this.unhandled = 0;
+        this.unprocessable = 0;
+
+        // ids seen whilst processing, ready for passing to the stats aggregator
+        this.emailIds = [];
+        this.memberIds = [];
+
+        this.merge(result);
+    }
+
+    get totalEvents() {
+        return this.delivered +
+            this.opened +
+            this.failed +
+            this.unsubscribed +
+            this.complained +
+            this.unhandled +
+            this.unprocessable;
+    }
+
+    merge(other = {}) {
+        this.delivered += other.delivered || 0;
+        this.opened += other.opened || 0;
+        this.failed += other.failed || 0;
+        this.unsubscribed += other.unsubscribed || 0;
+        this.complained += other.complained || 0;
+        this.unhandled += other.unhandled || 0;
+        this.unprocessable += other.unprocessable || 0;
+
+        this.emailIds = _.compact(_.union(this.emailIds, other.emailIds || []));
+        this.memberIds = _.compact(_.union(this.memberIds, other.memberIds || []));
+    }
+}
+
+module.exports = EventProcessingResult;
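
`merge` sums counts and unions ids, which is what lets per-event results roll up into batch and run totals. For example:

```js
const EventProcessingResult = require('./event-processing-result');

const result = new EventProcessingResult({delivered: 1, emailIds: ['email-1']});
result.merge({opened: 2, emailIds: ['email-1', 'email-2'], memberIds: ['member-1']});

result.totalEvents; // => 3 (counts are summed)
result.emailIds;    // => ['email-1', 'email-2'] (ids are unioned and de-duplicated)
result.memberIds;   // => ['member-1']
```
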
diff --git a/core/server/services/email-analytics/lib/event-processor.js b/core/server/services/email-analytics/lib/event-processor.js
new file mode 100644
index 0000000000..a111c28035
--- /dev/null
+++ b/core/server/services/email-analytics/lib/event-processor.js
@@ -0,0 +1,227 @@
+const moment = require('moment');
+
+class EmailAnalyticsEventProcessor {
+    constructor({db, logging}) {
+        this.db = db;
+        this.logging = logging || console;
+
+        // avoid having to query email_batch by provider_id for every event
+        this.providerIdEmailIdMap = {};
+    }
+
+    async process(event) {
+        if (event.type === 'delivered') {
+            return this.handleDelivered(event);
+        }
+
+        if (event.type === 'opened') {
+            return this.handleOpened(event);
+        }
+
+        if (event.type === 'failed') {
+            return this.handleFailed(event);
+        }
+
+        if (event.type === 'unsubscribed') {
+            return this.handleUnsubscribed(event);
+        }
+
+        if (event.type === 'complained') {
+            return this.handleComplained(event);
+        }
+
+        return {
+            unhandled: 1
+        };
+    }
+
+    async handleDelivered(event) {
+        const emailId = await this._getEmailId(event);
+
+        if (!emailId) {
+            return {unprocessable: 1};
+        }
+
+        // this doesn't work - the Base model intercepts the attr and tries to convert "COALESCE(...)" to a date
+        // await this.models.EmailRecipient
+        //     .where({email_id: emailId, member_email: event.recipientEmail})
+        //     .save({delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])}, {patch: true, context: {internal: true}});
+
+        const updateResult = await this.db.knex('email_recipients')
+            .where('email_id', '=', emailId)
+            .where('member_email', '=', event.recipientEmail)
+            .update({
+                delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
+            });
+
+        if (updateResult !== 0) {
+            const memberId = await this._getMemberId(event);
+
+            return {
+                delivered: 1,
+                emailIds: [emailId],
+                memberIds: [memberId]
+            };
+        }
+
+        return {delivered: 1};
+    }
+
+    async handleOpened(event) {
+        const emailId = await this._getEmailId(event);
+
+        if (!emailId) {
+            return {unprocessable: 1};
+        }
+
+        const updateResult = await this.db.knex('email_recipients')
+            .where('email_id', '=', emailId)
+            .where('member_email', '=', event.recipientEmail)
+            .update({
+                opened_at: this.db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
+            });
+
+        if (updateResult !== 0) {
+            const memberId = await this._getMemberId(event);
+
+            return {
+                opened: 1,
+                emailIds: [emailId],
+                memberIds: [memberId]
+            };
+        }
+
+        return {opened: 1};
+    }
+
+    async handleFailed(event) {
+        if (event.severity === 'permanent') {
+            const emailId = await this._getEmailId(event);
+
+            if (!emailId) {
+                return {unprocessable: 1};
+            }
+
+            await this.db.knex('email_recipients')
+                .where('email_id', '=', emailId)
+                .where('member_email', '=', event.recipientEmail)
+                .update({
+                    failed_at: this.db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
+                });
+
+            // saving via bookshelf triggers label fetch/update which errors and slows down processing
+            await this.db.knex('members')
+                .where('id', '=', this.db.knex('email_recipients')
+                    .select('member_id')
+                    .where('email_id', '=', emailId)
+                    .where('member_email', '=', event.recipientEmail)
+                )
+                .update({
+                    subscribed: false,
+                    updated_at: moment.utc().toDate()
+                });
+
+            return {
+                failed: 1,
+                emailIds: [emailId]
+            };
+        }
+
+        if (event.severity === 'temporary') {
+            // we don't care about soft bounces at the moment
+            return {unhandled: 1};
+        }
+    }
+
+    async handleUnsubscribed(event) {
+        const emailId = await this._getEmailId(event);
+
+        if (!emailId) {
+            return {unprocessable: 1};
+        }
+
+        // saving via bookshelf triggers label fetch/update which errors and slows down processing
+        await this.db.knex('members')
+            .where('id', '=', this.db.knex('email_recipients')
+                .select('member_id')
+                .where('email_id', '=', emailId)
+                .where('member_email', '=', event.recipientEmail)
+            )
+            .update({
+                subscribed: false,
+                updated_at: moment.utc().toDate()
+            });
+
+        return {
+            unsubscribed: 1
+        };
+    }
+
+    async handleComplained(event) {
+        const emailId = await this._getEmailId(event);
+
+        if (!emailId) {
+            return {unprocessable: 1};
+        }
+
+        // saving via bookshelf triggers label fetch/update which errors and slows down processing
+        await this.db.knex('members')
+            .where('id', '=', this.db.knex('email_recipients')
+                .select('member_id')
+                .where('email_id', '=', emailId)
+                .where('member_email', '=', event.recipientEmail)
+            )
+            .update({
+                subscribed: false,
+                updated_at: moment.utc().toDate()
+            });
+
+        return {
+            complained: 1
+        };
+    }
+
+    async _getEmailId(event) {
+        if (event.emailId) {
+            return event.emailId;
+        }
+
+        if (event.providerId) {
+            if (this.providerIdEmailIdMap[event.providerId]) {
+                return this.providerIdEmailIdMap[event.providerId];
+            }
+
+            const {emailId} = await this.db.knex('email_batches')
+                .select('email_id as emailId')
+                .where('provider_id', event.providerId)
+                .first() || {};
+
+            if (!emailId) {
+                return;
+            }
+
+            this.providerIdEmailIdMap[event.providerId] = emailId;
+
+            return emailId;
+        }
+    }
+
+    async _getMemberId(event) {
+        if (event.memberId) {
+            return event.memberId;
+        }
+
+        const emailId = await this._getEmailId(event);
+
+        if (emailId && event.recipientEmail) {
+            const {memberId} = await this.db.knex('email_recipients')
+                .select('member_id as memberId')
+                .where('member_email', event.recipientEmail)
+                .where('email_id', emailId)
+                .first() || {};
+
+            return memberId;
+        }
+    }
+}
+
+module.exports = EmailAnalyticsEventProcessor;
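
Each handler's return value doubles as a progress report. A sketch for a delivered event (inside an async context; the ids are placeholders):

```js
const EventProcessor = require('./event-processor');

const processor = new EventProcessor({db});
const result = await processor.process({
    type: 'delivered',
    recipientEmail: 'member@example.com',
    providerId: '<message-id@mg.example.com>' // placeholder
});
// => {delivered: 1, emailIds: ['...'], memberIds: ['...']} when a recipient row was updated
// => {delivered: 1} when no row needed updating (e.g. a repeat of an already-seen event)
// => {unprocessable: 1} when no email could be matched to the event
```
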
diff --git a/core/server/services/email-analytics/lib/stats-aggregator.js b/core/server/services/email-analytics/lib/stats-aggregator.js
new file mode 100644
index 0000000000..4a0d4355f7
--- /dev/null
+++ b/core/server/services/email-analytics/lib/stats-aggregator.js
@@ -0,0 +1,20 @@
+class EmailAnalyticsStatsAggregator {
+    constructor({logging, db}) {
+        this.logging = logging || console;
+        this.db = db;
+    }
+
+    async aggregateEmail(emailId) {
+        await this.db.knex('emails').update({
+            delivered_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND delivered_at IS NOT NULL)`, [emailId]),
+            opened_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND opened_at IS NOT NULL)`, [emailId]),
+            failed_count: this.db.knex.raw(`(SELECT COUNT(id) FROM email_recipients WHERE email_id = ? AND failed_at IS NOT NULL)`, [emailId])
+        }).where('id', emailId);
+    }
+
+    async aggregateMember(/*memberId*/) {
+        // TODO: decide on aggregation algorithm when only certain emails have open tracking
+    }
+}
+
+module.exports = EmailAnalyticsStatsAggregator;
diff --git a/core/server/services/email-analytics/providers/index.js b/core/server/services/email-analytics/providers/index.js
new file mode 100644
index 0000000000..c2e50a1ffe
--- /dev/null
+++ b/core/server/services/email-analytics/providers/index.js
@@ -0,0 +1,10 @@
+module.exports = {
+    init({config, settings, logging = console}) {
+        return {
+            get mailgun() {
+                const Mailgun = require('./mailgun');
+                return new Mailgun({config, settings, logging});
+            }
+        };
+    }
+};
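
Note that `mailgun` is a getter, so a fresh provider (and mailgun-js client) is constructed on each property access; config/settings changes such as a rotated API key are therefore picked up between runs without a restart:

```js
const providers = require('./providers').init({config, settings, logging});

// each access constructs a new EmailAnalyticsMailgunProvider
const isFreshInstance = providers.mailgun !== providers.mailgun; // => true
```
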
diff --git a/core/server/services/email-analytics/providers/mailgun.js b/core/server/services/email-analytics/providers/mailgun.js
new file mode 100644
index 0000000000..b14aa2cc9b
--- /dev/null
+++ b/core/server/services/email-analytics/providers/mailgun.js
@@ -0,0 +1,135 @@
+const _ = require('lodash');
+const mailgunJs = require('mailgun-js');
+const moment = require('moment');
+const EventProcessingResult = require('../lib/event-processing-result');
+
+const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
+const PAGE_LIMIT = 300;
+const TRUST_THRESHOLD_S = 30 * 60; // 30 minutes
+const DEFAULT_TAGS = ['bulk-email'];
+
+class EmailAnalyticsMailgunProvider {
+    constructor({config, settings, mailgun, logging = console}) {
+        this.config = config;
+        this.settings = settings;
+        this.logging = logging;
+        this.tags = [...DEFAULT_TAGS];
+        this._mailgun = mailgun;
+
+        if (this.config.get('bulkEmail:mailgun:tag')) {
+            this.tags.push(this.config.get('bulkEmail:mailgun:tag'));
+        }
+    }
+
+    // unless an instance is passed in to the constructor, generate a new instance each
+    // time the getter is called to account for changes in config/settings over time
+    get mailgun() {
+        if (this._mailgun) {
+            return this._mailgun;
+        }
+
+        const bulkEmailConfig = this.config.get('bulkEmail');
+        const bulkEmailSetting = {
+            apiKey: this.settings.get('mailgun_api_key'),
+            domain: this.settings.get('mailgun_domain'),
+            baseUrl: this.settings.get('mailgun_base_url')
+        };
+        const hasMailgunConfig = !!(bulkEmailConfig && bulkEmailConfig.mailgun);
+        const hasMailgunSetting = !!(bulkEmailSetting && bulkEmailSetting.apiKey && bulkEmailSetting.baseUrl && bulkEmailSetting.domain);
+
+        if (!hasMailgunConfig && !hasMailgunSetting) {
+            this.logging.warn(`Bulk email service is not configured`);
+            return undefined;
+        }
+
+        const mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting;
+        const baseUrl = new URL(mailgunConfig.baseUrl);
+
+        return mailgunJs({
+            apiKey: mailgunConfig.apiKey,
+            domain: mailgunConfig.domain,
+            protocol: baseUrl.protocol,
+            host: baseUrl.hostname,
+            port: baseUrl.port,
+            endpoint: baseUrl.pathname,
+            retry: 5
+        });
+    }
+
+    // do not start from a particular time, grab the latest events then work back
+    // through pages until we get a blank response
+    fetchAll(batchHandler) {
+        const options = {
+            event: EVENT_FILTER,
+            limit: PAGE_LIMIT,
+            tags: this.tags.join(' AND ')
+        };
+
+        return this._fetchPages(options, batchHandler);
+    }
+
+    // fetch from the last known timestamp minus TRUST_THRESHOLD then work forwards
+    // through pages until we get a blank response. This lets us pick up new events
+    // sooner than waiting a full TRUST_THRESHOLD between fetches
+    fetchLatest(latestTimestamp, batchHandler, options) {
+        const beginDate = moment(latestTimestamp).subtract(TRUST_THRESHOLD_S, 's').toDate();
+
+        const mailgunOptions = {
+            limit: PAGE_LIMIT,
+            event: EVENT_FILTER,
+            tags: this.tags.join(' AND '),
+            begin: beginDate.toUTCString(),
+            ascending: 'yes'
+        };
+
+        return this._fetchPages(mailgunOptions, batchHandler, options);
+    }
+
+    async _fetchPages(mailgunOptions, batchHandler, {maxEvents = Infinity} = {}) {
+        const {mailgun} = this;
+
+        if (!mailgun) {
+            this.logging.warn(`Bulk email service is not configured`);
+            return new EventProcessingResult();
+        }
+
+        const result = new EventProcessingResult();
+
+        let page = await mailgun.events().get(mailgunOptions);
+        let events = page && page.items && page.items.map(this.normalizeEvent) || [];
+
+        pagesLoop:
+        while (events.length !== 0) {
+            const batchResult = await batchHandler(events);
+            result.merge(batchResult);
+
+            if (result.totalEvents >= maxEvents) {
+                break pagesLoop;
+            }
+
+            page = await mailgun.get(page.paging.next.replace('https://api.mailgun.net/v3', ''));
+            events = page.items.map(this.normalizeEvent);
+        }
+
+        return result;
+    }
+
+    normalizeEvent(event) {
+        // TODO: clean up the <> surrounding email_batches.provider_id values
+        let providerId = event.message && event.message.headers && event.message.headers['message-id'];
+
+        if (providerId) {
+            providerId = `<${providerId}>`;
+        }
+
+        return {
+            type: event.event,
+            severity: event.severity,
+            recipientEmail: event.recipient,
+            emailId: event['user-variables'] && event['user-variables']['email-id'],
+            providerId: providerId,
+            timestamp: new Date(event.timestamp * 1000)
+        };
+    }
+}
+
+module.exports = EmailAnalyticsMailgunProvider;
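
For reference, `normalizeEvent` maps a raw Mailgun event into the shape the event processor expects. Given a hypothetical Mailgun payload (all values below are illustrative):

```js
const raw = {
    event: 'opened',
    recipient: 'member@example.com',
    timestamp: 1606396178, // seconds since epoch
    message: {headers: {'message-id': '20201126130938.1.ABC@mg.example.com'}}, // hypothetical id
    'user-variables': {'email-id': '5fbf9cc6c79d7a001d35c1e7'} // hypothetical Ghost email id
};

provider.normalizeEvent(raw);
// => {
//     type: 'opened',
//     severity: undefined,
//     recipientEmail: 'member@example.com',
//     emailId: '5fbf9cc6c79d7a001d35c1e7',
//     providerId: '<20201126130938.1.ABC@mg.example.com>',
//     timestamp: new Date('2020-11-26T13:09:38.000Z')
// }
```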