mirror of
https://github.com/TryGhost/Ghost.git
synced 2024-12-20 17:32:15 +03:00
2447335ab1
no issue
- case: restart Ghost while having a scheduled post
- caused by 4acc375fb6 (diff-4726ce3c4d18d41afad4b46cb0aa7dd3)
- the bug exists since 2.12
- Bookshelf added support (or better said fixed a bug) for accessing previous attributes
- `object.updated('published_at')` always returned "undefined", because the self-implementation < 2.12 only remembered previous attributes after update (see https://github.com/TryGhost/Ghost/blob/2.11.0/core/server/models/base/index.js#L234)
- but `object.previous('published_at')` returns the current value (`object.get('published_at') === object.previous('published_at')`), and that's why rescheduling on bootstrap never worked
- might fix https://forum.ghost.org/t/scheduled-posts-never-publish/6873/10
- reduced timeouts on scheduling unit tests
357 lines
11 KiB
JavaScript
357 lines
11 KiB
JavaScript
const util = require('util');
|
|
const moment = require('moment');
|
|
const debug = require('ghost-ignition').debug('scheduling-default');
|
|
const SchedulingBase = require('./SchedulingBase');
|
|
const common = require('../../lib/common');
|
|
const request = require('../../lib/request');
|
|
|
|
/**
 * @description Default post scheduling implementation.
 *
 * Self-hosted blogs use this scheduler by default. It is built on plain JavaScript timers only:
 * "node-cron" did not perform well enough and all that is really needed is simple time management.
 *
 * @param {Object} options - handed over unchanged to the SchedulingBase constructor
 * @constructor
 */
function SchedulingDefault(options) {
    SchedulingBase.call(this, options);

    Object.assign(this, {
        // NOTE: Interval at which the scheduler wakes up to look for upcoming jobs (5 minutes).
        runTimeoutInMs: 1000 * 60 * 5,

        // NOTE: Look-ahead window. Jobs due within this many minutes are picked for execution.
        offsetInMinutes: 10,

        // NOTE: A job is pinged as soon as "now - job time" is greater than this value,
        // i.e. from just under 50ms before its scheduled time (see "_execute").
        beforePingInMs: -50,

        // NOTE: Delay before a ping which was answered with a 503 is retried (see "_pingUrl").
        retryTimeoutInMs: 1000 * 5,

        // NOTE: Each scheduler implementation can decide whether to load scheduled posts on bootstrap or not.
        rescheduleOnBoot: true,

        // NOTE: All scheduled jobs, keyed by timestamp in ms. "_addJob" keeps the keys sorted.
        allJobs: {},

        // NOTE: Markers for jobs which were removed after they had been scheduled, keyed by "<url>_<timestamp>".
        deletedJobs: {},

        // NOTE: Flipped to true by "run" so the wake-up loop is only started once.
        isRunning: false
    });
}
|
|
|
|
// Set up prototype inheritance: SchedulingDefault instances also get the SchedulingBase prototype members.
util.inherits(SchedulingDefault, SchedulingBase);
|
|
|
|
/**
 * @description Register a new job with the scheduler.
 *
 * A new job gets added when the post scheduler module receives a new model event, e.g. "post.scheduled".
 *
 * @param {Object} object
 *                 {
 *                     time: [Number] A unix timestamp
 *                     url: [String] The full post/page API url to publish it.
 *                     extra: {
 *                         httpMethod: [String] The method of the target API endpoint.
 *                         oldTime: [Number] The previous published time.
 *                     }
 *                 }
 */
SchedulingDefault.prototype.schedule = function (object) {
    // "_addJob" decides whether the job is queued or has to be executed right away.
    this._addJob(object);
};
|
|
|
|
/**
 * @description Remove a job and schedule it again.
 *
 * Used when the model layer detects a rescheduling event (scheduled -> published at was updated).
 * The old published time is required to be able to mark the previous job as deleted.
 *
 * @param {Object} object
 *                 {
 *                     time: [Number] A unix timestamp
 *                     url: [String] The full post/page API url to publish it.
 *                     extra: {
 *                         httpMethod: [String] The method of the target API endpoint.
 *                         oldTime: [Number] The previous published time.
 *                     }
 *                 }
 * @param {Object} options
 *                 {
 *                     bootstrap: [Boolean]
 *                 }
 */
SchedulingDefault.prototype.reschedule = function (object, options = {bootstrap: false}) {
    const calledOnBootstrap = Boolean(options.bootstrap);

    /**
     * CASE:
     * "reschedule" is also called by the post scheduling unit on bootstrap, because other (custom) scheduling
     * implementations could be database driven and need the chance to update the job (delete + re-add).
     *
     * On bootstrap "object.extra.oldTime" === "object.time". Marking that job as deleted would prevent the
     * re-added job from ever being executed by this implementation, so the delete step is skipped.
     */
    if (calledOnBootstrap === false) {
        const previousJob = {time: object.extra.oldTime, url: object.url};
        this._deleteJob(previousJob);
    }

    this._addJob(object);
};
|
|
|
|
/**
 * @description Unschedule a job.
 *
 * Unscheduling means: scheduled -> draft.
 *
 * @param {Object} object
 *                 {
 *                     time: [Number] A unix timestamp
 *                     url: [String] The full post/page API url to publish it.
 *                     extra: {
 *                         httpMethod: [String] The method of the target API endpoint.
 *                         oldTime: [Number] The previous published time.
 *                     }
 *                 }
 */
SchedulingDefault.prototype.unschedule = function (object) {
    // The job is not removed from any queue here; "_deleteJob" only records a deletion marker for it.
    this._deleteJob(object);
};
|
|
|
|
/**
 * @description "run" is executed from outside (see post-scheduling module).
 *
 * It keeps the scheduler alive while the blog is running: every "runTimeoutInMs" it wakes up, moves all jobs
 * which are due within the next "offsetInMinutes" minutes out of "allJobs", hands them to "_execute" and then
 * arms the next wake-up. Calling "run" a second time is a no-op.
 */
SchedulingDefault.prototype.run = function () {
    // NOTE: Ensure the scheduler never runs twice.
    if (this.isRunning) {
        return;
    }

    this.isRunning = true;

    let wakeUpTimer = null;

    const armNextWakeUp = () => {
        wakeUpTimer = setTimeout(() => {
            const upcomingJobs = {};

            // CASE: The keys of "allJobs" are kept sorted, so we stop at the first job which is too far away.
            // We are only interested in jobs which need to get executed soon.
            for (const time of Object.keys(this.allJobs)) {
                const minutesUntilDue = moment(Number(time)).diff(moment(), 'minutes');

                // NOTE: Deliberately written as a negation, so a non-numeric diff stops the loop as well.
                if (!(minutesUntilDue <= this.offsetInMinutes)) {
                    break;
                }

                upcomingJobs[time] = this.allJobs[time];
                delete this.allJobs[time];
            }

            clearTimeout(wakeUpTimer);
            this._execute(upcomingJobs);

            armNextWakeUp();
        }, this.runTimeoutInMs);
    };

    armNextWakeUp();
};
|
|
|
|
/**
 * @description Add the actual job to "allJobs".
 *
 * Jobs which are due in less than "offsetInMinutes" minutes (or are already overdue) are not queued,
 * they are handed to "_execute" immediately. All other jobs are stored in "allJobs" under their timestamp
 * (ms), multiple jobs per timestamp are supported. Afterwards "allJobs" is rebuilt with its keys in ascending
 * chronological order, because "run" stops looking at the first key which is too far in the future.
 *
 * @param {Object} object - the job, see "schedule" for its shape ("time" and "url" are read here)
 * @private
 */
SchedulingDefault.prototype._addJob = function (object) {
    const timestamp = moment(object.time).valueOf();

    // CASE: should have been already pinged or should be pinged soon
    if (moment(timestamp).diff(moment(), 'minutes') < this.offsetInMinutes) {
        debug('Emergency job', object.url, moment(object.time).format('YYYY-MM-DD HH:mm:ss'));

        this._execute({[timestamp]: [object]});
        return;
    }

    // CASE: are there jobs already scheduled for the same time?
    if (!this.allJobs[timestamp]) {
        this.allJobs[timestamp] = [];
    }

    debug('Added job', object.url, moment(object.time).format('YYYY-MM-DD HH:mm:ss'));
    this.allJobs[timestamp].push(object);

    // NOTE: The keys are timestamps stored as strings. They have to be compared as numbers, a plain
    // `sort()` compares them character by character and would put e.g. "10000000000000" before "9999999999999".
    const sortedJobs = {};

    Object.keys(this.allJobs)
        .sort((a, b) => Number(a) - Number(b))
        .forEach((key) => {
            sortedJobs[key] = this.allJobs[key];
        });

    this.allJobs = sortedJobs;
};
|
|
|
|
/**
 * @description Delete the job.
 *
 * The job is not taken out of any queue. Instead a marker is stored in "deletedJobs" under
 * "<url>_<timestamp in ms>", because the job can already be part of the next execution list when it gets
 * deleted. "_execute" consumes one marker per job it skips.
 *
 * @param {Object} object - the job to delete ("url" and "time" are read); nothing happens without a "time"
 * @private
 */
SchedulingDefault.prototype._deleteJob = function (object) {
    const jobUrl = object.url;
    const jobTime = object.time;

    // CASE: no time, no key - there is nothing we could mark as deleted
    if (!jobTime) {
        return;
    }

    const markerKey = `${jobUrl}_${moment(jobTime).valueOf()}`;
    const markers = this.deletedJobs[markerKey] || (this.deletedJobs[markerKey] = []);

    debug('Deleted job', jobUrl, moment(jobTime).format('YYYY-MM-DD HH:mm:ss'));
    markers.push(object);
};
|
|
|
|
/**
 * @description The "execute" function receives the next jobs which need execution.
 *
 * Based on "offsetInMinutes" the runner figures out which jobs need execution soon, and the "execute"
 * function ensures that each of them is pinged as close to its scheduled time as possible.
 *
 * The advantage of having a two step system (a general runner and an executor) is:
 *   - accuracy
 *   - "setTimeout" only accepts a limited delay (a signed 32-bit number of ms, roughly 24.8 days)
 *
 * The execution of "setTimeout" is never guaranteed to be on time, therefore the execution is optimised
 * by using "setImmediate": each timestamp is put to sleep with `setTimeout` until 70ms before it is due.
 * From then on "setImmediate" re-checks the clock until "now - job time" is greater than "beforePingInMs"
 * and then triggers the URL of every job of that timestamp, unless a deletion marker exists for it.
 *
 * We can't use "process.nextTick", otherwise we will block I/O operations.
 *
 * @param {Object} jobs - map of "timestamp in ms" -> list of job objects; executed timestamps get removed from it
 * @private
 */
SchedulingDefault.prototype._execute = function (jobs) {
    Object.keys(jobs).forEach((timestamp) => {
        const msUntilDue = moment(Number(timestamp)).diff(moment());
        let sleepTimer = null;

        // Re-checks the clock on every event loop iteration and pings the jobs once the moment has come.
        const pingWhenDue = () => {
            const immediate = setImmediate(() => {
                clearImmediate(immediate);

                // CASE: It's not the time yet...
                if (moment().diff(moment(Number(timestamp))) <= this.beforePingInMs) {
                    return pingWhenDue();
                }

                const dueJobs = jobs[timestamp];
                delete jobs[timestamp];

                // CASE: each timestamp can have multiple jobs
                dueJobs.forEach((job) => {
                    const markerKey = `${job.url}_${moment(job.time).valueOf()}`;
                    const markers = this.deletedJobs[markerKey];

                    if (!markers) {
                        this._pingUrl(job);
                        return;
                    }

                    // CASE: The job was deleted in the meanwhile. Consume exactly one marker and skip the ping.
                    if (markers.length === 1) {
                        delete this.deletedJobs[markerKey];
                    } else {
                        markers.pop();
                    }
                });
            });
        };

        // NOTE: awake a little before...
        sleepTimer = setTimeout(() => {
            clearTimeout(sleepTimer);
            pingWhenDue();
        }, msUntilDue - 70);
    });
};
|
|
|
|
/**
 * @description Ping the job URL.
 *
 * Sends a JSON request to the job's url. If the job time is already in the past (e.g. the blog was down),
 * a "force" flag is added: as query for GET requests, as JSON body otherwise.
 *
 * Error handling:
 *   - 404: the post/page was deleted already, the error is ignored
 *   - 503: the blog is in maintenance mode, the ping is retried after "retryTimeoutInMs" (at most 30 retries)
 *   - anything else (or retries exhausted): logged as critical error
 *
 * @param {Object} object - the job
 *                 {
 *                     time: [Number] A unix timestamp
 *                     url: [String] The full post/page API url to publish it.
 *                     tries: [Number] Optional. Number of retries so far, maintained by this function.
 *                     extra: {
 *                         httpMethod: [String] Optional. The method of the target API endpoint, defaults to "PUT".
 *                         timeoutInMS: [Number] Optional. Request timeout, defaults to 5000.
 *                     }
 *                 }
 * @return {Promise} settles when the request has finished and a possible error was handled
 * @private
 */
SchedulingDefault.prototype._pingUrl = function (object) {
    const {url, time} = object;

    debug('Ping url', url, moment().format('YYYY-MM-DD HH:mm:ss'), moment(time).format('YYYY-MM-DD HH:mm:ss'));

    // NOTE: "extra" is optional and so is each of its fields. Apply the defaults per field, otherwise a job
    // with e.g. only "extra.oldTime" would crash on the missing method and run without a request timeout.
    const extra = object.extra || {};
    const httpMethod = extra.httpMethod || 'PUT';
    const requestTimeout = extra.timeoutInMS || 1000 * 5;
    const tries = object.tries || 0;
    const maxTries = 30;

    const options = {
        timeout: requestTimeout,
        method: httpMethod.toLowerCase(),
        headers: {
            'Content-Type': 'application/json'
        }
    };

    // CASE: If we detect to publish a post in the past (case blog is down), we add a force flag
    if (moment(time).isBefore(moment())) {
        // NOTE: The method is sent lower-cased, so the check must not depend on the casing either.
        if (httpMethod.toUpperCase() === 'GET') {
            // @TODO: rename to searchParams when updating to Got v10
            options.query = 'force=true';
        } else {
            options.body = JSON.stringify({force: true});
        }
    }

    return request(url, options).catch((err) => {
        const {statusCode} = err;

        // CASE: post/page was deleted already
        if (statusCode === 404) {
            return;
        }

        // CASE: blog is in maintenance mode, retry
        if (statusCode === 503 && tries < maxTries) {
            setTimeout(() => {
                // NOTE: The counter lives on the job itself, so the retry sees how often we tried already.
                object.tries = tries + 1;
                this._pingUrl(object);
            }, this.retryTimeoutInMs);

            common.logging.error(new common.errors.GhostError({
                err,
                context: 'Retrying...',
                level: 'normal'
            }));

            return;
        }

        common.logging.error(new common.errors.GhostError({
            err,
            level: 'critical'
        }));
    });
};
|
|
|
|
// Expose the SchedulingDefault constructor as this module's export.
module.exports = SchedulingDefault;
|