Migrated jobs to use parentPort.postMessage

refs https://github.com/TryGhost/Ghost/issues/12496

- Using ghost-ignition logging caused file handle leaks. As there is no straightforward way to handle write streams with bunyan (ghost-ignition's underlying logging library), this method of logging was chosen as an alternative to keep the amount of open file handles to a minimum
- The follow-up changes will include a custom formatter for the jobs service, which should make logging match the same format as has been used inside the jobs
This commit is contained in:
Naz 2021-02-22 19:35:04 +13:00 committed by naz
parent adebca422f
commit 8a718ca99a
3 changed files with 23 additions and 30 deletions

View File

@ -1,4 +1,3 @@
const logging = require('../../../../shared/logging');
const {parentPort} = require('bthreads');
const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-all');
@ -6,7 +5,7 @@ const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-all');
// NB. can be a _very_ long job for sites with many members and frequent emails
function cancel() {
logging.info('Email analytics fetch-all job cancelled before completion');
parentPort.postMessage('Email analytics fetch-all job cancelled before completion');
if (parentPort) {
parentPort.postMessage('cancelled');
@ -48,7 +47,7 @@ if (parentPort) {
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
if (parentPort) {
parentPort.postMessage('done');

View File

@ -1,4 +1,3 @@
const logging = require('../../../../shared/logging');
const {parentPort} = require('bthreads');
const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-latest');
@ -7,7 +6,7 @@ const debug = require('ghost-ignition').debug('jobs:email-analytics:fetch-latest
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up
// where it left off on next run
function cancel() {
logging.info('Email analytics fetch-latest job cancelled before completion');
parentPort.postMessage('Email analytics fetch-latest job cancelled before completion');
if (parentPort) {
parentPort.postMessage('cancelled');
@ -49,7 +48,7 @@ if (parentPort) {
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
logging.info(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
if (parentPort) {
parentPort.postMessage('done');

View File

@ -1,12 +1,10 @@
const {parentPort} = require('bthreads');
const util = require('util');
const logging = require('../../../../../shared/logging');
const models = require('../../../../models');
let shutdown = false;
parentPort.on('message', (message) => {
logging.info(`paret message received: ${message}`);
parentPort.postMessage(`parent message received: ${message}`);
if (message === 'cancel') {
shutdown = true;
@ -17,32 +15,29 @@ const setTimeoutPromise = util.promisify(setTimeout);
const internalContext = {context: {internal: true}};
(async () => {
try {
await models.init();
const models = require('../../../../models');
logging.info(`Fetching tags`);
const tags = await models.Tag.findPage(internalContext);
await models.init();
logging.info(`Found ${tags.data.length} tags. First one: ${tags.data[0].toJSON().slug}`);
parentPort.postMessage(`Fetching tags`);
const tags = await models.Tag.findPage(internalContext);
logging.info(`Waiting 5 seconds to perform second part of the job`);
await setTimeoutPromise(5 * 1000);
parentPort.postMessage(`Found ${tags.data.length} tags. First one: ${tags.data[0].toJSON().slug}`);
if (shutdown) {
logging.info(`Job shutting down gracefully`);
process.exit(0);
}
logging.info(`Fetching posts`);
const posts = await models.Post.findPage(internalContext);
logging.info(`Found ${posts.data.length} posts. First one: ${posts.data[0].toJSON().slug}`);
logging.info('Graceful job has completed!');
parentPort.postMessage(`Waiting 5 seconds to perform second part of the job`);
await setTimeoutPromise(5 * 1000);
if (shutdown) {
parentPort.postMessage(`Job shutting down gracefully`);
process.exit(0);
} catch (err) {
logging.error(err);
process.exit(1);
}
parentPort.postMessage(`Fetching posts`);
const posts = await models.Post.findPage(internalContext);
parentPort.postMessage(`Found ${posts.data.length} posts. First one: ${posts.data[0].toJSON().slug}`);
parentPort.postMessage('Graceful job has completed!');
process.exit(0);
})();