Query store support for interface query/fetch (#15304)

* Update contracts table to allow multiple rows per contract id (one row per contract type id, so a contract can appear under its template id and under each matching interface id)

* Keep template id for deletes

* add tpid grouping to deleteContracts

* chunking implementation (split grouped deletes into statements respecting the backend's maximum list size)

* mention tpid for postgres ON CONFLICT

* Remove ByInterfaceId special case

* add changelog

CHANGELOG_BEGIN
- [JSON API] The JSON API query store database has changed schema and
  needs to be reset. If you are migrating from a previous version,
  either reset your database manually or start the HTTP JSON API with
  one of the options that regenerate the schema (``create-only``,
  ``create-if-needed-and-start``, ``create-and-start``).
CHANGELOG_END

* add tpid to ignore_row_on_dupkey_index hint

- came up in discussion with @ray-roestenburg-da; thanks

Co-authored-by: Stephen Compall <stephen.compall@daml.com>
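
For the migration note in the changelog above, a restart with the last of those options might look like this (a sketch assuming a PostgreSQL query store; adjust the JDBC URL, ports, and credentials to your environment):

    daml json-api \
      --ledger-host localhost --ledger-port 6865 --http-port 7575 \
      --query-store-jdbc-config "driver=org.postgresql.Driver,url=jdbc:postgresql://localhost:5432/jsonapi,user=postgres,password=passwd,start-mode=create-and-start"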
7 changed files with 173 additions and 65 deletions

BUILD.bazel

@@ -42,6 +42,8 @@ da_scala_library(
"//ledger/metrics",
"//libs-scala/contextualized-logging",
"//libs-scala/nonempty",
"//libs-scala/nonempty-cats",
"//libs-scala/scala-utils",
],
)
@@ -51,6 +53,8 @@ da_scala_test(
srcs = glob(["src/test/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_chuusai_shapeless",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalatestplus_scalacheck_1_15",
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
"@maven//:org_scalatest_scalatest_shouldmatchers",
@@ -67,5 +71,6 @@ da_scala_test(
# data = ["//docs:quickstart-model.dar"],
deps = [
":db-backend",
"//libs-scala/nonempty",
],
)

Queries.scala

@@ -37,7 +37,7 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
import Queries.{Implicits => _, _}, InitDdl._
import Queries.Implicits._
-val schemaVersion = 3
+val schemaVersion = 4
private[http] val surrogateTpIdCache = new SurrogateTemplateIdCache(metrics, tpIdCacheMaxEntries)
@@ -69,13 +69,14 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
sql"""
CREATE TABLE
$contractTableName
-(contract_id $contractIdType NOT NULL CONSTRAINT ${tablePrefixFr}contract_k PRIMARY KEY
+(contract_id $contractIdType NOT NULL
,tpid $bigIntType NOT NULL REFERENCES $templateIdTableName (tpid)
,${jsonColumn(sql"key")}
,key_hash $keyHashColumn
,${jsonColumn(contractColumnName)}
$contractsTableSignatoriesObservers
,agreement_text $agreementTextType
+,CONSTRAINT ${tablePrefixFr}contract_k PRIMARY KEY (contract_id, tpid)
)
""",
)
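
Why the primary key gains tpid: with interface queries supported, the store may keep one row per contract type id under which the same ledger contract was fetched, so contract_id alone no longer identifies a row. A minimal Scala sketch with made-up surrogate ids:

    // Sketch: one ledger contract stored under its template's tpid and under
    // an interface's tpid (both ids invented for illustration).
    val rows = Seq(
      ("cid-1", 10L), // tpid 10: the contract's template
      ("cid-1", 42L), // tpid 42: an interface it implements
    )
    assert(rows.distinct.size == rows.size)          // (contract_id, tpid) is unique
    assert(rows.map(_._1).distinct.size < rows.size) // contract_id alone is not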
@@ -303,24 +304,31 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler): ConnectionIO[Int]
// ContractTypeId -> CId[String]
final def deleteContracts(
-cids: Set[String]
+cids: Map[SurrogateTpId, Set[String]]
)(implicit log: LogHandler): ConnectionIO[Int] = {
-import cats.data.NonEmptyVector
import cats.instances.vector._
import cats.instances.int._
import cats.syntax.foldable._
-NonEmptyVector.fromVector(cids.toVector) match {
-case None =>
+import nonempty.catsinstances._
+(cids: Iterable[(SurrogateTpId, Set[String])]).view.collect { case (k, NonEmpty(cids)) =>
+(k, cids)
+}.toMap match {
+case NonEmpty(cids) =>
+val chunks = maxListSize.fold(Vector(cids))(chunkBySetSize(_, cids))
+chunks.map { chunk =>
+(fr"DELETE FROM $contractTableName WHERE " ++
+joinFragment(
+chunk.toVector.map { case (tpid, cids) =>
+val inCids = Fragments.in(fr"contract_id", cids.toVector.toNEF)
+fr"(tpid = $tpid AND $inCids)"
+}.toOneAnd,
+fr" OR ",
+)).update.run
+}.foldA
+case _ =>
free.connection.pure(0)
-case Some(cids) =>
-val chunks = maxListSize.fold(Vector(cids))(size => cids.grouped(size).toVector)
-chunks
-.map(chunk =>
-(fr"DELETE FROM $contractTableName WHERE " ++ Fragments
-.in(fr"contract_id", chunk)).update.run
-)
-.foldA
}
}
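
Deletes are now scoped by surrogate template id rather than by contract id alone. A self-contained Scala sketch of the statement shape the fragment above builds, using plain strings instead of doobie fragments (table name and ids are made up):

    val deletes: Map[Long, Set[String]] =
      Map(1L -> Set("cid-a", "cid-b"), 2L -> Set("cid-c"))
    val where = deletes
      .map { case (tpid, cids) =>
        s"(tpid = $tpid AND contract_id IN (${cids.map(c => s"'$c'").mkString(", ")}))"
      }
      .mkString(" OR ")
    println(s"DELETE FROM contract WHERE $where")
    // DELETE FROM contract WHERE (tpid = 1 AND contract_id IN ('cid-a', 'cid-b'))
    //   OR (tpid = 2 AND contract_id IN ('cid-c'))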
@@ -582,6 +590,49 @@ object Queries {
(allTpids diff grouped.keySet).view.map((_, Map.empty[Party, Off])).toMap ++ grouped
}
+// invariant: each element x of result has `x.values.flatten.size <= size`
+private[dbbackend] def chunkBySetSize[K, V](
+size: Int,
+groups: NonEmpty[Map[K, NonEmpty[Set[V]]]],
+): Vector[NonEmpty[Map[K, NonEmpty[Set[V]]]]] = {
+assert(size > 0, s"chunk size must be positive, not $size")
+type Groups = NonEmpty[Map[K, NonEmpty[Set[V]]]]
+type Remaining = Map[K, NonEmpty[Set[V]]]
+@annotation.tailrec
+def takeSize(size: Int, acc: Groups, remaining: Remaining): (Groups, Remaining) =
+if (size <= 0 || remaining.isEmpty) (acc, remaining)
+else {
+val (k, sv) = remaining.head
+if (sv.size <= size)
+takeSize(size - sv.size, acc.updated(k, sv), remaining - k)
+else {
+// ! <= proves that both sides of the split are non-empty
+val NonEmpty(taken) = sv take size
+val NonEmpty(left) = sv -- taken
+(acc.updated(k, taken), remaining.updated(k, left))
+}
+}
+// XXX SC: takeInitSize duplicates some of takeSize, but it's a little tricky
+// to start with an empty acc
+def takeInitSize(remaining: Groups): (Groups, Remaining) = {
+val (k, sv) = remaining.head
+if (sv.size <= size)
+takeSize(size - sv.size, NonEmpty(Map, k -> sv), remaining - k)
+else {
+// ! <= proves that both sides of the split are non-empty
+val NonEmpty(taken) = sv take size
+val NonEmpty(left) = sv -- taken
+(NonEmpty(Map, k -> taken), remaining.updated(k, left))
+}
+}
+Vector.unfold(groups: Remaining) { remaining =>
+NonEmpty from remaining map takeInitSize
+}
+}
import doobie.util.invariant.InvalidValue
@throws[InvalidValue[_, _]]
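
chunkBySetSize caps the number of contract ids per generated DELETE when the backend imposes a maximum list size. A hypothetical run (NonEmpty syntax as in com.daml.nonempty; which elements land in which chunk depends on map iteration order):

    val groups = NonEmpty(Map, 1 -> NonEmpty(Set, "a", "b"), 2 -> NonEmpty(Set, "c", "d", "e"))
    chunkBySetSize(3, groups)
    // e.g. Vector(
    //   NonEmpty(Map, 1 -> NonEmpty(Set, "a", "b"), 2 -> NonEmpty(Set, "c")),
    //   NonEmpty(Map, 2 -> NonEmpty(Set, "d", "e")),
    // )
    // Every chunk except possibly the last holds exactly `size` elements in
    // total, chunks never overlap, and together they reassemble the input --
    // the properties checked in QueriesSpec below.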
@@ -723,7 +774,7 @@ private final class PostgresQueries(tablePrefix: String, tpIdCacheMaxEntries: Lo
s"""
INSERT INTO $contractTableNameRaw
VALUES (?, ?, ?::jsonb, ?, ?::jsonb, ?, ?, ?)
-ON CONFLICT (contract_id) DO NOTHING
+ON CONFLICT (contract_id, tpid) DO NOTHING
"""
).updateMany(dbcs)
}
@@ -888,7 +939,7 @@ private final class OracleQueries(
import spray.json.DefaultJsonProtocol._
Update[DBContract[SurrogateTpId, DBContractKey, JsValue, JsValue]](
s"""
-INSERT /*+ ignore_row_on_dupkey_index($contractTableNameRaw(contract_id)) */
+INSERT /*+ ignore_row_on_dupkey_index($contractTableNameRaw(contract_id, tpid)) */
INTO $contractTableNameRaw (contract_id, tpid, key, key_hash, payload, signatories, observers, agreement_text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
@@ -919,7 +970,8 @@ private final class OracleQueries(
sql"""SELECT c.contract_id contract_id, $tpid template_id, key, key_hash, payload,
signatories, observers, agreement_text ${rownum getOrElse fr""}
FROM $contractTableName c
-JOIN $contractStakeholdersViewName cst ON (c.contract_id = cst.contract_id)
+JOIN $contractStakeholdersViewName cst
+ON (c.contract_id = cst.contract_id AND c.tpid = cst.tpid)
WHERE (${Fragments.in(fr"cst.stakeholder", parties.toNEF)})
AND ($queriesCondition)"""
rownum.fold(dupQ)(_ => sql"SELECT $outerSelectList FROM ($dupQ) WHERE rownumber = 1")

QueriesSpec.scala

@@ -4,13 +4,24 @@
package com.daml.http.dbbackend
import doobie.implicits._
+import com.daml.nonempty.NonEmpty
+import org.scalatest.Inspectors.{forAll => cForAll}
import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.scalacheck.{ScalaCheckDrivenPropertyChecks => STSC}
+import STSC.{forAll => scForAll}
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
+import scala.collection.immutable.Seq
+import scala.collection.{immutable => imm}
import scalaz.\/
import scalaz.std.anyVal._
+import scalaz.std.map._
+import scalaz.std.set._
+import scalaz.std.vector._
+import scalaz.syntax.foldable._
+import scalaz.syntax.std.map._
class QueriesSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks {
import QueriesSpec._
"projectedIndex" should {
@@ -59,6 +70,53 @@ class QueriesSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChec
)
}
}
"chunkBySetSize" should {
import org.scalacheck.{Arbitrary, Gen, Shrink}
import Arbitrary.arbitrary
import Queries.chunkBySetSize
type Arg = NonEmpty[Map[Int, NonEmpty[Set[Int]]]]
val sizes: Gen[Int] = Gen.choose(1, 20)
implicit val doNotShrinkSize: Shrink[Int] = Shrink.shrinkAny
val randomArg: Gen[Arg] = Gen
.nonEmptyMap(
Gen.zip(
arbitrary[Int],
Gen.nonEmptyContainerOf[Set, Int](arbitrary[Int]).map { case NonEmpty(xs) => xs },
)
)
.map { case NonEmpty(xs) => xs }
implicit def shrinkNE[Self <: imm.Iterable[Any]: Shrink]: Shrink[NonEmpty[Self]] =
Shrink { nes => Shrink shrink nes.forgetNE collect { case NonEmpty(s) => s } }
// at 1k each test takes ~500ms; at 10k each ~5s
implicit val generatorDrivenConfig: STSC.PropertyCheckConfiguration =
STSC.generatorDrivenConfig.copy(minSuccessful = 1000)
"include all arguments in the result" in scForAll(sizes, randomArg) { (s, r) =>
chunkBySetSize(s, r).foldMap1Opt(identity) should ===(Some(r))
}
def measuredChunkSize[K, V](chunk: NonEmpty[Map[K, NonEmpty[Set[V]]]]) =
chunk.toNEF.foldMap(_.size)
"never exceed size in each chunk" in scForAll(sizes, randomArg) { (s, r) =>
all(chunkBySetSize(s, r) map measuredChunkSize) should be <= s
}
"make chunks as large as possible" in scForAll(sizes, randomArg) { (s, r) =>
all(chunkBySetSize(s, r).init map measuredChunkSize) should ===(s)
}
"make chunks that do not intersect" in scForAll(sizes, randomArg) { (s, r) =>
cForAll(chunkBySetSize(s, r).sliding(2, 1).toSeq) {
case Seq(a, b) => all(a.forgetNE.intersectWith(b)(_ intersect _).values) shouldBe empty
case Seq(_) => succeed
case _ => fail("impossible sliding or empty output")
}
}
}
}
object QueriesSpec {

InsertBenchmark.scala

@@ -45,8 +45,7 @@ trait InsertBenchmark extends ContractDaoBenchmark {
@TearDown(Level.Invocation)
def dropContracts: Unit = {
-val deleted =
-dao.transact(queries.deleteContracts(contractCids)).unsafeRunSync()
+val deleted = dao.transact(queries.deleteContracts(Map(tpid -> contractCids))).unsafeRunSync()
assert(deleted == numContracts)
}

ContractsFetch.scala

@@ -24,6 +24,7 @@ import com.daml.fetchcontracts.util.{
}
import com.daml.scalautil.ExceptionOps._
import com.daml.nonempty.NonEmpty
+import com.daml.nonempty.NonEmptyReturningOps._
import com.daml.jwt.domain.Jwt
import com.daml.ledger.api.{v1 => lav1}
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
@@ -243,16 +244,16 @@ private class ContractsFetch(
}
private def prepareCreatedEventStorage(
-ce: lav1.event.CreatedEvent
+ce: lav1.event.CreatedEvent,
+d: ContractTypeId.Resolved,
): Exception \/ PreInsertContract = {
import scalaz.syntax.traverse._
import scalaz.std.option._
import com.daml.lf.crypto.Hash
for {
-// TODO #14819 IgnoreInterface is wrong in interface DB update case
ac <-
-domain.ActiveContract fromLedgerApi (domain.ActiveContract.IgnoreInterface, ce) leftMap (
-de => new IllegalArgumentException(s"contract ${ce.contractId}: ${de.shows}"): Exception
+domain.ActiveContract fromLedgerApi (domain.ResolvedQuery(d), ce) leftMap (de =>
+new IllegalArgumentException(s"contract ${ce.contractId}: ${de.shows}"): Exception
)
lfKey <- ac.key.traverse(apiValueToLfValue).leftMap(_.cause: Exception)
lfArg <- apiValueToLfValue(ac.payload) leftMap (_.cause: Exception)
@@ -272,11 +273,12 @@ private class ContractsFetch(
)
}
-private def jsonifyInsertDeleteStep(
-a: InsertDeleteStep[Any, lav1.event.CreatedEvent]
-): InsertDeleteStep[Unit, PreInsertContract] =
-a.leftMap(_ => ())
-.mapPreservingIds(prepareCreatedEventStorage(_) valueOr (e => throw e))
+private def jsonifyInsertDeleteStep[D <: ContractTypeId.Resolved](
+a: InsertDeleteStep[Any, lav1.event.CreatedEvent],
+d: D,
+): InsertDeleteStep[D, PreInsertContract] =
+a.leftMap(_ => d)
+.mapPreservingIds(prepareCreatedEventStorage(_, d) valueOr (e => throw e))
private def contractsFromOffsetIo(
fetchContext: FetchContext,
@@ -330,7 +332,9 @@ private class ContractsFetch(
}
val transactInsertsDeletes = Flow
-.fromFunction(jsonifyInsertDeleteStep)
+.fromFunction(
+jsonifyInsertDeleteStep((_: InsertDeleteStep[Any, lav1.event.CreatedEvent]), templateId)
+)
.via(conflation)
.map(insertAndDelete)
@@ -380,27 +384,36 @@ private[http] object ContractsFetch {
@SuppressWarnings(Array("org.wartremover.warts.Any"))
private def insertAndDelete(
-step: InsertDeleteStep[Any, PreInsertContract]
+step: InsertDeleteStep[ContractTypeId.Resolved, PreInsertContract]
)(implicit
log: doobie.LogHandler,
sjd: SupportedJdbcDriver.TC,
lc: LoggingContextOf[InstanceUUID],
): ConnectionIO[Unit] = {
import doobie.implicits._, cats.syntax.functor._
-surrogateTemplateIds(step.inserts.iterator.map(_.templateId).toSet).flatMap { stidMap =>
+surrogateTemplateIds(
+(step.inserts.iterator.map(_.templateId) ++ step.deletes.valuesIterator).toSet
+).flatMap { stidMap =>
import cats.syntax.apply._, cats.instances.vector._
import json.JsonProtocol._
import sjd.q.queries
-(queries.deleteContracts(step.deletes.keySet) *>
+// cid -> ctid
+// we want ctid
+def mapToId(a: ContractTypeId.RequiredPkg) =
+stidMap.getOrElse(
+a,
+throw new IllegalStateException(
+"template ID missing from prior retrieval; impossible"
+),
+)
+(queries.deleteContracts(step.deletes.groupMap1(_._2)(_._1).map { case (tid, cids) =>
+(mapToId(tid), cids.toSet)
+}) *>
queries.insertContracts(
step.inserts map (dbc =>
dbc.copy(
-templateId = stidMap.getOrElse(
-dbc.templateId,
-throw new IllegalStateException(
-"template ID missing from prior retrieval; impossible"
-),
-),
+templateId = mapToId(dbc.templateId),
signatories = domain.Party.unsubst(dbc.signatories),
observers = domain.Party.unsubst(dbc.observers),
)
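
step.deletes maps each contract id to the resolved contract type id it was stored under; groupMap1 (from the newly imported NonEmptyReturningOps) inverts that into groups deleteContracts can consume. A hypothetical illustration with string stand-ins for the real id types:

    val deletes = Map("cid-a" -> "Iou", "cid-b" -> "Iou", "cid-c" -> "Account")
    deletes.groupMap1(_._2)(_._1)
    // roughly: Map("Iou" -> NonEmpty["cid-a", "cid-b"], "Account" -> NonEmpty["cid-c"])
    // Each key is then mapped through mapToId and the values converted with
    // .toSet, yielding the Map[SurrogateTpId, Set[String]] that
    // deleteContracts expects.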

ContractsService.scala

@@ -376,11 +376,9 @@ class ContractsService(
resolved <- OptionT(
resolveContractTypeId(jwt, ledgerId)(templateId).map(_.toOption.flatten)
)
-res <- domain.ResolvedQuery(resolved) match {
-case domain.ResolvedQuery.ByInterfaceId(_) => doSearchInMemory
-case _ => doSearchInDb(resolved)
-}
+res <- doSearchInDb(resolved)
} yield res
dbQueried.orElse {
doSearchInMemory
}.run
@@ -428,20 +426,11 @@ class ContractsService(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): Source[Error \/ domain.ActiveContract.ResolvedCtTyId[LfV], NotUsed] = {
-import ctx.{jwt, ledgerId, parties, templateIds => query}
-query match {
-case rq: domain.ResolvedQuery.ByInterfaceId =>
-import com.daml.http.json.JsonProtocol._
-// TODO query store support for interface query/fetch #14819
-searchInMemory(jwt, ledgerId, parties, rq, InMemoryQuery.Params(queryParams))
-.map(_.map(_.map(LfValueCodec.apiValueToJsValue)))
-case _ =>
-// TODO use `stream` when materializing DBContracts, so we could stream ActiveContracts
-val fv: Future[Vector[domain.ActiveContract.ResolvedCtTyId[JsValue]]] =
-unsafeRunAsync(searchDb_(fetch)(ctx, queryParams))
+// TODO use `stream` when materializing DBContracts, so we could stream ActiveContracts
+val fv: Future[Vector[domain.ActiveContract.ResolvedCtTyId[JsValue]]] =
+unsafeRunAsync(searchDb_(fetch)(ctx, queryParams))
-Source.future(fv).mapConcat(identity).map(\/.right)
-}
+Source.future(fv).mapConcat(identity).map(\/.right)
}
private[this] def unsafeRunAsync[A](cio: doobie.ConnectionIO[A]) =

WebSocketService.scala

@@ -802,15 +802,7 @@ class WebSocketService(
)(implicit
lc: LoggingContextOf[InstanceUUID]
): Future[Source[StepAndErrors[Positive, JsValue], NotUsed]] = {
-// TODO query store support for interface query/fetch #14819
-val daoAndFetch = predicate.resolvedQurey match {
-case domain.ResolvedQuery.ByInterfaceId(_) =>
-None
-case _ =>
-contractsService.daoAndFetch
-}
-daoAndFetch.cata(
+contractsService.daoAndFetch.cata(
{ case (dao, fetch) =>
val tx: ConnectionIO[Source[StepAndErrors[Positive, JsValue], NotUsed]] =
fetch.fetchAndPersistBracket(