Add counter for all indexed events [PLEN-70] (#15921)

This commit is contained in:
Nicu Reut 2023-01-03 16:26:36 +01:00 committed by GitHub
parent f7e97a2f1a
commit 0caec011fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 162 additions and 94 deletions

View File

@ -20,13 +20,37 @@ class IndexedUpdatesMetrics(prefix: MetricName, metricFactory: Factory) {
"Number of events that will be metered.",
)
@MetricDoc.Tag(
summary = "Updates processed by the indexer",
description = "Represents the total number of updates processed, that are sent for indexing.",
qualification = Debug,
)
val eventsMeter: MetricHandle.Meter =
metricFactory.meter(prefix :+ "events", "Number of events ingested by the indexer.")
}
object IndexedUpdatesMetrics {
object Labels {
val applicationId = "application_id"
val grpcCode = "grpc_code"
object eventType {
val key = "event_type"
val configurationChange = "configuration_change"
val partyAllocation = "party_allocation"
val packageUpload = "package_upload"
val transaction = "transaction"
}
object status {
val key = "status"
val accepted = "accepted"
val rejected = "rejected"
}
}
}

View File

@ -66,6 +66,7 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH](
participantId = participantId,
translation = translation,
compressionStrategy = compressionStrategy,
metrics,
),
UpdateToMeteringDbDto(metrics = metrics.daml.indexerEvents),
)

View File

