ledger/metrics: Move metric helpers to their own Bazel package. (#5542)

* ledger/metrics: Move metric helpers to their own Bazel package.

CHANGELOG_BEGIN
CHANGELOG_END

* sandbox: Use ledger/metrics.

* metrics: Rename `Metrics` to `Timed` and drop the `timed` prefix.

Importing methods is harder than importing objects.

* metrics: Publish to Maven Central.
This commit is contained in:
Samir Talwar 2020-04-14 14:32:03 +02:00 committed by GitHub
parent 08a5a64325
commit bdb476fff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 152 additions and 145 deletions

View File

@ -0,0 +1,25 @@
# Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
)
da_scala_library(
name = "metrics",
srcs = glob(["src/main/scala/**/*.scala"]),
resources = glob(["src/main/resources/**/*"]),
tags = ["maven_coordinates=com.daml:metrics:__VERSION__"],
visibility = [
"//visibility:public",
],
runtime_deps = [],
deps = [
"//libs-scala/direct-execution-context",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_dropwizard_metrics_metrics_jvm",
],
)

View File

@ -1,7 +1,7 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.metrics
package com.daml.metrics
import java.util
@ -13,7 +13,7 @@ import com.codahale.metrics.jvm.{
ThreadStatesGaugeSet
}
import com.codahale.metrics.{Metric, MetricSet}
import com.daml.ledger.participant.state.metrics.JvmMetricSet._
import com.daml.metrics.JvmMetricSet._
import scala.collection.JavaConverters._

View File

@ -1,7 +1,7 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.metrics
package com.daml.metrics
import scala.language.implicitConversions

View File

