remove known command ID from trigger state once Transaction and Completion message are seen (#7598)

- no assumption about order

- multiple transactions/completions are treated as one

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Stephen Compall 2020-10-07 14:32:49 -04:00 committed by GitHub
parent e2da5ba010
commit dc204a37a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,10 +12,15 @@ import io.grpc.StatusRuntimeException
import java.time.Instant
import java.util.UUID
import scalaz.Functor
import scalaz.\/.fromTryCatchThrowable
import scalaz.std.option._
import scalaz.syntax.functor._
import scalaz.syntax.tag._
import scalaz.syntax.std.boolean._
import scala.language.higherKinds
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
@ -207,22 +212,32 @@ class Runner(
applicationId: ApplicationId,
party: String,
)(implicit loggingContext: LoggingContextOf[Trigger]) {
import Runner.{SeenMsgs, alterF}
// Compiles LF expressions into Speedy expressions.
private val compiler: Compiler = compiledPackages.compiler
// Converts between various objects and SValues.
private val converter: Converter = Converter(compiledPackages, trigger.triggerIds)
// These are the command IDs used on the ledger API to submit commands for
// this trigger. This set can grow without bound. TODO : limit the size.
private[this] var commandIdsUsed: Set[UUID] = Set.empty
// this trigger for which we are awaiting either a completion or transaction
// message, or both.
private[this] var pendingCommandIds: Map[UUID, SeenMsgs] = Map.empty
private val transactionFilter: TransactionFilter =
TransactionFilter(Seq((party, trigger.filters)).toMap)
private[this] def logger = ContextualizedLogger get getClass
// return whether uuid *was* present in pendingCommandIds
private[this] def useCommandId(uuid: UUID, seeOne: SeenMsgs.One): Boolean = {
val newMap = alterF(pendingCommandIds, uuid)(_ map (_ see seeOne))
newMap foreach { pendingCommandIds = _ }
newMap.isDefined
}
@throws[RuntimeException]
private def handleCommands(commands: Seq[Command]): (UUID, SubmitRequest) = {
val commandUUID = UUID.randomUUID
commandIdsUsed += commandUUID
pendingCommandIds += ((commandUUID, SeenMsgs.Neither))
val commandsArg = Commands(
ledgerId = client.ledgerId.unwrap,
applicationId = applicationId.unwrap,
@ -403,15 +418,19 @@ class Runner(
case msg @ CompletionMsg(c) =>
// This happens for invalid UUIDs which we might get for
// completions not emitted by the trigger.
val uuid = fromTryCatchThrowable[UUID, IllegalArgumentException](
val ouuid = fromTryCatchThrowable[UUID, IllegalArgumentException](
UUID.fromString(c.commandId)).toOption
(uuid exists commandIdsUsed).option(msg).toList
ouuid.flatMap { uuid =>
useCommandId(uuid, SeenMsgs.Completion) option msg
}.toList
case msg @ TransactionMsg(t) =>
// This happens for invalid UUIDs which we might get for
// transactions not emitted by the trigger.
val uuid = fromTryCatchThrowable[UUID, IllegalArgumentException](
val ouuid = fromTryCatchThrowable[UUID, IllegalArgumentException](
UUID.fromString(t.commandId)).toOption
List(if (uuid exists commandIdsUsed) msg else TransactionMsg(t.copy(commandId = "")))
List(ouuid flatMap { uuid =>
useCommandId(uuid, SeenMsgs.Transaction) option msg
} getOrElse TransactionMsg(t.copy(commandId = "")))
case x @ HeartbeatMsg() => List(x) // Hearbeats don't carry any information.
})
.toMat(Sink.fold[SValue, TriggerMsg](evaluatedInitialState)((state, message) => {
@ -521,6 +540,35 @@ object Runner extends StrictLogging {
def unapply(v: SPAP): Some[SPAP] = Some(v)
}
private def alterF[K, V, F[_]: Functor](m: Map[K, V], k: K)(
f: Option[V] => F[Option[V]]): F[Map[K, V]] = {
val ov = m get k
f(ov) map {
(ov, _) match {
case (_, Some(v)) => m updated (k, v)
case (None, None) => m
case (Some(_), None) => m - k
}
}
}
private sealed abstract class SeenMsgs {
import Runner.{SeenMsgs => S}
def see(msg: S.One): Option[SeenMsgs] =
(this, msg).match2 {
case S.Completion => { case S.Transaction => None }
case S.Transaction => { case S.Completion => None }
case S.Neither => { case one => Some(one: SeenMsgs) }
}(fallback = Some(this))
}
private object SeenMsgs {
sealed abstract class One extends SeenMsgs
case object Completion extends One
case object Transaction extends One
case object Neither extends SeenMsgs
}
// Convience wrapper that creates the runner and runs the trigger.
def run(
dar: Dar[(PackageId, Package)],