// Switch these lines once there are useful utils
// const testUtils = require('./utils');
require('./utils');

const assert = require('assert');
const path = require('path');
const sinon = require('sinon');
const delay = require('delay');
const FakeTimers = require('@sinonjs/fake-timers');
const logging = require('@tryghost/logging');

const JobManager = require('../index');

const sandbox = sinon.createSandbox();

// Stand-in for a stored job record: exposes an id and reports "finished"
// when asked for its status via get('status').
const jobModelInstance = {
    id: 'unique',
    get: (field) => {
        if (field === 'status') {
            return 'finished';
        }
    }
};

/**
 * Test suite for the JobManager: adding inline/offloaded jobs, one-off jobs,
 * execution progress checks, job removal and shutdown.
 */
describe('Job Manager', function () {
    beforeEach(function () {
        // Replace the logger methods with stubs so tests can assert on log calls
        sandbox.stub(logging, 'info');
        sandbox.stub(logging, 'warn');
        sandbox.stub(logging, 'error');
    });

    afterEach(function () {
        sandbox.restore();
    });

    it('public interface', function () {
        const jobManager = new JobManager({});

        should.exist(jobManager.addJob);
        should.exist(jobManager.hasExecutedSuccessfully);
        should.exist(jobManager.awaitCompletion);
    });

    describe('Add a job', function () {
        describe('Inline jobs', function () {
            it('adds a job to a queue', async function () {
                const spy = sinon.spy();
                const jobManager = new JobManager({
                    JobModel: sinon.stub().resolves()
                });

                jobManager.addJob({
                    job: spy,
                    data: 'test data',
                    offloaded: false
                });
                should(jobManager.queue.idle()).be.false();

                // give time to execute the job
                await delay(1);

                should(jobManager.queue.idle()).be.true();
                should(spy.called).be.true();
                should(spy.args[0][0]).equal('test data');
            });

            it('handles failed job gracefully', async function () {
                const spy = sinon.stub().throws();
                const jobManager = new JobManager({});

                jobManager.addJob({
                    job: spy,
                    data: 'test data',
                    offloaded: false
                });
                should(jobManager.queue.idle()).be.false();

                // give time to execute the job
                await delay(1);

                should(jobManager.queue.idle()).be.true();
                should(spy.called).be.true();
                should(spy.args[0][0]).equal('test data');
                should(logging.error.called).be.true();
            });
        });

        describe('Offloaded jobs', function () {
            it('fails to schedule for invalid scheduling expression', function () {
                const jobManager = new JobManager({});

                // should.throw fails the test when nothing is thrown, unlike a bare try/catch
                (() => {
                    jobManager.addJob({
                        at: 'invalid expression',
                        name: 'jobName'
                    });
                }).should.throw('Invalid schedule format');
            });

            it('fails to schedule for no job name', function () {
                const jobManager = new JobManager({});

                // should.throw fails the test when nothing is thrown, unlike a bare try/catch
                (() => {
                    jobManager.addJob({
                        at: 'invalid expression',
                        job: () => {}
                    });
                }).should.throw('Name parameter should be present if job is a function');
            });

            it('schedules a job using date format', async function () {
                const jobManager = new JobManager({});
                // NOTE: the offset is 10 milliseconds; the fake clock below fires it on demand
                const timeInTenMilliseconds = new Date(Date.now() + 10);
                const jobPath = path.resolve(__dirname, './jobs/simple.js');

                const clock = FakeTimers.install({now: Date.now()});

                try {
                    jobManager.addJob({
                        at: timeInTenMilliseconds,
                        job: jobPath,
                        name: 'job-in-ten'
                    });

                    should(jobManager.bree.timeouts['job-in-ten']).type('object');
                    should(jobManager.bree.workers['job-in-ten']).type('undefined');

                    // allow to run the job and start the worker
                    await clock.nextAsync();

                    should(jobManager.bree.workers['job-in-ten']).type('object');

                    const promise = new Promise((resolve, reject) => {
                        jobManager.bree.workers['job-in-ten'].on('error', reject);
                        jobManager.bree.workers['job-in-ten'].on('exit', (code) => {
                            should(code).equal(0);
                            resolve();
                        });
                    });

                    // allow job to finish execution and exit
                    clock.next();

                    await promise;

                    should(jobManager.bree.workers['job-in-ten']).type('undefined');
                } finally {
                    // always restore real timers, even when an assertion above fails
                    clock.uninstall();
                }
            });

            it('schedules a job to run immediately', async function () {
                const jobManager = new JobManager({});
                const jobPath = path.resolve(__dirname, './jobs/simple.js');

                const clock = FakeTimers.install({now: Date.now()});

                try {
                    jobManager.addJob({
                        job: jobPath,
                        name: 'job-now'
                    });

                    should(jobManager.bree.timeouts['job-now']).type('object');

                    // allow scheduler to pick up the job
                    clock.tick(1);

                    should(jobManager.bree.workers['job-now']).type('object');

                    const promise = new Promise((resolve, reject) => {
                        jobManager.bree.workers['job-now'].on('error', reject);
                        jobManager.bree.workers['job-now'].on('exit', (code) => {
                            should(code).equal(0);
                            resolve();
                        });
                    });

                    await promise;

                    should(jobManager.bree.workers['job-now']).type('undefined');
                } finally {
                    // always restore real timers, even when an assertion above fails
                    clock.uninstall();
                }
            });

            it('fails to schedule a job with the same name to run immediately one after another', async function () {
                const jobManager = new JobManager({});
                const jobPath = path.resolve(__dirname, './jobs/simple.js');

                const clock = FakeTimers.install({now: Date.now()});

                try {
                    jobManager.addJob({
                        job: jobPath,
                        name: 'job-now'
                    });

                    should(jobManager.bree.timeouts['job-now']).type('object');

                    // allow scheduler to pick up the job
                    clock.tick(1);

                    should(jobManager.bree.workers['job-now']).type('object');

                    const promise = new Promise((resolve, reject) => {
                        jobManager.bree.workers['job-now'].on('error', reject);
                        jobManager.bree.workers['job-now'].on('exit', (code) => {
                            should(code).equal(0);
                            resolve();
                        });
                    });

                    await promise;

                    should(jobManager.bree.workers['job-now']).type('undefined');

                    (() => {
                        jobManager.addJob({
                            job: jobPath,
                            name: 'job-now'
                        });
                    }).should.throw('Job #1 has a duplicate job name of job-now');
                } finally {
                    // always restore real timers, even when an assertion above fails
                    clock.uninstall();
                }
            });

            it('uses custom error handler when job fails', async function () {
                const job = function namedJob() {
                    throw new Error('job error');
                };
                const spyHandler = sinon.spy();
                const jobManager = new JobManager({errorHandler: spyHandler});

                jobManager.addJob({
                    job,
                    name: 'will-fail'
                });

                // give time to execute the job
                // has to be this long because in Node v10 the communication is
                // done through processes, which takes longer comparing to worker_threads
                // can be reduced to 100 when Node v10 support is dropped
                await delay(600);

                should(spyHandler.called).be.true();
                should(spyHandler.args[0][0].message).equal('job error');
                should(spyHandler.args[0][1].name).equal('will-fail');
            });

            it('uses worker message handler when job sends a message', async function () {
                const workerMessageHandlerSpy = sinon.spy();
                const jobManager = new JobManager({workerMessageHandler: workerMessageHandlerSpy});

                jobManager.addJob({
                    job: path.resolve(__dirname, './jobs/message.js'),
                    name: 'will-send-msg'
                });
                jobManager.bree.run('will-send-msg');
                jobManager.bree.workers['will-send-msg'].postMessage('hello from Ghost!');

                // Give time for the communication between the worker (worker thread)
                // and the parent process (job manager)
                await delay(100);

                should(workerMessageHandlerSpy.called).be.true();
                should(workerMessageHandlerSpy.args[0][0].name).equal('will-send-msg');
                should(workerMessageHandlerSpy.args[0][0].message).equal('Worker received: hello from Ghost!');
            });
        });
    });

    describe('Add one off job', function () {
        it('throws if name parameter is not provided', async function () {
            const jobManager = new JobManager({});

            try {
                await jobManager.addOneOffJob({
                    job: () => {}
                });
                throw new Error('should have thrown');
            } catch (err) {
                should.equal(err.message, 'The name parameter is required for a one off job.');
            }
        });

        describe('Inline jobs', function () {
            it('adds job to the queue when it is a unique one', async function () {
                const spy = sinon.spy();
                const JobModel = {
                    findOne: sinon.stub().resolves(undefined),
                    add: sinon.stub().resolves()
                };

                const jobManager = new JobManager({JobModel});
                await jobManager.addOneOffJob({
                    job: spy,
                    name: 'unique name',
                    data: 'test data',
                    offloaded: false
                });

                assert.equal(JobModel.add.called, true);
            });

            it('does not add a job to the queue when it already exists', async function () {
                const spy = sinon.spy();
                const JobModel = {
                    findOne: sinon.stub().resolves(jobModelInstance),
                    add: sinon.stub().throws('should not be called')
                };

                const jobManager = new JobManager({JobModel});

                try {
                    await jobManager.addOneOffJob({
                        job: spy,
                        name: 'I am the only one',
                        data: 'test data',
                        offloaded: false
                    });
                    throw new Error('should not reach this point');
                } catch (error) {
                    assert.equal(error.message, 'A "I am the only one" one off job has already been executed.');
                }
            });

            it('sets a finished state on an inline job', async function () {
                const JobModel = {
                    findOne: sinon.stub()
                        .onCall(0)
                        .resolves(null)
                        .resolves({id: 'unique', name: 'successful-oneoff'}),
                    add: sinon.stub().resolves({name: 'successful-oneoff'}),
                    edit: sinon.stub().resolves({name: 'successful-oneoff'})
                };

                const jobManager = new JobManager({JobModel});

                // awaited so that a rejection fails this test instead of floating unhandled
                await jobManager.addOneOffJob({
                    job: async () => {
                        return await delay(10);
                    },
                    name: 'successful-oneoff',
                    offloaded: false
                });

                // allow job to get picked up and executed
                await delay(20);

                // tracks the job queued
                should(JobModel.add.args[0][0].status).equal('queued');
                should(JobModel.add.args[0][0].name).equal('successful-oneoff');

                // tracks the job started
                should(JobModel.edit.args[0][0].status).equal('started');
                should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
                should(JobModel.edit.args[0][1].id).equal('unique');

                // tracks the job finish
                should(JobModel.edit.args[1][0].status).equal('finished');
                should(JobModel.edit.args[1][0].finished_at).not.equal(undefined);
                should(JobModel.edit.args[1][1].id).equal('unique');
            });

            it('sets a failed state on a job', async function () {
                const JobModel = {
                    findOne: sinon.stub()
                        .onCall(0)
                        .resolves(null)
                        .resolves({id: 'unique', name: 'failed-oneoff'}),
                    add: sinon.stub().resolves({name: 'failed-oneoff'}),
                    edit: sinon.stub().resolves({name: 'failed-oneoff'})
                };

                const job = function namedJob() {
                    throw new Error('job error');
                };
                const spyHandler = sinon.spy();
                const jobManager = new JobManager({errorHandler: spyHandler, JobModel});

                await jobManager.addOneOffJob({
                    job,
                    name: 'failed-oneoff',
                    offloaded: false
                });

                // give time to execute the job
                await delay(50);

                // tracks the job start
                should(JobModel.edit.args[0][0].status).equal('started');
                should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
                should(JobModel.edit.args[0][1].id).equal('unique');

                // tracks the job failure
                should(JobModel.edit.args[1][0].status).equal('failed');
                should(JobModel.edit.args[1][1].id).equal('unique');
            });

            it('adds job to the queue after failing', async function () {
                const JobModel = {
                    findOne: sinon.stub()
                        .onCall(0)
                        .resolves(null)
                        .onCall(1)
                        .resolves({id: 'unique'})
                        .resolves({
                            id: 'unique',
                            get: (field) => {
                                if (field === 'status') {
                                    return 'failed';
                                }
                            }
                        }),
                    add: sinon.stub().resolves({}),
                    edit: sinon.stub().resolves()
                };

                const job = function namedJob() {
                    throw new Error('job error');
                };
                const spyHandler = sinon.spy();
                const jobManager = new JobManager({errorHandler: spyHandler, JobModel});

                await jobManager.addOneOffJob({
                    job,
                    name: 'failed-oneoff',
                    offloaded: false
                });

                // give time to execute the job and fail
                await delay(50);
                should(JobModel.edit.args[1][0].status).equal('failed');

                // simulate process restart and "fresh" slate to add the job
                jobManager.removeJob('failed-oneoff');

                await jobManager.addOneOffJob({
                    job,
                    name: 'failed-oneoff',
                    offloaded: false
                });

                // give time to execute the job and fail AGAIN
                await delay(50);
                should(JobModel.edit.args[3][0].status).equal('started');
                should(JobModel.edit.args[4][0].status).equal('failed');
            });
        });

        describe('Offloaded jobs', function () {
            it('adds job to the queue when it is a unique one', async function () {
                const spy = sinon.spy();
                const JobModel = {
                    findOne: sinon.stub().resolves(undefined),
                    add: sinon.stub().resolves()
                };

                const jobManager = new JobManager({JobModel});
                await jobManager.addOneOffJob({
                    job: spy,
                    name: 'unique name',
                    data: 'test data'
                });

                assert.equal(JobModel.add.called, true);
            });

            it('does not add a job to the queue when it already exists', async function () {
                const spy = sinon.spy();
                const JobModel = {
                    findOne: sinon.stub().resolves(jobModelInstance),
                    add: sinon.stub().throws('should not be called')
                };

                const jobManager = new JobManager({JobModel});

                try {
                    await jobManager.addOneOffJob({
                        job: spy,
                        name: 'I am the only one',
                        data: 'test data'
                    });
                    throw new Error('should not reach this point');
                } catch (error) {
                    assert.equal(error.message, 'A "I am the only one" one off job has already been executed.');
                }
            });

            it('sets a finished state on a job', async function () {
                const JobModel = {
                    findOne: sinon.stub()
                        .onCall(0)
                        .resolves(null)
                        .resolves({id: 'unique', name: 'successful-oneoff'}),
                    add: sinon.stub().resolves({name: 'successful-oneoff'}),
                    edit: sinon.stub().resolves({name: 'successful-oneoff'})
                };

                const jobManager = new JobManager({JobModel});

                await jobManager.addOneOffJob({
                    job: path.resolve(__dirname, './jobs/message.js'),
                    name: 'successful-oneoff'
                });

                // allow job to get picked up and executed
                await delay(50);

                jobManager.bree.workers['successful-oneoff'].postMessage('be done!');

                // allow the message to be passed around
                await delay(50);

                // tracks the job start
                should(JobModel.edit.args[0][0].status).equal('started');
                should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
                should(JobModel.edit.args[0][1].id).equal('unique');

                // tracks the job finish
                should(JobModel.edit.args[1][0].status).equal('finished');
                should(JobModel.edit.args[1][0].finished_at).not.equal(undefined);
                should(JobModel.edit.args[1][1].id).equal('unique');
            });

            it('handles a failed job', async function () {
                const JobModel = {
                    findOne: sinon.stub()
                        .onCall(0)
                        .resolves(null)
                        .resolves(jobModelInstance),
                    add: sinon.stub().resolves({name: 'failed-oneoff'}),
                    edit: sinon.stub().resolves({name: 'failed-oneoff'})
                };

                const job = function namedJob() {
                    throw new Error('job error');
                };
                const spyHandler = sinon.spy();
                const jobManager = new JobManager({errorHandler: spyHandler, JobModel});

                await jobManager.addOneOffJob({
                    job,
                    name: 'failed-oneoff'
                });

                // give time to execute the job and for the error to reach the handler
                await delay(100);

                // still calls the original error handler
                should(spyHandler.called).be.true();
                should(spyHandler.args[0][0].message).equal('job error');
                should(spyHandler.args[0][1].name).equal('failed-oneoff');

                // tracks the job start
                should(JobModel.edit.args[0][0].status).equal('started');
                should(JobModel.edit.args[0][0].started_at).not.equal(undefined);
                should(JobModel.edit.args[0][1].id).equal('unique');

                // tracks the job failure
                should(JobModel.edit.args[1][0].status).equal('failed');
                should(JobModel.edit.args[1][1].id).equal('unique');
            });
        });
    });

    describe('Job execution progress', function () {
        it('checks if job has ever been executed', async function () {
            // Successive lookups return: no record, a "finished" record, a "failed" record
            const JobModel = {
                findOne: sinon.stub()
                    .withArgs('solovei')
                    .onCall(0)
                    .resolves(null)
                    .onCall(1)
                    .resolves({
                        id: 'unique',
                        get: (field) => {
                            if (field === 'status') {
                                return 'finished';
                            }
                        }
                    })
                    .onCall(2)
                    .resolves({
                        id: 'unique',
                        get: (field) => {
                            if (field === 'status') {
                                return 'failed';
                            }
                        }
                    })
            };

            const jobManager = new JobManager({JobModel});

            let executed = await jobManager.hasExecutedSuccessfully('solovei');
            should.equal(executed, false);

            executed = await jobManager.hasExecutedSuccessfully('solovei');
            should.equal(executed, true);

            executed = await jobManager.hasExecutedSuccessfully('solovei');
            should.equal(executed, false);
        });

        it('can wait for job completion', async function () {
            const spy = sinon.spy();
            let status = 'queued';
            const jobWithDelay = async () => {
                await delay(80);
                status = 'finished';
                spy();
            };
            const JobModel = {
                findOne: sinon.stub()
                    .withArgs('solovei')
                    .onCall(0)
                    // first call when adding a job
                    .resolves(null)
                    .onCall(1)
                    .resolves(null)
                    // later lookups report whatever status the job has reached so far
                    .resolves({
                        id: 'unique',
                        get: () => status
                    }),
                add: sinon.stub().resolves()
            };

            const jobManager = new JobManager({JobModel});

            await jobManager.addOneOffJob({
                job: jobWithDelay,
                name: 'solovei',
                offloaded: false
            });

            should.equal(spy.called, false);
            await jobManager.awaitCompletion('solovei');
            should.equal(spy.called, true);
        });
    });

    describe('Remove a job', function () {
        it('removes a scheduled job from the queue', async function () {
            const jobManager = new JobManager({});

            // NOTE: the offset is 10 milliseconds; the job is removed right after being added
            const timeInTenMilliseconds = new Date(Date.now() + 10);
            const jobPath = path.resolve(__dirname, './jobs/simple.js');

            jobManager.addJob({
                at: timeInTenMilliseconds,
                job: jobPath,
                name: 'job-in-ten'
            });
            jobManager.bree.config.jobs[0].name.should.equal('job-in-ten');

            await jobManager.removeJob('job-in-ten');

            // the assertion must be invoked; a bare `.be.undefined` checks nothing
            should(jobManager.bree.config.jobs[0]).be.undefined();
        });
    });

    describe('Shutdown', function () {
        it('gracefully shuts down an inline jobs', async function () {
            const jobManager = new JobManager({});

            jobManager.addJob({
                job: require('./jobs/timed-job'),
                data: 200,
                offloaded: false
            });

            should(jobManager.queue.idle()).be.false();

            await jobManager.shutdown();

            should(jobManager.queue.idle()).be.true();
        });

        it('gracefully shuts down an interval job', async function () {
            const jobManager = new JobManager({});

            jobManager.addJob({
                at: 'every 5 seconds',
                job: path.resolve(__dirname, './jobs/graceful.js')
            });

            await delay(1); // let the job execution kick in

            should(Object.keys(jobManager.bree.workers).length).equal(0);
            should(Object.keys(jobManager.bree.timeouts).length).equal(0);
            should(Object.keys(jobManager.bree.intervals).length).equal(1);

            await jobManager.shutdown();

            should(Object.keys(jobManager.bree.intervals).length).equal(0);
        });
    });
});