KVL-810 Report the correct gRPC error code on failures due to concurrent transactions (#9218)

* Fix gRPC status codes for inconsistency rejections and DamlLf errors
Also, add unit tests and exclude failing compatibility and conformance tests

CHANGELOG_BEGIN

- [Integration Kit] Fix gRPC status codes for inconsistency rejections and DamlLf errors (ContractNotFound, ReplayMismatch) by changing them from INVALID_ARGUMENT to ABORTED

CHANGELOG_END
This commit is contained in:
Hubert Slojewski 2021-03-26 15:26:33 +01:00 committed by GitHub
parent 3a91cbccba
commit cb422e2f67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 369 additions and 135 deletions

View File

@ -19,12 +19,29 @@ load("//:versions.bzl", "latest_stable_version")
# - ContractKeysIT:
# - https://github.com/digital-asset/daml/pull/5608
# - https://github.com/digital-asset/daml/pull/7829
# - https://github.com/digital-asset/daml/pull/9218
# - ContractKeysSubmitterIsMaintainerIT:
# - https://github.com/digital-asset/daml/pull/5611
# - SemanticTests:
# - https://github.com/digital-asset/daml/pull/9218
last_nongranular_test_tool = "1.3.0-snapshot.20200617.4484.0.7e0a6848"
first_granular_test_tool = "1.3.0-snapshot.20200623.4546.0.4f68cfc4"
# Some of gRPC error codes changed from INVALID_ARGUMENT to ABORTED
# See https://github.com/digital-asset/daml/pull/9218
before_grpc_error_code_breaking_change = "1.12.0-snapshot.20210323.6567.0.90c5ce70"
after_grpc_error_code_breaking_change = "1.12.0-snapshot.20210323.6567.1.90c5ce70"
grpc_error_code_breaking_change_exclusions = [
"SemanticTests:SemanticDoubleSpendBasic",
"SemanticTests:SemanticDoubleSpendShared",
"SemanticTests:SemanticPrivacyProjections",
"SemanticTests:SemanticDivulgence",
"ContractKeysIT:CKFetchOrLookup",
"ContractKeysIT:CKNoFetchUndisclosed",
"ContractKeysIT:CKMaintainerScoped",
]
excluded_test_tool_tests = [
{
"start": "1.0.0",
@ -168,6 +185,24 @@ excluded_test_tool_tests = [
},
],
},
{
"start": after_grpc_error_code_breaking_change,
"platform_ranges": [
{
"end": before_grpc_error_code_breaking_change,
"exclusions": grpc_error_code_breaking_change_exclusions,
},
],
},
{
"end": before_grpc_error_code_breaking_change,
"platform_ranges": [
{
"start": after_grpc_error_code_breaking_change,
"exclusions": grpc_error_code_breaking_change_exclusions,
},
],
},
]
def in_range(version, range):

View File

