Integrate metrics for cache (#6043)

* Integrate metrics for cache

Centralizes the creation of instrumented caches in a single point and adds
metrics coming from Caffeine into the mix.

changelog_begin
[Ledger API Server] if --max-state-value-cache-size is greater than zero, the
following additional metrics will be recorded under the daml.kvutils.submission.validator.state_value_cache namespace:
hits, misses, load_successes, load_failures, load_total_time, evictions and evicted_weight
changelog_end

* Fix Bazel build file formatting

* Address https://github.com/digital-asset/daml/pull/6043#discussion_r427902339

* Address https://github.com/digital-asset/daml/pull/6043#discussion_r427904794

* Review DropwizardStatsCounter

- address https://github.com/digital-asset/daml/pull/6043#discussion_r427905074
- address https://github.com/digital-asset/daml/pull/6043#discussion_r427905184
- address https://github.com/digital-asset/daml/pull/6043#discussion_r427905307
- address https://github.com/digital-asset/daml/pull/6043#discussion_r427905387
- address https://github.com/digital-asset/daml/pull/6043#discussion_r427905650

* Address https://github.com/digital-asset/daml/pull/6043#discussion_r427906243

* Fix implicit numeric widening fatal warning

* Address https://github.com/digital-asset/daml/pull/6043#discussion_r427960762

* Fix infinite loop in metrics
This commit is contained in:
Stefano Baghino 2020-05-20 15:51:45 +02:00 committed by GitHub
parent 595f1e278d
commit 6f1e051648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 237 additions and 109 deletions

View File

@ -14,7 +14,9 @@ da_scala_library(
"//:__subpackages__",
],
deps = [
"//ledger/metrics",
"@maven//:com_github_ben_manes_caffeine_caffeine",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
],
)

View File

@ -0,0 +1,101 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.caching
import com.codahale.metrics.Gauge
import com.daml.metrics.CacheMetrics
import com.github.benmanes.caffeine.{cache => caffeine}
import scala.compat.java8.OptionConverters._
sealed abstract class Cache[Key, Value] {
def put(key: Key, value: Value): Unit
def get(key: Key, acquire: Key => Value): Value
def getIfPresent(key: Key): Option[Value]
}
object Cache {
type Size = Long
def none[Key, Value]: Cache[Key, Value] = new NoCache
def from[Key <: AnyRef: Weight, Value <: AnyRef: Weight](
configuration: Configuration,
): Cache[Key, Value] =
from(configuration, None)
def from[Key <: AnyRef: Weight, Value <: AnyRef: Weight](
configuration: Configuration,
metrics: CacheMetrics,
): Cache[Key, Value] =
from(configuration, Some(metrics))
private def from[Key <: AnyRef: Weight, Value <: AnyRef: Weight](
configuration: Configuration,
metrics: Option[CacheMetrics],
): Cache[Key, Value] =
configuration match {
case Configuration(maximumWeight) if maximumWeight <= 0 =>
none
case Configuration(maximumWeight) =>
val builder =
caffeine.Caffeine
.newBuilder()
.maximumWeight(maximumWeight)
.weigher(Weight.weigher[Key, Value])
metrics.fold(new CaffeineCache(builder))(new InstrumentedCaffeineCache(builder, _))
}
final class NoCache[Key, Value] private[Cache] extends Cache[Key, Value] {
override def put(key: Key, value: Value): Unit = ()
override def get(key: Key, acquire: Key => Value): Value = acquire(key)
override def getIfPresent(key: Key): Option[Value] = None
}
sealed class CaffeineCache[Key, Value] private[Cache] (builder: caffeine.Caffeine[Key, Value])
extends Cache[Key, Value] {
private val cache = init(builder)
protected def init(builder: caffeine.Caffeine[Key, Value]): caffeine.Cache[Key, Value] =
builder.build()
override def put(key: Key, value: Value): Unit = cache.put(key, value)
override def get(key: Key, acquire: Key => Value): Value =
cache.get(key, key => acquire(key))
override def getIfPresent(key: Key): Option[Value] =
Option(cache.getIfPresent(key))
}
private def size(cache: caffeine.Cache[_, _]): Gauge[Long] =
() => cache.estimatedSize()
private def weight(cache: caffeine.Cache[_, _]): Gauge[Long] =
() => cache.policy().eviction().asScala.flatMap(_.weightedSize.asScala).getOrElse(0)
final class InstrumentedCaffeineCache[Key, Value] private[Cache] (
builder: caffeine.Caffeine[Key, Value],
metrics: CacheMetrics,
) extends CaffeineCache[Key, Value](builder) {
override protected def init(
builder: caffeine.Caffeine[Key, Value],
): caffeine.Cache[Key, Value] = {
val cache = super.init(builder.recordStats(() => new DropwizardStatsCounter(metrics)))
metrics.registerSizeGauge(size(cache))
metrics.registerWeightGauge(weight(cache))
cache
}
}
}

