diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala index e994a776d3c..43b244738ea 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala @@ -62,6 +62,8 @@ private[commands] class CommandTracker[Context]( private val logger = LoggerFactory.getLogger(this.getClass.getName) + type ContextualizedCompletionResponse = Ctx[Context, Either[CompletionFailure, CompletionSuccess]] + val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] = Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn") val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] = @@ -71,8 +73,8 @@ private[commands] class CommandTracker[Context]( Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]]( "commandResultIn" ) - val resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = - Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]("resultOut") + val resultOut: Outlet[ContextualizedCompletionResponse] = + Outlet[ContextualizedCompletionResponse]("resultOut") val offsetOut: Outlet[LedgerOffset] = Outlet[LedgerOffset]("offsetOut") @@ -93,7 +95,7 @@ private[commands] class CommandTracker[Context]( override protected def onTimer(timerKey: Any): Unit = { timerKey match { case `timeout_detection` => - val timeouts = getOutputForTimeout(Instant.now) + val timeouts = getResponsesForTimeouts(Instant.now) if (timeouts.nonEmpty) emitMultiple(resultOut, timeouts.to(immutable.Iterable)) case _ => // unknown timer, nothing to do } @@ -164,7 +166,7 @@ private[commands] class CommandTracker[Context]( pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse)) case Right(CompletionStreamElement.CompletionElement(completion)) => - pushResultOrPullCommandResultIn(getOutputForCompletion(completion)) + pushResultOrPullCommandResultIn(getResponsesForCompletion(completion)) case Right(CompletionStreamElement.CheckpointElement(checkpoint)) => if (!hasBeenPulled(commandResultIn)) pull(commandResultIn) @@ -185,7 +187,7 @@ private[commands] class CommandTracker[Context]( ) private def pushResultOrPullCommandResultIn( - compl: Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] + completionResponses: Seq[ContextualizedCompletionResponse] ): Unit = { // The command tracker detects timeouts outside the regular pull/push // mechanism of the input/output ports. Basically the timeout @@ -194,10 +196,10 @@ private[commands] class CommandTracker[Context]( // even though it hasn't been pulled again in the meantime. Using `emit` // instead of `push` when a completion arrives makes akka take care of // handling the signaling properly. - if (compl.isEmpty && !hasBeenPulled(commandResultIn)) { + if (completionResponses.isEmpty && !hasBeenPulled(commandResultIn)) { pull(commandResultIn) } - emitMultiple(resultOut, compl.to(immutable.Iterable)) + emitMultiple(resultOut, completionResponses.to(immutable.Iterable)) } private def completeStageIfTerminal(): Unit = { @@ -210,12 +212,15 @@ private[commands] class CommandTracker[Context]( private def handleSubmitResponse( submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]] - ): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = { + ): Seq[ContextualizedCompletionResponse] = { val Ctx((_, commandKey), value, _) = submitResponse value match { case Failure(GrpcException(status @ GrpcStatus(code, _), metadata)) if !nonTerminalCodes(code) => - getOutputForTerminalStatusCode(commandKey, GrpcStatus.toProto(status, metadata)) + getResponseForTerminalStatusCode( + commandKey, + GrpcStatus.toProto(status, metadata), + ).toList case Failure(throwable) => logger.warn( s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...", @@ -238,7 +243,7 @@ private[commands] class CommandTracker[Context]( logger.trace(s"Begin tracking of command $commandId for submission $submissionId.") if (commands.submissionId.isEmpty) { throw new IllegalArgumentException( - s"The submission id for the command $commandId is empty. This should not happen." + s"The submission ID for the command ID $commandId is empty. This should not happen." ) } if (pendingCommands.contains(TrackedCommandKey(submissionId, commandId))) { @@ -274,7 +279,9 @@ private[commands] class CommandTracker[Context]( () } - private def getOutputForTimeout(instant: Instant) = { + private def getResponsesForTimeouts( + instant: Instant + ): Seq[ContextualizedCompletionResponse] = { logger.trace("Checking timeouts at {}", instant) pendingCommands.view.flatMap { case (commandKey, trackingData) => if (trackingData.commandTimeout.isBefore(instant)) { @@ -298,32 +305,29 @@ private[commands] class CommandTracker[Context]( }.toSeq } - private def getOutputForCompletion( + private def getResponsesForCompletion( completion: Completion - ): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = { - val completionDescription = completion.status match { - case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value => - "successful completion of command" - case _ => "failed completion of command" - } + ): Seq[ContextualizedCompletionResponse] = { val commandId = completion.commandId - logger.trace( - "Handling {} {} from submission {}.", - completionDescription, - commandId, - completion.submissionId, - ) - val maybeSubmissionId = Option(completion.submissionId).collect { case id if id.nonEmpty => id } - val value = pendingCommandKeys(maybeSubmissionId, commandId) - val trackedCommands = - value.flatMap(pendingCommands.remove(_).toList) + logger.trace { + val completionDescription = completion.status match { + case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value => + "successful completion of command" + case _ => "failed completion of command" + } + s"Handling $completionDescription $commandId from submission $maybeSubmissionId." + } - if (trackedCommands.size > 1) { - trackedCommands.map { trackingData => + val trackedCommandKeys = pendingCommandKeys(maybeSubmissionId, commandId) + val trackingDataForSubmissionId = + trackedCommandKeys.flatMap(pendingCommands.remove(_).toList) + + if (trackingDataForSubmissionId.size > 1) { + trackingDataForSubmissionId.map { trackingData => Ctx( trackingData.context, Left( @@ -331,7 +335,7 @@ private[commands] class CommandTracker[Context]( commandId, StatusProto.of( Status.Code.INTERNAL.value(), - s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.", + s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.", Seq.empty, ), ) @@ -339,7 +343,7 @@ private[commands] class CommandTracker[Context]( ) } } else { - trackedCommands.map(trackingData => + trackingDataForSubmissionId.map(trackingData => Ctx(trackingData.context, tracker.CompletionResponse(completion)) ) } @@ -353,10 +357,10 @@ private[commands] class CommandTracker[Context]( pendingCommands.keys.filter(_.commandId == commandId).toList } - private def getOutputForTerminalStatusCode( + private def getResponseForTerminalStatusCode( commandKey: TrackedCommandKey, status: StatusProto, - ): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = { + ): Option[ContextualizedCompletionResponse] = { logger.trace( s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}." ) @@ -380,7 +384,6 @@ private[commands] class CommandTracker[Context]( ) None } - .toList } override def postStop(): Unit = { diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala index 918c0ad53d6..a72b1e34194 100644 --- a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala @@ -162,7 +162,7 @@ class CommandTrackerFlowTest val actualException = results.expectError() actualException shouldBe an[IllegalArgumentException] - actualException.getMessage shouldBe s"The submission id for the command $commandId is empty. This should not happen." + actualException.getMessage shouldBe s"The submission ID for the command ID $commandId is empty. This should not happen." } } @@ -520,7 +520,7 @@ class CommandTrackerFlowTest commandId = commandId, grpcStatus = StatusProto.of( Status.Code.INTERNAL.value(), - s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.", + s"There are multiple pending commands with ID: $commandId for submission ID: None. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.", Seq.empty, ), )