@ -108,7 +108,7 @@ object Result {
ResultNeedContract(
acoid,
{
case None => ResultError(Error(s"dependency error: couldn't find contract $acoid"))
case None => ResultError(ContractNotFound(acoid))
case Some(contract) => resume(contract)
},
)

View File

@ -28,6 +28,7 @@ service CommandService {
// - ``INVALID_ARGUMENT``: if the payload is malformed or is missing required fields
// - ``RESOURCE_EXHAUSTED``: if the number of in-flight commands reached the maximum (if a limit is configured)
// - ``UNAVAILABLE``: if the participant is not yet ready to submit commands or if the service has been shut down.
// - ``ABORTED``: if a contract key is missing or duplicated due to for example contention on resources
rpc SubmitAndWait (SubmitAndWaitRequest) returns (google.protobuf.Empty);
// Submits a single composite command, waits for its result, and returns the transaction id.
@ -39,6 +40,7 @@ service CommandService {
// - ``INVALID_ARGUMENT``: if the payload is malformed or is missing required fields
// - ``RESOURCE_EXHAUSTED``: if the number of in-flight commands reached the maximum (if a limit is configured)
// - ``UNAVAILABLE``: if the participant is not yet ready to submit commands or if the service has been shut down.
// - ``ABORTED``: if a contract key is missing or duplicated due to for example contention on resources
rpc SubmitAndWaitForTransactionId (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionIdResponse);
// Submits a single composite command, waits for its result, and returns the transaction.
@ -50,6 +52,7 @@ service CommandService {
// - ``INVALID_ARGUMENT``: if the payload is malformed or is missing required fields
// - ``RESOURCE_EXHAUSTED``: if the number of in-flight commands reached the maximum (if a limit is configured)
// - ``UNAVAILABLE``: if the participant is not yet ready to submit commands or if the service has been shut down.
// - ``ABORTED``: if a contract key is missing or duplicated due to for example contention on resources
rpc SubmitAndWaitForTransaction (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionResponse);
// Submits a single composite command, waits for its result, and returns the transaction tree.
@ -61,6 +64,7 @@ service CommandService {
// - ``INVALID_ARGUMENT``: if the payload is malformed or is missing required fields
// - ``RESOURCE_EXHAUSTED``: if the number of in-flight commands reached the maximum (if a limit is configured)
// - ``UNAVAILABLE``: if the participant is not yet ready to submit commands or if the service has been shut down.
// - ``ABORTED``: if a contract key is missing or duplicated due to for example contention on resources
rpc SubmitAndWaitForTransactionTree (SubmitAndWaitRequest) returns (SubmitAndWaitForTransactionTreeResponse);
}

View File

@ -41,6 +41,7 @@ service CommandSubmissionService {
// - ``INVALID_ARGUMENT``: if the payload is malformed or is missing required fields
// - ``UNAVAILABLE``: if the participant is not yet ready to submit commands or if the service has been shut down.
// - ``RESOURCE_EXHAUSTED``: if the participant or the ledger is overloaded. Clients should back off exponentially and retry.
// - ``ABORTED``: if a contract key is missing or duplicated due to for example contention on resources
rpc Submit (SubmitRequest) returns (google.protobuf.Empty);
}

View File

@ -947,7 +947,7 @@ abstract class AbstractHttpServiceIntegrationTest
status shouldBe StatusCodes.InternalServerError
assertStatus(output, StatusCodes.InternalServerError)
expectedOneErrorMessage(output) should include(
"couldn't find contract ContractId(#NonExistentContractId)"
"Contract could not be found with id ContractId(#NonExistentContractId)"
)
}: Future[Assertion]
}

View File

@ -463,7 +463,7 @@ final class CommandClientIT
assertCommandFailsWithCode(command, Code.INVALID_ARGUMENT, "requires authorizers")
}
"not accept exercises with bad contract IDs, return INVALID_ARGUMENT" in {
"not accept exercises with bad contract IDs, return ABORTED" in {
val contractId = "#deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef-123"
val command =
submitRequest(
@ -473,7 +473,7 @@ final class CommandClientIT
),
)
assertCommandFailsWithCode(command, Code.INVALID_ARGUMENT, "error")
assertCommandFailsWithCode(command, Code.ABORTED, "error")
}
}
}

View File

@ -74,12 +74,14 @@ conformance_test(
test_tool_args = [
"--verbose",
"--concurrent-test-runs=4", # lowered from default #procs to reduce flakes - details in https://github.com/digital-asset/daml/issues/7316
# The following three contract key tests require uniqueness and are tested in conformance-test-contract-keys below
# The following three contract key tests require uniqueness
"--exclude=ContractKeysIT,ContractKeysIT:CKFetchOrLookup,ContractKeysIT:CKNoFetchUndisclosed,ContractKeysIT:CKMaintainerScoped" +
",ParticipantPruningIT" + # see "conformance-test-participant-pruning" below
",ConfigManagementServiceIT,LedgerConfigurationServiceIT" + # dynamic config management not supported by Canton
",ClosedWorldIT" + # Canton currently fails this test with a different error (missing namespace in "unallocated" party id)
",RaceConditionIT",
",RaceConditionIT" +
# Some of gRPC status codes changed from INVALID_ARGUMENT to ABORTED (see https://github.com/digital-asset/daml/pull/9218):
",SemanticTests:SemanticDoubleSpendBasic,SemanticTests:SemanticDoubleSpendShared,SemanticTests:SemanticPrivacyProjections,SemanticTests:SemanticDivulgence",
],
) if not is_windows else None
@ -113,6 +115,8 @@ conformance_test(
"--concurrent-test-runs=4", # lowered from default #procs to reduce flakes - details in https://github.com/digital-asset/daml/issues/7316
"--include=ContractKeysIT" +
",RaceConditionIT",
# Some of gRPC status codes changed from INVALID_ARGUMENT to ABORTED (see https://github.com/digital-asset/daml/pull/9218):
"--exclude=ContractKeysIT:CKFetchOrLookup,ContractKeysIT:CKNoFetchUndisclosed,ContractKeysIT:CKMaintainerScoped",
],
) if not is_windows else None

