Jobs callback function updates (#676)

This commit is contained in:
Shayne Czyzewski 2022-08-17 13:53:05 -04:00 committed by GitHub
parent f06e9894a9
commit d827d05d96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 171 additions and 58 deletions

View File

@ -1,8 +1,8 @@
import axios from 'axios'
import { upsertMetric } from './utils.js'
export async function workerFunction() {
console.log('github.js workerFunction')
export async function workerFunction(args, context) {
console.log('github.js workerFunction', args, context)
const response = await axios.get('https://api.github.com/repos/wasp-lang/wasp')
@ -13,7 +13,7 @@ export async function workerFunction() {
{ name: 'Wasp GitHub Open Issues', value: response.data.open_issues },
]
await Promise.all(metrics.map(upsertMetric))
await Promise.all(metrics.map(upsertMetric(context)))
return metrics
}

View File

@ -1,16 +1,16 @@
import axios from 'axios'
import { upsertMetric } from './utils.js'
export async function workerFunction({ data } = {}) {
console.log('loadTime.js workerFunction', data)
export async function workerFunction(args, context) {
console.log('loadTime.js workerFunction', args, context)
const start = Date.now()
await axios.get(data.url)
await axios.get(args.url)
const end = Date.now()
const metrics = [{ name: data.name, value: `${end - start}ms` }]
const metrics = [{ name: args.name, value: `${end - start}ms` }]
await Promise.all(metrics.map(upsertMetric))
await Promise.all(metrics.map(upsertMetric(context)))
return metrics
}

View File

@ -1,11 +1,9 @@
import Prisma from '@prisma/client'
const prisma = new Prisma.PrismaClient()
export function upsertMetric({ name, value } = {}) {
return prisma.metric.upsert({
where: { name },
update: { name, value: String(value) },
create: { name, value: String(value) }
})
export function upsertMetric(context) {
return ({ name, value } = {}) => {
return context.entities.Metric.upsert({
where: { name },
update: { name, value: String(value) },
create: { name, value: String(value) }
})
}
}

View File

@ -19,7 +19,8 @@ job github {
},
schedule: {
cron: "*/10 * * * *"
}
},
entities: [Metric]
}
job loadTime {
@ -33,7 +34,8 @@ job loadTime {
"url": "https://wasp-lang.dev",
"name": "wasp-lang.dev Load Time"
} json=}
}
},
entities: [Metric]
}
entity Metric {=psl

View File

@ -1,4 +1,5 @@
{{={= =}=}}
import prisma from '../dbClient.js'
import { createJob } from './{= executorJobRelFP =}'
{=& jobPerformFnImportStatement =}
@ -6,5 +7,10 @@ export const {= jobName =} = createJob({
jobName: "{= jobName =}",
jobFn: {= jobPerformFnName =},
defaultJobOptions: {=& jobPerformOptions =},
jobSchedule: {=& jobSchedule =}
jobSchedule: {=& jobSchedule =},
entities: {
{=# entities =}
{= name =}: prisma.{= prismaIdentifier =},
{=/ entities =}
},
})

View File

@ -72,8 +72,9 @@ class PgBossJob extends Job {
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
* @param {array} entities - Entities used by job, passed into callback context.
*/
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule, entities } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
@ -89,7 +90,7 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
await boss.work(jobName, pgBossCallbackWrapper(jobFn, entities))
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
@ -102,3 +103,18 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
return new PgBossJob(jobName, defaultJobOptions)
}
/**
* Wraps the normal pg-boss callback function to inject entities, as well as extract
* the `data` property so the arguments passed into the job are the exact same as those received.
*
* @param {fn} jobFn - The user-defined async job callback function.
* @param {array} entities - Entities used by job, passed into callback context.
* @returns a function that accepts the pg-boss callback arguments and invokes the user-defined callback.
*/
function pgBossCallbackWrapper(jobFn, entities) {
return (args) => {
const context = { entities }
return jobFn(args.data, context)
}
}

View File

@ -137,7 +137,7 @@
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
"532ed0f34a2011cff2a9c43b712a513ba505e77d896455d6c7f36add6f88741d"
],
[
[

View File

@ -72,8 +72,9 @@ class PgBossJob extends Job {
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
* @param {array} entities - Entities used by job, passed into callback context.
*/
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule, entities } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
@ -89,7 +90,7 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
await boss.work(jobName, pgBossCallbackWrapper(jobFn, entities))
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
@ -102,3 +103,18 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
return new PgBossJob(jobName, defaultJobOptions)
}
/**
* Wraps the normal pg-boss callback function to inject entities, as well as extract
* the `data` property so the arguments passed into the job are the exact same as those received.
*
* @param {fn} jobFn - The user-defined async job callback function.
* @param {array} entities - Entities used by job, passed into callback context.
* @returns a function that accepts the pg-boss callback arguments and invokes the user-defined callback.
*/
function pgBossCallbackWrapper(jobFn, entities) {
return (args) => {
const context = { entities }
return jobFn(args.data, context)
}
}

View File

@ -137,7 +137,7 @@
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
"532ed0f34a2011cff2a9c43b712a513ba505e77d896455d6c7f36add6f88741d"
],
[
[

View File

@ -72,8 +72,9 @@ class PgBossJob extends Job {
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
* @param {array} entities - Entities used by job, passed into callback context.
*/
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule, entities } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
@ -89,7 +90,7 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
await boss.work(jobName, pgBossCallbackWrapper(jobFn, entities))
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
@ -102,3 +103,18 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
return new PgBossJob(jobName, defaultJobOptions)
}
/**
* Wraps the normal pg-boss callback function to inject entities, as well as extract
* the `data` property so the arguments passed into the job are the exact same as those received.
*
* @param {fn} jobFn - The user-defined async job callback function.
* @param {array} entities - Entities used by job, passed into callback context.
* @returns a function that accepts the pg-boss callback arguments and invokes the user-defined callback.
*/
function pgBossCallbackWrapper(jobFn, entities) {
return (args) => {
const context = { entities }
return jobFn(args.data, context)
}
}

View File

@ -116,7 +116,7 @@
"file",
"server/src/jobs/MySpecialJob.js"
],
"4639c4c738a9a6a5b0137962ac64252da12aaa01cff7918ba0937e8b375bf9e6"
"69926a4b7eafc2e80e50732fee80865625c06f74004cbcd7be1701b4ce9249eb"
],
[
[
@ -151,7 +151,7 @@
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
"532ed0f34a2011cff2a9c43b712a513ba505e77d896455d6c7f36add6f88741d"
],
[
[

View File

@ -1,3 +1,4 @@
import prisma from '../dbClient.js'
import { createJob } from './core/pgBoss/pgBossJob.js'
import { foo } from './../ext-src/jobs/bar.js'
@ -5,5 +6,7 @@ export const MySpecialJob = createJob({
jobName: "MySpecialJob",
jobFn: foo,
defaultJobOptions: {},
jobSchedule: null
jobSchedule: null,
entities: {
},
})

View File

@ -72,8 +72,9 @@ class PgBossJob extends Job {
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
* @param {array} entities - Entities used by job, passed into callback context.
*/
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule, entities } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
@ -89,7 +90,7 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
await boss.work(jobName, pgBossCallbackWrapper(jobFn, entities))
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
@ -102,3 +103,18 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
return new PgBossJob(jobName, defaultJobOptions)
}
/**
* Wraps the normal pg-boss callback function to inject entities, as well as extract
* the `data` property so the arguments passed into the job are the exact same as those received.
*
* @param {fn} jobFn - The user-defined async job callback function.
* @param {array} entities - Entities used by job, passed into callback context.
* @returns a function that accepts the pg-boss callback arguments and invokes the user-defined callback.
*/
function pgBossCallbackWrapper(jobFn, entities) {
return (args) => {
const context = { entities }
return jobFn(args.data, context)
}
}

View File

@ -137,7 +137,7 @@
"file",
"server/src/jobs/core/pgBoss/pgBossJob.js"
],
"6a58878b76d5086454bc721dc779d0589eab6096913272910f390dc502a34708"
"532ed0f34a2011cff2a9c43b712a513ba505e77d896455d6c7f36add6f88741d"
],
[
[

View File

@ -72,8 +72,9 @@ class PgBossJob extends Job {
* @param {object} defaultJobOptions - pg-boss specific options for `boss.send()` applied to every `submit()` invocation,
* which can overriden in that call.
* @param {object} jobSchedule [Optional] - The 5-field cron string, job function JSON arg, and `boss.send()` options when invoking the job.
* @param {array} entities - Entities used by job, passed into callback context.
*/
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {}) {
export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule, entities } = {}) {
// NOTE(shayne): We are not awaiting `pgBossStarted` here since we need to return an instance to the job
// template, or else the NodeJS module bootstrapping process will block and fail as it would then depend
// on a runtime resolution of the promise in `startServer()`.
@ -89,7 +90,7 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
// This tells pg-boss to run given worker function when job with that name is submitted.
// Ref: https://github.com/timgit/pg-boss/blob/master/docs/readme.md#work
await boss.work(jobName, jobFn)
await boss.work(jobName, pgBossCallbackWrapper(jobFn, entities))
// If a job schedule is provided, we should schedule the recurring job.
// If the schedule name already exists, it's updated to the provided cron expression, arguments, and options.
@ -102,3 +103,18 @@ export function createJob({ jobName, jobFn, defaultJobOptions, jobSchedule } = {
return new PgBossJob(jobName, defaultJobOptions)
}
/**
* Wraps the normal pg-boss callback function to inject entities, as well as extract
* the `data` property so the arguments passed into the job are the exact same as those received.
*
* @param {fn} jobFn - The user-defined async job callback function.
* @param {array} entities - Entities used by job, passed into callback context.
* @returns a function that accepts the pg-boss callback arguments and invokes the user-defined callback.
*/
function pgBossCallbackWrapper(jobFn, entities) {
return (args) => {
const context = { entities }
return jobFn(args.data, context)
}
}

View File

@ -1,7 +1,7 @@
import { sleep } from '@wasp/utils.js'
export const foo = async (args) => {
console.log("Inside Job bar's callback foo: ", args)
export async function foo(args, context) {
console.log("Inside Job bar's callback foo: ", args, context)
await sleep(4000)
return { hello: "world" }
}

View File

@ -121,7 +121,8 @@ job mySpecialJob {
executorOptions: {
pgBoss: {=json { "retryLimit": 1 } json=}
}
}
},
entities: [Task]
}
job mySpecialScheduledJob {

View File

@ -15,13 +15,16 @@ where
import Data.Data (Data)
import Wasp.AppSpec.Core.Decl (IsDecl)
import Wasp.AppSpec.Core.Ref (Ref)
import Wasp.AppSpec.Entity (Entity)
import Wasp.AppSpec.ExtImport (ExtImport)
import Wasp.AppSpec.JSON (JSON (..))
data Job = Job
{ executor :: JobExecutor,
perform :: Perform,
schedule :: Maybe Schedule
schedule :: Maybe Schedule,
entities :: Maybe [Ref Entity]
}
deriving (Show, Eq, Data)

View File

@ -10,6 +10,7 @@ module Wasp.Generator.ServerGenerator.Common
asTmplSrcFile,
asServerFile,
asServerSrcFile,
entityNameToPrismaIdentifier,
ServerRootDir,
ServerSrcDir,
ServerTemplatesDir,
@ -18,6 +19,7 @@ module Wasp.Generator.ServerGenerator.Common
where
import qualified Data.Aeson as Aeson
import Data.Char (toLower)
import StrongPath (Dir, File', Path', Rel, reldir, (</>))
import qualified StrongPath as SP
import Wasp.Generator.Common (ProjectRootDir)
@ -85,3 +87,10 @@ serverTemplatesDirInTemplatesDir = [reldir|server|]
srcDirInServerTemplatesDir :: Path' (Rel ServerTemplatesDir) (Dir ServerTemplatesSrcDir)
srcDirInServerTemplatesDir = [reldir|src|]
-- | Takes a Wasp Entity name (like `SomeTask` from `entity SomeTask {...}`) and
-- converts it into a corresponding Prisma identifier (like `prisma.someTask`).
-- This is what Prisma implicitly does when translating `model` declarations to
-- client SDK identifiers. Useful when creating `context.entities` JS objects in Wasp templates.
entityNameToPrismaIdentifier :: String -> String
entityNameToPrismaIdentifier entityName = toLower (head entityName) : tail entityName

View File

@ -26,6 +26,7 @@ import StrongPath
)
import qualified StrongPath as SP
import Wasp.AppSpec (AppSpec, getJobs)
import qualified Wasp.AppSpec as AS
import qualified Wasp.AppSpec.App.Dependency as AS.Dependency
import qualified Wasp.AppSpec.JSON as AS.JSON
import Wasp.AppSpec.Job (Job, JobExecutor (PgBoss, Simple), jobExecutors)
@ -62,7 +63,8 @@ genJob (jobName, job) =
-- `Aeson.Text.encodeToLazyText` on an Aeson.Object, or `show` on an AS.JSON.
"jobSchedule" .= Aeson.Text.encodeToLazyText (fromMaybe Aeson.Null maybeJobSchedule),
"jobPerformOptions" .= show (fromMaybe AS.JSON.emptyObject maybeJobPerformOptions),
"executorJobRelFP" .= toFilePath (executorJobTemplateInJobsDir (J.executor job))
"executorJobRelFP" .= toFilePath (executorJobTemplateInJobsDir (J.executor job)),
"entities" .= maybe [] (map (buildEntityData . AS.refName)) (J.entities job)
]
)
where
@ -78,6 +80,13 @@ genJob (jobName, job) =
]
maybeJobSchedule = jobScheduleTmplData <$> J.schedule job
buildEntityData :: String -> Aeson.Value
buildEntityData entityName =
object
[ "name" .= entityName,
"prismaIdentifier" .= C.entityNameToPrismaIdentifier entityName
]
-- Creates a file that is imported on the server to ensure all job JS modules are loaded
-- even if they are not referenced by user code. This ensures schedules are started, etc.
genAllJobImports :: AppSpec -> FileDraft

