Adds cron support to pg-boss Jobs and documentation (#586)

This commit is contained in:
Shayne Czyzewski 2022-05-13 12:30:55 -04:00 committed by GitHub
parent f14be11fc3
commit 4de7d16331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 979 additions and 282 deletions

View File

@ -5,6 +5,7 @@ where
import Control.Monad.Except (throwError)
import Control.Monad.IO.Class (liftIO)
import Wasp.AppSpec (AppSpec)
import qualified Wasp.AppSpec.App.Dependency as AS.Dependency
import Wasp.Cli.Command (Command, CommandError (..))
import Wasp.Cli.Command.Common (findWaspProjectRootDirFromCwd)
@ -26,7 +27,11 @@ deps = do
return
appSpecOrCompileErrors
liftIO . putStrLn . unlines $
liftIO . putStrLn $ depsMessage appSpec
depsMessage :: AppSpec -> String
depsMessage appSpec =
unlines $
[ "",
title "Below are listed the dependencies that Wasp uses in your project. You can import and use these directly in the code as if you specified them yourself, but you can't change their versions.",
""

View File

@ -2,4 +2,9 @@
import { createJob } from './{= executorJobRelFP =}'
{=& jobPerformFnImportStatement =}
export const {= jobName =} = await createJob({ jobName: "{= jobName =}", jobFn: {= jobPerformFnName =}, defaultJobOptions: {=& jobPerformOptions =} })
export const {= jobName =} = createJob({
jobName: "{= jobName =}",
jobFn: {= jobPerformFnName =},
defaultJobOptions: {=& jobPerformOptions =},
jobSchedule: {=& jobSchedule =}
})

View File

@ -0,0 +1,8 @@
{{={= =}=}}
// This module exports all jobs and is imported by the server to ensure
// any schedules that are not referenced are still loaded by NodeJS.
{=# jobs =}
export { {= name =} } from '../{= name =}.js'
{=/ jobs =}

View File

@ -1,10 +1,41 @@
import PgBoss from 'pg-boss'
import config from '../../../config.js'
export const boss = new PgBoss({ connectionString: config.databaseUrl })
const boss = createPgBoss()
// Ensure PgBoss can only be started once during a server's lifetime.
let hasPgBossBeenStarted = false
function createPgBoss() {
let pgBossNewOptions = {
connectionString: config.databaseUrl,
}
// Add an escape hatch for advanced configuration of pg-boss to overwrite our defaults.
if (process.env.PG_BOSS_NEW_OPTIONS) {
try {
pgBossNewOptions = JSON.parse(process.env.PG_BOSS_NEW_OPTIONS)
}
catch {
console.error("Environment variable PG_BOSS_NEW_OPTIONS was not parsable by JSON.parse()!")
}
}
return new PgBoss(pgBossNewOptions)
}
let resolvePgBossStarted, rejectPgBossStarted
// Code that wants to access pg-boss must wait until it has been started.
export const pgBossStarted = new Promise((resolve, reject) => {
resolvePgBossStarted = resolve
rejectPgBossStarted = reject
})
// Ensure pg-boss can only be started once during a server's lifetime.
const PgBossStatus = {
Unstarted: 'Unstarted',
Starting: 'Starting',
Started: 'Started',
Error: 'Error'
}
let pgBossStatus = PgBossStatus.Unstarted
/**
* Prepares the target PostgreSQL database and begins job monitoring.
@ -12,17 +43,27 @@ let hasPgBossBeenStarted = false
* `boss.start()` will automatically create them.
* Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#start
*
* After making this call, we can send PgBoss jobs and they will be persisted and acted upon.
* After making this call, we can send pg-boss jobs and they will be persisted and acted upon.
* This should only be called once during a server's lifetime.
*/
export async function startPgBoss() {
if (!hasPgBossBeenStarted) {
console.log('Starting PgBoss...')
if (pgBossStatus !== PgBossStatus.Unstarted) { return }
pgBossStatus = PgBossStatus.Starting
console.log('Starting pg-boss...')
boss.on('error', error => console.error(error))
boss.on('error', error => console.error(error))
try {
await boss.start()
console.log('PgBoss started!')
hasPgBossBeenStarted = true
} catch (error) {
console.error('pg-boss failed to start!')
console.error(error)
pgBossStatus = PgBossStatus.Error
rejectPgBossStarted(boss)
return
}
resolvePgBossStarted(boss)
console.log('pg-boss started!')
pgBossStatus = PgBossStatus.Started
}

View File

@ -1,25 +1,25 @@
import { boss } from './pgBoss.js'
import { pgBossStarted } from './pgBoss.js'
import { Job } from '../Job.js'
import { SubmittedJob } from '../SubmittedJob.js'
export const PG_BOSS_EXECUTOR_NAME = Symbol('PgBoss')
/**
* A PgBoss specific SubmittedJob that adds additional PgBoss functionality.
* A pg-boss specific SubmittedJob that adds additional pg-boss functionality.
*/
class PgBossSubmittedJob extends SubmittedJob {
constructor(job, jobId) {
constructor(boss, job, jobId) {
super(job, jobId)
this.pgBoss = {
async cancel() { return boss.cancel(jobId) },
async resume() { return boss.resume(jobId) },
async details() { return boss.getJobById(jobId) }
cancel: () => boss.cancel(jobId),
resume: () => boss.resume(jobId),
details: () => boss.getJobById(jobId),
}
}
}
/**
* This is a class repesenting a job that can be submitted to PgBoss.
* This is a class repesenting a job that can be submitted to pg-boss.
* It is not yet submitted until the caller invokes `submit()` on an instance.
* The caller can make as many calls to `submit()` as they wish.
*/
@ -51,14 +51,15 @@ class PgBossJob extends Job {
}
/**
* Submits the job to PgBoss.
* Submits the job to pg-boss.
* @param {object} jobArgs - The job arguments supplied by the user for their perform callback.
* @param {object} jobOptions - PgBoss specific options for `boss.send()`, which can override their defaultJobOptions.
* @param {object} jobOptions - pg-boss specific options for `boss.send()`, which can override their defaultJobOptions.
*/
async submit(jobArgs, jobOptions) {
const boss = await pgBossStarted
const jobId = await boss.send(this.jobName, jobArgs,
{ ...this.#defaultJobOptions, ...(this.#startAfter && { startAfter: this.#startAfter }), ...jobOptions })
return new PgBossSubmittedJob(this, jobId)
return new PgBossSubmittedJob(boss, this, jobId)
}
}
@ -68,17 +69,36 @@ class PgBossJob extends Job {
* functions, we will override the previous calls.
* @param {string} jobName - The user-defined job name in their .wasp file.
* @param {fn} jobFn - The user-defined async job callback function.
* @param {object} defaultJobOptions - PgBoss specific options for boss.send() applied to every submit() invocation,
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
*/
export async function createJob({ jobName, jobFn, defaultJobOptions } = {}) {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
// Since `pgBossStarted` will resolve in the future, it may appear possible to send pg-boss
// a job before we actually have registered the handler via `boss.work()`. However, even if NodeJS does
// not execute this callback before any job `submit()` calls, this is not a problem since pg-boss allows you
// to submit jobs even if there are no workers registered.
// Once they are registered, they will just start on the first job in their queue.
pgBossStarted.then(async (boss) => {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
// This tells pgBoss to run given worker function when job/payload with given job name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#scheduling
if (jobSchedule) {
const options = { ...defaultJobOptions, ...jobSchedule.options }
await boss.schedule(jobName, jobSchedule.cron, jobSchedule.args || null, options)
}
})
return new PgBossJob(jobName, defaultJobOptions)
}

View File

@ -2,14 +2,15 @@ import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const PASSTHROUGH_EXECUTOR_NAME = Symbol('Passthrough')
export const SIMPLE_EXECUTOR_NAME = Symbol('Simple')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It is dependency-free, however.
* It does not support `schedule`. It does not require any extra NPM dependencies
* or infrastructure, however.
*/
class PassthroughJob extends Job {
class SimpleJob extends Job {
#jobFn
#delaySeconds
@ -20,7 +21,7 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, PASSTHROUGH_EXECUTOR_NAME)
super(jobName, SIMPLE_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
@ -29,18 +30,18 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new PassthroughJob(this.jobName, this.#jobFn, delaySeconds)
return new SimpleJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
let jobId = (Math.random() + 1).toString(36).substring(7)
const jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new PassthroughJob(jobName, jobFn)
return new SimpleJob(jobName, jobFn)
}

View File

@ -11,6 +11,7 @@ import config from './config.js'
{=# isPgBossJobExecutorUsed =}
import { startPgBoss } from './jobs/core/pgBoss/pgBoss.js'
import './jobs/core/allJobs.js'
{=/ isPgBossJobExecutorUsed =}
const startServer = async () => {

View File

@ -17,9 +17,10 @@ waspBuild/.wasp/build/server/src/ext-src/MainPage.js
waspBuild/.wasp/build/server/src/ext-src/waspLogo.png
waspBuild/.wasp/build/server/src/jobs/core/Job.js
waspBuild/.wasp/build/server/src/jobs/core/SubmittedJob.js
waspBuild/.wasp/build/server/src/jobs/core/passthroughJob.js
waspBuild/.wasp/build/server/src/jobs/core/allJobs.js
waspBuild/.wasp/build/server/src/jobs/core/pgBoss/pgBoss.js
waspBuild/.wasp/build/server/src/jobs/core/pgBoss/pgBossJob.js
waspBuild/.wasp/build/server/src/jobs/core/simpleJob.js
waspBuild/.wasp/build/server/src/routes/index.js
waspBuild/.wasp/build/server/src/routes/operations/index.js
waspBuild/.wasp/build/server/src/server.js

View File

@ -128,23 +128,30 @@
[
[
"file",
"server/src/jobs/core/passthroughJob.js"
"server/src/jobs/core/allJobs.js"
],
"5df690abebd10220346751adcfc157dd66f34a033abe017e34ac4e84287090bf"
"90b1b3012216900efa82fff14911dcbf195fa2e449edb3b24ab80db0065d796f"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBoss.js"
],
"b6bc8378ba0870623fdf63620ceb60306f45616ef073bb399307784fe10b20c4"
"9821963d90b39058285343834c70e6f825d3d7696c738fd95539614b5e7d7b94"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"d55605f4699cdba098a7a52f6d4d69e4cd4deabf3234b08634f86fbcf9a31dc8"
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
],
[
[
"file",
"server/src/jobs/core/simpleJob.js"
],
"36fe173d9f5128859196bfd3a661983df2d95eb34d165a469b840982b06cf59b"
],
[
[

View File

@ -0,0 +1,4 @@
// This module exports all jobs and is imported by the server to ensure
// any schedules that are not referenced are still loaded by NodeJS.

View File

@ -1,10 +1,41 @@
import PgBoss from 'pg-boss'
import config from '../../../config.js'
export const boss = new PgBoss({ connectionString: config.databaseUrl })
const boss = createPgBoss()
// Ensure PgBoss can only be started once during a server's lifetime.
let hasPgBossBeenStarted = false
function createPgBoss() {
let pgBossNewOptions = {
connectionString: config.databaseUrl,
}
// Add an escape hatch for advanced configuration of pg-boss to overwrite our defaults.
if (process.env.PG_BOSS_NEW_OPTIONS) {
try {
pgBossNewOptions = JSON.parse(process.env.PG_BOSS_NEW_OPTIONS)
}
catch {
console.error("Environment variable PG_BOSS_NEW_OPTIONS was not parsable by JSON.parse()!")
}
}
return new PgBoss(pgBossNewOptions)
}
let resolvePgBossStarted, rejectPgBossStarted
// Code that wants to access pg-boss must wait until it has been started.
export const pgBossStarted = new Promise((resolve, reject) => {
resolvePgBossStarted = resolve
rejectPgBossStarted = reject
})
// Ensure pg-boss can only be started once during a server's lifetime.
const PgBossStatus = {
Unstarted: 'Unstarted',
Starting: 'Starting',
Started: 'Started',
Error: 'Error'
}
let pgBossStatus = PgBossStatus.Unstarted
/**
* Prepares the target PostgreSQL database and begins job monitoring.
@ -12,17 +43,27 @@ let hasPgBossBeenStarted = false
* `boss.start()` will automatically create them.
* Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#start
*
* After making this call, we can send PgBoss jobs and they will be persisted and acted upon.
* After making this call, we can send pg-boss jobs and they will be persisted and acted upon.
* This should only be called once during a server's lifetime.
*/
export async function startPgBoss() {
if (!hasPgBossBeenStarted) {
console.log('Starting PgBoss...')
if (pgBossStatus !== PgBossStatus.Unstarted) { return }
pgBossStatus = PgBossStatus.Starting
console.log('Starting pg-boss...')
boss.on('error', error => console.error(error))
boss.on('error', error => console.error(error))
try {
await boss.start()
console.log('PgBoss started!')
hasPgBossBeenStarted = true
} catch (error) {
console.error('pg-boss failed to start!')
console.error(error)
pgBossStatus = PgBossStatus.Error
rejectPgBossStarted(boss)
return
}
resolvePgBossStarted(boss)
console.log('pg-boss started!')
pgBossStatus = PgBossStatus.Started
}

View File

@ -1,25 +1,25 @@
import { boss } from './pgBoss.js'
import { pgBossStarted } from './pgBoss.js'
import { Job } from '../Job.js'
import { SubmittedJob } from '../SubmittedJob.js'
export const PG_BOSS_EXECUTOR_NAME = Symbol('PgBoss')
/**
* A PgBoss specific SubmittedJob that adds additional PgBoss functionality.
* A pg-boss specific SubmittedJob that adds additional pg-boss functionality.
*/
class PgBossSubmittedJob extends SubmittedJob {
constructor(job, jobId) {
constructor(boss, job, jobId) {
super(job, jobId)
this.pgBoss = {
async cancel() { return boss.cancel(jobId) },
async resume() { return boss.resume(jobId) },
async details() { return boss.getJobById(jobId) }
cancel: () => boss.cancel(jobId),
resume: () => boss.resume(jobId),
details: () => boss.getJobById(jobId),
}
}
}
/**
* This is a class repesenting a job that can be submitted to PgBoss.
* This is a class repesenting a job that can be submitted to pg-boss.
* It is not yet submitted until the caller invokes `submit()` on an instance.
* The caller can make as many calls to `submit()` as they wish.
*/
@ -51,14 +51,15 @@ class PgBossJob extends Job {
}
/**
* Submits the job to PgBoss.
* Submits the job to pg-boss.
* @param {object} jobArgs - The job arguments supplied by the user for their perform callback.
* @param {object} jobOptions - PgBoss specific options for `boss.send()`, which can override their defaultJobOptions.
* @param {object} jobOptions - pg-boss specific options for `boss.send()`, which can override their defaultJobOptions.
*/
async submit(jobArgs, jobOptions) {
const boss = await pgBossStarted
const jobId = await boss.send(this.jobName, jobArgs,
{ ...this.#defaultJobOptions, ...(this.#startAfter && { startAfter: this.#startAfter }), ...jobOptions })
return new PgBossSubmittedJob(this, jobId)
return new PgBossSubmittedJob(boss, this, jobId)
}
}
@ -68,17 +69,36 @@ class PgBossJob extends Job {
* functions, we will override the previous calls.
* @param {string} jobName - The user-defined job name in their .wasp file.
* @param {fn} jobFn - The user-defined async job callback function.
* @param {object} defaultJobOptions - PgBoss specific options for boss.send() applied to every submit() invocation,
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
*/
export async function createJob({ jobName, jobFn, defaultJobOptions } = {}) {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
// Since `pgBossStarted` will resolve in the future, it may appear possible to send pg-boss
// a job before we actually have registered the handler via `boss.work()`. However, even if NodeJS does
// not execute this callback before any job `submit()` calls, this is not a problem since pg-boss allows you
// to submit jobs even if there are no workers registered.
// Once they are registered, they will just start on the first job in their queue.
pgBossStarted.then(async (boss) => {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
// This tells pgBoss to run given worker function when job/payload with given job name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#scheduling
if (jobSchedule) {
const options = { ...defaultJobOptions, ...jobSchedule.options }
await boss.schedule(jobName, jobSchedule.cron, jobSchedule.args || null, options)
}
})
return new PgBossJob(jobName, defaultJobOptions)
}

View File

@ -2,14 +2,15 @@ import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const PASSTHROUGH_EXECUTOR_NAME = Symbol('Passthrough')
export const SIMPLE_EXECUTOR_NAME = Symbol('Simple')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It is dependency-free, however.
* It does not support `schedule`. It does not require any extra NPM dependencies
* or infrastructure, however.
*/
class PassthroughJob extends Job {
class SimpleJob extends Job {
#jobFn
#delaySeconds
@ -20,7 +21,7 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, PASSTHROUGH_EXECUTOR_NAME)
super(jobName, SIMPLE_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
@ -29,18 +30,18 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new PassthroughJob(this.jobName, this.#jobFn, delaySeconds)
return new SimpleJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
let jobId = (Math.random() + 1).toString(36).substring(7)
const jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new PassthroughJob(jobName, jobFn)
return new SimpleJob(jobName, jobFn)
}

View File

@ -17,9 +17,10 @@ waspCompile/.wasp/out/server/src/ext-src/MainPage.js
waspCompile/.wasp/out/server/src/ext-src/waspLogo.png
waspCompile/.wasp/out/server/src/jobs/core/Job.js
waspCompile/.wasp/out/server/src/jobs/core/SubmittedJob.js
waspCompile/.wasp/out/server/src/jobs/core/passthroughJob.js
waspCompile/.wasp/out/server/src/jobs/core/allJobs.js
waspCompile/.wasp/out/server/src/jobs/core/pgBoss/pgBoss.js
waspCompile/.wasp/out/server/src/jobs/core/pgBoss/pgBossJob.js
waspCompile/.wasp/out/server/src/jobs/core/simpleJob.js
waspCompile/.wasp/out/server/src/routes/index.js
waspCompile/.wasp/out/server/src/routes/operations/index.js
waspCompile/.wasp/out/server/src/server.js

View File

@ -128,23 +128,30 @@
[
[
"file",
"server/src/jobs/core/passthroughJob.js"
"server/src/jobs/core/allJobs.js"
],
"5df690abebd10220346751adcfc157dd66f34a033abe017e34ac4e84287090bf"
"90b1b3012216900efa82fff14911dcbf195fa2e449edb3b24ab80db0065d796f"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBoss.js"
],
"b6bc8378ba0870623fdf63620ceb60306f45616ef073bb399307784fe10b20c4"
"9821963d90b39058285343834c70e6f825d3d7696c738fd95539614b5e7d7b94"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"d55605f4699cdba098a7a52f6d4d69e4cd4deabf3234b08634f86fbcf9a31dc8"
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
],
[
[
"file",
"server/src/jobs/core/simpleJob.js"
],
"36fe173d9f5128859196bfd3a661983df2d95eb34d165a469b840982b06cf59b"
],
[
[

View File

@ -0,0 +1,4 @@
// This module exports all jobs and is imported by the server to ensure
// any schedules that are not referenced are still loaded by NodeJS.

View File

@ -1,10 +1,41 @@
import PgBoss from 'pg-boss'
import config from '../../../config.js'
export const boss = new PgBoss({ connectionString: config.databaseUrl })
const boss = createPgBoss()
// Ensure PgBoss can only be started once during a server's lifetime.
let hasPgBossBeenStarted = false
function createPgBoss() {
let pgBossNewOptions = {
connectionString: config.databaseUrl,
}
// Add an escape hatch for advanced configuration of pg-boss to overwrite our defaults.
if (process.env.PG_BOSS_NEW_OPTIONS) {
try {
pgBossNewOptions = JSON.parse(process.env.PG_BOSS_NEW_OPTIONS)
}
catch {
console.error("Environment variable PG_BOSS_NEW_OPTIONS was not parsable by JSON.parse()!")
}
}
return new PgBoss(pgBossNewOptions)
}
let resolvePgBossStarted, rejectPgBossStarted
// Code that wants to access pg-boss must wait until it has been started.
export const pgBossStarted = new Promise((resolve, reject) => {
resolvePgBossStarted = resolve
rejectPgBossStarted = reject
})
// Ensure pg-boss can only be started once during a server's lifetime.
const PgBossStatus = {
Unstarted: 'Unstarted',
Starting: 'Starting',
Started: 'Started',
Error: 'Error'
}
let pgBossStatus = PgBossStatus.Unstarted
/**
* Prepares the target PostgreSQL database and begins job monitoring.
@ -12,17 +43,27 @@ let hasPgBossBeenStarted = false
* `boss.start()` will automatically create them.
* Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#start
*
* After making this call, we can send PgBoss jobs and they will be persisted and acted upon.
* After making this call, we can send pg-boss jobs and they will be persisted and acted upon.
* This should only be called once during a server's lifetime.
*/
export async function startPgBoss() {
if (!hasPgBossBeenStarted) {
console.log('Starting PgBoss...')
if (pgBossStatus !== PgBossStatus.Unstarted) { return }
pgBossStatus = PgBossStatus.Starting
console.log('Starting pg-boss...')
boss.on('error', error => console.error(error))
boss.on('error', error => console.error(error))
try {
await boss.start()
console.log('PgBoss started!')
hasPgBossBeenStarted = true
} catch (error) {
console.error('pg-boss failed to start!')
console.error(error)
pgBossStatus = PgBossStatus.Error
rejectPgBossStarted(boss)
return
}
resolvePgBossStarted(boss)
console.log('pg-boss started!')
pgBossStatus = PgBossStatus.Started
}

View File

@ -1,25 +1,25 @@
import { boss } from './pgBoss.js'
import { pgBossStarted } from './pgBoss.js'
import { Job } from '../Job.js'
import { SubmittedJob } from '../SubmittedJob.js'
export const PG_BOSS_EXECUTOR_NAME = Symbol('PgBoss')
/**
* A PgBoss specific SubmittedJob that adds additional PgBoss functionality.
* A pg-boss specific SubmittedJob that adds additional pg-boss functionality.
*/
class PgBossSubmittedJob extends SubmittedJob {
constructor(job, jobId) {
constructor(boss, job, jobId) {
super(job, jobId)
this.pgBoss = {
async cancel() { return boss.cancel(jobId) },
async resume() { return boss.resume(jobId) },
async details() { return boss.getJobById(jobId) }
cancel: () => boss.cancel(jobId),
resume: () => boss.resume(jobId),
details: () => boss.getJobById(jobId),
}
}
}
/**
* This is a class repesenting a job that can be submitted to PgBoss.
* This is a class repesenting a job that can be submitted to pg-boss.
* It is not yet submitted until the caller invokes `submit()` on an instance.
* The caller can make as many calls to `submit()` as they wish.
*/
@ -51,14 +51,15 @@ class PgBossJob extends Job {
}
/**
* Submits the job to PgBoss.
* Submits the job to pg-boss.
* @param {object} jobArgs - The job arguments supplied by the user for their perform callback.
* @param {object} jobOptions - PgBoss specific options for `boss.send()`, which can override their defaultJobOptions.
* @param {object} jobOptions - pg-boss specific options for `boss.send()`, which can override their defaultJobOptions.
*/
async submit(jobArgs, jobOptions) {
const boss = await pgBossStarted
const jobId = await boss.send(this.jobName, jobArgs,
{ ...this.#defaultJobOptions, ...(this.#startAfter && { startAfter: this.#startAfter }), ...jobOptions })
return new PgBossSubmittedJob(this, jobId)
return new PgBossSubmittedJob(boss, this, jobId)
}
}
@ -68,17 +69,36 @@ class PgBossJob extends Job {
* functions, we will override the previous calls.
* @param {string} jobName - The user-defined job name in their .wasp file.
* @param {fn} jobFn - The user-defined async job callback function.
* @param {object} defaultJobOptions - PgBoss specific options for boss.send() applied to every submit() invocation,
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
*/
export async function createJob({ jobName, jobFn, defaultJobOptions } = {}) {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
// Since `pgBossStarted` will resolve in the future, it may appear possible to send pg-boss
// a job before we actually have registered the handler via `boss.work()`. However, even if NodeJS does
// not execute this callback before any job `submit()` calls, this is not a problem since pg-boss allows you
// to submit jobs even if there are no workers registered.
// Once they are registered, they will just start on the first job in their queue.
pgBossStarted.then(async (boss) => {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
// This tells pgBoss to run given worker function when job/payload with given job name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#scheduling
if (jobSchedule) {
const options = { ...defaultJobOptions, ...jobSchedule.options }
await boss.schedule(jobName, jobSchedule.cron, jobSchedule.args || null, options)
}
})
return new PgBossJob(jobName, defaultJobOptions)
}

View File

@ -2,14 +2,15 @@ import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const PASSTHROUGH_EXECUTOR_NAME = Symbol('Passthrough')
export const SIMPLE_EXECUTOR_NAME = Symbol('Simple')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It is dependency-free, however.
* It does not support `schedule`. It does not require any extra NPM dependencies
* or infrastructure, however.
*/
class PassthroughJob extends Job {
class SimpleJob extends Job {
#jobFn
#delaySeconds
@ -20,7 +21,7 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, PASSTHROUGH_EXECUTOR_NAME)
super(jobName, SIMPLE_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
@ -29,18 +30,18 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new PassthroughJob(this.jobName, this.#jobFn, delaySeconds)
return new SimpleJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
let jobId = (Math.random() + 1).toString(36).substring(7)
const jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new PassthroughJob(jobName, jobFn)
return new SimpleJob(jobName, jobFn)
}

View File

@ -19,9 +19,10 @@ waspJob/.wasp/out/server/src/ext-src/waspLogo.png
waspJob/.wasp/out/server/src/jobs/MySpecialJob.js
waspJob/.wasp/out/server/src/jobs/core/Job.js
waspJob/.wasp/out/server/src/jobs/core/SubmittedJob.js
waspJob/.wasp/out/server/src/jobs/core/passthroughJob.js
waspJob/.wasp/out/server/src/jobs/core/allJobs.js
waspJob/.wasp/out/server/src/jobs/core/pgBoss/pgBoss.js
waspJob/.wasp/out/server/src/jobs/core/pgBoss/pgBossJob.js
waspJob/.wasp/out/server/src/jobs/core/simpleJob.js
waspJob/.wasp/out/server/src/routes/index.js
waspJob/.wasp/out/server/src/routes/operations/index.js
waspJob/.wasp/out/server/src/server.js

View File

@ -123,7 +123,7 @@
"file",
"server/src/jobs/MySpecialJob.js"
],
"36f01c1d688af5eb7eaaec00c920af81dbf2a3d9f54444d94873306e8d10527f"
"4639c4c738a9a6a5b0137962ac64252da12aaa01cff7918ba0937e8b375bf9e6"
],
[
[
@ -142,23 +142,30 @@
[
[
"file",
"server/src/jobs/core/passthroughJob.js"
"server/src/jobs/core/allJobs.js"
],
"5df690abebd10220346751adcfc157dd66f34a033abe017e34ac4e84287090bf"
"b5ddc268dfe8f1f7d96d775c1d8d407a647a45ed4937a87da7eb1eb50f9a4674"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBoss.js"
],
"b6bc8378ba0870623fdf63620ceb60306f45616ef073bb399307784fe10b20c4"
"9821963d90b39058285343834c70e6f825d3d7696c738fd95539614b5e7d7b94"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"d55605f4699cdba098a7a52f6d4d69e4cd4deabf3234b08634f86fbcf9a31dc8"
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
],
[
[
"file",
"server/src/jobs/core/simpleJob.js"
],
"36fe173d9f5128859196bfd3a661983df2d95eb34d165a469b840982b06cf59b"
],
[
[
@ -179,7 +186,7 @@
"file",
"server/src/server.js"
],
"6ee8aa871c3340f832d2a7fddac32a29a87683699f654bf6ada91377c26ddb2b"
"25cee89ea082bef40d13a6ba01718ecaa0933c92991c9d395dc8602215c61255"
],
[
[

View File

@ -1,4 +1,9 @@
import { createJob } from './core/pgBoss/pgBossJob.js'
import { foo } from './../ext-src/jobs/bar.js'
export const MySpecialJob = await createJob({ jobName: "MySpecialJob", jobFn: foo, defaultJobOptions: {} })
export const MySpecialJob = createJob({
jobName: "MySpecialJob",
jobFn: foo,
defaultJobOptions: {},
jobSchedule: null
})

View File

@ -0,0 +1,5 @@
// This module exports all jobs and is imported by the server to ensure
// any schedules that are not referenced are still loaded by NodeJS.
export { MySpecialJob } from '../MySpecialJob.js'

View File

@ -1,10 +1,41 @@
import PgBoss from 'pg-boss'
import config from '../../../config.js'
export const boss = new PgBoss({ connectionString: config.databaseUrl })
const boss = createPgBoss()
// Ensure PgBoss can only be started once during a server's lifetime.
let hasPgBossBeenStarted = false
function createPgBoss() {
let pgBossNewOptions = {
connectionString: config.databaseUrl,
}
// Add an escape hatch for advanced configuration of pg-boss to overwrite our defaults.
if (process.env.PG_BOSS_NEW_OPTIONS) {
try {
pgBossNewOptions = JSON.parse(process.env.PG_BOSS_NEW_OPTIONS)
}
catch {
console.error("Environment variable PG_BOSS_NEW_OPTIONS was not parsable by JSON.parse()!")
}
}
return new PgBoss(pgBossNewOptions)
}
let resolvePgBossStarted, rejectPgBossStarted
// Code that wants to access pg-boss must wait until it has been started.
export const pgBossStarted = new Promise((resolve, reject) => {
resolvePgBossStarted = resolve
rejectPgBossStarted = reject
})
// Ensure pg-boss can only be started once during a server's lifetime.
const PgBossStatus = {
Unstarted: 'Unstarted',
Starting: 'Starting',
Started: 'Started',
Error: 'Error'
}
let pgBossStatus = PgBossStatus.Unstarted
/**
* Prepares the target PostgreSQL database and begins job monitoring.
@ -12,17 +43,27 @@ let hasPgBossBeenStarted = false
* `boss.start()` will automatically create them.
* Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#start
*
* After making this call, we can send PgBoss jobs and they will be persisted and acted upon.
* After making this call, we can send pg-boss jobs and they will be persisted and acted upon.
* This should only be called once during a server's lifetime.
*/
export async function startPgBoss() {
if (!hasPgBossBeenStarted) {
console.log('Starting PgBoss...')
if (pgBossStatus !== PgBossStatus.Unstarted) { return }
pgBossStatus = PgBossStatus.Starting
console.log('Starting pg-boss...')
boss.on('error', error => console.error(error))
boss.on('error', error => console.error(error))
try {
await boss.start()
console.log('PgBoss started!')
hasPgBossBeenStarted = true
} catch (error) {
console.error('pg-boss failed to start!')
console.error(error)
pgBossStatus = PgBossStatus.Error
rejectPgBossStarted(boss)
return
}
resolvePgBossStarted(boss)
console.log('pg-boss started!')
pgBossStatus = PgBossStatus.Started
}

View File

@ -1,25 +1,25 @@
import { boss } from './pgBoss.js'
import { pgBossStarted } from './pgBoss.js'
import { Job } from '../Job.js'
import { SubmittedJob } from '../SubmittedJob.js'
export const PG_BOSS_EXECUTOR_NAME = Symbol('PgBoss')
/**
* A PgBoss specific SubmittedJob that adds additional PgBoss functionality.
* A pg-boss specific SubmittedJob that adds additional pg-boss functionality.
*/
class PgBossSubmittedJob extends SubmittedJob {
constructor(job, jobId) {
constructor(boss, job, jobId) {
super(job, jobId)
this.pgBoss = {
async cancel() { return boss.cancel(jobId) },
async resume() { return boss.resume(jobId) },
async details() { return boss.getJobById(jobId) }
cancel: () => boss.cancel(jobId),
resume: () => boss.resume(jobId),
details: () => boss.getJobById(jobId),
}
}
}
/**
* This is a class repesenting a job that can be submitted to PgBoss.
* This is a class repesenting a job that can be submitted to pg-boss.
* It is not yet submitted until the caller invokes `submit()` on an instance.
* The caller can make as many calls to `submit()` as they wish.
*/
@ -51,14 +51,15 @@ class PgBossJob extends Job {
}
/**
* Submits the job to PgBoss.
* Submits the job to pg-boss.
* @param {object} jobArgs - The job arguments supplied by the user for their perform callback.
* @param {object} jobOptions - PgBoss specific options for `boss.send()`, which can override their defaultJobOptions.
* @param {object} jobOptions - pg-boss specific options for `boss.send()`, which can override their defaultJobOptions.
*/
async submit(jobArgs, jobOptions) {
const boss = await pgBossStarted
const jobId = await boss.send(this.jobName, jobArgs,
{ ...this.#defaultJobOptions, ...(this.#startAfter && { startAfter: this.#startAfter }), ...jobOptions })
return new PgBossSubmittedJob(this, jobId)
return new PgBossSubmittedJob(boss, this, jobId)
}
}
@ -68,17 +69,36 @@ class PgBossJob extends Job {
* functions, we will override the previous calls.
* @param {string} jobName - The user-defined job name in their .wasp file.
* @param {fn} jobFn - The user-defined async job callback function.
* @param {object} defaultJobOptions - PgBoss specific options for boss.send() applied to every submit() invocation,
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
*/
export async function createJob({ jobName, jobFn, defaultJobOptions } = {}) {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
// Since `pgBossStarted` will resolve in the future, it may appear possible to send pg-boss
// a job before we actually have registered the handler via `boss.work()`. However, even if NodeJS does
// not execute this callback before any job `submit()` calls, this is not a problem since pg-boss allows you
// to submit jobs even if there are no workers registered.
// Once they are registered, they will just start on the first job in their queue.
pgBossStarted.then(async (boss) => {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
// This tells pgBoss to run given worker function when job/payload with given job name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#scheduling
if (jobSchedule) {
const options = { ...defaultJobOptions, ...jobSchedule.options }
await boss.schedule(jobName, jobSchedule.cron, jobSchedule.args || null, options)
}
})
return new PgBossJob(jobName, defaultJobOptions)
}

View File

@ -2,14 +2,15 @@ import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const PASSTHROUGH_EXECUTOR_NAME = Symbol('Passthrough')
export const SIMPLE_EXECUTOR_NAME = Symbol('Simple')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It is dependency-free, however.
* It does not support `schedule`. It does not require any extra NPM dependencies
* or infrastructure, however.
*/
class PassthroughJob extends Job {
class SimpleJob extends Job {
#jobFn
#delaySeconds
@ -20,7 +21,7 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, PASSTHROUGH_EXECUTOR_NAME)
super(jobName, SIMPLE_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
@ -29,18 +30,18 @@ class PassthroughJob extends Job {
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new PassthroughJob(this.jobName, this.#jobFn, delaySeconds)
return new SimpleJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
let jobId = (Math.random() + 1).toString(36).substring(7)
const jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new PassthroughJob(jobName, jobFn)
return new SimpleJob(jobName, jobFn)
}

View File

@ -6,6 +6,7 @@ import config from './config.js'
import { startPgBoss } from './jobs/core/pgBoss/pgBoss.js'
import './jobs/core/allJobs.js'
const startServer = async () => {
await startPgBoss()

View File

@ -22,9 +22,10 @@ waspMigrate/.wasp/out/server/src/ext-src/MainPage.js
waspMigrate/.wasp/out/server/src/ext-src/waspLogo.png
waspMigrate/.wasp/out/server/src/jobs/core/Job.js
waspMigrate/.wasp/out/server/src/jobs/core/SubmittedJob.js
waspMigrate/.wasp/out/server/src/jobs/core/passthroughJob.js
waspMigrate/.wasp/out/server/src/jobs/core/allJobs.js
waspMigrate/.wasp/out/server/src/jobs/core/pgBoss/pgBoss.js
waspMigrate/.wasp/out/server/src/jobs/core/pgBoss/pgBossJob.js
waspMigrate/.wasp/out/server/src/jobs/core/simpleJob.js
waspMigrate/.wasp/out/server/src/routes/index.js
waspMigrate/.wasp/out/server/src/routes/operations/index.js
waspMigrate/.wasp/out/server/src/server.js

View File

@ -128,23 +128,30 @@
[
[
"file",
"server/src/jobs/core/passthroughJob.js"
"server/src/jobs/core/allJobs.js"
],
"5df690abebd10220346751adcfc157dd66f34a033abe017e34ac4e84287090bf"
"90b1b3012216900efa82fff14911dcbf195fa2e449edb3b24ab80db0065d796f"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBoss.js"
],
"b6bc8378ba0870623fdf63620ceb60306f45616ef073bb399307784fe10b20c4"
"9821963d90b39058285343834c70e6f825d3d7696c738fd95539614b5e7d7b94"
],
[
[
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"d55605f4699cdba098a7a52f6d4d69e4cd4deabf3234b08634f86fbcf9a31dc8"
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
],
[
[
"file",
"server/src/jobs/core/simpleJob.js"
],
"36fe173d9f5128859196bfd3a661983df2d95eb34d165a469b840982b06cf59b"
],
[
[

View File

@ -0,0 +1,4 @@
// This module exports all jobs and is imported by the server to ensure
// any schedules that are not referenced are still loaded by NodeJS.

View File

@ -1,46 +0,0 @@
import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const PASSTHROUGH_EXECUTOR_NAME = Symbol('Passthrough')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It is dependency-free, however.
*/
class PassthroughJob extends Job {
#jobFn
#delaySeconds
/**
*
* @param {string} jobName - Name of the Job.
* @param {fn} jobFn - The Job function to execute.
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, PASSTHROUGH_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
/**
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new PassthroughJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
let jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new PassthroughJob(jobName, jobFn)
}

View File

@ -1,10 +1,41 @@
import PgBoss from 'pg-boss'
import config from '../../../config.js'
export const boss = new PgBoss({ connectionString: config.databaseUrl })
const boss = createPgBoss()
// Ensure PgBoss can only be started once during a server's lifetime.
let hasPgBossBeenStarted = false
function createPgBoss() {
let pgBossNewOptions = {
connectionString: config.databaseUrl,
}
// Add an escape hatch for advanced configuration of pg-boss to overwrite our defaults.
if (process.env.PG_BOSS_NEW_OPTIONS) {
try {
pgBossNewOptions = JSON.parse(process.env.PG_BOSS_NEW_OPTIONS)
}
catch {
console.error("Environment variable PG_BOSS_NEW_OPTIONS was not parsable by JSON.parse()!")
}
}
return new PgBoss(pgBossNewOptions)
}
let resolvePgBossStarted, rejectPgBossStarted
// Code that wants to access pg-boss must wait until it has been started.
export const pgBossStarted = new Promise((resolve, reject) => {
resolvePgBossStarted = resolve
rejectPgBossStarted = reject
})
// Ensure pg-boss can only be started once during a server's lifetime.
const PgBossStatus = {
Unstarted: 'Unstarted',
Starting: 'Starting',
Started: 'Started',
Error: 'Error'
}
let pgBossStatus = PgBossStatus.Unstarted
/**
* Prepares the target PostgreSQL database and begins job monitoring.
@ -12,17 +43,27 @@ let hasPgBossBeenStarted = false
* `boss.start()` will automatically create them.
* Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#start
*
* After making this call, we can send PgBoss jobs and they will be persisted and acted upon.
* After making this call, we can send pg-boss jobs and they will be persisted and acted upon.
* This should only be called once during a server's lifetime.
*/
export async function startPgBoss() {
if (!hasPgBossBeenStarted) {
console.log('Starting PgBoss...')
if (pgBossStatus !== PgBossStatus.Unstarted) { return }
pgBossStatus = PgBossStatus.Starting
console.log('Starting pg-boss...')
boss.on('error', error => console.error(error))
boss.on('error', error => console.error(error))
try {
await boss.start()
console.log('PgBoss started!')
hasPgBossBeenStarted = true
} catch (error) {
console.error('pg-boss failed to start!')
console.error(error)
pgBossStatus = PgBossStatus.Error
rejectPgBossStarted(boss)
return
}
resolvePgBossStarted(boss)
console.log('pg-boss started!')
pgBossStatus = PgBossStatus.Started
}

View File

@ -1,25 +1,25 @@
import { boss } from './pgBoss.js'
import { pgBossStarted } from './pgBoss.js'
import { Job } from '../Job.js'
import { SubmittedJob } from '../SubmittedJob.js'
export const PG_BOSS_EXECUTOR_NAME = Symbol('PgBoss')
/**
* A PgBoss specific SubmittedJob that adds additional PgBoss functionality.
* A pg-boss specific SubmittedJob that adds additional pg-boss functionality.
*/
class PgBossSubmittedJob extends SubmittedJob {
constructor(job, jobId) {
constructor(boss, job, jobId) {
super(job, jobId)
this.pgBoss = {
async cancel() { return boss.cancel(jobId) },
async resume() { return boss.resume(jobId) },
async details() { return boss.getJobById(jobId) }
cancel: () => boss.cancel(jobId),
resume: () => boss.resume(jobId),
details: () => boss.getJobById(jobId),
}
}
}
/**
* This is a class repesenting a job that can be submitted to PgBoss.
* This is a class repesenting a job that can be submitted to pg-boss.
* It is not yet submitted until the caller invokes `submit()` on an instance.
* The caller can make as many calls to `submit()` as they wish.
*/
@ -51,14 +51,15 @@ class PgBossJob extends Job {
}
/**
* Submits the job to PgBoss.
* Submits the job to pg-boss.
* @param {object} jobArgs - The job arguments supplied by the user for their perform callback.
* @param {object} jobOptions - PgBoss specific options for `boss.send()`, which can override their defaultJobOptions.
* @param {object} jobOptions - pg-boss specific options for `boss.send()`, which can override their defaultJobOptions.
*/
async submit(jobArgs, jobOptions) {
const boss = await pgBossStarted
const jobId = await boss.send(this.jobName, jobArgs,
{ ...this.#defaultJobOptions, ...(this.#startAfter && { startAfter: this.#startAfter }), ...jobOptions })
return new PgBossSubmittedJob(this, jobId)
return new PgBossSubmittedJob(boss, this, jobId)
}
}
@ -68,17 +69,36 @@ class PgBossJob extends Job {
* functions, we will override the previous calls.
* @param {string} jobName - The user-defined job name in their .wasp file.
* @param {fn} jobFn - The user-defined async job callback function.
* @param {object} defaultJobOptions - PgBoss specific options for boss.send() applied to every submit() invocation,
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
*/
export async function createJob({ jobName, jobFn, defaultJobOptions } = {}) {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
// Since `pgBossStarted` will resolve in the future, it may appear possible to send pg-boss
// a job before we actually have registered the handler via `boss.work()`. However, even if NodeJS does
// not execute this callback before any job `submit()` calls, this is not a problem since pg-boss allows you
// to submit jobs even if there are no workers registered.
// Once they are registered, they will just start on the first job in their queue.
pgBossStarted.then(async (boss) => {
// As a safety precaution against undefined behavior of registering different
// functions for the same job name, remove all registered functions first.
await boss.offWork(jobName)
// This tells pgBoss to run given worker function when job/payload with given job name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#scheduling
if (jobSchedule) {
const options = { ...defaultJobOptions, ...jobSchedule.options }
await boss.schedule(jobName, jobSchedule.cron, jobSchedule.args || null, options)
}
})
return new PgBossJob(jobName, defaultJobOptions)
}

View File

@ -0,0 +1,47 @@
import { sleep } from '../../utils.js'
import { Job } from './Job.js'
import { SubmittedJob } from './SubmittedJob.js'
export const SIMPLE_EXECUTOR_NAME = Symbol('Simple')
/**
* A simple job mainly intended for testing. It will not submit work to any
* job executor, but instead will simply invoke the underlying perform function.
* It does not support `schedule`. It does not require any extra NPM dependencies
* or infrastructure, however.
*/
class SimpleJob extends Job {
#jobFn
#delaySeconds
/**
*
* @param {string} jobName - Name of the Job.
* @param {fn} jobFn - The Job function to execute.
* @param {int} delaySeconds - The number of seconds to delay invoking the Job function.
*/
constructor(jobName, jobFn, delaySeconds = 0) {
super(jobName, SIMPLE_EXECUTOR_NAME)
this.#jobFn = jobFn
this.#delaySeconds = delaySeconds
}
/**
* @param {int} delaySeconds - Used to delay the processing of the job by some number of seconds.
*/
delay(delaySeconds) {
return new SimpleJob(this.jobName, this.#jobFn, delaySeconds)
}
async submit(jobArgs) {
sleep(this.#delaySeconds * 1000).then(() => this.#jobFn(jobArgs))
// NOTE: Dumb random ID generator, mainly so we don't have to add `uuid`
// as a dependency in the server generator for something nobody will likely use.
const jobId = (Math.random() + 1).toString(36).substring(7)
return new SubmittedJob(this, jobId)
}
}
export function createJob({ jobName, jobFn } = {}) {
return new SimpleJob(jobName, jobFn)
}

View File

@ -115,6 +115,22 @@ job mySpecialJob {
executor: PgBoss,
perform: {
fn: import { foo } from "@ext/jobs/bar.js",
options: {=json { "retryLimit": 1 } json=}
executorOptions: {
pgBoss: {=json { "retryLimit": 1 } json=}
}
}
}
job mySpecialScheduledJob {
executor: PgBoss,
perform: {
fn: import { foo } from "@ext/jobs/bar.js"
},
schedule: {
cron: "*/2 * * * *",
args: {=json { "foo": "bar" } json=},
executorOptions: {
pgBoss: {=json { "retryLimit": 2 } json=}
}
}
}

View File

@ -3,18 +3,30 @@
module Wasp.AppSpec.JSON
( JSON (..),
emptyObject,
nullValue,
)
where
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Lazy.UTF8 as ByteStringLazyUTF8
import qualified Data.Aeson.Text as Aeson.Text
import qualified Data.Aeson.Types as Aeson.Types
import Data.Data (Data)
import qualified Data.Text.Lazy as TextL
newtype JSON = JSON Aeson.Value
deriving (Eq, Data)
instance Show JSON where
show (JSON val) = ByteStringLazyUTF8.toString $ Aeson.encode val
show (JSON val) = TextL.unpack . Aeson.Text.encodeToLazyText $ val
instance Aeson.ToJSON JSON where
toJSON (JSON val) = val
instance Aeson.FromJSON JSON where
parseJSON val = return $ JSON val
emptyObject :: JSON
emptyObject = JSON $ Aeson.object []
emptyObject = JSON Aeson.Types.emptyObject
nullValue :: JSON
nullValue = JSON Aeson.Types.Null

View File

@ -1,9 +1,14 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DuplicateRecordFields #-}
module Wasp.AppSpec.Job
( Job (..),
JobExecutor (..),
Perform (..),
Schedule (..),
ExecutorOptions (..),
performExecutorOptionsJson,
scheduleExecutorOptionsJson,
jobExecutors,
)
where
@ -15,22 +20,56 @@ import Wasp.AppSpec.JSON (JSON (..))
data Job = Job
{ executor :: JobExecutor,
perform :: Perform
perform :: Perform,
schedule :: Maybe Schedule
}
deriving (Show, Eq, Data)
instance IsDecl Job
data JobExecutor = Passthrough | PgBoss
data JobExecutor = Simple | PgBoss
deriving (Show, Eq, Data, Ord, Enum, Bounded)
data Perform = Perform
{ fn :: ExtImport,
options :: Maybe JSON
executorOptions :: Maybe ExecutorOptions
}
deriving (Show, Eq, Data)
instance IsDecl Perform
-- Allows jobs to run via some cron schedule.
data Schedule = Schedule
{ cron :: String, -- 5 field cron expression, exe: "*/5 * * * *".
args :: Maybe JSON, -- Arguments to pass to the job handler function (`Perform.fn`).
executorOptions :: Maybe ExecutorOptions
}
deriving (Show, Eq, Data)
instance IsDecl Schedule
-- These are optional executor-specific JSON options we pass
-- directly through to the executor when submitting jobs.
data ExecutorOptions = ExecutorOptions
{ pgBoss :: Maybe JSON,
simple :: Maybe JSON
}
deriving (Show, Eq, Data)
jobExecutors :: [JobExecutor]
jobExecutors = enumFrom minBound :: [JobExecutor]
-- Helpers to disambiguate duplicate field `executorOptions`.
performExecutorOptionsJson :: Job -> Maybe JSON
performExecutorOptionsJson job =
executorOptionsJson (executor job) (executorOptions (perform job :: Perform))
scheduleExecutorOptionsJson :: Job -> Maybe JSON
scheduleExecutorOptionsJson job = do
s <- schedule job
executorOptionsJson (executor job) (executorOptions (s :: Schedule))
executorOptionsJson :: JobExecutor -> Maybe ExecutorOptions -> Maybe JSON
executorOptionsJson Simple (Just ExecutorOptions {simple = Just json}) = Just json
executorOptionsJson PgBoss (Just ExecutorOptions {pgBoss = Just json}) = Just json
executorOptionsJson _ _ = Nothing

View File

@ -8,6 +8,8 @@ module Wasp.Generator.ServerGenerator.JobGenerator
where
import Data.Aeson (object, (.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Text as Aeson.Text
import Data.Maybe (fromJust, fromMaybe)
import StrongPath
( Dir,
@ -26,7 +28,7 @@ import qualified StrongPath as SP
import Wasp.AppSpec (AppSpec, getJobs)
import qualified Wasp.AppSpec.App.Dependency as AS.Dependency
import qualified Wasp.AppSpec.JSON as AS.JSON
import Wasp.AppSpec.Job (Job, JobExecutor (Passthrough, PgBoss), jobExecutors)
import Wasp.AppSpec.Job (Job, JobExecutor (PgBoss, Simple), jobExecutors)
import qualified Wasp.AppSpec.Job as J
import Wasp.AppSpec.Util (isPgBossJobExecutorUsed)
import Wasp.Generator.ExternalCodeGenerator.Common (GeneratedExternalCodeDir)
@ -40,27 +42,61 @@ import Wasp.Generator.ServerGenerator.Common
srcDirInServerTemplatesDir,
)
import qualified Wasp.Generator.ServerGenerator.Common as C
import qualified Wasp.SemanticVersion as SV
genJobs :: AppSpec -> Generator [FileDraft]
genJobs spec = return $ genJob <$> getJobs spec
genJobs spec = return $ genAllJobImports spec : (genJob <$> getJobs spec)
genJob :: (String, Job) -> FileDraft
genJob (jobName, job) =
C.mkTmplFdWithDstAndData
tmplFile
dstFile
( Just $
object
[ "jobName" .= jobName,
"jobPerformFnName" .= jobPerformFnName,
"jobPerformFnImportStatement" .= jobPerformFnImportStatement,
-- NOTE: You cannot directly input an Aeson.object for Mustache to substitute.
-- This is why we must get a text representation of the object, either by
-- `Aeson.Text.encodeToLazyText` on an Aeson.Object, or `show` on an AS.JSON.
"jobSchedule" .= Aeson.Text.encodeToLazyText (fromMaybe Aeson.Null maybeJobSchedule),
"jobPerformOptions" .= show (fromMaybe AS.JSON.emptyObject maybeJobPerformOptions),
"executorJobRelFP" .= toFilePath (executorJobTemplateInJobsDir (J.executor job))
]
)
where
tmplFile = C.asTmplFile $ jobsDirInServerTemplatesDir SP.</> [relfile|_job.js|]
dstFileFromJobName jobName = jobsDirInServerRootDir SP.</> fromJust (parseRelFile $ jobName ++ ".js")
genJob :: (String, Job) -> FileDraft
genJob (jobName, job) =
let (jobPerformFnName, jobPerformFnImportStatement) = getJsImportDetailsForExtFnImport relPosixPathFromJobFileToExtSrcDir $ (J.fn . J.perform) job
in C.mkTmplFdWithDstAndData
tmplFile
(dstFileFromJobName jobName)
( Just $
object
[ "jobName" .= jobName,
"jobPerformFnName" .= jobPerformFnName,
"jobPerformFnImportStatement" .= jobPerformFnImportStatement,
"jobPerformOptions" .= show (fromMaybe AS.JSON.emptyObject (J.options . J.perform $ job)),
"executorJobRelFP" .= toFilePath (executorJobTemplateInJobsDir (J.executor job))
]
)
dstFile = jobsDirInServerRootDir SP.</> fromJust (parseRelFile $ jobName ++ ".js")
(jobPerformFnName, jobPerformFnImportStatement) = getJsImportDetailsForExtFnImport relPosixPathFromJobFileToExtSrcDir $ (J.fn . J.perform) job
maybeJobPerformOptions = J.performExecutorOptionsJson job
jobScheduleTmplData s =
object
[ "cron" .= J.cron s,
"args" .= J.args s,
"options" .= fromMaybe AS.JSON.emptyObject (J.scheduleExecutorOptionsJson job)
]
maybeJobSchedule = jobScheduleTmplData <$> J.schedule job
-- Creates a file that is imported on the server to ensure all job JS modules are loaded
-- even if they are not referenced by user code. This ensures schedules are started, etc.
genAllJobImports :: AppSpec -> FileDraft
genAllJobImports spec =
let tmplFile = C.asTmplFile $ jobsDirInServerTemplatesDir SP.</> [relfile|core/_allJobs.js|]
dstFile = jobsDirInServerRootDir SP.</> [relfile|core/allJobs.js|]
in C.mkTmplFdWithDstAndData
tmplFile
dstFile
( Just $
object
["jobs" .= (buildJobInfo <$> (fst <$> getJobs spec))]
)
where
buildJobInfo :: String -> Aeson.Value
buildJobInfo jobName =
object
[ "name" .= jobName
]
-- | TODO: Make this not hardcoded!
relPosixPathFromJobFileToExtSrcDir :: Path Posix (Rel (Dir ServerSrcDir)) (Dir GeneratedExternalCodeDir)
@ -92,17 +128,19 @@ executorJobTemplateInServerTemplatesDir = (jobsDirInServerTemplatesDir SP.</>) .
executorJobTemplateInJobsDir :: JobExecutor -> Path' (Rel JobsDir) File'
executorJobTemplateInJobsDir PgBoss = [relfile|core/pgBoss/pgBossJob.js|]
executorJobTemplateInJobsDir Passthrough = [relfile|core/passthroughJob.js|]
executorJobTemplateInJobsDir Simple = [relfile|core/simpleJob.js|]
-- Path to destination files are the same as in templates dir.
jobsDirInServerRootDir :: Path' (Rel ServerRootDir) (Dir JobsDir)
jobsDirInServerRootDir = SP.castRel jobsDirInServerTemplatesDir
pgBossVersionBounds :: String
pgBossVersionBounds = "^7.2.1"
-- NOTE: Our pg-boss related documentation references this version in URLs.
-- Please update the docs when this changes (until we solve: https://github.com/wasp-lang/wasp/issues/596).
pgBossVersionBounds :: SV.VersionBounds
pgBossVersionBounds = SV.BackwardsCompatibleWith (SV.Version 7 2 1)
pgBossDependency :: AS.Dependency.Dependency
pgBossDependency = AS.Dependency.make ("pg-boss", pgBossVersionBounds)
pgBossDependency = AS.Dependency.make ("pg-boss", show pgBossVersionBounds)
depsRequiredByJobs :: AppSpec -> [AS.Dependency.Dependency]
depsRequiredByJobs spec = [pgBossDependency | isPgBossJobExecutorUsed spec]

View File

@ -3,6 +3,7 @@
module AnalyzerTest where
import Analyzer.TestUtil (ctx)
import qualified Data.Aeson as Aeson
import Data.Either (isRight)
import Data.List (intercalate)
import Data.Maybe (fromJust)
@ -21,6 +22,7 @@ import Wasp.AppSpec.Core.Ref (Ref (..))
import Wasp.AppSpec.Entity (Entity)
import qualified Wasp.AppSpec.Entity as Entity
import Wasp.AppSpec.ExtImport (ExtImport (..), ExtImportName (..))
import qualified Wasp.AppSpec.JSON as JSON
import qualified Wasp.AppSpec.Job as Job
import qualified Wasp.AppSpec.Page as Page
import qualified Wasp.AppSpec.Query as Query
@ -82,7 +84,17 @@ spec_Analyzer = do
"job BackgroundJob {",
" executor: PgBoss,",
" perform: {",
" fn: import { backgroundJob } from \"@ext/jobs/baz.js\"",
" fn: import { backgroundJob } from \"@ext/jobs/baz.js\",",
" executorOptions: {",
" pgBoss: {=json { \"retryLimit\": 1 } json=}",
" }",
" },",
" schedule: {",
" cron: \"*/5 * * * *\",",
" args: {=json { \"job\": \"args\" } json=},",
" executorOptions: {",
" pgBoss: {=json { \"retryLimit\": 0 } json=}",
" }",
" }",
"}"
]
@ -194,18 +206,34 @@ spec_Analyzer = do
]
takeDecls <$> decls `shouldBe` Right expectedAction
let jobPerform =
Job.Perform
( ExtImport
(ExtImportField "backgroundJob")
(fromJust $ SP.parseRelFileP "jobs/baz.js")
)
( Just $
Job.ExecutorOptions
{ Job.pgBoss = JSON.JSON <$> Aeson.decode "{\"retryLimit\":1}",
Job.simple = Nothing
}
)
let jobSchedule =
Job.Schedule
"*/5 * * * *"
(JSON.JSON <$> Aeson.decode "{\"job\":\"args\"}")
( Just $
Job.ExecutorOptions
{ Job.pgBoss = JSON.JSON <$> Aeson.decode "{\"retryLimit\":0}",
Job.simple = Nothing
}
)
let expectedJob =
[ ( "BackgroundJob",
Job.Job
{ Job.executor = Job.PgBoss,
Job.perform =
Job.Perform
{ Job.fn =
ExtImport
(ExtImportField "backgroundJob")
(fromJust $ SP.parseRelFileP "jobs/baz.js"),
Job.options = Nothing
}
Job.perform = jobPerform,
Job.schedule = Just jobSchedule
}
)
]

View File

@ -443,6 +443,185 @@ import { isPrismaError, prismaErrorToHttpError } from '@wasp/utils.js'
}
```
## Jobs
If you have server tasks that you do not want to handle as part of the normal request-response cycle, Wasp allows you to make that function a `job` and it will gain some "superpowers." Jobs will persist between server restarts, can be retried if they fail, and they can even be delayed until the future (or have a recurring schedule)! Some examples where you may want to use a `job` on the server include sending an email, making an HTTP request to some external API, or doing some nightly calculations.
### Job Executors
Job executors handle the scheduling, monitoring, and execution of our jobs.
Wasp allows you to choose which job executor will be used to execute a specific job that you define, which affects some of the finer details of how jobs will behave and how they can be further configured. Each job executor has its pros and cons, which we will explain in more detail below, so you can pick the one that best suits your needs.
Currently, Wasp supports only one type of job executor, which is `PgBoss`, but in the future, it will likely support more.
#### pg-boss
We have selected [pg-boss](https://github.com/timgit/pg-boss/) as our first job executor to handle the low-volume, basic job queue workloads many web applications have. By using PostgreSQL (and [SKIP LOCKED](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/)) as its storage and synchronization mechanism, it allows us to provide many job queue pros without any additional infrastructure or complex management.
Keep in mind that pg-boss jobs run alongside your other server-side code, so they are not appropriate for CPU-heavy workloads. Additionally, some care is required if you modify scheduled jobs. Please see pg-boss details for more information.
<details>
<summary>pg-boss details</summary>
pg-boss provides many useful features, which can be found [here](https://github.com/timgit/pg-boss/blob/7.2.1/README.md).
When you add pg-boss to a Wasp project, it will automatically add a new schema to your database called `pgboss` with some internal tracking tables, including `job` and `schedule`. pg-boss tables have a `name` column in most tables that will correspond to your `job` identifier. Additionally, these tables maintain arguments, states, return values, retry information, start and expiration times, and other metadata required by pg-boss.
If you need to customize the creation of the pg-boss instance, you can set an environment variable called `PG_BOSS_NEW_OPTIONS` to a stringified JSON object containing [these initialization parameters](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#newoptions). **NOTE**: Setting this overwrites all Wasp defaults, so you must include database connection information as well.
##### pg-boss considerations
- Wasp starts pg-boss alongside your web server's application, where both are simultaneously operational. This means that jobs running via pg-boss and the rest of the server logic (like Operations) share the CPU, therefore you should avoid running CPU-intensive tasks via jobs.
- Wasp does not (yet) support independent, horizontal scaling of pg-boss-only applications, nor starting them as separate workers/processes/threads.
- The job name/identifier in your `.wasp` file is the same name that will be used in the `name` column of pg-boss tables. If you change a name that had a `schedule` associated with it, pg-boss will continue scheduling those jobs but they will have no handlers associated, and will thus become stale and expire. To resolve this, you can remove the applicable row from the `schedule` table in the `pgboss` schema of your database.
- If you remove a `schedule` from a job, you will need to do the above as well.
</details>
### Basic job definition and usage
To declare a `job` in Wasp, simply add a declaration with a reference to an `async` function, like the following:
```css title="main.wasp"
job mySpecialJob {
executor: PgBoss,
perform: {
fn: import { foo } from "@ext/jobs/bar.js"
}
}
```
Then, in your [Operations](/docs/language/features#queries-and-actions-aka-operations) or [setupFn](/docs/language/features#setupfn-extimport-optional) (or any other NodeJS code), you can submit work to be done:
```js
import { mySpecialJob } from '@wasp/jobs/mySpecialJob.js'
const submittedJob = await mySpecialJob.submit({ job: "args" })
console.log(await submittedJob.pgBoss.details())
// Or, if you'd prefer it to execute in the future, just add a .delay().
// It takes a number of seconds, Date, or ISO date string.
await mySpecialJob.delay(10).submit({ job: "args" })
```
And that is it! Your job will be executed by the job executor as if you called `foo({ job: "args" })`.
### Recurring jobs
If you have work that needs to be done on some recurring basis, you can add a `schedule` to your job declaration:
```css {6-9} title="main.wasp"
job mySpecialJob {
executor: PgBoss,
perform: {
fn: import { foo } from "@ext/jobs/bar.js"
},
schedule: {
cron: "0 * * * *",
args: {=json { "job": "args" } json=} // optional
}
}
```
In this example, you do _not_ need to invoke anything in JavaScript. You can imagine `foo({ "job": "args" })` getting automatically scheduled and invoked for you every hour.
### Fully specified example
Additionally, both `perform` and `schedule` accept `executorOptions`, which we pass directly to the named job executor when you submit jobs. In this example, the scheduled job will have a `retryLimit` set to 0, as `schedule` overrides any similar property from `perform`.
```css
job mySpecialJob {
executor: PgBoss,
perform: {
fn: import { foo } from "@ext/jobs/bar.js",
executorOptions: {
pgBoss: {=json { "retryLimit": 1 } json=}
}
},
schedule: {
cron: "*/5 * * * *",
args: {=json { "foo": "bar" } json=},
executorOptions: {
pgBoss: {=json { "retryLimit": 0 } json=}
}
}
}
```
### Fields
#### `executor: JobExecutor` (required)
`PgBoss` is currently our only job executor, and is recommended for low-volume production use cases. It requires your `app.db.system` to be `PostgreSQL`.
#### `perform: dict` (required)
- ##### `fn: fn` (required)
An `async` JavaScript function of work to be performed. It can optionally take a JSON value as an argument.
- ##### `executorOptions: dict` (optional)
Executor-specific default options to use when submitting jobs. These are passed directly through and you should consult the documentation for the job executor. These can be overridden during invocation with `submit()` or in a `schedule`.
- ##### `pgBoss: JSON` (optional)
See the docs for [pg-boss](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#sendname-data-options).
#### `schedule: dict` (optional)
- ##### `cron: string` (required)
A 5-placeholder format cron expression string. See rationale for minute-level precision [here](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#scheduling).
- ##### `args: JSON` (optional)
The arguments to pass to the `perform.fn` function when invoked.
- ##### `executorOptions: dict` (optional)
Executor-specific options to use when submitting jobs. These are passed directly through and you should consult the documentation for the job executor. The `perform.executorOptions` are the default options, and `schedule.executorOptions` can override/extend those.
- ##### `pgBoss: JSON` (optional)
See the docs for [pg-boss](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#sendname-data-options).
### JavaScript API
#### Invocation
##### `import`
```js
import { mySpecialJob } from '@wasp/jobs/mySpecialJob.js'
```
##### `submit(jobArgs, executorOptions)`
- ###### `jobArgs: JSON` (optional)
- ###### `executorOptions: JSON` (optional)
Submits a `job` to be executed by an executor, optionally passing in a JSON job argument your job handler function will receive, and executor-specific submit options.
```js
const submittedJob = await mySpecialJob.submit({ job: "args" })
```
##### `delay(startAfter)` (optional)
- ###### `startAfter: int | string | Date` (required)
Delaying the invocation of the job handler. The delay can be one of:
- Integer: number of seconds to delay. [Default 0]
- String: ISO date string to run at.
- Date: Date to run at.
```js
const submittedJob = await mySpecialJob.delay(10).submit({ job: "args" }, { "retryLimit": 2 })
```
#### Tracking
The return value of `submit()` is an instance of `SubmittedJob`, which minimally contains:
- `jobId`: A getter returning the UUID String ID for the job in that executor.
- `jobName`: A getter returning the name of the job you used in your `.wasp` file.
- `executorName`: A getter returning a Symbol of the name of the job executor.
- For pg-boss, you can import a Symbol from: `import { PG_BOSS_EXECUTOR_NAME } from '@wasp/jobs/core/pgBoss/pgBossJob.js'` if you wish to compare against `executorName`.
There will also be namespaced, job executor-specific objects.
- For pg-boss, you may access: `pgBoss`
- **NOTE**: no arguments are necessary, as we already applied the `jobId` in the available functions.
- `details()`: pg-boss specific job detail information. [Reference](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#getjobbyidid)
- `cancel()`: attempts to cancel a job. [Reference](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#cancelid)
- `resume()`: attempts to resume a canceled job. [Reference](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#resumeid)
## Dependencies
You can specify additional npm dependencies via `dependencies` field in `app` declaration, in following way:

View File

@ -62,15 +62,16 @@ While fundamental types are here to be basic building blocks of a language, and
- Tuples can be of size 2, 3 and 4.
- Domain types ([source of truth](https://github.com/wasp-lang/wasp/blob/main/waspc/src/Wasp/Analyzer/StdTypeDefinitions.hs))
- Declaration types
- **app**
- **page**
- **route**
- **query**
- **action**
- **app**
- **entity**
- **job**
- **page**
- **query**
- **route**
- Enum types
- **AuthMethod**
- **DbSystem**
- **JobExecutor**
For more details about each of the domain types, both regarding their body types and what they mean, check the [Features](/language/features.md) section.