[DPP-84] Employ parallel unnesting instead of batching on PostgreSQL (#8042)

* [DPP-84] Employ parallel unnesting instead of batching on PostgreSQL

changelog_begin
[Integration Kit] When using a PostgreSQL-based index, leveraging native
parallel unnesting allows to more efficiently index new transactions.
changelog_end

* Address review https://github.com/digital-asset/daml/pull/8042#pullrequestreview-541759596
This commit is contained in:
Stefano Baghino 2020-12-01 13:06:36 +01:00 committed by GitHub
parent 761b3da068
commit bc0f6ab35e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 453 additions and 203 deletions

View File

@ -120,7 +120,7 @@ def install_java_deps():
"org.mockito:mockito-inline:2.24.0",
"org.mockito:mockito-scala_2.12:1.1.2",
"org.pcollections:pcollections:2.1.3",
"org.postgresql:postgresql:42.2.9",
"org.postgresql:postgresql:42.2.18",
"org.reactivestreams:reactive-streams:1.0.2",
"org.reactivestreams:reactive-streams-tck:1.0.2",
"org.sangria-graphql:sangria_2.12:1.4.2",

View File

@ -27,8 +27,8 @@ trait AkkaBeforeAndAfterAll extends BeforeAndAfterAll {
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$actorSystemName-thread-pool-worker-%d")
.setUncaughtExceptionHandler((thread, _) =>
logger.error(s"got an uncaught exception on thread: ${thread.getName}"))
.setUncaughtExceptionHandler((thread, e) =>
logger.error(s"got an uncaught exception on thread: ${thread.getName}", e))
.build()))
protected implicit lazy val system: ActorSystem =

View File