View File

@ -60,8 +60,8 @@ final class ContractKeysIT extends LedgerTestSuite {
assertGrpcError(fetchFailure, Status.Code.INVALID_ARGUMENT, "couldn't find key")
assertGrpcError(
lookupByKeyFailure,
Status.Code.INVALID_ARGUMENT,
Some(Pattern.compile("Disputed|Inconsistent")),
Status.Code.ABORTED,
Some(Pattern.compile("Inconsistent")),
)
}
})
@ -100,14 +100,14 @@ final class ContractKeysIT extends LedgerTestSuite {
} yield {
assertGrpcError(
fetchFailure,
Status.Code.INVALID_ARGUMENT,
"dependency error: couldn't find contract",
Status.Code.ABORTED,
"Contract could not be found",
)
assertGrpcError(fetchByKeyFailure, Status.Code.INVALID_ARGUMENT, "couldn't find key")
assertGrpcError(
lookupByKeyFailure,
Status.Code.INVALID_ARGUMENT,
Some(Pattern.compile("Disputed|Inconsistent")),
Status.Code.ABORTED,
Some(Pattern.compile("Inconsistent")),
)
}
})
@ -177,8 +177,8 @@ final class ContractKeysIT extends LedgerTestSuite {
} yield {
assertGrpcError(
duplicateKeyFailure,
Status.Code.INVALID_ARGUMENT,
Some(Pattern.compile("DuplicateKey|Inconsistent")),
Status.Code.ABORTED,
Some(Pattern.compile("Inconsistent")),
)
assertGrpcError(
bobLooksUpTextKeyFailure,

View File

@ -201,8 +201,8 @@ final class MultiPartySubmissionIT extends LedgerTestSuite {
} yield {
assertGrpcError(
failure,
Status.Code.INVALID_ARGUMENT,
Some(Pattern.compile("dependency error: couldn't find contract")),
Status.Code.ABORTED,
Some(Pattern.compile("Contract could not be found")),
)
}
})

View File

@ -59,7 +59,7 @@ final class SemanticTests extends LedgerTestSuite {
.exercise(owner, iou.exerciseTransfer(_, leftWithNothing))
.mustFail("consuming a contract twice")
} yield {
assertGrpcError(failure, Status.Code.INVALID_ARGUMENT, "couldn't find contract")
assertGrpcError(failure, Status.Code.ABORTED, "Contract could not be found")
}
})
@ -142,7 +142,7 @@ final class SemanticTests extends LedgerTestSuite {
.exercise(owner2, shared.exerciseSharedContract_Consume2)
.mustFail("consuming a contract twice")
} yield {
assertGrpcError(failure, Status.Code.INVALID_ARGUMENT, "couldn't find contract")
assertGrpcError(failure, Status.Code.ABORTED, "Contract could not be found")
}
})
@ -308,16 +308,16 @@ final class SemanticTests extends LedgerTestSuite {
.mustFail("fetching the new IOU with the wrong party")
} yield {
assertGrpcError(iouFetchFailure, Status.Code.INVALID_ARGUMENT, "couldn't find contract")
assertGrpcError(iouFetchFailure, Status.Code.ABORTED, "Contract could not be found")
assertGrpcError(
paintOfferFetchFailure,
Status.Code.INVALID_ARGUMENT,
"couldn't find contract",
Status.Code.ABORTED,
"Contract could not be found",
)
assertGrpcError(
paintAgreeFetchFailure,
Status.Code.INVALID_ARGUMENT,
"couldn't find contract",
Status.Code.ABORTED,
"Contract could not be found",
)
assertGrpcError(
secondIouFetchFailure,
@ -384,7 +384,7 @@ final class SemanticTests extends LedgerTestSuite {
beta.exercise(delegate, delegation.exerciseDelegation_Token_Consume(_, token))
}
} yield {
assertGrpcError(failure, Status.Code.INVALID_ARGUMENT, "couldn't find contract")
assertGrpcError(failure, Status.Code.ABORTED, "Contract could not be found")
}
}
)