@ -3,6 +3,8 @@
package com.daml.platform.store.backend
import java.util.UUID
import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
@ -12,11 +14,14 @@ import com.daml.lf.data.{Ref, Time}
import com.daml.lf.engine.Blinding
import com.daml.lf.ledger.EventId
import com.daml.lf.transaction.Transaction.ChildrenRecursion
import com.daml.metrics.api.MetricsContext
import com.daml.metrics.api.MetricsContext.withExtraMetricLabels
import com.daml.metrics.{IndexedUpdatesMetrics, Metrics}
import com.daml.platform._
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.dao.JdbcLedgerDao
import com.daml.platform.store.dao.events._
import com.daml.platform._
import java.util.UUID
import io.grpc.Status
object UpdateToDbDto {
@ -24,10 +29,23 @@ object UpdateToDbDto {
participantId: Ref.ParticipantId,
translation: LfValueSerialization,
compressionStrategy: CompressionStrategy,
): Offset => state.Update => Iterator[DbDto] = { offset =>
metrics: Metrics,
)(implicit mc: MetricsContext): Offset => state.Update => Iterator[DbDto] = { offset =>
import state.Update._
{
case u: CommandRejected =>
withExtraMetricLabels(
IndexedUpdatesMetrics.Labels.grpcCode -> Status
.fromCodeValue(u.reasonTemplate.code)
.getCode
.name()
) { implicit mc: MetricsContext =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.transaction,
IndexedUpdatesMetrics.Labels.status.rejected,
)
}
Iterator(
commandCompletion(offset, u.recordTime, transactionId = None, u.completionInfo).copy(
rejection_status_code = Some(u.reasonTemplate.code),
@ -38,6 +56,11 @@ object UpdateToDbDto {
)
case u: ConfigurationChanged =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.configurationChange,
IndexedUpdatesMetrics.Labels.status.accepted,
)
Iterator(
DbDto.ConfigurationEntry(
ledger_offset = offset.toHexString,
@ -50,6 +73,11 @@ object UpdateToDbDto {
)
case u: ConfigurationChangeRejected =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.configurationChange,
IndexedUpdatesMetrics.Labels.status.rejected,
)
Iterator(
DbDto.ConfigurationEntry(
ledger_offset = offset.toHexString,
@ -62,6 +90,11 @@ object UpdateToDbDto {
)
case u: PartyAddedToParticipant =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.partyAllocation,
IndexedUpdatesMetrics.Labels.status.accepted,
)
Iterator(
DbDto.PartyEntry(
ledger_offset = offset.toHexString,
@ -76,6 +109,11 @@ object UpdateToDbDto {
)
case u: PartyAllocationRejected =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.partyAllocation,
IndexedUpdatesMetrics.Labels.status.rejected,
)
Iterator(
DbDto.PartyEntry(
ledger_offset = offset.toHexString,
@ -90,6 +128,11 @@ object UpdateToDbDto {
)
case u: PublicPackageUpload =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.packageUpload,
IndexedUpdatesMetrics.Labels.status.accepted,
)
val uploadId = u.submissionId.getOrElse(UUID.randomUUID().toString)
val packages = u.archives.iterator.map { archive =>
DbDto.Package(
@ -114,6 +157,11 @@ object UpdateToDbDto {
packages ++ packageEntries
case u: PublicPackageUploadRejected =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.packageUpload,
IndexedUpdatesMetrics.Labels.status.rejected,
)
Iterator(
DbDto.PackageEntry(
ledger_offset = offset.toHexString,
@ -125,6 +173,11 @@ object UpdateToDbDto {
)
case u: TransactionAccepted =>
incrementCounterForEvent(
metrics.daml.indexerEvents,
IndexedUpdatesMetrics.Labels.eventType.transaction,
IndexedUpdatesMetrics.Labels.status.accepted,
)
val blinding = u.blindingInfo.getOrElse(Blinding.blind(u.transaction))
// TODO LLP: Extract in common functionality together with duplicated code in [[InMemoryStateUpdater]]
val preorderTraversal = u.transaction
@ -311,6 +364,20 @@ object UpdateToDbDto {
}
}
private def incrementCounterForEvent(
metrics: IndexedUpdatesMetrics,
eventType: String,
status: String,
)(implicit
mc: MetricsContext
): Unit = {
withExtraMetricLabels(
IndexedUpdatesMetrics.Labels.eventType.key -> eventType,
IndexedUpdatesMetrics.Labels.status.key -> status,
) { implicit mc =>
metrics.eventsMeter.mark()
}
}
private def commandCompletion(
offset: Offset,
recordTime: Time.Timestamp,

View File

@ -10,6 +10,7 @@ import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.metrics.Metrics
import com.daml.metrics.api.MetricsContext
import com.daml.platform.store.dao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.{
DbDto,
@ -41,23 +42,27 @@ object SequentialWriteDao {
stringInterningView: StringInterning with InternizingStringInterningView,
ingestionStorageBackend: IngestionStorageBackend[_],
parameterStorageBackend: ParameterStorageBackend,
): SequentialWriteDao =
SequentialWriteDaoImpl(
ingestionStorageBackend = ingestionStorageBackend,
parameterStorageBackend = parameterStorageBackend,
updateToDbDtos = UpdateToDbDto(
participantId = participantId,
translation = new LfValueTranslation(
metrics = metrics,
engineO = None,
loadPackage = (_, _) => Future.successful(None),
): SequentialWriteDao = {
MetricsContext.withMetricLabels("participant_id" -> participantId) { implicit mc =>
SequentialWriteDaoImpl(
ingestionStorageBackend = ingestionStorageBackend,
parameterStorageBackend = parameterStorageBackend,
updateToDbDtos = UpdateToDbDto(
participantId = participantId,
translation = new LfValueTranslation(
metrics = metrics,
engineO = None,
loadPackage = (_, _) => Future.successful(None),
),
compressionStrategy = compressionStrategy,
metrics,
),
compressionStrategy = compressionStrategy,
),
ledgerEndCache = ledgerEndCache,
stringInterningView = stringInterningView,
dbDtosToStringsForInterning = DbDtoToStringsForInterning(_),
)
ledgerEndCache = ledgerEndCache,
stringInterningView = stringInterningView,
dbDtosToStringsForInterning = DbDtoToStringsForInterning(_),
)
}
}
val noop: SequentialWriteDao = NoopSequentialWriteDao
}

View File

@ -3,11 +3,14 @@
package com.daml.platform.store.backend
import java.time.Duration
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset}
import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.crypto
import com.daml.lf.data.{Bytes, Ref, Time}
@ -16,16 +19,18 @@ import com.daml.lf.transaction.BlindingInfo
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.platform.{ContractId, Create, Exercise}
import com.daml.metrics.Metrics
import com.daml.metrics.api.MetricsContext
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.dao.events.Raw.TreeEvent
import com.daml.platform.store.dao.{EventProjectionProperties, JdbcLedgerDao}
import com.daml.platform.store.dao.events.{
CompressionStrategy,
FieldCompressionStrategy,
LfValueSerialization,
Raw,
}
import com.daml.platform.store.dao.{EventProjectionProperties, JdbcLedgerDao}
import com.daml.platform.{ContractId, Create, Exercise}
import com.google.protobuf.ByteString
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
@ -33,7 +38,6 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.wordspec.AnyWordSpec
import java.time.Duration
import scala.concurrent.{ExecutionContext, Future}
// Note: this suite contains hand-crafted updates that are impossible to produce on some ledgers
@ -53,9 +57,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someParticipantId,
someConfiguration,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.ConfigurationEntry(
@ -78,9 +80,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someConfiguration,
rejectionReason,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.ConfigurationEntry(
@ -103,9 +103,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someRecordTime,
Some(someSubmissionId),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
@ -130,9 +128,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someRecordTime,
None,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
@ -156,9 +152,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someRecordTime,
rejectionReason,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
@ -182,9 +176,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someRecordTime,
Some(someSubmissionId),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.Package(
@ -222,9 +214,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
someRecordTime,
rejectionReason,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.PackageEntry(
@ -245,9 +235,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
completionInfo,
state.Update.CommandRejected.FinalReason(status),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.CommandCompletion(
@ -298,9 +286,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map(contractId -> someContractDriverMetadata),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos.head shouldEqual DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
@ -392,9 +378,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.EventExercise(
@ -494,9 +478,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.EventExercise(
@ -616,9 +598,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.EventExercise(
@ -785,9 +765,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
// Note: fetch and lookup nodes are not indexed
dtos should contain theSameElementsInOrderAs List(
@ -854,9 +832,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.EventExercise(
@ -978,9 +954,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map(contractId -> someContractDriverMetadata),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos.head shouldEqual DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
@ -1131,9 +1105,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos(0) shouldEqual DbDto.EventExercise(
consuming = true,
@ -1256,9 +1228,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
// Note: this divulgence event references a contract that was never created. This is correct:
@ -1328,9 +1298,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map(contractId -> someContractDriverMetadata),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos.head shouldEqual DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
@ -1391,9 +1359,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map.empty,
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos.head shouldEqual DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
@ -1470,9 +1436,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
completionInfo,
state.Update.CommandRejected.FinalReason(status),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos should contain theSameElementsInOrderAs List(
DbDto.CommandCompletion(
@ -1528,9 +1492,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
blindingInfo = None,
contractMetadata = Map(contractId -> someContractDriverMetadata),
)
val dtos = UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy)(
someOffset
)(update).toList
val dtos = updateToDtos(update)
dtos.head shouldEqual DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
@ -1587,6 +1549,15 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
}
}
}
private def updateToDtos(
update: Update
) = {
UpdateToDbDto(someParticipantId, valueSerialization, compressionStrategy, Metrics.ForTesting)(
MetricsContext.Empty
)(
someOffset
)(update).toList
}
}
object UpdateToDbDtoSpec {

View File

@ -23,7 +23,7 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
import DbDtoEq._
private val updateEventsMetrics = newUpdateMetrics
private val IndexedUpdatesMetrics = newUpdateMetrics
"UpdateMeteringToDbDto" should {
@ -75,7 +75,7 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
"extract transaction metering" in {
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
UpdateToMeteringDbDto(clock = () => timestamp, IndexedUpdatesMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), someTransactionAccepted))
)
@ -104,7 +104,7 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
val expected: Vector[DbDto.TransactionMetering] = Vector(metering)
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
UpdateToMeteringDbDto(clock = () => timestamp, IndexedUpdatesMetrics)(MetricsContext.Empty)(
List(
(
Offset.fromHexString(Ref.HexString.assertFromString("01")),
@ -123,7 +123,7 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
"return empty vector if input iterable is empty" in {
val expected: Vector[DbDto.TransactionMetering] = Vector.empty
val actual = UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(
val actual = UpdateToMeteringDbDto(clock = () => timestamp, IndexedUpdatesMetrics)(
MetricsContext.Empty
)(List.empty)
actual should equal(expected)(decided by DbDtoSeqEq)
@ -137,7 +137,7 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
)
val actual =
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
UpdateToMeteringDbDto(clock = () => timestamp, IndexedUpdatesMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), txWithNoActionCount))
)
@ -145,11 +145,11 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
}
"increment metered events counter" in {
val updateEventsMetrics = newUpdateMetrics
UpdateToMeteringDbDto(clock = () => timestamp, updateEventsMetrics)(MetricsContext.Empty)(
val IndexedUpdatesMetrics = newUpdateMetrics
UpdateToMeteringDbDto(clock = () => timestamp, IndexedUpdatesMetrics)(MetricsContext.Empty)(
List((Offset.fromHexString(offset), someTransactionAccepted))
)
updateEventsMetrics.meteredEventsMeter.value shouldBe (statistics.committed.actions + statistics.rolledBack.actions)
IndexedUpdatesMetrics.meteredEventsMeter.value shouldBe (statistics.committed.actions + statistics.rolledBack.actions)
}
}