Backport: limit contract insertion/deletion batching on backpressure (#11647)

Backport of #11589 / 6372d41d13.

* move contract insertion/deletion batching to separate function

* limit contract insertion/deletion batching on backpressure

* add changelog

CHANGELOG_BEGIN
- [JSON API] While updating the contract table for a query, if the DB appears to be slow,
  JSON API will slow down its own inserts and deletes at some point rather than construct
  ever-larger INSERT and DELETE batch commands.
  See `issue #11589 <https://github.com/digital-asset/daml/pull/11589>`__.
CHANGELOG_END
Stephen Compall 2021-11-12 03:26:53 -05:00 committed by GitHub
parent eed4ad32b7
commit 6bf19fce82
2 changed files with 14 additions and 1 deletion
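
In Akka Streams terms, the change below swaps an unbounded `conflate` for a `batchWeighted` stage: while the database sink keeps up, each step passes through as-is; while it lags, steps are merged, but only until a fixed cost is reached, after which backpressure propagates upstream instead of the batch growing further. A minimal, self-contained sketch of that difference (the `Step` case class, the throttled consumer, and the numbers are illustrative stand-ins, not the JSON API's actual types):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Flow, Source}
    import scala.concurrent.duration._

    object BatchingSketch extends App {
      implicit val system: ActorSystem = ActorSystem("batching-sketch")

      // Stand-in for InsertDeleteStep: `size` plays the role of the batch cost.
      final case class Step(size: Int) {
        def append(o: Step): Step = Step(size + o.size)
      }

      // Before: conflate merges steps for as long as downstream is busy, with
      // no upper bound, so a slow sink eventually receives very large batches.
      val unbounded = Flow[Step].conflate(_ append _)

      // After: batchWeighted merges steps only until their accumulated cost
      // reaches maxCost; beyond that, backpressure reaches the upstream instead.
      val maxCost = 250L
      val bounded =
        Flow[Step].batchWeighted(max = maxCost, costFn = _.size.toLong, identity)(_ append _)

      // A deliberately slow consumer stands in for a slow database.
      Source(List.fill(2000)(Step(1)))
        .via(bounded)
        .throttle(10, 1.second)
        .runForeach(s => println(s"batch size: ${s.size}"))
        .onComplete(_ => system.terminate())(system.dispatcher)
    }

Running this with `bounded` prints batch sizes that never exceed 250; swapping in `unbounded` lets the printed sizes grow without limit while the consumer is throttled.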

ContractsFetch.scala

@@ -243,7 +243,7 @@ private class ContractsFetch(
      val transactInsertsDeletes = Flow
        .fromFunction(jsonifyInsertDeleteStep)
-       .conflate(_ append _)
+       .via(conflation)
        .map(insertAndDelete)

      idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
@@ -519,4 +519,14 @@ private[http] object ContractsFetch {
    TransactionFilter(domain.Party.unsubst(parties.toVector).map(_ -> filters).toMap)
  }

+  private def conflation[D, C: InsertDeleteStep.Cid]
+      : Flow[InsertDeleteStep[D, C], InsertDeleteStep[D, C], NotUsed] = {
+    // when considering this cost, keep in mind that each deleteContracts
+    // may entail a table scan. Backpressure indicates that DB operations
+    // are slow, the idea here is to set the DB up for success
+    val maxCost = 250L
+    Flow[InsertDeleteStep[D, C]]
+      .batchWeighted(max = maxCost, costFn = _.size.toLong, identity)(_ append _)
+  }
 }
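
With `maxCost = 250` and `costFn = _.size.toLong`, this stage passes steps through one-for-one while the database keeps up; only when the downstream stops pulling does it start merging steps, and once the summed sizes of the merged steps reach 250 it stops reading from upstream, so the backpressure reaches the source feeding `transactInsertsDeletes` rather than producing a still larger INSERT/DELETE pair. One caveat of `batchWeighted` worth keeping in mind: a single step whose own size already exceeds the limit is still emitted as its own batch, so `maxCost` caps aggregation rather than guaranteeing a hard upper bound on every step.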

InsertDeleteStep.scala

@@ -25,6 +25,9 @@ private[http] final case class InsertDeleteStep[+D, +C](
      deletes ++ o.deletes,
    )

+  /** NB: This is ''not'' distributive across `append`. */
+  def size: Int = inserts.length + deletes.size
+
  def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty

  def leftMap[DD](f: D => DD): InsertDeleteStep[DD, C] =
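
Why `size` is not distributive across `append`: in the part of `append` just above this hunk, an insert in the left step that the right step deletes is dropped, and the delete maps are merged by contract ID, so `(a append b).size` can be strictly smaller than `a.size + b.size`. For example, if `a` only inserts contract `c1` and `b` only deletes `c1`, the appended step carries just the delete: size 1, not 2. For the batching above this errs in the safe direction, since `batchWeighted` accumulates the sizes of the individual steps it consumes, so `maxCost` remains an upper bound on the size of the merged step it emits.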