View File

@ -84,7 +84,7 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
}
})
.recoverWith {
.recover {
// An error while looking up the maximum ledger time for the used contracts
// most likely means that one of the contracts is already not active anymore,
// which can happen under contention.
@ -95,7 +95,7 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
s"Lookup of maximum ledger time failed. This can happen if there is contention on contracts used by the transaction. Used contracts: ${usedContractIds
.mkString(", ")}. Details: $error"
)
Future.successful(Left(ErrorCause.LedgerTime(maxRetries - retriesLeft)))
Left(ErrorCause.LedgerTime(maxRetries - retriesLeft))
}
}
}

View File

@ -25,6 +25,7 @@ import com.daml.ledger.participant.state.v1.{
}
import com.daml.lf.crypto
import com.daml.lf.data.Ref.Party
import com.daml.lf.engine.{ContractNotFound, ReplayMismatch}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
@ -268,10 +269,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
private def toStatus(errorCause: ErrorCause) =
errorCause match {
case e: ErrorCause.DamlLf =>
Status.INVALID_ARGUMENT.withDescription(e.explain)
case e: ErrorCause.LedgerTime =>
Status.ABORTED.withDescription(e.explain)
case cause @ ErrorCause.DamlLf(error) =>
error match {
case ContractNotFound(_) | ReplayMismatch(_) =>
Status.ABORTED.withDescription(cause.explain)
case _ => Status.INVALID_ARGUMENT.withDescription(cause.explain)
}
case cause: ErrorCause.LedgerTime =>
Status.ABORTED.withDescription(cause.explain)
}
override def close(): Unit = ()

View File

@ -5,18 +5,17 @@ package com.daml.platform.store
import java.time.Instant
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason}
import com.daml.api.util.TimestampConversion.fromInstant
import com.daml.lf.data.Ref
import com.daml.ledger.ApplicationId
import com.daml.ledger.api.domain
import com.daml.ledger.api.v1.command_completion_service.{Checkpoint, CompletionStreamResponse}
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.participant.state.v1.Offset
import com.daml.lf.data.Ref
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.store.Conversions.domainRejectionReasonToErrorCode
import com.daml.platform.store.entries.LedgerEntry
import com.google.rpc.status.Status
import io.grpc.Status.Code
// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Restrict the scope of this to com.daml.platform.store.dao when
@ -31,37 +30,6 @@ private[platform] object CompletionFromTransaction {
)
)
// We _rely_ on the following compiler flags for this to be safe:
// * -Xno-patmat-analysis _MUST NOT_ be enabled
// * -Xfatal-warnings _MUST_ be enabled
def toErrorCode(rejection: RejectionReason): Code = {
rejection match {
case _: RejectionReason.Inconsistent | _: RejectionReason.Disputed |
_: RejectionReason.PartyNotKnownOnLedger =>
Code.INVALID_ARGUMENT
case _: RejectionReason.ResourcesExhausted | _: RejectionReason.InvalidLedgerTime =>
Code.ABORTED
case _: RejectionReason.SubmitterCannotActViaParticipant =>
Code.PERMISSION_DENIED
}
}
private def toParticipantRejection(reason: domain.RejectionReason): RejectionReason =
reason match {
case r: domain.RejectionReason.Inconsistent =>
RejectionReason.Inconsistent(r.description)
case r: domain.RejectionReason.Disputed =>
RejectionReason.Disputed(r.description)
case r: domain.RejectionReason.OutOfQuota =>
RejectionReason.ResourcesExhausted(r.description)
case r: domain.RejectionReason.PartyNotKnownOnLedger =>
RejectionReason.PartyNotKnownOnLedger(r.description)
case r: domain.RejectionReason.SubmitterCannotActViaParticipant =>
RejectionReason.SubmitterCannotActViaParticipant(r.description)
case r: domain.RejectionReason.InvalidLedgerTime =>
RejectionReason.InvalidLedgerTime(r.description)
}
// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
@ -96,7 +64,7 @@ private[platform] object CompletionFromTransaction {
Seq(
Completion(
commandId,
Some(Status(toErrorCode(toParticipantRejection(reason)).value(), reason.description)),
Some(Status(reason.value, reason.description)),
)
),
)

