[DPP-1327] ETQ: Use transaction_meta table for pointwise tx retrieval and drop tx_id based indexes (#15877)

pbatko-da 2022-12-14 21:12:24 +01:00 committed by GitHub
parent 0dcd635eb8
commit 3429cb543e
22 changed files with 499 additions and 457 deletions
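In essence: point-wise lookups (lookupFlatTransactionById / lookupTransactionTreeById) no longer locate events by scanning the per-event tables on transaction_id. They first resolve the event_sequential_id range from the participant_transaction_meta table and then range-scan the event tables, which is what makes the three transaction_id indexes below safe to drop. A minimal sketch of the new flow, composed from the methods added in this commit (a hypothetical standalone helper; error handling and metrics elided):

import java.sql.Connection
import com.daml.lf.data.Ref
import com.daml.platform.Party
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.dao.events.Raw

def lookupFlatEventsById(
    backend: EventStorageBackend,
    transactionId: Ref.TransactionId,
    requestingParties: Set[Party],
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]] =
  // Step 1: transaction_meta gives the sequential id range (None if the
  // transaction is unknown, pruned, or beyond the cached ledger end).
  backend.transactionPointwiseQueries
    .fetchIdsFromTransactionMeta(transactionId)(connection) match {
    case Some((firstSeqId, lastSeqId)) =>
      // Step 2: fetch events by sequential id range; witness-based visibility
      // filtering happens afterwards in TransactionPointwiseReader.
      backend.transactionPointwiseQueries
        .fetchFlatTransactionEvents(firstSeqId, lastSeqId, requestingParties)(connection)
    case None => Vector.empty
  }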

View File

@ -1 +1 @@
512e838cb0fb18c4ca7070f95ebe3340b28eb0b8b1b27f129add353a5fe6d018
71d97ec9efd5e6ceea2fabe31d4ab263544fa845594b96da88385c2a865358fe

View File

@ -220,9 +220,6 @@ CREATE INDEX participant_events_create_event_offset ON participant_events_create
-- sequential_id index for paging
CREATE INDEX participant_events_create_event_sequential_id ON participant_events_create (event_sequential_id);
-- lookup by transaction id
CREATE INDEX participant_events_create_transaction_id_idx ON participant_events_create (transaction_id);
-- lookup by contract id
CREATE INDEX participant_events_create_contract_id_idx ON participant_events_create (contract_id);
@ -281,9 +278,6 @@ CREATE INDEX participant_events_consuming_exercise_event_offset ON participant_e
-- sequential_id index for paging
CREATE INDEX participant_events_consuming_exercise_event_sequential_id ON participant_events_consuming_exercise (event_sequential_id);
-- lookup by transaction id
CREATE INDEX participant_events_consuming_exercise_transaction_id_idx ON participant_events_consuming_exercise (transaction_id);
-- lookup by contract id
CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participant_events_consuming_exercise (contract_id);
@ -339,9 +333,6 @@ CREATE INDEX participant_events_non_consuming_exercise_event_offset ON participa
-- sequential_id index for paging
CREATE INDEX participant_events_non_consuming_exercise_event_sequential_id ON participant_events_non_consuming_exercise (event_sequential_id);
-- lookup by transaction id
CREATE INDEX participant_events_non_consuming_exercise_transaction_id_idx ON participant_events_non_consuming_exercise (transaction_id);
CREATE TABLE string_interning (
internal_id integer PRIMARY KEY NOT NULL,
external_string text

View File

@ -0,0 +1 @@
68900c9d2b68e7b27c7717fdf4c23d314d358fce7f4b7d8906489f1533a6a5ed

View File

@ -0,0 +1,3 @@
DROP INDEX participant_events_create_transaction_id_idx;
DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;

View File

@ -0,0 +1 @@
68900c9d2b68e7b27c7717fdf4c23d314d358fce7f4b7d8906489f1533a6a5ed

View File

@ -0,0 +1,3 @@
DROP INDEX participant_events_create_transaction_id_idx;
DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;

View File

@ -13,18 +13,9 @@ import com.daml.lf.crypto.Hash
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.ledger.EventId
import com.daml.logging.LoggingContext
import com.daml.platform.{
ApplicationId,
ContractId,
Identifier,
Key,
PackageId,
Party,
TransactionId,
}
import com.daml.platform.{ApplicationId, ContractId, Identifier, Key, PackageId, Party}
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.dao.events.Raw
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
@ -34,7 +25,10 @@ import com.daml.scalautil.NeverEqualsOverride
import java.sql.Connection
import javax.sql.DataSource
import com.daml.platform.store.backend.common.TransactionStreamingQueries
import com.daml.platform.store.backend.common.{
TransactionPointwiseQueries,
TransactionStreamingQueries,
}
import scala.annotation.unused
@ -264,6 +258,7 @@ object ContractStorageBackend {
trait EventStorageBackend {
def transactionPointwiseQueries: TransactionPointwiseQueries
def transactionStreamingQueries: TransactionStreamingQueries
/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
@ -283,17 +278,10 @@ trait EventStorageBackend {
allFilterParties: Set[Party],
endInclusive: Long,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def flatTransaction(
transactionId: TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def transactionTree(
transactionId: TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]]
/** Max event sequential id of observable (create, consuming and nonconsuming exercise) events. */
def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long]
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[EventStorageBackend.RawTransactionEvent]

View File

@ -16,7 +16,6 @@ import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.dao.JdbcLedgerDao
import com.daml.platform.store.dao.events._
import com.daml.platform._
import java.util.UUID
object UpdateToDbDto {

View File

@ -21,8 +21,8 @@ import com.daml.platform.store.backend.Conversions.{
import com.daml.platform.store.backend.common.SimpleSqlAsVectorOf._
import com.daml.platform.store.dao.events.Raw
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RawTransactionEvent}
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.interning.StringInterning
@ -399,10 +399,15 @@ object EventStorageBackendTemplate {
"NULL as driver_metadata",
).mkString(", ")
val EventSequentialIdFirstLast: RowParser[(Long, Long)] =
long("event_sequential_id_first") ~ long("event_sequential_id_last") map {
case event_sequential_id_first ~ event_sequential_id_last =>
(event_sequential_id_first, event_sequential_id_last)
}
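  // Usage sketch: this parser is applied with anorm's `singleOpt` so that a
  // missing transaction_meta row yields None rather than an exception, e.g.
  //   SQL"SELECT ...".as(EventSequentialIdFirstLast.singleOpt)(connection)
  // (see fetchIdsFromTransactionMeta in TransactionPointwiseQueries below).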
}
abstract class EventStorageBackendTemplate(
eventStrategy: EventStrategy,
queryStrategy: QueryStrategy,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
@ -410,95 +415,24 @@ abstract class EventStorageBackendTemplate(
participantAllDivulgedContractsPrunedUpToInclusive: Connection => Option[Offset],
) extends EventStorageBackend {
import com.daml.platform.store.backend.Conversions.ArrayColumnToIntArray._
import EventStorageBackendTemplate._
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
override def transactionPointwiseQueries: TransactionPointwiseQueries =
new TransactionPointwiseQueries(
queryStrategy = queryStrategy,
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,
)
override def transactionStreamingQueries: TransactionStreamingQueries =
new TransactionStreamingQueries(
queryStrategy = queryStrategy,
stringInterning = stringInterning,
)
private def events[T](
joinClause: CompositeSql,
additionalAndClause: CompositeSql,
rowParser: Set[Int] => RowParser[T],
witnessesColumn: String,
partitions: List[(String, String)],
)(
limit: Option[Int],
fetchSizeHint: Option[Int],
filterParams: FilterParams,
)(connection: Connection): Vector[T] = {
val internedAllParties: Set[Int] =
filterParams.wildCardParties.iterator
.++(filterParams.partiesAndTemplates.iterator.flatMap(_._1.iterator))
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
val internedWildcardParties: Set[Int] = filterParams.wildCardParties.view
.flatMap(party => stringInterning.party.tryInternalize(party).toList)
.toSet
val internedPartiesAndTemplates: List[(Set[Int], Set[Int])] =
filterParams.partiesAndTemplates.iterator
.map { case (parties, templateIds) =>
(
parties.flatMap(s => stringInterning.party.tryInternalize(s).toList),
templateIds.flatMap(s => stringInterning.templateId.tryInternalize(s).toList),
)
}
.filterNot(_._1.isEmpty)
.filterNot(_._2.isEmpty)
.toList
if (internedWildcardParties.isEmpty && internedPartiesAndTemplates.isEmpty) {
Vector.empty
} else {
val wildcardPartiesClause = if (internedWildcardParties.isEmpty) {
Nil
} else {
eventStrategy.wildcardPartiesClause(witnessesColumn, internedWildcardParties) :: Nil
}
val filterPartiesClauses = internedPartiesAndTemplates.map { case (parties, templates) =>
eventStrategy.partiesAndTemplatesClause(witnessesColumn, parties, templates)
}
val witnessesWhereClause =
(wildcardPartiesClause ::: filterPartiesClauses).mkComposite("(", " or ", ")")
// NOTE:
// 1. We use `order by event_sequential_id` to hint Postgres to use an index scan rather than a sequential scan.
// 2. We also need to wrap this subquery in another subquery because
// on Oracle subqueries used with `union all` cannot contain an `order by` clause.
def selectFrom(table: String, selectColumns: String) = cSQL"""
(SELECT #$selectColumns, event_witnesses, command_id FROM ( SELECT
#$selectColumns, #$witnessesColumn as event_witnesses, command_id
FROM
#$table $joinClause
WHERE
$additionalAndClause
$witnessesWhereClause
ORDER BY event_sequential_id
) x)
"""
val selectClause = partitions
.map(p => selectFrom(p._1, p._2))
.mkComposite("", " UNION ALL", "")
SQL"""
$selectClause
ORDER BY event_sequential_id
${QueryStrategy.limitClause(limit)}"""
.withFetchSize(fetchSizeHint)
.asVectorOf(rowParser(internedAllParties))(connection)
}
}
override def activeContractEventBatch(
eventSequentialIds: Iterable[Long],
allFilterParties: Set[Ref.Party],
@ -530,65 +464,6 @@ abstract class EventStorageBackendTemplate(
.asVectorOf(rawFlatEventParser(allInternedFilterParties, stringInterning))(connection)
}
override def flatTransaction(
transactionId: Ref.TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]] = {
import com.daml.platform.store.backend.Conversions.ledgerStringToStatement
import com.daml.platform.store.backend.Conversions.OffsetToStatement
val ledgerEndOffset = ledgerEndCache()._1
events(
joinClause = cSQL"""JOIN parameters ON
(participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive)
AND event_offset <= $ledgerEndOffset""",
additionalAndClause = cSQL"""
transaction_id = $transactionId AND""",
rowParser = rawFlatEventParser(_, stringInterning),
witnessesColumn = "flat_event_witnesses",
partitions = List(
// we do not want to fetch divulgence events
"participant_events_create" -> selectColumnsForFlatTransactionsCreate,
"participant_events_consuming_exercise" -> selectColumnsForFlatTransactionsExercise,
"participant_events_non_consuming_exercise" -> selectColumnsForFlatTransactionsExercise,
),
)(
limit = None,
fetchSizeHint = None,
filterParams = filterParams,
)(connection)
}
override def transactionTree(
transactionId: Ref.TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]] = {
import com.daml.platform.store.backend.Conversions.ledgerStringToStatement
import com.daml.platform.store.backend.Conversions.OffsetToStatement
val ledgerEndOffset = ledgerEndCache()._1
events(
joinClause = cSQL"""JOIN parameters ON
(participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive)
AND event_offset <= $ledgerEndOffset""",
additionalAndClause = cSQL"""
transaction_id = $transactionId AND""",
rowParser = rawTreeEventParser(_, stringInterning),
witnessesColumn = "tree_event_witnesses",
partitions = List(
// we do not want to fetch divulgence events
"participant_events_create" -> s"$selectColumnsForTransactionTreeCreate, ${queryStrategy
.constBooleanSelect(false)} as exercise_consuming",
"participant_events_consuming_exercise" -> s"$selectColumnsForTransactionTreeExercise, ${queryStrategy
.constBooleanSelect(true)} as exercise_consuming",
"participant_events_non_consuming_exercise" -> s"$selectColumnsForTransactionTreeExercise, ${queryStrategy
.constBooleanSelect(false)} as exercise_consuming",
),
)(
limit = None,
fetchSizeHint = None,
filterParams,
)(connection)
}
// TODO etq: Implement pruning queries in terms of event sequential id in order to be able to drop offset based indices.
/** Deletes a subset of the indexed data (up to the pruning offset) in the following order and in the manner specified:
* 1.a if pruning-all-divulged-contracts is enabled: all divulgence events (retroactive divulgence),
@ -1038,44 +913,7 @@ abstract class EventStorageBackendTemplate(
AND
events.event_sequential_id = id_filter.event_sequential_id
)"""
}
}
/** This encapsulates the moving part as composing various Events queries.
*/
trait EventStrategy {
/** Generates a clause that checks whether any of the given wildcard parties is a witness
*
* @param witnessesColumnName name of the Array column holding witnesses
* @param internedWildcardParties List of all wildcard parties (their interned names).
* Guaranteed to be non-empty.
* @return the composable SQL
*/
def wildcardPartiesClause(
witnessesColumnName: String,
internedWildcardParties: Set[Int],
): CompositeSql
/** Generates a clause that checks whether the given parties+templates filter matches the contract,
* i.e., whether any of the template ids matches AND any of the parties is a witness
*
* @param witnessesColumnName Name of the Array column holding witnesses
* @param internedParties The non-empty list of interned party names
* @param internedTemplates The non-empty list of interned template names
* @return the composable SQL for this filter
*/
def partiesAndTemplatesClause(
witnessesColumnName: String,
internedParties: Set[Int],
internedTemplates: Set[Int],
): CompositeSql
/** Pruning pe_create_id_filter_stakeholder entries.
*
* @param pruneUpToInclusive create and archive events must be earlier or equal to this offset
* @return the executable anorm query
*/
def pruneCreateFilters(pruneUpToInclusive: Offset): SimpleSql[Row]
}

View File

@ -0,0 +1,171 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.RowParser
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.Party
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.dao.events.Raw
import com.daml.platform.store.interning.StringInterning
import com.daml.platform.store.backend.common.SimpleSqlAsVectorOf._
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
class TransactionPointwiseQueries(
queryStrategy: QueryStrategy,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
) {
import EventStorageBackendTemplate._
/** Fetches a matching event sequential id range unless it's within the pruning offset.
*/
def fetchIdsFromTransactionMeta(
transactionId: Ref.TransactionId
)(connection: Connection): Option[(Long, Long)] = {
import com.daml.platform.store.backend.Conversions.ledgerStringToStatement
import com.daml.platform.store.backend.Conversions.OffsetToStatement
// 1. Checking whether "event_offset <= ledgerEndOffset" is needed because during indexing
// the events and transaction_meta tables are written to prior to the ledger end being updated.
// 2. Checking "event_offset > participant_pruned_up_to_inclusive" is needed in order to
// prevent fetching data that is within the pruning offset. (Such data may only be accessed by retrieving an ACS)
val ledgerEndOffset: Offset = ledgerEndCache()._1
SQL"""
SELECT
t.event_sequential_id_first,
t.event_sequential_id_last
FROM
participant_transaction_meta t
JOIN parameters p
ON
p.participant_pruned_up_to_inclusive IS NULL
OR
t.event_offset > p.participant_pruned_up_to_inclusive
WHERE
t.transaction_id = $transactionId
AND
t.event_offset <= $ledgerEndOffset
""".as(EventSequentialIdFirstLast.singleOpt)(connection)
}
def fetchFlatTransactionEvents(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
requestingParties: Set[Party],
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]] = {
fetchEventsForTransactionPointWiseLookup(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
witnessesColumn = "flat_event_witnesses",
tables = List(
SelectTable(
tableName = "participant_events_create",
selectColumns = selectColumnsForFlatTransactionsCreate,
),
SelectTable(
tableName = "participant_events_consuming_exercise",
selectColumns = selectColumnsForFlatTransactionsExercise,
),
),
requestingParties = requestingParties,
filteringRowParser = rawFlatEventParser(_, stringInterning),
)(connection)
}
def fetchTreeTransactionEvents(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
requestingParties: Set[Party],
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]] = {
fetchEventsForTransactionPointWiseLookup(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
witnessesColumn = "tree_event_witnesses",
tables = List(
SelectTable(
tableName = "participant_events_create",
selectColumns =
s"$selectColumnsForTransactionTreeCreate, ${queryStrategy.constBooleanSelect(false)} as exercise_consuming",
),
SelectTable(
tableName = "participant_events_consuming_exercise",
selectColumns =
s"$selectColumnsForTransactionTreeExercise, ${queryStrategy.constBooleanSelect(true)} as exercise_consuming",
),
SelectTable(
tableName = "participant_events_non_consuming_exercise",
selectColumns =
s"$selectColumnsForTransactionTreeExercise, ${queryStrategy.constBooleanSelect(false)} as exercise_consuming",
),
),
requestingParties = requestingParties,
filteringRowParser = rawTreeEventParser(_, stringInterning),
)(connection)
}
case class SelectTable(tableName: String, selectColumns: String)
private def fetchEventsForTransactionPointWiseLookup[T](
firstEventSequentialId: Long,
lastEventSequentialId: Long,
witnessesColumn: String,
tables: List[SelectTable],
requestingParties: Set[Party],
filteringRowParser: Set[Int] => RowParser[EventStorageBackend.Entry[T]],
)(connection: Connection): Vector[EventStorageBackend.Entry[T]] = {
val allInternedParties: Set[Int] = requestingParties.iterator
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
// TODO etq: Consider implementing support for `fetchSizeHint` and `limit`.
def selectFrom(tableName: String, selectColumns: String) = cSQL"""
(
SELECT
#$selectColumns,
event_witnesses,
command_id
FROM
(
SELECT
#$selectColumns,
#$witnessesColumn as event_witnesses,
e.command_id
FROM
#$tableName e
JOIN parameters p
ON
p.participant_pruned_up_to_inclusive IS NULL
OR
e.event_offset > p.participant_pruned_up_to_inclusive
WHERE
e.event_sequential_id >= $firstEventSequentialId
AND
e.event_sequential_id <= $lastEventSequentialId
ORDER BY
e.event_sequential_id
) x
)
"""
val unionQuery = tables
.map(table =>
selectFrom(
tableName = table.tableName,
selectColumns = table.selectColumns,
)
)
.mkComposite("", " UNION ALL", "")
val parsedRows: Vector[EventStorageBackend.Entry[T]] = SQL"""
$unionQuery
ORDER BY event_sequential_id"""
.asVectorOf(
parser = filteringRowParser(allInternedParties)
)(connection)
parsedRows
}
}
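Note that these point-wise queries intern the requesting parties only for the row parser; per-row visibility is enforced after the fetch. A minimal sketch of that post-fetch step, mirroring the filtering done in TransactionPointwiseReader (a hypothetical standalone form):

import com.daml.platform.Party
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.dao.events.Raw

// Keep only entries witnessed by at least one requesting party; relies on
// the Raw#witnesses accessor introduced in this commit.
def filterByWitnesses[T <: Raw[_]](
    entries: Vector[EventStorageBackend.Entry[T]],
    requestingParties: Set[Party],
): Vector[EventStorageBackend.Entry[T]] = {
  val partyStrings = requestingParties.toSet[String]
  entries.filter(_.event.witnesses.exists(partyStrings))
}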

View File

@ -16,7 +16,6 @@ import com.daml.platform.store.interning.StringInterning
class H2EventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
extends EventStorageBackendTemplate(
queryStrategy = H2QueryStrategy,
eventStrategy = H2EventStrategy,
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,
participantAllDivulgedContractsPrunedUpToInclusive =

View File

@ -1,54 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.h2
import anorm.{Row, SimpleSql}
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.EventStrategy
object H2EventStrategy extends EventStrategy {
override def wildcardPartiesClause(
witnessesColumnName: String,
internedWildcardParties: Set[Int],
): CompositeSql = {
cSQL"(${H2QueryStrategy.arrayIntersectionNonEmptyClause(witnessesColumnName, internedWildcardParties)})"
}
override def partiesAndTemplatesClause(
witnessesColumnName: String,
internedParties: Set[Int],
internedTemplates: Set[Int],
): CompositeSql = {
val clause =
H2QueryStrategy.arrayIntersectionNonEmptyClause(
witnessesColumnName,
internedParties,
)
// anorm does not like primitive arrays, so we need to box it
val templateIdsArray = internedTemplates.map(Int.box).toArray
cSQL"( ($clause) AND (template_id = ANY($templateIdsArray)) )"
}
override def pruneCreateFilters(pruneUpToInclusive: Offset): SimpleSql[Row] = {
import com.daml.platform.store.backend.Conversions.OffsetToStatement
SQL"""
-- Create events filter table (only for contracts archived before the specified offset)
delete from pe_create_id_filter_stakeholder
where exists (
select * from participant_events_create delete_events
where
delete_events.event_offset <= $pruneUpToInclusive and
exists (
SELECT 1 FROM participant_events_consuming_exercise archive_events
WHERE
archive_events.event_offset <= $pruneUpToInclusive AND
archive_events.contract_id = delete_events.contract_id
) and
delete_events.event_sequential_id = pe_create_id_filter_stakeholder.event_sequential_id
)"""
}
}

View File

@ -15,7 +15,6 @@ import com.daml.platform.store.interning.StringInterning
class OracleEventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
extends EventStorageBackendTemplate(
eventStrategy = OracleEventStrategy,
queryStrategy = OracleQueryStrategy,
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,

View File

@ -1,51 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.oracle
import anorm.{Row, SimpleSql}
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.EventStrategy
object OracleEventStrategy extends EventStrategy {
override def wildcardPartiesClause(
witnessesColumnName: String,
internedWildcardParties: Set[Int],
): CompositeSql = {
cSQL"(${OracleQueryStrategy.arrayIntersectionNonEmptyClause(witnessesColumnName, internedWildcardParties)})"
}
override def partiesAndTemplatesClause(
witnessesColumnName: String,
internedParties: Set[Int],
internedTemplates: Set[Int],
): CompositeSql = {
val clause =
OracleQueryStrategy.arrayIntersectionNonEmptyClause(
witnessesColumnName,
internedParties,
)
cSQL"( ($clause) AND (template_id IN ($internedTemplates)) )"
}
override def pruneCreateFilters(pruneUpToInclusive: Offset): SimpleSql[Row] = {
import com.daml.platform.store.backend.Conversions.OffsetToStatement
SQL"""
-- Create events filter table (only for contracts archived before the specified offset)
delete from pe_create_id_filter_stakeholder
where exists (
select * from participant_events_create delete_events
where
delete_events.event_offset <= $pruneUpToInclusive and
exists (
SELECT 1 FROM participant_events_consuming_exercise archive_events
WHERE
archive_events.event_offset <= $pruneUpToInclusive AND
archive_events.contract_id = delete_events.contract_id
) and
delete_events.event_sequential_id = pe_create_id_filter_stakeholder.event_sequential_id
)"""
}
}

View File

@ -17,7 +17,6 @@ import com.daml.platform.store.interning.StringInterning
class PostgresEventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
extends EventStorageBackendTemplate(
eventStrategy = PostgresEventStrategy,
queryStrategy = PostgresQueryStrategy,
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,

View File

@ -1,49 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.postgresql
import anorm.{Row, SimpleSql}
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.backend.common.EventStrategy
object PostgresEventStrategy extends EventStrategy {
override def wildcardPartiesClause(
witnessesColumnName: String,
internedWildcardParties: Set[Int],
): CompositeSql = {
// anorm does not like primitive arrays, so we need to box it
val internedWildcardPartiesArray = internedWildcardParties.map(Int.box).toArray
cSQL"(#$witnessesColumnName::integer[] && $internedWildcardPartiesArray::integer[])"
}
override def partiesAndTemplatesClause(
witnessesColumnName: String,
internedParties: Set[Int],
internedTemplates: Set[Int],
): CompositeSql = {
// anorm does not like primitive arrays, so we need to box it
val partiesArray = internedParties.map(Int.box).toArray
val templateIdsArray = internedTemplates.map(Int.box).toArray
cSQL"( (#$witnessesColumnName::integer[] && $partiesArray::integer[]) AND (template_id = ANY($templateIdsArray::integer[])) )"
}
override def pruneCreateFilters(pruneUpToInclusive: Offset): SimpleSql[Row] = {
import com.daml.platform.store.backend.Conversions.OffsetToStatement
SQL"""
-- Create events filter table (only for contracts archived before the specified offset)
delete from pe_create_id_filter_stakeholder
using participant_events_create delete_events
where
delete_events.event_offset <= $pruneUpToInclusive and
exists (
SELECT 1 FROM participant_events_consuming_exercise archive_events
WHERE
archive_events.event_offset <= $pruneUpToInclusive AND
archive_events.contract_id = delete_events.contract_id
) and
delete_events.event_sequential_id = pe_create_id_filter_stakeholder.event_sequential_id"""
}
}

View File

@ -515,6 +515,20 @@ private class JdbcLedgerDao(
metrics = metrics,
)(servicesExecutionContext)
private val flatTransactionPointwiseReader = new TransactionFlatPointwiseReader(
dbDispatcher = dbDispatcher,
eventStorageBackend = readStorageBackend.eventStorageBackend,
metrics = metrics,
lfValueTranslation = translation,
)(servicesExecutionContext)
private val treeTransactionPointwiseReader = new TransactionTreePointwiseReader(
dbDispatcher = dbDispatcher,
eventStorageBackend = readStorageBackend.eventStorageBackend,
metrics = metrics,
lfValueTranslation = translation,
)(servicesExecutionContext)
override val transactionsReader: TransactionsReader =
new TransactionsReader(
dispatcher = dbDispatcher,
@ -525,6 +539,8 @@ private class JdbcLedgerDao(
lfValueTranslation = translation,
flatTransactionsStreamReader = flatTransactionsStreamReader,
treeTransactionsStreamReader = treeTransactionsStreamReader,
flatTransactionPointwiseReader = flatTransactionPointwiseReader,
treeTransactionPointwiseReader = treeTransactionPointwiseReader,
acsReader = acsReader,
)(
servicesExecutionContext

View File

@ -46,6 +46,8 @@ sealed trait Raw[+E] {
loggingContext: LoggingContext,
): Future[E]
def witnesses: Seq[String]
}
// TODO append-only: FIXME move
@ -127,6 +129,8 @@ object Raw {
with FlatEvent {
override protected def wrapInEvent(event: PbCreatedEvent): PbFlatEvent =
PbFlatEvent(PbFlatEvent.Event.Created(event))
override def witnesses: Seq[String] = raw.witnessParties
}
object Created {
@ -179,6 +183,8 @@ object Raw {
loggingContext: LoggingContext,
): Future[PbFlatEvent] =
Future.successful(PbFlatEvent(PbFlatEvent.Event.Archived(raw)))
override def witnesses: Seq[String] = raw.witnessParties
}
object Archived {
@ -221,6 +227,8 @@ object Raw {
with TreeEvent {
override protected def wrapInEvent(event: PbCreatedEvent): PbTreeEvent =
PbTreeEvent(PbTreeEvent.Kind.Created(event))
override def witnesses: Seq[String] = raw.witnessParties
}
object Created {
@ -278,6 +286,7 @@ object Raw {
.deserialize(this, eventProjectionProperties.verbose)
.map(event => PbTreeEvent(PbTreeEvent.Kind.Exercised(event)))
override def witnesses: Seq[String] = partial.witnessParties
}
object Exercised {

View File

@ -0,0 +1,159 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao.events
import java.sql.Connection
import com.daml.ledger.api.v1.event.Event
import com.daml.ledger.api.v1.transaction.TreeEvent
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.metrics.{DatabaseMetrics, Metrics, Timed}
import com.daml.platform.Party
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.backend.EventStorageBackend.Entry
import com.daml.platform.store.dao.events.EventsTable.TransactionConversions
import com.daml.platform.store.dao.events.TransactionsReader.deserializeEntry
import com.daml.platform.store.dao.{DbDispatcher, EventProjectionProperties}
import scala.concurrent.{ExecutionContext, Future}
sealed trait TransactionPointwiseReader {
type EventT
type RawEventT <: Raw[EventT]
type RespT
def dbDispatcher: DbDispatcher
def eventStorageBackend: EventStorageBackend
def lfValueTranslation: LfValueTranslation
val metrics: Metrics
val dbMetric: DatabaseMetrics
implicit def ec: ExecutionContext
protected val dbMetrics: metrics.daml.index.db.type = metrics.daml.index.db
protected def fetchTransaction(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
requestingParties: Set[Party],
eventProjectionProperties: EventProjectionProperties,
)(connection: Connection): Vector[EventStorageBackend.Entry[RawEventT]]
protected def toTransactionResponse(
events: Vector[Entry[EventT]]
): Option[RespT]
final def lookupTransactionById(
transactionId: Ref.TransactionId,
requestingParties: Set[Party],
eventProjectionProperties: EventProjectionProperties,
)(implicit loggingContext: LoggingContext): Future[Option[RespT]] = {
val requestingPartiesStrings: Set[String] = requestingParties.toSet[String]
for {
// Fetching event sequential id range corresponding to the requested transaction id
eventSeqIdRangeO <- dbDispatcher.executeSql(dbMetric)(
eventStorageBackend.transactionPointwiseQueries.fetchIdsFromTransactionMeta(transactionId =
transactionId
)
)
response <- eventSeqIdRangeO match {
case Some((firstEventSeqId, lastEventSeqId)) =>
for {
// Fetching all events from the event sequential id range
rawEvents <- dbDispatcher.executeSql(dbMetric)(
fetchTransaction(
firstEventSequentialId = firstEventSeqId,
lastEventSequentialId = lastEventSeqId,
requestingParties = requestingParties,
eventProjectionProperties = eventProjectionProperties,
)
)
// Filtering by requesting parties
filteredRawEvents = rawEvents.filter(
_.event.witnesses.exists(requestingPartiesStrings)
)
// Deserialization of lf values
deserialized <- Timed.value(
timer = dbMetric.translationTimer,
value = Future.traverse(filteredRawEvents)(
deserializeEntry(eventProjectionProperties, lfValueTranslation)
),
)
} yield {
// Conversion to API response type
toTransactionResponse(deserialized)
}
case None => Future.successful[Option[RespT]](None)
}
} yield response
}
}
final class TransactionTreePointwiseReader(
override val dbDispatcher: DbDispatcher,
override val eventStorageBackend: EventStorageBackend,
override val metrics: Metrics,
override val lfValueTranslation: LfValueTranslation,
)(implicit val ec: ExecutionContext)
extends TransactionPointwiseReader {
override type EventT = TreeEvent
override type RawEventT = Raw.TreeEvent
override type RespT = GetTransactionResponse
override val dbMetric: DatabaseMetrics = dbMetrics.lookupTransactionTreeById
override protected def fetchTransaction(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
requestingParties: Set[Party],
eventProjectionProperties: EventProjectionProperties,
)(connection: Connection): Vector[EventStorageBackend.Entry[RawEventT]] = {
eventStorageBackend.transactionPointwiseQueries.fetchTreeTransactionEvents(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
requestingParties = requestingParties,
)(connection)
}
override protected def toTransactionResponse(events: Vector[Entry[EventT]]): Option[RespT] = {
TransactionConversions.toGetTransactionResponse(events)
}
}
final class TransactionFlatPointwiseReader(
override val dbDispatcher: DbDispatcher,
override val eventStorageBackend: EventStorageBackend,
override val metrics: Metrics,
override val lfValueTranslation: LfValueTranslation,
)(implicit val ec: ExecutionContext)
extends TransactionPointwiseReader {
override type EventT = Event
override type RawEventT = Raw.FlatEvent
override type RespT = GetFlatTransactionResponse
override val dbMetric: DatabaseMetrics = dbMetrics.lookupFlatTransactionById
override protected def fetchTransaction(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
requestingParties: Set[Party],
eventProjectionProperties: EventProjectionProperties,
)(connection: Connection): Vector[EventStorageBackend.Entry[RawEventT]] = {
eventStorageBackend.transactionPointwiseQueries.fetchFlatTransactionEvents(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
requestingParties = requestingParties,
)(connection)
}
override protected def toTransactionResponse(events: Vector[Entry[EventT]]): Option[RespT] = {
TransactionConversions.toGetFlatTransactionResponse(events)
}
}
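A usage sketch for the new readers, assuming a TransactionFlatPointwiseReader wired as in JdbcLedgerDao below; the EventProjectionProperties mirror what TransactionsReader passes for by-id lookups:

import com.daml.ledger.api.v1.transaction_service.GetFlatTransactionResponse
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.platform.{Identifier, Party}
import com.daml.platform.store.dao.EventProjectionProperties
import scala.concurrent.Future

def fetchFlatById(
    reader: TransactionFlatPointwiseReader,
    transactionId: Ref.TransactionId,
    requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
  reader.lookupTransactionById(
    transactionId = transactionId,
    requestingParties = requestingParties,
    eventProjectionProperties = EventProjectionProperties(
      verbose = true,
      // Project all templates for each requesting party, as TransactionsReader does.
      witnessTemplateIdFilter = requestingParties.map(_.toString -> Set.empty[Identifier]).toMap,
    ),
  )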

View File

@ -26,7 +26,6 @@ import com.daml.platform.store.dao.{
LedgerDaoTransactionsReader,
}
import com.daml.platform.store.backend.EventStorageBackend
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.dao.events.EventsTable.TransactionConversions
import com.daml.platform.store.utils.Telemetry
import com.daml.telemetry
@ -38,6 +37,8 @@ import scala.util.{Failure, Success}
/** @param flatTransactionsStreamReader Knows how to stream flat transactions
* @param treeTransactionsStreamReader Knows how to stream tree transactions
* @param flatTransactionPointwiseReader Knows how to fetch a flat transaction by its id
* @param treeTransactionPointwiseReader Knows how to fetch a tree transaction by its id
* @param dispatcher Executes the queries prepared by this object
* @param queryNonPruned
* @param eventStorageBackend
@ -50,6 +51,8 @@ import scala.util.{Failure, Success}
private[dao] final class TransactionsReader(
flatTransactionsStreamReader: TransactionsFlatStreamReader,
treeTransactionsStreamReader: TransactionsTreeStreamReader,
flatTransactionPointwiseReader: TransactionFlatPointwiseReader,
treeTransactionPointwiseReader: TransactionTreePointwiseReader,
dispatcher: DbDispatcher,
queryNonPruned: QueryNonPruned,
eventStorageBackend: EventStorageBackend,
@ -90,33 +93,30 @@ private[dao] final class TransactionsReader(
override def lookupFlatTransactionById(
transactionId: Ref.TransactionId,
requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
dispatcher
.executeSql(dbMetrics.lookupFlatTransactionById)(
eventStorageBackend.flatTransaction(
transactionId,
FilterParams(
wildCardParties = requestingParties,
partiesAndTemplates = Set.empty,
),
)
)
.flatMap(rawEvents =>
Timed.value(
timer = dbMetrics.lookupFlatTransactionById.translationTimer,
value = Future.traverse(rawEvents)(
deserializeEntry(
EventProjectionProperties(
verbose = true,
witnessTemplateIdFilter =
requestingParties.map(_.toString -> Set.empty[Identifier]).toMap,
),
lfValueTranslation,
)
),
)
)
.map(TransactionConversions.toGetFlatTransactionResponse)
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] = {
flatTransactionPointwiseReader.lookupTransactionById(
transactionId = transactionId,
requestingParties = requestingParties,
eventProjectionProperties = EventProjectionProperties(
verbose = true,
witnessTemplateIdFilter = requestingParties.map(_.toString -> Set.empty[Identifier]).toMap,
),
)
}
override def lookupTransactionTreeById(
transactionId: Ref.TransactionId,
requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] = {
treeTransactionPointwiseReader.lookupTransactionById(
transactionId = transactionId,
requestingParties = requestingParties,
eventProjectionProperties = EventProjectionProperties(
verbose = true,
witnessTemplateIdFilter = requestingParties.map(_.toString -> Set.empty[Identifier]).toMap,
),
)
}
override def getTransactionTrees(
startExclusive: Offset,
@ -140,37 +140,6 @@ private[dao] final class TransactionsReader(
.mapMaterializedValue((_: Future[NotUsed]) => NotUsed)
}
override def lookupTransactionTreeById(
transactionId: Ref.TransactionId,
requestingParties: Set[Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
dispatcher
.executeSql(dbMetrics.lookupTransactionTreeById)(
eventStorageBackend.transactionTree(
transactionId,
FilterParams(
wildCardParties = requestingParties,
partiesAndTemplates = Set.empty,
),
)
)
.flatMap(rawEvents =>
Timed.value(
timer = dbMetrics.lookupTransactionTreeById.translationTimer,
value = Future.traverse(rawEvents)(
deserializeEntry(
EventProjectionProperties(
verbose = true,
witnessTemplateIdFilter =
requestingParties.map(_.toString -> Set.empty[Identifier]).toMap,
),
lfValueTranslation,
)
),
)
)
.map(TransactionConversions.toGetTransactionResponse)
override def getActiveContracts(
activeAt: Offset,
filter: TemplatePartiesFilter,

View File

@ -4,22 +4,30 @@
package com.daml.platform.store.backend
import com.daml.lf.data.Ref
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.{
EventIdSourceForInformees,
EventPayloadSourceForTreeTx,
}
import com.daml.platform.store.dao.events.Raw
import org.scalatest.OptionValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
private[backend] trait StorageBackendTestsPruning extends Matchers with StorageBackendSpec {
private[backend] trait StorageBackendTestsPruning
extends Matchers
with OptionValues
with StorageBackendSpec {
this: AnyFlatSpec =>
behavior of "StorageBackend (pruning)"
import StorageBackendTestValues._
private val signatoryParty = Ref.Party.assertFromString("signatory")
private val observerParty = Ref.Party.assertFromString("observer")
private val nonStakeholderInformeeParty = Ref.Party.assertFromString("nonstakeholderinformee")
private val actorParty = Ref.Party.assertFromString("actor")
it should "correctly update the pruning offset" in {
val offset_1 = offset(3)
val offset_2 = offset(2)
@ -83,8 +91,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
}
it should "prune consuming and non-consuming events" in {
val signatoryParty = Ref.Party.assertFromString("signatory")
val actorParty = Ref.Party.assertFromString("actor")
val nonConsuming = dtoExercise(
offset = offset(3),
eventSequentialId = 5L,
@ -231,9 +237,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
}
it should "prune an archived contract" in {
val signatoryParty = Ref.Party.assertFromString("signatory")
val observerParty = Ref.Party.assertFromString("observer")
val nonStakeholderInformeeParty = Ref.Party.assertFromString("nonstakeholderinformee")
// a create event in its own transaction
val create = dtoCreate(
offset = offset(1),
@ -246,7 +249,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
val createFilter1 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatoryParty)
val createFilter2 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, observerParty)
val createFilter3 = DbDto.IdFilterCreateNonStakeholderInformee(1L, nonStakeholderInformeeParty)
val createTransactionId = dtoTransactionId(create)
val createTxId = dtoTransactionId(create)
val createTxMeta = DbDto.TransactionMeta(
transaction_id = createTxId,
@ -272,7 +274,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
event_sequential_id_first = archive.event_sequential_id,
event_sequential_id_last = archive.event_sequential_id,
)
val filter = FilterParams(Set(signatoryParty), Set.empty)
executeSql(backend.parameter.initializeParameters(someIdentityParams))
// Ingest a create and archive event
executeSql(
@ -325,7 +326,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
)
}
def fetchTreeEvents_streaming(
def fetchTreeEventsCreate_streaming(
eventSequentialIds: Iterable[Long]
): Seq[EventStorageBackend.Entry[Raw.TreeEvent]] = {
executeSql(
@ -342,13 +343,18 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
val before1_idsSignatory_streaming = fetchIdsSignatory_streaming
val before2_idsObserver_streaming = fetchIdsObserver_streaming
val before3_idsNonStakeholders_streaming = fetchIdsNonStakeholders_streaming
val before4_treeEvents_streaming = fetchTreeEvents_streaming(
val before4_treeEvents_streaming = fetchTreeEventsCreate_streaming(
before1_idsSignatory_streaming ++ before2_idsObserver_streaming ++ before3_idsNonStakeholders_streaming
)
val before5_ids_pointwise =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val before5_eventsFlat_pointwise =
executeSql(backend.event.flatTransaction(createTransactionId, filter))
fetchEventsFlat_pointwise(before5_ids_pointwise.value._1, before5_ids_pointwise.value._2)
val before6_eventsTree_pointwise =
executeSql(backend.event.transactionTree(createTransactionId, filter))
fetchEventsTree_pointwise(before5_ids_pointwise.value._1, before5_ids_pointwise.value._2)
val before7_txMeta =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val before7_rawEvents = executeSql(backend.event.rawEvents(0, 2L))
val before8_activeContracts = executeSql(
@ -359,8 +365,10 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
before2_idsObserver_streaming should not be empty
before3_idsNonStakeholders_streaming should not be empty
before4_treeEvents_streaming should not be empty
before5_ids_pointwise should not be empty
before5_eventsFlat_pointwise should not be empty
before6_eventsTree_pointwise should not be empty
before7_txMeta should not be empty
before7_rawEvents should not be empty
before8_activeContracts shouldBe empty
// Prune
@ -380,13 +388,19 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
target = EventIdSourceForInformees.CreateNonStakeholder
)(informee = signatoryParty, startExclusive = 0, endInclusive = 2L, limit = 10)
)
val after4_treeEvents = fetchTreeEvents_streaming(
val after4_treeEvents = fetchTreeEventsCreate_streaming(
before1_idsSignatory_streaming ++ before2_idsObserver_streaming ++ before3_idsNonStakeholders_streaming
)
val after5_ids_pointwise =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val after5_eventsFlat_pointwise =
executeSql(backend.event.flatTransaction(createTransactionId, filter))
fetchEventsFlat_pointwise(before5_ids_pointwise.value._1, before5_ids_pointwise.value._2)
val after6_eventsTree_pointwise =
executeSql(backend.event.transactionTree(createTransactionId, filter))
fetchEventsTree_pointwise(before5_ids_pointwise.value._1, before5_ids_pointwise.value._2)
val after7_txMeta =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val after7_rawEvents = executeSql(backend.event.rawEvents(0, 2L))
val after8_activeContracts = executeSql(
backend.event
@ -397,15 +411,14 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
after3_idsNonStakeholders shouldBe empty
after4_treeEvents shouldBe empty
after5_eventsFlat_pointwise shouldBe empty
after5_ids_pointwise shouldBe empty
after6_eventsTree_pointwise shouldBe empty
after7_txMeta shouldBe empty
after7_rawEvents shouldBe empty
after8_activeContracts shouldBe empty
}
it should "not prune an active contract" in {
val signatoryParty = Ref.Party.assertFromString("signatory")
val observerParty = Ref.Party.assertFromString("observer")
val nonStakeholderInformeeParty = Ref.Party.assertFromString("nonstakeholderinformee")
val partyEntry = dtoPartyEntry(offset(1), signatoryParty)
val create = dtoCreate(
offset = offset(2),
@ -417,7 +430,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
val createFilter1 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, signatoryParty)
val createFilter2 = DbDto.IdFilterCreateStakeholder(1L, someTemplateId.toString, observerParty)
val createFilter3 = DbDto.IdFilterCreateNonStakeholderInformee(1L, nonStakeholderInformeeParty)
val createTransactionId = dtoTransactionId(create)
val createTxId = dtoTransactionId(create)
val createTxMeta = DbDto.TransactionMeta(
transaction_id = createTxId,
@ -425,7 +437,6 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
event_sequential_id_first = create.event_sequential_id,
event_sequential_id_last = create.event_sequential_id,
)
val filter = FilterParams(Set(signatoryParty), Set.empty)
executeSql(backend.parameter.initializeParameters(someIdentityParams))
// Ingest a create and archive event
executeSql(
@ -495,10 +506,15 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
val before_treeEvents_streaming = fetchTreeEvents_streaming(
before_idsSignatory_streaming ++ before_idsObserver_streaming ++ before_idsNonStakeholders_streaming
)
val before_ids_pointwise =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val before_eventsFlat_pointwise =
executeSql(backend.event.flatTransaction(createTransactionId, filter))
fetchEventsFlat_pointwise(before_ids_pointwise.value._1, before_ids_pointwise.value._2)
val before_eventsTree_pointwise =
executeSql(backend.event.transactionTree(createTransactionId, filter))
fetchEventsTree_pointwise(before_ids_pointwise.value._1, before_ids_pointwise.value._2)
val before_txMeta =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val before_rawEvents = executeSql(backend.event.rawEvents(0, 1L))
val before_activeContracts = executeSql(
backend.event
@ -508,7 +524,9 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
before_idsNonStakeholders_streaming should not be empty
before_treeEvents_streaming should not be empty
before_eventsFlat_pointwise should not be empty
before_ids_pointwise should not be empty
before_eventsTree_pointwise should not be empty
before_txMeta should not be empty
before_rawEvents should not be empty
before_activeContracts should have size 1
// Prune
@ -526,10 +544,15 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
val after_treeEvents_streaming = fetchTreeEvents_streaming(
before_idsSignatory_streaming ++ before_idsObserver_streaming ++ before_idsNonStakeholders_streaming
)
val after_ids_pointwise =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val after_eventsFlat_pointwise =
executeSql(backend.event.flatTransaction(createTransactionId, filter))
fetchEventsFlat_pointwise(before_ids_pointwise.value._1, before_ids_pointwise.value._2)
val after_eventsTree_pointwise =
executeSql(backend.event.transactionTree(createTransactionId, filter))
fetchEventsTree_pointwise(before_ids_pointwise.value._1, before_ids_pointwise.value._2)
val after_txMeta =
executeSql(backend.event.transactionPointwiseQueries.fetchIdsFromTransactionMeta(createTxId))
val after_rawEvents = executeSql(backend.event.rawEvents(0, 1L))
val after_activeContracts = executeSql(
backend.event
@ -543,7 +566,9 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
after_idsNonStakeholders_streaming should not be empty
after_treeEvents_streaming should not be empty
after_eventsFlat_pointwise shouldBe empty
after_ids_pointwise shouldBe empty
after_eventsTree_pointwise shouldBe empty
after_txMeta shouldBe empty
after_rawEvents should not be empty
after_activeContracts should have size 1
}
@ -766,4 +791,31 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
before should not be empty
after shouldBe empty
}
private def fetchEventsTree_pointwise(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
): Seq[EventStorageBackend.Entry[Raw.TreeEvent]] = {
executeSql(
backend.event.transactionPointwiseQueries.fetchTreeTransactionEvents(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
requestingParties = Set(signatoryParty, observerParty, nonStakeholderInformeeParty),
)
)
}
private def fetchEventsFlat_pointwise(
firstEventSequentialId: Long,
lastEventSequentialId: Long,
): Seq[EventStorageBackend.Entry[Raw.FlatEvent]] = {
executeSql(
backend.event.transactionPointwiseQueries.fetchFlatTransactionEvents(
firstEventSequentialId = firstEventSequentialId,
lastEventSequentialId = lastEventSequentialId,
requestingParties = Set(signatoryParty, observerParty, nonStakeholderInformeeParty),
)
)
}
}

View File

@ -6,7 +6,6 @@ package com.daml.platform.store.backend
import com.daml.ledger.api.v1.contract_metadata.ContractMetadata
import com.daml.lf.crypto.Hash
import com.daml.lf.data.{Bytes, Ref}
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.{
EventPayloadSourceForFlatTx,
EventPayloadSourceForTreeTx,
@ -82,14 +81,10 @@ private[backend] trait StorageBackendTestsTransactionStreamsEvents
): Assertion = {
val someParty = Ref.Party.assertFromString(signatory)
val createTransactionId = dtoTransactionId(create)
executeSql(backend.parameter.initializeParameters(someIdentityParams))
executeSql(ingest(Vector(create), _))
executeSql(updateLedgerEnd(offset(1), 1L))
val filter = FilterParams(Set(someParty), Set.empty)
val flatTransactionEvents = executeSql(
backend.event.transactionStreamingQueries.fetchEventPayloadsFlat(
EventPayloadSourceForFlatTx.Create
@ -100,8 +95,12 @@ private[backend] trait StorageBackendTestsTransactionStreamsEvents
EventPayloadSourceForTreeTx.Create
)(eventSequentialIds = Seq(1L), Set(someParty))
)
val flatTransaction = executeSql(backend.event.flatTransaction(createTransactionId, filter))
val transactionTree = executeSql(backend.event.transactionTree(createTransactionId, filter))
val flatTransaction = executeSql(
backend.event.transactionPointwiseQueries.fetchFlatTransactionEvents(1L, 1L, Set(someParty))
)
val transactionTree = executeSql(
backend.event.transactionPointwiseQueries.fetchTreeTransactionEvents(1L, 1L, Set(someParty))
)
val acs = executeSql(backend.event.activeContractEventBatch(Seq(1L), Set(someParty), 1L))
extractContractMetadataFrom[FlatEvent.Created, FlatEvent](