@ -4,10 +4,11 @@
package com.daml.platform.store.dao.events
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.{BatchSql, RowParser, ~}
import anorm.{RowParser, ~}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.event.Event
@ -26,22 +27,74 @@ import com.daml.platform.ApiOffset
import com.daml.platform.api.v1.event.EventOps.{EventOps, TreeEventOps}
import com.daml.platform.index.TransactionConversion
import com.daml.platform.store.Conversions.{identifier, instant, offset}
import com.daml.platform.store.DbType
import com.google.protobuf.timestamp.Timestamp
/**
* Data access object for a table representing raw transactions nodes that
* are going to be streamed off through the Ledger API. By joining these items
* with a [[ContractWitnessesTable]] events can be filtered based on their visibility to
* a party.
*/
private[events] object EventsTable
extends EventsTable
with EventsTableInsert
with EventsTableDelete
with EventsTableFlatEvents
with EventsTableTreeEvents {
private[events] abstract class EventsTable {
final case class Executables(insertEvents: Option[BatchSql], updateArchives: Option[BatchSql])
def toExecutables(
tx: TransactionIndexing.TransactionInfo,
info: TransactionIndexing.EventsInfo,
compressed: TransactionIndexing.Serialized,
): EventsTable.Batches
}
private[events] object EventsTable {
private type SharedRow =
Offset ~ String ~ Int ~ Long ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~
Option[String] ~ Array[String]
private val sharedRow: RowParser[SharedRow] =
offset("event_offset") ~
str("transaction_id") ~
int("node_index") ~
long("event_sequential_id") ~
str("event_id") ~
str("contract_id") ~
instant("ledger_effective_time") ~
identifier("template_id") ~
str("command_id").? ~
str("workflow_id").? ~
array[String]("event_witnesses")
type CreatedEventRow =
SharedRow ~ InputStream ~ Array[String] ~ Array[String] ~ Option[String] ~ Option[InputStream]
val createdEventRow: RowParser[CreatedEventRow] =
sharedRow ~
binaryStream("create_argument") ~
array[String]("create_signatories") ~
array[String]("create_observers") ~
str("create_agreement_text").? ~
binaryStream("create_key_value").?
type ExercisedEventRow =
SharedRow ~ Boolean ~ String ~ InputStream ~ Option[InputStream] ~ Array[String] ~ Array[String]
val exercisedEventRow: RowParser[ExercisedEventRow] =
sharedRow ~
bool("exercise_consuming") ~
str("exercise_choice") ~
binaryStream("exercise_argument") ~
binaryStream("exercise_result").? ~
array[String]("exercise_actors") ~
array[String]("exercise_child_event_ids")
type ArchiveEventRow = SharedRow
val archivedEventRow: RowParser[ArchiveEventRow] = sharedRow
trait Batches {
def execute()(implicit connection: Connection): Unit
}
def apply(dbType: DbType): EventsTable =
dbType match {
case DbType.Postgres => EventsTablePostgresql
case DbType.H2Database => EventsTableH2Database
}
final case class Entry[+E](
eventOffset: Offset,
@ -163,48 +216,3 @@ private[events] object EventsTable
}
}
private[events] trait EventsTable {
private type SharedRow =
Offset ~ String ~ Int ~ Long ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~
Option[String] ~ Array[String]
private val sharedRow: RowParser[SharedRow] =
offset("event_offset") ~
str("transaction_id") ~
int("node_index") ~
long("event_sequential_id") ~
str("event_id") ~
str("contract_id") ~
instant("ledger_effective_time") ~
identifier("template_id") ~
str("command_id").? ~
str("workflow_id").? ~
array[String]("event_witnesses")
protected type CreatedEventRow =
SharedRow ~ InputStream ~ Array[String] ~ Array[String] ~ Option[String] ~ Option[InputStream]
protected val createdEventRow: RowParser[CreatedEventRow] =
sharedRow ~
binaryStream("create_argument") ~
array[String]("create_signatories") ~
array[String]("create_observers") ~
str("create_agreement_text").? ~
binaryStream("create_key_value").?
protected type ExercisedEventRow =
SharedRow ~ Boolean ~ String ~ InputStream ~ Option[InputStream] ~ Array[String] ~ Array[String]
protected val exercisedEventRow: RowParser[ExercisedEventRow] =
sharedRow ~
bool("exercise_consuming") ~
str("exercise_choice") ~
binaryStream("exercise_argument") ~
binaryStream("exercise_result").? ~
array[String]("exercise_actors") ~
array[String]("exercise_child_event_ids")
protected type ArchiveEventRow = SharedRow
protected val archivedEventRow: RowParser[ArchiveEventRow] = sharedRow
}

View File

@ -7,7 +7,7 @@ import anorm.{Row, SimpleSql, SqlStringInterpolation}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.platform.store.Conversions.OffsetToStatement
trait EventsTableDelete {
object EventsTableDelete {
/**
* Delete

View File

@ -8,10 +8,10 @@ import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.TransactionId
import com.daml.platform.store.Conversions._
private[events] trait EventsTableFlatEvents { this: EventsTable =>
private[events] object EventsTableFlatEvents {
private val createdFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] =
createdEventRow map {
EventsTable.createdEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
EventsTable.Entry(
eventOffset = eventOffset,
@ -36,7 +36,7 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
}
private val archivedFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] =
archivedEventRow map {
EventsTable.archivedEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses =>
EventsTable.Entry(
eventOffset = eventOffset,
@ -151,7 +151,7 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
private def getActiveContractsQueries(sqlFunctions: SqlFunctions) =
new EventsTableFlatEventsRangeQueries.GetActiveContracts(
selectColumns = selectColumns,
sqlFunctions = sqlFunctions
sqlFunctions = sqlFunctions,
)
def preparePagedGetActiveContracts(sqlFunctions: SqlFunctions)(

View File

@ -114,9 +114,17 @@ private[events] sealed abstract class EventsTableFlatEventsRangeQueries[Offset]
frqK match {
case QueryParts.ByArith(read) =>
EventsRange.readPage(read, EventsTable.rawFlatEventParser, offsetRange(offset), pageSize)
EventsRange.readPage(
read,
EventsTableFlatEvents.rawFlatEventParser,
offsetRange(offset),
pageSize,
)
case QueryParts.ByLimit(sql) =>
SqlSequence.vector(sql withFetchSize Some(pageSize), EventsTable.rawFlatEventParser)
SqlSequence.vector(
sql withFetchSize Some(pageSize),
EventsTableFlatEvents.rawFlatEventParser,
)
}
}
}

View File

@ -3,14 +3,23 @@
package com.daml.platform.store.dao.events
import java.sql.Connection
import java.time.Instant
import anorm.NamedParameter
import anorm.{BatchSql, NamedParameter}
import com.daml.ledger.{EventId, TransactionId}
import com.daml.ledger.participant.state.v1.{Offset, SubmitterInfo, WorkflowId}
import com.daml.platform.store.Conversions._
private[events] trait EventsTableInsert { this: EventsTable =>
object EventsTableH2Database extends EventsTable {
final class Batches(insertEvents: Option[BatchSql], updateArchives: Option[BatchSql])
extends EventsTable.Batches {
override def execute()(implicit connection: Connection): Unit = {
insertEvents.foreach(_.execute())
updateArchives.foreach(_.execute())
}
}
private val insertEvent: String = {
val (columns, values) = Seq(
@ -173,7 +182,7 @@ private[events] trait EventsTableInsert { this: EventsTable =>
tx: TransactionIndexing.TransactionInfo,
info: TransactionIndexing.EventsInfo,
serialized: TransactionIndexing.Serialized,
): EventsTable.Executables = {
): EventsTable.Batches = {
val events = transaction(
offset = tx.offset,
@ -193,11 +202,10 @@ private[events] trait EventsTableInsert { this: EventsTable =>
val archivals =
info.archives.iterator.map(archive(tx.offset)).toList
EventsTable.Executables(
new Batches(
insertEvents = batch(insertEvent, events),
updateArchives = batch(updateArchived, archivals),
)
}
}

View File

@ -0,0 +1,213 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao.events
import java.sql.{Connection, PreparedStatement}
import java.time.Instant
import anorm.{BatchSql, NamedParameter, Row, SimpleSql, SqlStringInterpolation, ToStatement}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.lf.ledger.EventId
import com.daml.platform.store.Conversions._
object EventsTablePostgresql extends EventsTable {
/**
* Insertions are represented by a single statement made of nested arrays, one per column, instead of JDBC batches.
* This leverages a PostgreSQL-specific feature known as "array unnesting", which has shown to be considerable
* faster than using JDBC batches.
*/
final class Batches(
insertEvents: Option[SimpleSql[Row]],
updateArchives: Option[BatchSql],
) extends EventsTable.Batches {
override def execute()(implicit connection: Connection): Unit = {
insertEvents.foreach(_.execute())
updateArchives.foreach(_.execute())
}
}
private val updateArchived =
"""update participant_events set create_consumed_at={consumed_at} where contract_id={contract_id} and create_argument is not null"""
private def archive(consumedAt: Offset)(contractId: ContractId): Vector[NamedParameter] =
Vector[NamedParameter](
"consumed_at" -> consumedAt,
"contract_id" -> contractId.coid,
)
override def toExecutables(
tx: TransactionIndexing.TransactionInfo,
info: TransactionIndexing.EventsInfo,
serialized: TransactionIndexing.Serialized,
): EventsTable.Batches = {
val batchSize = info.events.size
val eventIds = Array.ofDim[String](batchSize)
val eventOffsets = Array.fill(batchSize)(tx.offset.toByteArray)
val contractIds = Array.ofDim[String](batchSize)
val transactionIds = Array.fill(batchSize)(tx.transactionId.asInstanceOf[String])
val workflowIds = Array.fill(batchSize)(tx.workflowId.map(_.asInstanceOf[String]).orNull)
val ledgerEffectiveTimes = Array.fill(batchSize)(tx.ledgerEffectiveTime)
val templateIds = Array.ofDim[String](batchSize)
val nodeIndexes = Array.ofDim[java.lang.Integer](batchSize)
val commandIds =
Array.fill(batchSize)(tx.submitterInfo.map(_.commandId.asInstanceOf[String]).orNull)
val applicationIds =
Array.fill(batchSize)(tx.submitterInfo.map(_.applicationId.asInstanceOf[String]).orNull)
val submitters = Array.fill(batchSize)(
tx.submitterInfo.map(_.singleSubmitterOrThrow().asInstanceOf[String]).orNull)
val flatEventWitnesses = Array.ofDim[String](batchSize)
val treeEventWitnesses = Array.ofDim[String](batchSize)
val createArguments = Array.ofDim[Array[Byte]](batchSize)
val createSignatories = Array.ofDim[String](batchSize)
val createObservers = Array.ofDim[String](batchSize)
val createAgreementTexts = Array.ofDim[String](batchSize)
val createConsumedAt = Array.ofDim[Array[Byte]](batchSize)
val createKeyValues = Array.ofDim[Array[Byte]](batchSize)
val exerciseConsuming = Array.ofDim[java.lang.Boolean](batchSize)
val exerciseChoices = Array.ofDim[String](batchSize)
val exerciseArguments = Array.ofDim[Array[Byte]](batchSize)
val exerciseResults = Array.ofDim[Array[Byte]](batchSize)
val exerciseActors = Array.ofDim[String](batchSize)
val exerciseChildEventIds = Array.ofDim[String](batchSize)
for (((nodeId, node), i) <- info.events.zipWithIndex) {
node match {
case create: Create =>
contractIds(i) = create.coid.coid
templateIds(i) = create.coinst.template.toString
eventIds(i) = EventId(tx.transactionId, nodeId).toLedgerString
nodeIndexes(i) = nodeId.index
flatEventWitnesses(i) = info.stakeholders.getOrElse(nodeId, Set.empty).mkString("|")
treeEventWitnesses(i) = info.disclosure.getOrElse(nodeId, Set.empty).mkString("|")
createArguments(i) = serialized.createArguments(nodeId)
createSignatories(i) = create.signatories.mkString("|")
createObservers(i) = create.stakeholders.diff(create.signatories).mkString("|")
if (create.coinst.agreementText.nonEmpty) {
createAgreementTexts(i) = create.coinst.agreementText
}
createKeyValues(i) = serialized.createKeyValues.get(nodeId).orNull
case exercise: Exercise =>
contractIds(i) = exercise.targetCoid.coid
templateIds(i) = exercise.templateId.toString
eventIds(i) = EventId(tx.transactionId, nodeId).toLedgerString
nodeIndexes(i) = nodeId.index
flatEventWitnesses(i) = info.stakeholders.getOrElse(nodeId, Set.empty).mkString("|")
treeEventWitnesses(i) = info.disclosure.getOrElse(nodeId, Set.empty).mkString("|")
exerciseConsuming(i) = exercise.consuming
exerciseChoices(i) = exercise.choiceId
exerciseArguments(i) = serialized.exerciseArguments(nodeId)
exerciseResults(i) = serialized.exerciseResults.get(nodeId).orNull
exerciseActors(i) = exercise.actingParties.mkString("|")
exerciseChildEventIds(i) = exercise.children
.map(EventId(tx.transactionId, _).toLedgerString)
.iterator
.mkString("|")
case _ => throw new UnexpectedNodeException(nodeId, tx.transactionId)
}
}
val inserts = insertEvents(
eventIds,
eventOffsets,
contractIds,
transactionIds,
workflowIds,
ledgerEffectiveTimes,
templateIds,
nodeIndexes,
commandIds,
applicationIds,
submitters,
flatEventWitnesses,
treeEventWitnesses,
createArguments,
createSignatories,
createObservers,
createAgreementTexts,
createConsumedAt,
createKeyValues,
exerciseConsuming,
exerciseChoices,
exerciseArguments,
exerciseResults,
exerciseActors,
exerciseChildEventIds
)
val archivals =
info.archives.iterator.map(archive(tx.offset)).toList
new Batches(
insertEvents = Some(inserts),
updateArchives = batch(updateArchived, archivals),
)
}
// Specific for PostgreSQL parallel unnesting insertions
private implicit object ByteArrayArrayToStatement extends ToStatement[Array[Array[Byte]]] {
override def set(s: PreparedStatement, index: Int, v: Array[Array[Byte]]): Unit =
s.setObject(index, v)
}
private implicit object InstantArrayToStatement extends ToStatement[Array[Instant]] {
override def set(s: PreparedStatement, index: Int, v: Array[Instant]): Unit = {
val conn = s.getConnection
val ts = conn.createArrayOf("TIMESTAMP", v.map(java.sql.Timestamp.from))
s.setArray(index, ts)
}
}
private def insertEvents(
eventIds: Array[String],
eventOffsets: Array[Array[Byte]],
contractIds: Array[String],
transactionIds: Array[String],
workflowIds: Array[String],
ledgerEffectiveTimes: Array[Instant],
templateIds: Array[String],
nodeIndexes: Array[java.lang.Integer],
commandIds: Array[String],
applicationIds: Array[String],
submitters: Array[String],
flatEventWitnesses: Array[String],
treeEventWitnesses: Array[String],
createArguments: Array[Array[Byte]],
createSignatories: Array[String],
createObservers: Array[String],
createAgreementTexts: Array[String],
createConsumedAt: Array[Array[Byte]],
createKeyValues: Array[Array[Byte]],
exerciseConsuming: Array[java.lang.Boolean],
exerciseChoices: Array[String],
exerciseArguments: Array[Array[Byte]],
exerciseResults: Array[Array[Byte]],
exerciseActors: Array[String],
exerciseChildEventIds: Array[String],
) =
SQL"""insert into participant_events(
event_id, event_offset, contract_id, transaction_id, workflow_id, ledger_effective_time, template_id, node_index, command_id, application_id, submitter, flat_event_witnesses, tree_event_witnesses,
create_argument, create_signatories, create_observers, create_agreement_text, create_consumed_at, create_key_value,
exercise_consuming, exercise_choice, exercise_argument, exercise_result, exercise_actors, exercise_child_event_ids
)
select
event_id, event_offset, contract_id, transaction_id, workflow_id, ledger_effective_time, template_id, node_index, command_id, application_id, submitter, string_to_array(flat_event_witnesses, '|'), string_to_array(tree_event_witnesses, '|'),
create_argument, string_to_array(create_signatories,'|'), string_to_array(create_observers,'|'), create_agreement_text, create_consumed_at, create_key_value,
exercise_consuming, exercise_choice, exercise_argument, exercise_result, string_to_array(exercise_actors,'|'), string_to_array(exercise_child_event_ids,'|')
from
unnest(
$eventIds::varchar[], $eventOffsets::bytea[], $contractIds::varchar[], $transactionIds::varchar[], $workflowIds::varchar[], $ledgerEffectiveTimes::timestamp[], $templateIds::varchar[], $nodeIndexes::int[], $commandIds::varchar[], $applicationIds::varchar[], $submitters::varchar[], $flatEventWitnesses::varchar[], $treeEventWitnesses::varchar[],
$createArguments::bytea[], $createSignatories::varchar[], $createObservers::varchar[], $createAgreementTexts::varchar[], $createConsumedAt::bytea[], $createKeyValues::bytea[],
$exerciseConsuming::bool[], $exerciseChoices::varchar[], $exerciseArguments::bytea[], $exerciseResults::bytea[], $exerciseActors::varchar[], $exerciseChildEventIds::varchar[]
)
as
t(
event_id, event_offset, contract_id, transaction_id, workflow_id, ledger_effective_time, template_id, node_index, command_id, application_id, submitter, flat_event_witnesses, tree_event_witnesses,
create_argument, create_signatories, create_observers, create_agreement_text, create_consumed_at, create_key_value,
exercise_consuming, exercise_choice, exercise_argument, exercise_result, exercise_actors, exercise_child_event_ids
)
"""
}

View File

@ -7,10 +7,10 @@ import anorm.{Row, RowParser, SimpleSql, SqlStringInterpolation, ~}
import com.daml.ledger.TransactionId
import com.daml.platform.store.Conversions._
private[events] trait EventsTableTreeEvents { this: EventsTable =>
private[events] object EventsTableTreeEvents {
private val createdTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] =
createdEventRow map {
EventsTable.createdEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
EventsTable.Entry(
eventOffset = eventOffset,
@ -35,7 +35,7 @@ private[events] trait EventsTableTreeEvents { this: EventsTable =>
}
private val exercisedTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] =
exercisedEventRow map {
EventsTable.exercisedEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseResult ~ exerciseActors ~ exerciseChildEventIds =>
EventsTable.Entry(
eventOffset = eventOffset,

View File

@ -75,7 +75,7 @@ private[dao] final class TransactionsReader(
val query = (range: EventsRange[(Offset, Long)]) => { implicit connection: Connection =>
logger.debug(s"getFlatTransactions query($range)")
QueryNonPruned.executeSqlOrThrow(
EventsTable
EventsTableFlatEvents
.preparePagedGetFlatTransactions(sqlFunctions)(
range = EventsRange(range.startExclusive._2, range.endInclusive._2),
filter = filter,
@ -112,10 +112,12 @@ private[dao] final class TransactionsReader(
requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] = {
val query =
EventsTable.prepareLookupFlatTransactionById(sqlFunctions)(transactionId, requestingParties)
EventsTableFlatEvents.prepareLookupFlatTransactionById(sqlFunctions)(
transactionId,
requestingParties)
dispatcher
.executeSql(dbMetrics.lookupFlatTransactionById) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
query.asVectorOf(EventsTableFlatEvents.rawFlatEventParser)
}
.flatMap(
rawEvents =>
@ -142,7 +144,7 @@ private[dao] final class TransactionsReader(
val query = (range: EventsRange[(Offset, Long)]) => { implicit connection: Connection =>
logger.debug(s"getTransactionTrees query($range)")
QueryNonPruned.executeSqlOrThrow(
EventsTable
EventsTableTreeEvents
.preparePagedGetTransactionTrees(sqlFunctions)(
eventsRange = EventsRange(range.startExclusive._2, range.endInclusive._2),
requestingParties = requestingParties,
@ -179,10 +181,12 @@ private[dao] final class TransactionsReader(
requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] = {
val query =
EventsTable.prepareLookupTransactionTreeById(sqlFunctions)(transactionId, requestingParties)
EventsTableTreeEvents.prepareLookupTransactionTreeById(sqlFunctions)(
transactionId,
requestingParties)
dispatcher
.executeSql(dbMetrics.lookupTransactionTreeById) { implicit connection =>
query.asVectorOf(EventsTable.rawTreeEventParser)
query.asVectorOf(EventsTableTreeEvents.rawTreeEventParser)
}
.flatMap(
rawEvents =>
@ -206,7 +210,7 @@ private[dao] final class TransactionsReader(
val query = (range: EventsRange[(Offset, Long)]) => { implicit connection: Connection =>
logger.debug(s"getActiveContracts query($range)")
QueryNonPruned.executeSqlOrThrow(
EventsTable
EventsTableFlatEvents
.preparePagedGetActiveContracts(sqlFunctions)(
range = range,
filter = filter,

View File

@ -22,16 +22,14 @@ import com.daml.platform.store.DbType
object TransactionsWriter {
final class PreparedInsert private[TransactionsWriter] (
eventsTableExecutables: EventsTable.Executables,
eventsTableExecutables: EventsTable.Batches,
contractsTableExecutables: ContractsTable.Executables,
contractWitnessesTableExecutables: ContractWitnessesTable.Executables,
) {
def write(metrics: Metrics)(implicit connection: Connection): Unit = {
import metrics.daml.index.db.storeTransactionDbMetrics._
val events = eventsTableExecutables.insertEvents.toList ++ eventsTableExecutables.updateArchives.toList
Timed.value(eventsBatch, events.foreach(_.execute()))
Timed.value(eventsBatch, eventsTableExecutables.execute())
// Delete the witnesses of contracts that being removed first, to
// respect the foreign key constraint of the underlying storage
@ -65,6 +63,7 @@ private[dao] final class TransactionsWriter(
lfValueTranslation: LfValueTranslation,
) {
private val eventsTable = EventsTable(dbType)
private val contractsTable = ContractsTable(dbType)
private val contractWitnessesTable = ContractWitnessesTable(dbType)
@ -105,7 +104,7 @@ private[dao] final class TransactionsWriter(
)
new TransactionsWriter.PreparedInsert(
EventsTable.toExecutables(indexing.transaction, indexing.events, serialized),
eventsTable.toExecutables(indexing.transaction, indexing.events, serialized),
contractsTable.toExecutables(indexing.transaction, indexing.contracts, serialized),
contractWitnessesTable.toExecutables(indexing.contractWitnesses),
)
@ -113,6 +112,6 @@ private[dao] final class TransactionsWriter(
}
def prepareEventsDelete(endInclusive: Offset): SimpleSql[Row] =
EventsTable.prepareEventsDelete(endInclusive)
EventsTableDelete.prepareEventsDelete(endInclusive)
}

File diff suppressed because it is too large Load Diff