View File

@ -9,10 +9,15 @@ import java.util.Date
import anorm._
import com.daml.ledger.EventId
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.api.domain
import com.daml.ledger.participant.state.v1.RejectionReason._
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason}
import com.daml.lf.crypto.Hash
import com.daml.lf.data.Ref
import com.daml.lf.value.Value
import io.grpc.Status.Code
import scala.language.implicitConversions
private[platform] object Conversions {
@ -227,4 +232,35 @@ private[platform] object Conversions {
}
}
// RejectionReason
implicit def domainRejectionReasonToErrorCode(reason: domain.RejectionReason): Code =
participantRejectionReasonToErrorCode(
domainRejectionReasonToParticipantRejectionReason(
reason
)
)
// We _rely_ on the following compiler flags for this to be safe:
// * -Xno-patmat-analysis _MUST NOT_ be enabled
// * -Xfatal-warnings _MUST_ be enabled
implicit def participantRejectionReasonToErrorCode(reason: RejectionReason): Code = reason match {
case _: Disputed | _: PartyNotKnownOnLedger => Code.INVALID_ARGUMENT
case _: Inconsistent | _: ResourcesExhausted | _: InvalidLedgerTime => Code.ABORTED
case _: SubmitterCannotActViaParticipant => Code.PERMISSION_DENIED
}
implicit def domainRejectionReasonToParticipantRejectionReason(
reason: domain.RejectionReason
): RejectionReason =
reason match {
case r: domain.RejectionReason.Inconsistent => Inconsistent(r.description)
case r: domain.RejectionReason.Disputed => Disputed(r.description)
case r: domain.RejectionReason.OutOfQuota => ResourcesExhausted(r.description)
case r: domain.RejectionReason.PartyNotKnownOnLedger => PartyNotKnownOnLedger(r.description)
case r: domain.RejectionReason.SubmitterCannotActViaParticipant =>
SubmitterCannotActViaParticipant(r.description)
case r: domain.RejectionReason.InvalidLedgerTime => InvalidLedgerTime(r.description)
}
}

View File

@ -6,16 +6,14 @@ package com.daml.platform.store.dao
import java.time.Instant
import anorm.{Row, RowParser, SimpleSql, SqlParser, SqlStringInterpolation, ~}
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason, SubmitterInfo, TransactionId}
import com.daml.ledger.participant.state.v1.RejectionReason._
import com.daml.lf.data.Ref
import com.daml.ledger.ApplicationId
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason, SubmitterInfo, TransactionId}
import com.daml.lf.data.Ref
import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.SqlFunctions
import io.grpc.Status.Code
import com.google.rpc.status.Status
private[platform] object CommandCompletionsTable {
@ -71,22 +69,9 @@ private[platform] object CommandCompletionsTable {
offset: Offset,
recordTime: Instant,
reason: RejectionReason,
): SimpleSql[Row] = {
val (code, message) = toStatus(reason)
): SimpleSql[Row] =
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status_code, status_message) values ($offset, $recordTime, ${submitterInfo.applicationId}, ${submitterInfo.actAs
.toArray[String]}, ${submitterInfo.commandId}, $code, $message)"
}
private def toStatus(rejection: RejectionReason): (Int, String) = {
rejection match {
case _: Inconsistent | _: Disputed | _: PartyNotKnownOnLedger =>
Code.INVALID_ARGUMENT.value() -> rejection.description
case _: ResourcesExhausted | _: InvalidLedgerTime =>
Code.ABORTED.value() -> rejection.description
case _: SubmitterCannotActViaParticipant =>
Code.PERMISSION_DENIED.value() -> rejection.description
}
}
.toArray[String]}, ${submitterInfo.commandId}, ${reason.value()}, ${reason.description})"
def prepareCompletionsDelete(endInclusive: Offset): SimpleSql[Row] =
SQL"delete from participant_command_completions where completion_offset <= $endInclusive"

View File

