Make CommandTracker distinguish submissions of the same command using submissionId [KVL-1104] (#10868)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Hubert Slojewski 2021-09-15 16:15:10 +02:00 committed by GitHub
parent b4328b3dc3
commit b5648c0e3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 296 additions and 127 deletions

View File

@ -4,12 +4,11 @@
package com.daml.ledger.client.services.commands
import java.time.Duration
import akka.NotUsed
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Merge, Source}
import akka.stream.{DelayOverflowStrategy, FlowShape, OverflowStrategy}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.services.commands.tracker.CommandTracker
import com.daml.ledger.client.services.commands.tracker.{TrackedCommandKey, CommandTracker}
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
@ -32,13 +31,13 @@ object CommandTrackerFlow {
final case class Materialized[SubmissionMat, Context](
submissionMat: SubmissionMat,
trackingMat: Future[immutable.Map[String, Context]],
trackingMat: Future[immutable.Map[TrackedCommandKey, Context]],
)
def apply[Context, SubmissionMat](
commandSubmissionFlow: Flow[
Ctx[(Context, String), CommandSubmission],
Ctx[(Context, String), Try[
Ctx[(Context, TrackedCommandKey), CommandSubmission],
Ctx[(Context, TrackedCommandKey), Try[
Empty
]],
SubmissionMat,
@ -62,12 +61,13 @@ object CommandTrackerFlow {
implicit builder => (submissionFlow, tracker) =>
import GraphDSL.Implicits._
val wrapResult = builder.add(Flow[Ctx[(Context, String), Try[Empty]]].map(Left.apply))
val wrapResult =
builder.add(Flow[Ctx[(Context, TrackedCommandKey), Try[Empty]]].map(Left.apply))
val wrapCompletion = builder.add(Flow[CompletionStreamElement].map(Right.apply))
val merge = builder.add(
Merge[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]](
Merge[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
inputPorts = 2,
eagerComplete = false,
)

View File

@ -3,8 +3,6 @@
package com.daml.ledger.client.services.commands.tracker
import java.time.{Duration, Instant}
import akka.stream.stage._
import akka.stream.{Attributes, Inlet, Outlet}
import com.daml.grpc.{GrpcException, GrpcStatus}
@ -15,6 +13,7 @@ import com.daml.ledger.client.services.commands.tracker.CommandTracker._
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.services.commands.{
CommandSubmission,
@ -27,6 +26,7 @@ import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.slf4j.LoggerFactory
import java.time.{Duration, Instant}
import scala.annotation.nowarn
import scala.collection.compat._
import scala.collection.{immutable, mutable}
@ -57,17 +57,20 @@ private[commands] class CommandTracker[Context](
timeoutDetectionPeriod: FiniteDuration,
) extends GraphStageWithMaterializedValue[
CommandTrackerShape[Context],
Future[Map[String, Context]],
Future[Map[TrackedCommandKey, Context]],
] {
private val logger = LoggerFactory.getLogger(this.getClass.getName)
val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] =
Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn")
val submitRequestOut: Outlet[Ctx[(Context, String), CommandSubmission]] =
Outlet[Ctx[(Context, String), CommandSubmission]]("submitRequestOut")
val commandResultIn: Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]] =
Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]]("commandResultIn")
val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] =
Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]]("submitRequestOut")
val commandResultIn
: Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]] =
Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
"commandResultIn"
)
val resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] =
Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]("resultOut")
val offsetOut: Outlet[LedgerOffset] =
@ -75,9 +78,9 @@ private[commands] class CommandTracker[Context](
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes
): (GraphStageLogic, Future[Map[String, Context]]) = {
): (GraphStageLogic, Future[Map[TrackedCommandKey, Context]]) = {
val promise = Promise[immutable.Map[String, Context]]()
val promise = Promise[immutable.Map[TrackedCommandKey, Context]]()
val logic: TimerGraphStageLogic = new TimerGraphStageLogic(shape) {
@ -91,12 +94,12 @@ private[commands] class CommandTracker[Context](
timerKey match {
case `timeout_detection` =>
val timeouts = getOutputForTimeout(Instant.now)
if (timeouts.nonEmpty) emitMultiple(resultOut, timeouts)
if (timeouts.nonEmpty) emitMultiple(resultOut, timeouts.to(immutable.Iterable))
case _ => // unknown timer, nothing to do
}
}
private val pendingCommands = new mutable.HashMap[String, TrackingData[Context]]()
private val pendingCommands = new mutable.HashMap[TrackedCommandKey, TrackingData[Context]]()
setHandler(
submitRequestOut,
@ -116,11 +119,16 @@ private[commands] class CommandTracker[Context](
override def onPush(): Unit = {
val submitRequest = grab(submitRequestIn)
registerSubmission(submitRequest)
logger.trace(
"Submitted command {}",
submitRequest.value.commands.commandId,
val commands = submitRequest.value.commands
val submissionId = commands.submissionId
val commandId = commands.commandId
logger.trace(s"Submitted command $commandId in submission $submissionId.")
push(
submitRequestOut,
submitRequest.enrich((context, _) =>
context -> TrackedCommandKey(submissionId, commandId)
),
)
push(submitRequestOut, submitRequest.enrich(_ -> _.commands.commandId))
}
override def onUpstreamFinish(): Unit = {
@ -177,7 +185,7 @@ private[commands] class CommandTracker[Context](
)
private def pushResultOrPullCommandResultIn(
compl: Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]
compl: Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]
): Unit = {
// The command tracker detects timeouts outside the regular pull/push
// mechanism of the input/output ports. Basically the timeout
@ -186,7 +194,10 @@ private[commands] class CommandTracker[Context](
// even though it hasn't been pulled again in the meantime. Using `emit`
// instead of `push` when a completion arrives makes akka take care of
// handling the signaling properly.
compl.fold(if (!hasBeenPulled(commandResultIn)) pull(commandResultIn))(emit(resultOut, _))
if (compl.isEmpty && !hasBeenPulled(commandResultIn)) {
pull(commandResultIn)
}
emitMultiple(resultOut, compl.to(immutable.Iterable))
}
private def completeStageIfTerminal(): Unit = {
@ -197,33 +208,38 @@ private[commands] class CommandTracker[Context](
import CommandTracker.nonTerminalCodes
private def handleSubmitResponse(submitResponse: Ctx[(Context, String), Try[Empty]]) = {
val Ctx((_, commandId), value, _) = submitResponse
private def handleSubmitResponse(
submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]]
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
val Ctx((_, commandKey), value, _) = submitResponse
value match {
case Failure(GrpcException(status @ GrpcStatus(code, _), metadata))
if !nonTerminalCodes(code) =>
getOutputForTerminalStatusCode(commandId, GrpcStatus.toProto(status, metadata))
getOutputForTerminalStatusCode(commandKey, GrpcStatus.toProto(status, metadata))
case Failure(throwable) =>
logger.warn(
s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...",
throwable,
)
None
Seq.empty
case Success(_) =>
logger.trace("Received confirmation that command {} was accepted.", commandId)
None
logger.trace(
s"Received confirmation that command ${commandKey.commandId} from submission ${commandKey.submissionId} was accepted."
)
Seq.empty
}
}
@nowarn("msg=deprecated")
private def registerSubmission(submission: Ctx[Context, CommandSubmission]): Unit = {
val commands = submission.value.commands
val submissionId = commands.submissionId
val commandId = commands.commandId
logger.trace("Begin tracking of command {}", commandId)
if (pendingCommands.contains(commandId)) {
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
if (pendingCommands.contains(TrackedCommandKey(submissionId, commandId))) {
// TODO return an error identical to the server side duplicate command error once that's defined.
throw new IllegalStateException(
s"A command with id $commandId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
s"A command $commandId from a submission $submissionId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
) with NoStackTrace
}
val commandTimeout = submission.value.timeout match {
@ -249,65 +265,117 @@ private[commands] class CommandTracker[Context](
commandTimeout = Instant.now().plus(commandTimeout),
context = submission.context,
)
pendingCommands += commandId -> trackingData
pendingCommands += TrackedCommandKey(submissionId, commandId) -> trackingData
()
}
private def getOutputForTimeout(instant: Instant) = {
logger.trace("Checking timeouts at {}", instant)
pendingCommands.view
.flatMap { case (commandId, trackingData) =>
if (trackingData.commandTimeout.isBefore(instant)) {
pendingCommands -= commandId
logger.info(
s"Command {} (command timeout {}) timed out at checkpoint {}.",
commandId,
trackingData.commandTimeout,
instant,
pendingCommands.view.flatMap { case (commandKey, trackingData) =>
if (trackingData.commandTimeout.isBefore(instant)) {
pendingCommands -= commandKey
logger.info(
s"Command {} from submission {} (command timeout {}) timed out at checkpoint {}.",
commandKey.commandId,
commandKey.submissionId,
trackingData.commandTimeout,
instant,
)
List(
Ctx(
trackingData.context,
Left(CompletionResponse.TimeoutResponse(commandId = trackingData.commandId)),
)
List(
Ctx(
trackingData.context,
Left(CompletionResponse.TimeoutResponse(commandId = trackingData.commandId)),
)
} else {
Nil
}
}.toSeq
}
private def getOutputForCompletion(
completion: Completion
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
val completionDescription = completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
"successful completion of command"
case _ => "failed completion of command"
}
val commandId = completion.commandId
logger.trace(
"Handling {} {} from submission {}.",
completionDescription,
commandId,
completion.submissionId,
)
val maybeSubmissionId = Option(completion.submissionId).collect {
case id if id.nonEmpty => id
}
val value = pendingCommandKeys(maybeSubmissionId, commandId)
val trackedCommands =
value.flatMap(pendingCommands.remove(_).toList)
if (trackedCommands.size > 1) {
trackedCommands.map { trackingData =>
Ctx(
trackingData.context,
Left(
NotOkResponse(
commandId,
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.",
Seq.empty,
),
)
)
} else {
Nil
}
),
)
}
.to(immutable.Seq)
}
private def getOutputForCompletion(completion: Completion) = {
val (commandId, errorText) = {
completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
completion.commandId -> "successful completion of command"
case _ =>
completion.commandId -> "failed completion of command"
}
}
logger.trace("Handling {} {}", errorText, completion.commandId: Any)
pendingCommands.remove(commandId).map { t =>
Ctx(t.context, tracker.CompletionResponse(completion))
} else {
trackedCommands.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
)
}
}
private def pendingCommandKeys(
submissionId: Option[String],
commandId: String,
): Seq[TrackedCommandKey] =
submissionId.map(id => Seq(TrackedCommandKey(id, commandId))).getOrElse {
pendingCommands.keys.filter(_.commandId == commandId).toList
}
private def getOutputForTerminalStatusCode(
commandId: String,
commandKey: TrackedCommandKey,
status: StatusProto,
): Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
logger.trace("Handling failure of command {}", commandId)
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
logger.trace(
s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
pendingCommands
.remove(commandId)
.remove(commandKey)
.map { t =>
Ctx(t.context, tracker.CompletionResponse(Completion(commandId, Some(status))))
Ctx(
t.context,
tracker.CompletionResponse(
Completion(
commandKey.commandId,
Some(status),
submissionId = commandKey.submissionId,
)
),
)
}
.orElse {
logger.trace("Platform signaled failure for unknown command {}", commandId)
logger.trace(
s"Platform signaled failure for unknown command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
None
}
.toList
}
override def postStop(): Unit = {

View File

@ -18,8 +18,10 @@ import scala.util.Try
private[tracker] final case class CommandTrackerShape[Context](
submitRequestIn: Inlet[Ctx[Context, CommandSubmission]],
submitRequestOut: Outlet[Ctx[(Context, String), CommandSubmission]],
commandResultIn: Inlet[Either[Ctx[(Context, String), Try[Empty]], CompletionStreamElement]],
submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]],
commandResultIn: Inlet[
Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]
],
resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]],
offsetOut: Outlet[LedgerOffset],
) extends Shape {

View File

@ -0,0 +1,6 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.client.services.commands.tracker
case class TrackedCommandKey(submissionId: String, commandId: String)

View File

@ -23,6 +23,7 @@ import com.daml.ledger.api.validation.CommandsValidator
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
@ -149,7 +150,7 @@ private[daml] final class CommandClient(
.via(commandUpdaterFlow[Context](ledgerIdToUse))
.viaMat(
CommandTrackerFlow[Context, NotUsed](
commandSubmissionFlow = CommandSubmissionFlow[(Context, String)](
commandSubmissionFlow = CommandSubmissionFlow[(Context, TrackedCommandKey)](
submit(token),
config.maxParallelSubmissions,
),

View File

@ -5,7 +5,6 @@ package com.daml.ledger.client.services.commands
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicReference
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Source, SourceQueueWithComplete}
import akka.stream.testkit.javadsl.TestSink
@ -22,7 +21,7 @@ import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.Value.{Absolute, Boundary}
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.ledger.client.services.commands.tracker.{CompletionResponse, TrackedCommandKey}
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
@ -32,8 +31,8 @@ import com.daml.util.Ctx
import com.google.protobuf.empty.Empty
import com.google.protobuf.timestamp.Timestamp
import com.google.rpc.code._
import com.google.rpc.status.Status
import io.grpc.StatusRuntimeException
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.{Status, StatusRuntimeException}
import org.scalatest.OptionValues
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
@ -53,10 +52,12 @@ class CommandTrackerFlowTest
with AkkaBeforeAndAfterAll
with ScalaFutures {
type C[Value] = Ctx[(Int, String), Value]
type C[Value] = Ctx[(Int, TrackedCommandKey), Value]
private val allSubmissionsSuccessful
: Flow[Ctx[(Int, String), CommandSubmission], Ctx[(Int, String), Try[Empty]], NotUsed] =
private val allSubmissionsSuccessful: Flow[Ctx[(Int, TrackedCommandKey), CommandSubmission], Ctx[
(Int, TrackedCommandKey),
Try[Empty],
], NotUsed] =
Flow[C[CommandSubmission]].map {
_.map(_ => Success(Empty.defaultInstance))
}
@ -68,23 +69,32 @@ class CommandTrackerFlowTest
TestSink.probe[Ctx[Int, Either[CompletionFailure, CompletionSuccess]]](system)
private val mrt = Instant.EPOCH.plus(shortDuration)
private val submissionId = "submissionId"
private val commandId = "commandId"
private val abortedCompletion =
Completion(
commandId,
Some(Status(Code.ABORTED.value)),
Some(StatusProto(Code.ABORTED.value)),
submissionId = submissionId,
)
private val successStatus = Status(Code.OK.value)
private val successStatus = StatusProto(Code.OK.value)
private val context = 1
private val submission = newSubmission(commandId)
private val submission = newSubmission(submissionId, commandId)
private def newSubmission(commandId: String, timeout: Option[Duration] = None) =
Ctx(context, CommandSubmission(Commands(commandId = commandId), timeout))
private def newSubmission(
submissionId: String,
commandId: String,
timeout: Option[Duration] = None,
) =
Ctx(
context,
CommandSubmission(Commands(commandId = commandId, submissionId = submissionId), timeout),
)
private case class Handle(
submissions: TestPublisher.Probe[Ctx[Int, CommandSubmission]],
completions: TestSubscriber.Probe[Ctx[Int, Either[CompletionFailure, CompletionSuccess]]],
whatever: Future[Map[String, Int]],
whatever: Future[Map[TrackedCommandKey, Int]],
completionsStreamMock: CompletionStreamMock,
)
@ -155,7 +165,7 @@ class CommandTrackerFlowTest
whenReady(unhandledF) { unhandled =>
unhandled should have size 1
unhandled should contain(
submission.value.commands.commandId -> submission.context
TrackedCommandKey(submissionId, commandId) -> submission.context
)
}
}
@ -167,14 +177,17 @@ class CommandTrackerFlowTest
val Handle(submissions, results, unhandledF, _) =
runCommandTrackingFlow(allSubmissionsSuccessful)
val otherCommandId = "otherId"
val otherSubmissionId = "otherSubmissionId"
val otherCommandId = "otherCommandId"
submissions.sendNext(newSubmission(otherCommandId))
submissions.sendNext(newSubmission(otherSubmissionId, otherCommandId))
results.cancel()
whenReady(unhandledF) { unhandled =>
unhandled should have size 1
unhandled should contain(otherCommandId -> submission.context)
unhandled should contain(
TrackedCommandKey(otherSubmissionId, otherCommandId) -> submission.context
)
}
}
}
@ -216,7 +229,10 @@ class CommandTrackerFlowTest
val failureCompletion =
Left(
NotOkResponse(commandId = commandId, grpcStatus = Status(Code.RESOURCE_EXHAUSTED.value))
NotOkResponse(
commandId = commandId,
grpcStatus = StatusProto(Code.RESOURCE_EXHAUSTED.value),
)
)
results.expectNext(Ctx(context, failureCompletion))
@ -236,7 +252,7 @@ class CommandTrackerFlowTest
completionStreamMock.send(CompletionStreamElement.CompletionElement(abortedCompletion))
results.requestNext().value shouldEqual Left(
NotOkResponse(commandId, Status(Code.ABORTED.value))
NotOkResponse(commandId, StatusProto(Code.ABORTED.value))
)
}
@ -251,7 +267,7 @@ class CommandTrackerFlowTest
completionStreamMock.send(CompletionStreamElement.CompletionElement(abortedCompletion))
results.requestNext().value shouldEqual Left(
NotOkResponse(commandId, Status(Code.ABORTED.value))
NotOkResponse(commandId, StatusProto(Code.ABORTED.value))
)
}
@ -276,7 +292,9 @@ class CommandTrackerFlowTest
"timeout the command when the timeout passes" in {
val Handle(submissions, results, _, _) = runCommandTrackingFlow(allSubmissionsSuccessful)
submissions.sendNext(newSubmission(commandId, timeout = Some(Duration.ofMillis(100))))
submissions.sendNext(
newSubmission(submissionId, commandId, timeout = Some(Duration.ofMillis(100)))
)
results.expectNext(
500.milliseconds,
@ -330,7 +348,9 @@ class CommandTrackerFlowTest
maximumCommandTimeout = Duration.ofMillis(100),
)
submissions.sendNext(newSubmission(commandId, timeout = Some(Duration.ofSeconds(10))))
submissions.sendNext(
newSubmission(submissionId, commandId, timeout = Some(Duration.ofSeconds(10)))
)
results.expectNext(
500.millis,
@ -348,7 +368,7 @@ class CommandTrackerFlowTest
submissions.sendNext(submission)
completionStreamMock.send(successfulCompletion(commandId))
completionStreamMock.send(successfulCompletion(submissionId, commandId))
results.expectNext(
Ctx(context, Right(CompletionResponse.CompletionSuccess(commandId, "", successStatus)))
@ -359,9 +379,12 @@ class CommandTrackerFlowTest
"after the timeout" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
val timedOutSubmissionId = "timedOutSubmissionId"
val timedOutCommandId = "timedOutCommandId"
submissions.sendNext(newSubmission(timedOutCommandId, timeout = Some(shortDuration)))
submissions.sendNext(
newSubmission(timedOutSubmissionId, timedOutCommandId, timeout = Some(shortDuration))
)
results.expectNext(
shortDuration.getSeconds.seconds * 3,
@ -369,7 +392,7 @@ class CommandTrackerFlowTest
)
// since the command timed out before, the tracker shouldn't send the completion through
completionStreamMock.send(successfulCompletion(timedOutCommandId))
completionStreamMock.send(successfulCompletion(timedOutSubmissionId, timedOutCommandId))
results.request(1)
results.expectNoMessage()
succeed
@ -378,9 +401,10 @@ class CommandTrackerFlowTest
"after another command has timed out" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
val timedOutSubmissionId = "timedOutSubmissionId"
val timedOutCommandId = "timedOutCommandId"
val submitRequestShortDedupTime =
newSubmission(timedOutCommandId, timeout = Some(shortDuration))
newSubmission(timedOutSubmissionId, timedOutCommandId, timeout = Some(shortDuration))
// we send 2 requests
submissions.sendNext(submitRequestShortDedupTime)
@ -392,7 +416,7 @@ class CommandTrackerFlowTest
Ctx(context, Left(CompletionResponse.TimeoutResponse(timedOutCommandId))),
)
// we now receive a completion
completionStreamMock.send(successfulCompletion(commandId))
completionStreamMock.send(successfulCompletion(submissionId, commandId))
// because the out-of-band timeout completion consumed the previous pull on `results`,
// we don't expect a message until we request one.
// The order below is important to reproduce the issue described in DPP-285.
@ -415,8 +439,8 @@ class CommandTrackerFlowTest
submissions.sendNext(submission)
completionStreamMock.send(successfulCompletion(commandId))
completionStreamMock.send(successfulCompletion(commandId))
completionStreamMock.send(successfulCompletion(submissionId, commandId))
completionStreamMock.send(successfulCompletion(submissionId, commandId))
results.expectNext(
Ctx(context, Right(CompletionResponse.CompletionSuccess(commandId, "", successStatus)))
@ -435,11 +459,12 @@ class CommandTrackerFlowTest
submissions.sendNext(submission)
val status = Status(Code.INVALID_ARGUMENT.value)
val status = StatusProto(Code.INVALID_ARGUMENT.value)
val failureCompletion =
Completion(
commandId,
Some(status),
submissionId = submissionId,
)
completionStreamMock.send(CompletionStreamElement.CompletionElement(failureCompletion))
@ -453,27 +478,86 @@ class CommandTrackerFlowTest
}
}
"a completion without the submission id arrives" should {
"output a failure if there are multiple pending commands with the same command id" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
submissions.sendNext(newSubmission("submissionId", "commandId"))
submissions.sendNext(newSubmission("anotherSubmissionId", "commandId"))
val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)
results.expectNext(
Ctx(
context,
Left(
CompletionResponse.NotOkResponse(
commandId = commandId,
grpcStatus = StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.",
Seq.empty,
),
)
),
)
)
succeed
}
"output a successful completion" in {
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
submissions.sendNext(submission)
val completionWithoutSubmissionId =
Completion(
commandId,
Some(successStatus),
submissionId = "",
)
completionStreamMock.send(
CompletionStreamElement.CompletionElement(completionWithoutSubmissionId)
)
results.expectNext(
Ctx(context, Right(CompletionResponse.CompletionSuccess(commandId, "", successStatus)))
)
succeed
}
}
"a multitude of successful completions arrive for submitted commands" should {
"output all expected values" in {
val cmdCount = 1000
val commandIds = 1.to(cmdCount).map(_.toString)
val identifiers =
1.to(cmdCount).map(_.toString).map(id => s"submission-$id" -> s"command-$id")
val Handle(submissions, results, _, completionStreamMock) =
runCommandTrackingFlow(allSubmissionsSuccessful)
results.request(cmdCount.toLong - 1)
commandIds.foreach { commandId =>
submissions.sendNext(submission.copy(value = commandWithId(commandId)))
identifiers.foreach { case (submissionId, commandId) =>
submissions.sendNext(submission.copy(value = commandWithIds(submissionId, commandId)))
}
commandIds.foreach { commandId =>
completionStreamMock.send(successfulCompletion(commandId))
identifiers.foreach { case (submissionId, commandId) =>
completionStreamMock.send(successfulCompletion(submissionId, commandId))
}
results.expectNextUnorderedN(commandIds.map { commandId =>
results.expectNextUnorderedN(identifiers.map { case (_, commandId) =>
val successCompletion =
Right(CompletionResponse.CompletionSuccess(commandId, "", successStatus))
Ctx(context, successCompletion)
@ -498,10 +582,10 @@ class CommandTrackerFlowTest
else Future.unit
} yield ()
def sendCommand(commandId: String) = {
submissions.sendNext(submission.copy(value = commandWithId(commandId)))
def sendCommand(submissionId: String, commandId: String) = {
submissions.sendNext(submission.copy(value = commandWithIds(submissionId, commandId)))
for {
_ <- completionStreamMock.send(successfulCompletion(commandId))
_ <- completionStreamMock.send(successfulCompletion(submissionId, commandId))
_ = results.request(1)
_ = results.expectNext(
Ctx(
@ -525,12 +609,12 @@ class CommandTrackerFlowTest
for {
_ <- checkOffset(LedgerOffset(Boundary(LEDGER_BEGIN)))
_ <- sendCommand("1")
_ <- sendCommand("submission-1", "command-1")
_ <- sendCheckPoint(checkPointOffset)
_ <- checkOffset(LedgerOffset(Boundary(LEDGER_BEGIN)))
_ <- breakUntilOffsetArrives()
_ <- checkOffset(checkPointOffset)
_ <- sendCommand("2")
_ <- sendCommand("submission-2", "command-2")
} yield {
succeed
}
@ -540,13 +624,17 @@ class CommandTrackerFlowTest
}
private def commandWithId(commandId: String) = {
private def commandWithIds(submissionId: String, commandId: String) = {
val request = submission.value
request.copy(commands = request.commands.copy(commandId = commandId))
request.copy(commands =
request.commands.copy(commandId = commandId, submissionId = submissionId)
)
}
private def successfulCompletion(commandId: String) =
CompletionStreamElement.CompletionElement(Completion(commandId, Some(successStatus)))
private def successfulCompletion(submissionId: String, commandId: String) =
CompletionStreamElement.CompletionElement(
Completion(commandId, Some(successStatus), submissionId = submissionId)
)
private def checkPoint(ledgerOffset: LedgerOffset) =
CompletionStreamElement.CheckpointElement(
@ -558,8 +646,8 @@ class CommandTrackerFlowTest
private def runCommandTrackingFlow(
submissionFlow: Flow[
Ctx[(Int, String), CommandSubmission],
Ctx[(Int, String), Try[Empty]],
Ctx[(Int, TrackedCommandKey), CommandSubmission],
Ctx[(Int, TrackedCommandKey), Try[Empty]],
NotUsed,
],
maximumCommandTimeout: Duration = Duration.ofSeconds(10),

View File

@ -23,12 +23,12 @@ import com.daml.ledger.api.v1.transaction_service.{
GetTransactionResponse,
}
import com.daml.ledger.api.validation.CommandsValidator
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
TrackedCompletionFailure,
}
import com.daml.ledger.client.services.commands.tracker.{CompletionResponse, TrackedCommandKey}
import com.daml.ledger.client.services.commands.{
CommandCompletionSource,
CommandSubmission,
@ -79,6 +79,7 @@ private[apiserver] final class ApiCommandService private[services] (
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
logging.commandId(commands.commandId),
logging.partyString(commands.party),
logging.actAsStrings(commands.actAs),
@ -160,8 +161,11 @@ private[apiserver] object ApiCommandService {
private val trackerCleanupInterval = 30.seconds
type SubmissionFlow = Flow[
Ctx[(Promise[Either[CompletionFailure, CompletionSuccess]], String), CommandSubmission],
Ctx[(Promise[Either[CompletionFailure, CompletionSuccess]], String), Try[Empty]],
Ctx[
(Promise[Either[CompletionFailure, CompletionSuccess]], TrackedCommandKey),
CommandSubmission,
],
Ctx[(Promise[Either[CompletionFailure, CompletionSuccess]], TrackedCommandKey), Try[Empty]],
NotUsed,
]