View File

@ -6,5 +6,7 @@ package com.daml.caching
final case class Configuration(maximumWeight: Long) extends AnyVal
object Configuration {
val none: Configuration = Configuration(maximumWeight = 0)
}

View File

@ -0,0 +1,53 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.caching
import java.util.concurrent.TimeUnit
import com.daml.metrics.CacheMetrics
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.stats.{CacheStats, StatsCounter}
private[caching] final class DropwizardStatsCounter(
metrics: CacheMetrics,
) extends StatsCounter {
override def recordHits(newHits: Int): Unit =
metrics.hitCount.inc(newHits.toLong)
override def recordMisses(newMisses: Int): Unit =
metrics.missCount.inc(newMisses.toLong)
override def recordLoadSuccess(loadTimeNanos: Long): Unit = {
metrics.loadSuccessCount.inc()
metrics.totalLoadTime.update(loadTimeNanos, TimeUnit.NANOSECONDS)
}
override def recordLoadFailure(loadTimeNanos: Long): Unit = {
metrics.loadFailureCount.inc()
metrics.totalLoadTime.update(loadTimeNanos, TimeUnit.NANOSECONDS)
}
override def recordEviction(weight: Int, cause: RemovalCause): Unit = {
metrics.evictionCount.inc()
metrics.evictionWeight.inc(weight.toLong)
}
override def recordEviction(): Unit = {
metrics.evictionCount.inc()
metrics.evictionWeight.inc()
}
override def snapshot(): CacheStats =
new CacheStats(
metrics.hitCount.getCount,
metrics.missCount.getCount,
metrics.loadSuccessCount.getCount,
metrics.loadFailureCount.getCount,
metrics.totalLoadTime.getCount,
metrics.evictionCount.getCount,
metrics.evictionWeight.getCount,
)
}

View File

