[DPP-601] Measure only active time in indexer stage metrics (#12951)

* [DPP-601] Measure only active time in indexer stage metrics

changelog_begin
changelog_end
Sergey Kisel 2022-02-22 18:07:26 +01:00 committed by GitHub
parent a4f9dd79c6
commit c50360bbb2
4 changed files with 77 additions and 115 deletions
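
Context for the change: before this commit, the inputMapping, batching, and ingestion stages each recorded a wall-clock delta measured from a timestamp carried along in Batch.averageStartTime, so the reported durations also included the time a batch spent queued between stages. After this commit the stage metrics capture only active time: the instrumented executors' "duration" timers for inputMapping and batching, a Timed.value wrapper around the new seqMapping timer, and the DatabaseMetrics execution timers for ingestion and tailIngestion. The sketch below is illustrative only (made-up names, plain Dropwizard timers), contrasting the two measurement styles:

import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry

object ActiveTimeSketch {
  private val registry = new MetricRegistry
  private val stageTimer = registry.timer("indexer.stage.duration")

  // Old style: record the elapsed time since the previous stage handed the batch over.
  // This also counts the time the batch spent queued between stages.
  def recordWallClock[A](handOverNanos: Long)(work: => A): A = {
    val result = work
    stageTimer.update(System.nanoTime() - handOverNanos, TimeUnit.NANOSECONDS)
    result
  }

  // New style: record only the time spent actually executing the stage body.
  def recordActiveTime[A](work: => A): A = {
    val ctx = stageTimer.time()
    try work
    finally ctx.stop()
  }
}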

View File

@@ -108,6 +108,12 @@ class IndexerBenchmark() {
     val duration: Double = (stopTime - startTime).toDouble / 1000000000.0
     val updates: Long = metrics.daml.parallelIndexer.updates.getCount
     val updateRate: Double = updates / duration
+    val inputMappingDurationMetric = metrics.registry.timer(
+      MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration")
+    )
+    val batchingDurationMetric = metrics.registry.timer(
+      MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration")
+    )
     val (failure, minimumUpdateRateFailureInfo): (Boolean, String) =
       config.minUpdateRate match {
         case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate =>
@@ -149,13 +155,21 @@ class IndexerBenchmark() {
            metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot
          )}
          | inputMapping.duration: ${histogramToString(
-           metrics.daml.parallelIndexer.inputMapping.duration.getSnapshot
+           inputMappingDurationMetric.getSnapshot
          )}
-         | inputMapping.duration.rate: ${metrics.daml.parallelIndexer.inputMapping.duration.getMeanRate}
+         | inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate}
+         | batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)}
+         | batching.duration.rate: ${batchingDurationMetric.getMeanRate}
+         | seqMapping.duration: ${metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot}|
+         | seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}|
          | ingestion.duration: ${histogramToString(
-           metrics.daml.parallelIndexer.ingestion.duration.getSnapshot
+           metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot
          )}
-         | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.duration.getMeanRate}
+         | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate}
+         | tailIngestion.duration: ${histogramToString(
+           metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot
+         )}
+         | tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate}
          |
          |Notes:
          | The above numbers include all ingested updates, including package uploads.
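
The benchmark above now reads the "duration" timer registered under each stage executor's metric prefix. Assuming the stage thread pools are instrumented with Dropwizard's InstrumentedExecutorService, which registers (among others) a <name>.duration timer for task execution time, a lookup like the one in the diff resolves to that timer. An illustrative sketch with a hypothetical prefix, not code from this commit:

import java.util.concurrent.Executors
import com.codahale.metrics.{InstrumentedExecutorService, MetricRegistry}

object ExecutorDurationSketch {
  val registry = new MetricRegistry
  // Hypothetical metric name; the real prefix comes from metrics.daml.parallelIndexer.inputMapping.executor.
  val executorPrefix = "daml.parallel_indexer.inputmapping.executor"

  // Wrapping the pool registers <prefix>.duration (task execution time) plus related metrics.
  val instrumented = new InstrumentedExecutorService(
    Executors.newFixedThreadPool(4),
    registry,
    executorPrefix
  )

  // The same timer can then be looked up the way IndexerBenchmark does above.
  val durationTimer = registry.timer(MetricRegistry.name(executorPrefix, "duration"))
}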

View File

@@ -594,10 +594,6 @@ final class Metrics(val registry: MetricRegistry) {
       // Bundle of metrics coming from instrumentation of the underlying thread-pool
       val executor: MetricName = Prefix :+ "executor"
-      // The latency, which during an update element is residing in the mapping-stage.
-      // Since batches are involved, this duration is divided by the batch size.
-      val duration: Timer = registry.timer(Prefix :+ "duration")
       // The batch size, i.e., the number of state updates per database submission
       val batchSize: Histogram = registry.histogram(Prefix :+ "batch_size")
     }
@@ -609,21 +605,19 @@ final class Metrics(val registry: MetricRegistry) {
       // Bundle of metrics coming from instrumentation of the underlying thread-pool
       val executor: MetricName = Prefix :+ "executor"
-      // The latency, which during an update element is residing in the batching-stage.
-      // Since batches are involved, this duration is divided by the batch size.
+    }
+    // Sequence Mapping stage
+    object seqMapping {
+      private val Prefix: MetricName = parallelIndexer.Prefix :+ "seqmapping"
+      // The latency, which during an update element is residing in the seq-mapping-stage.
       val duration: Timer = registry.timer(Prefix :+ "duration")
     }
     // Ingestion stage
     // Parallel ingestion of prepared data into the database
-    object ingestion extends DatabaseMetrics(registry, Prefix, "ingestion") {
-      private val Prefix: MetricName = parallelIndexer.Prefix :+ "ingestion"
-      // The latency, which during an update element is residing in the ingestion.
-      // Since batches are involved, this duration is divided by the batch size.
-      val duration: Timer = registry.timer(Prefix :+ "duration")
-    }
+    val ingestion = new DatabaseMetrics(registry, Prefix, "ingestion")
     // Tail ingestion stage
     // The throttled update of ledger end parameters

View File

@@ -11,7 +11,7 @@ import com.daml.ledger.participant.state.{v2 => state}
 import com.daml.lf.data.Ref
 import com.daml.logging.LoggingContext.withEnrichedLoggingContext
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
-import com.daml.metrics.{InstrumentedSource, Metrics}
+import com.daml.metrics.{InstrumentedSource, Metrics, Timed}
 import com.daml.platform.indexer.ha.Handle
 import com.daml.platform.indexer.parallel.AsyncSupport._
 import com.daml.platform.store.appendonlydao.DbDispatcher
@@ -21,7 +21,6 @@ import com.daml.platform.store.backend._
 import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning}
 import java.sql.Connection
-import java.util.concurrent.TimeUnit
 import scala.concurrent.Future
 private[platform] case class ParallelIndexerSubscription[DB_BATCH](
@@ -72,7 +71,7 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
        ),
        batchingParallelism = batchingParallelism,
        batcher = batcherExecutor.execute(
-         batcher(ingestionStorageBackend.batch(_, stringInterningView), metrics)
+         batcher(ingestionStorageBackend.batch(_, stringInterningView))
        ),
        ingestingParallelism = ingestionParallelism,
        ingester = ingester(ingestionStorageBackend.insertBatch, dbDispatcher, metrics),
@@ -87,7 +86,6 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
            counter = metrics.daml.parallelIndexer.inputBufferLength,
            size = maxInputBufferSize,
          )
-         .map(_ -> System.nanoTime())
      )
      .map(_ => ())
      .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
@@ -109,7 +107,6 @@ object ParallelIndexerSubscription {
    * @param lastRecordTime The latest record time in the batch, in milliseconds since Epoch. Needed for metrics population.
    * @param batch The batch of variable type.
    * @param batchSize Size of the batch measured in number of updates. Needed for metrics population.
-   * @param averageStartTime The nanosecond timestamp of the start of the previous processing stage. Needed for metrics population: how much time is spend by a particular update in a certain stage.
    */
  case class Batch[+T](
      lastOffset: Offset,
@@ -118,7 +115,6 @@ object ParallelIndexerSubscription {
      lastRecordTime: Long,
      batch: T,
      batchSize: Int,
-     averageStartTime: Long,
      offsets: Vector[Offset],
  )
@@ -128,32 +124,31 @@ object ParallelIndexerSubscription {
      toMeteringDbDto: Iterable[(Offset, state.Update)] => Vector[DbDto.TransactionMetering],
  )(implicit
      loggingContext: LoggingContext
- ): Iterable[((Offset, state.Update), Long)] => Batch[Vector[DbDto]] = { input =>
+ ): Iterable[(Offset, state.Update)] => Batch[Vector[DbDto]] = { input =>
    metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)
-   input.foreach { case ((offset, update), _) =>
+   input.foreach { case (offset, update) =>
      withEnrichedLoggingContext("offset" -> offset, "update" -> update) {
        implicit loggingContext =>
          logger.info(s"Storing ${update.description}")
      }
    }
-   val mainBatch = input.iterator.flatMap { case ((offset, update), _) =>
+   val mainBatch = input.iterator.flatMap { case (offset, update) =>
      toDbDto(offset)(update)
    }.toVector
-   val meteringBatch = toMeteringDbDto(input.map(_._1))
+   val meteringBatch = toMeteringDbDto(input)
    val batch = mainBatch ++ meteringBatch
    Batch(
-     lastOffset = input.last._1._1,
+     lastOffset = input.last._1,
      lastSeqEventId = 0, // will be filled later in the sequential step
      lastStringInterningId = 0, // will be filled later in the sequential step
-     lastRecordTime = input.last._1._2.recordTime.toInstant.toEpochMilli,
+     lastRecordTime = input.last._2.recordTime.toInstant.toEpochMilli,
      batch = batch,
      batchSize = input.size,
-     averageStartTime = input.view.map(_._2 / input.size).sum,
-     offsets = input.view.map(_._1._1).toVector,
+     offsets = input.view.map(_._1).toVector,
    )
  }
@@ -169,7 +164,6 @@ object ParallelIndexerSubscription {
      lastRecordTime = 0,
      batch = Vector.empty,
      batchSize = 0,
-     averageStartTime = 0,
      offsets = Vector.empty,
    )
@@ -180,61 +174,52 @@ object ParallelIndexerSubscription {
      previous: Batch[Vector[DbDto]],
      current: Batch[Vector[DbDto]],
  ): Batch[Vector[DbDto]] = {
-   var eventSeqId = previous.lastSeqEventId
-   val batchWithSeqIds = current.batch.map {
-     case dbDto: DbDto.EventCreate =>
-       eventSeqId += 1
-       dbDto.copy(event_sequential_id = eventSeqId)
+   Timed.value(
+     metrics.daml.parallelIndexer.seqMapping.duration, {
+       var eventSeqId = previous.lastSeqEventId
+       val batchWithSeqIds = current.batch.map {
+         case dbDto: DbDto.EventCreate =>
+           eventSeqId += 1
+           dbDto.copy(event_sequential_id = eventSeqId)
          case dbDto: DbDto.EventExercise =>
            eventSeqId += 1
            dbDto.copy(event_sequential_id = eventSeqId)
          case dbDto: DbDto.EventDivulgence =>
            eventSeqId += 1
            dbDto.copy(event_sequential_id = eventSeqId)
          case dbDto: DbDto.CreateFilter =>
            // we do not increase the event_seq_id here, because all the CreateFilter DbDto-s must have the same eventSeqId as the preceding EventCreate
            dbDto.copy(event_sequential_id = eventSeqId)
          case unChanged => unChanged
        }
        val (newLastStringInterningId, dbDtosWithStringInterning) =
          internize(batchWithSeqIds)
            .map(DbDto.StringInterningDto.from) match {
            case noNewEntries if noNewEntries.isEmpty =>
              previous.lastStringInterningId -> batchWithSeqIds
            case newEntries => newEntries.last.internalId -> (batchWithSeqIds ++ newEntries)
          }
-   val nowNanos = System.nanoTime()
-   metrics.daml.parallelIndexer.inputMapping.duration.update(
-     (nowNanos - current.averageStartTime) / current.batchSize,
-     TimeUnit.NANOSECONDS,
-   )
-   current.copy(
-     lastSeqEventId = eventSeqId,
-     lastStringInterningId = newLastStringInterningId,
-     batch = dbDtosWithStringInterning,
-     averageStartTime = nowNanos, // setting start time to the start of the next stage
-   )
+       current.copy(
+         lastSeqEventId = eventSeqId,
+         lastStringInterningId = newLastStringInterningId,
+         batch = dbDtosWithStringInterning,
+       )
+     },
+   )
  }
  def batcher[DB_BATCH](
-     batchF: Vector[DbDto] => DB_BATCH,
-     metrics: Metrics,
+     batchF: Vector[DbDto] => DB_BATCH
  ): Batch[Vector[DbDto]] => Batch[DB_BATCH] = { inBatch =>
    val dbBatch = batchF(inBatch.batch)
-   val nowNanos = System.nanoTime()
-   metrics.daml.parallelIndexer.batching.duration.update(
-     (nowNanos - inBatch.averageStartTime) / inBatch.batchSize,
-     TimeUnit.NANOSECONDS,
-   )
    inBatch.copy(
-     batch = dbBatch,
-     averageStartTime = nowNanos,
+     batch = dbBatch
    )
  }
@@ -248,11 +233,6 @@ object ParallelIndexerSubscription {
      dbDispatcher.executeSql(metrics.daml.parallelIndexer.ingestion) { connection =>
        metrics.daml.parallelIndexer.updates.inc(batch.batchSize.toLong)
        ingestFunction(connection, batch.batch)
-       val nowNanos = System.nanoTime()
-       metrics.daml.parallelIndexer.ingestion.duration.update(
-         (nowNanos - batch.averageStartTime) / batch.batchSize,
-         TimeUnit.NANOSECONDS,
-       )
        batch
      }
    }
@@ -268,7 +248,6 @@ object ParallelIndexerSubscription {
      lastRecordTime = curr.lastRecordTime,
      batch = zeroDbBatch, // not used anymore
      batchSize = 0, // not used anymore
-     averageStartTime = 0, // not used anymore
      offsets = Vector.empty, // not used anymore
    )
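
In the new seqMapper above, the sequential step is wrapped in Timed.value from com.daml.metrics, so the seqMapping timer observes only the time spent assigning sequential ids and interning strings, not the time the batch waited for the sequential stage. A rough sketch of what such a helper does, assuming it simply times the evaluation of a by-name block against a Dropwizard timer:

import com.codahale.metrics.Timer

object TimedSketch {
  // Times only the evaluation of `block`; the result is returned unchanged.
  def value[T](timer: Timer, block: => T): T = {
    val ctx = timer.time()
    try block
    finally ctx.stop()
  }
}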

View File

@@ -127,27 +127,18 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      toMeteringDbDto = _ => Vector.empty,
    )(lc)(
      List(
-       (
-         (Offset.fromHexString(Ref.HexString.assertFromString("00")), somePackageUploadRejected),
-         1000000001,
-       ),
-       (
-         (
-           Offset.fromHexString(Ref.HexString.assertFromString("01")),
-           somePackageUploadRejected.copy(recordTime =
-             somePackageUploadRejected.recordTime.addMicros(1000)
-           ),
-         ),
-         1000000002,
-       ),
-       (
-         (
-           Offset.fromHexString(Ref.HexString.assertFromString("02")),
-           somePackageUploadRejected.copy(recordTime =
-             somePackageUploadRejected.recordTime.addMicros(2000)
-           ),
-         ),
-         1000000003,
-       ),
+       (Offset.fromHexString(Ref.HexString.assertFromString("00")), somePackageUploadRejected),
+       (
+         Offset.fromHexString(Ref.HexString.assertFromString("01")),
+         somePackageUploadRejected.copy(recordTime =
+           somePackageUploadRejected.recordTime.addMicros(1000)
+         ),
+       ),
+       (
+         Offset.fromHexString(Ref.HexString.assertFromString("02")),
+         somePackageUploadRejected.copy(recordTime =
+           somePackageUploadRejected.recordTime.addMicros(2000)
+         ),
+       ),
      )
    )
@@ -165,7 +156,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
        someParty,
      ),
      batchSize = 3,
-     averageStartTime = 1000000001,
      offsets = Vector("00", "01", "02").map(offset),
    )
    actual shouldBe expected
@@ -234,10 +224,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      toMeteringDbDto = _ => expected,
    )(lc)(
      List(
-       (
-         (Offset.fromHexString(offset), someTransactionAccepted),
-         timestamp,
-       )
+       (Offset.fromHexString(offset), someTransactionAccepted)
      )
    )
    .batch
@@ -257,7 +244,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = 0,
      batch = Vector.empty,
      batchSize = 0,
-     averageStartTime = 0,
      offsets = Vector.empty,
    )
  }
@@ -287,13 +273,11 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
        someParty,
      ),
      batchSize = 3,
-     averageStartTime = 1000000001,
      offsets = Vector("00", "01", "02").map(offset),
    ),
  )
  result.lastSeqEventId shouldBe 18
  result.lastStringInterningId shouldBe 1
- result.averageStartTime should be > System.nanoTime() - 1000000000
  result.batch(1).asInstanceOf[DbDto.EventDivulgence].event_sequential_id shouldBe 16
  result.batch(3).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 17
  result.batch(4).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 17
@@ -320,7 +304,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
        someParty,
      ),
      batchSize = 3,
-     averageStartTime = 1000000001,
      offsets = Vector("00", "01", "02").map(offset),
    ),
  )
@@ -332,8 +315,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
  it should "batch correctly in happy path case" in {
    val result = ParallelIndexerSubscription.batcher(
-     batchF = _ => "bumm",
-     metrics = metrics,
+     batchF = _ => "bumm"
    )(
      Batch(
        lastOffset = offset("02"),
@@ -347,7 +329,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
        someParty,
      ),
      batchSize = 3,
-     averageStartTime = 1000000001,
      offsets = Vector("00", "01", "02").map(offset),
    )
  )
@@ -358,10 +339,8 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = someTime.toEpochMilli,
      batch = "bumm",
      batchSize = 3,
-     averageStartTime = result.averageStartTime,
      offsets = Vector("00", "01", "02").map(offset),
    )
-   result.averageStartTime should be > System.nanoTime() - 1000000000
  }
  behavior of "tailer"
@@ -375,7 +354,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = someTime.toEpochMilli - 1000,
      batch = "bumm1",
      batchSize = 3,
-     averageStartTime = 8,
      offsets = Vector("00", "01", "02").map(offset),
    ),
    Batch(
@@ -385,7 +363,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = someTime.toEpochMilli,
      batch = "bumm2",
      batchSize = 3,
-     averageStartTime = 10,
      offsets = Vector("03", "04", "05").map(offset),
    ),
  ) shouldBe Batch(
@@ -395,7 +372,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = someTime.toEpochMilli,
      batch = "zero",
      batchSize = 0,
-     averageStartTime = 0,
      offsets = Vector.empty,
    )
  }
@@ -411,7 +387,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
      lastRecordTime = someTime.toEpochMilli,
      batch = "zero",
      batchSize = 0,
-     averageStartTime = 0,
      offsets = Vector.empty,
    )
  ) shouldBe ParameterStorageBackend.LedgerEnd(