Added buffer size metrics for getTransactions/getTransactionTrees (#10744)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
tudor-da 2021-09-03 10:51:00 +02:00 committed by GitHub
parent f76c868ee4
commit e5a6d70182
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 12 deletions

View File

@ -370,6 +370,11 @@ final class Metrics(val registry: MetricRegistry) {
val transactionLogUpdatesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_log_updates_buffer_size")
val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")
val contractStateEventsBufferSize: Counter =
registry.counter(Prefix :+ "contract_state_events_buffer_size")
@ -686,12 +691,12 @@ final class Metrics(val registry: MetricRegistry) {
val toFlatTransactions: Timer = registry.timer(Prefix :+ "to_flat_transactions")
val toTransactionTrees: Timer = registry.timer(Prefix :+ "to_transaction_trees")
}
val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")
val transactionTreesBufferSize: Counter =
registry.counter(Prefix :+ "transaction_trees_buffer_size")
val flatTransactionsBufferSize: Counter =
registry.counter(Prefix :+ "flat_transactions_buffer_size")
}
val getContractStateEventsChunkSize: Histogram =
registry.histogram(Prefix :+ "get_contract_state_events_chunk_fetch_size")

View File

@ -68,7 +68,7 @@ private[events] class BufferedTransactionsReader(
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.flatTransactionsBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsTotal,
bufferSizeCounter = metrics.daml.services.index.flatTransactionsBufferSize,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.flatTransactionsBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
)
}
@ -93,7 +93,7 @@ private[events] class BufferedTransactionsReader(
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter =
// TODO in-memory fan-out: Specialize the metric per consumer
metrics.daml.services.index.transactionTreesBufferSize,
metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = outputStreamBufferSize,
)

View File

@ -126,12 +126,18 @@ private[appendonlydao] final class TransactionsReader(
})
.mapMaterializedValue(_ => NotUsed)
groupContiguous(events)(by = _.transactionId)
val flatTransactionsStream = groupContiguous(events)(by = _.transactionId)
.mapConcat { events =>
val response = EventsTable.Entry.toGetTransactionsResponse(events)
response.map(r => offsetFor(r) -> r)
}
.buffer(outputStreamBufferSize, OverflowStrategy.backpressure)
InstrumentedSource
.bufferedSource(
flatTransactionsStream,
metrics.daml.index.flatTransactionsBufferSize,
outputStreamBufferSize,
)
.wireTap(_ match {
case (_, response) =>
response.transactions.foreach(txn =>
@ -221,12 +227,18 @@ private[appendonlydao] final class TransactionsReader(
})
.mapMaterializedValue(_ => NotUsed)
groupContiguous(events)(by = _.transactionId)
val transactionTreesStream = groupContiguous(events)(by = _.transactionId)
.mapConcat { events =>
val response = EventsTable.Entry.toGetTransactionTreesResponse(events)
response.map(r => offsetFor(r) -> r)
}
.buffer(outputStreamBufferSize, OverflowStrategy.backpressure)
InstrumentedSource
.bufferedSource(
transactionTreesStream,
metrics.daml.index.transactionTreesBufferSize,
outputStreamBufferSize,
)
.wireTap(_ match {
case (_, response) =>
response.transactions.foreach(txn =>

View File

@ -101,7 +101,7 @@ class BufferedTransactionsReaderSpec
resolvedFromBufferCounter =
metrics.daml.services.index.streamsBuffer.transactionTreesBuffered,
totalRetrievedCounter = metrics.daml.services.index.streamsBuffer.transactionTreesTotal,
bufferSizeCounter = metrics.daml.services.index.transactionTreesBufferSize,
bufferSizeCounter = metrics.daml.services.index.streamsBuffer.transactionTreesBufferSize,
outputStreamBufferSize = 128,
)
.runWith(Sink.seq)