mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Fix automatic retry mechanism in scala bindings. (#3061)
* Fix automatic retry mechanism in Scala bindings.

  This fix only affects the usage of `com.digitalasset.ledger.client.binding.LedgerClientBinding#retryingConfirmedCommands`.

  The retry mechanism didn't distinguish between submission failures and final completion failures. Retrying completion failures with the same commandId doesn't make sense, as the deduplication mechanism will kick in. The new mechanism now only retries commands with an updated LET and MRT in case the server responds with a RESOURCE_EXHAUSTED status code (i.e. backpressure).

  Fixes #3057.
* Add UNAVAILABLE as retryable error code
* Fix unreleased.rst
This commit is contained in:
parent
06a4d56025
commit
e310cbe143
@ -72,12 +72,12 @@ object CommandRetryFlow {
|
||||
} else if ((firstSubmissionTime plus maxRetryTime) isBefore timeProvider.getCurrentTime) {
|
||||
RetryLogger.logStopRetrying(request, status, nrOfRetries, firstSubmissionTime)
|
||||
PROPAGATE_PORT
|
||||
} else if (NON_RETRYABLE_ERROR_CODES.contains(status.code)) {
|
||||
RetryLogger.logFatal(request, status, nrOfRetries)
|
||||
PROPAGATE_PORT
|
||||
} else {
|
||||
} else if (RETRYABLE_ERROR_CODES.contains(status.code)) {
|
||||
RetryLogger.logNonFatal(request, status, nrOfRetries)
|
||||
RETRY_PORT
|
||||
} else {
|
||||
RetryLogger.logFatal(request, status, nrOfRetries)
|
||||
PROPAGATE_PORT
|
||||
}
|
||||
case Ctx(_, Completion(commandId, _, _, _)) =>
|
||||
statusNotFoundError(commandId)
|
||||
@ -97,7 +97,8 @@ object CommandRetryFlow {
|
||||
FlowShape(merge.in(PROPAGATE_PORT), retryDecider.out(PROPAGATE_PORT))
|
||||
})
|
||||
|
||||
private val NON_RETRYABLE_ERROR_CODES = Set(Code.INVALID_ARGUMENT_VALUE)
|
||||
private[retrying] val RETRYABLE_ERROR_CODES =
|
||||
Set(Code.RESOURCE_EXHAUSTED_VALUE, Code.UNAVAILABLE_VALUE)
|
||||
|
||||
private def statusNotFoundError(commandId: String): Int =
|
||||
throw new RuntimeException(s"Status for command $commandId is missing.")
|
||||
|
@ -95,16 +95,33 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
|
||||
}
|
||||
}
|
||||
|
||||
"fail INVALID_ARGUMENT status" in {
|
||||
submitRequest(Code.INVALID_ARGUMENT_VALUE, Instant.ofEpochSecond(45)) map { result =>
|
||||
"fail on all codes but RESOURCE_EXHAUSTED and UNAVAILABLE status" in {
|
||||
val codesToFail =
|
||||
Code
|
||||
.values()
|
||||
.toList
|
||||
.filterNot(c =>
|
||||
c == Code.UNRECOGNIZED || CommandRetryFlow.RETRYABLE_ERROR_CODES.contains(c.getNumber))
|
||||
val failedSubmissions = codesToFail.map { code =>
|
||||
submitRequest(code.getNumber, Instant.ofEpochSecond(45)) map { result =>
|
||||
result.size shouldBe 1
|
||||
result.head.context.nrOfRetries shouldBe 0
|
||||
result.head.value.status.get.code shouldBe code.getNumber
|
||||
}
|
||||
}
|
||||
Future.sequence(failedSubmissions).map(_ => succeed)
|
||||
}
|
||||
|
||||
"retry RESOURCE_EXHAUSTED status" in {
|
||||
submitRequest(Code.RESOURCE_EXHAUSTED_VALUE, Instant.ofEpochSecond(45)) map { result =>
|
||||
result.size shouldBe 1
|
||||
result.head.context.nrOfRetries shouldBe 0
|
||||
result.head.value.status.get.code shouldBe Code.INVALID_ARGUMENT_VALUE
|
||||
result.head.context.nrOfRetries shouldBe 1
|
||||
result.head.value.status.get.code shouldBe Code.OK_VALUE
|
||||
}
|
||||
}
|
||||
|
||||
"retry ABORTED status" in {
|
||||
submitRequest(Code.ABORTED_VALUE, Instant.ofEpochSecond(45)) map { result =>
|
||||
"retry UNAVAILABLE status" in {
|
||||
submitRequest(Code.UNAVAILABLE_VALUE, Instant.ofEpochSecond(45)) map { result =>
|
||||
result.size shouldBe 1
|
||||
result.head.context.nrOfRetries shouldBe 1
|
||||
result.head.value.status.get.code shouldBe Code.OK_VALUE
|
||||
@ -112,10 +129,10 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
|
||||
}
|
||||
|
||||
"stop retrying after maxRetryTime" in {
|
||||
submitRequest(Code.ABORTED_VALUE, Instant.ofEpochSecond(15)) map { result =>
|
||||
submitRequest(Code.RESOURCE_EXHAUSTED_VALUE, Instant.ofEpochSecond(15)) map { result =>
|
||||
result.size shouldBe 1
|
||||
result.head.context.nrOfRetries shouldBe 0
|
||||
result.head.value.status.get.code shouldBe Code.ABORTED_VALUE
|
||||
result.head.value.status.get.code shouldBe Code.RESOURCE_EXHAUSTED_VALUE
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,4 +21,6 @@ HEAD — ongoing
|
||||
+ [SQL Extractor] The format used for storing Optional and Map values found in contracts
|
||||
as JSON has been replaced with :doc:`/json-api/lf-value-specification`. See `issue
|
||||
#3066 <https://github.com/digital-asset/daml/issues/3066>`_ for specifics.
|
||||
+ [Scala Codegen] Fixes for StackOverflowErrors in reading large LF archives. See `issue #3104 <https://github.com/digital-asset/daml/issues/3104>`_.
|
||||
+ [Scala Codegen] Fixes for StackOverflowErrors in reading large LF archives. See `issue #3104 <https://github.com/digital-asset/daml/issues/3104>`_.
|
||||
+ [Scala Bindings] Fixed a bug in the retry logic of ``LedgerClientBinding#retryingConfirmedCommands``. Commands are now only retried when the server responds with status ``RESOURCE_EXHAUSTED`` or ``UNAVAILABLE``.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user