Use a TrieMap to manage pendingCommandIds in trigger runners

* Fixes #15376

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Carl Pulley 2022-10-28 12:49:01 +01:00 committed by GitHub
parent 70b29e1b63
commit 1b4a812d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -41,6 +41,7 @@ import com.daml.logging.entries.LoggingEntry
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
import com.daml.platform.participant.util.LfEngineToApi.toApiIdentifier
import com.daml.platform.services.time.TimeProviderType
import com.daml.scalautil.Statement.discard
import com.daml.script.converter.Converter.Implicits._
import com.daml.script.converter.Converter.{DamlAnyModuleRecord, DamlTuple2, unrollFree}
import com.daml.script.converter.ConverterException
@ -49,17 +50,16 @@ import com.google.rpc.status.Status
import com.typesafe.scalalogging.StrictLogging
import io.grpc.Status.Code
import io.grpc.StatusRuntimeException
import scalaz.std.option._
import scalaz.syntax.bifunctor._
import scalaz.syntax.functor._
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._
import scalaz.syntax.tag._
import scalaz.{-\/, Functor, Tag, \/, \/-}
import scalaz.{-\/, Tag, \/, \/-}
import java.time.Instant
import java.util.UUID
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, Future}
@ -351,7 +351,7 @@ class Runner(
applicationId: ApplicationId,
parties: TriggerParties,
)(implicit loggingContext: LoggingContextOf[Trigger]) {
import Runner.{SeenMsgs, alterF}
import Runner.SeenMsgs
// Compiles LF expressions into Speedy expressions.
private val compiler = compiledPackages.compiler
@ -360,7 +360,9 @@ class Runner(
// These are the command IDs used on the ledger API to submit commands for
// this trigger for which we are awaiting either a completion or transaction
// message, or both.
private[this] var pendingCommandIds = Map.empty[UUID, SeenMsgs]
// This is a data structure that is shared across (potentially) multiple async contexts
// - hence why we use a scala.concurrent.TrieMap here.
private[this] val pendingCommandIds = TrieMap.empty[UUID, SeenMsgs]
private val transactionFilter =
TransactionFilter(parties.readers.map(p => (p.unwrap, trigger.filters)).toMap)
@ -368,9 +370,19 @@ class Runner(
// return whether uuid *was* present in pendingCommandIds
private[this] def useCommandId(uuid: UUID, seeOne: SeenMsgs.One): Boolean = {
val newMap = alterF(pendingCommandIds, uuid)(_ map (_ see seeOne))
newMap foreach { pendingCommandIds = _ }
newMap.isDefined
pendingCommandIds.get(uuid) match {
case None =>
false
case Some(seenOne) =>
seenOne.see(seeOne) match {
case Some(v) =>
pendingCommandIds.update(uuid, v)
case None =>
discard(pendingCommandIds.remove(uuid))
}
true
}
}
import Runner.{
@ -385,7 +397,7 @@ class Runner(
@throws[RuntimeException]
private def handleCommands(commands: Seq[Command]): (UUID, SubmitRequest) = {
val commandUUID = UUID.randomUUID
pendingCommandIds += ((commandUUID, SeenMsgs.Neither))
pendingCommandIds.update(commandUUID, SeenMsgs.Neither)
val commandsArg = Commands(
ledgerId = client.ledgerId.unwrap,
applicationId = applicationId.unwrap,
@ -822,29 +834,21 @@ object Runner extends StrictLogging {
Flow[A].mapAsync(parallelism)(trial(initialTries, _))
}
private def alterF[K, V, F[a] >: Option[a]: Functor](m: Map[K, V], k: K)(
f: Option[V] => F[Option[V]]
): F[Map[K, V]] = {
val ov = m get k
f(ov) map {
(ov, _) match {
case (_, Some(v)) => m.updated(k, v)
case (None, None) => m
case (Some(_), None) => m - k
}
}
}
private final case class SingleCommandFailure(commandId: String, s: StatusRuntimeException)
private sealed abstract class SeenMsgs {
import Runner.{SeenMsgs => S}
def see(msg: S.One): Option[SeenMsgs] =
(this, msg).match2 {
case S.Completion => { case S.Transaction => None }
case S.Transaction => { case S.Completion => None }
case S.Neither => { case one => Some(one: SeenMsgs) }
}(fallback = Some(this))
def see(msg: S.One): Option[SeenMsgs] = {
(this, msg) match {
case (S.Completion, S.Transaction) | (S.Transaction, S.Completion) =>
None
case (S.Neither, _) =>
Some(msg)
case _ =>
Some(this)
}
}
}
private object SeenMsgs {