Bug fixes and logging improvements to trigger simulations (#16694)

This commit is contained in:
Carl Pulley 2023-05-05 13:20:07 +01:00 committed by GitHub
parent 803b7256df
commit 2be396a481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 202 additions and 134 deletions

View File

@ -23,9 +23,11 @@ class CatAndFoodTriggerSimulation
import CatAndFoodTriggerSimulation._
// For demonstration purposes, we only run for 30 seconds
// For demonstration purposes, we only run the simulation for 30 seconds
override protected implicit lazy val simulationConfig: TriggerSimulationConfig =
TriggerSimulationConfig(simulationDuration = 30.seconds)
// For demonstration purposes, we enable saving Canton logging
override protected val cantonFixtureDebugMode: Boolean = true
override protected def triggerMultiProcessSimulation: Behavior[Unit] = {
Behaviors.setup { context =>
@ -38,7 +40,8 @@ class CatAndFoodTriggerSimulation
val triggerFactory: TriggerProcessFactory =
triggerProcessFactory(client, ledger, "Cats:feedingTrigger", actAs)
// With a negative start state, Cats:feedingTrigger will have a behaviour that is dependent on Cat and Food contract generators
val trigger = context.spawn(triggerFactory.create(SValue.SInt64(-1)), "trigger")
val trigger1 = context.spawn(triggerFactory.create(SValue.SInt64(-1)), "trigger1")
val trigger2 = context.spawn(triggerFactory.create(SValue.SInt64(-1)), "trigger2")
val workload =
context.spawn(
workloadProcess(ledger, actAs)(
@ -52,10 +55,11 @@ class CatAndFoodTriggerSimulation
"workload",
)
context.watch(ledger)
context.watch(trigger)
context.watch(trigger1)
context.watch(trigger2)
context.watch(workload)
super.triggerMultiProcessSimulation
Behaviors.empty
}
}

View File

@ -16,7 +16,7 @@ import org.scalatest.wordspec.AsyncWordSpec
import java.nio.file.{Files, Path}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, TimeoutException}
abstract class TriggerMultiProcessSimulation
extends AsyncWordSpec
@ -24,11 +24,10 @@ abstract class TriggerMultiProcessSimulation
import TriggerMultiProcessSimulation._
// For demonstration purposes, we only run for 30 seconds
protected implicit lazy val simulationConfig: TriggerSimulationConfig =
TriggerSimulationConfig()
protected implicit lazy val simulation: ActorSystem[Unit] =
protected implicit lazy val simulation: ActorSystem[Message] =
ActorSystem(triggerMultiProcessSimulationWithTimeout, "cat-and-food-simulation")
override implicit lazy val materializer: Materializer = Materializer(simulation)
@ -51,26 +50,46 @@ abstract class TriggerMultiProcessSimulation
} yield succeed
}
/** User simulation need to (at least) override this method in order to define a trigger multi-process simulation. If
* the user implementation continues as the `super.triggerMultiProcessSimulation` behavior, then simulations will be
* bounded using time durations from the simulation configuration. If they do not continue with this behaviour, then
* their runtime will be bounded by the duration of the test (i.e. configured using bazel).
/** User simulations need to (at least) override this method in order to define a trigger multi-process simulation.
*
* @return behavior of the root actor that defines the trigger multi-process simulation
*/
protected def triggerMultiProcessSimulation: Behavior[Unit] = {
Behaviors.receive { (context, _) =>
context.log.info(s"Simulation timed out after: ${simulationConfig.simulationDuration}")
Behaviors.stopped
}
}
protected def triggerMultiProcessSimulation: Behavior[Unit]
private[this] def triggerMultiProcessSimulationWithTimeout: Behavior[Unit] = {
Behaviors.withTimers[Unit] { timer =>
timer.startSingleTimer((), simulationConfig.simulationDuration)
private[this] def triggerMultiProcessSimulationWithTimeout: Behavior[Message] = {
Behaviors.withTimers[Message] { timer =>
timer.startSingleTimer(StopSimulation, simulationConfig.simulationDuration)
Behaviors
.supervise(triggerMultiProcessSimulation)
.supervise[Message] {
Behaviors.setup { context =>
context.log.info(s"Simulation will run for ${simulationConfig.simulationDuration}")
context.self ! StartSimulation
Behaviors.logMessages {
Behaviors.receiveMessage {
case StartSimulation =>
triggerMultiProcessSimulation.transformMessages {
case StartSimulation =>
()
case StopSimulation =>
throw TriggerSimulationFailure(
new TimeoutException(
s"Simulation stopped after ${simulationConfig.simulationDuration}"
)
)
}
case StopSimulation =>
throw TriggerSimulationFailure(
new TimeoutException(
s"Simulation stopped after ${simulationConfig.simulationDuration}"
)
)
}
}
}
}
.onFailure[Throwable](SupervisorStrategy.stop)
}
}
@ -101,13 +120,15 @@ object TriggerMultiProcessSimulation {
// (otherwise bazel will remove the directory holding the saved CSV data)
private val tmpDir = Files.createTempDirectory("TriggerSimulation")
println(s"Trigger simulation reporting data is located in $tmpDir")
final case class TriggerSimulationConfig(
simulationSetupTimeout: FiniteDuration = 30.seconds,
simulationDuration: FiniteDuration = 5.minutes,
ledgerSubmissionTimeout: FiniteDuration = 30.seconds,
ledgerRegistrationTimeout: FiniteDuration = 30.seconds,
ledgerWorkloadTimeout: FiniteDuration = 1.second,
triggerDataFile: Path = tmpDir.resolve("trigger-simulation-data.csv"),
ledgerWorkloadTimeout: FiniteDuration = 5.seconds,
triggerDataFile: Path = tmpDir.resolve("trigger-simulation-metrics-data.csv"),
acsDataFile: Path = tmpDir.resolve("trigger-simulation-acs-data.csv"),
)
@ -117,4 +138,8 @@ object TriggerMultiProcessSimulation {
def apply(reason: String): TriggerSimulationFailure =
TriggerSimulationFailure(new RuntimeException(reason))
}
// Message protocol of the root simulation actor (the actor system is typed as ActorSystem[Message]).
// Declared abstract rather than sealed, so pattern matches over Message are not exhaustiveness checked.
abstract class Message extends Product with Serializable
// Sent by the root actor to itself during setup; on receipt the user-defined
// triggerMultiProcessSimulation behavior is started.
private case object StartSimulation extends Message
// Delivered by a single-shot timer after simulationConfig.simulationDuration; handling it throws a
// TriggerSimulationFailure wrapping a TimeoutException, and the supervisor strategy then stops the actor.
private case object StopSimulation extends Message
}

View File

@ -723,7 +723,7 @@ final class TriggerRuleSimulationLib private[simulation] (
private[simulation] val trigger = Trigger
.fromIdentifier(compiledPackages, triggerId)(
TriggerLogContext.newRootSpan(
"trigger",
"simulation",
"id" -> triggerId.toString,
"applicationId" -> applicationId.unwrap,
"definition" -> triggerId.toString,
@ -749,7 +749,7 @@ final class TriggerRuleSimulationLib private[simulation] (
private[simulation] implicit val triggerContext: TriggerLogContext =
TriggerLogContext.newRootSpanWithCallback(
"trigger",
"simulation",
logObserver,
"id" -> triggerId.toString,
"applicationId" -> applicationId.unwrap,
@ -774,26 +774,23 @@ final class TriggerRuleSimulationLib private[simulation] (
acs: Seq[CreatedEvent]
): Future[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue)] = {
val context = new SimulationContext
val initStateGraph = context.triggerContext.childSpan("simulation") {
val initStateGraph = context.triggerContext.childSpan("initialStateLambda") {
implicit triggerContext: TriggerLogContext =>
triggerContext.childSpan("initialStateLambda") {
implicit triggerContext: TriggerLogContext =>
GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState =>
import GraphDSL.Implicits._
GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState =>
import GraphDSL.Implicits._
val clientTime: Timestamp =
Timestamp.assertFromInstant(
Runner.getTimeProvider(RunnerConfig.DefaultTimeProviderType).getCurrentTime
)
val killSwitch = KillSwitches.shared("init-state-simulation")
val initialState = gb add context.runner.runInitialState(clientTime, killSwitch)(acs)
val submissions = gb add Flow[TriggerContext[SubmitRequest]]
val clientTime: Timestamp =
Timestamp.assertFromInstant(
Runner.getTimeProvider(RunnerConfig.DefaultTimeProviderType).getCurrentTime
)
val killSwitch = KillSwitches.shared("init-state-simulation")
val initialState = gb add context.runner.runInitialState(clientTime, killSwitch)(acs)
val submissions = gb add Flow[TriggerContext[SubmitRequest]]
initialState.finalState ~> saveLastState
initialState.elemsOut ~> submissions
initialState.finalState ~> saveLastState
initialState.elemsOut ~> submissions
new SourceShape(submissions.out)
}
new SourceShape(submissions.out)
}
}
val initStateSimulation = Source.fromGraph(initStateGraph)
@ -810,33 +807,30 @@ final class TriggerRuleSimulationLib private[simulation] (
message: TriggerMsg,
): Future[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue)] = {
val context = new SimulationContext
val updateStateGraph = context.triggerContext.childSpan("simulation") {
val updateStateGraph = context.triggerContext.childSpan("updateStateLambda") {
implicit triggerContext: TriggerLogContext =>
triggerContext.childSpan("updateStateLambda") {
implicit triggerContext: TriggerLogContext =>
GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState =>
import GraphDSL.Implicits._
GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState =>
import GraphDSL.Implicits._
val lambdaKillSwitch = KillSwitches.shared("update-state-simulation")
val msgIn = gb add TriggerContextualFlow[TriggerMsg]
val encodeMsg =
gb add context.runner.encodeMsgs.map(ctx =>
ctx.copy(value = SList(FrontStack(ctx.value)))
)
val stateOut = gb add Source.single(state)
val rule = gb add context.runner.runRuleOnMsgs(lambdaKillSwitch)
val killSwitch = gb add lambdaKillSwitch.flow[TriggerContext[SValue]]
val submissions = gb add Flow[TriggerContext[SubmitRequest]]
val lambdaKillSwitch = KillSwitches.shared("update-state-simulation")
val msgIn = gb add TriggerContextualFlow[TriggerMsg]
val encodeMsg =
gb add context.runner.encodeMsgs.map(ctx =>
ctx.copy(value = SList(FrontStack(ctx.value)))
)
val stateOut = gb add Source.single(state)
val rule = gb add context.runner.runRuleOnMsgs(lambdaKillSwitch)
val killSwitch = gb add lambdaKillSwitch.flow[TriggerContext[SValue]]
val submissions = gb add Flow[TriggerContext[SubmitRequest]]
// format: off
stateOut ~> rule.initState
msgIn ~> encodeMsg ~> killSwitch ~> rule.elemsIn
submissions <~ rule.elemsOut
rule.finalStates ~> saveLastState
// format: on
// format: off
stateOut ~> rule.initState
msgIn ~> encodeMsg ~> killSwitch ~> rule.elemsIn
submissions <~ rule.elemsOut
rule.finalStates ~> saveLastState
// format: on
new FlowShape(msgIn.in, submissions.out)
}
new FlowShape(msgIn.in, submissions.out)
}
}
val updateStateSimulation = Source

View File

@ -30,34 +30,36 @@ object LedgerProcess {
config: TriggerSimulationConfig,
applicationId: ApplicationId,
): Behavior[Message] = {
Behaviors.setup { context =>
val report = context.spawn(ReportingProcess.create(context.self), "reporting")
val ledgerApi = context.spawn(LedgerApiClient.create(client), "ledger-api")
val ledgerExternal =
context.spawn(new LedgerExternalAction(client).create(), "ledger-external-action")
val triggerRegistration =
context.spawn(
new LedgerRegistration(client).create(ledgerApi, report),
"trigger-registration",
)
Behaviors.logMessages {
Behaviors.setup { context =>
val report = context.spawn(ReportingProcess.create(context.self), "reporting")
val ledgerApi = context.spawn(LedgerApiClient.create(client), "ledger-api")
val ledgerExternal =
context.spawn(new LedgerExternalAction(client).create(), "ledger-external-action")
val triggerRegistration =
context.spawn(
new LedgerRegistration(client).create(ledgerApi, report),
"trigger-registration",
)
context.watch(report)
context.watch(ledgerApi)
context.watch(ledgerExternal)
context.watch(triggerRegistration)
context.watch(report)
context.watch(ledgerApi)
context.watch(ledgerExternal)
context.watch(triggerRegistration)
Behaviors.receiveMessage {
case TriggerRegistration(registration) =>
triggerRegistration ! registration
Behaviors.same
Behaviors.receiveMessage {
case TriggerRegistration(registration) =>
triggerRegistration ! registration
Behaviors.same
case GetTriggerACSDiff(request) =>
triggerRegistration ! request
Behaviors.same
case GetTriggerACSDiff(request) =>
triggerRegistration ! request
Behaviors.same
case ExternalAction(request) =>
ledgerExternal ! request
Behaviors.same
case ExternalAction(request) =>
ledgerExternal ! request
Behaviors.same
}
}
}
}

View File

@ -72,7 +72,6 @@ final class TriggerProcessFactory private[simulation] (
import TriggerProcess._
private[this] val triggerId = UUID.randomUUID()
private[this] val triggerDefRef =
Ref.DefinitionRef(packageId, QualifiedName.assertFromString(name))
private[this] val triggerParties = TriggerParties(
@ -108,6 +107,7 @@ final class TriggerProcessFactory private[simulation] (
def create(userState: SValue, acs: Seq[CreatedEvent] = Seq.empty)(implicit
config: TriggerSimulationConfig
): Behavior[Message] = {
val triggerId = UUID.randomUUID()
val converter = new Converter(compiledPackages, trigger)
val startState = converter
.fromTriggerUpdateState(
@ -123,41 +123,45 @@ final class TriggerProcessFactory private[simulation] (
implicit val ledgerResponseTimeout: Timeout = Timeout(config.ledgerRegistrationTimeout)
Behaviors.setup { context =>
context.ask(
ledger,
(ref: ActorRef[LedgerRegistration.LedgerApi]) =>
LedgerProcess.TriggerRegistration(
LedgerRegistration.Registration(triggerId, context.self, actAs, transactionFilter, ref)
),
) {
case Success(LedgerRegistration.LedgerApi(api, report)) =>
LedgerResponse(api, report)
Behaviors.logMessages {
Behaviors.setup { context =>
context.ask(
ledger,
(ref: ActorRef[LedgerRegistration.LedgerApi]) =>
LedgerProcess.TriggerRegistration(
LedgerRegistration
.Registration(triggerId, context.self, actAs, transactionFilter, ref)
),
) {
case Success(LedgerRegistration.LedgerApi(api, report)) =>
LedgerResponse(api, report)
case Failure(exn) =>
throw TriggerSimulationFailure(exn)
}
case Failure(exn) =>
throw TriggerSimulationFailure(exn)
}
Behaviors.receiveMessage {
case LedgerResponse(api, report) =>
run(api, report, startState)
Behaviors.receiveMessage {
case LedgerResponse(api, report) =>
run(triggerId, api, report, startState)
case msg =>
context.log.error(
s"Whilst waiting for a ledger response during trigger registration, we received an unexpected message: $msg"
)
Behaviors.stopped
case msg =>
context.log.error(
s"Whilst waiting for a ledger response during trigger registration, we received an unexpected message: $msg"
)
Behaviors.stopped
}
}
}
}
private[this] def run(
triggerId: UUID,
ledgerApi: ActorRef[LedgerApiClient.Message],
report: ActorRef[ReportingProcess.Message],
state: SValue,
)(implicit config: TriggerSimulationConfig): Behavior[Message] = {
Behaviors.receive {
case (_, MessageWrapper(msg)) =>
case (context, MessageWrapper(msg)) =>
val (submissions, metrics, nextState) = Await.result(
simulator.updateStateLambda(state, msg),
triggerConfig.hardLimit.ruleEvaluationTimeout,
@ -187,7 +191,7 @@ final class TriggerProcessFactory private[simulation] (
val reportId = UUID.randomUUID()
submissions.foreach { request =>
ledgerApi ! LedgerApiClient.CommandSubmission(request)
ledgerApi ! LedgerApiClient.CommandSubmission(request, context.self)
}
report ! ReportingProcess.MetricsUpdate(
MetricsReporting.TriggerMetricsUpdate(
@ -206,7 +210,7 @@ final class TriggerProcessFactory private[simulation] (
ACSReporting.TriggerACSUpdate(reportId, triggerId, triggerACSView)
)
run(ledgerApi, report, state = nextState)
run(triggerId, ledgerApi, report, state = nextState)
case (context, msg) =>
context.log.error(

View File

@ -4,33 +4,61 @@
package com.daml.lf.engine.trigger.simulation.process
package ledger
import akka.actor.typed.Behavior
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.client.LedgerClient
import com.daml.lf.engine.trigger.TriggerMsg
import com.daml.lf.engine.trigger.simulation.TriggerMultiProcessSimulation.TriggerSimulationConfig
import com.daml.scalautil.Statement.discard
import com.google.rpc.status.Status
import io.grpc.Status.Code
import io.grpc.StatusRuntimeException
import scala.concurrent.Await
import scala.util.control.NonFatal
object LedgerApiClient {
sealed abstract class Message extends Product with Serializable
// Used by TriggerProcess
private[process] final case class CommandSubmission(
request: SubmitRequest
request: SubmitRequest,
replyTo: ActorRef[TriggerProcess.Message],
) extends Message
def create(
client: LedgerClient
)(implicit config: TriggerSimulationConfig): Behavior[Message] = {
Behaviors.receiveMessage { case CommandSubmission(request) =>
discard(
Await.result(
client.commandClient.submitSingleCommand(request),
config.ledgerSubmissionTimeout,
Behaviors.receive { case (context, CommandSubmission(request, replyTo)) =>
try {
discard(
Await.result(
client.commandClient.submitSingleCommand(request),
config.ledgerSubmissionTimeout,
)
)
)
} catch {
case cause: StatusRuntimeException if cause.getStatus.getCode != Code.UNAUTHENTICATED =>
context.log.info(
s"Ledger API encountered a command submission failure for $request - sending completion failure to $replyTo",
cause,
)
replyTo ! TriggerProcess.MessageWrapper(
TriggerMsg.Completion(
Completion(
request.getCommands.commandId,
Some(Status(cause.getStatus.getCode.value(), cause.getStatus.getDescription)),
)
)
)
case NonFatal(reason) =>
context.log.warn(
s"Ledger API encountered a command submission failure for $request - ignoring this failure",
reason,
)
}
Behaviors.same
}
}

View File

@ -46,7 +46,8 @@ final class LedgerExternalAction(client: LedgerClient)(implicit
case NonFatal(reason) =>
context.log.warn(
s"Ignoring create event submission failure: $event - reason: $reason"
s"Ignoring create event submission failure for $event",
reason,
)
}
Behaviors.same
@ -66,7 +67,8 @@ final class LedgerExternalAction(client: LedgerClient)(implicit
case NonFatal(reason) =>
context.log.warn(
s"Ignoring archive event submission failure: $event - reason: $reason"
s"Ignoring archive event submission failure for $event",
reason,
)
}
Behaviors.same

View File

@ -53,18 +53,21 @@ final class LedgerRegistration(client: LedgerClient)(implicit
if !ledgerACSView.contains(triggerId) =>
val offset =
Await.result(getLedgerOffset(client, filter), config.simulationSetupTimeout)
val logger = context.log
ledgerACSView += (triggerId -> TrieMap.empty)
client.transactionClient
.getTransactions(offset, None, filter)
.runForeach { transaction =>
logger.debug(s"Transaction source received: $transaction")
transaction.events.foreach {
case Event(Event.Event.Created(create)) =>
ledgerACSView(triggerId) += (create.contractId -> assertIdentifier(
create.getTemplateId
))
case Event(Event.Event.Archived(archive)) =>
case Event(Event.Event.Archived(archive))
if ledgerACSView(triggerId).contains(archive.contractId) =>
ledgerACSView(triggerId) -= archive.contractId
case Event(_) =>
@ -77,7 +80,9 @@ final class LedgerRegistration(client: LedgerClient)(implicit
throw exn
case Success(_) =>
// Do nothing
throw TriggerSimulationFailure(
new RuntimeException("Transaction source unexpectedly closed")
)
}
client.commandClient
.completionSource(Seq(actAs.unwrap), offset)
@ -90,14 +95,16 @@ final class LedgerRegistration(client: LedgerClient)(implicit
throw exn
case Success(_) =>
// Do nothing
throw TriggerSimulationFailure(
new RuntimeException("Completion source unexpectedly closed")
)
}
replyTo ! LedgerApi(consumer, report)
Behaviors.same
case Registration(triggerId, _, _, _, _) =>
case msg: Registration =>
context.log.error(
s"Following trigger registration, received another LedgerRegistration message for trigger: $triggerId"
s"Following trigger registration, received another LedgerRegistration message: $msg"
)
Behaviors.stopped
@ -154,7 +161,9 @@ object LedgerRegistration {
filter: TransactionFilter,
replyTo: ActorRef[LedgerApi],
) extends Message
// Used by TriggerProcess (via LedgerProcess)
// Pairs a LedgerApiClient message with a trigger UUID (presumably the trigger it originates from).
// NOTE(review): no handler for this message is visible in this diff - confirm where it is consumed.
final case class APIMessage(triggerId: UUID, message: LedgerApiClient.Message) extends Message
// Used by ReportingProcess
final case class GetTriggerACSDiff(
reportID: UUID,
triggerId: UUID,

View File

@ -7,7 +7,7 @@ package report
import akka.actor.typed.{Behavior, PostStop}
import akka.actor.typed.scaladsl.Behaviors
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.lf.data.Ref.Identifier
import com.daml.lf.data.Ref
import com.daml.lf.engine.trigger.simulation.TriggerMultiProcessSimulation.TriggerSimulationConfig
import com.daml.lf.engine.trigger.simulation.TriggerRuleMetrics
import com.google.rpc.status.{Status => GrpcStatus}
@ -21,7 +21,7 @@ private[simulation] object MetricsReporting {
private[process] final case class TriggerMetricsUpdate(
reportingId: UUID,
triggerId: UUID,
triggerType: Identifier,
triggerDefRef: Ref.DefinitionRef,
submissions: Seq[SubmitRequest],
metrics: TriggerRuleMetrics.RuleMetrics,
percentageHeapUsed: Double,
@ -35,8 +35,8 @@ private[simulation] object MetricsReporting {
val triggerDataFile = Files.newOutputStream(config.triggerDataFile)
val triggerDataFileCsvHeader = Seq(
"reporting-id",
"trigger-name",
"trigger-id",
"trigger-def-ref",
"submissions",
"evaluation-steps",
"evaluation-get-times",
@ -56,7 +56,7 @@ private[simulation] object MetricsReporting {
case TriggerMetricsUpdate(
reportingId,
triggerId,
triggerType,
triggerDefRef,
submissions,
metrics,
percentageHeapUsed,
@ -67,7 +67,7 @@ private[simulation] object MetricsReporting {
val csvData: String = Seq[Any](
reportingId,
triggerId,
triggerType,
triggerDefRef,
submissions.size,
metrics.evaluation.steps,
metrics.evaluation.getTimes,