@ -23,12 +23,12 @@ da_scala_library(
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//libs-scala/caching",
"//libs-scala/resources",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
@ -52,13 +52,13 @@ da_scala_test(
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"@maven//:com_typesafe_akka_akka_actor_2_12",
@ -78,6 +78,7 @@ da_scala_library(
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
@ -86,7 +87,6 @@ da_scala_library(
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//ledger/sandbox",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/ports",
"//libs-scala/resources",

View File

@ -51,21 +51,27 @@ object Main {
new KeyValueParticipantState(
readerWriter,
readerWriter,
createMetrics(participantConfig, config))
createMetrics(participantConfig, config),
)
def owner(config: Config[ExtraConfig], participantConfig: ParticipantConfig, engine: Engine)(
implicit materializer: Materializer,
logCtx: LoggingContext,
): ResourceOwner[InMemoryLedgerReaderWriter] =
): ResourceOwner[InMemoryLedgerReaderWriter] = {
val metrics = createMetrics(participantConfig, config)
new InMemoryLedgerReaderWriter.Owner(
initialLedgerId = config.ledgerId,
participantId = participantConfig.participantId,
metrics = createMetrics(participantConfig, config),
stateValueCache = caching.Cache.from(config.stateValueCache),
metrics = metrics,
stateValueCache = caching.Cache.from(
configuration = config.stateValueCache,
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
dispatcher = dispatcher,
state = state,
engine = engine,
)
}
override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.defaultLocalLedger

View File

@ -91,13 +91,13 @@ da_scala_library(
"//daml-lf/engine",
"//daml-lf/transaction",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"@maven//:com_google_protobuf_protobuf_java",
@ -122,6 +122,7 @@ da_scala_library(
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-health",
"//ledger/metrics",
@ -129,7 +130,6 @@ da_scala_library(
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//ledger/sandbox",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/ports",
"//libs-scala/resources",
@ -163,6 +163,7 @@ da_scala_library(
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-health",
"//ledger/metrics",
@ -171,7 +172,6 @@ da_scala_library(
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//ledger/participant-state/kvutils/app",
"//ledger/sandbox",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/ports",
"//libs-scala/postgresql-testing",

View File

@ -70,7 +70,10 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
metrics = metrics,
engine,
jdbcUrl,
stateValueCache = caching.Cache.from(config.stateValueCache),
stateValueCache = caching.Cache.from(
configuration = config.stateValueCache,
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
seedService = SeedService(config.seeding),
resetOnStartup = false
).acquire()

View File

@ -0,0 +1,30 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.metrics
import com.codahale.metrics.MetricRegistry.MetricSupplier
import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
final class CacheMetrics private[metrics] (
registry: MetricRegistry,
prefix: MetricName,
) {
val hitCount: Counter = registry.counter(prefix :+ "hits")
val missCount: Counter = registry.counter(prefix :+ "misses")
val loadSuccessCount: Counter = registry.counter(prefix :+ "load_successes")
val loadFailureCount: Counter = registry.counter(prefix :+ "load_failures")
val totalLoadTime: Timer = registry.timer(prefix :+ "load_total_time")
val evictionCount: Counter = registry.counter(prefix :+ "evictions")
val evictionWeight: Counter = registry.counter(prefix :+ "evicted_weight")
def registerSizeGauge(sizeGauge: Gauge[Long]): Unit =
register(prefix :+ "size", () => sizeGauge)
def registerWeightGauge(weightGauge: Gauge[Long]): Unit =
register(prefix :+ "weight", () => weightGauge)
private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = {
registry.remove(name)
registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]]
}
}

View File

@ -154,13 +154,7 @@ class Metrics(val registry: MetricRegistry) {
val commitSubmission: Timer = registry.timer(prefix :+ "commit_submission")
val transformSubmission: Timer = registry.timer(prefix :+ "transform_submission")
object stateValueCache {
val prefix: MetricName = validator.prefix :+ "state_value_cache"
def size(value: () => Long): Gauge[Nothing] =
gauge(prefix :+ "size", () => () => value())
def weight(value: () => Long): Gauge[Nothing] =
gauge(prefix :+ "weight", () => () => value())
}
val stateValueCache = new CacheMetrics(registry, prefix :+ "state_value_cache")
}
}
@ -328,6 +322,11 @@ class Metrics(val registry: MetricRegistry) {
"lookup_transaction_tree_by_id")
val getActiveContracts: DatabaseMetrics = createDatabaseMetrics("get_active_contracts")
object translation {
val prefix: MetricName = db.prefix :+ "translation"
val cache = new CacheMetrics(registry, prefix :+ "cache")
}
}
}
object indexer {

View File

@ -36,12 +36,12 @@ da_scala_library(
"//daml-lf/transaction",
"//daml-lf/transaction:transaction_java_proto",
"//daml-lf/transaction:value_java_proto",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"@maven//:com_google_guava_guava",
"@maven//:com_google_protobuf_protobuf_java",
@ -112,12 +112,12 @@ da_scala_test(
"//daml-lf/transaction:transaction_java_proto",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",

View File

@ -28,6 +28,7 @@ da_scala_library(
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
@ -37,7 +38,6 @@ da_scala_library(
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils",
"//ledger/sandbox",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/ports",
"//libs-scala/resources",

View File

@ -57,9 +57,6 @@ class SubmissionValidator[LogResult] private[validator] (
private val timedLedgerStateAccess = new TimedLedgerStateAccess(ledgerStateAccess)
metrics.daml.kvutils.submission.validator.stateValueCache.size(() => stateValueCache.size)
metrics.daml.kvutils.submission.validator.stateValueCache.weight(() => stateValueCache.weight)
def validate(
envelope: Bytes,
correlationId: String,

View File

@ -20,6 +20,7 @@ da_scala_test_suite(
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
@ -29,7 +30,6 @@ da_scala_test_suite(
"//ledger/participant-state/kvutils",
"//ledger/sandbox:ledger-api-server",
"//ledger/test-common",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",

View File

@ -25,6 +25,7 @@ compile_deps = [
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-service/jwt",
"//ledger/caching",
"//ledger/ledger-api-akka",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
@ -38,7 +39,6 @@ compile_deps = [
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils",
"//libs-scala/build-info",
"//libs-scala/caching",
"//libs-scala/contextualized-logging",
"//libs-scala/direct-execution-context",
"//libs-scala/ports",

View File

@ -137,7 +137,9 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
seedService = SeedService(seeding),
stateValueCache = caching.Cache.from(
caching.Configuration(maximumWeight = MaximumStateValueCacheSize)),
caching.Configuration(
maximumWeight = MaximumStateValueCacheSize,
)),
engine = engine
)
ledger = new KeyValueParticipantState(readerWriter, readerWriter, metrics)

View File

@ -66,15 +66,17 @@ private[dao] final class TransactionsReader(
rowOffset = offset,
)
.withFetchSize(Some(pageSize))
val rawEvents =
val rawEventsFuture =
dispatcher.executeSql(dbMetrics.getFlatTransactions) { implicit connection =>
query.asVectorOf(EventsTable.rawFlatEventParser)
}
Timed.future(
future = rawEvents.flatMap(Future.traverse(_)(deserializeEntry(verbose))),
timer = dbMetrics.getFlatTransactions.translationTimer,
rawEventsFuture.flatMap(
rawEvents =>
Timed.future(
future = Future.traverse(rawEvents)(deserializeEntry(verbose)),
timer = dbMetrics.getFlatTransactions.translationTimer,
)
)
}
groupContiguous(events)(by = _.transactionId)
@ -127,9 +129,12 @@ private[dao] final class TransactionsReader(
dispatcher.executeSql(dbMetrics.getTransactionTrees) { implicit connection =>
query.asVectorOf(EventsTable.rawTreeEventParser)
}
Timed.future(
future = rawEvents.flatMap(Future.traverse(_)(deserializeEntry(verbose))),
timer = dbMetrics.getTransactionTrees.translationTimer,
rawEvents.flatMap(
es =>
Timed.future(
future = Future.traverse(es)(deserializeEntry(verbose)),
timer = dbMetrics.getTransactionTrees.translationTimer,
)
)
}

View File

@ -1,72 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.caching
import com.github.benmanes.caffeine.{cache => caffeine}
import scala.compat.java8.OptionConverters._
sealed abstract class Cache[Key, Value] {
def put(key: Key, value: Value): Unit
def get(key: Key, acquire: Key => Value): Value
def getIfPresent(key: Key): Option[Value]
def size: Cache.Size
def weight: Cache.Size
}
object Cache {
type Size = Long
def none[Key, Value]: Cache[Key, Value] = new NoCache
def from[Key <: AnyRef: Weight, Value <: AnyRef: Weight](
configuration: Configuration,
): Cache[Key, Value] =
configuration match {
case Configuration(0) =>
none
case Configuration(maximumWeight) =>
new CaffeineCache(
caffeine.Caffeine
.newBuilder()
.maximumWeight(maximumWeight)
.weigher(Weight.weigher[Key, Value])
.build[Key, Value]())
}
final class NoCache[Key, Value] private[Cache] extends Cache[Key, Value] {
override def put(key: Key, value: Value): Unit = ()
override def get(key: Key, acquire: Key => Value): Value = acquire(key)
override def getIfPresent(key: Key): Option[Value] = None
override val size: Cache.Size = 0
override val weight: Cache.Size = 0
}
final class CaffeineCache[Key, Value] private[Cache] (val cache: caffeine.Cache[Key, Value])
extends Cache[Key, Value] {
override def put(key: Key, value: Value): Unit = cache.put(key, value)
override def get(key: Key, acquire: Key => Value): Value =
cache.get(key, key => acquire(key))
override def getIfPresent(key: Key): Option[Value] =
Option(cache.getIfPresent(key))
override def size: Cache.Size =
cache.estimatedSize()
override def weight: Cache.Size =
cache.policy().eviction().asScala.flatMap(_.weightedSize().asScala).getOrElse(0)
}
}

View File

@ -76,6 +76,8 @@
type: jar-scala
- target: //ledger-api/testing-utils:testing-utils
type: jar-scala
- target: //ledger/caching:caching
type: jar-scala
- target: //ledger/ledger-api-akka:ledger-api-akka
type: jar-scala
- target: //ledger/ledger-api-auth:ledger-api-auth
@ -128,8 +130,6 @@
type: jar-scala
- target: //libs-scala/auth-utils:auth-utils
type: jar-scala
- target: //libs-scala/caching:caching
type: jar-scala
- target: //libs-scala/contextualized-logging:contextualized-logging
type: jar-scala
- target: //libs-scala/direct-execution-context:direct-execution-context