mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Bisected daml.execution.cache.register_update [DPP-1248]. (#15274)
CHANGELOG_BEGIN * Removed daml_execution_cache_register_update metric * Added: - daml_execution_cache_contract_state_register_update - daml_execution_cache_key_state_register_update CHANGELOG_END
This commit is contained in:
parent
ddfa7c1d9d
commit
e5c418a913
@ -122,16 +122,31 @@ class ExecutionMetrics(override val prefix: MetricName, override val registry: M
|
||||
override val prefix: MetricName = ExecutionMetrics.this.prefix :+ "cache"
|
||||
override val registry = ExecutionMetrics.this.registry
|
||||
|
||||
val keyState: CacheMetrics = new CacheMetrics(prefix :+ "key_state", registry)
|
||||
val contractState: CacheMetrics = new CacheMetrics(prefix :+ "contract_state", registry)
|
||||
object keyState {
|
||||
val stateCache: CacheMetrics = new CacheMetrics(prefix :+ "key_state", registry)
|
||||
|
||||
@MetricDoc.Tag(
|
||||
summary = "The time spent to update the cache.",
|
||||
description = """The total time spent in sequential update steps of the contract state caches
|
||||
|updating logic. This metric is created with debugging purposes in mind.""",
|
||||
qualification = Debug,
|
||||
)
|
||||
val registerCacheUpdate: Timer = timer(prefix :+ "register_update")
|
||||
@MetricDoc.Tag(
|
||||
summary = "The time spent to update the cache.",
|
||||
description =
|
||||
"""The total time spent in sequential update steps of the contract state caches
|
||||
|updating logic. This metric is created with debugging purposes in mind.""",
|
||||
qualification = Debug,
|
||||
)
|
||||
val registerCacheUpdate: Timer = timer(prefix :+ "key_state" :+ "register_update")
|
||||
}
|
||||
|
||||
object contractState {
|
||||
val stateCache: CacheMetrics = new CacheMetrics(prefix :+ "contract_state", registry)
|
||||
|
||||
@MetricDoc.Tag(
|
||||
summary = "The time spent to update the cache.",
|
||||
description =
|
||||
"""The total time spent in sequential update steps of the contract state caches
|
||||
|updating logic. This metric is created with debugging purposes in mind.""",
|
||||
qualification = Debug,
|
||||
)
|
||||
val registerCacheUpdate: Timer = timer(prefix :+ "contract_state" :+ "register_update")
|
||||
}
|
||||
|
||||
@MetricDoc.Tag(
|
||||
summary =
|
||||
|
@ -18,9 +18,9 @@ object ContractKeyStateCache {
|
||||
initialCacheIndex = initialCacheIndex,
|
||||
cache = SizedCache.from[GlobalKey, ContractKeyStateValue](
|
||||
SizedCache.Configuration(cacheSize),
|
||||
metrics.daml.execution.cache.keyState,
|
||||
metrics.daml.execution.cache.keyState.stateCache,
|
||||
),
|
||||
registerUpdateTimer = metrics.daml.execution.cache.registerCacheUpdate,
|
||||
registerUpdateTimer = metrics.daml.execution.cache.keyState.registerCacheUpdate,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,9 @@ object ContractsStateCache {
|
||||
initialCacheIndex = initialCacheIndex,
|
||||
cache = SizedCache.from[ContractId, ContractStateValue](
|
||||
SizedCache.Configuration(cacheSize),
|
||||
metrics.daml.execution.cache.contractState,
|
||||
metrics.daml.execution.cache.contractState.stateCache,
|
||||
),
|
||||
registerUpdateTimer = metrics.daml.execution.cache.registerCacheUpdate,
|
||||
registerUpdateTimer = metrics.daml.execution.cache.contractState.registerCacheUpdate,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -127,26 +127,29 @@ private[platform] case class StateCache[K, V](
|
||||
)(implicit loggingContext: LoggingContext): Future[Unit] =
|
||||
eventualUpdate
|
||||
.map { (value: V) =>
|
||||
pendingUpdates.synchronized {
|
||||
pendingUpdates.get(key) match {
|
||||
case Some(pendingForKey) =>
|
||||
// Only update the cache if the current update is targeting the cacheIndex
|
||||
// sampled when initially dispatched in `putAsync`.
|
||||
// Otherwise we can assume that a more recent `putAsync` has an update in-flight
|
||||
// or that the entry has been updated synchronously with `put` with a recent Index DB entry.
|
||||
if (pendingForKey.latestValidAt == validAt) {
|
||||
cache.put(key, value)
|
||||
logger.debug(
|
||||
s"Updated cache for $key with ${truncateValueForLogging(value)} at $validAt"
|
||||
Timed.value(
|
||||
registerUpdateTimer,
|
||||
pendingUpdates.synchronized {
|
||||
pendingUpdates.get(key) match {
|
||||
case Some(pendingForKey) =>
|
||||
// Only update the cache if the current update is targeting the cacheIndex
|
||||
// sampled when initially dispatched in `putAsync`.
|
||||
// Otherwise we can assume that a more recent `putAsync` has an update in-flight
|
||||
// or that the entry has been updated synchronously with `put` with a recent Index DB entry.
|
||||
if (pendingForKey.latestValidAt == validAt) {
|
||||
cache.put(key, value)
|
||||
logger.debug(
|
||||
s"Updated cache for $key with ${truncateValueForLogging(value)} at $validAt"
|
||||
)
|
||||
}
|
||||
removeFromPending(key)
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"Pending updates tracker for $key not registered. This could be due to a transient error causing a restart in the index service."
|
||||
)
|
||||
}
|
||||
removeFromPending(key)
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"Pending updates tracker for $key not registered. This could be due to a transient error causing a restart in the index service."
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
.recover {
|
||||
// Negative contract lookups are forwarded to `putAsync` as failed futures as they should not be cached
|
||||
|
@ -29,9 +29,7 @@ class StateCacheSpec extends AsyncFlatSpec with Matchers with MockitoSugar with
|
||||
override implicit def executionContext: ExecutionContext =
|
||||
scala.concurrent.ExecutionContext.global
|
||||
|
||||
private val cacheUpdateTimer = new Metrics(
|
||||
new MetricRegistry
|
||||
).daml.execution.cache.registerCacheUpdate
|
||||
private val cacheUpdateTimer = Metrics.ForTesting.timer(MetricName("cache-update"))
|
||||
|
||||
behavior of s"$className.putAsync"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user