@ -595,7 +595,7 @@ private class JdbcLedgerDao(
submitterInfo = SubmitterInfo(actAs, applicationId, commandId, Instant.EPOCH),
offset = offset,
recordTime = recordTime,
reason = toParticipantRejection(reason),
reason = reason,
).execute()
}
}
@ -603,22 +603,6 @@ private class JdbcLedgerDao(
}
}
private def toParticipantRejection(reason: domain.RejectionReason): RejectionReason =
reason match {
case r: domain.RejectionReason.Inconsistent =>
RejectionReason.Inconsistent(r.description)
case r: domain.RejectionReason.Disputed =>
RejectionReason.Disputed(r.description)
case r: domain.RejectionReason.OutOfQuota =>
RejectionReason.ResourcesExhausted(r.description)
case r: domain.RejectionReason.PartyNotKnownOnLedger =>
RejectionReason.PartyNotKnownOnLedger(r.description)
case r: domain.RejectionReason.SubmitterCannotActViaParticipant =>
RejectionReason.SubmitterCannotActViaParticipant(r.description)
case r: domain.RejectionReason.InvalidLedgerTime =>
RejectionReason.InvalidLedgerTime(r.description)
}
private val PageSize = 100
override def lookupMaximumLedgerTime(

View File

@ -12,11 +12,11 @@ import com.daml.ledger.api.v1.command_completion_service.CompletionStreamRespons
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason, SubmitterInfo}
import com.daml.lf.data.Ref.Party
import com.daml.platform.ApiOffset
import com.daml.platform.store.CompletionFromTransaction
import org.scalatest.{LoneElement, OptionValues}
import com.daml.platform.store.Conversions.participantRejectionReasonToErrorCode
import com.daml.platform.store.dao.JdbcLedgerDaoCompletionsSpec._
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import com.daml.platform.store.dao.JdbcLedgerDaoCompletionsSpec._
import org.scalatest.{LoneElement, OptionValues}
import scala.concurrent.Future
@ -183,7 +183,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl
responses should have length reasons.length.toLong
val returnedCodes = responses.flatMap(_.completions.map(_.status.get.code))
for ((reason, code) <- reasons.zip(returnedCodes)) {
code shouldBe CompletionFromTransaction.toErrorCode(reason).value()
code shouldBe participantRejectionReasonToErrorCode(reason).value
}
succeed
}

View File

