diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala
index a10953ace5..006c5da676 100644
--- a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala
+++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala
@@ -108,6 +108,12 @@ class IndexerBenchmark() {
         val duration: Double = (stopTime - startTime).toDouble / 1000000000.0
         val updates: Long = metrics.daml.parallelIndexer.updates.getCount
         val updateRate: Double = updates / duration
+        val inputMappingDurationMetric = metrics.registry.timer(
+          MetricRegistry.name(metrics.daml.parallelIndexer.inputMapping.executor, "duration")
+        )
+        val batchingDurationMetric = metrics.registry.timer(
+          MetricRegistry.name(metrics.daml.parallelIndexer.batching.executor, "duration")
+        )
         val (failure, minimumUpdateRateFailureInfo): (Boolean, String) =
           config.minUpdateRate match {
             case Some(requiredMinUpdateRate) if requiredMinUpdateRate > updateRate =>
@@ -149,13 +155,21 @@ class IndexerBenchmark() {
           metrics.daml.parallelIndexer.inputMapping.batchSize.getSnapshot
         )}
         | inputMapping.duration: ${histogramToString(
-          metrics.daml.parallelIndexer.inputMapping.duration.getSnapshot
+          inputMappingDurationMetric.getSnapshot
         )}
-        | inputMapping.duration.rate: ${metrics.daml.parallelIndexer.inputMapping.duration.getMeanRate}
+        | inputMapping.duration.rate: ${inputMappingDurationMetric.getMeanRate}
+        | batching.duration: ${histogramToString(batchingDurationMetric.getSnapshot)}
+        | batching.duration.rate: ${batchingDurationMetric.getMeanRate}
+        | seqMapping.duration: ${metrics.daml.parallelIndexer.seqMapping.duration.getSnapshot}|
+        | seqMapping.duration.rate: ${metrics.daml.parallelIndexer.seqMapping.duration.getMeanRate}|
         | ingestion.duration: ${histogramToString(
-          metrics.daml.parallelIndexer.ingestion.duration.getSnapshot
+          metrics.daml.parallelIndexer.ingestion.executionTimer.getSnapshot
         )}
-        | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.duration.getMeanRate}
+        | ingestion.duration.rate: ${metrics.daml.parallelIndexer.ingestion.executionTimer.getMeanRate}
+        | tailIngestion.duration: ${histogramToString(
+          metrics.daml.parallelIndexer.tailIngestion.executionTimer.getSnapshot
+        )}
+        | tailIngestion.duration.rate: ${metrics.daml.parallelIndexer.tailIngestion.executionTimer.getMeanRate}
         |
         |Notes:
         | The above numbers include all ingested updates, including package uploads.
diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
index c1bacd833a..a8749e08c6 100644
--- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
+++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
@@ -594,10 +594,6 @@ final class Metrics(val registry: MetricRegistry) {
         // Bundle of metrics coming from instrumentation of the underlying thread-pool
         val executor: MetricName = Prefix :+ "executor"
 
-        // The latency, which during an update element is residing in the mapping-stage.
-        // Since batches are involved, this duration is divided by the batch size.
-        val duration: Timer = registry.timer(Prefix :+ "duration")
-
         // The batch size, i.e., the number of state updates per database submission
         val batchSize: Histogram = registry.histogram(Prefix :+ "batch_size")
       }
@@ -609,21 +605,19 @@ final class Metrics(val registry: MetricRegistry) {
 
         // Bundle of metrics coming from instrumentation of the underlying thread-pool
         val executor: MetricName = Prefix :+ "executor"
+      }
 
-        // The latency, which during an update element is residing in the batching-stage.
-        // Since batches are involved, this duration is divided by the batch size.
+      // Sequence Mapping stage
+      object seqMapping {
+        private val Prefix: MetricName = parallelIndexer.Prefix :+ "seqmapping"
+
+        // The latency, which during an update element is residing in the seq-mapping-stage.
         val duration: Timer = registry.timer(Prefix :+ "duration")
       }
 
       // Ingestion stage
       // Parallel ingestion of prepared data into the database
-      object ingestion extends DatabaseMetrics(registry, Prefix, "ingestion") {
-        private val Prefix: MetricName = parallelIndexer.Prefix :+ "ingestion"
-
-        // The latency, which during an update element is residing in the ingestion.
-        // Since batches are involved, this duration is divided by the batch size.
-        val duration: Timer = registry.timer(Prefix :+ "duration")
-      }
+      val ingestion = new DatabaseMetrics(registry, Prefix, "ingestion")
 
       // Tail ingestion stage
       // The throttled update of ledger end parameters
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala
index 2ee2a4c30b..f6cfd481bc 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala
@@ -11,7 +11,7 @@ import com.daml.ledger.participant.state.{v2 => state}
 import com.daml.lf.data.Ref
 import com.daml.logging.LoggingContext.withEnrichedLoggingContext
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
-import com.daml.metrics.{InstrumentedSource, Metrics}
+import com.daml.metrics.{InstrumentedSource, Metrics, Timed}
 import com.daml.platform.indexer.ha.Handle
 import com.daml.platform.indexer.parallel.AsyncSupport._
 import com.daml.platform.store.appendonlydao.DbDispatcher
@@ -21,7 +21,6 @@ import com.daml.platform.store.backend._
 import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning}
 
 import java.sql.Connection
-import java.util.concurrent.TimeUnit
 import scala.concurrent.Future
 
 private[platform] case class ParallelIndexerSubscription[DB_BATCH](
@@ -72,7 +71,7 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
         ),
         batchingParallelism = batchingParallelism,
         batcher = batcherExecutor.execute(
-          batcher(ingestionStorageBackend.batch(_, stringInterningView), metrics)
+          batcher(ingestionStorageBackend.batch(_, stringInterningView))
         ),
         ingestingParallelism = ingestionParallelism,
         ingester = ingester(ingestionStorageBackend.insertBatch, dbDispatcher, metrics),
@@ -87,7 +86,6 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
             counter = metrics.daml.parallelIndexer.inputBufferLength,
             size = maxInputBufferSize,
           )
-          .map(_ -> System.nanoTime())
       )
       .map(_ => ())
       .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
@@ -109,7 +107,6 @@ object ParallelIndexerSubscription {
    * @param lastRecordTime The latest record time in the batch, in milliseconds since Epoch. Needed for metrics population.
    * @param batch The batch of variable type.
    * @param batchSize Size of the batch measured in number of updates. Needed for metrics population.
-   * @param averageStartTime The nanosecond timestamp of the start of the previous processing stage. Needed for metrics population: how much time is spend by a particular update in a certain stage.
    */
   case class Batch[+T](
       lastOffset: Offset,
@@ -118,7 +115,6 @@ object ParallelIndexerSubscription {
       lastRecordTime: Long,
       batch: T,
       batchSize: Int,
-      averageStartTime: Long,
       offsets: Vector[Offset],
   )
 
@@ -128,32 +124,31 @@ object ParallelIndexerSubscription {
       toMeteringDbDto: Iterable[(Offset, state.Update)] => Vector[DbDto.TransactionMetering],
   )(implicit
       loggingContext: LoggingContext
-  ): Iterable[((Offset, state.Update), Long)] => Batch[Vector[DbDto]] = { input =>
+  ): Iterable[(Offset, state.Update)] => Batch[Vector[DbDto]] = { input =>
     metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)
-    input.foreach { case ((offset, update), _) =>
+    input.foreach { case (offset, update) =>
       withEnrichedLoggingContext("offset" -> offset, "update" -> update) {
         implicit loggingContext =>
          logger.info(s"Storing ${update.description}")
       }
     }
 
-    val mainBatch = input.iterator.flatMap { case ((offset, update), _) =>
+    val mainBatch = input.iterator.flatMap { case (offset, update) =>
       toDbDto(offset)(update)
     }.toVector
 
-    val meteringBatch = toMeteringDbDto(input.map(_._1))
+    val meteringBatch = toMeteringDbDto(input)
 
     val batch = mainBatch ++ meteringBatch
 
     Batch(
-      lastOffset = input.last._1._1,
+      lastOffset = input.last._1,
       lastSeqEventId = 0, // will be filled later in the sequential step
       lastStringInterningId = 0, // will be filled later in the sequential step
-      lastRecordTime = input.last._1._2.recordTime.toInstant.toEpochMilli,
+      lastRecordTime = input.last._2.recordTime.toInstant.toEpochMilli,
      batch = batch,
       batchSize = input.size,
-      averageStartTime = input.view.map(_._2 / input.size).sum,
-      offsets = input.view.map(_._1._1).toVector,
+      offsets = input.view.map(_._1).toVector,
     )
   }
 
@@ -169,7 +164,6 @@ object ParallelIndexerSubscription {
       lastRecordTime = 0,
       batch = Vector.empty,
       batchSize = 0,
-      averageStartTime = 0,
       offsets = Vector.empty,
     )
 
@@ -180,61 +174,52 @@ object ParallelIndexerSubscription {
       previous: Batch[Vector[DbDto]],
       current: Batch[Vector[DbDto]],
   ): Batch[Vector[DbDto]] = {
-    var eventSeqId = previous.lastSeqEventId
-    val batchWithSeqIds = current.batch.map {
-      case dbDto: DbDto.EventCreate =>
-        eventSeqId += 1
-        dbDto.copy(event_sequential_id = eventSeqId)
+    Timed.value(
+      metrics.daml.parallelIndexer.seqMapping.duration, {
+        var eventSeqId = previous.lastSeqEventId
+        val batchWithSeqIds = current.batch.map {
+          case dbDto: DbDto.EventCreate =>
+            eventSeqId += 1
+            dbDto.copy(event_sequential_id = eventSeqId)
 
-      case dbDto: DbDto.EventExercise =>
-        eventSeqId += 1
-        dbDto.copy(event_sequential_id = eventSeqId)
+          case dbDto: DbDto.EventExercise =>
+            eventSeqId += 1
+            dbDto.copy(event_sequential_id = eventSeqId)
 
-      case dbDto: DbDto.EventDivulgence =>
-        eventSeqId += 1
-        dbDto.copy(event_sequential_id = eventSeqId)
+          case dbDto: DbDto.EventDivulgence =>
+            eventSeqId += 1
+            dbDto.copy(event_sequential_id = eventSeqId)
 
-      case dbDto: DbDto.CreateFilter =>
-        // we do not increase the event_seq_id here, because all the CreateFilter DbDto-s must have the same eventSeqId as the preceding EventCreate
-        dbDto.copy(event_sequential_id = eventSeqId)
+          case dbDto: DbDto.CreateFilter =>
+            // we do not increase the event_seq_id here, because all the CreateFilter DbDto-s must have the same eventSeqId as the preceding EventCreate
+            dbDto.copy(event_sequential_id = eventSeqId)
 
-      case unChanged => unChanged
-    }
+          case unChanged => unChanged
+        }
 
-    val (newLastStringInterningId, dbDtosWithStringInterning) =
-      internize(batchWithSeqIds)
-        .map(DbDto.StringInterningDto.from) match {
-        case noNewEntries if noNewEntries.isEmpty =>
-          previous.lastStringInterningId -> batchWithSeqIds
-        case newEntries => newEntries.last.internalId -> (batchWithSeqIds ++ newEntries)
-      }
+        val (newLastStringInterningId, dbDtosWithStringInterning) =
+          internize(batchWithSeqIds)
+            .map(DbDto.StringInterningDto.from) match {
+            case noNewEntries if noNewEntries.isEmpty =>
+              previous.lastStringInterningId -> batchWithSeqIds
+            case newEntries => newEntries.last.internalId -> (batchWithSeqIds ++ newEntries)
+          }
 
-    val nowNanos = System.nanoTime()
-    metrics.daml.parallelIndexer.inputMapping.duration.update(
-      (nowNanos - current.averageStartTime) / current.batchSize,
-      TimeUnit.NANOSECONDS,
-    )
-    current.copy(
-      lastSeqEventId = eventSeqId,
-      lastStringInterningId = newLastStringInterningId,
-      batch = dbDtosWithStringInterning,
-      averageStartTime = nowNanos, // setting start time to the start of the next stage
+        current.copy(
+          lastSeqEventId = eventSeqId,
+          lastStringInterningId = newLastStringInterningId,
+          batch = dbDtosWithStringInterning,
+        )
+      },
     )
   }
 
   def batcher[DB_BATCH](
-      batchF: Vector[DbDto] => DB_BATCH,
-      metrics: Metrics,
+      batchF: Vector[DbDto] => DB_BATCH
   ): Batch[Vector[DbDto]] => Batch[DB_BATCH] = { inBatch =>
     val dbBatch = batchF(inBatch.batch)
-    val nowNanos = System.nanoTime()
-    metrics.daml.parallelIndexer.batching.duration.update(
-      (nowNanos - inBatch.averageStartTime) / inBatch.batchSize,
-      TimeUnit.NANOSECONDS,
-    )
     inBatch.copy(
-      batch = dbBatch,
-      averageStartTime = nowNanos,
+      batch = dbBatch
     )
   }
 
@@ -248,11 +233,6 @@ object ParallelIndexerSubscription {
       dbDispatcher.executeSql(metrics.daml.parallelIndexer.ingestion) { connection =>
         metrics.daml.parallelIndexer.updates.inc(batch.batchSize.toLong)
         ingestFunction(connection, batch.batch)
-        val nowNanos = System.nanoTime()
-        metrics.daml.parallelIndexer.ingestion.duration.update(
-          (nowNanos - batch.averageStartTime) / batch.batchSize,
-          TimeUnit.NANOSECONDS,
-        )
         batch
       }
 
@@ -268,7 +248,6 @@ object ParallelIndexerSubscription {
       lastRecordTime = curr.lastRecordTime,
       batch = zeroDbBatch, // not used anymore
       batchSize = 0, // not used anymore
-      averageStartTime = 0, // not used anymore
      offsets = Vector.empty, // not used anymore
     )
 
diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala
index 177e7bd875..4a0a2a6a68 100644
--- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala
+++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala
@@ -127,27 +127,18 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
       toMeteringDbDto = _ => Vector.empty,
     )(lc)(
       List(
+        (Offset.fromHexString(Ref.HexString.assertFromString("00")), somePackageUploadRejected),
         (
-          (Offset.fromHexString(Ref.HexString.assertFromString("00")), somePackageUploadRejected),
-          1000000001,
+          Offset.fromHexString(Ref.HexString.assertFromString("01")),
+          somePackageUploadRejected.copy(recordTime =
+            somePackageUploadRejected.recordTime.addMicros(1000)
+          ),
         ),
         (
-          (
-            Offset.fromHexString(Ref.HexString.assertFromString("01")),
-            somePackageUploadRejected.copy(recordTime =
-              somePackageUploadRejected.recordTime.addMicros(1000)
-            ),
+          Offset.fromHexString(Ref.HexString.assertFromString("02")),
+          somePackageUploadRejected.copy(recordTime =
+            somePackageUploadRejected.recordTime.addMicros(2000)
           ),
-          1000000002,
-        ),
-        (
-          (
-            Offset.fromHexString(Ref.HexString.assertFromString("02")),
-            somePackageUploadRejected.copy(recordTime =
-              somePackageUploadRejected.recordTime.addMicros(2000)
-            ),
-          ),
-          1000000003,
         ),
       )
     )
@@ -165,7 +156,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
         someParty,
       ),
       batchSize = 3,
-      averageStartTime = 1000000001,
      offsets = Vector("00", "01", "02").map(offset),
     )
     actual shouldBe expected
@@ -234,10 +224,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
         toMeteringDbDto = _ => expected,
       )(lc)(
         List(
-          (
-            (Offset.fromHexString(offset), someTransactionAccepted),
-            timestamp,
-          )
+          (Offset.fromHexString(offset), someTransactionAccepted)
         )
       )
       .batch
@@ -257,7 +244,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
       lastRecordTime = 0,
       batch = Vector.empty,
       batchSize = 0,
-      averageStartTime = 0,
       offsets = Vector.empty,
     )
   }
@@ -287,13 +273,11 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
           someParty,
         ),
         batchSize = 3,
-        averageStartTime = 1000000001,
         offsets = Vector("00", "01", "02").map(offset),
       ),
     )
     result.lastSeqEventId shouldBe 18
     result.lastStringInterningId shouldBe 1
-    result.averageStartTime should be > System.nanoTime() - 1000000000
     result.batch(1).asInstanceOf[DbDto.EventDivulgence].event_sequential_id shouldBe 16
     result.batch(3).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 17
     result.batch(4).asInstanceOf[DbDto.CreateFilter].event_sequential_id shouldBe 17
@@ -320,7 +304,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
           someParty,
         ),
         batchSize = 3,
-        averageStartTime = 1000000001,
         offsets = Vector("00", "01", "02").map(offset),
       ),
     )
@@ -332,8 +315,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
 
   it should "batch correctly in happy path case" in {
     val result = ParallelIndexerSubscription.batcher(
-      batchF = _ => "bumm",
-      metrics = metrics,
+      batchF = _ => "bumm"
     )(
       Batch(
         lastOffset = offset("02"),
@@ -347,7 +329,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
          someParty,
        ),
         batchSize = 3,
-        averageStartTime = 1000000001,
         offsets = Vector("00", "01", "02").map(offset),
       )
     )
@@ -358,10 +339,8 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
       lastRecordTime = someTime.toEpochMilli,
       batch = "bumm",
       batchSize = 3,
-      averageStartTime = result.averageStartTime,
       offsets = Vector("00", "01", "02").map(offset),
     )
-    result.averageStartTime should be > System.nanoTime() - 1000000000
   }
 
   behavior of "tailer"
@@ -375,7 +354,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
         lastRecordTime = someTime.toEpochMilli - 1000,
         batch = "bumm1",
         batchSize = 3,
-        averageStartTime = 8,
         offsets = Vector("00", "01", "02").map(offset),
       ),
       Batch(
@@ -385,7 +363,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
         lastRecordTime = someTime.toEpochMilli,
         batch = "bumm2",
         batchSize = 3,
-        averageStartTime = 10,
         offsets = Vector("03", "04", "05").map(offset),
       ),
     ) shouldBe Batch(
@@ -395,7 +372,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
       lastRecordTime = someTime.toEpochMilli,
       batch = "zero",
       batchSize = 0,
-      averageStartTime = 0,
       offsets = Vector.empty,
     )
   }
@@ -411,7 +387,6 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
       lastRecordTime = someTime.toEpochMilli,
       batch = "zero",
       batchSize = 0,
-      averageStartTime = 0,
       offsets = Vector.empty,
     )
   ) shouldBe ParameterStorageBackend.LedgerEnd(
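
For context: the change above removes the hand-rolled averageStartTime / TimeUnit.NANOSECONDS bookkeeping and instead measures stage latency with (a) Timed.value around the sequential-mapping step, recorded in the new seqMapping.duration timer, and (b) the "duration" timer that the instrumented executors already register, which IndexerBenchmark now reads via MetricRegistry.name(<executor prefix>, "duration"). The standalone sketch below illustrates both patterns with plain Dropwizard calls only; the metric names and the timed helper are illustrative assumptions, not code from this change.

import com.codahale.metrics.{MetricRegistry, Timer}

object MetricsSketch extends App {
  val registry = new MetricRegistry

  // Pattern (a): time a block of work explicitly, as Timed.value does for the
  // seqMapping stage in ParallelIndexerSubscription.seqMapper.
  def timed[T](timer: Timer)(work: => T): T = {
    val ctx = timer.time()
    try work
    finally ctx.stop()
  }

  // Illustrative metric name; the real prefix comes from Metrics.daml.parallelIndexer.
  val seqMappingDuration = registry.timer("daml.parallel_indexer.seqmapping.duration")
  val result = timed(seqMappingDuration) {
    (1 to 1000).map(_ + 1).sum // stand-in for assigning event_sequential_id-s
  }

  // Pattern (b): look up the timer an instrumented executor registers under
  // "<executor prefix>.duration", as IndexerBenchmark now does for the
  // inputMapping and batching stages. registry.timer is get-or-create, so the
  // benchmark and the executor instrumentation share the same Timer instance.
  val executorPrefix = "daml.parallel_indexer.inputmapping.executor" // illustrative
  val inputMappingDuration = registry.timer(MetricRegistry.name(executorPrefix, "duration"))

  println(s"seqMapping.duration: count=${seqMappingDuration.getCount} meanRate=${seqMappingDuration.getMeanRate}")
  println(s"inputMapping.duration: count=${inputMappingDuration.getCount} (work result = $result)")
}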