2022-11-23 13:33:44 +03:00
const logging = require ( '@tryghost/logging' ) ;
const ObjectID = require ( 'bson-objectid' ) . default ;
const errors = require ( '@tryghost/errors' ) ;
const tpl = require ( '@tryghost/tpl' ) ;
// User-facing error messages stored on the email when sending fails.
// They are passed through `tpl()` before being attached to an EmailError.
const messages = {
    emailErrorPartialFailure: 'An error occurred, and your newsletter was only partially sent. Please retry sending the remaining emails.',
    emailError: 'An unexpected error occurred, please retry sending your newsletter.'
};
/**
 * @typedef {import('./sending-service')} SendingService
 * @typedef {import('./email-segmenter')} EmailSegmenter
 * @typedef {import('./email-renderer')} EmailRenderer
 * @typedef {import('./email-renderer').MemberLike} MemberLike
 * @typedef {object} JobsService
 * @typedef {object} Email
 * @typedef {object} Newsletter
 * @typedef {object} Post
 * @typedef {object} EmailBatch
 */
/**
 * Sends an email to its recipients in batches.
 *
 * Flow: `scheduleEmail` queues a job -> `emailJob` locks the email and calls
 * `sendEmail` -> batches are loaded (or created on the first attempt) -> every
 * batch is sent through the sending service, a limited number at a time.
 */
class BatchSendingService {
    #emailRenderer;
    #sendingService;
    #emailSegmenter;
    #jobsService;
    #models;
    #db;

    /**
     * @param {Object} dependencies
     * @param {EmailRenderer} dependencies.emailRenderer
     * @param {SendingService} dependencies.sendingService
     * @param {JobsService} dependencies.jobsService
     * @param {EmailSegmenter} dependencies.emailSegmenter
     * @param {object} dependencies.models
     * @param {object} dependencies.models.EmailRecipient
     * @param {EmailBatch} dependencies.models.EmailBatch
     * @param {Email} dependencies.models.Email
     * @param {object} dependencies.models.Member
     * @param {object} dependencies.db
     */
    constructor({
        emailRenderer,
        sendingService,
        jobsService,
        emailSegmenter,
        models,
        db
    }) {
        this.#emailRenderer = emailRenderer;
        this.#sendingService = sendingService;
        this.#jobsService = jobsService;
        this.#emailSegmenter = emailSegmenter;
        this.#models = models;
        this.#db = db;
    }

