mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
kvutils - For duplicate command rejections, add the submission id as metadata [KVL-1175] (#11848)
changelog_begin kvutils - For duplicate command rejections, the submission id of the already accepted transaction is returning as part of the gRPC metadata. The submission id will be included under the key `existing_submission_id`. changelog_end
This commit is contained in:
parent
970243dd46
commit
59eb0d2eff
@ -672,11 +672,17 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
|
||||
ErrorCategory.InvalidGivenCurrentSystemStateResourceExists,
|
||||
) {
|
||||
|
||||
case class Reject(override val definiteAnswer: Boolean = false)(implicit
|
||||
case class Reject(
|
||||
override val definiteAnswer: Boolean = false,
|
||||
existingCommandSubmissionId: Option[String],
|
||||
)(implicit
|
||||
loggingContext: ContextualizedErrorLogger
|
||||
) extends LoggingTransactionErrorImpl(
|
||||
cause = "A command with the given command id has already been successfully processed"
|
||||
)
|
||||
) {
|
||||
override def context: Map[String, String] =
|
||||
super.context ++ existingCommandSubmissionId.map("existing_submission_id" -> _).toList
|
||||
}
|
||||
}
|
||||
|
||||
@Explanation("An input contract has been archived by a concurrent transaction submission.")
|
||||
|
@ -167,7 +167,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
|
||||
v2 = LedgerApiErrors.InternalError.VersionService(message).asGrpcError,
|
||||
)
|
||||
|
||||
def duplicateCommandException(implicit
|
||||
def duplicateCommandException(existingSubmissionId: Option[String])(implicit
|
||||
contextualizedErrorLogger: ContextualizedErrorLogger
|
||||
): StatusRuntimeException =
|
||||
errorCodesVersionSwitcher.choose(
|
||||
@ -183,7 +183,9 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
|
||||
contextualizedErrorLogger.info(exception.getMessage)
|
||||
exception
|
||||
},
|
||||
v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand.Reject().asGrpcError,
|
||||
v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand
|
||||
.Reject(existingCommandSubmissionId = existingSubmissionId)
|
||||
.asGrpcError,
|
||||
)
|
||||
|
||||
/** @param expected Expected ledger id.
|
||||
|
@ -247,7 +247,7 @@ class ErrorFactoriesSpec
|
||||
}
|
||||
|
||||
"return the DuplicateCommandException" in {
|
||||
assertVersionedError(_.duplicateCommandException)(
|
||||
assertVersionedError(_.duplicateCommandException(None))(
|
||||
v1_code = Code.ALREADY_EXISTS,
|
||||
v1_message = "Duplicate command",
|
||||
v1_details = Seq(definiteAnswers(false)),
|
||||
|
@ -186,7 +186,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
|
||||
case _: CommandDeduplicationDuplicate =>
|
||||
metrics.daml.commands.deduplicatedCommands.mark()
|
||||
Future.failed(
|
||||
errorFactories.duplicateCommandException
|
||||
errorFactories.duplicateCommandException(None)
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,7 @@ message InvalidLedgerTime {
|
||||
// key during its implementation specific deduplication window.
|
||||
message Duplicate {
|
||||
string details = 1;
|
||||
string submission_id = 2;
|
||||
}
|
||||
|
||||
// A party mentioned as a stakeholder or actor has not been on-boarded on
|
||||
|
@ -62,6 +62,7 @@ message DamlCommandDedupValue {
|
||||
google.protobuf.Timestamp record_time = 3;
|
||||
PreExecutionDeduplicationBounds record_time_bounds = 4;
|
||||
}
|
||||
string submission_id = 5;
|
||||
}
|
||||
message PreExecutionDeduplicationBounds {
|
||||
// record_time is not available during pre-execution
|
||||
|
@ -363,6 +363,7 @@ object KeyValueConsumption {
|
||||
recordTime,
|
||||
rejectionEntry,
|
||||
errorVersionSwitch,
|
||||
None, // Not available for historical entries
|
||||
)(contextualizedErrorLogger(loggingContext, rejectionEntry))
|
||||
)
|
||||
|
||||
|
@ -68,7 +68,7 @@ private[transaction] object CommandDeduplication {
|
||||
maybeDedupValue,
|
||||
)
|
||||
} else {
|
||||
duplicateRejection(commitContext, transactionEntry.submitterInfo)
|
||||
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -156,22 +156,28 @@ private[transaction] object CommandDeduplication {
|
||||
buildDuration(rejectionDeduplicationDuration)
|
||||
)
|
||||
.build,
|
||||
maybeDedupValue,
|
||||
)
|
||||
case None =>
|
||||
duplicateRejection(commitContext, transactionEntry.submitterInfo)
|
||||
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
|
||||
}
|
||||
}
|
||||
|
||||
private def duplicateRejection(
|
||||
commitContext: CommitContext,
|
||||
submitterInfo: DamlSubmitterInfo,
|
||||
dedupValue: Option[DamlCommandDedupValue],
|
||||
)(implicit loggingContext: LoggingContext) = {
|
||||
rejections.reject(
|
||||
DamlTransactionRejectionEntry.newBuilder
|
||||
.setSubmitterInfo(submitterInfo)
|
||||
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
|
||||
.setDefiniteAnswer(false)
|
||||
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
|
||||
.setDuplicateCommand(
|
||||
Duplicate.newBuilder
|
||||
.setDetails("")
|
||||
.setSubmissionId(dedupValue.map(_.getSubmissionId).getOrElse(""))
|
||||
),
|
||||
"the command is a duplicate",
|
||||
commitContext.recordTime,
|
||||
)
|
||||
@ -186,7 +192,9 @@ private[transaction] object CommandDeduplication {
|
||||
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
|
||||
throw Err.InvalidSubmission("Deduplication duration is not set.")
|
||||
}
|
||||
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
|
||||
val commandDedupBuilder = DamlCommandDedupValue.newBuilder.setSubmissionId(
|
||||
transactionEntry.submitterInfo.getSubmissionId
|
||||
)
|
||||
commitContext.recordTime
|
||||
.map(Conversions.buildTimestamp) match {
|
||||
case Some(recordTime) =>
|
||||
|
@ -71,6 +71,7 @@ private[kvutils] object TransactionRejections {
|
||||
recordTime: Timestamp,
|
||||
rejectionEntry: DamlTransactionRejectionEntry,
|
||||
errorVersionSwitch: ValueSwitch,
|
||||
existingCommandSubmissionId: Option[String],
|
||||
)(implicit loggingContext: ContextualizedErrorLogger): Update.CommandRejected = {
|
||||
val definiteAnswer = rejectionEntry.getDefiniteAnswer
|
||||
Update.CommandRejected(
|
||||
@ -82,7 +83,7 @@ private[kvutils] object TransactionRejections {
|
||||
reasonTemplate = FinalReason(
|
||||
errorVersionSwitch.choose(
|
||||
V1.duplicateCommandsRejectionStatus(definiteAnswer, Code.ALREADY_EXISTS),
|
||||
V2.duplicateCommandsRejectionStatus(definiteAnswer),
|
||||
V2.duplicateCommandsRejectionStatus(definiteAnswer, existingCommandSubmissionId),
|
||||
)
|
||||
),
|
||||
)
|
||||
@ -302,15 +303,19 @@ private[kvutils] object TransactionRejections {
|
||||
def duplicateCommandStatus(
|
||||
entry: DamlTransactionRejectionEntry,
|
||||
errorVersionSwitch: ValueSwitch,
|
||||
)(implicit loggingContext: ContextualizedErrorLogger): Status =
|
||||
)(implicit loggingContext: ContextualizedErrorLogger): Status = {
|
||||
val rejectionReason = entry.getDuplicateCommand
|
||||
errorVersionSwitch.choose(
|
||||
V1.status(
|
||||
entry,
|
||||
Code.ALREADY_EXISTS,
|
||||
"Duplicate commands",
|
||||
),
|
||||
V2.duplicateCommandsRejectionStatus(),
|
||||
V2.duplicateCommandsRejectionStatus(existingCommandSubmissionId =
|
||||
Some(rejectionReason.getSubmissionId).filter(_.nonEmpty)
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
def submitterCannotActViaParticipantStatus(
|
||||
@ -515,11 +520,12 @@ private[kvutils] object TransactionRejections {
|
||||
.asStatus
|
||||
|
||||
def duplicateCommandsRejectionStatus(
|
||||
definiteAnswer: Boolean = false
|
||||
definiteAnswer: Boolean = false,
|
||||
existingCommandSubmissionId: Option[String],
|
||||
)(implicit loggingContext: ContextualizedErrorLogger): Status =
|
||||
GrpcStatus.toProto(
|
||||
LedgerApiErrors.ConsistencyErrors.DuplicateCommand
|
||||
.Reject(definiteAnswer)
|
||||
.Reject(definiteAnswer, existingCommandSubmissionId)
|
||||
.asGrpcStatusFromContext
|
||||
)
|
||||
|
||||
|
@ -445,9 +445,9 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues {
|
||||
Map.empty,
|
||||
),
|
||||
(
|
||||
_.setDuplicateCommand(Duplicate.newBuilder()),
|
||||
_.setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("not_used")),
|
||||
Code.ALREADY_EXISTS,
|
||||
Map.empty,
|
||||
Map(),
|
||||
),
|
||||
(
|
||||
_.setSubmitterCannotActViaParticipant(
|
||||
@ -496,6 +496,23 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues {
|
||||
}
|
||||
}
|
||||
|
||||
"decode duplicate command v2" in {
|
||||
val finalReason = Conversions
|
||||
.decodeTransactionRejectionEntry(
|
||||
DamlTransactionRejectionEntry
|
||||
.newBuilder()
|
||||
.setDuplicateCommand(Duplicate.newBuilder().setSubmissionId("submissionId"))
|
||||
.build(),
|
||||
v2ErrorSwitch,
|
||||
)
|
||||
finalReason.code shouldBe Code.ALREADY_EXISTS.value()
|
||||
finalReason.definiteAnswer shouldBe false
|
||||
val actualDetails = finalReasonDetails(finalReason)
|
||||
actualDetails should contain allElementsOf Map(
|
||||
"existing_submission_id" -> "submissionId"
|
||||
)
|
||||
}
|
||||
|
||||
"decode completion info" should {
|
||||
val recordTime = LfTimestamp.now()
|
||||
def submitterInfo = {
|
||||
|
@ -171,6 +171,20 @@ class CommandDeduplicationSpec
|
||||
deduplicateStepHasTransactionRejectionEntry(context)
|
||||
}
|
||||
}
|
||||
|
||||
"include the submission id in the rejection" in {
|
||||
val submissionId = "submissionId"
|
||||
val (_, context) = contextBuilder(timestamp =>
|
||||
Some(
|
||||
newDedupValue(builder =>
|
||||
timeSetter(timestamp)(builder.setSubmissionId(submissionId))
|
||||
)
|
||||
)
|
||||
)
|
||||
val rejection = deduplicateStepHasTransactionRejectionEntry(context)
|
||||
rejection.getDuplicateCommand.getSubmissionId shouldBe submissionId
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -283,6 +297,23 @@ class CommandDeduplicationSpec
|
||||
) shouldBe recordTime
|
||||
}
|
||||
|
||||
"set the submission id in the dedup value" in {
|
||||
val submissionId = "submissionId"
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
submissionTime,
|
||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration))
|
||||
.setSubmissionId(submissionId),
|
||||
Some(timestamp),
|
||||
)
|
||||
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||
deduplicateValueStoredInContext(context, transactionEntrySummary)
|
||||
.map(
|
||||
_.getSubmissionId
|
||||
)
|
||||
.value shouldBe submissionId
|
||||
}
|
||||
|
||||
"throw an error for missing record time bounds" in {
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
|
Loading…
Reference in New Issue
Block a user