2020-11-19 07:58:41 +03:00
|
|
|
const path = require('path');
|
2020-08-11 23:13:31 +03:00
|
|
|
const fastq = require('fastq');
|
2020-11-05 07:36:29 +03:00
|
|
|
const later = require('@breejs/later');
|
2020-11-19 07:58:41 +03:00
|
|
|
const Bree = require('bree');
|
2020-08-11 23:13:31 +03:00
|
|
|
const pWaitFor = require('p-wait-for');
|
2020-11-10 07:11:24 +03:00
|
|
|
const errors = require('@tryghost/errors');
|
2020-11-05 07:36:29 +03:00
|
|
|
const isCronExpression = require('./is-cron-expression');
|
2020-11-19 07:58:41 +03:00
|
|
|
const assembleBreeJob = require('./assemble-bree-job');
|
2020-08-11 23:13:31 +03:00
|
|
|
|
|
|
|
const worker = async (task, callback) => {
|
|
|
|
try {
|
|
|
|
let result = await task();
|
|
|
|
callback(null, result);
|
|
|
|
} catch (error) {
|
|
|
|
callback(error);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
const handler = (error, result) => {
|
|
|
|
if (error) {
|
2020-11-10 07:11:24 +03:00
|
|
|
// TODO: this handler should not be throwing as this blocks the queue
|
|
|
|
// throw error;
|
2020-08-11 23:13:31 +03:00
|
|
|
}
|
|
|
|
// Can potentially standardise the result here
|
|
|
|
return result;
|
|
|
|
};
|
|
|
|
|
|
|
|
class JobManager {
|
|
|
|
constructor(logging) {
|
|
|
|
this.queue = fastq(this, worker, 1);
|
2020-11-19 07:58:41 +03:00
|
|
|
|
|
|
|
this.bree = new Bree({
|
|
|
|
root: false, // set this to `false` to prevent requiring a root directory of jobs
|
|
|
|
hasSeconds: true, // precission is needed to avoid task ovelaps after immidiate execution
|
2020-11-19 08:15:52 +03:00
|
|
|
outputWorkerMetadata: true,
|
2020-11-19 07:58:41 +03:00
|
|
|
logger: logging
|
|
|
|
});
|
|
|
|
|
2020-08-11 23:13:31 +03:00
|
|
|
this.logging = logging;
|
|
|
|
}
|
|
|
|
|
2020-11-05 06:41:16 +03:00
|
|
|
/**
|
2020-11-30 05:30:45 +03:00
|
|
|
* Adds job to queue in current even loop
|
2020-11-05 06:41:16 +03:00
|
|
|
*
|
2020-11-10 06:32:47 +03:00
|
|
|
* @param {Function|String} job - function or path to a module defining a job
|
2020-11-05 07:36:51 +03:00
|
|
|
* @param {Object} [data] - data to be passed into the job
|
2020-11-05 06:41:16 +03:00
|
|
|
*/
|
2020-08-11 23:13:31 +03:00
|
|
|
addJob(job, data) {
|
2020-11-05 06:42:23 +03:00
|
|
|
this.logging.info('Adding one off job to the queue');
|
|
|
|
|
2020-08-11 23:13:31 +03:00
|
|
|
this.queue.push(async () => {
|
2020-11-10 07:11:24 +03:00
|
|
|
try {
|
|
|
|
if (typeof job === 'function') {
|
|
|
|
await job(data);
|
|
|
|
} else {
|
|
|
|
await require(job)(data);
|
|
|
|
}
|
|
|
|
} catch (err) {
|
|
|
|
// NOTE: each job should be written in a safe way and handle all errors internally
|
|
|
|
// if the error is caught here jobs implementaton should be changed
|
|
|
|
this.logging.error(new errors.IgnitionError({
|
|
|
|
level: 'critical',
|
|
|
|
errorType: 'UnhandledJobError',
|
|
|
|
message: 'Processed job threw an unhandled error',
|
|
|
|
context: (typeof job === 'function') ? 'function' : job,
|
|
|
|
err
|
|
|
|
}));
|
|
|
|
|
|
|
|
throw err;
|
2020-11-10 06:32:47 +03:00
|
|
|
}
|
2020-08-11 23:13:31 +03:00
|
|
|
}, handler);
|
|
|
|
}
|
|
|
|
|
2020-11-05 07:36:29 +03:00
|
|
|
/**
|
2020-11-30 05:30:45 +03:00
|
|
|
* Schedules recuring job offloaded to per-job event-loop (thread or a process)
|
2020-11-05 07:36:29 +03:00
|
|
|
*
|
2020-12-03 06:22:56 +03:00
|
|
|
* @param {String | Date} at - Date, cron or human readable schedule format
|
2020-11-10 06:32:47 +03:00
|
|
|
* @param {Function|String} job - function or path to a module defining a job
|
|
|
|
* @param {Object} [data] - data to be passed into the job
|
2020-11-19 07:58:41 +03:00
|
|
|
* @param {String} [name] - job name
|
2020-11-05 07:36:29 +03:00
|
|
|
*/
|
2020-12-03 06:22:56 +03:00
|
|
|
scheduleJob(at, job, data, name) {
|
2020-11-05 07:36:29 +03:00
|
|
|
let schedule;
|
|
|
|
|
2020-11-19 07:58:41 +03:00
|
|
|
if (!name) {
|
|
|
|
if (typeof job === 'string') {
|
|
|
|
name = path.parse(job).name;
|
|
|
|
} else {
|
|
|
|
throw new Error('Name parameter should be present if job is a function');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-03 06:22:56 +03:00
|
|
|
if (!(at instanceof Date)) {
|
|
|
|
if (isCronExpression(at)) {
|
|
|
|
schedule = later.parse.cron(at, true);
|
2020-12-02 11:26:15 +03:00
|
|
|
} else {
|
2020-12-03 06:22:56 +03:00
|
|
|
schedule = later.parse.text(at);
|
2020-12-02 11:26:15 +03:00
|
|
|
}
|
2020-11-05 07:36:29 +03:00
|
|
|
|
2020-12-02 11:26:15 +03:00
|
|
|
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
|
|
|
|
throw new Error('Invalid schedule format');
|
|
|
|
}
|
2020-11-05 07:36:29 +03:00
|
|
|
|
2020-12-03 06:22:56 +03:00
|
|
|
this.logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`);
|
2020-12-02 11:26:15 +03:00
|
|
|
} else {
|
2020-12-03 06:22:56 +03:00
|
|
|
this.logging.info(`Scheduling job ${name} at ${at}`);
|
2020-11-05 07:36:29 +03:00
|
|
|
}
|
|
|
|
|
2020-12-03 06:22:56 +03:00
|
|
|
const breeJob = assembleBreeJob(at, job, data, name);
|
2020-11-19 07:58:41 +03:00
|
|
|
this.bree.add(breeJob);
|
|
|
|
return this.bree.start(name);
|
2020-11-05 07:36:29 +03:00
|
|
|
}
|
|
|
|
|
2020-12-08 06:43:18 +03:00
|
|
|
/**
|
|
|
|
* Removes a job from sqcheduled (offloaded) jobs queue.
|
|
|
|
* There is no way to remove jovs from in-line (same event loop) jobs
|
|
|
|
* added through `addJob` method.
|
|
|
|
* The method will throw an Error if job with provided name does not exist.
|
|
|
|
*
|
|
|
|
* NOTE: current implementation does not guarante running job termination
|
|
|
|
* for details see https://github.com/breejs/bree/pull/64
|
|
|
|
*
|
|
|
|
* @param {String} name - job name
|
|
|
|
*/
|
|
|
|
async removeJob(name) {
|
|
|
|
await this.bree.remove(name);
|
|
|
|
}
|
|
|
|
|
2020-11-05 07:36:29 +03:00
|
|
|
/**
|
|
|
|
* @param {import('p-wait-for').Options} [options]
|
|
|
|
*/
|
2020-08-11 23:13:31 +03:00
|
|
|
async shutdown(options) {
|
2020-11-19 07:58:41 +03:00
|
|
|
await this.bree.stop();
|
2020-11-05 07:36:29 +03:00
|
|
|
|
2020-08-11 23:13:31 +03:00
|
|
|
if (this.queue.idle()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.logging.warn('Waiting for busy job queue');
|
|
|
|
|
|
|
|
await pWaitFor(() => this.queue.idle() === true, options);
|
|
|
|
|
|
|
|
this.logging.warn('Job queue finished');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = JobManager;
|