View File

@ -10,7 +10,6 @@ where
import Data.Aeson (object, (.=))
import qualified Data.Aeson as Aeson
import Data.Char (toLower)
import Data.Maybe (fromJust)
import StrongPath (Dir, Dir', File', Path, Path', Posix, Rel, reldir, reldirP, relfile, (</>))
import qualified StrongPath as SP
@ -90,5 +89,5 @@ operationTmplData operation =
buildEntityData entityName =
object
[ "name" .= entityName,
"prismaIdentifier" .= (toLower (head entityName) : tail entityName)
"prismaIdentifier" .= C.entityNameToPrismaIdentifier entityName
]

View File

@ -244,7 +244,8 @@ spec_Analyzer = do
Job.Job
{ Job.executor = Job.PgBoss,
Job.perform = jobPerform,
Job.schedule = Just jobSchedule
Job.schedule = Just jobSchedule,
Job.entities = Nothing
}
)
]

View File

@ -513,9 +513,7 @@ console.log(await submittedJob.pgBoss.details())
await mySpecialJob.delay(10).submit({ job: "args" })
```
And that is it! Your job will be executed by the job executor (pg-boss, in this case) as if you called `foo({ data: { job: "args" } })`.
**Note**: pg-boss wraps job arguments into a larger object and exposes it under the property `data`.
And that is it! Your job will be executed by the job executor (pg-boss, in this case) as if you called `foo({ job: "args" })`.
### Recurring jobs
@ -534,16 +532,15 @@ job mySpecialJob {
}
```
In this example, you do _not_ need to invoke anything in JavaScript. You can imagine `foo({ data: { "job": "args" } })` getting automatically scheduled and invoked for you every hour.
**Note**: pg-boss wraps job arguments into a larger object and exposes it under the property `data`.
In this example, you do _not_ need to invoke anything in JavaScript. You can imagine `foo({ "job": "args" })` getting automatically scheduled and invoked for you every hour.
### Fully specified example
Additionally, both `perform` and `schedule` accept `executorOptions`, which we pass directly to the named job executor when you submit jobs. In this example, the scheduled job will have a `retryLimit` set to 0, as `schedule` overrides any similar property from `perform`.
Both `perform` and `schedule` accept `executorOptions`, which we pass directly to the named job executor when you submit jobs. In this example, the scheduled job will have a `retryLimit` set to 0, as `schedule` overrides any similar property from `perform`. Lastly, we add an entity to pass in via the context argument to `perform.fn`.
```css
job mySpecialJob {
executor: PgBoss,
entities: [Task],
perform: {
fn: import { foo } from "@ext/jobs/bar.js",
executorOptions: {
@ -568,7 +565,13 @@ job mySpecialJob {
#### `perform: dict` (required)
- ##### `fn: fn` (required)
An `async` JavaScript function of work to be performed. It can optionally take a JSON value as an argument.
An `async` JavaScript function of work to be performed. It receives a JSON value as the first argument and context containing any declared entities as the second. Here is a sample signature:
```js
export async function foo(args, context) {
// Can reference context.entities.Task, for example.
}
```
- ##### `executorOptions: dict` (optional)
Executor-specific default options to use when submitting jobs. These are passed directly through and you should consult the documentation for the job executor. These can be overridden during invocation with `submit()` or in a `schedule`.
@ -583,15 +586,16 @@ job mySpecialJob {
- ##### `args: JSON` (optional)
The arguments to pass to the `perform.fn` function when invoked.
**Note**: pg-boss wraps job arguments into a larger object and exposes it under the property `data`.
- ##### `executorOptions: dict` (optional)
Executor-specific options to use when submitting jobs. These are passed directly through and you should consult the documentation for the job executor. The `perform.executorOptions` are the default options, and `schedule.executorOptions` can override/extend those.
- ##### `pgBoss: JSON` (optional)
See the docs for [pg-boss](https://github.com/timgit/pg-boss/blob/7.2.1/docs/readme.md#sendname-data-options).
#### `entities: [Entity]` (optional)
A list of entities you wish to use inside your Job (similar to Queries and Actions).
### JavaScript API
#### Invocation
@ -607,8 +611,6 @@ import { mySpecialJob } from '@wasp/jobs/mySpecialJob.js'
Submits a `job` to be executed by an executor, optionally passing in a JSON job argument your job handler function will receive, and executor-specific submit options.
**Note**: pg-boss wraps job arguments into a larger object and exposes it under the property `data`.
```js
const submittedJob = await mySpecialJob.submit({ job: "args" })
```