[LLP] Memory safe buffered ledger api streams [DPP-1020] (#13852)

* [TO BE REMOVED] OOM memory test for BufferedTransactionsReader

* [LLP] Memory-safe BufferedTransactionsReader

changelog_begin
changelog_end

* Addressed Simon's review comments

* Non-recursive stream construction

* Remove redundant in-stream buffers

* Addressed Marton's review comments

* Last round of addressing Marton's comments

* Remove MemoryBoundedBufferedTransactionsReaderSpec
tudor-da 2022-05-23 21:06:54 +02:00 committed by GitHub
parent 97f3f4f956
commit ee9891b4b0
16 changed files with 750 additions and 412 deletions

View File

@@ -38,6 +38,7 @@ final case class Config[Extra](
     commandConfig: CommandConfiguration,
     enableInMemoryFanOutForLedgerApi: Boolean,
     eventsPageSize: Int,
+    bufferedStreamsPageSize: Int,
     eventsProcessingParallelism: Int,
     extra: Extra,
     ledgerId: String,
@@ -86,6 +87,7 @@ object Config {
       commandConfig = CommandConfiguration.default,
       enableInMemoryFanOutForLedgerApi = false,
       eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
+      bufferedStreamsPageSize = IndexConfiguration.DefaultBufferedStreamsPageSize,
       eventsProcessingParallelism = IndexConfiguration.DefaultEventsProcessingParallelism,
       extra = extra,
       ledgerId = UUID.randomUUID().toString,
@@ -433,6 +435,17 @@ object Config {
           }
           .action((eventsPageSize, config) => config.copy(eventsPageSize = eventsPageSize))
+        opt[Int]("buffered-streams-page-size")
+          .optional()
+          .text(
+            s"Number of transactions fetched from the buffer when serving streaming calls. Default is ${IndexConfiguration.DefaultBufferedStreamsPageSize}."
+          )
+          .validate { pageSize =>
+            if (pageSize > 0) Right(())
+            else Left("buffered-streams-page-size should be strictly positive")
+          }
+          .action((pageSize, config) => config.copy(bufferedStreamsPageSize = pageSize))
         opt[Int]("buffers-prefetching-parallelism")
           .optional()
           .text(

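For orientation, a minimal sketch of setting the new page size programmatically instead of via the flag. `IndexConfiguration`, the field name, and the default of 100 come from the hunks below; the import path and the value 200 are assumptions used only for illustration:

import com.daml.platform.configuration.IndexConfiguration // package path assumed

// Must be strictly positive, mirroring the CLI validation above; defaults to 100.
val indexConfig = IndexConfiguration(bufferedStreamsPageSize = 200)
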
View File

@@ -49,6 +49,7 @@ trait ConfigProvider[ExtraConfig] {
       indexConfiguration = IndexConfiguration(
         eventsPageSize = config.eventsPageSize,
         eventsProcessingParallelism = config.eventsProcessingParallelism,
+        bufferedStreamsPageSize = config.bufferedStreamsPageSize,
         acsIdPageSize = config.acsIdPageSize,
         acsIdFetchingParallelism = config.acsIdFetchingParallelism,
         acsContractFetchingParallelism = config.acsContractFetchingParallelism,

View File

@@ -639,35 +639,24 @@ final class Metrics(val registry: MetricRegistry) {
       val prune: Timer = registry.timer(Prefix :+ "prune")
       val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

-      object streamsBuffer {
-        private val Prefix: MetricName = index.Prefix :+ "streams_buffer"
-        def push(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "push")
-        def slice(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "slice")
-        def prune(qualifier: String): Timer = registry.timer(Prefix :+ qualifier :+ "prune")
-        val transactionTreesTotal: Counter =
-          registry.counter(Prefix :+ "transaction_trees_total")
-        val transactionTreesBuffered: Counter =
-          registry.counter(Prefix :+ "transaction_trees_buffered")
-        val flatTransactionsTotal: Counter =
-          registry.counter(Prefix :+ "flat_transactions_total")
-        val flatTransactionsBuffered: Counter =
-          registry.counter(Prefix :+ "flat_transactions_buffered")
-        val getTransactionTrees: Timer =
-          registry.timer(Prefix :+ "get_transaction_trees")
-        val getFlatTransactions: Timer =
-          registry.timer(Prefix :+ "get_flat_transactions")
-        val toFlatTransactions: Timer = registry.timer(Prefix :+ "to_flat_transactions")
-        val toTransactionTrees: Timer = registry.timer(Prefix :+ "to_transaction_trees")
-        val transactionTreesBufferSize: Counter =
-          registry.counter(Prefix :+ "transaction_trees_buffer_size")
-        val flatTransactionsBufferSize: Counter =
-          registry.counter(Prefix :+ "flat_transactions_buffer_size")
-      }
+      case class Buffer(bufferName: String) {
+        private val Prefix: MetricName = index.Prefix :+ s"${bufferName}_buffer"
+        val push: Timer = registry.timer(Prefix :+ "push")
+        val slice: Timer = registry.timer(Prefix :+ "slice")
+        val prune: Timer = registry.timer(Prefix :+ "prune")
+        val sliceSize: Histogram = registry.histogram(Prefix :+ "slice_size")
+      }
+
+      case class BufferedReader(streamName: String) {
+        private val Prefix: MetricName = index.Prefix :+ s"${streamName}_buffer_reader"
+        val fetchedTotal: Counter = registry.counter(Prefix :+ "fetched_total")
+        val fetchedBuffered: Counter = registry.counter(Prefix :+ "fetched_buffered")
+        val fetchTimer: Timer = registry.timer(Prefix :+ "fetch")
+        val conversion: Timer = registry.timer(Prefix :+ "conversion")
+        val bufferSize: Counter = registry.counter(Prefix :+ "buffer_size")
+      }

       val getContractStateEventsChunkSize: Histogram =

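The per-buffer and per-stream metrics are now produced by small parameterized factories instead of one shared `streamsBuffer` object. A brief, hedged sketch of how they are used (the access path and the qualifier strings appear in later hunks of this commit; the registry wiring and the sample values here are only illustrative):

import com.codahale.metrics.MetricRegistry
import com.daml.metrics.Metrics

val metrics = new Metrics(new MetricRegistry)

// One metric set per buffer, qualified by the buffer name.
val bufferMetrics = metrics.daml.services.index.Buffer("transactions")
val pushTiming = bufferMetrics.push.time()
// ... append an element to the buffer ...
pushTiming.stop()
bufferMetrics.sliceSize.update(42) // size of a slice served to a subscriber

// One metric set per served stream, qualified by the stream name.
val readerMetrics = metrics.daml.services.index.BufferedReader("flat_transactions")
readerMetrics.fetchedBuffered.inc() // element answered from the buffer
readerMetrics.fetchedTotal.inc()    // element answered overall
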
View File

@@ -77,6 +77,7 @@ object StandaloneIndexService {
       participantId = participantId,
       eventsPageSize = config.eventsPageSize,
       eventsProcessingParallelism = config.eventsProcessingParallelism,
+      bufferedStreamsPageSize = config.bufferedStreamsPageSize,
       acsIdPageSize = config.acsIdPageSize,
       acsIdPageBufferSize = config.acsIdPageBufferSize,
       acsIdFetchingParallelism = config.acsIdFetchingParallelism,

View File

@@ -11,6 +11,7 @@ final case class IndexConfiguration(
     archiveFiles: List[File] = IndexConfiguration.DefaultArchiveFiles,
     eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
     eventsProcessingParallelism: Int = IndexConfiguration.DefaultEventsProcessingParallelism,
+    bufferedStreamsPageSize: Int = IndexConfiguration.DefaultBufferedStreamsPageSize,
     acsIdPageSize: Int = IndexConfiguration.DefaultAcsIdPageSize,
     acsIdPageBufferSize: Int = IndexConfiguration.DefaultAcsIdPageBufferSize,
     acsIdFetchingParallelism: Int = IndexConfiguration.DefaultAcsIdFetchingParallelism,
@@ -28,6 +29,7 @@ final case class IndexConfiguration(
 object IndexConfiguration {
   val DefaultEventsPageSize: Int = 1000
   val DefaultEventsProcessingParallelism: Int = 8
+  val DefaultBufferedStreamsPageSize: Int = 100
   val DefaultAcsIdPageSize: Int = 20000
   val DefaultAcsIdPageBufferSize: Int = 1
   val DefaultAcsIdFetchingParallelism: Int = 2

View File

@@ -44,6 +44,7 @@ private[platform] case class IndexServiceBuilder(
     initialLedgerId: LedgerId,
     eventsPageSize: Int,
     eventsProcessingParallelism: Int,
+    bufferedStreamsPageSize: Int,
     acsIdPageSize: Int,
     acsIdPageBufferSize: Int,
     acsIdFetchingParallelism: Int,
@@ -185,11 +186,12 @@ private[platform] case class IndexServiceBuilder(
       ledgerEndCache: LedgerEndCache,
   ): ResourceOwner[(LedgerDaoTransactionsReader, PruneBuffers)] =
     if (enableInMemoryFanOutForLedgerApi) {
-      val transactionsBuffer = new EventsBuffer[Offset, TransactionLogUpdate](
+      val transactionsBuffer = new EventsBuffer[TransactionLogUpdate](
         maxBufferSize = maxTransactionsInMemoryFanOutBufferSize,
         metrics = metrics,
         bufferQualifier = "transactions",
         isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker],
+        maxBufferedChunkSize = bufferedStreamsPageSize,
       )

       val bufferedTransactionsReader = BufferedTransactionsReader(
@@ -203,7 +205,7 @@ private[platform] case class IndexServiceBuilder(
           (packageId, loggingContext) => ledgerReadDao.getLfArchive(packageId)(loggingContext),
         ),
         metrics = metrics,
-      )(loggingContext, servicesExecutionContext)
+      )(servicesExecutionContext)

       for {
         _ <- ResourceOwner.forCloseable(() =>

View File

@@ -3,44 +3,50 @@
 package com.daml.platform.store.cache

+import com.daml.ledger.offset.Offset
 import com.daml.metrics.{Metrics, Timed}
+import com.daml.platform.store.cache.BufferSlice.{BufferSlice, Inclusive, LastBufferChunkSuffix}
 import com.daml.platform.store.cache.EventsBuffer.{
-  BufferStateRef,
+  BufferState,
   RequestOffBufferBounds,
   SearchableByVector,
   UnorderedException,
+  filterAndChunkSlice,
+  indexAfter,
+  lastFilteredChunk,
 }
-import com.daml.platform.store.cache.BufferSlice.{Inclusive, Prefix, BufferSlice, Empty}

 import scala.annotation.tailrec
 import scala.collection.Searching.{Found, InsertionPoint, SearchResult}
+import scala.collection.View
 import scala.math.Ordering
-import scala.math.Ordering.Implicits.infixOrderingOps

-/** An ordered-by-offset ring buffer.
+/** An ordered-by-offset queue buffer.
  *
  * The buffer allows appending only elements with strictly increasing offsets.
  *
  * @param maxBufferSize The maximum buffer size.
  * @param metrics The Daml metrics.
  * @param bufferQualifier The qualifier used for metrics tag specialization.
- * @param isRangeEndMarker Identifies if an element [[E]] should be treated
+ * @param isRangeEndMarker Identifies if an element [[ENTRY]] should be treated
  *                         as a range end marker, in which case the element would be treated
  *                         as a buffer range end updater and not appended to the actual buffer.
- * @tparam O The offset type.
- * @tparam E The entry buffer type.
+ * @tparam ENTRY The entry buffer type.
  */
-private[platform] final class EventsBuffer[O: Ordering, E](
+final class EventsBuffer[ENTRY](
     maxBufferSize: Long,
     metrics: Metrics,
     bufferQualifier: String,
-    isRangeEndMarker: E => Boolean,
+    isRangeEndMarker: ENTRY => Boolean,
+    maxBufferedChunkSize: Int,
 ) {
-  @volatile private var _bufferStateRef = BufferStateRef[O, E]()
+  @volatile private var _bufferState = BufferState[Offset, ENTRY]()

-  private val pushTimer = metrics.daml.services.index.streamsBuffer.push(bufferQualifier)
-  private val sliceTimer = metrics.daml.services.index.streamsBuffer.slice(bufferQualifier)
-  private val pruneTimer = metrics.daml.services.index.streamsBuffer.prune(bufferQualifier)
+  private val bufferMetrics = metrics.daml.services.index.Buffer(bufferQualifier)
+  private val pushTimer = bufferMetrics.push
+  private val sliceTimer = bufferMetrics.slice
+  private val pruneTimer = bufferMetrics.prune
+  private val sliceSizeHistogram = bufferMetrics.sliceSize

   /** Appends a new event to the buffer.
    *
@@ -51,11 +57,11 @@ private[platform] final class EventsBuffer[O: Ordering, E](
    * of the range end marker, which can have an offset equal to the last appended element.
    * @param entry The buffer entry.
    */
-  def push(offset: O, entry: E): Unit =
+  def push(offset: Offset, entry: ENTRY): Unit =
     Timed.value(
       pushTimer,
       synchronized {
-        _bufferStateRef.rangeEnd.foreach { lastOffset =>
+        _bufferState.rangeEnd.foreach { lastOffset =>
           // Ensure vector grows with strictly monotonic offsets.
           // Only specially-designated range end markers are allowed
           // to have offsets equal to the buffer range end.
@@ -64,18 +70,18 @@ private[platform] final class EventsBuffer[O: Ordering, E](
           }
         }

-        var auxBufferVector = _bufferStateRef.vector
+        var bufferVectorSnapshot = _bufferState.vector

         // The range end markers are not appended to the buffer
         if (!isRangeEndMarker(entry)) {
-          if (auxBufferVector.size.toLong == maxBufferSize) {
-            auxBufferVector = auxBufferVector.drop(1)
+          if (bufferVectorSnapshot.size.toLong == maxBufferSize) {
+            bufferVectorSnapshot = bufferVectorSnapshot.drop(1)
           }
-          auxBufferVector = auxBufferVector :+ offset -> entry
+          bufferVectorSnapshot = bufferVectorSnapshot :+ offset -> entry
         }

         // Update the buffer reference
-        _bufferStateRef = BufferStateRef(auxBufferVector, Some(offset))
+        _bufferState = BufferState(bufferVectorSnapshot, Some(offset))
       },
     )
@@ -87,28 +93,37 @@ private[platform] final class EventsBuffer[O: Ordering, E](
    * @param endInclusive The end inclusive bound of the requested range.
    * @return The series of events as an ordered vector satisfying the input bounds.
    */
-  def slice(startExclusive: O, endInclusive: O): BufferSlice[(O, E)] =
+  def slice[FILTER_RESULT](
+      startExclusive: Offset,
+      endInclusive: Offset,
+      filter: ENTRY => Option[FILTER_RESULT],
+  ): BufferSlice[(Offset, FILTER_RESULT)] =
     Timed.value(
       sliceTimer, {
-        val bufferSnapshot = _bufferStateRef
+        val bufferSnapshot = _bufferState
+        val vectorSnapshot = bufferSnapshot.vector
         if (bufferSnapshot.rangeEnd.exists(_ < endInclusive)) {
-          throw RequestOffBufferBounds(bufferSnapshot.vector.last._1, endInclusive)
-        } else if (bufferSnapshot.vector.isEmpty)
-          Empty
-        else {
-          val Seq(bufferStartInclusiveIdx, bufferEndExclusiveIdx) =
-            Seq(startExclusive, endInclusive)
-              .map(bufferSnapshot.vector.searchBy(_, _._1))
-              .map {
-                case InsertionPoint(insertionPoint) => insertionPoint
-                case Found(foundIndex) => foundIndex + 1
-              }
-          val vectorSlice =
-            bufferSnapshot.vector.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx)
-          if (bufferStartInclusiveIdx == 0) Prefix(vectorSlice)
-          else Inclusive(vectorSlice)
+          throw RequestOffBufferBounds(bufferSnapshot.rangeEnd.get, endInclusive)
+        } else {
+          val bufferStartSearchResult = vectorSnapshot.searchBy(startExclusive, _._1)
+          val bufferEndSearchResult = vectorSnapshot.searchBy(endInclusive, _._1)
+
+          val bufferStartInclusiveIdx = indexAfter(bufferStartSearchResult)
+          val bufferEndExclusiveIdx = indexAfter(bufferEndSearchResult)
+
+          val bufferSlice = vectorSnapshot.slice(bufferStartInclusiveIdx, bufferEndExclusiveIdx)
+
+          val filteredBufferSlice = bufferStartSearchResult match {
+            case InsertionPoint(0) if bufferSlice.isEmpty =>
+              LastBufferChunkSuffix(endInclusive, Vector.empty)
+            case InsertionPoint(0) => lastFilteredChunk(bufferSlice, filter, maxBufferedChunkSize)
+            case InsertionPoint(_) | Found(_) =>
+              Inclusive(filterAndChunkSlice(bufferSlice.view, filter, maxBufferedChunkSize))
+          }
+          sliceSizeHistogram.update(filteredBufferSlice.slice.size)
+          filteredBufferSlice
         }
       },
     )
@@ -117,17 +132,15 @@ private[platform] final class EventsBuffer[O: Ordering, E](
    *
    * @param endInclusive The last inclusive (highest) buffer offset to be pruned.
    */
-  def prune(endInclusive: O): Unit =
+  def prune(endInclusive: Offset): Unit =
     Timed.value(
       pruneTimer,
       synchronized {
-        _bufferStateRef.vector.searchBy[O](endInclusive, _._1) match {
+        _bufferState.vector.searchBy(endInclusive, _._1) match {
           case Found(foundIndex) =>
-            _bufferStateRef =
-              _bufferStateRef.copy(vector = _bufferStateRef.vector.drop(foundIndex + 1))
+            _bufferState = _bufferState.copy(vector = _bufferState.vector.drop(foundIndex + 1))
           case InsertionPoint(insertionPoint) =>
-            _bufferStateRef =
-              _bufferStateRef.copy(vector = _bufferStateRef.vector.drop(insertionPoint))
+            _bufferState = _bufferState.copy(vector = _bufferState.vector.drop(insertionPoint))
         }
       },
     )
@@ -140,20 +153,18 @@ private[platform] object BufferSlice {
     def slice: Vector[ELEM]
   }

-  /** The source was empty */
-  private[platform] final case object Empty extends BufferSlice[Nothing] {
-    override val slice: Vector[Nothing] = Vector.empty
-  }
-
   /** A slice of a vector that is inclusive (start index of the slice in the source vector is gteq to 1) */
   private[platform] final case class Inclusive[ELEM](slice: Vector[ELEM]) extends BufferSlice[ELEM]

-  /** A slice of a vector that is also the vector's prefix (i.e. start index of the slice in the source vector is 0) */
-  private[platform] final case class Prefix[ELEM](slice: Vector[ELEM]) extends BufferSlice[ELEM]
+  /** A slice of a vector that is a suffix of the requested window (i.e. start index of the slice in the source vector is 0) */
+  private[platform] final case class LastBufferChunkSuffix[ELEM](
+      bufferedStartExclusive: Offset,
+      slice: Vector[ELEM],
+  ) extends BufferSlice[ELEM]
 }

 private[platform] object EventsBuffer {
-  private final case class BufferStateRef[O, E](
+  private final case class BufferState[O, E](
       vector: Vector[(O, E)] = Vector.empty,
       rangeEnd: Option[O] = Option.empty,
   )
@@ -193,4 +204,36 @@ private[platform] object EventsBuffer {
       }
     }
   }
+
+  private[cache] def indexAfter(bufferStartInclusiveSearchResult: SearchResult): Int =
+    bufferStartInclusiveSearchResult match {
+      case InsertionPoint(insertionPoint) => insertionPoint
+      case Found(foundIndex) => foundIndex + 1
+    }
+
+  private[cache] def filterAndChunkSlice[ENTRY, FILTER_RESULT](
+      sliceView: View[(Offset, ENTRY)],
+      filter: ENTRY => Option[FILTER_RESULT],
+      maxChunkSize: Int,
+  ): Vector[(Offset, FILTER_RESULT)] =
+    sliceView
+      .flatMap { case (offset, entry) => filter(entry).map(offset -> _) }
+      .take(maxChunkSize)
+      .toVector
+
+  private[cache] def lastFilteredChunk[ENTRY, FILTER_RESULT](
+      bufferSlice: Vector[(Offset, ENTRY)],
+      filter: ENTRY => Option[FILTER_RESULT],
+      maxChunkSize: Int,
+  ): LastBufferChunkSuffix[(Offset, FILTER_RESULT)] = {
+    val lastChunk =
+      filterAndChunkSlice(bufferSlice.view.reverse, filter, maxChunkSize + 1).reverse
+
+    if (lastChunk.isEmpty)
+      LastBufferChunkSuffix(bufferSlice.head._1, Vector.empty)
+    else {
+      // We waste the first element so we can pass it as the bufferStartExclusive
+      LastBufferChunkSuffix(lastChunk.head._1, lastChunk.tail)
+    }
+  }
 }

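To make the new slicing contract concrete, a small usage sketch of the class above. The entry type, offsets, and values are invented for illustration; only `EventsBuffer`, `push`, `slice`, `prune`, and the two `BufferSlice` outcomes come from this file (the example lives in the same package because the slice variants are `private[platform]`):

package com.daml.platform.store.cache

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.offset.Offset
import com.daml.metrics.Metrics

object EventsBufferExample extends App {
  sealed trait Entry
  final case class Elem(name: String) extends Entry
  case object EndMarker extends Entry

  val buffer = new EventsBuffer[Entry](
    maxBufferSize = 1000L,
    metrics = new Metrics(new MetricRegistry),
    bufferQualifier = "example",
    isRangeEndMarker = _ == EndMarker,
    maxBufferedChunkSize = 100, // at most 100 filtered entries per served chunk
  )

  private def offsetAt(i: Int): Offset = Offset.fromByteArray(BigInt(i).toByteArray)

  (1 to 5).foreach(i => buffer.push(offsetAt(i), Elem(s"tx-$i")))
  buffer.push(offsetAt(5), EndMarker) // updates only the range end, not the vector

  // Ask for a chunk; the filter both selects and projects entries (identity here).
  buffer.slice(offsetAt(1), offsetAt(5), (e: Entry) => Some(e)) match {
    case BufferSlice.Inclusive(chunk) =>
      // The requested start is covered by the buffer: the chunk begins right after it.
      println(chunk.map(_._2))
    case BufferSlice.LastBufferChunkSuffix(bufferedStartExclusive, chunk) =>
      // The start predates the buffer: events up to bufferedStartExclusive must come from storage.
      println(s"fetch up to $bufferedStartExclusive from the database, then serve $chunk")
  }

  buffer.prune(offsetAt(3)) // drop everything up to and including offset 3
}
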
View File

@ -4,10 +4,9 @@
package com.daml.platform.store.dao.events package com.daml.platform.store.dao.events
import akka.NotUsed import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import com.codahale.metrics.{Counter, Timer}
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.transaction.{TransactionTree, Transaction => FlatTransaction}
import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse, GetFlatTransactionResponse,
GetTransactionResponse, GetTransactionResponse,
@ -15,35 +14,53 @@ import com.daml.ledger.api.v1.transaction_service.{
GetTransactionsResponse, GetTransactionsResponse,
} }
import com.daml.ledger.offset.Offset import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref.TransactionId
import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext
import com.daml.metrics.{InstrumentedSource, Metrics, Timed} import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.{FilterRelation, Identifier, Party, TransactionId}
import com.daml.platform.store.dao.LedgerDaoTransactionsReader
import com.daml.platform.store.dao.events.BufferedTransactionsReader.getTransactions
import com.daml.platform.store.cache.MutableCacheBackedContractStore.EventSequentialId import com.daml.platform.store.cache.MutableCacheBackedContractStore.EventSequentialId
import com.daml.platform.store.cache.{BufferSlice, EventsBuffer} import com.daml.platform.store.cache.{BufferSlice, EventsBuffer}
import com.daml.platform.store.dao.LedgerDaoTransactionsReader
import com.daml.platform.store.dao.events.BufferedTransactionsReader.{
getTransactions,
invertMapping,
}
import com.daml.platform.store.dao.events.TransactionLogUpdatesConversions.{
ToApi,
ToFlatTransaction,
ToTransactionTree,
}
import com.daml.platform.store.interfaces.TransactionLogUpdate import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.interfaces.TransactionLogUpdate.{Transaction => TxUpdate} import com.daml.platform.{FilterRelation, Identifier, Party}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
private[events] class BufferedTransactionsReader( private[events] class BufferedTransactionsReader(
protected val delegate: LedgerDaoTransactionsReader, protected val delegate: LedgerDaoTransactionsReader,
val transactionsBuffer: EventsBuffer[Offset, TransactionLogUpdate], val transactionsBuffer: EventsBuffer[TransactionLogUpdate],
toFlatTransaction: ( filterFlatTransactions: (
TxUpdate, Set[Party],
Map[Identifier, Set[Party]],
) => TransactionLogUpdate.Transaction => Option[TransactionLogUpdate.Transaction],
flatToApiTransactions: (
FilterRelation, FilterRelation,
Set[String],
Map[Identifier, Set[String]],
Boolean, Boolean,
) => Future[Option[FlatTransaction]], LoggingContext,
toTransactionTree: (TxUpdate, Set[String], Boolean) => Future[Option[TransactionTree]], ) => ToApi[GetTransactionsResponse],
filterTransactionTrees: Set[Party] => TransactionLogUpdate.Transaction => Option[
TransactionLogUpdate.Transaction
],
treesToApiTransactions: (
Set[Party],
Boolean,
LoggingContext,
) => ToApi[GetTransactionTreesResponse],
metrics: Metrics, metrics: Metrics,
)(implicit executionContext: ExecutionContext) ) extends LedgerDaoTransactionsReader {
extends LedgerDaoTransactionsReader {
private val outputStreamBufferSize = 128 private val flatTransactionsBufferMetrics =
metrics.daml.services.index.BufferedReader("flat_transactions")
private val transactionTreesBufferMetrics =
metrics.daml.services.index.BufferedReader("transaction_trees")
override def getFlatTransactions( override def getFlatTransactions(
startExclusive: Offset, startExclusive: Offset,
@ -52,21 +69,21 @@ private[events] class BufferedTransactionsReader(
verbose: Boolean, verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionsResponse), NotUsed] = { )(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionsResponse), NotUsed] = {
val (parties, partiesTemplates) = filter.partition(_._2.isEmpty) val (parties, partiesTemplates) = filter.partition(_._2.isEmpty)
val wildcardParties = parties.keySet.map(_.toString) val wildcardParties = parties.keySet
val templatesParties = invertMapping(partiesTemplates) val templatesParties = invertMapping(partiesTemplates)
getTransactions(transactionsBuffer)(startExclusive, endInclusive, filter, verbose)( getTransactions(transactionsBuffer)(
toApiTx = toFlatTransaction(_, _, wildcardParties, templatesParties, _), startExclusive = startExclusive,
apiResponseCtor = GetTransactionsResponse(_), endInclusive = endInclusive,
filter = filter,
verbose = verbose,
metrics = metrics,
)(
filterEvents = filterFlatTransactions(wildcardParties, templatesParties),
toApiTx = flatToApiTransactions(filter, verbose, loggingContext),
fetchTransactions = delegate.getFlatTransactions(_, _, _, _)(loggingContext), fetchTransactions = delegate.getFlatTransactions(_, _, _, _)(loggingContext),
toApiTxTimer = metrics.daml.services.index.streamsBuffer.toFlatTransactions, bufferReaderMetrics = flatTransactionsBufferMetrics,
sourceTimer = metrics.daml.services.index.streamsBuffer.getFlatTransactions,
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.flatTransactionsBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsTotal,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
) )
} }
@ -78,18 +95,17 @@ private[events] class BufferedTransactionsReader(
)(implicit )(implicit
loggingContext: LoggingContext loggingContext: LoggingContext
): Source[(Offset, GetTransactionTreesResponse), NotUsed] = ): Source[(Offset, GetTransactionTreesResponse), NotUsed] =
getTransactions(transactionsBuffer)(startExclusive, endInclusive, requestingParties, verbose)( getTransactions(transactionsBuffer)(
toApiTx = (tx: TxUpdate, requestingParties: Set[Party], verbose) => startExclusive = startExclusive,
toTransactionTree(tx, requestingParties.map(_.toString), verbose), endInclusive = endInclusive,
apiResponseCtor = GetTransactionTreesResponse(_), filter = requestingParties,
verbose = verbose,
metrics = metrics,
)(
filterEvents = filterTransactionTrees(requestingParties),
toApiTx = treesToApiTransactions(requestingParties, verbose, loggingContext),
fetchTransactions = delegate.getTransactionTrees(_, _, _, _)(loggingContext), fetchTransactions = delegate.getTransactionTrees(_, _, _, _)(loggingContext),
toApiTxTimer = metrics.daml.services.index.streamsBuffer.toTransactionTrees, bufferReaderMetrics = transactionTreesBufferMetrics,
sourceTimer = metrics.daml.services.index.streamsBuffer.getTransactionTrees,
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.transactionTreesBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
) )
override def lookupFlatTransactionById( override def lookupFlatTransactionById(
@ -125,107 +141,121 @@ private[events] class BufferedTransactionsReader(
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
s"getTransactionLogUpdates is not supported on ${getClass.getSimpleName}" s"getTransactionLogUpdates is not supported on ${getClass.getSimpleName}"
) )
private def invertMapping(partiesTemplates: Map[Party, Set[Identifier]]) =
partiesTemplates
.foldLeft(Map.empty[Identifier, mutable.Builder[String, Set[String]]]) {
case (acc, (k, vs)) =>
vs.foldLeft(acc) { case (a, v) =>
a + (v -> (a.getOrElse(v, Set.newBuilder) += k))
}
}
.view
.map { case (k, v) => k -> v.result() }
.toMap
} }
private[platform] object BufferedTransactionsReader { private[platform] object BufferedTransactionsReader {
private val outputStreamBufferSize = 128
type FetchTransactions[FILTER, API_RESPONSE] = type FetchTransactions[FILTER, API_RESPONSE] =
(Offset, Offset, FILTER, Boolean) => Source[(Offset, API_RESPONSE), NotUsed] (Offset, Offset, FILTER, Boolean) => Source[(Offset, API_RESPONSE), NotUsed]
def apply( def apply(
delegate: LedgerDaoTransactionsReader, delegate: LedgerDaoTransactionsReader,
transactionsBuffer: EventsBuffer[Offset, TransactionLogUpdate], transactionsBuffer: EventsBuffer[TransactionLogUpdate],
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
metrics: Metrics, metrics: Metrics,
)(implicit )(implicit
loggingContext: LoggingContext, executionContext: ExecutionContext
executionContext: ExecutionContext,
): BufferedTransactionsReader = ): BufferedTransactionsReader =
new BufferedTransactionsReader( new BufferedTransactionsReader(
delegate = delegate, delegate = delegate,
transactionsBuffer = transactionsBuffer, transactionsBuffer = transactionsBuffer,
toFlatTransaction =
TransactionLogUpdatesConversions.ToFlatTransaction(_, _, _, _, _, lfValueTranslation),
toTransactionTree =
TransactionLogUpdatesConversions.ToTransactionTree(_, _, _, lfValueTranslation),
metrics = metrics, metrics = metrics,
filterFlatTransactions = ToFlatTransaction.filter,
flatToApiTransactions =
ToFlatTransaction.toApiTransaction(_, _, lfValueTranslation)(_, executionContext),
filterTransactionTrees = ToTransactionTree.filter,
treesToApiTransactions =
ToTransactionTree.toApiTransaction(_, _, lfValueTranslation)(_, executionContext),
) )
private[events] def getTransactions[FILTER, API_TX, API_RESPONSE]( private[events] def getTransactions[FILTER, API_RESPONSE](
transactionsBuffer: EventsBuffer[Offset, TransactionLogUpdate] transactionsBuffer: EventsBuffer[TransactionLogUpdate]
)( )(
startExclusive: Offset, startExclusive: Offset,
endInclusive: Offset, endInclusive: Offset,
filter: FILTER, filter: FILTER,
verbose: Boolean, verbose: Boolean,
metrics: Metrics,
)( )(
toApiTx: (TxUpdate, FILTER, Boolean) => Future[Option[API_TX]], filterEvents: TransactionLogUpdate.Transaction => Option[TransactionLogUpdate.Transaction],
apiResponseCtor: Seq[API_TX] => API_RESPONSE, toApiTx: ToApi[API_RESPONSE],
fetchTransactions: FetchTransactions[FILTER, API_RESPONSE], fetchTransactions: FetchTransactions[FILTER, API_RESPONSE],
sourceTimer: Timer, bufferReaderMetrics: metrics.daml.services.index.BufferedReader,
toApiTxTimer: Timer, ): Source[(Offset, API_RESPONSE), NotUsed] = {
resolvedFromBufferCounter: Counter, def bufferSource(
totalRetrievedCounter: Counter, bufferSlice: Vector[(Offset, TransactionLogUpdate.Transaction)]
outputStreamBufferSize: Int, ) =
bufferSizeCounter: Counter, if (bufferSlice.isEmpty) Source.empty
)(implicit executionContext: ExecutionContext): Source[(Offset, API_RESPONSE), NotUsed] = { else
def bufferedSource( Source(bufferSlice)
slice: Vector[(Offset, TransactionLogUpdate)] .mapAsync(1) { case (offset, payload) =>
): Source[(Offset, API_RESPONSE), NotUsed] = bufferReaderMetrics.fetchedBuffered.inc()
Source Timed.future(
.fromIterator(() => slice.iterator) bufferReaderMetrics.conversion,
// Using collect + mapAsync as an alternative to the non-existent collectAsync toApiTx(payload).map(offset -> _)(ExecutionContext.parasitic),
.collect { case (offset, tx: TxUpdate) => )
Timed.future(toApiTxTimer, toApiTx(tx, filter, verbose).map(offset -> _)) }
}
// Note that it is safe to use high parallelism for mapAsync as long
// as the Futures executed within are running on a bounded thread pool
.mapAsync(32)(identity)
.async
.collect { case (offset, Some(tx)) =>
resolvedFromBufferCounter.inc()
offset -> apiResponseCtor(Seq(tx))
}
val transactionsSource = Timed.source( val source = Source
sourceTimer, { .unfold(startExclusive) {
transactionsBuffer.slice(startExclusive, endInclusive) match { case scannedToInclusive if scannedToInclusive < endInclusive =>
case BufferSlice.Empty => val sliceFilter: TransactionLogUpdate => Option[TransactionLogUpdate.Transaction] = {
fetchTransactions(startExclusive, endInclusive, filter, verbose) case tx: TransactionLogUpdate.Transaction => filterEvents(tx)
case _ => None
}
case BufferSlice.Prefix(slice) => transactionsBuffer.slice(scannedToInclusive, endInclusive, sliceFilter) match {
if (slice.size <= 1) { case BufferSlice.Inclusive(slice) =>
fetchTransactions(startExclusive, endInclusive, filter, verbose) val sourceFromBuffer = bufferSource(slice)
} else { val nextChunkStartExclusive = slice.lastOption.map(_._1).getOrElse(endInclusive)
fetchTransactions(startExclusive, slice.head._1, filter, verbose) Some(nextChunkStartExclusive -> sourceFromBuffer)
.concat(bufferedSource(slice.tail))
.mapMaterializedValue(_ => NotUsed)
}
case BufferSlice.Inclusive(slice) => case BufferSlice.LastBufferChunkSuffix(bufferedStartExclusive, slice) =>
bufferedSource(slice).mapMaterializedValue(_ => NotUsed) val sourceFromBuffer =
} fetchTransactions(startExclusive, bufferedStartExclusive, filter, verbose)
}.map(tx => { .concat(bufferSource(slice))
totalRetrievedCounter.inc() Some(endInclusive -> sourceFromBuffer)
}
case _ => None
}
.flatMapConcat(identity)
Timed
.source(bufferReaderMetrics.fetchTimer, source)
.map { tx =>
bufferReaderMetrics.fetchedTotal.inc()
tx tx
}), }
) .buffered(outputStreamBufferSize)(bufferReaderMetrics.bufferSize)
}
InstrumentedSource.bufferedSource( private[events] def invertMapping(
original = transactionsSource, partiesTemplates: Map[Party, Set[Identifier]]
counter = bufferSizeCounter, ): Map[Identifier, Set[Party]] =
size = outputStreamBufferSize, partiesTemplates
) .foldLeft(Map.empty[Identifier, Set[Party]]) {
case (templatesToParties, (party, templates)) =>
templates.foldLeft(templatesToParties) { case (aux, templateId) =>
aux.updatedWith(templateId) {
case None => Some(Set(party))
case Some(partySet) => Some(partySet + party)
}
}
}
// TODO LLP: Consider merging with InstrumentedSource.bufferedSource
private implicit class SourceWithBuffers[T, R](source: Source[T, NotUsed]) {
def buffered(bufferLength: Int)(counter: com.codahale.metrics.Counter): Source[T, NotUsed] =
source
.map { in =>
counter.inc()
in
}
.buffer(bufferLength, OverflowStrategy.backpressure)
.map { in =>
counter.dec()
in
}
} }
} }

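The key change in this file is that the reader no longer materializes one large buffer slice per subscription: it repeatedly pulls bounded chunks via `Source.unfold`, falling back to the database only for the range that precedes the buffer. A simplified, self-contained sketch of that pattern (plain Long offsets and a stubbed slice function; none of the production types are used):

import akka.NotUsed
import akka.stream.scaladsl.Source

// sliceAfter(cursor) stands in for EventsBuffer.slice: it returns at most one
// page of (offset, payload) pairs with offsets strictly greater than `cursor`.
def chunkedStream(
    startExclusive: Long,
    endInclusive: Long,
    sliceAfter: Long => Vector[(Long, String)],
): Source[(Long, String), NotUsed] =
  Source
    .unfold(startExclusive) {
      case cursor if cursor < endInclusive =>
        val chunk = sliceAfter(cursor).takeWhile(_._1 <= endInclusive)
        // Advance the cursor to the last emitted offset (or finish on an empty chunk),
        // so each pull holds at most one bounded page in memory.
        val nextCursor = chunk.lastOption.fold(endInclusive)(_._1)
        Some(nextCursor -> Source(chunk))
      case _ => None
    }
    .flatMapConcat(identity)

Because each pull produces at most one page, the memory held per subscriber is bounded by the page size rather than by the size of the in-memory fan-out buffer.
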
View File

@ -10,63 +10,75 @@ import com.daml.ledger.api.v1.transaction.{
TreeEvent, TreeEvent,
Transaction => FlatTransaction, Transaction => FlatTransaction,
} }
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.api.v1.{event => apiEvent} import com.daml.ledger.api.v1.{event => apiEvent}
import com.daml.lf.data.Ref import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, Party}
import com.daml.lf.value.Value.ContractId
import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext
import com.daml.platform.{ApiOffset, ContractId, FilterRelation, Identifier, Value}
import com.daml.platform.api.v1.event.EventOps.TreeEventOps import com.daml.platform.api.v1.event.EventOps.TreeEventOps
import com.daml.platform.participant.util.LfEngineToApi import com.daml.platform.participant.util.LfEngineToApi
import com.daml.platform.store.interfaces.TransactionLogUpdate import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.interfaces.TransactionLogUpdate.{CreatedEvent, ExercisedEvent} import com.daml.platform.store.interfaces.TransactionLogUpdate.{CreatedEvent, ExercisedEvent}
import com.daml.platform.{ApiOffset, FilterRelation, Value}
import com.google.protobuf.timestamp.Timestamp import com.google.protobuf.timestamp.Timestamp
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
private[events] object TransactionLogUpdatesConversions { private[events] object TransactionLogUpdatesConversions {
type ToApi[API_TX] = TransactionLogUpdate.Transaction => Future[API_TX]
object ToFlatTransaction { object ToFlatTransaction {
def apply( def filter(
tx: TransactionLogUpdate.Transaction, wildcardParties: Set[Party],
templateSpecificParties: Map[Identifier, Set[Party]],
): TransactionLogUpdate.Transaction => Option[TransactionLogUpdate.Transaction] =
transaction => {
val flatTransactionEvents = transaction.events.collect {
case createdEvent: TransactionLogUpdate.CreatedEvent => createdEvent
case exercisedEvent: TransactionLogUpdate.ExercisedEvent if exercisedEvent.consuming =>
exercisedEvent
}
val filteredFlatEvents = flatTransactionEvents
.filter(flatTransactionPredicate(wildcardParties, templateSpecificParties))
val hasOneEventWithCommandId = filteredFlatEvents.exists(_.commandId.nonEmpty)
// Allows emitting flat transactions with no events, a use-case needed
// for the functioning of Daml triggers.
// (more details in https://github.com/digital-asset/daml/issues/6975)
Option.when(filteredFlatEvents.nonEmpty || hasOneEventWithCommandId)(
transaction.copy(events = filteredFlatEvents)
)
}
def toApiTransaction(
filter: FilterRelation, filter: FilterRelation,
wildcardParties: Set[String],
templateSpecificParties: Map[Identifier, Set[String]],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
)(implicit )(implicit
loggingContext: LoggingContext, loggingContext: LoggingContext,
executionContext: ExecutionContext, executionContext: ExecutionContext,
): Future[Option[FlatTransaction]] = { ): ToApi[GetTransactionsResponse] = { transaction =>
val transactionEvents = tx.events val nonTransient = removeTransient(transaction.events)
val filtered = transactionEvents.filter( val requestingParties = filter.keySet
FlatTransactionPredicate(_, wildcardParties, templateSpecificParties) Future
) .traverse(nonTransient)(toFlatEvent(_, requestingParties, verbose, lfValueTranslation))
.map(flatEvents =>
filtered.headOption GetTransactionsResponse(
.map { first => Seq(
val events = removeTransient(filtered) FlatTransaction(
val requestingParties = filter.keySet.map(_.toString) transactionId = transaction.transactionId,
commandId = getCommandId(transaction.events, requestingParties),
Future workflowId = transaction.workflowId,
.traverse(events)(toFlatEvent(_, requestingParties, verbose, lfValueTranslation)) effectiveAt = Some(timestampToTimestamp(transaction.effectiveAt)),
.map(_.collect { case Some(value) => value }) events = flatEvents,
.map { flatEvents => offset = ApiOffset.toApiString(transaction.offset),
// Allows emitting flat transactions with no events, a use-case needed )
// for the functioning of Daml triggers. )
// (more details in https://github.com/digital-asset/daml/issues/6975) )
if (flatEvents.nonEmpty || first.commandId.nonEmpty) { )
Some(
FlatTransaction(
transactionId = first.transactionId,
commandId = getCommandId(events, requestingParties),
workflowId = first.workflowId,
effectiveAt = Some(timestampToTimestamp(first.ledgerEffectiveTime)),
events = flatEvents,
offset = ApiOffset.toApiString(tx.offset),
)
)
} else None
}
}
.getOrElse(Future.successful(None))
} }
private def removeTransient(aux: Vector[TransactionLogUpdate.Event]) = { private def removeTransient(aux: Vector[TransactionLogUpdate.Event]) = {
@ -82,43 +94,39 @@ private[events] object TransactionLogUpdatesConversions {
aux.filter(ev => permanent(ev.contractId)) aux.filter(ev => permanent(ev.contractId))
} }
private val FlatTransactionPredicate = private def flatTransactionPredicate(
( wildcardParties: Set[Party],
event: TransactionLogUpdate.Event, templateSpecificParties: Map[Identifier, Set[Party]],
wildcardParties: Set[String], )(event: TransactionLogUpdate.Event) = {
templateSpecificParties: Map[Identifier, Set[String]], val stakeholdersMatchingParties =
) => { event.flatEventWitnesses.exists(wildcardParties)
val stakeholdersMatchingParties = stakeholdersMatchingParties || templateSpecificParties
event.flatEventWitnesses.exists(wildcardParties) .get(event.templateId)
.exists(_.exists(event.flatEventWitnesses))
stakeholdersMatchingParties || templateSpecificParties }
.get(event.templateId)
.exists(_.exists(event.flatEventWitnesses))
}
private def toFlatEvent( private def toFlatEvent(
event: TransactionLogUpdate.Event, event: TransactionLogUpdate.Event,
requestingParties: Set[String], requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
)(implicit )(implicit
loggingContext: LoggingContext, loggingContext: LoggingContext,
executionContext: ExecutionContext, executionContext: ExecutionContext,
): Future[Option[apiEvent.Event]] = ): Future[apiEvent.Event] =
event match { event match {
case createdEvent: TransactionLogUpdate.CreatedEvent => case createdEvent: TransactionLogUpdate.CreatedEvent =>
createdToFlatEvent(requestingParties, verbose, lfValueTranslation, createdEvent) createdToFlatEvent(requestingParties, verbose, lfValueTranslation, createdEvent)
.map(Some(_))
case exercisedEvent: TransactionLogUpdate.ExercisedEvent if exercisedEvent.consuming => case exercisedEvent: TransactionLogUpdate.ExercisedEvent if exercisedEvent.consuming =>
Future.successful(Some(exercisedToFlatEvent(requestingParties, exercisedEvent))) Future.successful(exercisedToFlatEvent(requestingParties, exercisedEvent))
case _ => Future.successful(None) case _ => Future.failed(new RuntimeException("Not a flat transaction event"))
} }
private def createdToFlatEvent( private def createdToFlatEvent(
requestingParties: Set[String], requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
createdEvent: CreatedEvent, createdEvent: CreatedEvent,
@ -173,7 +181,7 @@ private[events] object TransactionLogUpdatesConversions {
} }
private def exercisedToFlatEvent( private def exercisedToFlatEvent(
requestingParties: Set[String], requestingParties: Set[Party],
exercisedEvent: ExercisedEvent, exercisedEvent: ExercisedEvent,
): apiEvent.Event = ): apiEvent.Event =
apiEvent.Event( apiEvent.Event(
@ -182,29 +190,34 @@ private[events] object TransactionLogUpdatesConversions {
eventId = exercisedEvent.eventId.toLedgerString, eventId = exercisedEvent.eventId.toLedgerString,
contractId = exercisedEvent.contractId.coid, contractId = exercisedEvent.contractId.coid,
templateId = Some(LfEngineToApi.toApiIdentifier(exercisedEvent.templateId)), templateId = Some(LfEngineToApi.toApiIdentifier(exercisedEvent.templateId)),
witnessParties = requestingParties.view.filter(exercisedEvent.flatEventWitnesses).toSeq, witnessParties =
requestingParties.iterator.filter(exercisedEvent.flatEventWitnesses).toSeq,
) )
) )
) )
} }
object ToTransactionTree { object ToTransactionTree {
def apply( def filter(
tx: TransactionLogUpdate.Transaction, requestingParties: Set[Party]
requestingParties: Set[String], )(transaction: TransactionLogUpdate.Transaction): Option[TransactionLogUpdate.Transaction] = {
val filteredForVisibility =
transaction.events.filter(transactionTreePredicate(requestingParties))
Option.when(filteredForVisibility.nonEmpty)(transaction.copy(events = filteredForVisibility))
}
def toApiTransaction(
requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
)(implicit )(implicit
loggingContext: LoggingContext, loggingContext: LoggingContext,
executionContext: ExecutionContext, executionContext: ExecutionContext,
): Future[Option[TransactionTree]] = { ): ToApi[GetTransactionTreesResponse] =
val filteredForVisibility = tx.events transaction =>
.filter(TransactionTreePredicate(requestingParties))
if (filteredForVisibility.isEmpty) Future.successful(None)
else
Future Future
.traverse(filteredForVisibility)( .traverse(transaction.events)(
toTransactionTreeEvent(requestingParties, verbose, lfValueTranslation) toTransactionTreeEvent(requestingParties, verbose, lfValueTranslation)
) )
.map { treeEvents => .map { treeEvents =>
@ -228,22 +241,23 @@ private[events] object TransactionLogUpdatesConversions {
// that are not a child of some other visible item // that are not a child of some other visible item
val rootEventIds = visible.filterNot(children) val rootEventIds = visible.filterNot(children)
Some( GetTransactionTreesResponse(
TransactionTree( Seq(
transactionId = tx.transactionId, TransactionTree(
commandId = getCommandId(filteredForVisibility, requestingParties), transactionId = transaction.transactionId,
workflowId = tx.workflowId, commandId = getCommandId(transaction.events, requestingParties),
effectiveAt = Some(timestampToTimestamp(tx.effectiveAt)), workflowId = transaction.workflowId,
offset = ApiOffset.toApiString(tx.offset), effectiveAt = Some(timestampToTimestamp(transaction.effectiveAt)),
eventsById = eventsById, offset = ApiOffset.toApiString(transaction.offset),
rootEventIds = rootEventIds, eventsById = eventsById,
rootEventIds = rootEventIds,
)
) )
) )
} }
}
private def toTransactionTreeEvent( private def toTransactionTreeEvent(
requestingParties: Set[String], requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
)(event: TransactionLogUpdate.Event)(implicit )(event: TransactionLogUpdate.Event)(implicit
@ -268,7 +282,7 @@ private[events] object TransactionLogUpdatesConversions {
} }
private def exercisedToTransactionTreeEvent( private def exercisedToTransactionTreeEvent(
requestingParties: Set[String], requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
exercisedEvent: ExercisedEvent, exercisedEvent: ExercisedEvent,
@ -335,7 +349,7 @@ private[events] object TransactionLogUpdatesConversions {
} }
private def createdToTransactionTreeEvent( private def createdToTransactionTreeEvent(
requestingParties: Set[String], requestingParties: Set[Party],
verbose: Boolean, verbose: Boolean,
lfValueTranslation: LfValueTranslation, lfValueTranslation: LfValueTranslation,
createdEvent: CreatedEvent, createdEvent: CreatedEvent,
@ -393,11 +407,13 @@ private[events] object TransactionLogUpdatesConversions {
) )
} }
private val TransactionTreePredicate: Set[String] => TransactionLogUpdate.Event => Boolean = private def transactionTreePredicate(
requestingParties => { requestingParties: Set[Party]
case createdEvent: CreatedEvent => requestingParties.exists(createdEvent.treeEventWitnesses) ): TransactionLogUpdate.Event => Boolean = {
case exercised: ExercisedEvent => requestingParties.exists(exercised.treeEventWitnesses) case createdEvent: CreatedEvent => requestingParties.exists(createdEvent.treeEventWitnesses)
} case exercised: ExercisedEvent => requestingParties.exists(exercised.treeEventWitnesses)
case _ => false
}
} }
private def timestampToTimestamp(t: com.daml.lf.data.Time.Timestamp): Timestamp = private def timestampToTimestamp(t: com.daml.lf.data.Time.Timestamp): Timestamp =
@ -405,7 +421,7 @@ private[events] object TransactionLogUpdatesConversions {
private def getCommandId( private def getCommandId(
flatTransactionEvents: Vector[TransactionLogUpdate.Event], flatTransactionEvents: Vector[TransactionLogUpdate.Event],
requestingParties: Set[String], requestingParties: Set[Party],
) = ) =
flatTransactionEvents flatTransactionEvents
.collectFirst { .collectFirst {

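The conversions are now split into a cheap synchronous visibility filter (returning Option) and an asynchronous API conversion, which is what lets the reader filter inside the buffer before paying the translation cost. A schematic sketch of that two-stage shape with simplified stand-in types (`MyUpdate` and `ApiResponse` are invented; only the structure mirrors the code above):

import scala.concurrent.{ExecutionContext, Future}

final case class MyUpdate(witnesses: Set[String], payload: String)
final case class ApiResponse(payload: String)

// Stage 1: synchronous filtering, applied while slicing the in-memory buffer.
def filter(requestingParties: Set[String])(update: MyUpdate): Option[MyUpdate] =
  Option.when(update.witnesses.exists(requestingParties))(update)

// Stage 2: asynchronous conversion to the API representation, applied only to
// the updates that survived stage 1 (this is where value translation happens).
def toApi(update: MyUpdate)(implicit ec: ExecutionContext): Future[ApiResponse] =
  Future(ApiResponse(update.payload))

// Wiring, analogous to filterEvents/toApiTx in BufferedTransactionsReader:
def serve(updates: Vector[MyUpdate], parties: Set[String])(implicit
    ec: ExecutionContext
): Future[Vector[ApiResponse]] =
  Future.traverse(updates.flatMap(filter(parties)))(toApi)
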
View File

@@ -41,9 +41,9 @@ object TransactionLogUpdatesReader {
             _,
           )
         ),
-        treeEventWitnesses = raw.treeEventWitnesses,
-        flatEventWitnesses = raw.flatEventWitnesses,
-        submitters = raw.submitters,
+        treeEventWitnesses = raw.treeEventWitnesses.map(Ref.Party.assertFromString),
+        flatEventWitnesses = raw.flatEventWitnesses.map(Ref.Party.assertFromString),
+        submitters = raw.submitters.map(Ref.Party.assertFromString),
         choice = choiceName,
         actingParties = raw.exerciseActors
           .mandatory("exercise_actors")
@@ -97,12 +97,20 @@ object TransactionLogUpdatesReader {
         commandId = raw.commandId.getOrElse(""),
         workflowId = raw.workflowId.getOrElse(""),
         contractKey = maybeGlobalKey,
-        treeEventWitnesses = raw.treeEventWitnesses,
-        flatEventWitnesses = raw.flatEventWitnesses,
-        submitters = raw.submitters,
+        treeEventWitnesses = raw.treeEventWitnesses.map(Ref.Party.assertFromString),
+        flatEventWitnesses = raw.flatEventWitnesses.map(Ref.Party.assertFromString),
+        submitters = raw.submitters.map(Ref.Party.assertFromString),
         createArgument = createArgumentDecompressed,
-        createSignatories = raw.createSignatories.mandatory("create_signatories").toSet,
-        createObservers = raw.createObservers.mandatory("create_observers").toSet,
+        createSignatories = raw.createSignatories
+          .mandatory("create_signatories")
+          .iterator
+          .map(Ref.Party.assertFromString)
+          .toSet,
+        createObservers = raw.createObservers
+          .mandatory("create_observers")
+          .iterator
+          .map(Ref.Party.assertFromString)
+          .toSet,
         createAgreementText = raw.createAgreementText,
       )
     case unknownKind =>

View File

@@ -4,10 +4,10 @@
 package com.daml.platform.store.interfaces

 import com.daml.ledger.offset.Offset
-import com.daml.lf.value.{Value => LfValue}
-import com.daml.lf.data.Ref.IdString
+import com.daml.lf.data.Ref.Party
 import com.daml.lf.data.Time.Timestamp
 import com.daml.lf.ledger.EventId
+import com.daml.lf.value.{Value => LfValue}
 import com.daml.platform.{ContractId, Identifier}
 import com.daml.platform.store.cache.MutableCacheBackedContractStore.EventSequentialId
@@ -24,7 +24,6 @@ object TransactionLogUpdate {
   /** Complete view of a ledger transaction.
    *
    * @param transactionId The transaction it.
-   * @param commandId The command id.
    * @param workflowId The workflow id.
    * @param effectiveAt The transaction ledger time.
    * @param offset The transaction's offset in the ledger.
@@ -41,6 +40,8 @@ object TransactionLogUpdate {
   }

   /** A special event which signifies that the ledger end has been reached in a stream.
+   *
+   * TODO LLP: Remove this class with the implementation of the Ledger API - Indexer bypass.
    *
    * @see [[LedgerDaoTransactionsReader.getTransactionLogUpdates()]]
    * @param eventOffset The ledger end offset.
@@ -58,9 +59,9 @@ object TransactionLogUpdate {
     def commandId: String
     def workflowId: String
     def ledgerEffectiveTime: Timestamp
-    def treeEventWitnesses: Set[String]
-    def flatEventWitnesses: Set[String]
-    def submitters: Set[String]
+    def treeEventWitnesses: Set[Party]
+    def flatEventWitnesses: Set[Party]
+    def submitters: Set[Party]
     def templateId: Identifier
     def contractId: ContractId
   }
@@ -77,12 +78,12 @@ object TransactionLogUpdate {
       commandId: String,
       workflowId: String,
       contractKey: Option[LfValue.VersionedValue],
-      treeEventWitnesses: Set[String],
-      flatEventWitnesses: Set[String],
-      submitters: Set[String],
+      treeEventWitnesses: Set[Party],
+      flatEventWitnesses: Set[Party],
+      submitters: Set[Party],
       createArgument: LfValue.VersionedValue,
-      createSignatories: Set[String],
-      createObservers: Set[String],
+      createSignatories: Set[Party],
+      createObservers: Set[Party],
       createAgreementText: Option[String],
   ) extends Event
@@ -99,11 +100,11 @@ object TransactionLogUpdate {
       commandId: String,
       workflowId: String,
       contractKey: Option[LfValue.VersionedValue],
-      treeEventWitnesses: Set[String],
-      flatEventWitnesses: Set[String],
-      submitters: Set[String],
+      treeEventWitnesses: Set[Party],
+      flatEventWitnesses: Set[Party],
+      submitters: Set[Party],
       choice: String,
-      actingParties: Set[IdString.Party],
+      actingParties: Set[Party],
       children: Seq[String],
       exerciseArgument: LfValue.VersionedValue,
       exerciseResult: Option[LfValue.VersionedValue],

View File

@@ -213,7 +213,7 @@ final class BuffersUpdaterSpec
     val exercisedCid = ContractId.V1(Hash.hashPrivateKey("exercisedCid"))
     val exercisedKey = Versioned(TransactionVersion.VDev, ValueInt64(8974L))
     val exercisedTemplateId = Ref.Identifier.assertFromString("exercised:template:id")
-    val exercisedFlatEventWitnesses = Set("bob", "dan")
+    val exercisedFlatEventWitnesses = Set("bob", "dan").map(Ref.Party.assertFromString)
     val exercisedOffset = Offset.fromByteArray(BigInt(1337L).toByteArray)
     val exercisedEventSequentialId = 9876L
@@ -252,8 +252,8 @@ final class BuffersUpdaterSpec
       commandId = null,
       workflowId = null,
       contractKey = Some(createdContractKey),
-      treeEventWitnesses = Set("bob"), // Unused in ContractStateEvent
-      flatEventWitnesses = createdFlatEventWitnesses,
+      treeEventWitnesses = Set(Ref.Party.assertFromString("bob")), // Unused in ContractStateEvent
+      flatEventWitnesses = createdFlatEventWitnesses.map(Ref.Party.assertFromString),
       submitters = null,
       createArgument = createArgument,
       createSignatories = null,

View File

@ -8,12 +8,17 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source} import akka.stream.scaladsl.{Sink, Source}
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.daml.ledger.offset.Offset import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref.{IdString, Identifier, Party}
import com.daml.lf.data.Time.Timestamp import com.daml.lf.data.Time.Timestamp
import com.daml.metrics.Metrics import com.daml.metrics.Metrics
import com.daml.platform.store.dao.events.BufferedTransactionsReader.FetchTransactions import com.daml.platform.store.dao.events.BufferedTransactionsReader.{
FetchTransactions,
invertMapping,
}
import com.daml.platform.store.dao.events.BufferedTransactionsReaderSpec.{ import com.daml.platform.store.dao.events.BufferedTransactionsReaderSpec.{
offset,
predecessor, predecessor,
transactionLogUpdate, transaction,
} }
import com.daml.platform.store.cache.EventsBuffer import com.daml.platform.store.cache.EventsBuffer
import com.daml.platform.store.interfaces.TransactionLogUpdate import com.daml.platform.store.interfaces.TransactionLogUpdate
@ -41,67 +46,60 @@ class BufferedTransactionsReaderSpec
(offset4, update4), (offset4, update4),
) = ) =
(1 to 4).map { idx => (1 to 4).map { idx =>
Offset.fromByteArray(BigInt(idx * 2 + 1234).toByteArray) -> transactionLogUpdate(s"tx-$idx") offset(idx.toLong) -> transaction(s"tx-$idx")
} }
// Dummy filters. We're not interested in concrete details // Dummy filters. We're not interested in concrete details
// since we are asserting only the generic getTransactions method // since we are asserting only the generic getTransactions method
val filter, otherFilter = new Object val filter = new Object
val txApiMap @ Seq( val filterEvents: TransactionLogUpdate => Option[TransactionLogUpdate.Transaction] = {
(apiTx1, apiResponse1), case `update1` => None
(apiTx2, apiResponse2), case `update2` => Some(update2)
(apiTx3, apiResponse3), case `update3` => Some(update3)
(apiTx4, apiResponse4), case `update4` => Some(update4)
) = (1 to 4).map(idx => s"Some API TX $idx" -> s"Some API response $idx from buffer")
val toApiTx: (TransactionLogUpdate, Object, Boolean) => Future[Option[String]] = {
case (`update1`, `otherFilter`, false) => Future.successful(None)
case (`update1`, `filter`, false) => Future.successful(Some(apiTx1))
case (`update2`, `filter`, false) => Future.successful(Some(apiTx2))
case (`update3`, `filter`, false) => Future.successful(Some(apiTx3))
case (`update4`, `filter`, false) => Future.successful(Some(apiTx4))
case unexpected => fail(s"Unexpected $unexpected") case unexpected => fail(s"Unexpected $unexpected")
} }
val apiResponseCtor = txApiMap.map { case (apiTx, apiResponse) => val apiResponseFromDB = "Some API response from storage"
Seq(apiTx) -> apiResponse val apiResponses @ Seq(apiResponse2, apiResponse3, apiResponse4) =
}.toMap (2 to 4).map(idx => s"Some API response $idx from buffer")
val transactionsBuffer = new EventsBuffer[Offset, TransactionLogUpdate]( val toApiTx = Seq(update2, update3, update4)
.zip(apiResponses)
.map { case (u, r) =>
u -> Future.successful(r)
}
.toMap
val transactionsBuffer = new EventsBuffer[TransactionLogUpdate](
maxBufferSize = 3L, maxBufferSize = 3L,
metrics = metrics, metrics = metrics,
bufferQualifier = "test", bufferQualifier = "test",
isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker], isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker],
maxBufferedChunkSize = 100,
) )
offsetUpdates.foreach { case (offset, update) => offsetUpdates.foreach(Function.tupled(transactionsBuffer.push))
transactionsBuffer.push(offset, update)
}
def readerGetTransactionsGeneric( def readerGetTransactionsGeneric(
eventsBuffer: EventsBuffer[Offset, TransactionLogUpdate], eventsBuffer: EventsBuffer[TransactionLogUpdate],
startExclusive: Offset, startExclusive: Offset,
endInclusive: Offset, endInclusive: Offset,
fetchTransactions: FetchTransactions[Object, String], fetchTransactions: FetchTransactions[Object, String],
) = ) =
BufferedTransactionsReader BufferedTransactionsReader
.getTransactions(eventsBuffer)( .getTransactions[Object, String](eventsBuffer)(
startExclusive = startExclusive, startExclusive = startExclusive,
endInclusive = endInclusive, endInclusive = endInclusive,
filter = filter, filter = filter,
verbose = false, verbose = false,
metrics,
)( )(
filterEvents = filterEvents,
toApiTx = toApiTx, toApiTx = toApiTx,
apiResponseCtor = apiResponseCtor,
fetchTransactions = fetchTransactions, fetchTransactions = fetchTransactions,
toApiTxTimer = metrics.daml.services.index.streamsBuffer.toTransactionTrees, bufferReaderMetrics = metrics.daml.services.index.BufferedReader("some_tx_stream"),
sourceTimer = metrics.daml.services.index.streamsBuffer.getTransactionTrees,
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.transactionTreesBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = 128,
) )
.runWith(Sink.seq) .runWith(Sink.seq)
@ -157,21 +155,53 @@ class BufferedTransactionsReaderSpec
) )
) )
} }
"fetch from buffer and storage chunked" in {
val transactionsBufferWithSmallChunkSize = new EventsBuffer[TransactionLogUpdate](
maxBufferSize = 3L,
metrics = metrics,
bufferQualifier = "test",
isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker],
maxBufferedChunkSize = 1,
)
offsetUpdates.foreach(Function.tupled(transactionsBufferWithSmallChunkSize.push))
val anotherResponseForOffset2 = "(2) Response fetched from storage"
val anotherResponseForOffset3 = "(3) Response fetched from storage"
readerGetTransactionsGeneric(
eventsBuffer = transactionsBufferWithSmallChunkSize,
startExclusive = offset1,
endInclusive = offset4,
fetchTransactions = {
case (`offset1`, `offset3`, `filter`, false) =>
Source(
Seq(offset2 -> anotherResponseForOffset2, offset3 -> anotherResponseForOffset3)
)
case unexpected =>
fail(s"Unexpected fetch transactions subscription start: $unexpected")
},
).map(
_ should contain theSameElementsInOrderAs Seq(
offset2 -> anotherResponseForOffset2,
offset3 -> anotherResponseForOffset3,
offset4 -> apiResponse4,
)
)
}
} }
"request before buffer bounds" should { "request before buffer bounds" should {
"fetch only from storage" in { "fetch only from storage" in {
val transactionsBuffer = new EventsBuffer[Offset, TransactionLogUpdate]( val transactionsBuffer = new EventsBuffer[TransactionLogUpdate](
maxBufferSize = 1L, maxBufferSize = 1L,
metrics = metrics, metrics = metrics,
bufferQualifier = "test", bufferQualifier = "test",
isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker], isRangeEndMarker = _.isInstanceOf[TransactionLogUpdate.LedgerEndMarker],
maxBufferedChunkSize = 100,
) )
offsetUpdates.foreach { case (offset, update) => offsetUpdates.foreach(Function.tupled(transactionsBuffer.push))
transactionsBuffer.push(offset, update) val fetchedElements = Vector(offset2 -> apiResponseFromDB, offset3 -> apiResponse2)
}
val fetchedElements = Vector(offset2 -> apiResponse1, offset3 -> apiResponse2)
readerGetTransactionsGeneric( readerGetTransactionsGeneric(
eventsBuffer = transactionsBuffer, eventsBuffer = transactionsBuffer,
@ -187,13 +217,35 @@ class BufferedTransactionsReaderSpec
} }
} }
"invertMapping" should {
"invert the mapping of Map[Party, Set[TemplateId]] into a Map[TemplateId, Set[Party]]" in {
def party: String => IdString.Party = Party.assertFromString
def templateId: String => Identifier = Identifier.assertFromString
val partiesToTemplates =
Map(
party("p11") -> Set(templateId("a:b:t1")),
party("p12") -> Set(templateId("a:b:t1"), templateId("a:b:t2")),
party("p21") -> Set(templateId("a:b:t2")),
)
val expectedTemplatesToParties =
Map(
templateId("a:b:t1") -> Set(party("p11"), party("p12")),
templateId("a:b:t2") -> Set(party("p21"), party("p12")),
)
invertMapping(partiesToTemplates) shouldBe expectedTemplatesToParties
}
}
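For reference, a minimal standalone sketch of the inversion asserted above. The production invertMapping lives in BufferedTransactionsReader (it is imported by this spec); the code below only illustrates the expected behaviour and is not that implementation.

import com.daml.lf.data.Ref.{Identifier, Party}

// Illustration only: fold each (party -> templateIds) entry into the inverted map.
def invertMappingSketch(
    partiesToTemplates: Map[Party, Set[Identifier]]
): Map[Identifier, Set[Party]] =
  partiesToTemplates.foldLeft(Map.empty[Identifier, Set[Party]]) {
    case (acc, (party, templateIds)) =>
      templateIds.foldLeft(acc) { (inverted, templateId) =>
        inverted.updated(templateId, inverted.getOrElse(templateId, Set.empty[Party]) + party)
      }
  }

Applied to the partiesToTemplates fixture above, this yields exactly expectedTemplatesToParties.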
override def afterAll(): Unit = { override def afterAll(): Unit = {
val _ = actorSystem.terminate() val _ = actorSystem.terminate()
} }
} }
object BufferedTransactionsReaderSpec { object BufferedTransactionsReaderSpec {
private def transactionLogUpdate(discriminator: String) = private def transaction(discriminator: String) =
TransactionLogUpdate.Transaction( TransactionLogUpdate.Transaction(
transactionId = discriminator, transactionId = discriminator,
workflowId = "", workflowId = "",
@ -204,4 +256,9 @@ object BufferedTransactionsReaderSpec {
private def predecessor(offset: Offset): Offset = private def predecessor(offset: Offset): Offset =
Offset.fromByteArray((BigInt(offset.toByteArray) - 1).toByteArray) Offset.fromByteArray((BigInt(offset.toByteArray) - 1).toByteArray)
private def offset(idx: Long): Offset = {
val base = BigInt(1L) << 32
Offset.fromByteArray((base + idx).toByteArray)
}
} }
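To make the buffer/storage split exercised by the "fetch from buffer and storage chunked" test easier to follow, here is a simplified sketch of the composition, assuming only the BufferSlice shapes visible in this diff. It ignores continuation across multiple buffered chunks and all error handling, so it is not the PR's actual stream construction.

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.offset.Offset
import com.daml.platform.store.cache.{BufferSlice, EventsBuffer}
import com.daml.platform.store.interfaces.TransactionLogUpdate
import scala.concurrent.{ExecutionContext, Future}

// Simplified sketch: serve what the buffer holds for the requested range and
// fall back to storage only for the prefix preceding the buffered chunk.
def streamRangeSketch[API](
    startExclusive: Offset,
    endInclusive: Offset,
    buffer: EventsBuffer[TransactionLogUpdate],
    filterEvents: TransactionLogUpdate => Option[TransactionLogUpdate],
    toApi: TransactionLogUpdate => Future[API],
    fetchFromStorage: (Offset, Offset) => Source[(Offset, API), NotUsed],
)(implicit ec: ExecutionContext): Source[(Offset, API), NotUsed] = {
  def fromBuffer(slice: Vector[(Offset, TransactionLogUpdate)]): Source[(Offset, API), NotUsed] =
    Source(slice).mapAsync(1) { case (offset, tx) => toApi(tx).map(offset -> _) }

  buffer.slice(startExclusive, endInclusive, filterEvents) match {
    // The requested range is fully buffered: no storage access needed.
    case BufferSlice.Inclusive(slice) => fromBuffer(slice)
    // Only a suffix is buffered: read the preceding part from storage first.
    case BufferSlice.LastBufferChunkSuffix(bufferedStartExclusive, slice) =>
      fetchFromStorage(startExclusive, bufferedStartExclusive).concat(fromBuffer(slice))
  }
}

In the chunked test above this corresponds to the storage fetch over (offset1, offset3] being concatenated with the buffered entry for offset4.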
View File
@ -4,10 +4,10 @@
package com.daml.platform.store.cache package com.daml.platform.store.cache
import java.util.concurrent.Executors import java.util.concurrent.Executors
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.daml.ledger.offset.Offset
import com.daml.metrics.Metrics import com.daml.metrics.Metrics
import com.daml.platform.store.cache.BufferSlice.Prefix import com.daml.platform.store.cache.BufferSlice.{Inclusive, LastBufferChunkSuffix}
import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException} import com.daml.platform.store.cache.EventsBuffer.{RequestOffBufferBounds, UnorderedException}
import org.scalatest.Succeeded import org.scalatest.Succeeded
import org.scalatest.compatible.Assertion import org.scalatest.compatible.Assertion
@ -16,109 +16,192 @@ import org.scalatest.wordspec.AnyWordSpec
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
import scala.collection.Searching.{Found, InsertionPoint} import scala.collection.Searching.{Found, InsertionPoint}
import scala.collection.immutable import scala.collection.{View, immutable}
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks { class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks {
private val BufferElements = Vector(2, 3, 5, 8, 13).map(idx => idx -> idx * 2) private val offsetIdx = Vector(2, 4, 6, 8, 10)
private val LastOffset = BufferElements.last._1 private val BeginOffset = offset(0L)
private val offsets @ Seq(offset1, offset2, offset3, offset4, offset5) =
offsetIdx.map(i => offset(i.toLong))
private val bufferElements @ Seq(entry1, entry2, entry3, entry4) =
offsets.zip(offsetIdx.map(_ * 2)).take(4)
private val LastOffset = offset4
private val IdentityFilter: Int => Option[Int] = Some(_)
"push" when { "push" when {
"max buffer size reached" should { "max buffer size reached" should {
"drop oldest" in withBuffer(3L) { buffer => "drop oldest" in withBuffer(3L) { buffer =>
buffer.slice(0, LastOffset) shouldBe Prefix(BufferElements.drop(2)) buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
buffer.push(21, 42) bufferedStartExclusive = offset2,
buffer.slice(0, 21) shouldBe Prefix(BufferElements.drop(3) :+ 21 -> 42) slice = Vector(entry3, entry4),
)
buffer.push(offset5, 21)
buffer.slice(BeginOffset, offset5, IdentityFilter) shouldBe LastBufferChunkSuffix(
bufferedStartExclusive = offset3,
slice = Vector(entry4, offset5 -> 21),
)
} }
} }
"element with smaller offset added" should { "element with smaller offset added" should {
"throw" in withBuffer(3L) { buffer => "throw" in withBuffer(3L) { buffer =>
intercept[UnorderedException[Int]] { intercept[UnorderedException[Int]] {
buffer.push(1, 2) buffer.push(offset1, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 1" }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: $offset4 vs $offset1"
} }
} }
"element with equal offset added" should { "element with equal offset added" should {
"throw" in withBuffer(3L) { buffer => "throw" in withBuffer(3L) { buffer =>
intercept[UnorderedException[Int]] { intercept[UnorderedException[Int]] {
buffer.push(13, 2) buffer.push(offset4, 2)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 13 vs 13" }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: $offset4 vs $offset4"
} }
} }
"range end with equal offset added" should { "range end with equal offset added" should {
"accept it" in withBuffer(3L) { buffer => "accept it" in withBuffer(3L) { buffer =>
buffer.push(13, Int.MaxValue) buffer.push(LastOffset, Int.MaxValue)
buffer.slice(0, 13) shouldBe Prefix(BufferElements.drop(2)) buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
bufferedStartExclusive = offset2,
slice = Vector(entry3, entry4),
)
} }
} }
"range end with greater offset added" should { "range end with greater offset added" should {
"not allow new element with lower offset" in withBuffer(3) { buffer => "not allow new element with lower offset" in withBuffer(3) { buffer =>
buffer.push(15, Int.MaxValue) buffer.push(offset(15), Int.MaxValue)
buffer.slice(0, 13) shouldBe Prefix(BufferElements.drop(2))
intercept[UnorderedException[Int]] { intercept[UnorderedException[Int]] {
buffer.push(14, 28) buffer.push(offset(14), 28)
}.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: 15 vs 14" }.getMessage shouldBe s"Elements appended to the buffer should have strictly increasing offsets: ${offset(15)} vs ${offset(14)}"
} }
} }
} }
"getEvents" when { "slice" when {
"called with inclusive range" should { "filters" in withBuffer() { buffer =>
"return the full buffer contents" in withBuffer() { buffer => buffer.slice(offset1, offset4, Some(_).filterNot(_ == entry3._2)) shouldBe Inclusive(
buffer.slice(0, 13) shouldBe Prefix(BufferElements) Vector(entry2, entry4)
)
}
"called with startExclusive gteq than the buffer start" should {
"return an Inclusive slice" in withBuffer() { buffer =>
buffer.slice(offset1, succ(offset3), IdentityFilter) shouldBe BufferSlice.Inclusive(
Vector(entry2, entry3)
)
buffer.slice(offset1, offset4, IdentityFilter) shouldBe BufferSlice.Inclusive(
Vector(entry2, entry3, entry4)
)
buffer.slice(succ(offset1), offset4, IdentityFilter) shouldBe BufferSlice.Inclusive(
Vector(entry2, entry3, entry4)
)
}
"return an Inclusive chunk result if resulting slice is bigger than maxFetchSize" in withBuffer(
maxFetchSize = 2
) { buffer =>
buffer.slice(offset1, offset4, IdentityFilter) shouldBe Inclusive(
Vector(entry2, entry3)
)
} }
} }
"called with range end before buffer range" should { "called with endInclusive lteq startExclusive" should {
"not include elements past the requested end inclusive" in withBuffer() { buffer => "return an empty Inclusive slice if startExclusive is gteq buffer start" in withBuffer() {
buffer.slice(0, 12) shouldBe Prefix(BufferElements.dropRight(1)) buffer =>
buffer.slice(0, 8) shouldBe Prefix(BufferElements.dropRight(1)) buffer.slice(offset1, offset1, IdentityFilter) shouldBe Inclusive(Vector.empty)
buffer.slice(offset2, offset1, IdentityFilter) shouldBe Inclusive(Vector.empty)
}
"return an empty LastBufferChunkSuffix slice if startExclusive is before buffer start" in withBuffer(
maxBufferSize = 2L
) { buffer =>
buffer.slice(offset1, offset1, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset1,
Vector.empty,
)
buffer.slice(offset2, offset1, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset1,
Vector.empty,
)
} }
} }
"called with range start exclusive past the buffer start range" in withBuffer() { buffer => "called with startExclusive before the buffer start" should {
buffer.slice(4, 13) shouldBe BufferSlice.Inclusive(BufferElements.drop(2)) "return a LastBufferChunkSuffix slice" in withBuffer() { buffer =>
buffer.slice(5, 13) shouldBe BufferSlice.Inclusive(BufferElements.drop(3)) buffer.slice(Offset.beforeBegin, offset3, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset1,
Vector(entry2, entry3),
)
buffer.slice(
Offset.beforeBegin,
succ(offset3),
IdentityFilter,
) shouldBe LastBufferChunkSuffix(
offset1,
Vector(entry2, entry3),
)
}
"return a the last filtered chunk as LastBufferChunkSuffix slice if resulting slice is bigger than maxFetchSize" in withBuffer(
maxFetchSize = 2
) { buffer =>
buffer.slice(Offset.beforeBegin, offset4, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset2,
Vector(entry3, entry4),
)
}
} }
"called with endInclusive exceeding buffer range" should { "called with endInclusive exceeding buffer range" should {
"fail with exception" in withBuffer() { buffer => val (toBeBuffered, Vector((notBufferedOffset, _))) = bufferElements.splitAt(3)
"fail with exception" in withBuffer(elems = toBeBuffered) { buffer =>
intercept[RequestOffBufferBounds[Int]] { intercept[RequestOffBufferBounds[Int]] {
buffer.slice(4, 15) buffer.slice(offset3, notBufferedOffset, IdentityFilter)
}.getMessage shouldBe s"Request endInclusive (15) is higher than bufferEnd (13)" }.getMessage shouldBe s"Request endInclusive ($offset4) is higher than bufferEnd ($offset3)"
} }
} }
"called after push from a different thread" should { "called after push from a different thread" should {
"always see the most recent updates" in withBuffer(1000, Vector.empty) { buffer => "always see the most recent updates" in withBuffer(1000, Vector.empty, maxFetchSize = 1000) {
(0 until 1000).foreach(idx => buffer.push(idx, idx)) // fill buffer to max size buffer =>
(0 until 1000).foreach(idx =>
buffer.push(offset(idx.toLong), idx)
) // fill buffer to max size
val pushExecutor, sliceExecutor = val pushExecutor, sliceExecutor =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
(0 until 1000).foreach { idx => (0 until 1000).foreach { idx =>
val expected = ((idx + 901) to (1000 + idx)).map(idx => idx -> idx) val expected = ((idx + 901) to (1000 + idx)).map(idx => offset(idx.toLong) -> idx)
implicit val ec: ExecutionContextExecutorService = pushExecutor implicit val ec: ExecutionContextExecutorService = pushExecutor
Await.result( Await.result(
// Simulate different thread accesses for push/slice // Simulate different thread accesses for push/slice
awaitable = { awaitable = {
for { for {
_ <- Future(buffer.push(1000 + idx, 1000 + idx))(pushExecutor) _ <- Future(buffer.push(offset((1000 + idx).toLong), 1000 + idx))(pushExecutor)
_ <- Future(buffer.slice(900 + idx, 1000 + idx))(sliceExecutor) _ <- Future(
.map(_.slice should contain theSameElementsInOrderAs expected)(sliceExecutor) buffer.slice(
} yield Succeeded offset((900 + idx).toLong),
}, offset((1000 + idx).toLong),
atMost = 1.seconds, IdentityFilter,
) )
} )(
Succeeded sliceExecutor
)
.map(_.slice should contain theSameElementsInOrderAs expected)(sliceExecutor)
} yield Succeeded
},
atMost = 1.seconds,
)
}
Succeeded
} }
} }
} }
@ -126,36 +209,51 @@ class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPr
"prune" when { "prune" when {
"element found" should { "element found" should {
"prune inclusive" in withBuffer() { buffer => "prune inclusive" in withBuffer() { buffer =>
buffer.prune(5) buffer.prune(offset3)
buffer.slice(0, LastOffset) shouldBe Prefix(Vector(8 -> 16, 13 -> 26)) buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset4,
bufferElements.drop(4),
)
} }
} }
"element not present" should { "element not present" should {
"prune inclusive" in withBuffer() { buffer => "prune inclusive" in withBuffer() { buffer =>
buffer.prune(6) buffer.prune(offset(6))
buffer.slice(0, LastOffset) shouldBe Prefix(Vector(8 -> 16, 13 -> 26)) buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset4,
bufferElements.drop(4),
)
} }
} }
"element before series" should { "element before series" should {
"not prune" in withBuffer() { buffer => "not prune" in withBuffer() { buffer =>
buffer.prune(1) buffer.prune(offset(1))
buffer.slice(0, LastOffset) shouldBe Prefix(BufferElements) buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
offset1,
bufferElements.drop(1),
)
} }
} }
"element after series" should { "element after series" should {
"prune all" in withBuffer() { buffer => "prune all" in withBuffer() { buffer =>
buffer.prune(15) buffer.prune(offset5)
buffer.slice(0, LastOffset) shouldBe BufferSlice.Empty buffer.slice(BeginOffset, LastOffset, IdentityFilter) shouldBe LastBufferChunkSuffix(
LastOffset,
Vector.empty,
)
} }
} }
"one element in buffer" should { "one element in buffer" should {
"prune all" in withBuffer(1, Vector(1 -> 2)) { buffer => "prune all" in withBuffer(1, Vector(offset(1) -> 2)) { buffer =>
buffer.prune(1) buffer.prune(offset(1))
buffer.slice(0, 1) shouldBe BufferSlice.Empty buffer.slice(BeginOffset, offset(1), IdentityFilter) shouldBe LastBufferChunkSuffix(
offset(1),
Vector.empty,
)
} }
} }
} }
@ -183,17 +281,92 @@ class EventsBufferSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPr
} }
} }
"indexAfter" should {
"yield the index gt the searched entry" in {
EventsBuffer.indexAfter(InsertionPoint(3)) shouldBe 3
EventsBuffer.indexAfter(Found(3)) shouldBe 4
}
}
"filterAndChunkSlice" should {
"return an Inclusive result with filter" in {
val input = Vector(entry1, entry2, entry3, entry4).view
EventsBuffer.filterAndChunkSlice[Int, Int](
sliceView = input,
filter = Option(_).filterNot(_ == entry2._2),
maxChunkSize = 3,
) shouldBe Vector(entry1, entry3, entry4)
EventsBuffer.filterAndChunkSlice[Int, Int](
sliceView = View.empty,
filter = Some(_),
maxChunkSize = 3,
) shouldBe Vector.empty
}
}
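A standalone reconstruction consistent with the two assertions above, assuming filterAndChunkSlice simply applies the filter lazily and caps the number of surviving entries; the production helper may differ in detail.

import com.daml.ledger.offset.Offset
import scala.collection.View

// Illustration only: filter the lazy view and keep at most maxChunkSize
// surviving (offset, result) pairs, preserving their order.
def filterAndChunkSliceSketch[ENTRY, RESULT](
    sliceView: View[(Offset, ENTRY)],
    filter: ENTRY => Option[RESULT],
    maxChunkSize: Int,
): Vector[(Offset, RESULT)] =
  sliceView
    .flatMap { case (offset, entry) => filter(entry).map(offset -> _) }
    .take(maxChunkSize)
    .toVector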
"lastFilteredChunk" should {
val input = Vector(entry1, entry2, entry3, entry4)
"return a LastBufferChunkSuffix with the last maxChunkSize-sized chunk from the slice with filter" in {
EventsBuffer.lastFilteredChunk[Int, Int](
bufferSlice = input,
filter = Option(_).filterNot(_ == entry2._2),
maxChunkSize = 1,
) shouldBe LastBufferChunkSuffix(entry3._1, Vector(entry4))
EventsBuffer.lastFilteredChunk[Int, Int](
bufferSlice = input,
filter = Option(_).filterNot(_ == entry2._2),
maxChunkSize = 2,
) shouldBe LastBufferChunkSuffix(entry1._1, Vector(entry3, entry4))
EventsBuffer.lastFilteredChunk[Int, Int](
bufferSlice = input,
filter = Option(_).filterNot(_ == entry2._2),
maxChunkSize = 3,
) shouldBe LastBufferChunkSuffix(entry1._1, Vector(entry3, entry4))
EventsBuffer.lastFilteredChunk[Int, Int](
bufferSlice = input,
filter = Some(_), // No filter
maxChunkSize = 4,
) shouldBe LastBufferChunkSuffix(entry1._1, Vector(entry2, entry3, entry4))
}
"use the slice head as bufferedStartExclusive when filter yields an empty result slice" in {
EventsBuffer.lastFilteredChunk[Int, Int](
bufferSlice = input,
filter = _ => None,
maxChunkSize = 2,
) shouldBe LastBufferChunkSuffix(entry1._1, Vector.empty)
}
}
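Likewise, a reconstruction that reproduces the lastFilteredChunk expectations above (name and return shape assumed, not the production code): the slice head is never emitted and only serves as the fallback start-exclusive marker, the remainder is filtered, and the last maxChunkSize survivors form the chunk.

import com.daml.ledger.offset.Offset

// Illustration only; assumes a non-empty bufferSlice, as in the tests above.
// Returns (bufferedStartExclusive, chunk), mirroring LastBufferChunkSuffix.
def lastFilteredChunkSketch[ENTRY, RESULT](
    bufferSlice: Vector[(Offset, ENTRY)],
    filter: ENTRY => Option[RESULT],
    maxChunkSize: Int,
): (Offset, Vector[(Offset, RESULT)]) = {
  val filtered = bufferSlice.tail.flatMap { case (offset, entry) => filter(entry).map(offset -> _) }
  val chunk = filtered.takeRight(maxChunkSize)
  val startExclusive =
    if (chunk.size < filtered.size) filtered(filtered.size - chunk.size - 1)._1
    else bufferSlice.head._1
  startExclusive -> chunk
}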
private def withBuffer( private def withBuffer(
maxBufferSize: Long = 5L, maxBufferSize: Long = 5L,
elems: immutable.Vector[(Int, Int)] = BufferElements, elems: immutable.Vector[(Offset, Int)] = bufferElements,
)(test: EventsBuffer[Int, Int] => Assertion): Assertion = { maxFetchSize: Int = 10,
val buffer = new EventsBuffer[Int, Int]( )(test: EventsBuffer[Int] => Assertion): Assertion = {
val buffer = new EventsBuffer[Int](
maxBufferSize, maxBufferSize,
new Metrics(new MetricRegistry), new Metrics(new MetricRegistry),
"integers", "integers",
_ == Int.MaxValue, // Signifies ledger end _ == Int.MaxValue, // Signifies ledger end
maxBufferedChunkSize = maxFetchSize,
) )
elems.foreach { case (offset, event) => buffer.push(offset, event) } elems.foreach { case (offset, event) => buffer.push(offset, event) }
test(buffer) test(buffer)
} }
private def offset(idx: Long): Offset = {
val base = BigInt(1L) << 32
Offset.fromByteArray((base + idx).toByteArray)
}
private def succ(offset: Offset): Offset = {
val bigInt = BigInt(offset.toByteArray)
Offset.fromByteArray((bigInt + 1).toByteArray)
}
} }
View File
@ -522,7 +522,8 @@ conformance_test(
server = ":app", server = ":app",
server_args = [ server_args = [
"--contract-id-seeding=testing-weak", "--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865,ledger-api-transactions-buffer-max-size=10", "--participant participant-id=example,port=6865,ledger-api-transactions-buffer-max-size=3",
"--buffered-streams-page-size=1",
"--buffered-ledger-api-streams", "--buffered-ledger-api-streams",
], ],
test_tool_args = [ test_tool_args = [
View File
@ -76,6 +76,7 @@ object ConfigConverter {
commandConfig = sandboxConfig.commandConfig, commandConfig = sandboxConfig.commandConfig,
enableInMemoryFanOutForLedgerApi = false, enableInMemoryFanOutForLedgerApi = false,
eventsPageSize = sandboxConfig.eventsPageSize, eventsPageSize = sandboxConfig.eventsPageSize,
bufferedStreamsPageSize = 100,
eventsProcessingParallelism = sandboxConfig.eventsProcessingParallelism, eventsProcessingParallelism = sandboxConfig.eventsProcessingParallelism,
extra = extraBridgeConfig, extra = extraBridgeConfig,
ledgerId = sandboxConfig.ledgerIdMode match { ledgerId = sandboxConfig.ledgerIdMode match {