mirror of
https://github.com/TryGhost/Ghost.git
synced 2024-12-25 03:44:29 +03:00
Added support for one off inline jobs
refs https://github.com/TryGhost/Toolbox/issues/359 - Inline one off jobs are needed in situations when we want to run a certain operation only once in the lifecycle of the Ghost instance. These operations should not be extremely long to execute though (not suited for backups or import types of tasks)
This commit is contained in:
parent
822549c9ef
commit
7adf3a5410
@ -12,21 +12,12 @@ const JobsRepository = require('./jobs-repository');
|
|||||||
const worker = async (task, callback) => {
|
const worker = async (task, callback) => {
|
||||||
try {
|
try {
|
||||||
let result = await task();
|
let result = await task();
|
||||||
callback(null, result);
|
await callback(null, result);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
callback(error);
|
await callback(error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const handler = (error, result) => {
|
|
||||||
if (error) {
|
|
||||||
// TODO: this handler should not be throwing as this blocks the queue
|
|
||||||
// throw error;
|
|
||||||
}
|
|
||||||
// Can potentially standardise the result here
|
|
||||||
return result;
|
|
||||||
};
|
|
||||||
|
|
||||||
class JobManager {
|
class JobManager {
|
||||||
/**
|
/**
|
||||||
* @param {Object} options
|
* @param {Object} options
|
||||||
@ -71,6 +62,24 @@ class JobManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inlineJobHandler(jobName) {
|
||||||
|
return async (error, result) => {
|
||||||
|
if (error) {
|
||||||
|
await this._jobErrorHandler(error, {
|
||||||
|
name: jobName
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
await this._jobMessageHandler({
|
||||||
|
name: jobName,
|
||||||
|
message: 'done'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Can potentially standardise the result here
|
||||||
|
return result;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async _jobMessageHandler({name, message}) {
|
async _jobMessageHandler({name, message}) {
|
||||||
if (this._jobsRepository) {
|
if (this._jobsRepository) {
|
||||||
if (message === 'started') {
|
if (message === 'started') {
|
||||||
@ -161,6 +170,13 @@ class JobManager {
|
|||||||
|
|
||||||
this.queue.push(async () => {
|
this.queue.push(async () => {
|
||||||
try {
|
try {
|
||||||
|
// NOTE: setting the status here otherwise it is impossible to
|
||||||
|
// distinguish between states when the job fails immediately
|
||||||
|
await this._jobMessageHandler({
|
||||||
|
name: name,
|
||||||
|
message: 'started'
|
||||||
|
});
|
||||||
|
|
||||||
if (typeof job === 'function') {
|
if (typeof job === 'function') {
|
||||||
await job(data);
|
await job(data);
|
||||||
} else {
|
} else {
|
||||||
@ -176,7 +192,7 @@ class JobManager {
|
|||||||
|
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
}, handler);
|
}, this.inlineJobHandler(name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,6 +266,121 @@ describe('Job Manager', function () {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('Inline jobs', function () {
|
||||||
|
it('adds job to the queue when it is a unique one', async function () {
|
||||||
|
const spy = sinon.spy();
|
||||||
|
const JobModel = {
|
||||||
|
findOne: sinon.stub().resolves(undefined),
|
||||||
|
add: sinon.stub().resolves()
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobManager = new JobManager({JobModel});
|
||||||
|
await jobManager.addOneOffJob({
|
||||||
|
job: spy,
|
||||||
|
name: 'unique name',
|
||||||
|
data: 'test data',
|
||||||
|
offloaded: false
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(JobModel.add.called, true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not add a job to the queue when it already exists', async function () {
|
||||||
|
const spy = sinon.spy();
|
||||||
|
const JobModel = {
|
||||||
|
findOne: sinon.stub().resolves({name: 'I am the only one'}),
|
||||||
|
add: sinon.stub().throws('should not be called')
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobManager = new JobManager({JobModel});
|
||||||
|
|
||||||
|
try {
|
||||||
|
await jobManager.addOneOffJob({
|
||||||
|
job: spy,
|
||||||
|
name: 'I am the only one',
|
||||||
|
data: 'test data',
|
||||||
|
offloaded: false
|
||||||
|
});
|
||||||
|
throw new Error('should not reach this point');
|
||||||
|
} catch (error) {
|
||||||
|
assert.equal(error.message, 'A "I am the only one" one off job has already been executed.');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sets a finished state on an inline job', async function () {
|
||||||
|
const JobModel = {
|
||||||
|
findOne: sinon.stub()
|
||||||
|
.onCall(0)
|
||||||
|
.resolves(null)
|
||||||
|
.resolves({id: 'unique', name: 'successful-oneoff'}),
|
||||||
|
add: sinon.stub().resolves({name: 'successful-oneoff'}),
|
||||||
|
edit: sinon.stub().resolves({name: 'successful-oneoff'})
|
||||||
|
};
|
||||||
|
|
||||||
|
const jobManager = new JobManager({JobModel});
|
||||||
|
|
||||||
|
jobManager.addOneOffJob({
|
||||||
|
job: async () => {
|
||||||
|
return await delay(10);
|
||||||
|
},
|
||||||
|
name: 'successful-oneoff',
|
||||||
|
offloaded: false
|
||||||
|
});
|
||||||
|
|
||||||
|
// allow job to get picked up and executed
|
||||||
|
await delay(20);
|
||||||
|
|
||||||
|
// tracks the job queued
|
||||||
|
should(JobModel.add.args[0][0].status).equal('queued');
|
||||||
|
should(JobModel.add.args[0][0].name).equal('successful-oneoff');
|
||||||
|
|
||||||
|
// tracks the job started
|
||||||
|
should(JobModel.edit.args[0][0].status).equal('started');
|
||||||
|
should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
|
||||||
|
should(JobModel.edit.args[0][1].id).equal('unique');
|
||||||
|
|
||||||
|
// tracks the job finish
|
||||||
|
should(JobModel.edit.args[1][0].status).equal('finished');
|
||||||
|
should(JobModel.edit.args[1][0].finished_at).not.equal(undefined);
|
||||||
|
should(JobModel.edit.args[1][1].id).equal('unique');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sets a failed state on a job', async function () {
|
||||||
|
const JobModel = {
|
||||||
|
findOne: sinon.stub()
|
||||||
|
.onCall(0)
|
||||||
|
.resolves(null)
|
||||||
|
.resolves({id: 'unique', name: 'failed-oneoff'}),
|
||||||
|
add: sinon.stub().resolves({name: 'failed-oneoff'}),
|
||||||
|
edit: sinon.stub().resolves({name: 'failed-oneoff'})
|
||||||
|
};
|
||||||
|
|
||||||
|
let job = function namedJob() {
|
||||||
|
throw new Error('job error');
|
||||||
|
};
|
||||||
|
const spyHandler = sinon.spy();
|
||||||
|
const jobManager = new JobManager({errorHandler: spyHandler, JobModel});
|
||||||
|
|
||||||
|
await jobManager.addOneOffJob({
|
||||||
|
job,
|
||||||
|
name: 'failed-oneoff',
|
||||||
|
offloaded: false
|
||||||
|
});
|
||||||
|
|
||||||
|
// give time to execute the job
|
||||||
|
await delay(50);
|
||||||
|
|
||||||
|
// tracks the job start
|
||||||
|
should(JobModel.edit.args[0][0].status).equal('started');
|
||||||
|
should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
|
||||||
|
should(JobModel.edit.args[0][1].id).equal('unique');
|
||||||
|
|
||||||
|
// tracks the job failure
|
||||||
|
should(JobModel.edit.args[1][0].status).equal('failed');
|
||||||
|
should(JobModel.edit.args[1][1].id).equal('unique');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('Offloaded jobs', function () {
|
describe('Offloaded jobs', function () {
|
||||||
it('adds job to the queue when it is a unique one', async function () {
|
it('adds job to the queue when it is a unique one', async function () {
|
||||||
const spy = sinon.spy();
|
const spy = sinon.spy();
|
||||||
|
Loading…
Reference in New Issue
Block a user