mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Add conformance test for command deduplication using the CommandService [KVL-1099] (#10883)
* Add conformance test for command deduplication using the CommandService CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
8a391189a6
commit
f4adee91ca
@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail
|
||||
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
|
||||
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
|
||||
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
|
||||
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
|
||||
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
|
||||
import com.daml.ledger.client.binding.Primitive
|
||||
import com.daml.ledger.client.binding.Primitive.Party
|
||||
import com.daml.ledger.test.model.Test.DummyWithAnnotation
|
||||
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation}
|
||||
import io.grpc.Status
|
||||
import io.grpc.Status.Code
|
||||
|
||||
@ -33,12 +35,11 @@ import scala.concurrent.{ExecutionContext, Future}
|
||||
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
|
||||
|
||||
test(
|
||||
s"DeduplicateParallelSubmissions",
|
||||
"Commands submitted at the same, in parallel, should be deduplicated",
|
||||
s"DeduplicateParallelSubmissionsUsingCommandSubmissionService",
|
||||
"Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated",
|
||||
allocate(SingleParty),
|
||||
)(implicit ec => { case Participants(Participant(ledger, party)) =>
|
||||
val deduplicationDuration = 3.seconds
|
||||
val numberOfParallelRequests = 10
|
||||
lazy val request = ledger
|
||||
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
|
||||
.update(
|
||||
@ -46,35 +47,93 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
|
||||
deduplicationDuration.asProtobuf
|
||||
)
|
||||
)
|
||||
Future
|
||||
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
|
||||
submitRequestAndGetStatusCode(ledger)(request, party)
|
||||
})
|
||||
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
|
||||
.map(responses => {
|
||||
val expectedDuplicateResponses = numberOfParallelRequests - 1
|
||||
val okResponses = responses.getOrElse(Code.OK, 0)
|
||||
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
|
||||
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception
|
||||
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
|
||||
// Canton can return ABORTED for duplicate submissions
|
||||
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
|
||||
val duplicateResponses =
|
||||
alreadyExistsResponses + internalResponses + abortedResponses
|
||||
assert(
|
||||
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
|
||||
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
|
||||
)
|
||||
})
|
||||
runTestWithSubmission[SubmitRequest](
|
||||
ledger,
|
||||
party,
|
||||
request,
|
||||
submitRequestAndGetStatusCode(ledger)(_, party),
|
||||
)
|
||||
})
|
||||
|
||||
test(
|
||||
s"DeduplicateParallelSubmissionsUsingCommandService",
|
||||
"Commands submitted at the same, in parallel, using the CommandService, should be deduplicated",
|
||||
allocate(SingleParty),
|
||||
runConcurrently = false,
|
||||
)(implicit ec => { case Participants(Participant(ledger, party)) =>
|
||||
val deduplicationDuration = 3.seconds
|
||||
val request = ledger
|
||||
.submitAndWaitRequest(party, Dummy(party).create.command)
|
||||
.update(
|
||||
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
|
||||
)
|
||||
runTestWithSubmission[SubmitAndWaitRequest](
|
||||
ledger,
|
||||
party,
|
||||
request,
|
||||
submitAndWaitRequestAndGetStatusCode(ledger)(_, party),
|
||||
)
|
||||
})
|
||||
|
||||
private def runTestWithSubmission[T](
|
||||
ledger: ParticipantTestContext,
|
||||
party: Party,
|
||||
request: T,
|
||||
submitRequestAndGetStatus: T => Future[Code],
|
||||
)(implicit
|
||||
ec: ExecutionContext
|
||||
) = {
|
||||
val numberOfParallelRequests = 10
|
||||
for {
|
||||
responses <- Future
|
||||
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
|
||||
submitRequestAndGetStatus(request)
|
||||
})
|
||||
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
|
||||
activeContracts <- ledger.activeContracts(party)
|
||||
} yield {
|
||||
val expectedDuplicateResponses = numberOfParallelRequests - 1
|
||||
val okResponses = responses.getOrElse(Code.OK, 0)
|
||||
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
|
||||
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception when using the CommandSubmissionService
|
||||
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
|
||||
// Canton can return ABORTED for duplicate submissions
|
||||
// Participant based command de-duplication can currently also return ABORTED when using the CommandService
|
||||
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
|
||||
val duplicateResponses =
|
||||
alreadyExistsResponses + internalResponses + abortedResponses
|
||||
assert(
|
||||
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
|
||||
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
|
||||
)
|
||||
assert(activeContracts.size == 1)
|
||||
}
|
||||
}
|
||||
|
||||
private def submitAndWaitRequestAndGetStatusCode(
|
||||
ledger: ParticipantTestContext
|
||||
)(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = {
|
||||
val submissionId = UUID.randomUUID().toString
|
||||
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
|
||||
val submitResult = ledger.submitAndWait(requestWithSubmissionId)
|
||||
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
|
||||
}
|
||||
protected def submitRequestAndGetStatusCode(
|
||||
ledger: ParticipantTestContext
|
||||
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = {
|
||||
val submissionId = UUID.randomUUID().toString
|
||||
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
|
||||
ledger
|
||||
val submitResult = ledger
|
||||
.submit(requestWithSubmissionId)
|
||||
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
|
||||
}
|
||||
|
||||
private def submissionResultToFinalStatusCode(
|
||||
ledger: ParticipantTestContext
|
||||
)(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit
|
||||
ec: ExecutionContext
|
||||
) = {
|
||||
submitResult
|
||||
.flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId))
|
||||
.map {
|
||||
case Some(completion) =>
|
||||
|
@ -213,6 +213,7 @@ conformance_test(
|
||||
"--participant=participant-id=example1,port=6865",
|
||||
"--participant=participant-id=example2,port=6866",
|
||||
"--max-deduplication-duration=PT5S",
|
||||
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
|
||||
],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
|
@ -317,12 +317,12 @@ conformance_test(
|
||||
tags = [],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--additional=ParticipantPruningIT",
|
||||
"--additional=MultiPartySubmissionIT",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
|
||||
# Disable tests targeting only multi-participant setups
|
||||
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
|
||||
@ -340,16 +340,17 @@ conformance_test(
|
||||
"--mutable-contract-state-cache",
|
||||
"--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test",
|
||||
"--max-deduplication-duration=PT5S",
|
||||
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
|
||||
],
|
||||
tags = [],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--additional=ParticipantPruningIT",
|
||||
"--additional=MultiPartySubmissionIT",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
|
||||
# Disable tests targeting only multi-participant setups
|
||||
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
|
||||
@ -366,16 +367,17 @@ conformance_test(
|
||||
"--index-append-only-schema",
|
||||
"--mutable-contract-state-cache",
|
||||
"--max-deduplication-duration=PT5S",
|
||||
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
|
||||
],
|
||||
tags = [] if oracle_testing else ["manual"],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--additional=ParticipantPruningIT",
|
||||
"--additional=MultiPartySubmissionIT",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
|
||||
# Disable tests targeting only multi-participant setups
|
||||
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
|
||||
@ -418,12 +420,12 @@ conformance_test(
|
||||
tags = [],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--additional=ParticipantPruningIT",
|
||||
"--additional=MultiPartySubmissionIT",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
|
||||
# Disable tests targeting only multi-participant setups
|
||||
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
|
||||
@ -441,16 +443,17 @@ conformance_test(
|
||||
"--mutable-contract-state-cache",
|
||||
"--buffered-ledger-api-streams-unsafe",
|
||||
"--max-deduplication-duration=PT5S",
|
||||
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
|
||||
],
|
||||
tags = [] if oracle_testing else ["manual"],
|
||||
test_tool_args = [
|
||||
"--verbose",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--additional=ParticipantPruningIT",
|
||||
"--additional=MultiPartySubmissionIT",
|
||||
"--additional=AppendOnlyCommandDeduplicationParallelIT",
|
||||
"--additional=AppendOnlyKVCommandDeduplicationIT",
|
||||
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
|
||||
# Disable tests targeting only multi-participant setups
|
||||
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
|
||||
|
Loading…
Reference in New Issue
Block a user