    /**
     * Schedules a background job that sends the email in the background if it is pending or failed.
     * @param {Email} email
     * @returns {*} Whatever the jobs service's `addJob` call returns
     */
    scheduleEmail(email) {
        return this.#jobsService.addJob({
            name: 'batch-sending-service-job',
            job: this.emailJob.bind(this),
            data: {emailId: email.id},
            offloaded: false
        });
    }

    /**
     * Job entry point: locks the email, sends it and stores the final status on the email model.
     * @private
     * @param {{emailId: string}} data Data passed from the job service. We only need the emailId because we need to refetch the email anyway to make sure the status is right and 'locked'.
     * @returns {Promise<void>}
     */
    async emailJob({emailId}) {
        logging.info(`Starting email job for email ${emailId}`);

        // Check if email is 'pending' or 'failed' + change status to submitting in one transaction.
        // This allows us to have a lock around the email job that makes sure an email can only have one active job.
        const email = await this.updateStatusLock(this.#models.Email, emailId, 'submitting', ['pending', 'failed']);
        if (!email) {
            logging.error(`Tried sending email that is not pending or failed ${emailId}`);
            return;
        }

        try {
            await this.sendEmail(email);

            await email.save({
                status: 'submitted',
                submitted_at: new Date(),
                error: null
            }, {patch: true, autoRefresh: false});
        } catch (e) {
            logging.error(`Error sending email ${email.id}: ${e.message}`);

            // Edge case: Store error in email model (that are not caught by the batch)
            await email.save({
                status: 'failed',
                error: e.message || 'Something went wrong while sending the email'
            }, {patch: true, autoRefresh: false});
        }
    }

    /**
     * Loads the relations needed for rendering, makes sure batches exist and sends them.
     * @private
     * @param {Email} email
     * @throws {errors.EmailError} If one of the batches fails
     */
    async sendEmail(email) {
        logging.info(`Sending email ${email.id}`);

        // Load required relations
        const newsletter = await email.getLazyRelation('newsletter', {require: true});
        const post = await email.getLazyRelation('post', {require: true, withRelated: ['posts_meta', 'authors']});

        // Batches already exist when this is a retry; only create them on the first attempt
        let batches = await this.getBatches(email);
        if (batches.length === 0) {
            batches = await this.createBatches({email, newsletter, post});
        }
        await this.sendBatches({email, batches, post, newsletter});
    }

    /**
     * Fetches the batches that were already created for this email.
     * @private
     * @param {Email} email
     * @returns {Promise<EmailBatch[]>}
     */
    async getBatches(email) {
        logging.info(`Getting batches for email ${email.id}`);
        return await this.#models.EmailBatch.findAll({filter: `email_id:${email.id}`});
    }

    /**
     * Splits the recipients of every segment into batches of at most
     * `sendingService.getMaximumRecipients()` members and stores them.
     * Also corrects the stored `email_count` when it differs from the number of recipients found.
     * @private
     * @param {{email: Email, newsletter: Newsletter, post: Post}} data
     * @returns {Promise<EmailBatch[]>}
     */
    async createBatches({email, post, newsletter}) {
        logging.info(`Creating batches for email ${email.id}`);

        const segments = this.#emailRenderer.getSegments(post);
        const batches = [];
        const BATCH_SIZE = this.#sendingService.getMaximumRecipients();
        let totalCount = 0;

        for (const segment of segments) {
            logging.info(`Creating batches for email ${email.id} segment ${segment}`);

            const segmentFilter = this.#emailSegmenter.getMemberFilterForSegment(newsletter, email.get('recipient_filter'), segment);

            // Avoiding Bookshelf for performance reasons

            // Start with the id of the email, which is an objectId. We'll only fetch members that are created before the email. This is a special property of ObjectIds.
            // Note: we use ID and not created_at, because imported members could set a created_at in the future or past and avoid limit checking.
            let lastId = email.id;
            let hasMoreMembers = true;

            while (hasMoreMembers) {
                logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId}`);

                const filter = `${segmentFilter}+id:<${lastId}`;

                // One extra row is requested: it is never sent in this batch, it only tells us whether another page exists
                const members = await this.#models.Member.getFilteredCollectionQuery({filter})
                    .orderByRaw('id DESC')
                    .select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1);

                if (members.length > 0) {
                    totalCount += Math.min(members.length, BATCH_SIZE);
                    const batch = await this.createBatch(email, segment, members.slice(0, BATCH_SIZE));
                    batches.push(batch);
                }

                hasMoreMembers = members.length > BATCH_SIZE;
                if (hasMoreMembers) {
                    // Continue after the last member that was included in this batch (the row before the extra one)
                    lastId = members[members.length - 2].id;
                }
            }
        }

        logging.info(`Created ${batches.length} batches for email ${email.id} with ${totalCount} recipients`);

        if (email.get('email_count') !== totalCount) {
            logging.error(`Email ${email.id} has wrong stored email_count ${email.get('email_count')}, did expect ${totalCount}. Updating the model.`);

            // We update the email model because this might happen in rare cases where the initial member count changed (e.g. deleted members)
            // between creating the email and sending it
            await email.save({
                email_count: totalCount
            }, {patch: true, require: false, autoRefresh: false});
        }
        return batches;
    }

    /**
     * Creates one batch plus its email recipient rows inside a single transaction.
     * When no transaction is passed in `options.transacting`, a new one is opened and the method re-enters itself.
     * Member rows without id, uuid or email are skipped (with a warning).
     * @private
     * @param {Email} email
     * @param {import('./email-renderer').Segment} segment
     * @param {object[]} members
     * @param {object} [options]
     * @param {object} [options.transacting] Transaction to run the inserts in
     * @returns {Promise<EmailBatch>}
     */
    async createBatch(email, segment, members, options) {
        if (!options || !options.transacting) {
            return this.#models.EmailBatch.transaction(async (transacting) => {
                return this.createBatch(email, segment, members, {transacting});
            });
        }

        logging.info(`Creating batch for email ${email.id} segment ${segment} with ${members.length} members`);

        const batch = await this.#models.EmailBatch.add({
            email_id: email.id,
            member_segment: segment,
            status: 'pending'
        }, options);

        const recipientData = [];

        for (const memberRow of members) {
            if (!memberRow.id || !memberRow.uuid || !memberRow.email) {
                logging.warn(`Member row not included as email recipient due to missing data - id: ${memberRow.id}, uuid: ${memberRow.uuid}, email: ${memberRow.email}`);
                continue;
            }

            recipientData.push({
                id: ObjectID().toHexString(),
                email_id: email.id,
                member_id: memberRow.id,
                batch_id: batch.id,
                member_uuid: memberRow.uuid,
                member_email: memberRow.email,
                member_name: memberRow.name
            });
        }

        // Raw insert of all recipients at once; `options.transacting` is always set here because of the guard above
        const insertQuery = this.#db.knex('email_recipients').insert(recipientData);
        insertQuery.transacting(options.transacting);

        logging.info(`Inserting ${recipientData.length} recipients for email ${email.id} batch ${batch.id}`);
        await insertQuery;
        return batch;
    }

    /**
     * Sends all batches through `sendBatch`, at most 10 at the same time.
     * @param {{email: Email, batches: EmailBatch[], post: Post, newsletter: Newsletter}} data
     * @returns {Promise<void>}
     * @throws {errors.EmailError} When at least one batch did not succeed. The message differs
     *      between a partial failure (some batches succeeded) and a complete failure.
     */
    async sendBatches({email, batches, post, newsletter}) {
        logging.info(`Sending ${batches.length} batches for email ${email.id}`);

        const MAX_CONCURRENT_BATCHES = 10;

        // Loop batches and send them via the EmailProvider
        let succeededCount = 0;

        // Work on a copy so the caller's array is left untouched
        const queue = batches.slice();

        // Every worker keeps taking the next batch from the shared queue until it is empty
        const runWorker = async () => {
            for (let batch = queue.shift(); batch; batch = queue.shift()) {
                if (await this.sendBatch({email, batch, post, newsletter})) {
                    succeededCount += 1;
                }
            }
        };

        // Run maximum 10 at the same time
        await Promise.all(Array.from({length: MAX_CONCURRENT_BATCHES}, () => runWorker()));

        if (succeededCount < batches.length) {
            if (succeededCount > 0) {
                throw new errors.EmailError({
                    message: tpl(messages.emailErrorPartialFailure)
                });
            }
            throw new errors.EmailError({
                message: tpl(messages.emailError)
            });
        }
    }

    /**
     * Sends a single batch and stores the outcome (status, provider id or error details) on the batch.
     * The recipients of the batch are marked as processed in both the success and the failure case.
     *
     * @param {{email: Email, batch: EmailBatch, post: Post, newsletter: Newsletter}} data
     * @returns {Promise<boolean>} True when succeeded, false when failed with an error.
     *      Also true when the batch was skipped because its status is not 'pending' or 'failed'.
     */
    async sendBatch({email, batch: originalBatch, post, newsletter}) {
        logging.info(`Sending batch ${originalBatch.id} for email ${email.id}`);

        // Check the status of the email batch in a 'for update' transaction
        const batch = await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
        if (!batch) {
            logging.error(`Tried sending email batch that is not pending or failed ${originalBatch.id}`);
            return true;
        }

        let succeeded = false;

        try {
            const members = await this.getBatchMembers(batch.id);
            const response = await this.#sendingService.send({
                emailId: email.id,
                post,
                newsletter,
                segment: batch.get('member_segment'),
                members
            }, {
                openTrackingEnabled: !!email.get('track_opens'),
                clickTrackingEnabled: !!email.get('track_clicks')
            });

            await batch.save({
                status: 'submitted',
                provider_id: response.id,
                // reset error fields when sending succeeds
                error_status_code: null,
                error_message: null,
                error_data: null
            }, {patch: true, require: false, autoRefresh: false});
            succeeded = true;
        } catch (err) {
            logging.error(`Error sending email batch ${batch.id}`);
            logging.error(err);

            await batch.save({
                status: 'failed',
                error_status_code: err.statusCode ?? null,
                error_message: err.message,
                error_data: err.errorDetails ?? null
            }, {patch: true, require: false, autoRefresh: false});
        }

        // Mark as processed, even when failed
        await this.#models.EmailRecipient
            .where({batch_id: batch.id})
            .save({processed_at: new Date()}, {patch: true, require: false, autoRefresh: false});

        return succeeded;
    }

    /**
     * We don't want to pass EmailRecipient models to the sendingService.
     * So we transform them into the MemberLike interface.
     * That keeps the sending service nicely separated so it isn't dependent on the batch sending data structure.
     * @param {string} batchId id of the batch to load the recipients for
     * @returns {Promise<MemberLike[]>}
     */
    async getBatchMembers(batchId) {
        const models = await this.#models.EmailRecipient.findAll({filter: `batch_id:${batchId}`});
        return models.map((model) => {
            return {
                id: model.get('member_id'),
                uuid: model.get('member_uuid'),
                email: model.get('member_email'),
                name: model.get('member_name')
            };
        });
    }

    /**
     * @private
     * Update the status of an email or emailBatch to a given status, but first check if their current status is one of the allowed statuses.
     * The read (with `forUpdate: true`) and the write happen inside one transaction.
     * @param {object} Model Bookshelf model constructor
     * @param {string} id id of the model
     * @param {string} status set the status of the model to this value
     * @param {string[]} allowedStatuses Check if the models current status is one of these values
     * @returns {Promise<object|undefined>} The updated model. Undefined if the model didn't pass the status check.
     */
    async updateStatusLock(Model, id, status, allowedStatuses) {
        let model;
        await Model.transaction(async (transacting) => {
            model = await Model.findOne({id}, {require: true, transacting, forUpdate: true});
            if (!allowedStatuses.includes(model.get('status'))) {
                // Status check failed: signal this to the caller by returning undefined
                model = undefined;
                return;
            }
            await model.save({
                status
            }, {patch: true, transacting, autoRefresh: false});
        });
        return model;
    }
}
// Expose the service class as this module's only (CommonJS) export.
module . exports = BatchSendingService ;