@ -35,7 +35,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
)
private val ok = io.grpc.Status.Code.OK.value()
private val invalid = io.grpc.Status.Code.INVALID_ARGUMENT.value()
private val aborted = io.grpc.Status.Code.ABORTED.value()
behavior of "JdbcLedgerDao (post-commit validation)"
@ -56,7 +56,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
} yield {
completions should contain.allOf(
originalAttempt.commandId.get -> ok,
duplicateAttempt.commandId.get -> invalid,
duplicateAttempt.commandId.get -> aborted,
)
}
}
@ -76,7 +76,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
} yield {
completions should contain.allOf(
create.commandId.get -> ok,
lookup.commandId.get -> invalid,
lookup.commandId.get -> aborted,
)
}
}
@ -99,7 +99,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
completions should contain.allOf(
create.commandId.get -> ok,
archive.commandId.get -> ok,
lookup.commandId.get -> invalid,
lookup.commandId.get -> aborted,
)
}
}
@ -122,7 +122,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
completions should contain.allOf(
create.commandId.get -> ok,
archive.commandId.get -> ok,
fetch.commandId.get -> invalid,
fetch.commandId.get -> aborted,
)
}
}
@ -143,7 +143,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
completions <- getCompletions(from, to, defaultAppId, Set(alice))
} yield {
completions should contain.allOf(
fetch1.commandId.get -> invalid,
fetch1.commandId.get -> aborted,
divulgence.commandId.get -> ok,
fetch2.commandId.get -> ok,
)

View File

@ -3,25 +3,48 @@
package com.daml.platform.apiserver.services
import java.time.{Duration, Instant}
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import com.daml.ledger.api.domain.PartyDetails
import akka.stream.Materializer
import com.codahale.metrics.{Meter, MetricRegistry}
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.DomainMocks
import com.daml.ledger.api.domain.LedgerOffset.Absolute
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationNew,
IndexConfigManagementService,
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.v1.{Party, SubmissionId, SubmissionResult, WriteService}
import com.daml.lf.data.Ref
import com.daml.ledger.participant.state.v1._
import com.daml.ledger.resources.TestResourceContext
import com.daml.lf.command.{Commands => LfCommands}
import com.daml.lf.crypto.Hash
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.data.{ImmArray, Ref}
import com.daml.lf.engine._
import com.daml.lf.transaction.ReplayNodeMismatch
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.apiserver.execution.CommandExecutor
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.platform.store.ErrorCause
import io.grpc.Status
import org.mockito.captor.ArgCaptor
import org.mockito.{ArgumentMatchersSugar, Mockito, MockitoSugar}
import org.scalatest.{BeforeAndAfter, OneInstancePerTest}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, OneInstancePerTest, Succeeded}
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
class ApiSubmissionServiceSpec
extends AsyncFlatSpec
@ -29,7 +52,9 @@ class ApiSubmissionServiceSpec
with MockitoSugar
with ArgumentMatchersSugar
with OneInstancePerTest
with BeforeAndAfter {
with BeforeAndAfter
with AkkaBeforeAndAfterAll
with TestResourceContext {
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private val builder = TransactionBuilder()
@ -58,7 +83,7 @@ class ApiSubmissionServiceSpec
private val partyManagementService = mock[IndexPartyManagementService]
private val writeService = mock[WriteService]
private val service =
private val defaultSubmissionService =
submissionService(writeService, partyManagementService, implicitPartyAllocation = true)
before {
@ -81,6 +106,7 @@ class ApiSubmissionServiceSpec
)
for {
service <- defaultSubmissionService
results <- service.allocateMissingInformees(transaction)
} yield {
results.find(_ != SubmissionResult.Acknowledged) shouldBe None
@ -106,6 +132,7 @@ class ApiSubmissionServiceSpec
)
for {
service <- defaultSubmissionService
result <- service.allocateMissingInformees(transaction)
} yield {
verifyZeroInteractions(writeService)
@ -114,8 +141,8 @@ class ApiSubmissionServiceSpec
}
it should "not allocate missing informees if implicit party allocation is disabled" in {
val service = submissionService(null, null, implicitPartyAllocation = false)
for {
service <- submissionService(null, null, implicitPartyAllocation = false)
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[SubmissionResult]
@ -145,26 +172,131 @@ class ApiSubmissionServiceSpec
val transaction = builder.buildSubmitted()
for {
service <- defaultSubmissionService
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq(submissionFailure)
}
}
def submissionService(
behavior of "submit"
it should "return proper gRPC status codes for DamlLf errors" in {
val errorsToStatuses = Map(
ErrorCause.DamlLf(ContractNotFound(null)) -> Status.ABORTED,
ErrorCause.DamlLf(
ReplayMismatch(ReplayNodeMismatch(null, null, null, null))
) -> Status.ABORTED,
ErrorCause.DamlLf(ValidationError("")) -> Status.INVALID_ARGUMENT,
ErrorCause.DamlLf(AuthorizationError("")) -> Status.INVALID_ARGUMENT,
ErrorCause.DamlLf(SerializationError("")) -> Status.INVALID_ARGUMENT,
ErrorCause.LedgerTime(0) -> Status.ABORTED,
)
val commandId = new AtomicInteger()
val mockCommandExecutor = mock[CommandExecutor]
Future
.sequence(errorsToStatuses.map { case (error, code) =>
val submitRequest = SubmitRequest(
Commands(
ledgerId = LedgerId("ledger-id"),
workflowId = None,
applicationId = DomainMocks.applicationId,
commandId = CommandId(
Ref.LedgerString.assertFromString(s"commandId-${commandId.incrementAndGet()}")
),
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.MIN,
deduplicateUntil = Instant.MIN,
commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""),
),
None,
)
when(
mockCommandExecutor.execute(eqTo(submitRequest.commands), any[Hash])(
any[ExecutionContext],
any[LoggingContext],
)
).thenReturn(Future.successful(Left(error)))
for {
service <- submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
commandExecutor = mockCommandExecutor,
)
assertions <- service.submit(submitRequest).transform {
case Success(_) => fail()
case Failure(e) => Success(e.getMessage should startWith(code.getCode.toString))
}
} yield assertions
})
.map(_.forall(_ == Succeeded))
.map(assert(_))
}
private def submissionService(
writeService: WriteService,
partyManagementService: IndexPartyManagementService,
implicitPartyAllocation: Boolean,
) = new ApiSubmissionService(
writeService = writeService,
submissionService = mock[IndexSubmissionService],
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigProvider = null,
seedService = null,
commandExecutor = null,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = null,
)
commandExecutor: CommandExecutor = null,
)(implicit materializer: Materializer) = {
val mockMetricRegistry = mock[MetricRegistry]
val mockIndexSubmissionService = mock[IndexSubmissionService]
val mockConfigManagementService = mock[IndexConfigManagementService]
val configuration = Configuration(0L, TimeModel.reasonableDefault, Duration.ZERO)
when(mockMetricRegistry.meter(any[String])).thenReturn(new Meter())
when(
mockIndexSubmissionService.deduplicateCommand(
any[CommandId],
anyList[Ref.Party],
any[Instant],
any[Instant],
)(any[LoggingContext])
).thenReturn(Future.successful(CommandDeduplicationNew))
when(
mockIndexSubmissionService.stopDeduplicatingCommand(any[CommandId], anyList[Ref.Party])(
any[LoggingContext]
)
).thenReturn(Future.unit)
when(mockConfigManagementService.lookupConfiguration())
.thenReturn(
Future.successful(
Some((Absolute(Ref.LedgerString.assertFromString("offset")), configuration))
)
)
val configProviderResource = LedgerConfigProvider
.owner(
mockConfigManagementService,
optWriteService = Some(writeService),
timeProvider = mock[TimeProvider],
config = LedgerConfiguration(
initialConfiguration = configuration,
initialConfigurationSubmitDelay = Duration.ZERO,
configurationLoadTimeout = Duration.ZERO,
),
)
.acquire()
for {
configProvider <- configProviderResource.asFuture
_ <- configProviderResource.release()
} yield {
new ApiSubmissionService(
writeService = writeService,
submissionService = mockIndexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigProvider = configProvider,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(mockMetricRegistry),
)
}
}
}

