From 1b4a812d37cbed6098d20d141be2cc67e1fb5ed7 Mon Sep 17 00:00:00 2001 From: Carl Pulley <106966370+carlpulley-da@users.noreply.github.com> Date: Fri, 28 Oct 2022 12:49:01 +0100 Subject: [PATCH] Use a TrieMap to manage pendingCommandIds in trigger runners * Fixes #15376 CHANGELOG_BEGIN CHANGELOG_END --- .../daml/lf/engine/trigger/Runner.scala | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/triggers/runner/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Runner.scala b/triggers/runner/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Runner.scala index a7875f5e01..cad206445c 100644 --- a/triggers/runner/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Runner.scala +++ b/triggers/runner/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Runner.scala @@ -41,6 +41,7 @@ import com.daml.logging.entries.LoggingEntry import com.daml.logging.{ContextualizedLogger, LoggingContextOf} import com.daml.platform.participant.util.LfEngineToApi.toApiIdentifier import com.daml.platform.services.time.TimeProviderType +import com.daml.scalautil.Statement.discard import com.daml.script.converter.Converter.Implicits._ import com.daml.script.converter.Converter.{DamlAnyModuleRecord, DamlTuple2, unrollFree} import com.daml.script.converter.ConverterException @@ -49,17 +50,16 @@ import com.google.rpc.status.Status import com.typesafe.scalalogging.StrictLogging import io.grpc.Status.Code import io.grpc.StatusRuntimeException -import scalaz.std.option._ import scalaz.syntax.bifunctor._ -import scalaz.syntax.functor._ import scalaz.syntax.std.boolean._ import scalaz.syntax.std.option._ import scalaz.syntax.tag._ -import scalaz.{-\/, Functor, Tag, \/, \/-} +import scalaz.{-\/, Tag, \/, \/-} import java.time.Instant import java.util.UUID import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap import scala.concurrent.duration.{FiniteDuration, _} import scala.concurrent.{ExecutionContext, Future} @@ -351,7 +351,7 @@ class Runner( applicationId: ApplicationId, parties: TriggerParties, )(implicit loggingContext: LoggingContextOf[Trigger]) { - import Runner.{SeenMsgs, alterF} + import Runner.SeenMsgs // Compiles LF expressions into Speedy expressions. private val compiler = compiledPackages.compiler @@ -360,7 +360,9 @@ class Runner( // These are the command IDs used on the ledger API to submit commands for // this trigger for which we are awaiting either a completion or transaction // message, or both. - private[this] var pendingCommandIds = Map.empty[UUID, SeenMsgs] + // This is a data structure that is shared across (potentially) multiple async contexts + // - hence why we use a scala.concurrent.TrieMap here. + private[this] val pendingCommandIds = TrieMap.empty[UUID, SeenMsgs] private val transactionFilter = TransactionFilter(parties.readers.map(p => (p.unwrap, trigger.filters)).toMap) @@ -368,9 +370,19 @@ class Runner( // return whether uuid *was* present in pendingCommandIds private[this] def useCommandId(uuid: UUID, seeOne: SeenMsgs.One): Boolean = { - val newMap = alterF(pendingCommandIds, uuid)(_ map (_ see seeOne)) - newMap foreach { pendingCommandIds = _ } - newMap.isDefined + pendingCommandIds.get(uuid) match { + case None => + false + + case Some(seenOne) => + seenOne.see(seeOne) match { + case Some(v) => + pendingCommandIds.update(uuid, v) + case None => + discard(pendingCommandIds.remove(uuid)) + } + true + } } import Runner.{ @@ -385,7 +397,7 @@ class Runner( @throws[RuntimeException] private def handleCommands(commands: Seq[Command]): (UUID, SubmitRequest) = { val commandUUID = UUID.randomUUID - pendingCommandIds += ((commandUUID, SeenMsgs.Neither)) + pendingCommandIds.update(commandUUID, SeenMsgs.Neither) val commandsArg = Commands( ledgerId = client.ledgerId.unwrap, applicationId = applicationId.unwrap, @@ -822,29 +834,21 @@ object Runner extends StrictLogging { Flow[A].mapAsync(parallelism)(trial(initialTries, _)) } - private def alterF[K, V, F[a] >: Option[a]: Functor](m: Map[K, V], k: K)( - f: Option[V] => F[Option[V]] - ): F[Map[K, V]] = { - val ov = m get k - f(ov) map { - (ov, _) match { - case (_, Some(v)) => m.updated(k, v) - case (None, None) => m - case (Some(_), None) => m - k - } - } - } - private final case class SingleCommandFailure(commandId: String, s: StatusRuntimeException) private sealed abstract class SeenMsgs { import Runner.{SeenMsgs => S} - def see(msg: S.One): Option[SeenMsgs] = - (this, msg).match2 { - case S.Completion => { case S.Transaction => None } - case S.Transaction => { case S.Completion => None } - case S.Neither => { case one => Some(one: SeenMsgs) } - }(fallback = Some(this)) + + def see(msg: S.One): Option[SeenMsgs] = { + (this, msg) match { + case (S.Completion, S.Transaction) | (S.Transaction, S.Completion) => + None + case (S.Neither, _) => + Some(msg) + case _ => + Some(this) + } + } } private object SeenMsgs {