@ -1,7 +1,7 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.metrics
package com.daml.metrics
import java.util.concurrent.CompletionStage
@ -12,9 +12,9 @@ import com.daml.dec.DirectExecutionContext
import scala.concurrent.Future
object Metrics {
object Timed {
def timedCompletionStage[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
def completionStage[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
val ctx = timer.time()
future.whenComplete { (_, _) =>
ctx.stop()
@ -22,14 +22,14 @@ object Metrics {
}
}
def timedFuture[T](timer: Timer, future: => Future[T]): Future[T] = {
def future[T](timer: Timer, future: => Future[T]): Future[T] = {
val ctx = timer.time()
val result = future
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
result
}
def timedSource[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
def source[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
val ctx = timer.time()
source
.watchTermination()(Keep.both[Mat, Future[Done]])

View File

@ -20,12 +20,11 @@ da_scala_library(
"//daml-lf/data",
"//daml-lf/transaction",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//libs-scala/direct-execution-context",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_dropwizard_metrics_metrics_jvm",
],
)

View File

@ -6,9 +6,9 @@ package com.daml.ledger.participant.state.v1.metrics
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.metrics.{MetricName, Metrics}
import com.daml.ledger.participant.state.v1.{LedgerInitialConditions, Offset, ReadService, Update}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.v1.{LedgerInitialConditions, Offset, ReadService, Update}
import com.daml.metrics.{MetricName, Timed}
final class TimedReadService(delegate: ReadService, metrics: MetricRegistry, prefix: MetricName)
extends ReadService {
@ -22,5 +22,5 @@ final class TimedReadService(delegate: ReadService, metrics: MetricRegistry, pre
delegate.currentHealth()
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
Metrics.timedSource(metrics.timer(prefix :+ name), source)
Timed.source(metrics.timer(prefix :+ name), source)
}

View File

@ -6,7 +6,8 @@ package com.daml.ledger.participant.state.v1.metrics
import java.util.concurrent.CompletionStage
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.metrics.{MetricName, Metrics}
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.v1.{
Configuration,
Party,
@ -18,8 +19,7 @@ import com.daml.ledger.participant.state.v1.{
WriteService
}
import com.daml.lf.data.Time
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.health.HealthStatus
import com.daml.metrics.{MetricName, Timed}
final class TimedWriteService(delegate: WriteService, metrics: MetricRegistry, prefix: MetricName)
extends WriteService {
@ -57,5 +57,5 @@ final class TimedWriteService(delegate: WriteService, metrics: MetricRegistry, p
delegate.currentHealth()
private def time[T](name: String, future: => CompletionStage[T]): CompletionStage[T] =
Metrics.timedCompletionStage(metrics.timer(prefix :+ name), future)
Timed.completionStage(metrics.timer(prefix :+ name), future)
}

View File

@ -38,8 +38,8 @@ da_scala_library(
"//daml-lf/transaction:value_java_proto",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-metrics",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"//libs-scala/contextualized-logging",
"@maven//:com_github_ben_manes_caffeine_caffeine",

View File

@ -31,6 +31,7 @@ da_scala_library(
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/participant-state-metrics",

View File

@ -3,7 +3,7 @@
package com.daml.ledger.participant.state.kvutils.app
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.metrics.MetricName
private[app] object Metrics {

View File

@ -9,17 +9,17 @@ import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.participant.state.kvutils.app.Metrics.{
IndexServicePrefix,
ReadServicePrefix,
WriteServicePrefix
}
import com.daml.ledger.participant.state.metrics.JvmMetricSet
import com.daml.ledger.participant.state.v1.metrics.{TimedReadService, TimedWriteService}
import com.daml.ledger.participant.state.v1.{SubmissionId, WritePackagesService}
import com.daml.lf.archive.DarReader
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.JvmMetricSet
import com.daml.platform.apiserver.{StandaloneApiServer, TimedIndexService}
import com.daml.platform.indexer.StandaloneIndexerServer
import com.daml.resources.akka.AkkaResourceOwner

View File

@ -14,9 +14,9 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
}
import com.daml.ledger.participant.state.kvutils.DamlStateMap
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.lf.data.Time
import com.daml.metrics.MetricName
import org.slf4j.{Logger, LoggerFactory}
/** A committer processes a submission, with its inputs into an ordered set of output state and a log entry.

View File

@ -4,7 +4,7 @@
package com.daml.ledger.participant.state
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.metrics.MetricName
import com.google.protobuf.ByteString
/** The participant-state key-value utilities provide methods to succinctly implement

View File

@ -12,8 +12,6 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
import com.daml.ledger.participant.state.kvutils.caching.Cache
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.metrics.Metrics.timedFuture
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.SubmissionValidator._
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
@ -21,6 +19,7 @@ import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{MetricName, Timed}
import com.google.protobuf.ByteString
import scala.annotation.tailrec
@ -185,7 +184,7 @@ class SubmissionValidator[LogResult] private[validator] (
timedLedgerStateAccess
.inTransaction { stateOperations =>
for {
readInputs <- timedFuture(
readInputs <- Timed.future(
Metrics.validateSubmission,
for {
readStateValues <- stateOperations.readState(inputKeysAsBytes)
@ -199,7 +198,7 @@ class SubmissionValidator[LogResult] private[validator] (
_ <- verifyAllInputsArePresent(declaredInputs, readInputs)
} yield readInputs
)
logEntryAndState <- timedFuture(
logEntryAndState <- Timed.future(
Metrics.processSubmission,
Future.fromTry(
Try(
@ -217,7 +216,7 @@ class SubmissionValidator[LogResult] private[validator] (
stateOperations,
)
result <- postProcessResultTimer.fold(processResult())(
timedFuture(_, processResult()))
Timed.future(_, processResult()))
} yield result
}
.transform {

View File

@ -22,8 +22,8 @@ da_scala_binary(
"//daml-lf/archive:daml_lf_dev_archive_java_proto",
"//daml-lf/data",
"//daml-lf/engine",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:daml_kvutils_java_proto",
"@maven//:com_google_protobuf_protobuf_java",

View File

@ -10,10 +10,10 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto, _}
import com.daml.ledger.participant.state.metrics.JvmMetricSet
import com.daml.ledger.participant.state.v1._
import com.daml.lf.data.Ref
import com.daml.lf.engine.Engine
import com.daml.metrics.JvmMetricSet
import scala.collection.JavaConverters._
import scala.util.Try

View File

@ -32,6 +32,7 @@ compile_deps = [
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-on-sql",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/participant-state-metrics",
@ -284,6 +285,7 @@ da_scala_test_suite(
"//ledger/ledger-api-common:ledger-api-common-scala-tests-lib",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/participant-state-metrics",

View File

@ -6,7 +6,7 @@ package com.daml.platform.apiserver
import java.util.concurrent.atomic.AtomicBoolean
import com.codahale.metrics.{MetricRegistry, Timer}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.metrics.MetricName
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
import scala.collection.concurrent.TrieMap

View File

@ -3,7 +3,7 @@
package com.daml.platform.apiserver
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.metrics.MetricName
private[apiserver] object MetricsNaming {

View File

@ -8,14 +8,6 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.metrics.{MetricName, Metrics}
import com.daml.ledger.participant.state.v1.{Configuration, PackageId, ParticipantId, Party}
import com.daml.lf.data.Ref
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{ApplicationId, CommandId, LedgerId, LedgerOffset, TransactionId}
@ -28,6 +20,14 @@ import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse
}
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.{Configuration, PackageId, ParticipantId, Party}
import com.daml.lf.data.Ref
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value
import com.daml.metrics.{MetricName, Timed}
import scala.concurrent.Future
@ -156,8 +156,8 @@ final class TimedIndexService(delegate: IndexService, metrics: MetricRegistry, p
delegate.currentHealth()
private def time[T](name: String, future: => Future[T]): Future[T] =
Metrics.timedFuture(metrics.timer(prefix :+ name), future)
Timed.future(metrics.timer(prefix :+ name), future)
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
Metrics.timedSource(metrics.timer(prefix :+ name), source)
Timed.source(metrics.timer(prefix :+ name), source)
}

View File

@ -11,7 +11,6 @@ import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesS
import com.daml.ledger.participant.state.v1.{SubmitterInfo, TransactionMeta}
import com.daml.lf.crypto
import com.daml.lf.data.{Ref, Time}
import com.daml.platform.metrics.timedFuture
import com.daml.lf.engine.{
Blinding,
Engine,
@ -25,6 +24,7 @@ import com.daml.lf.engine.{
}
import com.daml.lf.language.Ast.Package
import com.daml.logging.LoggingContext
import com.daml.metrics.Timed
import com.daml.platform.store.ErrorCause
import scalaz.syntax.tag._
@ -89,13 +89,16 @@ final class StoreBackedCommandExecutor(
case ResultError(err) => Future.successful(Left(err))
case ResultNeedContract(acoid, resume) =>
timedFuture(
Metrics.lookupActiveContract,
contractStore.lookupActiveContract(submitter, acoid),
).flatMap(instance => resolveStep(resume(instance)))
Timed
.future(
Metrics.lookupActiveContract,
contractStore.lookupActiveContract(submitter, acoid),
)
.flatMap(instance => resolveStep(resume(instance)))
case ResultNeedKey(key, resume) =>
timedFuture(Metrics.lookupContractKey, contractStore.lookupContractKey(submitter, key))
Timed
.future(Metrics.lookupContractKey, contractStore.lookupContractKey(submitter, key))
.flatMap(contractId => resolveStep(resume(contractId)))
case ResultNeedPackage(packageId, resume) =>
@ -106,7 +109,7 @@ final class StoreBackedCommandExecutor(
})
if (gettingPackage) {
val future = timedFuture(Metrics.getLfPackage, packagesService.getLfPackage(packageId))
val future = Timed.future(Metrics.getLfPackage, packagesService.getLfPackage(packageId))
future.onComplete {
case Success(None) | Failure(_) =>
// Did not find the package or got an error when looking for it. Remove the promise to allow later retries.

View File

@ -7,7 +7,7 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.domain
import com.daml.lf.crypto.Hash
import com.daml.logging.LoggingContext
import com.daml.platform.metrics.timedFuture
import com.daml.metrics.Timed
import com.daml.platform.store.ErrorCause
import scala.concurrent.{ExecutionContext, Future}
@ -26,6 +26,6 @@ class TimedCommandExecutor(
implicit ec: ExecutionContext,
logCtx: LoggingContext,
): Future[Either[ErrorCause, CommandExecutionResult]] =
timedFuture(timer, delegate.execute(commands, submissionSeed))
Timed.future(timer, delegate.execute(commands, submissionSeed))
}

View File

@ -3,7 +3,7 @@
package com.daml.platform.apiserver
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.metrics.MetricName
package object execution {

View File

@ -35,10 +35,10 @@ import com.daml.lf.transaction.BlindingInfo
import com.daml.lf.transaction.Transaction.Transaction
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Timed
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.MetricsNaming
import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExecutor}
import com.daml.platform.metrics.timedFuture
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService
import com.daml.platform.server.api.validation.ErrorFactories
@ -245,7 +245,7 @@ final class ApiSubmissionService private (
case None =>
transactionInfo match {
case CommandExecutionResult(submitterInfo, transactionMeta, transaction, _) =>
timedFuture(
Timed.future(
Metrics.submittedTransactionsTimer,
FutureConverters.toScala(
writeService.submitTransaction(submitterInfo, transactionMeta, transaction)))

View File

@ -8,15 +8,6 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.{MetricRegistry, Timer}
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, PackageId, Party}
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node.GlobalKey
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.TransactionId
import com.daml.ledger.api.domain.{ApplicationId, CommandId, LedgerId, PartyDetails}
@ -27,16 +18,24 @@ import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse,
GetTransactionsResponse
}
import com.daml.platform.metrics.timedFuture
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, PackageId, Party}
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node.GlobalKey
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.daml.metrics.{MetricName, Timed}
import com.daml.platform.store.ReadOnlyLedger
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
PackageLedgerEntry,
PartyLedgerEntry,
PartyLedgerEntry
}
import com.daml.platform.store.ReadOnlyLedger
import scala.concurrent.Future
@ -109,16 +108,16 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
contractId: Value.AbsoluteContractId,
forParty: Party
): Future[Option[ContractInst[Value.VersionedValue[AbsoluteContractId]]]] =
timedFuture(Metrics.lookupContract, ledger.lookupContract(contractId, forParty))
Timed.future(Metrics.lookupContract, ledger.lookupContract(contractId, forParty))
override def lookupKey(key: GlobalKey, forParty: Party): Future[Option[AbsoluteContractId]] =
timedFuture(Metrics.lookupKey, ledger.lookupKey(key, forParty))
Timed.future(Metrics.lookupKey, ledger.lookupKey(key, forParty))
override def lookupFlatTransactionById(
transactionId: TransactionId,
requestingParties: Set[Party],
): Future[Option[GetFlatTransactionResponse]] =
timedFuture(
Timed.future(
Metrics.lookupFlatTransactionById,
ledger.lookupFlatTransactionById(transactionId, requestingParties),
)
@ -127,31 +126,31 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
transactionId: TransactionId,
requestingParties: Set[Party],
): Future[Option[GetTransactionResponse]] =
timedFuture(
Timed.future(
Metrics.lookupTransactionTreeById,
ledger.lookupTransactionTreeById(transactionId, requestingParties),
)
override def lookupMaximumLedgerTime(contractIds: Set[AbsoluteContractId]): Future[Instant] =
timedFuture(Metrics.lookupMaximumLedgerTime, ledger.lookupMaximumLedgerTime(contractIds))
Timed.future(Metrics.lookupMaximumLedgerTime, ledger.lookupMaximumLedgerTime(contractIds))
override def getParties(parties: Seq[Party]): Future[List[PartyDetails]] =
timedFuture(Metrics.getParties, ledger.getParties(parties))
Timed.future(Metrics.getParties, ledger.getParties(parties))
override def listKnownParties(): Future[List[PartyDetails]] =
timedFuture(Metrics.listKnownParties, ledger.listKnownParties())
Timed.future(Metrics.listKnownParties, ledger.listKnownParties())
override def partyEntries(startExclusive: Offset): Source[(Offset, PartyLedgerEntry), NotUsed] =
ledger.partyEntries(startExclusive)
override def listLfPackages(): Future[Map[PackageId, PackageDetails]] =
timedFuture(Metrics.listLfPackages, ledger.listLfPackages())
Timed.future(Metrics.listLfPackages, ledger.listLfPackages())
override def getLfArchive(packageId: PackageId): Future[Option[Archive]] =
timedFuture(Metrics.getLfArchive, ledger.getLfArchive(packageId))
Timed.future(Metrics.getLfArchive, ledger.getLfArchive(packageId))
override def getLfPackage(packageId: PackageId): Future[Option[Ast.Package]] =
timedFuture(Metrics.getLfPackage, ledger.getLfPackage(packageId))
Timed.future(Metrics.getLfPackage, ledger.getLfPackage(packageId))
override def packageEntries(
startExclusive: Offset): Source[(Offset, PackageLedgerEntry), NotUsed] =
@ -162,7 +161,7 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
}
override def lookupLedgerConfiguration(): Future[Option[(Offset, Configuration)]] =
timedFuture(Metrics.lookupLedgerConfiguration, ledger.lookupLedgerConfiguration())
Timed.future(Metrics.lookupLedgerConfiguration, ledger.lookupLedgerConfiguration())
override def configurationEntries(
startExclusive: Option[Offset]): Source[(Offset, ConfigurationEntry), NotUsed] =
@ -173,12 +172,12 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
submitter: Ref.Party,
submittedAt: Instant,
deduplicateUntil: Instant): Future[CommandDeduplicationResult] =
timedFuture(
Timed.future(
Metrics.deduplicateCommand,
ledger.deduplicateCommand(commandId, submitter, submittedAt, deduplicateUntil))
override def removeExpiredDeduplicationData(currentTime: Instant): Future[Unit] =
timedFuture(
Timed.future(
Metrics.removeExpiredDeduplicationData,
ledger.removeExpiredDeduplicationData(currentTime))
@ -186,7 +185,7 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
commandId: CommandId,
submitter: Ref.Party,
): Future[Unit] =
timedFuture(
Timed.future(
Metrics.stopDeduplicatingCommand,
ledger.stopDeduplicatingCommand(commandId, submitter))
}

View File

@ -9,21 +9,20 @@ import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink}
import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
import com.daml.daml_lf_dev.DamlLf
import com.daml.dec.{DirectExecutionContext => DEC}
import com.daml.ledger.api.domain
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1.Update._
import com.daml.ledger.participant.state.v1._
import com.daml.lf.data.Ref.LedgerString
import com.daml.lf.engine.Blinding
import com.daml.daml_lf_dev.DamlLf
import com.daml.dec.{DirectExecutionContext => DEC}
import com.daml.ledger.api.domain
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{MetricName, Timed}
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.common.LedgerIdMismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.events.EventIdFormatter
import com.daml.platform.metrics.timedFuture
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.{FlywayMigrations, PersistenceEntry}
@ -335,7 +334,7 @@ class JdbcIndexer private[indexer] (
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.mapAsync(1) {
case (offset, update) =>
timedFuture(Metrics.stateUpdateProcessingTimer, handleStateUpdate(offset, update))
Timed.future(Metrics.stateUpdateProcessingTimer, handleStateUpdate(offset, update))
}
.toMat(Sink.ignore)(Keep.both)
.run()

View File

@ -1,20 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform
import com.codahale.metrics.Timer
import com.daml.dec.DirectExecutionContext
import scala.concurrent.Future
package object metrics {
def timedFuture[T](timer: Timer, future: => Future[T]): Future[T] = {
val ctx = timer.time()
val result = future
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
result
}
}

View File

@ -10,7 +10,6 @@ import java.time.{Duration, Instant}
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1.metrics.TimedWriteService
import com.daml.ledger.participant.state.v1.{ParticipantId, SeedService}
import com.daml.ledger.participant.state.{v1 => ParticipantState}
@ -26,6 +25,7 @@ import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthChecks
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.MetricName
import com.daml.platform.apiserver._
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.sandbox.SandboxServer._

View File

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.Slf4jReporter.LoggingLevel
import com.codahale.metrics.jmx.JmxReporter
import com.codahale.metrics.{MetricRegistry, Reporter, Slf4jReporter}
import com.daml.ledger.participant.state.metrics.JvmMetricSet
import com.daml.metrics.JvmMetricSet
import com.daml.platform.configuration.MetricsReporter
import com.daml.resources.{Resource, ResourceOwner}

View File

@ -6,13 +6,12 @@ package com.daml.platform.sandbox.stores.ledger
import java.time.Instant
import com.codahale.metrics.{MetricRegistry, Timer}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.participant.state.v1._
import com.daml.lf.data.Ref.Party
import com.daml.lf.data.Time
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.metrics.{MetricName, Timed}
import com.daml.platform.index.MeteredReadOnlyLedger
import com.daml.platform.metrics.timedFuture
import scala.concurrent.Future
@ -33,7 +32,7 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction): Future[SubmissionResult] =
timedFuture(
Timed.future(
Metrics.publishTransaction,
ledger.publishTransaction(submitterInfo, transactionMeta, transaction))
@ -41,7 +40,7 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
submissionId: SubmissionId,
party: Party,
displayName: Option[String]): Future[SubmissionResult] =
timedFuture(
Timed.future(
Metrics.publishPartyAllocation,
ledger.publishPartyAllocation(submissionId, party, displayName))
@ -50,7 +49,7 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
knownSince: Instant,
sourceDescription: Option[String],
payload: List[Archive]): Future[SubmissionResult] =
timedFuture(
Timed.future(
Metrics.uploadPackages,
ledger.uploadPackages(submissionId, knownSince, sourceDescription, payload))
@ -58,7 +57,7 @@ private class MeteredLedger(ledger: Ledger, metrics: MetricRegistry)
maxRecordTime: Time.Timestamp,
submissionId: String,
config: Configuration): Future[SubmissionResult] =
timedFuture(
Timed.future(
Metrics.publishConfiguration,
ledger.publishConfiguration(maxRecordTime, submissionId, config))

View File

@ -19,7 +19,6 @@ import com.daml.ledger.on.sql.Database.InvalidDatabaseException
import com.daml.ledger.on.sql.SqlLedgerReaderWriter
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.caching
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1
import com.daml.ledger.participant.state.v1.metrics.{TimedReadService, TimedWriteService}
import com.daml.ledger.participant.state.v1.{SeedService, WritePackagesService}
@ -27,6 +26,7 @@ import com.daml.lf.archive.DarReader
import com.daml.lf.data.Ref
import com.daml.logging.ContextualizedLogger
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.MetricName
import com.daml.platform.apiserver._
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer}

View File

@ -7,9 +7,9 @@ import java.sql.Connection
import java.util.concurrent.{Executor, Executors, TimeUnit}
import com.codahale.metrics.{MetricRegistry, Timer}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.MetricName
import com.daml.platform.configuration.ServerRole
import com.daml.resources.ResourceOwner
import com.google.common.util.concurrent.ThreadFactoryBuilder

View File

@ -8,27 +8,26 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.{MetricRegistry, Timer}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.metrics.MetricName
import com.daml.ledger.participant.state.v1.{Configuration, Offset, ParticipantId}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{PackageId, Party}
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.platform.metrics.timedFuture
import com.daml.metrics.{MetricName, Timed}
import com.daml.platform.store.Contract.ActiveContract
import com.daml.platform.store.PersistenceEntry
import com.daml.platform.store.dao.events.{TransactionsReader, TransactionsWriter}
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
PackageLedgerEntry,
PartyLedgerEntry,
PartyLedgerEntry
}
import com.daml.platform.store.PersistenceEntry
import scala.collection.immutable
import scala.concurrent.Future
@ -63,35 +62,35 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def lookupLedgerId(): Future[Option[LedgerId]] =
timedFuture(Metrics.lookupLedgerId, ledgerDao.lookupLedgerId())
Timed.future(Metrics.lookupLedgerId, ledgerDao.lookupLedgerId())
override def lookupLedgerEnd(): Future[Offset] =
timedFuture(Metrics.lookupLedgerEnd, ledgerDao.lookupLedgerEnd())
Timed.future(Metrics.lookupLedgerEnd, ledgerDao.lookupLedgerEnd())
override def lookupInitialLedgerEnd(): Future[Option[Offset]] =
timedFuture(Metrics.lookupLedgerEnd, ledgerDao.lookupInitialLedgerEnd())
Timed.future(Metrics.lookupLedgerEnd, ledgerDao.lookupInitialLedgerEnd())
override def lookupActiveOrDivulgedContract(
contractId: Value.AbsoluteContractId,
forParty: Party): Future[Option[ContractInst[Value.VersionedValue[AbsoluteContractId]]]] =
timedFuture(
Timed.future(
Metrics.lookupActiveContract,
ledgerDao.lookupActiveOrDivulgedContract(contractId, forParty))
override def lookupMaximumLedgerTime(
contractIds: Set[AbsoluteContractId],
): Future[Instant] =
timedFuture(Metrics.lookupMaximumLedgerTime, ledgerDao.lookupMaximumLedgerTime(contractIds))
Timed.future(Metrics.lookupMaximumLedgerTime, ledgerDao.lookupMaximumLedgerTime(contractIds))
override def lookupLedgerEntry(offset: Offset): Future[Option[LedgerEntry]] =
timedFuture(Metrics.lookupLedgerEntry, ledgerDao.lookupLedgerEntry(offset))
Timed.future(Metrics.lookupLedgerEntry, ledgerDao.lookupLedgerEntry(offset))
override def transactionsReader: TransactionsReader = ledgerDao.transactionsReader
override def lookupKey(
key: Node.GlobalKey,
forParty: Party): Future[Option[Value.AbsoluteContractId]] =
timedFuture(Metrics.lookupKey, ledgerDao.lookupKey(key, forParty))
Timed.future(Metrics.lookupKey, ledgerDao.lookupKey(key, forParty))
override def getLedgerEntries(
startExclusive: Offset,
@ -100,10 +99,10 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
ledgerDao.getLedgerEntries(startExclusive, endInclusive)
override def getParties(parties: Seq[Party]): Future[List[PartyDetails]] =
timedFuture(Metrics.getParties, ledgerDao.getParties(parties))
Timed.future(Metrics.getParties, ledgerDao.getParties(parties))
override def listKnownParties(): Future[List[PartyDetails]] =
timedFuture(Metrics.listKnownParties, ledgerDao.listKnownParties())
Timed.future(Metrics.listKnownParties, ledgerDao.listKnownParties())
override def getPartyEntries(
startExclusive: Offset,
@ -112,10 +111,10 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
ledgerDao.getPartyEntries(startExclusive, endInclusive)
override def listLfPackages: Future[Map[PackageId, PackageDetails]] =
timedFuture(Metrics.listLfPackages, ledgerDao.listLfPackages)
Timed.future(Metrics.listLfPackages, ledgerDao.listLfPackages)
override def getLfArchive(packageId: PackageId): Future[Option[Archive]] =
timedFuture(Metrics.getLfArchive, ledgerDao.getLfArchive(packageId))
Timed.future(Metrics.getLfArchive, ledgerDao.getLfArchive(packageId))
override def getPackageEntries(
startExclusive: Offset,
@ -124,7 +123,7 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
/** Looks up the current ledger configuration, if it has been set. */
override def lookupLedgerConfiguration(): Future[Option[(Offset, Configuration)]] =
timedFuture(Metrics.lookupLedgerConfiguration, ledgerDao.lookupLedgerConfiguration())
Timed.future(Metrics.lookupLedgerConfiguration, ledgerDao.lookupLedgerConfiguration())
/** Get a stream of configuration entries. */
override def getConfigurationEntries(
@ -139,17 +138,17 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
submitter: Ref.Party,
submittedAt: Instant,
deduplicateUntil: Instant): Future[CommandDeduplicationResult] =
timedFuture(
Timed.future(
Metrics.deduplicateCommand,
ledgerDao.deduplicateCommand(commandId, submitter, submittedAt, deduplicateUntil))
override def removeExpiredDeduplicationData(currentTime: Instant): Future[Unit] =
timedFuture(
Timed.future(
Metrics.removeExpiredDeduplicationData,
ledgerDao.removeExpiredDeduplicationData(currentTime))
override def stopDeduplicatingCommand(commandId: CommandId, submitter: Party): Future[Unit] =
timedFuture(
Timed.future(
Metrics.stopDeduplicatingCommand,
ledgerDao.stopDeduplicatingCommand(commandId, submitter))
}
@ -173,14 +172,14 @@ class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
override def storeLedgerEntry(
offset: Offset,
ledgerEntry: PersistenceEntry): Future[PersistenceResponse] =
timedFuture(Metrics.storeLedgerEntry, ledgerDao.storeLedgerEntry(offset, ledgerEntry))
Timed.future(Metrics.storeLedgerEntry, ledgerDao.storeLedgerEntry(offset, ledgerEntry))
override def storeInitialState(
activeContracts: immutable.Seq[ActiveContract],
ledgerEntries: immutable.Seq[(Offset, LedgerEntry)],
newLedgerEnd: Offset
): Future[Unit] =
timedFuture(
Timed.future(
Metrics.storeInitialState,
ledgerDao.storeInitialState(activeContracts, ledgerEntries, newLedgerEnd))
@ -193,7 +192,7 @@ class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
override def storePartyEntry(
offset: Offset,
partyEntry: PartyLedgerEntry): Future[PersistenceResponse] =
timedFuture(Metrics.storePartyEntry, ledgerDao.storePartyEntry(offset, partyEntry))
Timed.future(Metrics.storePartyEntry, ledgerDao.storePartyEntry(offset, partyEntry))
override def storeConfigurationEntry(
offset: Offset,
@ -203,7 +202,7 @@ class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
configuration: Configuration,
rejectionReason: Option[String]
): Future[PersistenceResponse] =
timedFuture(
Timed.future(
Metrics.storeConfigurationEntry,
ledgerDao.storeConfigurationEntry(
offset,
@ -219,7 +218,7 @@ class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)
packages: List[(Archive, PackageDetails)],
entry: Option[PackageLedgerEntry]
): Future[PersistenceResponse] =
timedFuture(Metrics.storePackageEntry, ledgerDao.storePackageEntry(offset, packages, entry))
Timed.future(Metrics.storePackageEntry, ledgerDao.storePackageEntry(offset, packages, entry))
override def transactionsWriter: TransactionsWriter = ledgerDao.transactionsWriter
}

View File

@ -94,6 +94,8 @@
type: jar-deploy
- target: //ledger/ledger-on-sql:ledger-on-sql
type: jar-scala
- target: //ledger/metrics:metrics
type: jar-scala
- target: //ledger/participant-state:participant-state
type: jar-scala
- target: //ledger/participant-state/kvutils:daml_kvutils_java_proto