mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Deduplication flake: allow inflight-request error (#15609)
In rare situations a canton participant returns a SUBMISSION_ALREADY_IN_FLIGHT error instead of DUPLICATE_COMMAND due to a possible race condition between publishing the TransactionAccepted read service update and updating internal in-flight transactions. Particularly in Oracle runs this can result in a subsequence "duplicate" command to still be considered in-flight. Extended the tests to also accept the "in-flight" error code. Changelog_begin Changelog_end
This commit is contained in:
parent
253150e6e2
commit
936594f523
@ -69,13 +69,41 @@ object Assertions {
|
||||
assertSameElements(actual, Seq.empty)
|
||||
}
|
||||
|
||||
def assertGrpcErrorOneOf(t: Throwable, errors: ErrorCode*): Unit = {
|
||||
val hasErrorCode = errors.map(errorCode => Try(assertGrpcError(t, errorCode, None))).exists {
|
||||
case Success(_) => true
|
||||
case _ => false
|
||||
case class ExtendedAsserts(
|
||||
exceptionMessageSubstring: Option[String] = None,
|
||||
checkDefiniteAnswerMetadata: Boolean = false,
|
||||
additionalErrorAssertions: Throwable => Unit = _ => (),
|
||||
)
|
||||
|
||||
def assertGrpcErrorOneOf(
|
||||
t: Throwable,
|
||||
extendedChecks: Option[ExtendedAsserts],
|
||||
errors: ErrorCode*
|
||||
): Unit = {
|
||||
errors.map(errorCode => (errorCode, Try(assertGrpcError(t, errorCode, None)))).collectFirst {
|
||||
case (errorCode, Success(_)) => errorCode
|
||||
} match {
|
||||
case Some(matchedErrorCode) =>
|
||||
// In order to catch errors related to the "extended checks" rerun the check on the matching error code.
|
||||
// This results in a more specific error message if one of the error codes matches, but the other checks fail.
|
||||
extendedChecks.foreach {
|
||||
case ExtendedAsserts(
|
||||
exceptionMessageSubstring,
|
||||
checkDefiniteAnswerMetadata,
|
||||
additionalErrorAssertions,
|
||||
) =>
|
||||
assertGrpcError(
|
||||
t,
|
||||
matchedErrorCode,
|
||||
exceptionMessageSubstring,
|
||||
checkDefiniteAnswerMetadata,
|
||||
additionalErrorAssertions,
|
||||
)
|
||||
}
|
||||
|
||||
case None =>
|
||||
fail(s"gRPC failure did not contain one of the expected error codes $errors.", t)
|
||||
}
|
||||
if (hasErrorCode) ()
|
||||
else fail(s"gRPC failure did not contain one of the expected error codes $errors.", t)
|
||||
}
|
||||
|
||||
def assertGrpcError(
|
||||
|
@ -694,16 +694,20 @@ final class CommandDeduplicationIT(
|
||||
.submitAndWaitForTransaction(request)
|
||||
.mustFail("Request was accepted but we were expecting it to fail with a duplicate error")
|
||||
.map(
|
||||
assertGrpcError(
|
||||
assertGrpcErrorOneOf(
|
||||
_,
|
||||
errorCode = LedgerApiErrors.ConsistencyErrors.DuplicateCommand,
|
||||
exceptionMessageSubstring = None,
|
||||
checkDefiniteAnswerMetadata = true,
|
||||
assertDeduplicatedSubmissionIdAndOffsetOnError(
|
||||
acceptedSubmissionId,
|
||||
acceptedOffset,
|
||||
_,
|
||||
extendedChecks = Some(
|
||||
ExtendedAsserts(
|
||||
checkDefiniteAnswerMetadata = true,
|
||||
additionalErrorAssertions = assertDeduplicatedSubmissionIdAndOffsetOnError(
|
||||
acceptedSubmissionId,
|
||||
acceptedOffset,
|
||||
_,
|
||||
),
|
||||
)
|
||||
),
|
||||
LedgerApiErrors.ConsistencyErrors.DuplicateCommand,
|
||||
LedgerApiErrors.ConsistencyErrors.SubmissionAlreadyInFlight,
|
||||
)
|
||||
)
|
||||
|
||||
@ -719,7 +723,12 @@ final class CommandDeduplicationIT(
|
||||
request,
|
||||
parties: _*
|
||||
) { completion =>
|
||||
assertCompletionStatus(request.toString, completion, Code.ALREADY_EXISTS)
|
||||
assertCompletionStatus(
|
||||
request.toString,
|
||||
completion,
|
||||
Code.ALREADY_EXISTS, // Deduplication error
|
||||
Code.ABORTED, // Code for inflight request: Different grpc code implied by retryability of ContentionOnSharedResources
|
||||
)
|
||||
assertDeduplicatedSubmissionIdAndOffsetOnCompletion(
|
||||
acceptedSubmissionId,
|
||||
acceptedOffset,
|
||||
@ -730,11 +739,13 @@ final class CommandDeduplicationIT(
|
||||
private def assertCompletionStatus(
|
||||
requestString: String,
|
||||
response: CompletionResponse,
|
||||
statusCode: Code,
|
||||
statusCodes: Code*
|
||||
): Unit =
|
||||
assert(
|
||||
response.completion.getStatus.code == statusCode.value(),
|
||||
s"""Expecting completion with status code $statusCode but completion has status ${response.completion.status}.
|
||||
statusCodes.exists(response.completion.getStatus.code == _.value()),
|
||||
s"""Expecting completion with status code(s) ${statusCodes.mkString(
|
||||
","
|
||||
)} but completion has status ${response.completion.status}.
|
||||
|Request: $requestString
|
||||
|Response: $response
|
||||
|Metadata: ${extractErrorInfoMetadata(
|
||||
|
@ -165,6 +165,7 @@ final class ConfigManagementServiceIT extends LedgerTestSuite {
|
||||
)
|
||||
assertGrpcErrorOneOf(
|
||||
failure,
|
||||
extendedChecks = None,
|
||||
LedgerApiErrors.Admin.ConfigurationEntryRejected,
|
||||
LedgerApiErrors.RequestValidation.InvalidArgument,
|
||||
KvErrors.Consistency.PostExecutionConflicts,
|
||||
|
Loading…
Reference in New Issue
Block a user