diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index 4760133ef9..f685715503 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -672,11 +672,17 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ErrorCategory.InvalidGivenCurrentSystemStateResourceExists, ) { - case class Reject(override val definiteAnswer: Boolean = false)(implicit + case class Reject( + override val definiteAnswer: Boolean = false, + existingCommandSubmissionId: Option[String], + )(implicit loggingContext: ContextualizedErrorLogger ) extends LoggingTransactionErrorImpl( cause = "A command with the given command id has already been successfully processed" - ) + ) { + override def context: Map[String, String] = + super.context ++ existingCommandSubmissionId.map("existing_submission_id" -> _).toList + } } @Explanation("An input contract has been archived by a concurrent transaction submission.") diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index fae51cacfd..3130d70ff2 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -167,7 +167,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch v2 = LedgerApiErrors.InternalError.VersionService(message).asGrpcError, ) - def duplicateCommandException(implicit + def duplicateCommandException(existingSubmissionId: Option[String])(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): StatusRuntimeException = errorCodesVersionSwitcher.choose( @@ -183,7 +183,9 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch contextualizedErrorLogger.info(exception.getMessage) exception }, - v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand.Reject().asGrpcError, + v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand + .Reject(existingCommandSubmissionId = existingSubmissionId) + .asGrpcError, ) /** @param expected Expected ledger id. diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index 9c7a566aa9..c57df38ac2 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -247,7 +247,7 @@ class ErrorFactoriesSpec } "return the DuplicateCommandException" in { - assertVersionedError(_.duplicateCommandException)( + assertVersionedError(_.duplicateCommandException(None))( v1_code = Code.ALREADY_EXISTS, v1_message = "Duplicate command", v1_details = Seq(definiteAnswers(false)), diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 4d8ff2c378..b910e12011 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -186,7 +186,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( case _: CommandDeduplicationDuplicate => metrics.daml.commands.deduplicatedCommands.mark() Future.failed( - errorFactories.duplicateCommandException + errorFactories.duplicateCommandException(None) ) } diff --git a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/rejection_reason.proto b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/rejection_reason.proto index 274e3732b7..c9fe22e525 100644 --- a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/rejection_reason.proto +++ b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/rejection_reason.proto @@ -47,6 +47,7 @@ message InvalidLedgerTime { // key during its implementation specific deduplication window. message Duplicate { string details = 1; + string submission_id = 2; } // A party mentioned as a stakeholder or actor has not been on-boarded on diff --git a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/state.proto b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/state.proto index 2ee2949729..87493e5652 100644 --- a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/state.proto +++ b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/state.proto @@ -62,6 +62,7 @@ message DamlCommandDedupValue { google.protobuf.Timestamp record_time = 3; PreExecutionDeduplicationBounds record_time_bounds = 4; } + string submission_id = 5; } message PreExecutionDeduplicationBounds { // record_time is not available during pre-execution diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index 3a1b53bc77..c0dec77235 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -363,6 +363,7 @@ object KeyValueConsumption { recordTime, rejectionEntry, errorVersionSwitch, + None, // Not available for historical entries )(contextualizedErrorLogger(loggingContext, rejectionEntry)) ) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplication.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplication.scala index 8031cbfb96..b614e45a9c 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplication.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplication.scala @@ -68,7 +68,7 @@ private[transaction] object CommandDeduplication { maybeDedupValue, ) } else { - duplicateRejection(commitContext, transactionEntry.submitterInfo) + duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue) } } } @@ -156,22 +156,28 @@ private[transaction] object CommandDeduplication { buildDuration(rejectionDeduplicationDuration) ) .build, + maybeDedupValue, ) case None => - duplicateRejection(commitContext, transactionEntry.submitterInfo) + duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue) } } private def duplicateRejection( commitContext: CommitContext, submitterInfo: DamlSubmitterInfo, + dedupValue: Option[DamlCommandDedupValue], )(implicit loggingContext: LoggingContext) = { rejections.reject( DamlTransactionRejectionEntry.newBuilder .setSubmitterInfo(submitterInfo) // No duplicate rejection is a definite answer as the deduplication entry will eventually expire. .setDefiniteAnswer(false) - .setDuplicateCommand(Duplicate.newBuilder.setDetails("")), + .setDuplicateCommand( + Duplicate.newBuilder + .setDetails("") + .setSubmissionId(dedupValue.map(_.getSubmissionId).getOrElse("")) + ), "the command is a duplicate", commitContext.recordTime, ) @@ -186,7 +192,9 @@ private[transaction] object CommandDeduplication { if (!transactionEntry.submitterInfo.hasDeduplicationDuration) { throw Err.InvalidSubmission("Deduplication duration is not set.") } - val commandDedupBuilder = DamlCommandDedupValue.newBuilder + val commandDedupBuilder = DamlCommandDedupValue.newBuilder.setSubmissionId( + transactionEntry.submitterInfo.getSubmissionId + ) commitContext.recordTime .map(Conversions.buildTimestamp) match { case Some(recordTime) => diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala index c63b7e0ec1..d3080f0f11 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala @@ -71,6 +71,7 @@ private[kvutils] object TransactionRejections { recordTime: Timestamp, rejectionEntry: DamlTransactionRejectionEntry, errorVersionSwitch: ValueSwitch, + existingCommandSubmissionId: Option[String], )(implicit loggingContext: ContextualizedErrorLogger): Update.CommandRejected = { val definiteAnswer = rejectionEntry.getDefiniteAnswer Update.CommandRejected( @@ -82,7 +83,7 @@ private[kvutils] object TransactionRejections { reasonTemplate = FinalReason( errorVersionSwitch.choose( V1.duplicateCommandsRejectionStatus(definiteAnswer, Code.ALREADY_EXISTS), - V2.duplicateCommandsRejectionStatus(definiteAnswer), + V2.duplicateCommandsRejectionStatus(definiteAnswer, existingCommandSubmissionId), ) ), ) @@ -302,15 +303,19 @@ private[kvutils] object TransactionRejections { def duplicateCommandStatus( entry: DamlTransactionRejectionEntry, errorVersionSwitch: ValueSwitch, - )(implicit loggingContext: ContextualizedErrorLogger): Status = + )(implicit loggingContext: ContextualizedErrorLogger): Status = { + val rejectionReason = entry.getDuplicateCommand errorVersionSwitch.choose( V1.status( entry, Code.ALREADY_EXISTS, "Duplicate commands", ), - V2.duplicateCommandsRejectionStatus(), + V2.duplicateCommandsRejectionStatus(existingCommandSubmissionId = + Some(rejectionReason.getSubmissionId).filter(_.nonEmpty) + ), ) + } @nowarn("msg=deprecated") def submitterCannotActViaParticipantStatus( @@ -515,11 +520,12 @@ private[kvutils] object TransactionRejections { .asStatus def duplicateCommandsRejectionStatus( - definiteAnswer: Boolean = false + definiteAnswer: Boolean = false, + existingCommandSubmissionId: Option[String], )(implicit loggingContext: ContextualizedErrorLogger): Status = GrpcStatus.toProto( LedgerApiErrors.ConsistencyErrors.DuplicateCommand - .Reject(definiteAnswer) + .Reject(definiteAnswer, existingCommandSubmissionId) .asGrpcStatusFromContext ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala index 54785a5de7..89498ba1c4 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala @@ -445,9 +445,9 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { Map.empty, ), ( - _.setDuplicateCommand(Duplicate.newBuilder()), + _.setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("not_used")), Code.ALREADY_EXISTS, - Map.empty, + Map(), ), ( _.setSubmitterCannotActViaParticipant( @@ -496,6 +496,23 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { } } + "decode duplicate command v2" in { + val finalReason = Conversions + .decodeTransactionRejectionEntry( + DamlTransactionRejectionEntry + .newBuilder() + .setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("submissionId")) + .build(), + v2ErrorSwitch, + ) + finalReason.code shouldBe Code.ALREADY_EXISTS.value() + finalReason.definiteAnswer shouldBe false + val actualDetails = finalReasonDetails(finalReason) + actualDetails should contain allElementsOf Map( + "existing_submission_id" -> "submissionId" + ) + } + "decode completion info" should { val recordTime = LfTimestamp.now() def submitterInfo = { diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplicationSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplicationSpec.scala index 181eb34081..75550cb4ae 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplicationSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/committer/transaction/CommandDeduplicationSpec.scala @@ -171,6 +171,20 @@ class CommandDeduplicationSpec deduplicateStepHasTransactionRejectionEntry(context) } } + + "include the submission id in the rejection" in { + val submissionId = "submissionId" + val (_, context) = contextBuilder(timestamp => + Some( + newDedupValue(builder => + timeSetter(timestamp)(builder.setSubmissionId(submissionId)) + ) + ) + ) + val rejection = deduplicateStepHasTransactionRejectionEntry(context) + rejection.getDuplicateCommand.getSubmissionId shouldBe submissionId + } + } } } @@ -283,6 +297,23 @@ class CommandDeduplicationSpec ) shouldBe recordTime } + "set the submission id in the dedup value" in { + val submissionId = "submissionId" + val (context, transactionEntrySummary) = + buildContextAndTransaction( + submissionTime, + _.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)) + .setSubmissionId(submissionId), + Some(timestamp), + ) + setDeduplicationEntryStep(context, transactionEntrySummary) + deduplicateValueStoredInContext(context, transactionEntrySummary) + .map( + _.getSubmissionId + ) + .value shouldBe submissionId + } + "throw an error for missing record time bounds" in { val (context, transactionEntrySummary) = buildContextAndTransaction(