Experiment: replacing row OFFSET with ledger offset (#6083)

* Experiment: replacing row OFFSET with ledger offset in the flat transactions query for one party.

If this improves the performance numbers, the rest of the queries can be updated (a sketch of the query change follows these notes).

* Flat transaction query optimization:

replacing row offset with ledger offset

* Transaction tree query optimization

changelog_begin

[Sandbox-next/Postgres]
SQL query optimizations for the Flat Transaction Stream and the Transaction Tree Stream:
pagination is now based on the Ledger Offset and event node index instead of the SQL Row Offset.

changelog_end

* Addressing code review comments
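
A minimal sketch of the query change, for orientation (plain Scala, not part of the commit): the old approach asks the database to skip rowOffset rows on every page, while the new one resumes strictly after the last (event_offset, node_index) pair already read. The event_offset and node_index column names match the diff below; the participant_events table name, the {placeholders}, the order-by columns, and the page size are illustrative only.

object PaginationSketch extends App {
  val pageSize = 1000

  // Before: SQL row OFFSET. The database still scans and discards rowOffset rows,
  // so later pages get progressively more expensive.
  def rowOffsetPage(rowOffset: Long): String =
    s"""select * from participant_events
       |where event_offset > {startExclusive} and event_offset <= {endInclusive}
       |order by (event_offset, node_index)
       |limit $pageSize offset $rowOffset""".stripMargin

  // After: keyset pagination on the ledger offset plus node index. Every page is a
  // bounded index range scan, no matter how deep into the stream we already are.
  def ledgerOffsetPage(prevOffset: String, prevNodeIndex: Int): String =
    s"""select * from participant_events
       |where (event_offset > '$prevOffset'
       |       or (event_offset = '$prevOffset' and node_index > $prevNodeIndex))
       |  and event_offset <= {endInclusive}
       |order by (event_offset, node_index)
       |limit $pageSize""".stripMargin

  println(rowOffsetPage(rowOffset = 5000))
  println(ledgerOffsetPage(prevOffset = "00000042", prevNodeIndex = 3))
}
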
Leonid Shlyapnikov 2020-05-29 11:49:46 -04:00 committed by GitHub
parent 655c0a5425
commit 12c05c408a
13 changed files with 276 additions and 143 deletions

@ -52,7 +52,7 @@ object InMemoryState {
type StateValue = Bytes
// The first element will never be read because begin offsets are exclusive.
private val Beginning = LedgerRecord(Offset.begin, ByteString.EMPTY, ByteString.EMPTY)
private val Beginning = LedgerRecord(Offset.beforeBegin, ByteString.EMPTY, ByteString.EMPTY)
def empty =
new InMemoryState(

@ -34,7 +34,7 @@ final case class Offset(bytes: Bytes) extends Ordered[Offset] {
object Offset {
val begin: Offset = Offset.fromByteArray(Array(0: Byte))
val beforeBegin: Offset = Offset.fromByteArray(Array.empty[Byte])
def fromByteArray(bytes: Array[Byte]) = new Offset(Bytes.fromByteArray(bytes))

@ -108,7 +108,7 @@ abstract class LedgerBackedIndexService(
lazy val currentEnd: Offset = ledger.ledgerEnd
domainOffset: LedgerOffset =>
domainOffset match {
case LedgerOffset.LedgerBegin => Source.single(Offset.begin)
case LedgerOffset.LedgerBegin => Source.single(Offset.beforeBegin)
case LedgerOffset.LedgerEnd => Source.single(currentEnd)
case LedgerOffset.Absolute(offset) =>
ApiOffset.fromString(offset).fold(Source.failed, off => Source.single(off))

@ -48,7 +48,7 @@ private[ledger] class LedgerEntries[T](identify: T => String) {
newOffset
}
private val dispatcher = Dispatcher[Offset]("inmemory-ledger", Offset.begin, ledgerEnd)
private val dispatcher = Dispatcher[Offset]("inmemory-ledger", Offset.beforeBegin, ledgerEnd)
def getSource(
startExclusive: Option[Offset],

@ -50,7 +50,7 @@ abstract class BaseLedger(
protected final val dispatcher: Dispatcher[Offset] = Dispatcher[Offset](
"sql-ledger",
Offset.begin,
Offset.beforeBegin,
headAtInitialization
)
@ -66,7 +66,7 @@ abstract class BaseLedger(
verbose: Boolean,
): Source[(Offset, GetTransactionsResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.begin),
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(ledgerDao.transactionsReader.getFlatTransactions(_, _, filter, verbose)),
endInclusive
)
@ -77,7 +77,7 @@ abstract class BaseLedger(
requestingParties: Set[Party],
verbose: Boolean): Source[(Offset, GetTransactionTreesResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.begin),
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(
ledgerDao.transactionsReader.getTransactionTrees(_, _, requestingParties, verbose)),
endInclusive
@ -91,7 +91,7 @@ abstract class BaseLedger(
applicationId: ApplicationId,
parties: Set[Party]): Source[(Offset, CompletionStreamResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.begin),
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId.unwrap, parties)),
endInclusive
)
@ -158,7 +158,7 @@ abstract class BaseLedger(
override def configurationEntries(
startExclusive: Option[Offset]): Source[(Offset, ConfigurationEntry), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.begin),
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(ledgerDao.getConfigurationEntries))
override def deduplicateCommand(

@ -116,7 +116,7 @@ private class JdbcLedgerDao(
override def lookupLedgerEnd(): Future[Offset] =
dbDispatcher.executeSql(metrics.daml.index.db.getLedgerEnd) { implicit conn =>
SQL_SELECT_LEDGER_END
.as(offset("ledger_end").?.map(_.getOrElse(Offset.begin)).single)
.as(offset("ledger_end").?.map(_.getOrElse(Offset.beforeBegin)).single)
}
private val SQL_SELECT_INITIAL_LEDGER_END = SQL("select ledger_end from parameters")

@ -6,6 +6,7 @@ package com.daml.platform.store.dao
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.participant.state.v1.Offset
import scala.concurrent.Future
@ -44,4 +45,41 @@ object PaginatingAsyncStream {
}
.flatMapConcat(Source(_))
}
/**
* Concatenates the results of multiple asynchronous calls into
* a single [[Source]], passing the last seen event's ledger [[Offset]] and node index to the
* next query, so that it can continue reading events from that point.
*
* This implements pagination based on the ledger offset and event node index.
* The main purpose of the pagination is to break large queries down into smaller
* batches: we currently use simple blocking JDBC APIs, so a long-running stream
* would occupy a thread in the DB pool, severely limiting our ability to keep
* multiple concurrent, long-running streams open while serving lookup calls.
*
* @param initialOffset the initial ledger [[Offset]]
* @param extractOffsetAndNodeIndex a function that extracts the [[Offset]] and node index from a result entry of type [[T]]
* @param query a function that takes an [[Offset]] and an optional node index to resume pagination from
* @tparam T the type of the items returned in each call
*/
def streamFrom[T](initialOffset: Offset, extractOffsetAndNodeIndex: T => (Offset, Int))(
query: (Offset, Option[Int]) => Future[Vector[T]]
): Source[T, NotUsed] = {
Source
.unfoldAsync(Option((initialOffset, Option.empty[Int]))) {
case None =>
Future.successful(None) // finished reading the whole thing
case Some((prevOffset, prevNodeIndex)) =>
query(prevOffset, prevNodeIndex).map { result =>
val newState = result.lastOption.map { t =>
val event: (Offset, Int) = extractOffsetAndNodeIndex(t)
(event._1, Some(event._2))
}
Some((newState, result))
}(DirectExecutionContext) // run in the same thread as the query, avoid context switch for a cheap operation
}
.flatMapConcat(Source(_))
}
}
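
A hypothetical usage sketch of streamFrom (not part of the commit), paging over an in-memory vector instead of a JDBC query. FakeRow, fakeTable, off, and the page size are invented for illustration, and the sketch assumes Akka 2.6, where an implicit ActorSystem supplies the materializer.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.Offset
import com.daml.platform.store.dao.PaginatingAsyncStream
import scala.concurrent.Future

object StreamFromSketch extends App {
  implicit val system: ActorSystem = ActorSystem("stream-from-sketch")
  import system.dispatcher

  final case class FakeRow(offset: Offset, nodeIndex: Int, payload: String)

  private def off(b: Int): Offset = Offset.fromByteArray(Array(b.toByte))

  // Three "events" spread over two ledger offsets.
  private val fakeTable = Vector(
    FakeRow(off(1), 0, "created"),
    FakeRow(off(1), 1, "archived"),
    FakeRow(off(2), 0, "created")
  )

  private val pageSize = 2

  // Keeps only rows strictly after the (offset, node index) cursor, mirroring the
  // "event_offset > ... or (event_offset = ... and node_index > ...)" SQL condition.
  private def query(prevOffset: Offset, prevNodeIndex: Option[Int]): Future[Vector[FakeRow]] =
    Future.successful(
      fakeTable
        .filter { r =>
          r.offset > prevOffset ||
          (r.offset.compare(prevOffset) == 0 && prevNodeIndex.exists(r.nodeIndex > _))
        }
        .take(pageSize)
    )

  PaginatingAsyncStream
    .streamFrom(Offset.beforeBegin, (r: FakeRow) => (r.offset, r.nodeIndex))(query)
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}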

@ -6,7 +6,7 @@ package com.daml.platform.store.dao.events
import java.io.InputStream
import java.time.Instant
import anorm.SqlParser.{array, binaryStream, bool, str}
import anorm.SqlParser.{array, binaryStream, bool, str, int}
import anorm.{RowParser, ~}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
@ -43,6 +43,7 @@ private[events] object EventsTable
final case class Entry[+E](
eventOffset: Offset,
transactionId: String,
nodeIndex: Int,
ledgerEffectiveTime: Instant,
commandId: String,
workflowId: String,
@ -164,11 +165,13 @@ private[events] object EventsTable
private[events] trait EventsTable {
private type SharedRow =
Offset ~ String ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~ Option[String] ~ Array[
String]
Offset ~ String ~ Int ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~ Option[String] ~
Array[String]
private val sharedRow: RowParser[SharedRow] =
offset("event_offset") ~
str("transaction_id") ~
int("node_index") ~
str("event_id") ~
str("contract_id") ~
instant("ledger_effective_time") ~

@ -12,10 +12,11 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
private val createdFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] =
createdEventRow map {
case eventOffset ~ transactionId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
EventsTable.Entry(
eventOffset = eventOffset,
transactionId = transactionId,
nodeIndex = nodeIndex,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -35,10 +36,11 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
private val archivedFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] =
archivedEventRow map {
case eventOffset ~ transactionId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses =>
EventsTable.Entry(
eventOffset = eventOffset,
transactionId = transactionId,
nodeIndex = nodeIndex,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -58,6 +60,7 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
Seq(
"event_offset",
"transaction_id",
"node_index",
"ledger_effective_time",
"workflow_id",
"participant_events.event_id",
@ -131,9 +134,14 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
endInclusive: Offset,
filter: FilterRelation,
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] =
getFlatTransactionsQueries((startExclusive, endInclusive), filter, pageSize, rowOffset)
getFlatTransactionsQueries(
(startExclusive, endInclusive),
filter,
pageSize,
previousEventNodeIndex,
)
private val getActiveContractsQueries =
new EventsTableFlatEventsRangeQueries.GetActiveContracts(
@ -145,11 +153,16 @@ private[events] trait EventsTableFlatEvents { this: EventsTable =>
)
def preparePagedGetActiveContracts(
lastOffsetFromPrevPage: Offset,
activeAt: Offset,
filter: FilterRelation,
pageSize: Int,
rowOffset: Long,
lastEventNodeIndexFromPrevPage: Option[Int]
): SimpleSql[Row] =
getActiveContractsQueries(activeAt, filter, pageSize, rowOffset)
getActiveContractsQueries(
(lastOffsetFromPrevPage, activeAt),
filter,
pageSize,
lastEventNodeIndexFromPrevPage,
)
}

@ -7,6 +7,7 @@ import anorm.{Row, SimpleSql, SqlStringInterpolation}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.lf.data.Ref.{Identifier => ApiIdentifier}
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.EventsTableQueries.previousOffsetWhereClauseValues
private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
@ -14,7 +15,7 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
offset: Offset,
party: Party,
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
protected def singlePartyWithTemplates(
@ -22,14 +23,14 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
party: Party,
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
protected def onlyWildcardParties(
offset: Offset,
parties: Set[Party],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
protected def sameTemplates(
@ -37,14 +38,14 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
parties: Set[Party],
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
protected def mixedTemplates(
offset: Offset,
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
protected def mixedTemplatesWithWildcardParties(
@ -52,14 +53,14 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
wildcardParties: Set[Party],
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row]
final def apply(
offset: Offset,
filter: FilterRelation,
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
require(filter.nonEmpty, "The request must be issued by at least one party")
@ -68,10 +69,10 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
val (party, templateIds) = filter.toIterator.next
if (templateIds.isEmpty) {
// Single-party request, no specific template identifier
singleWildcardParty(offset, party, pageSize, rowOffset)
singleWildcardParty(offset, party, pageSize, previousEventNodeIndex)
} else {
// Single-party request, restricted to a set of template identifiers
singlePartyWithTemplates(offset, party, templateIds, pageSize, rowOffset)
singlePartyWithTemplates(offset, party, templateIds, pageSize, previousEventNodeIndex)
}
} else {
// Multi-party requests
@ -82,7 +83,7 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
offset = offset,
parties = parties,
pageSize = pageSize,
rowOffset = rowOffset,
previousEventNodeIndex = previousEventNodeIndex,
)
else {
// If all parties request the same template identifier
@ -93,7 +94,7 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
parties = parties,
templateIds = templateIds,
pageSize = pageSize,
rowOffset = rowOffset,
previousEventNodeIndex = previousEventNodeIndex,
)
} else {
// If there are different template identifiers but no wildcard parties
@ -104,7 +105,7 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
offset,
partiesAndTemplateIds = partiesAndTemplateIds,
pageSize = pageSize,
rowOffset = rowOffset,
previousEventNodeIndex = previousEventNodeIndex,
)
} else {
// If there are wildcard parties and different template identifiers
@ -113,7 +114,7 @@ private[events] sealed trait EventsTableFlatEventsRangeQueries[Offset] {
wildcardParties,
partiesAndTemplateIds,
pageSize,
rowOffset,
previousEventNodeIndex,
)
}
}
@ -137,45 +138,59 @@ private[events] object EventsTableFlatEventsRangeQueries {
between: (Offset, Offset),
party: Party,
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and event_witness = $party order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"""select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and event_witness = $party order by (#$orderByColumns) limit $pageSize"""
}
override protected def singlePartyWithTemplates(
between: (Offset, Offset),
party: Party,
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and event_witness = $party and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and event_witness = $party and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
protected def onlyWildcardParties(
between: (Offset, Offset),
parties: Set[Party],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and event_witness in ($parties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and event_witness in ($parties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
protected def sameTemplates(
between: (Offset, Offset),
parties: Set[Party],
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and event_witness in ($parties) and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and event_witness in ($parties) and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
protected def mixedTemplates(
between: (Offset, Offset),
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val parties = partiesAndTemplateIds.map(_._1)
val partiesAndTemplateIdsAsString = partiesAndTemplateIds.map { case (p, i) => s"$p&$i" }
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
protected def mixedTemplatesWithWildcardParties(
@ -183,11 +198,13 @@ private[events] object EventsTableFlatEventsRangeQueries {
wildcardParties: Set[Party],
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1)
val partiesAndTemplateIdsAsString = partiesAndTemplateIds.map { case (p, i) => s"$p&$i" }
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where event_offset > ${between._1} and event_offset <= ${between._2} and (event_witness in ($wildcardParties) or concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString)) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (event_witness in ($wildcardParties) or concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString)) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
}
@ -198,63 +215,79 @@ private[events] object EventsTableFlatEventsRangeQueries {
flatEventsTable: String,
groupByColumns: String,
orderByColumns: String,
) extends EventsTableFlatEventsRangeQueries[Offset] {
) extends EventsTableFlatEventsRangeQueries[(Offset, Offset)] {
override protected def singleWildcardParty(
activeAt: Offset,
between: (Offset, Offset),
party: Party,
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and event_witness = $party order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and event_witness = $party order by (#$orderByColumns) limit $pageSize"
}
override protected def singlePartyWithTemplates(
activeAt: Offset,
between: (Offset, Offset),
party: Party,
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and event_witness = $party and template_id in ($templateIds) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, array[$party] as event_witnesses, case when submitter = $party then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and event_witness = $party and template_id in ($templateIds) order by (#$orderByColumns) limit $pageSize"
}
def onlyWildcardParties(
activeAt: Offset,
between: (Offset, Offset),
parties: Set[Party],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and event_witness in ($parties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and event_witness in ($parties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
def sameTemplates(
activeAt: Offset,
between: (Offset, Offset),
parties: Set[Party],
templateIds: Set[ApiIdentifier],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and event_witness in ($parties) and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and event_witness in ($parties) and template_id in ($templateIds) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
def mixedTemplates(
activeAt: Offset,
between: (Offset, Offset),
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val parties = partiesAndTemplateIds.map(_._1)
val partiesAndTemplateIdsAsString = partiesAndTemplateIds.map { case (p, i) => s"$p&$i" }
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
def mixedTemplatesWithWildcardParties(
activeAt: Offset,
between: (Offset, Offset),
wildcardParties: Set[Party],
partiesAndTemplateIds: Set[(Party, ApiIdentifier)],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1)
val partiesAndTemplateIdsAsString = partiesAndTemplateIds.map { case (p, i) => s"$p&$i" }
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and event_offset <= $activeAt and (create_consumed_at is null or create_consumed_at > $activeAt) and (event_witness in ($wildcardParties) or concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString)) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(between, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($parties) then command_id else '' end as command_id from #$flatEventsTable where create_argument is not null and (event_offset > ${between._1} or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= ${between._2} and (create_consumed_at is null or create_consumed_at > ${between._2}) and (event_witness in ($wildcardParties) or concat(event_witness, '&', template_id) in ($partiesAndTemplateIdsAsString)) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
}

@ -0,0 +1,25 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao.events
import com.daml.ledger.participant.state.v1.Offset
object EventsTableQueries {
private val NonExistingOffsetNodeIndexPair: (Offset, Int) =
(Offset.beforeBegin, Integer.MAX_VALUE)
def previousOffsetWhereClauseValues(
between: (Offset, Offset),
lastEventNodeIndexFromPreviousPage: Option[Int]
): (Offset, Int) = previousOffsetWhereClauseValues(between._1, lastEventNodeIndexFromPreviousPage)
def previousOffsetWhereClauseValues(
lastOffsetFromPreviousPage: Offset,
lastEventNodeIndexFromPreviousPage: Option[Int]
): (Offset, Int) =
lastEventNodeIndexFromPreviousPage
.map(x => (lastOffsetFromPreviousPage, x))
.getOrElse(NonExistingOffsetNodeIndexPair)
}
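
A small, hypothetical check of the sentinel behaviour (not part of the commit): when there is no node index from a previous page yet, the helper returns (Offset.beforeBegin, Integer.MAX_VALUE), so the "event_offset = prevOffset and node_index > prevNodeIndex" disjunct can never match and the range condition collapses to "event_offset > startExclusive".

import com.daml.ledger.participant.state.v1.Offset
import com.daml.platform.store.dao.events.EventsTableQueries.previousOffsetWhereClauseValues

object WhereClauseValuesSketch extends App {
  private val startExclusive = Offset.fromByteArray(Array(1: Byte))
  private val endInclusive = Offset.fromByteArray(Array(9: Byte))

  // First page: no previous node index, so we get the inert sentinel pair.
  // No real row carries Offset.beforeBegin and no node_index exceeds Int.MaxValue.
  println(previousOffsetWhereClauseValues((startExclusive, endInclusive), None))

  // Later pages: resume strictly after node index 7 at the last offset already read.
  println(previousOffsetWhereClauseValues((startExclusive, endInclusive), Some(7)))
}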

@ -7,15 +7,17 @@ import anorm.{Row, RowParser, SimpleSql, SqlStringInterpolation, ~}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.TransactionId
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.EventsTableQueries.previousOffsetWhereClauseValues
private[events] trait EventsTableTreeEvents { this: EventsTable =>
private val createdTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] =
createdEventRow map {
case eventOffset ~ transactionId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue =>
EventsTable.Entry(
eventOffset = eventOffset,
transactionId = transactionId,
nodeIndex = nodeIndex,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -35,10 +37,11 @@ private[events] trait EventsTableTreeEvents { this: EventsTable =>
private val exercisedTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] =
exercisedEventRow map {
case eventOffset ~ transactionId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseResult ~ exerciseActors ~ exerciseChildEventIds =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseResult ~ exerciseActors ~ exerciseChildEventIds =>
EventsTable.Entry(
eventOffset = eventOffset,
transactionId = transactionId,
nodeIndex = nodeIndex,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -142,11 +145,11 @@ private[events] trait EventsTableTreeEvents { this: EventsTable =>
endInclusive: Offset,
requestingParties: Set[Party],
pageSize: Int,
rowOffset: Long,
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] =
route(requestingParties)(
single = singlePartyTrees(startExclusive, endInclusive, _, pageSize, rowOffset),
multi = multiPartyTrees(startExclusive, endInclusive, _, pageSize, rowOffset),
single = singlePartyTrees(startExclusive, endInclusive, _, pageSize, previousEventNodeIndex),
multi = multiPartyTrees(startExclusive, endInclusive, _, pageSize, previousEventNodeIndex),
)
private def singlePartyTrees(
@ -154,17 +157,23 @@ private[events] trait EventsTableTreeEvents { this: EventsTable =>
endInclusive: Offset,
requestingParty: Party,
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, array[$requestingParty] as event_witnesses, case when submitter = $requestingParty then command_id else '' end as command_id from #$treeEventsTable where event_offset > $startExclusive and event_offset <= $endInclusive and event_witness = $requestingParty order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(startExclusive, previousEventNodeIndex)
SQL"select #$selectColumns, array[$requestingParty] as event_witnesses, case when submitter = $requestingParty then command_id else '' end as command_id from #$treeEventsTable where (event_offset > $startExclusive or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= $endInclusive and event_witness = $requestingParty order by (#$orderByColumns) limit $pageSize"
}
private def multiPartyTrees(
startExclusive: Offset,
endInclusive: Offset,
requestingParties: Set[Party],
pageSize: Int,
rowOffset: Long,
): SimpleSql[Row] =
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($requestingParties) then command_id else '' end as command_id from #$treeEventsTable where event_offset > $startExclusive and event_offset <= $endInclusive and event_witness in ($requestingParties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize offset $rowOffset"
previousEventNodeIndex: Option[Int],
): SimpleSql[Row] = {
val (prevOffset, prevNodeIndex) =
previousOffsetWhereClauseValues(startExclusive, previousEventNodeIndex)
SQL"select #$selectColumns, #$witnessesAggregation, case when submitter in ($requestingParties) then command_id else '' end as command_id from #$treeEventsTable where (event_offset > $startExclusive or (event_offset = $prevOffset and node_index > $prevNodeIndex)) and event_offset <= $endInclusive and event_witness in ($requestingParties) group by (#$groupByColumns) order by (#$orderByColumns) limit $pageSize"
}
}

@ -7,6 +7,8 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.v1.{Offset, TransactionId}
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.event.Event
import com.daml.ledger.api.v1.transaction.TreeEvent
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
@ -56,29 +58,30 @@ private[dao] final class TransactionsReader(
filter: FilterRelation,
verbose: Boolean,
): Source[(Offset, GetTransactionsResponse), NotUsed] = {
val events =
PaginatingAsyncStream(pageSize) { offset =>
val query =
EventsTable
.preparePagedGetFlatTransactions(
startExclusive = startExclusive,
endInclusive = endInclusive,
filter = filter,
pageSize = pageSize,
rowOffset = offset,
val events: Source[EventsTable.Entry[Event], NotUsed] =
PaginatingAsyncStream.streamFrom(startExclusive, eventDetails) {
(prevOffset, prevNodeIndex) =>
val query =
EventsTable
.preparePagedGetFlatTransactions(
startExclusive = prevOffset,
endInclusive = endInclusive,
filter = filter,
pageSize = pageSize,
previousEventNodeIndex = prevNodeIndex,
)
.withFetchSize(Some(pageSize))
val rawEventsFuture =
dispatcher.executeSql(dbMetrics.getFlatTransactions) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
}
rawEventsFuture.flatMap(
rawEvents =>
Timed.future(
future = Future.traverse(rawEvents)(deserializeEntry(verbose)),
timer = dbMetrics.getFlatTransactions.translationTimer,
)
.withFetchSize(Some(pageSize))
val rawEventsFuture =
dispatcher.executeSql(dbMetrics.getFlatTransactions) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
}
rawEventsFuture.flatMap(
rawEvents =>
Timed.future(
future = Future.traverse(rawEvents)(deserializeEntry(verbose)),
timer = dbMetrics.getFlatTransactions.translationTimer,
)
)
}
groupContiguous(events)(by = _.transactionId)
@ -88,6 +91,12 @@ private[dao] final class TransactionsReader(
}
}
private def eventDetails(a: EventsTable.Entry[Event]): (Offset, Int) =
(a.eventOffset, a.nodeIndex)
private def treeEventDetails(a: EventsTable.Entry[TreeEvent]): (Offset, Int) =
(a.eventOffset, a.nodeIndex)
def lookupFlatTransactionById(
transactionId: TransactionId,
requestingParties: Set[Party],
@ -115,29 +124,30 @@ private[dao] final class TransactionsReader(
requestingParties: Set[Party],
verbose: Boolean,
): Source[(Offset, GetTransactionTreesResponse), NotUsed] = {
val events =
PaginatingAsyncStream(pageSize) { offset =>
val query =
EventsTable
.preparePagedGetTransactionTrees(
startExclusive = startExclusive,
endInclusive = endInclusive,
requestingParties = requestingParties,
pageSize = pageSize,
rowOffset = offset,
val events: Source[EventsTable.Entry[TreeEvent], NotUsed] =
PaginatingAsyncStream.streamFrom(startExclusive, treeEventDetails) {
(prevOffset, prevNodeIndex) =>
val query =
EventsTable
.preparePagedGetTransactionTrees(
startExclusive = prevOffset,
endInclusive = endInclusive,
requestingParties = requestingParties,
pageSize = pageSize,
previousEventNodeIndex = prevNodeIndex
)
.withFetchSize(Some(pageSize))
val rawEvents =
dispatcher.executeSql(dbMetrics.getTransactionTrees) { implicit connection =>
query.asVectorOf(EventsTable.rawTreeEventParser)
}
rawEvents.flatMap(
es =>
Timed.future(
future = Future.traverse(es)(deserializeEntry(verbose)),
timer = dbMetrics.getTransactionTrees.translationTimer,
)
.withFetchSize(Some(pageSize))
val rawEvents =
dispatcher.executeSql(dbMetrics.getTransactionTrees) { implicit connection =>
query.asVectorOf(EventsTable.rawTreeEventParser)
}
rawEvents.flatMap(
es =>
Timed.future(
future = Future.traverse(es)(deserializeEntry(verbose)),
timer = dbMetrics.getTransactionTrees.translationTimer,
)
)
}
groupContiguous(events)(by = _.transactionId)
@ -174,24 +184,26 @@ private[dao] final class TransactionsReader(
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] = {
val events =
PaginatingAsyncStream(pageSize) { offset =>
val query =
EventsTable
.preparePagedGetActiveContracts(
activeAt = activeAt,
filter = filter,
pageSize = pageSize,
rowOffset = offset,
)
.withFetchSize(Some(pageSize))
val rawEvents =
dispatcher.executeSql(dbMetrics.getActiveContracts) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
}
Timed.future(
future = rawEvents.flatMap(Future.traverse(_)(deserializeEntry(verbose))),
timer = dbMetrics.getActiveContracts.translationTimer,
)
PaginatingAsyncStream.streamFrom(Offset.beforeBegin, eventDetails) {
(prevOffset, prevNodeIndex) =>
val query =
EventsTable
.preparePagedGetActiveContracts(
lastOffsetFromPrevPage = prevOffset,
activeAt = activeAt,
filter = filter,
pageSize = pageSize,
lastEventNodeIndexFromPrevPage = prevNodeIndex,
)
.withFetchSize(Some(pageSize))
val rawEvents =
dispatcher.executeSql(dbMetrics.getActiveContracts) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
}
Timed.future(
future = rawEvents.flatMap(Future.traverse(_)(deserializeEntry(verbose))),
timer = dbMetrics.getActiveContracts.translationTimer,
)
}
groupContiguous(events)(by = _.transactionId)