mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
kvutils: Inject the metrics registry instead of using SharedMetricRegistries. (#5161)
* kvutils: Remove an unnecessary `@SuppressWarnings`. * kvutils: Reduce the scope of fields and methods in `Committer`. * kvutils: Inject the metric registry into `KeyValueCommitting`. CHANGELOG_BEGIN CHANGELOG_END * kvutils: Inject the metric registry into the committers. * kvutils: Inject the metric registry into `ProcessTransactionSubmission`. * kvutils: Avoid shared metric registries in tests. * kvutils: Recreate the metrics registry per participant state. * kvutils: Add trailing commas to parameter lists. Flagrantly encouraged by @stefanobaghino-da. * recovering-indexer: Don't re-use the metric registry in tests.
This commit is contained in:
parent
a6a1e75bc1
commit
2887f28cc1
@ -28,6 +28,7 @@ da_scala_library(
|
||||
"@maven//:com_google_protobuf_protobuf_java",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
],
|
||||
)
|
||||
|
||||
@ -54,6 +55,7 @@ da_scala_test(
|
||||
"//libs-scala/resources",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_scalactic_scalactic_2_12",
|
||||
"@maven//:org_scalatest_scalatest_2_12",
|
||||
],
|
||||
|
@ -31,8 +31,9 @@ object Main {
|
||||
logCtx: LoggingContext,
|
||||
): ResourceOwner[InMemoryLedgerReaderWriter] =
|
||||
new InMemoryLedgerReaderWriter.Owner(
|
||||
config.ledgerId,
|
||||
participantConfig.participantId,
|
||||
initialLedgerId = config.ledgerId,
|
||||
participantId = participantConfig.participantId,
|
||||
metricRegistry = metricRegistry(participantConfig, config),
|
||||
dispatcher = dispatcher,
|
||||
state = state,
|
||||
)
|
||||
|
@ -9,10 +9,11 @@ import java.util.UUID
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter._
|
||||
import com.daml.ledger.on.memory.InMemoryState.MutableLog
|
||||
import com.daml.ledger.participant.state.kvutils.api.{LedgerEntry, LedgerReader, LedgerWriter}
|
||||
import com.daml.ledger.participant.state.kvutils.{KVOffset, Bytes, SequentialLogEntryId}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset, SequentialLogEntryId}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator._
|
||||
@ -29,6 +30,7 @@ import scala.concurrent.{ExecutionContext, Future}
|
||||
final class InMemoryLedgerReaderWriter(
|
||||
override val ledgerId: LedgerId,
|
||||
override val participantId: ParticipantId,
|
||||
metricRegistry: MetricRegistry,
|
||||
timeProvider: TimeProvider,
|
||||
dispatcher: Dispatcher[Index],
|
||||
state: InMemoryState,
|
||||
@ -39,7 +41,11 @@ final class InMemoryLedgerReaderWriter(
|
||||
private val committer = new ValidatingCommitter(
|
||||
() => timeProvider.getCurrentTime,
|
||||
SubmissionValidator
|
||||
.create(new InMemoryLedgerStateAccess(state), () => sequentialLogEntryId.next()),
|
||||
.create(
|
||||
new InMemoryLedgerStateAccess(state),
|
||||
allocateNextLogEntryId = () => sequentialLogEntryId.next(),
|
||||
metricRegistry = metricRegistry,
|
||||
),
|
||||
dispatcher.signalNewHead,
|
||||
)
|
||||
|
||||
@ -113,6 +119,7 @@ object InMemoryLedgerReaderWriter {
|
||||
class SingleParticipantOwner(
|
||||
initialLedgerId: Option[LedgerId],
|
||||
participantId: ParticipantId,
|
||||
metricRegistry: MetricRegistry,
|
||||
timeProvider: TimeProvider = DefaultTimeProvider,
|
||||
heartbeats: Source[Instant, NotUsed] = Source.empty,
|
||||
)(implicit materializer: Materializer)
|
||||
@ -127,6 +134,7 @@ object InMemoryLedgerReaderWriter {
|
||||
readerWriter <- new Owner(
|
||||
initialLedgerId,
|
||||
participantId,
|
||||
metricRegistry,
|
||||
timeProvider,
|
||||
dispatcher,
|
||||
state,
|
||||
@ -140,6 +148,7 @@ object InMemoryLedgerReaderWriter {
|
||||
class Owner(
|
||||
initialLedgerId: Option[LedgerId],
|
||||
participantId: ParticipantId,
|
||||
metricRegistry: MetricRegistry,
|
||||
timeProvider: TimeProvider = DefaultTimeProvider,
|
||||
dispatcher: Dispatcher[Index],
|
||||
state: InMemoryState,
|
||||
@ -153,6 +162,7 @@ object InMemoryLedgerReaderWriter {
|
||||
new InMemoryLedgerReaderWriter(
|
||||
ledgerId,
|
||||
participantId,
|
||||
metricRegistry,
|
||||
timeProvider,
|
||||
dispatcher,
|
||||
state,
|
||||
|
@ -7,8 +7,9 @@ import java.time.Instant
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase._
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
|
||||
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
|
||||
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
|
||||
import com.digitalasset.logging.LoggingContext
|
||||
@ -24,10 +25,12 @@ class InMemoryLedgerReaderWriterIntegrationSpec
|
||||
participantId: ParticipantId,
|
||||
testId: String,
|
||||
heartbeats: Source[Instant, NotUsed],
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit logCtx: LoggingContext): ResourceOwner[ParticipantState] =
|
||||
new InMemoryLedgerReaderWriter.SingleParticipantOwner(
|
||||
ledgerId,
|
||||
participantId,
|
||||
metricRegistry,
|
||||
heartbeats = heartbeats,
|
||||
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
|
||||
}
|
||||
|
@ -103,6 +103,7 @@ da_scala_library(
|
||||
"@maven//:com_typesafe_play_anorm_2_12",
|
||||
"@maven//:com_typesafe_play_anorm_tokenizer_2_12",
|
||||
"@maven//:com_zaxxer_HikariCP",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_flywaydb_flyway_core",
|
||||
"@maven//:org_scalaz_scalaz_core_2_12",
|
||||
],
|
||||
@ -200,6 +201,7 @@ da_scala_test_suite(
|
||||
"//libs-scala/resources",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_flywaydb_flyway_core",
|
||||
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
|
||||
"@maven//:org_scalactic_scalactic_2_12",
|
||||
|
@ -56,9 +56,10 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
|
||||
new SqlLedgerReaderWriter.Owner(
|
||||
config.ledgerId,
|
||||
participantConfig.participantId,
|
||||
metricRegistry(participantConfig, config),
|
||||
jdbcUrl,
|
||||
seedService = SeedService(Seeding.Strong))
|
||||
.acquire()
|
||||
seedService = SeedService(Seeding.Strong),
|
||||
).acquire()
|
||||
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
|
||||
}
|
||||
}
|
||||
|
@ -9,12 +9,12 @@ import java.util.UUID
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
|
||||
import com.daml.ledger.on.sql.queries.Queries
|
||||
import com.daml.ledger.participant.state.kvutils.Bytes
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset
|
||||
import com.daml.ledger.participant.state.kvutils.api.{LedgerEntry, LedgerReader, LedgerWriter}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator._
|
||||
@ -35,6 +35,7 @@ import scala.util.{Failure, Success}
|
||||
final class SqlLedgerReaderWriter(
|
||||
override val ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
|
||||
val participantId: ParticipantId,
|
||||
metricRegistry: MetricRegistry,
|
||||
timeProvider: TimeProvider,
|
||||
database: Database,
|
||||
dispatcher: Dispatcher[Index],
|
||||
@ -56,7 +57,11 @@ final class SqlLedgerReaderWriter(
|
||||
private val committer = new ValidatingCommitter[Index](
|
||||
() => timeProvider.getCurrentTime,
|
||||
SubmissionValidator
|
||||
.create(SqlLedgerStateAccess, allocateNextLogEntryId = () => allocateSeededLogEntryId()),
|
||||
.create(
|
||||
SqlLedgerStateAccess,
|
||||
allocateNextLogEntryId = () => allocateSeededLogEntryId(),
|
||||
metricRegistry = metricRegistry,
|
||||
),
|
||||
latestSequenceNo => dispatcher.signalNewHead(latestSequenceNo),
|
||||
)
|
||||
|
||||
@ -110,10 +115,11 @@ object SqlLedgerReaderWriter {
|
||||
class Owner(
|
||||
initialLedgerId: Option[LedgerId],
|
||||
participantId: ParticipantId,
|
||||
metricRegistry: MetricRegistry,
|
||||
jdbcUrl: String,
|
||||
timeProvider: TimeProvider = DefaultTimeProvider,
|
||||
heartbeats: Source[Instant, NotUsed] = Source.empty,
|
||||
seedService: SeedService
|
||||
seedService: SeedService,
|
||||
)(implicit materializer: Materializer, logCtx: LoggingContext)
|
||||
extends ResourceOwner[SqlLedgerReaderWriter] {
|
||||
override def acquire()(
|
||||
@ -129,10 +135,12 @@ object SqlLedgerReaderWriter {
|
||||
new SqlLedgerReaderWriter(
|
||||
ledgerId,
|
||||
participantId,
|
||||
metricRegistry,
|
||||
timeProvider,
|
||||
database,
|
||||
dispatcher,
|
||||
seedService)
|
||||
seedService,
|
||||
)
|
||||
}
|
||||
|
||||
private def updateOrRetrieveLedgerId(initialLedgerId: Option[LedgerId], database: Database)(
|
||||
|
@ -7,6 +7,7 @@ import java.time.Instant
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
|
||||
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
|
||||
@ -26,13 +27,15 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
|
||||
participantId: ParticipantId,
|
||||
testId: String,
|
||||
heartbeats: Source[Instant, NotUsed],
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit logCtx: LoggingContext): ResourceOwner[ParticipantState] =
|
||||
new SqlLedgerReaderWriter.Owner(
|
||||
ledgerId,
|
||||
participantId,
|
||||
metricRegistry,
|
||||
jdbcUrl(testId),
|
||||
heartbeats = heartbeats,
|
||||
// Using a weak random source to avoid slowdown during tests.
|
||||
seedService = SeedService(Seeding.Weak)
|
||||
seedService = SeedService(Seeding.Weak),
|
||||
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
|
||||
}
|
||||
|
@ -77,6 +77,7 @@ da_scala_library(
|
||||
"@maven//:com_google_protobuf_protobuf_java",
|
||||
"@maven//:com_typesafe_akka_akka_actor_2_12",
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
|
||||
"@maven//:org_scalactic_scalactic_2_12",
|
||||
"@maven//:org_scalatest_scalatest_2_12",
|
||||
|
@ -4,6 +4,7 @@
|
||||
package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.{
|
||||
@ -25,16 +26,19 @@ import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
object KeyValueCommitting {
|
||||
class KeyValueCommitting(metricRegistry: MetricRegistry) {
|
||||
private val logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def packDamlStateKey(key: DamlStateKey): ByteString = key.toByteString
|
||||
|
||||
def unpackDamlStateKey(bytes: ByteString): DamlStateKey =
|
||||
DamlStateKey.parseFrom(bytes)
|
||||
|
||||
def packDamlStateValue(value: DamlStateValue): ByteString = value.toByteString
|
||||
|
||||
def unpackDamlStateValue(bytes: Array[Byte]): DamlStateValue =
|
||||
DamlStateValue.parseFrom(bytes)
|
||||
|
||||
def unpackDamlStateValue(bytes: ByteString): DamlStateValue =
|
||||
DamlStateValue.parseFrom(bytes)
|
||||
|
||||
@ -133,7 +137,7 @@ object KeyValueCommitting {
|
||||
): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) =
|
||||
submission.getPayloadCase match {
|
||||
case DamlSubmission.PayloadCase.PACKAGE_UPLOAD_ENTRY =>
|
||||
PackageCommitter(engine).run(
|
||||
new PackageCommitter(engine, metricRegistry).run(
|
||||
entryId,
|
||||
//TODO replace this call with an explicit maxRecordTime from the request once available
|
||||
estimateMaximumRecordTime(recordTime),
|
||||
@ -144,7 +148,7 @@ object KeyValueCommitting {
|
||||
)
|
||||
|
||||
case DamlSubmission.PayloadCase.PARTY_ALLOCATION_ENTRY =>
|
||||
PartyAllocationCommitter.run(
|
||||
new PartyAllocationCommitter(metricRegistry).run(
|
||||
entryId,
|
||||
//TODO replace this call with an explicit maxRecordTime from the request once available
|
||||
estimateMaximumRecordTime(recordTime),
|
||||
@ -155,7 +159,7 @@ object KeyValueCommitting {
|
||||
)
|
||||
|
||||
case DamlSubmission.PayloadCase.CONFIGURATION_SUBMISSION =>
|
||||
ConfigCommitter(defaultConfig).run(
|
||||
new ConfigCommitter(defaultConfig, metricRegistry).run(
|
||||
entryId,
|
||||
parseTimestamp(submission.getConfigurationSubmission.getMaximumRecordTime),
|
||||
recordTime,
|
||||
@ -165,7 +169,7 @@ object KeyValueCommitting {
|
||||
)
|
||||
|
||||
case DamlSubmission.PayloadCase.TRANSACTION_ENTRY =>
|
||||
ProcessTransactionSubmission(defaultConfig, engine).run(
|
||||
new ProcessTransactionSubmission(defaultConfig, engine, metricRegistry).run(
|
||||
entryId,
|
||||
recordTime,
|
||||
participantId,
|
||||
@ -317,30 +321,28 @@ object KeyValueCommitting {
|
||||
}
|
||||
|
||||
private object Metrics {
|
||||
//TODO: Replace with metrics registry object passed in constructor
|
||||
private val registry = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
private val prefix = "kvutils.committer"
|
||||
|
||||
// Timer (and count) of how fast submissions have been processed.
|
||||
val runTimer: metrics.Timer = registry.timer(s"$prefix.run_timer")
|
||||
val runTimer: metrics.Timer = metricRegistry.timer(s"$prefix.run_timer")
|
||||
|
||||
// Number of exceptions seen.
|
||||
val exceptions: metrics.Counter = registry.counter(s"$prefix.exceptions")
|
||||
val exceptions: metrics.Counter = metricRegistry.counter(s"$prefix.exceptions")
|
||||
|
||||
// Counter to monitor how many at a time and when kvutils is processing a submission.
|
||||
val processing: metrics.Counter = registry.counter(s"$prefix.processing")
|
||||
val processing: metrics.Counter = metricRegistry.counter(s"$prefix.processing")
|
||||
|
||||
val lastRecordTimeGauge = new VarGauge[String]("<none>")
|
||||
registry.register(s"$prefix.last.record_time", lastRecordTimeGauge)
|
||||
metricRegistry.register(s"$prefix.last.record_time", lastRecordTimeGauge)
|
||||
|
||||
val lastEntryIdGauge = new VarGauge[String]("<none>")
|
||||
registry.register(s"$prefix.last.entry_id", lastEntryIdGauge)
|
||||
metricRegistry.register(s"$prefix.last.entry_id", lastEntryIdGauge)
|
||||
|
||||
val lastParticipantIdGauge = new VarGauge[String]("<none>")
|
||||
registry.register(s"$prefix.last.participant_id", lastParticipantIdGauge)
|
||||
metricRegistry.register(s"$prefix.last.participant_id", lastParticipantIdGauge)
|
||||
|
||||
val lastExceptionGauge = new VarGauge[String]("<none>")
|
||||
registry.register(s"$prefix.last.exception", lastExceptionGauge)
|
||||
metricRegistry.register(s"$prefix.last.exception", lastExceptionGauge)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,13 +5,13 @@ package com.daml.ledger.participant.state.kvutils.committer
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.codahale.metrics.Timer
|
||||
import com.daml.ledger.participant.state.kvutils.DamlStateMap
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlLogEntry,
|
||||
DamlLogEntryId,
|
||||
DamlStateKey,
|
||||
DamlStateValue,
|
||||
DamlStateValue
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.DamlStateMap
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.digitalasset.daml.lf.data.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
@ -35,27 +35,31 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||
* e.g. `kvutils.PackageCommitter`. An overall run time is measured in `kvutils.PackageCommitter.run-timer`,
|
||||
* and each step is measured separately under `step-timers.<step>`, e.g. `kvutils.PackageCommitter.step-timers.validateEntry`.
|
||||
*/
|
||||
private[kvutils] trait Committer[Submission, PartialResult] {
|
||||
type StepInfo = String
|
||||
type Step = (CommitContext, PartialResult) => StepResult[PartialResult]
|
||||
def steps: Iterable[(StepInfo, Step)]
|
||||
lazy val committerName: String = this.getClass.getSimpleName.toLowerCase
|
||||
private[committer] trait Committer[Submission, PartialResult] {
|
||||
protected type StepInfo = String
|
||||
protected type Step = (CommitContext, PartialResult) => StepResult[PartialResult]
|
||||
|
||||
protected val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
|
||||
protected val committerName: String
|
||||
|
||||
protected def steps: Iterable[(StepInfo, Step)]
|
||||
|
||||
/** The initial internal state passed to first step. */
|
||||
def init(ctx: CommitContext, subm: Submission): PartialResult
|
||||
protected def init(ctx: CommitContext, subm: Submission): PartialResult
|
||||
|
||||
val logger: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
protected val metricRegistry: metrics.MetricRegistry
|
||||
|
||||
//TODO: Replace with metrics registry object passed in constructor
|
||||
val metricsRegistry: metrics.MetricRegistry =
|
||||
metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
def metricsName(metric: String): String =
|
||||
metrics.MetricRegistry.name("kvutils.committer", committerName, metric)
|
||||
private val runTimer: Timer = metricsRegistry.timer(metricsName("run_timer"))
|
||||
protected def metricsName(metric: String): String =
|
||||
metrics.MetricRegistry.name("kvutils", "committer", committerName, metric)
|
||||
|
||||
// These timers are lazy because they rely on `committerName`, which is defined in the subclass
|
||||
// and therefore not set at object initialization.
|
||||
private lazy val runTimer: Timer = metricRegistry.timer(metricsName("run_timer"))
|
||||
private lazy val stepTimers: Map[StepInfo, Timer] =
|
||||
steps.map {
|
||||
case (info, _) =>
|
||||
info -> metricsRegistry.timer(metricsName(s"step_timers.${info}"))
|
||||
info -> metricRegistry.timer(metricsName(s"step_timers.$info"))
|
||||
}.toMap
|
||||
|
||||
/** A committer can `run` a submission and produce a log entry and output states. */
|
||||
@ -86,6 +90,6 @@ private[kvutils] trait Committer[Submission, PartialResult] {
|
||||
return logEntry -> ctx.getOutputs.toMap
|
||||
}
|
||||
}
|
||||
sys.error(s"Internal error: Committer ${this.getClass} did not produce a result!")
|
||||
sys.error(s"Internal error: Committer $committerName did not produce a result!")
|
||||
}
|
||||
}
|
||||
|
@ -3,25 +3,35 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlConfigurationEntry, _}
|
||||
import com.codahale.metrics.{Counter, MetricRegistry}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{
|
||||
buildTimestamp,
|
||||
configDedupKey,
|
||||
configurationStateKey
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.committing.Common.getCurrentConfiguration
|
||||
import com.daml.ledger.participant.state.v1.Configuration
|
||||
|
||||
private[kvutils] object ConfigCommitter {
|
||||
|
||||
case class Result(
|
||||
submission: DamlConfigurationSubmission,
|
||||
currentConfig: (Option[DamlConfigurationEntry], Configuration))
|
||||
currentConfig: (Option[DamlConfigurationEntry], Configuration),
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
private[kvutils] case class ConfigCommitter(
|
||||
defaultConfig: Configuration
|
||||
private[kvutils] class ConfigCommitter(
|
||||
defaultConfig: Configuration,
|
||||
override protected val metricRegistry: MetricRegistry,
|
||||
) extends Committer[DamlConfigurationSubmission, ConfigCommitter.Result] {
|
||||
|
||||
override protected val committerName = "config"
|
||||
|
||||
private object Metrics {
|
||||
// kvutils.ConfigCommitter.*
|
||||
val accepts = metricsRegistry.counter(metricsName("accepts"))
|
||||
val rejections = metricsRegistry.counter(metricsName("rejections"))
|
||||
val accepts: Counter = metricRegistry.counter(metricsName("accepts"))
|
||||
val rejections: Counter = metricRegistry.counter(metricsName("rejections"))
|
||||
}
|
||||
|
||||
private def rejectionTraceLog(msg: String, submission: DamlConfigurationSubmission): Unit =
|
||||
@ -172,8 +182,8 @@ private[kvutils] case class ConfigCommitter(
|
||||
private def buildRejectionLogEntry(
|
||||
ctx: CommitContext,
|
||||
submission: DamlConfigurationSubmission,
|
||||
addErrorDetails: DamlConfigurationRejectionEntry.Builder => DamlConfigurationRejectionEntry.Builder)
|
||||
: DamlLogEntry = {
|
||||
addErrorDetails: DamlConfigurationRejectionEntry.Builder => DamlConfigurationRejectionEntry.Builder,
|
||||
): DamlLogEntry = {
|
||||
Metrics.rejections.inc()
|
||||
DamlLogEntry.newBuilder
|
||||
.setRecordTime(buildTimestamp(ctx.getRecordTime))
|
||||
@ -188,15 +198,16 @@ private[kvutils] case class ConfigCommitter(
|
||||
.build
|
||||
}
|
||||
|
||||
override def init(
|
||||
override protected def init(
|
||||
ctx: CommitContext,
|
||||
configurationSubmission: DamlConfigurationSubmission): ConfigCommitter.Result =
|
||||
configurationSubmission: DamlConfigurationSubmission,
|
||||
): ConfigCommitter.Result =
|
||||
ConfigCommitter.Result(
|
||||
configurationSubmission,
|
||||
getCurrentConfiguration(defaultConfig, ctx.inputs, logger)
|
||||
)
|
||||
|
||||
override val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
override protected val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
"check_ttl" -> checkTtl,
|
||||
"authorize_submission" -> authorizeSubmission,
|
||||
"validate_submission" -> validateSubmission,
|
||||
@ -204,6 +215,4 @@ private[kvutils] case class ConfigCommitter(
|
||||
"build_log_entry" -> buildLogEntry
|
||||
)
|
||||
|
||||
override lazy val committerName = "config"
|
||||
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ package com.daml.ledger.participant.state.kvutils.committer
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import com.codahale.metrics.{Counter, Gauge, Timer}
|
||||
import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, packageUploadDedupKey}
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.digitalasset.daml.lf.archive.Decode
|
||||
@ -16,15 +16,19 @@ import com.digitalasset.daml_lf_dev.DamlLf.Archive
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
private[kvutils] case class PackageCommitter(engine: Engine)
|
||||
extends Committer[DamlPackageUploadEntry, DamlPackageUploadEntry.Builder] {
|
||||
private[kvutils] class PackageCommitter(
|
||||
engine: Engine,
|
||||
override protected val metricRegistry: MetricRegistry,
|
||||
) extends Committer[DamlPackageUploadEntry, DamlPackageUploadEntry.Builder] {
|
||||
|
||||
override protected val committerName = "package_upload"
|
||||
|
||||
private object Metrics {
|
||||
// kvutils.PackageCommitter.*
|
||||
val preloadTimer: Timer = metricsRegistry.timer(metricsName("preload_timer"))
|
||||
val decodeTimer: Timer = metricsRegistry.timer(metricsName("decode_timer"))
|
||||
val accepts: Counter = metricsRegistry.counter(metricsName("accepts"))
|
||||
val rejections: Counter = metricsRegistry.counter(metricsName("rejections"))
|
||||
metricsRegistry.gauge(
|
||||
val preloadTimer: Timer = metricRegistry.timer(metricsName("preload_timer"))
|
||||
val decodeTimer: Timer = metricRegistry.timer(metricsName("decode_timer"))
|
||||
val accepts: Counter = metricRegistry.counter(metricsName("accepts"))
|
||||
val rejections: Counter = metricRegistry.counter(metricsName("rejections"))
|
||||
metricRegistry.gauge(
|
||||
metricsName("loaded_packages"),
|
||||
() =>
|
||||
new Gauge[Int] {
|
||||
@ -35,7 +39,8 @@ private[kvutils] case class PackageCommitter(engine: Engine)
|
||||
|
||||
private def rejectionTraceLog(
|
||||
msg: String,
|
||||
packageUploadEntry: DamlPackageUploadEntry.Builder): Unit =
|
||||
packageUploadEntry: DamlPackageUploadEntry.Builder,
|
||||
): Unit =
|
||||
logger.trace(
|
||||
s"Package upload rejected, $msg, correlationId=${packageUploadEntry.getSubmissionId}")
|
||||
|
||||
@ -152,12 +157,13 @@ private[kvutils] case class PackageCommitter(engine: Engine)
|
||||
)
|
||||
}
|
||||
|
||||
override def init(
|
||||
override protected def init(
|
||||
ctx: CommitContext,
|
||||
uploadEntry: DamlPackageUploadEntry): DamlPackageUploadEntry.Builder =
|
||||
uploadEntry: DamlPackageUploadEntry,
|
||||
): DamlPackageUploadEntry.Builder =
|
||||
uploadEntry.toBuilder
|
||||
|
||||
override val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
override protected val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
"authorize_submission" -> authorizeSubmission,
|
||||
"validate_entry" -> validateEntry,
|
||||
"deduplicate_submission" -> deduplicateSubmission,
|
||||
@ -166,13 +172,11 @@ private[kvutils] case class PackageCommitter(engine: Engine)
|
||||
"build_log_entry" -> buildLogEntry
|
||||
)
|
||||
|
||||
override lazy val committerName = "package_upload"
|
||||
|
||||
private def buildRejectionLogEntry(
|
||||
ctx: CommitContext,
|
||||
packageUploadEntry: DamlPackageUploadEntry.Builder,
|
||||
addErrorDetails: DamlPackageUploadRejectionEntry.Builder => DamlPackageUploadRejectionEntry.Builder)
|
||||
: DamlLogEntry = {
|
||||
addErrorDetails: DamlPackageUploadRejectionEntry.Builder => DamlPackageUploadRejectionEntry.Builder,
|
||||
): DamlLogEntry = {
|
||||
Metrics.rejections.inc()
|
||||
DamlLogEntry.newBuilder
|
||||
.setRecordTime(buildTimestamp(ctx.getRecordTime))
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer
|
||||
|
||||
import com.codahale.metrics.Counter
|
||||
import com.codahale.metrics.{Counter, MetricRegistry}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{
|
||||
buildTimestamp,
|
||||
partyAllocationDedupKey
|
||||
@ -11,18 +11,21 @@ import com.daml.ledger.participant.state.kvutils.Conversions.{
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.digitalasset.daml.lf.data.Ref
|
||||
|
||||
private[kvutils] case object PartyAllocationCommitter
|
||||
extends Committer[DamlPartyAllocationEntry, DamlPartyAllocationEntry.Builder] {
|
||||
private[kvutils] class PartyAllocationCommitter(
|
||||
override protected val metricRegistry: MetricRegistry,
|
||||
) extends Committer[DamlPartyAllocationEntry, DamlPartyAllocationEntry.Builder] {
|
||||
|
||||
override protected val committerName = "party_allocation"
|
||||
|
||||
private object Metrics {
|
||||
// kvutils.PartyAllocationCommitter.*
|
||||
val accepts: Counter = metricsRegistry.counter(metricsName("accepts"))
|
||||
val rejections: Counter = metricsRegistry.counter(metricsName("rejections"))
|
||||
val accepts: Counter = metricRegistry.counter(metricsName("accepts"))
|
||||
val rejections: Counter = metricRegistry.counter(metricsName("rejections"))
|
||||
}
|
||||
|
||||
private def rejectionTraceLog(
|
||||
msg: String,
|
||||
partyAllocationEntry: DamlPartyAllocationEntry.Builder): Unit =
|
||||
partyAllocationEntry: DamlPartyAllocationEntry.Builder,
|
||||
): Unit =
|
||||
logger.trace(
|
||||
s"Party allocation rejected, $msg, correlationId=${partyAllocationEntry.getSubmissionId}")
|
||||
|
||||
@ -47,7 +50,7 @@ private[kvutils] case object PartyAllocationCommitter
|
||||
if (Ref.Party.fromString(party).isRight)
|
||||
StepContinue(partyAllocationEntry)
|
||||
else {
|
||||
val msg = s"party string '${party}' invalid"
|
||||
val msg = s"party string '$party' invalid"
|
||||
rejectionTraceLog(msg, partyAllocationEntry)
|
||||
StepStop(
|
||||
buildRejectionLogEntry(
|
||||
@ -133,8 +136,8 @@ private[kvutils] case object PartyAllocationCommitter
|
||||
private def buildRejectionLogEntry(
|
||||
ctx: CommitContext,
|
||||
partyAllocationEntry: DamlPartyAllocationEntry.Builder,
|
||||
addErrorDetails: DamlPartyAllocationRejectionEntry.Builder => DamlPartyAllocationRejectionEntry.Builder)
|
||||
: DamlLogEntry = {
|
||||
addErrorDetails: DamlPartyAllocationRejectionEntry.Builder => DamlPartyAllocationRejectionEntry.Builder,
|
||||
): DamlLogEntry = {
|
||||
Metrics.rejections.inc()
|
||||
DamlLogEntry.newBuilder
|
||||
.setRecordTime(buildTimestamp(ctx.getRecordTime))
|
||||
@ -148,12 +151,13 @@ private[kvutils] case object PartyAllocationCommitter
|
||||
.build
|
||||
}
|
||||
|
||||
override def init(
|
||||
override protected def init(
|
||||
ctx: CommitContext,
|
||||
partyAllocationEntry: DamlPartyAllocationEntry): DamlPartyAllocationEntry.Builder =
|
||||
partyAllocationEntry: DamlPartyAllocationEntry,
|
||||
): DamlPartyAllocationEntry.Builder =
|
||||
partyAllocationEntry.toBuilder
|
||||
|
||||
override val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
override protected val steps: Iterable[(StepInfo, Step)] = Iterable(
|
||||
"authorize_submission" -> authorizeSubmission,
|
||||
"validate_party" -> validateParty,
|
||||
"deduplicate_submission" -> deduplicateSubmission,
|
||||
@ -161,6 +165,4 @@ private[kvutils] case object PartyAllocationCommitter
|
||||
"build_log_entry" -> buildLogEntry
|
||||
)
|
||||
|
||||
override lazy val committerName = "party_allocation"
|
||||
|
||||
}
|
||||
|
@ -3,8 +3,7 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committing
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.codahale.metrics.{Counter, Timer}
|
||||
import com.codahale.metrics.{Counter, MetricRegistry, Timer}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.committing.Common.Commit._
|
||||
@ -26,9 +25,10 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
private[kvutils] case class ProcessTransactionSubmission(
|
||||
private[kvutils] class ProcessTransactionSubmission(
|
||||
defaultConfig: Configuration,
|
||||
engine: Engine,
|
||||
metricRegistry: MetricRegistry,
|
||||
) {
|
||||
|
||||
private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
@ -526,23 +526,21 @@ private[kvutils] case class ProcessTransactionSubmission(
|
||||
.build,
|
||||
)
|
||||
}
|
||||
|
||||
private object Metrics {
|
||||
private val prefix = "kvutils.committer.transaction"
|
||||
val runTimer: Timer = metricRegistry.timer(s"$prefix.run_timer")
|
||||
val interpretTimer: Timer = metricRegistry.timer(s"$prefix.interpret_timer")
|
||||
val accepts: Counter = metricRegistry.counter(s"$prefix.accepts")
|
||||
val rejections: Map[Int, Counter] =
|
||||
DamlTransactionRejectionEntry.ReasonCase.values
|
||||
.map(v => v.getNumber -> metricRegistry.counter(s"$prefix.rejections_${v.name}"))
|
||||
.toMap
|
||||
}
|
||||
}
|
||||
|
||||
object ProcessTransactionSubmission {
|
||||
|
||||
private[committing] object Metrics {
|
||||
//TODO: Replace with metrics registry object passed in constructor
|
||||
private val registry = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
private val prefix = "kvutils.committer.transaction"
|
||||
val runTimer: Timer = registry.timer(s"$prefix.run_timer")
|
||||
val interpretTimer: Timer = registry.timer(s"$prefix.interpret_timer")
|
||||
val accepts: Counter = registry.counter(s"$prefix.accepts")
|
||||
val rejections: Map[Int, Counter] =
|
||||
DamlTransactionRejectionEntry.ReasonCase.values
|
||||
.map(v => v.getNumber -> registry.counter(s"$prefix.rejections_${v.name}"))
|
||||
.toMap
|
||||
}
|
||||
|
||||
private case class TransactionEntry(txEntry: DamlTransactionEntry) {
|
||||
val ledgerEffectiveTime: Timestamp = parseTimestamp(txEntry.getLedgerEffectiveTime)
|
||||
val submitterInfo: DamlSubmitterInfo = txEntry.getSubmitterInfo
|
||||
|
@ -5,6 +5,7 @@ package com.daml.ledger.validator
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
|
||||
@ -108,7 +109,6 @@ class SubmissionValidator[LogResult](
|
||||
} yield logResult
|
||||
}
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Product", "org.wartremover.warts.Serializable"))
|
||||
private def runValidation[T](
|
||||
envelope: Bytes,
|
||||
correlationId: String,
|
||||
@ -181,10 +181,11 @@ object SubmissionValidator {
|
||||
ledgerStateAccess: LedgerStateAccess[LogResult],
|
||||
allocateNextLogEntryId: () => DamlLogEntryId = () => allocateRandomLogEntryId(),
|
||||
checkForMissingInputs: Boolean = false,
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit executionContext: ExecutionContext): SubmissionValidator[LogResult] = {
|
||||
new SubmissionValidator(
|
||||
ledgerStateAccess,
|
||||
processSubmission,
|
||||
processSubmission(new KeyValueCommitting(metricRegistry)),
|
||||
allocateNextLogEntryId,
|
||||
checkForMissingInputs,
|
||||
)
|
||||
@ -195,13 +196,14 @@ object SubmissionValidator {
|
||||
.setEntryId(ByteString.copyFromUtf8(UUID.randomUUID().toString))
|
||||
.build()
|
||||
|
||||
private[validator] def processSubmission(
|
||||
private[validator] def processSubmission(keyValueCommitting: KeyValueCommitting)(
|
||||
damlLogEntryId: DamlLogEntryId,
|
||||
recordTime: Timestamp,
|
||||
damlSubmission: DamlSubmission,
|
||||
participantId: ParticipantId,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]]): LogEntryAndState =
|
||||
KeyValueCommitting.processSubmission(
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
): LogEntryAndState =
|
||||
keyValueCommitting.processSubmission(
|
||||
engine,
|
||||
damlLogEntryId,
|
||||
recordTime,
|
||||
|
@ -9,6 +9,7 @@ import java.util.UUID
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset.{fromLong => toOffset}
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase._
|
||||
import com.daml.ledger.participant.state.v1.Update._
|
||||
@ -62,6 +63,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
participantId: ParticipantId,
|
||||
testId: String,
|
||||
heartbeats: Source[Instant, NotUsed],
|
||||
metricRegistry: MetricRegistry,
|
||||
)(implicit logCtx: LoggingContext): ResourceOwner[ParticipantState]
|
||||
|
||||
private def participantState: ResourceOwner[ParticipantState] =
|
||||
@ -72,7 +74,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
|
||||
heartbeats: Source[Instant, NotUsed] = Source.empty,
|
||||
): ResourceOwner[ParticipantState] =
|
||||
newLoggingContext { implicit logCtx =>
|
||||
participantStateFactory(ledgerId, participantId, testId, heartbeats)
|
||||
participantStateFactory(ledgerId, participantId, testId, heartbeats, new MetricRegistry)
|
||||
}
|
||||
|
||||
override protected def beforeEach(): Unit = {
|
||||
|
@ -5,18 +5,19 @@ package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import java.time.Duration
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.digitalasset.daml.lf.command.{Command, Commands}
|
||||
import com.digitalasset.daml.lf.crypto
|
||||
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
|
||||
import com.digitalasset.daml.lf.data.Time.Timestamp
|
||||
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
|
||||
import com.digitalasset.daml.lf.engine.Engine
|
||||
import com.digitalasset.daml.lf.transaction.Transaction
|
||||
import com.digitalasset.daml_lf_dev.DamlLf
|
||||
import scalaz.State
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.std.list._
|
||||
import scalaz.syntax.traverse._
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
@ -29,13 +30,18 @@ final case class KVTestState(
|
||||
damlState: Map[DamlStateKey, DamlStateValue]) {}
|
||||
|
||||
object KVTest {
|
||||
import scalaz.State._
|
||||
|
||||
import TestHelpers._
|
||||
import scalaz.State._
|
||||
|
||||
type KVTest[A] = State[KVTestState, A]
|
||||
|
||||
private[this] val defaultAdditionalContractDataTy = "Party"
|
||||
|
||||
private[kvutils] val metricRegistry = new MetricRegistry
|
||||
|
||||
private[this] val keyValueCommitting = new KeyValueCommitting(metricRegistry)
|
||||
|
||||
def initialTestState: KVTestState =
|
||||
KVTestState(
|
||||
engine = Engine(),
|
||||
@ -112,11 +118,11 @@ object KVTest {
|
||||
def getDamlState(key: DamlStateKey): KVTest[Option[DamlStateValue]] =
|
||||
gets(s => s.damlState.get(key))
|
||||
|
||||
def submit(submission: DamlSubmission): KVTest[(DamlLogEntryId, DamlLogEntry)] =
|
||||
private def submit(submission: DamlSubmission): KVTest[(DamlLogEntryId, DamlLogEntry)] =
|
||||
for {
|
||||
testState <- get[KVTestState]
|
||||
entryId <- freshEntryId
|
||||
(logEntry, newState) = KeyValueCommitting.processSubmission(
|
||||
(logEntry, newState) = keyValueCommitting.processSubmission(
|
||||
engine = testState.engine,
|
||||
entryId = entryId,
|
||||
recordTime = testState.recordTime,
|
||||
@ -131,8 +137,7 @@ object KVTest {
|
||||
} yield {
|
||||
// Verify that all state touched matches with "submissionOutputs".
|
||||
assert(
|
||||
newState.keySet subsetOf
|
||||
KeyValueCommitting.submissionOutputs(entryId, submission)
|
||||
newState.keySet subsetOf keyValueCommitting.submissionOutputs(entryId, submission)
|
||||
)
|
||||
|
||||
// Verify that we can always process the log entry
|
||||
|
@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import java.time.Duration
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.v1.Configuration
|
||||
import com.digitalasset.daml.lf.data.Ref
|
||||
@ -162,10 +161,9 @@ class KVUtilsConfigSpec extends WordSpec with Matchers {
|
||||
}, submissionId = Ref.LedgerString.assertFromString("submission-id-1"))
|
||||
} yield {
|
||||
// Check that we're updating the metrics (assuming this test at least has been run)
|
||||
val reg = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
reg.counter("kvutils.committer.config.accepts").getCount should be >= 1L
|
||||
reg.counter("kvutils.committer.config.rejections").getCount should be >= 1L
|
||||
reg.timer("kvutils.committer.config.run_timer").getCount should be >= 1L
|
||||
metricRegistry.counter("kvutils.committer.config.accepts").getCount should be >= 1L
|
||||
metricRegistry.counter("kvutils.committer.config.rejections").getCount should be >= 1L
|
||||
metricRegistry.timer("kvutils.committer.config.run_timer").getCount should be >= 1L
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import java.io.File
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlLogEntry,
|
||||
DamlPackageUploadRejectionEntry
|
||||
@ -93,10 +92,11 @@ class KVUtilsPackageSpec extends WordSpec with Matchers with BazelRunfiles {
|
||||
_ <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
|
||||
} yield {
|
||||
// Check that we're updating the metrics (assuming this test at least has been run)
|
||||
val reg = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
reg.counter("kvutils.committer.package_upload.accepts").getCount should be >= 1L
|
||||
reg.counter("kvutils.committer.package_upload.rejections").getCount should be >= 1L
|
||||
reg.timer("kvutils.committer.package_upload.run_timer").getCount should be >= 1L
|
||||
metricRegistry.counter("kvutils.committer.package_upload.accepts").getCount should be >= 1L
|
||||
metricRegistry
|
||||
.counter("kvutils.committer.package_upload.rejections")
|
||||
.getCount should be >= 1L
|
||||
metricRegistry.timer("kvutils.committer.package_upload.run_timer").getCount should be >= 1L
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlLogEntry,
|
||||
DamlPartyAllocationRejectionEntry
|
||||
@ -84,10 +83,15 @@ class KVUtilsPartySpec extends WordSpec with Matchers {
|
||||
_ <- submitPartyAllocation("submission-1", "bob", p0)
|
||||
} yield {
|
||||
// Check that we're updating the metrics (assuming this test at least has been run)
|
||||
val reg = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
reg.counter("kvutils.committer.party_allocation.accepts").getCount should be >= 1L
|
||||
reg.counter("kvutils.committer.party_allocation.rejections").getCount should be >= 1L
|
||||
reg.timer("kvutils.committer.party_allocation.run_timer").getCount should be >= 1L
|
||||
metricRegistry
|
||||
.counter("kvutils.committer.party_allocation.accepts")
|
||||
.getCount should be >= 1L
|
||||
metricRegistry
|
||||
.counter("kvutils.committer.party_allocation.rejections")
|
||||
.getCount should be >= 1L
|
||||
metricRegistry
|
||||
.timer("kvutils.committer.party_allocation.run_timer")
|
||||
.getCount should be >= 1L
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlLogEntry,
|
||||
DamlTransactionRejectionEntry
|
||||
@ -17,18 +16,18 @@ import com.digitalasset.daml.lf.command.{
|
||||
ExerciseCommand
|
||||
}
|
||||
import com.digitalasset.daml.lf.crypto
|
||||
import com.digitalasset.daml.lf.data.{Ref, FrontStack, SortedLookupList}
|
||||
import com.digitalasset.daml.lf.data.{FrontStack, Ref, SortedLookupList}
|
||||
import com.digitalasset.daml.lf.transaction.Node.NodeCreate
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.daml.lf.value.Value.{
|
||||
AbsoluteContractId,
|
||||
ValueUnit,
|
||||
ValueParty,
|
||||
ValueOptional,
|
||||
ValueList,
|
||||
ValueVariant,
|
||||
ValueOptional,
|
||||
ValueParty,
|
||||
ValueRecord,
|
||||
ValueTextMap
|
||||
ValueTextMap,
|
||||
ValueUnit,
|
||||
ValueVariant
|
||||
}
|
||||
import org.scalatest.{Matchers, WordSpec}
|
||||
|
||||
@ -284,12 +283,11 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
|
||||
} yield {
|
||||
val disputed = DamlTransactionRejectionEntry.ReasonCase.DISPUTED
|
||||
// Check that we're updating the metrics (assuming this test at least has been run)
|
||||
val reg = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
reg.counter("kvutils.committer.transaction.accepts").getCount should be >= 1L
|
||||
reg
|
||||
metricRegistry.counter("kvutils.committer.transaction.accepts").getCount should be >= 1L
|
||||
metricRegistry
|
||||
.counter(s"kvutils.committer.transaction.rejections_${disputed.name}")
|
||||
.getCount should be >= 1L
|
||||
reg.timer("kvutils.committer.transaction.run_timer").getCount should be >= 1L
|
||||
metricRegistry.timer("kvutils.committer.transaction.run_timer").getCount should be >= 1L
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,9 +5,10 @@ package com.daml.ledger.validator
|
||||
|
||||
import java.time.Clock
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope}
|
||||
import com.daml.ledger.participant.state.kvutils.MockitoHelpers.captor
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.SubmissionValidator.{LogEntryAndState, RawKeyValuePairs}
|
||||
import com.daml.ledger.validator.SubmissionValidatorSpec._
|
||||
@ -28,10 +29,14 @@ class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with Inside {
|
||||
val mockStateOperations = mock[LedgerStateOperations[Unit]]
|
||||
when(mockStateOperations.readState(any[Seq[Bytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
val instance = SubmissionValidator.create(new FakeStateAccess(mockStateOperations))
|
||||
val instance = SubmissionValidator.create(
|
||||
new FakeStateAccess(mockStateOperations),
|
||||
metricRegistry = new MetricRegistry,
|
||||
)
|
||||
instance.validate(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId()).map {
|
||||
inside(_) {
|
||||
case Right(_) => succeed
|
||||
case Left(error: ValidationError) => fail(s"ValidationError: $error")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -42,7 +47,9 @@ class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with Inside {
|
||||
.thenReturn(Future.successful(Seq(None)))
|
||||
val instance = SubmissionValidator.create(
|
||||
ledgerStateAccess = new FakeStateAccess(mockStateOperations),
|
||||
checkForMissingInputs = true)
|
||||
checkForMissingInputs = true,
|
||||
metricRegistry = new MetricRegistry,
|
||||
)
|
||||
instance.validate(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId()).map {
|
||||
inside(_) {
|
||||
case Left(MissingInputState(keys)) => keys should have size 1
|
||||
@ -52,7 +59,10 @@ class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with Inside {
|
||||
|
||||
"return invalid submission for invalid envelope" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations[Unit]]
|
||||
val instance = SubmissionValidator.create(new FakeStateAccess(mockStateOperations))
|
||||
val instance = SubmissionValidator.create(
|
||||
new FakeStateAccess(mockStateOperations),
|
||||
metricRegistry = new MetricRegistry,
|
||||
)
|
||||
instance
|
||||
.validate(
|
||||
ByteString.copyFrom(Array[Byte](1, 2, 3)),
|
||||
@ -108,7 +118,7 @@ class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with Inside {
|
||||
val mockLogEntryIdGenerator = mockFunctionReturning(expectedLogEntryId)
|
||||
val instance = new SubmissionValidator(
|
||||
new FakeStateAccess(mockStateOperations),
|
||||
SubmissionValidator.processSubmission,
|
||||
SubmissionValidator.processSubmission(new KeyValueCommitting(new MetricRegistry)),
|
||||
mockLogEntryIdGenerator)
|
||||
instance
|
||||
.validateAndCommit(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId())
|
||||
|
@ -8,6 +8,7 @@ import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.codahale.metrics
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto, _}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.digitalasset.daml.lf.data.Ref
|
||||
@ -36,13 +37,13 @@ object IntegrityCheck extends App {
|
||||
val filename = args(0)
|
||||
println(s"Verifying integrity of $filename...")
|
||||
|
||||
val registry = metrics.SharedMetricRegistries.getOrCreate("kvutils")
|
||||
val metricRegistry = new MetricRegistry
|
||||
// Register JVM related metrics.
|
||||
(new metrics.jvm.GarbageCollectorMetricSet).getMetrics.forEach { (k, m) =>
|
||||
val _ = registry.register(s"jvm.gc.$k", m)
|
||||
val _ = metricRegistry.register(s"jvm.gc.$k", m)
|
||||
}
|
||||
(new metrics.jvm.MemoryUsageGaugeSet).getMetrics.forEach { (k, m) =>
|
||||
val _ = registry.register(s"jvm.mem.$k", m)
|
||||
val _ = metricRegistry.register(s"jvm.mem.$k", m)
|
||||
}
|
||||
|
||||
val ledgerDumpStream: DataInputStream =
|
||||
@ -54,6 +55,7 @@ object IntegrityCheck extends App {
|
||||
timeModel = TimeModel.reasonableDefault,
|
||||
maxDeduplicationTime = Duration.ofDays(1),
|
||||
)
|
||||
val keyValueCommitting = new KeyValueCommitting(metricRegistry)
|
||||
var state = Map.empty[Proto.DamlStateKey, Proto.DamlStateValue]
|
||||
|
||||
var total_t_commit = 0L
|
||||
@ -82,7 +84,7 @@ object IntegrityCheck extends App {
|
||||
print(s"verifying ${Pretty.prettyEntryId(entry.getEntryId)}: commit... ")
|
||||
val (t_commit, (logEntry2, outputState)) = Helpers.time(
|
||||
() =>
|
||||
KeyValueCommitting.processSubmission(
|
||||
keyValueCommitting.processSubmission(
|
||||
engine,
|
||||
entry.getEntryId,
|
||||
Conversions.parseTimestamp(logEntry.getRecordTime),
|
||||
@ -135,7 +137,7 @@ object IntegrityCheck extends App {
|
||||
|
||||
// Dump detailed metrics.
|
||||
val reporter = metrics.ConsoleReporter
|
||||
.forRegistry(metrics.SharedMetricRegistries.getOrCreate("kvutils"))
|
||||
.forRegistry(metricRegistry)
|
||||
.convertRatesTo(TimeUnit.SECONDS)
|
||||
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
||||
.build
|
||||
|
@ -225,8 +225,11 @@ object RecoveringIndexerIntegrationSpec {
|
||||
implicit materializer: Materializer,
|
||||
logCtx: LoggingContext
|
||||
): ResourceOwner[ParticipantState] =
|
||||
new InMemoryLedgerReaderWriter.SingleParticipantOwner(ledgerId, participantId)
|
||||
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
|
||||
new InMemoryLedgerReaderWriter.SingleParticipantOwner(
|
||||
ledgerId,
|
||||
participantId,
|
||||
new MetricRegistry,
|
||||
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter))
|
||||
}
|
||||
|
||||
private object ParticipantStateThatFailsOften extends ParticipantStateFactory {
|
||||
|
@ -153,10 +153,11 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
|
||||
readerWriter <- new SqlLedgerReaderWriter.Owner(
|
||||
initialLedgerId = specifiedLedgerId,
|
||||
participantId = ParticipantId,
|
||||
metricRegistry = metrics,
|
||||
jdbcUrl = ledgerJdbcUrl,
|
||||
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
|
||||
heartbeats = heartbeats,
|
||||
seedService = SeedService(seeding)
|
||||
seedService = SeedService(seeding),
|
||||
)
|
||||
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
|
||||
readService = new TimedReadService(ledger, metrics, ReadServicePrefix)
|
||||
|
Loading…
Reference in New Issue
Block a user