mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-10 00:35:25 +03:00
Backport: limit contract insertion/deletion batching on backpressure (#11647)
Backport of #11589 / 6372d41d13
.
* move contract insertion/deletion batching to separate function
* limit contract insertion/deletion batching on backpressure
* add changelog
CHANGELOG_BEGIN
- [JSON API] While updating the contract table for a query, if the DB appears to be slow,
JSON API will slow down its own inserts and deletes at some point rather than construct
ever-larger INSERT and DELETE batch commands.
See `issue #11589 <https://github.com/digital-asset/daml/pull/11589>`__.
CHANGELOG_END
This commit is contained in:
parent
eed4ad32b7
commit
6bf19fce82
@ -243,7 +243,7 @@ private class ContractsFetch(
|
||||
|
||||
val transactInsertsDeletes = Flow
|
||||
.fromFunction(jsonifyInsertDeleteStep)
|
||||
.conflate(_ append _)
|
||||
.via(conflation)
|
||||
.map(insertAndDelete)
|
||||
|
||||
idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
|
||||
@ -519,4 +519,14 @@ private[http] object ContractsFetch {
|
||||
|
||||
TransactionFilter(domain.Party.unsubst(parties.toVector).map(_ -> filters).toMap)
|
||||
}
|
||||
|
||||
private def conflation[D, C: InsertDeleteStep.Cid]
|
||||
: Flow[InsertDeleteStep[D, C], InsertDeleteStep[D, C], NotUsed] = {
|
||||
// when considering this cost, keep in mind that each deleteContracts
|
||||
// may entail a table scan. Backpressure indicates that DB operations
|
||||
// are slow, the idea here is to set the DB up for success
|
||||
val maxCost = 250L
|
||||
Flow[InsertDeleteStep[D, C]]
|
||||
.batchWeighted(max = maxCost, costFn = _.size.toLong, identity)(_ append _)
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,9 @@ private[http] final case class InsertDeleteStep[+D, +C](
|
||||
deletes ++ o.deletes,
|
||||
)
|
||||
|
||||
/** NB: This is ''not'' distributive across `append`. */
|
||||
def size: Int = inserts.length + deletes.size
|
||||
|
||||
def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty
|
||||
|
||||
def leftMap[DD](f: D => DD): InsertDeleteStep[DD, C] =
|
||||
|
Loading…
Reference in New Issue
Block a user