mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-01-05 09:50:34 +03:00
414 lines
15 KiB
JavaScript
414 lines
15 KiB
JavaScript
const path = require('path');
|
|
const util = require('util');
|
|
const setTimeoutPromise = util.promisify(setTimeout);
|
|
const fastq = require('fastq');
|
|
const later = require('@breejs/later');
|
|
const Bree = require('bree');
|
|
const pWaitFor = require('p-wait-for');
|
|
const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors');
|
|
const logging = require('@tryghost/logging');
|
|
const isCronExpression = require('./is-cron-expression');
|
|
const assembleBreeJob = require('./assemble-bree-job');
|
|
const JobsRepository = require('./jobs-repository');
|
|
|
|
const worker = async (task, callback) => {
|
|
try {
|
|
let result = await task();
|
|
await callback(null, result);
|
|
} catch (error) {
|
|
await callback(error);
|
|
}
|
|
};
|
|
|
|
const ALL_STATUSES = {
|
|
started: 'started',
|
|
finished: 'finished',
|
|
failed: 'failed',
|
|
queued: 'queued'
|
|
};
|
|
|
|
class JobManager {
|
|
#domainEvents;
|
|
#completionPromises = new Map();
|
|
|
|
/**
|
|
* @param {Object} options
|
|
* @param {Function} [options.errorHandler] - custom job error handler
|
|
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
|
|
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
|
|
* @param {Object} [options.domainEvents] - domain events emitter
|
|
*/
|
|
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) {
|
|
this.queue = fastq(this, worker, 3);
|
|
this._jobMessageHandler = this._jobMessageHandler.bind(this);
|
|
this._jobErrorHandler = this._jobErrorHandler.bind(this);
|
|
this.#domainEvents = domainEvents;
|
|
|
|
const combinedMessageHandler = workerMessageHandler
|
|
? ({name, message}) => {
|
|
workerMessageHandler({name, message});
|
|
this._jobMessageHandler({name, message});
|
|
}
|
|
: this._jobMessageHandler;
|
|
|
|
const combinedErrorHandler = errorHandler
|
|
? (error, workerMeta) => {
|
|
errorHandler(error, workerMeta);
|
|
this._jobErrorHandler(error, workerMeta);
|
|
}
|
|
: this._jobErrorHandler;
|
|
|
|
this.bree = new Bree({
|
|
root: false, // set this to `false` to prevent requiring a root directory of jobs
|
|
hasSeconds: true, // precision is needed to avoid task overlaps after immediate execution
|
|
outputWorkerMetadata: true,
|
|
logger: logging,
|
|
errorHandler: combinedErrorHandler,
|
|
workerMessageHandler: combinedMessageHandler
|
|
});
|
|
|
|
this.bree.on('worker created', (name) => {
|
|
this._jobMessageHandler({name, message: ALL_STATUSES.started});
|
|
});
|
|
|
|
if (JobModel) {
|
|
this._jobsRepository = new JobsRepository({JobModel});
|
|
}
|
|
}
|
|
|
|
inlineJobHandler(jobName) {
|
|
return async (error, result) => {
|
|
if (error) {
|
|
await this._jobErrorHandler(error, {
|
|
name: jobName
|
|
});
|
|
} else {
|
|
await this._jobMessageHandler({
|
|
name: jobName,
|
|
message: 'done'
|
|
});
|
|
}
|
|
|
|
// Can potentially standardize the result here
|
|
return result;
|
|
};
|
|
}
|
|
|
|
async _jobMessageHandler({name, message}) {
|
|
if (name) {
|
|
if (message === ALL_STATUSES.started) {
|
|
if (this._jobsRepository) {
|
|
const job = await this._jobsRepository.read(name);
|
|
|
|
if (job) {
|
|
await this._jobsRepository.update(job.id, {
|
|
status: ALL_STATUSES.started,
|
|
started_at: new Date()
|
|
});
|
|
}
|
|
}
|
|
} else if (message === 'done') {
|
|
if (this._jobsRepository) {
|
|
const job = await this._jobsRepository.read(name);
|
|
|
|
if (job) {
|
|
await this._jobsRepository.update(job.id, {
|
|
status: ALL_STATUSES.finished,
|
|
finished_at: new Date()
|
|
});
|
|
}
|
|
}
|
|
|
|
// Check completion listeners
|
|
if (this.#completionPromises.has(name)) {
|
|
for (const listeners of this.#completionPromises.get(name)) {
|
|
listeners.resolve();
|
|
}
|
|
// Clear the listeners
|
|
this.#completionPromises.delete(name);
|
|
}
|
|
|
|
if (this.queue.length() <= 1) {
|
|
if (this.#completionPromises.has('all')) {
|
|
for (const listeners of this.#completionPromises.get('all')) {
|
|
listeners.resolve();
|
|
}
|
|
// Clear the listeners
|
|
this.#completionPromises.delete('all');
|
|
}
|
|
}
|
|
} else {
|
|
if (typeof message === 'object' && this.#domainEvents) {
|
|
// Is this an event?
|
|
if (message.event) {
|
|
this.#domainEvents.dispatchRaw(message.event.type, message.event.data);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async _jobErrorHandler(error, jobMeta) {
|
|
if (this._jobsRepository && jobMeta.name) {
|
|
const job = await this._jobsRepository.read(jobMeta.name);
|
|
|
|
if (job) {
|
|
await this._jobsRepository.update(job.id, {
|
|
status: ALL_STATUSES.failed
|
|
});
|
|
}
|
|
}
|
|
|
|
// Check completion listeners and call them with error
|
|
if (this.#completionPromises.has(jobMeta.name)) {
|
|
for (const listeners of this.#completionPromises.get(jobMeta.name)) {
|
|
listeners.reject(error);
|
|
}
|
|
// Clear the listeners
|
|
this.#completionPromises.delete(jobMeta.name);
|
|
}
|
|
|
|
if (this.queue.length() <= 1) {
|
|
if (this.#completionPromises.has('all')) {
|
|
for (const listeners of this.#completionPromises.get('all')) {
|
|
listeners.reject(error);
|
|
}
|
|
// Clear the listeners
|
|
this.#completionPromises.delete('all');
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* By default schedules an "offloaded" job. If `offloaded: true` parameter is set,
|
|
* puts an "inline" immediate job into the queue.
|
|
*
|
|
* @param {Object} GhostJob - job options
|
|
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
|
|
* @prop {String} [GhostJob.name] - unique job name, if not provided takes function name or job script filename
|
|
* @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs
|
|
* @prop {Object} [GhostJob.data] - data to be passed into the job
|
|
* @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop
|
|
*/
|
|
addJob({name, at, job, data, offloaded = true}) {
|
|
if (offloaded) {
|
|
logging.info('Adding offloaded job to the queue');
|
|
let schedule;
|
|
|
|
if (!name) {
|
|
if (typeof job === 'string') {
|
|
name = path.parse(job).name;
|
|
} else {
|
|
throw new IncorrectUsageError({
|
|
message: 'Name parameter should be present if job is a function'
|
|
});
|
|
}
|
|
}
|
|
|
|
if (at && !(at instanceof Date)) {
|
|
if (isCronExpression(at)) {
|
|
schedule = later.parse.cron(at, true);
|
|
} else {
|
|
schedule = later.parse.text(at);
|
|
}
|
|
|
|
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
|
|
throw new IncorrectUsageError({
|
|
message: 'Invalid schedule format'
|
|
});
|
|
}
|
|
|
|
logging.info(`Scheduling job ${name} at ${at}. Next run on: ${later.schedule(schedule).next()}`);
|
|
} else if (at !== undefined) {
|
|
logging.info(`Scheduling job ${name} at ${at}`);
|
|
} else {
|
|
logging.info(`Scheduling job ${name} to run immediately`);
|
|
}
|
|
|
|
const breeJob = assembleBreeJob(at, job, data, name);
|
|
this.bree.add(breeJob);
|
|
return this.bree.start(name);
|
|
} else {
|
|
logging.info(`Adding one-off job to queue with current length = ${this.queue.length()} called '${name || 'anonymous'}'`);
|
|
|
|
this.queue.push(async () => {
|
|
try {
|
|
// NOTE: setting the status here otherwise it is impossible to
|
|
// distinguish between states when the job fails immediately
|
|
await this._jobMessageHandler({
|
|
name: name,
|
|
message: ALL_STATUSES.started
|
|
});
|
|
|
|
if (typeof job === 'function') {
|
|
await job(data);
|
|
} else {
|
|
await require(job)(data);
|
|
}
|
|
} catch (err) {
|
|
// NOTE: each job should be written in a safe way and handle all errors internally
|
|
// if the error is caught here jobs implementation should be changed
|
|
logging.error(new UnhandledJobError({
|
|
context: (typeof job === 'function') ? 'function' : job,
|
|
err
|
|
}));
|
|
|
|
throw err;
|
|
}
|
|
}, this.inlineJobHandler(name));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Adds a job that could ever be executed once. In case the job fails
|
|
* can be "added" again, effectively restarting the failed job.
|
|
*
|
|
* @param {Object} GhostJob - job options
|
|
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
|
|
* @prop {String} GhostJob.name - unique job name, if not provided takes function name or job script filename
|
|
* @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs
|
|
* @prop {Object} [GhostJob.data] - data to be passed into the job
|
|
* @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop
|
|
*/
|
|
async addOneOffJob({name, job, data, offloaded = true}) {
|
|
if (!name) {
|
|
throw new IncorrectUsageError({
|
|
message: `The name parameter is required for a one off job.`
|
|
});
|
|
}
|
|
|
|
const persistedJob = await this._jobsRepository.read(name);
|
|
|
|
if (persistedJob && (persistedJob.get('status') !== ALL_STATUSES.failed)) {
|
|
throw new IncorrectUsageError({
|
|
message: `A "${name}" one off job has already been executed.`
|
|
});
|
|
}
|
|
|
|
if (persistedJob && (persistedJob.get('status') === ALL_STATUSES.failed)) {
|
|
await this._jobsRepository.update(persistedJob.id, {
|
|
status: ALL_STATUSES.queued
|
|
});
|
|
} else {
|
|
await this._jobsRepository.add({
|
|
name,
|
|
status: ALL_STATUSES.queued
|
|
});
|
|
}
|
|
|
|
// NOTE: there's a assumption the job with the same name failed while
|
|
// running under different instance of job manager (bree).
|
|
// For example, it failed and the process was restarted.
|
|
// If we want to be able to restart within the same instance,
|
|
// we'd need to handle job restart/removal in Bree first
|
|
this.addJob({name, job, data, offloaded});
|
|
}
|
|
|
|
/**
|
|
* Checks if the one-off job has ever been executed successfully.
|
|
* @param {String} name one-off job name
|
|
*/
|
|
async hasExecutedSuccessfully(name) {
|
|
if (this._jobsRepository) {
|
|
const persistedJob = await this._jobsRepository.read(name);
|
|
|
|
if (!persistedJob) {
|
|
return false;
|
|
} else {
|
|
return (persistedJob.get('status') !== ALL_STATUSES.failed);
|
|
}
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Awaits completion of the offloaded one-off job.
|
|
* CAUTION: it might take a long time to resolve!
|
|
* @param {String} name one-off job name
|
|
* @returns resolves with a Job model at current state
|
|
*/
|
|
async awaitOneOffCompletion(name) {
|
|
const persistedJob = await this._jobsRepository.read({
|
|
name
|
|
});
|
|
|
|
if (!persistedJob || ![ALL_STATUSES.finished, ALL_STATUSES.failed].includes(persistedJob.get('status'))) {
|
|
// NOTE: can implement exponential backoff here if that's ever needed
|
|
await setTimeoutPromise(500);
|
|
|
|
return this.awaitOneOffCompletion(name);
|
|
}
|
|
|
|
return persistedJob;
|
|
}
|
|
|
|
/***
|
|
* Create this promise before you add the job you want to listen for. Then await the returned promise.
|
|
* Resolves if the job has been executed successfully.
|
|
* Throws an error if the job has failed execution.
|
|
*/
|
|
async awaitCompletion(name) {
|
|
const promise = new Promise((resolve, reject) => {
|
|
this.#completionPromises.set(name, [
|
|
...(this.#completionPromises.get(name) ?? []),
|
|
{resolve, reject}
|
|
]);
|
|
});
|
|
|
|
return promise;
|
|
}
|
|
|
|
/**
|
|
* Wait for all inline jobs to be completed.
|
|
*/
|
|
async allSettled() {
|
|
const name = 'all';
|
|
|
|
return new Promise((resolve, reject) => {
|
|
if (this.queue.idle()) {
|
|
resolve();
|
|
return;
|
|
}
|
|
|
|
this.#completionPromises.set(name, [
|
|
...(this.#completionPromises.get(name) ?? []),
|
|
{resolve, reject}
|
|
]);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Removes an "offloaded" job from scheduled jobs queue.
|
|
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).
|
|
* The method will throw an Error if job with provided name does not exist.
|
|
*
|
|
* NOTE: current implementation does not guarante running job termination
|
|
* for details see https://github.com/breejs/bree/pull/64
|
|
*
|
|
* @param {String} name - job name
|
|
*/
|
|
async removeJob(name) {
|
|
await this.bree.remove(name);
|
|
}
|
|
|
|
/**
|
|
* @param {import('p-wait-for').Options} [options]
|
|
*/
|
|
async shutdown(options) {
|
|
await this.bree.stop();
|
|
|
|
if (this.queue.idle()) {
|
|
return;
|
|
}
|
|
|
|
logging.warn('Waiting for busy job queue');
|
|
|
|
await pWaitFor(() => this.queue.idle() === true, options);
|
|
|
|
logging.warn('Job queue finished');
|
|
}
|
|
}
|
|
|
|
module.exports = JobManager;
|