simulation bridge code to populate trigger metric data structures using logging (#16505)

This commit is contained in:
Carl Pulley 2023-03-14 18:05:04 +00:00 committed by GitHub
parent 08f80f2a3a
commit a8e9ee3937
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,11 +19,634 @@ import com.daml.lf.engine.trigger.UnfoldState.{flatMapConcatNodeOps, toSourceOps
import com.daml.lf.speedy.SValue.SList
import com.daml.lf.speedy.SValue
import com.daml.logging.LoggingContextOf
import com.daml.logging.entries.LoggingValue
import com.daml.logging.entries.LoggingValue.{Nested, OfInt, OfLong, OfString}
import com.daml.platform.services.time.TimeProviderType
import com.daml.scalautil.Statement.discard
import com.daml.script.converter.Converter.Implicits._
import com.daml.util.Ctx
import java.util.UUID
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try
@SuppressWarnings(
Array(
"org.wartremover.warts.JavaSerializable",
"org.wartremover.warts.Product",
"org.wartremover.warts.Serializable",
)
)
private class TriggerRuleMetrics {
private[this] var metricCountData = mutable.Map.empty[Set[UUID], mutable.Map[String, Long]]
private[this] var metricTimingData =
mutable.Map.empty[Set[UUID], mutable.Map[String, FiniteDuration]]
def addLogEntry(msg: String, context: TriggerLogContext): Unit = {
addSteps(context)
addSubmissions(context)
addGetTime(context)
addACSActiveStart(msg, context)
addACSActiveEnd(msg, context)
addACSPendingStart(msg, context)
addACSPendingEnd(msg, context)
addInFlightStart(msg, context)
addInFlightEnd(msg, context)
addRuleEvaluation(context)
addStepIteratorMean(context)
addStepIteratorDelayMean(context)
}
def clearMetrics(): Unit = {
metricCountData = mutable.Map.empty
metricTimingData = mutable.Map.empty
}
def getMetrics: TriggerRuleMetrics.RuleMetrics = {
import TriggerRuleMetrics._
require(
metricCountData.keys.toSet.size == 1,
s"Metric count data was associated with multiple spans: ${metricCountData.keys.toSet}",
)
val (spanId, countMetrics) = metricCountData.head
require(
metricTimingData.keys.toSet == metricCountData.keys.toSet,
s"Timing metric data was associated with different spans to count metric data: for span ID $spanId found the keys ${metricTimingData.keys.toSet}",
)
val timingMetrics = metricTimingData(spanId)
require(
countMetrics.keys.toSet == Set(
"acs-active-start",
"acs-active-end",
"acs-pending-start",
"acs-pending-end",
"in-flight-start",
"in-flight-end",
"steps",
"get-time",
"submission-total",
"submission-create",
"submission-exercise",
"submission-create-and-exercise",
"submission-exercise-by-key",
),
s"Count metric data did not contain all the expected values: for span ID $spanId found the keys ${countMetrics.keys.toSet}",
)
require(
timingMetrics.keys.toSet
.subsetOf(
Set(
"rule-evaluation",
"step-iterator-mean",
"step-iterator-delay-mean",
)
),
s"Timing metric data contained unexpected values: for span ID $spanId found the keys ${timingMetrics.keys.toSet}",
)
require(
Set(
"rule-evaluation",
"step-iterator-mean",
).subsetOf(timingMetrics.keys.toSet),
s"Timing metric data was missing expected values: for span ID $spanId found the keys ${timingMetrics.keys.toSet}",
)
RuleMetrics(
evaluation = EvaluationMetrics(
steps = countMetrics("steps"),
submissions = countMetrics("submission-total"),
getTimes = countMetrics("get-time"),
ruleEvaluation = timingMetrics("rule-evaluation"),
stepIteratorMean = timingMetrics("step-iterator-mean"),
stepIteratorDelayMean = timingMetrics.get("step-iterator-delay-mean"),
),
submission = SubmissionMetrics(
creates = countMetrics("submission-create"),
exercises = countMetrics("submission-exercise"),
createAndExercises = countMetrics("submission-create-and-exercise"),
exerciseByKeys = countMetrics("submission-exercise-by-key"),
),
startState = InternalStateMetrics(
acs = ACSMetrics(
activeContracts = countMetrics("acs-active-start"),
pendingContracts = countMetrics("acs-pending-start"),
),
inFlight = InFlightMetrics(
commands = countMetrics("in-flight-start")
),
),
endState = InternalStateMetrics(
acs = ACSMetrics(
activeContracts = countMetrics("acs-active-end"),
pendingContracts = countMetrics("acs-pending-end"),
),
inFlight = InFlightMetrics(
commands = countMetrics("in-flight-end")
),
),
)
}
private def addACSActiveStart(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ start".r.matches(msg)) {
for {
count <- getACSActive(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("acs-active-start" -> count)
}
}
}
private def addACSActiveEnd(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ end".r.matches(msg)) {
for {
count <- getACSActive(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("acs-active-end" -> count)
}
}
}
private def addACSPendingStart(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ start".r.matches(msg)) {
for {
count <- getACSPending(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("acs-pending-start" -> count)
}
}
}
private def addACSPendingEnd(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ end".r.matches(msg)) {
for {
count <- getACSPending(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("acs-pending-end" -> count)
}
}
}
private def addInFlightStart(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ start".r.matches(msg)) {
for {
count <- getInFlight(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("in-flight-start" -> count)
}
}
}
private def addInFlightEnd(msg: String, context: TriggerLogContext): Unit = discard {
if ("Trigger rule .+ end".r.matches(msg)) {
for {
count <- getInFlight(context)
} yield {
metricCountData.getOrElseUpdate(
Set(context.span.id),
mutable.Map.empty,
) += ("in-flight-end" -> count)
}
}
}
private def addSteps(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
steps <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("steps") => entries.contents("steps")
},
)
count <- steps
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("steps" -> count)
}
}
private def addRuleEvaluation(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
duration <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("duration") =>
entries.contents("duration")
},
)
ruleEval <- duration
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("rule-evaluation") =>
entries.contents("rule-evaluation")
},
)
value <- ruleEval
.expect("LoggingValue.OfString", { case OfString(value) => value })
timing <- Try(
Duration(value.replace('u', 'µ')).asInstanceOf[FiniteDuration]
).toEither
} yield {
metricTimingData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("rule-evaluation" -> timing)
}
}
private def addStepIteratorMean(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
duration <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("duration") =>
entries.contents("duration")
},
)
ruleEval <- duration
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("step-iterator-mean") =>
entries.contents("step-iterator-mean")
},
)
value <- ruleEval
.expect("LoggingValue.OfString", { case OfString(value) => value })
timing <- Try(
Duration(value.replace('u', 'µ')).asInstanceOf[FiniteDuration]
).toEither
} yield {
metricTimingData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("step-iterator-mean" -> timing)
}
}
private def addStepIteratorDelayMean(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
duration <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("duration") =>
entries.contents("duration")
},
)
ruleEval <- duration
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("step-iterator-delay-mean") =>
entries.contents("step-iterator-delay-mean")
},
)
value <- ruleEval
.expect("LoggingValue.OfString", { case OfString(value) => value })
timing <- Try(
Duration(value.replace('u', 'µ')).asInstanceOf[FiniteDuration]
).toEither
} yield {
metricTimingData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("step-iterator-delay-mean" -> timing)
}
}
private def addSubmissions(context: TriggerLogContext): Unit = {
addSubmissionTotal(context)
addSubmissionCreate(context)
addSubmissionExercise(context)
addSubmissionCreateAndExercise(context)
addSubmissionExerciseByKey(context)
}
private def addGetTime(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
getTimes <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("get-time") =>
entries.contents("get-time")
},
)
count <- getTimes
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("get-time" -> count)
}
}
private def addSubmissionTotal(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
submissions <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("submissions") =>
entries.contents("submissions")
},
)
total <- submissions
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("total") => entries.contents("total")
},
)
count <- total
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("submission-total" -> count)
}
}
private def addSubmissionCreate(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
submissions <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("submissions") =>
entries.contents("submissions")
},
)
create <- submissions
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("create") =>
entries.contents("create")
},
)
count <- create
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("submission-create" -> count)
}
}
private def addSubmissionExercise(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
submissions <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("submissions") =>
entries.contents("submissions")
},
)
exercise <- submissions
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("exercise") =>
entries.contents("exercise")
},
)
count <- exercise
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("submission-exercise" -> count)
}
}
private def addSubmissionCreateAndExercise(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
submissions <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("submissions") =>
entries.contents("submissions")
},
)
createAndExercise <- submissions
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("createAndExercise") =>
entries.contents("createAndExercise")
},
)
count <- createAndExercise
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("submission-create-and-exercise" -> count)
}
}
private def addSubmissionExerciseByKey(context: TriggerLogContext): Unit = discard {
for {
metrics <- getMetrics(context)
submissions <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("submissions") =>
entries.contents("submissions")
},
)
exerciseByKey <- submissions
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("exerciseByKey") =>
entries.contents("exerciseByKey")
},
)
count <- exerciseByKey
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield {
metricCountData.getOrElseUpdate(
context.span.parent,
mutable.Map.empty,
) += ("submission-exercise-by-key" -> count)
}
}
private def getMetrics(context: TriggerLogContext): Either[String, LoggingValue] = {
context.entries
.find(_._1 == "metrics")
.map(_._2)
.toRight("Trigger logging context did not have a metrics block")
}
private def getACSActive(context: TriggerLogContext): Either[String, Long] = {
for {
metrics <- getMetrics(context)
acs <- metrics
.expect(
"LoggingValue.Nested",
{ case Nested(entries) if entries.contents.contains("acs") => entries.contents("acs") },
)
active <- acs
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("active") =>
entries.contents("active")
},
)
result <- active
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield result
}
private def getACSPending(context: TriggerLogContext): Either[String, Long] = {
for {
metrics <- getMetrics(context)
acs <- metrics
.expect(
"LoggingValue.Nested",
{ case Nested(entries) if entries.contents.contains("acs") => entries.contents("acs") },
)
pending <- acs
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("pending") =>
entries.contents("pending")
},
)
result <- pending
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield result
}
private def getInFlight(context: TriggerLogContext): Either[String, Long] = {
for {
metrics <- getMetrics(context)
inFlight <- metrics
.expect(
"LoggingValue.Nested",
{
case Nested(entries) if entries.contents.contains("in-flight") =>
entries.contents("in-flight")
},
)
result <- inFlight
.expect(
"LoggingValue.OfNumeric",
{
case OfInt(value) => value.toLong
case OfLong(value) => value
},
)
} yield result
}
}
object TriggerRuleMetrics {
final case class EvaluationMetrics(