View File

@ -41,7 +41,7 @@ private[keys] object KeyMonotonicityValidation {
recordTime,
transactionCommitter.buildRejectionLogEntry(
transactionEntry,
RejectionReason.Inconsistent("Causal monotonicity violated"),
RejectionReason.InvalidLedgerTime("Causal monotonicity violated"),
),
)
}

View File

@ -0,0 +1,80 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committer.transaction.keys
import java.time.{Instant, ZoneOffset, ZonedDateTime}
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.StepContinue
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.v1.RejectionReason
import com.google.protobuf.ByteString
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
class KeyMonotonicityValidationSpec
extends AsyncWordSpec
with Matchers
with MockitoSugar
with ArgumentMatchersSugar {
private val testKey = DamlStateKey.newBuilder().build()
private val testSubmissionSeed = ByteString.copyFromUtf8("a" * 32)
private val ledgerEffectiveTime =
ZonedDateTime.of(2021, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC).toInstant
private val testTransactionEntry = DamlTransactionEntrySummary(
DamlTransactionEntry
.newBuilder()
.setSubmissionSeed(testSubmissionSeed)
.setLedgerEffectiveTime(Conversions.buildTimestamp(ledgerEffectiveTime))
.build()
)
"checkContractKeysCausalMonotonicity" should {
"create StepContinue in case of correct keys" in {
KeyMonotonicityValidation.checkContractKeysCausalMonotonicity(
mock[TransactionCommitter],
None,
Set(testKey),
Map(testKey -> aStateValueActiveAt(ledgerEffectiveTime.minusSeconds(1))),
testTransactionEntry,
) shouldBe StepContinue(testTransactionEntry)
}
"reject transaction in case of incorrect keys" in {
val mockTransactionCommitter = mock[TransactionCommitter]
KeyMonotonicityValidation
.checkContractKeysCausalMonotonicity(
mockTransactionCommitter,
None,
Set(testKey),
Map(testKey -> aStateValueActiveAt(ledgerEffectiveTime.plusSeconds(1))),
testTransactionEntry,
)
verify(mockTransactionCommitter).buildRejectionLogEntry(
eqTo(testTransactionEntry),
any[RejectionReason.InvalidLedgerTime],
)
verify(mockTransactionCommitter).reject(
eqTo(None),
any[DamlTransactionRejectionEntry.Builder],
)
succeed
}
}
private def aStateValueActiveAt(activeAt: Instant) = DamlStateValue
.newBuilder()
.setContractKeyState(
DamlContractKeyState
.newBuilder()
.setActiveAt(Conversions.buildTimestamp(activeAt))
)
.build()
}