ledger-api-test-tool - Add conformance test for parallel command deduplication using CommandSubmissionService [KVL-1099] (#10869)

* Extract deduplication "features" into a configuration to be used around the tests.
Better naming for assertions that support sync and async deduplication

CHANGELOG_BEGIN

CHANGELOG_END

* Fix broken test and use consistency for tests

* ledger-api-test-tool - Add conformance test for parallel command deduplication

CHANGELOG_BEGIN
CHANGELOG_END

* Add import for 2.12 compat

* Add silencer plugin

* Split parallel command deduplication scenario into it's own test suite

* Add the parallel command deduplication test to append only ledgers

* Run parallel command deduplication tests for append only ledgers

* Apply suggestions from code review

Co-authored-by: fabiotudone-da <fabio.tudone@digitalasset.com>

* Code review renames

* Add compat import

* Run the test concurrently
This commit is contained in:
nicu-da 2021-09-15 05:15:13 -07:00 committed by GitHub
parent 0c32e3baff
commit b4328b3dc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 116 additions and 5 deletions

View File

@ -129,6 +129,7 @@ conformance_test(
test_tool_args = [
"--verbose",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
],

View File

@ -73,6 +73,9 @@ da_scala_binary(
da_scala_library(
name = "ledger-api-test-tool-%s-lib" % lf_version,
srcs = glob(["src/main/scala/com/daml/ledger/api/testtool/infrastructure/**/*.scala"]),
plugins = [
silencer_plugin,
],
scala_deps = [
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",

View File

@ -341,12 +341,11 @@ private[testtool] abstract class CommandDeduplicationBase(
)(request: SubmitRequest, parties: Party*)(implicit
ec: ExecutionContext
): Future[Unit] = {
if (deduplicationFeatures.participantDeduplication) {
if (deduplicationFeatures.participantDeduplication)
submitRequestAndAssertSyncDeduplication(ledger, request)
} else {
else
submitRequestAndAssertAsyncDeduplication(ledger)(request, parties: _*)
.map(_ => ())
}
}
protected def submitRequestAndAssertSyncDeduplication(
@ -361,7 +360,14 @@ private[testtool] abstract class CommandDeduplicationBase(
)(implicit ec: ExecutionContext) = ledger
.submit(request)
.mustFail(s"Request expected to fail with code $code")
.map(assertGrpcError(_, code, None, checkDefiniteAnswerMetadata = true))
.map(
assertGrpcError(
_,
code,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
)
)
protected def submitRequestAndAssertAsyncDeduplication(ledger: ParticipantTestContext)(
request: SubmitRequest,

View File

@ -0,0 +1,91 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.testtool.suites
import java.util.UUID
import com.daml.grpc.GrpcException
import com.daml.ledger.api.testtool.infrastructure.Allocation.{
Participant,
Participants,
SingleParty,
allocate,
}
import com.daml.ledger.api.testtool.infrastructure.Assertions.fail
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import io.grpc.Status
import io.grpc.Status.Code
import scala.collection.compat._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/** Should be enabled for ledgers that fill the submission ID in the completions,
* as we need to use the submission id to find completions for parallel submissions
*/
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
test(
s"DeduplicateParallelSubmissions",
"Commands submitted at the same, in parallel, should be deduplicated",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val numberOfParallelRequests = 10
lazy val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatusCode(ledger)(request, party)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
.map(responses => {
val expectedDuplicateResponses = numberOfParallelRequests - 1
val okResponses = responses.getOrElse(Code.OK, 0)
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
// Canton can return ABORTED for duplicate submissions
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
alreadyExistsResponses + internalResponses + abortedResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
})
})
protected def submitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
ledger
.submit(requestWithSubmissionId)
.flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId))
.map {
case Some(completion) =>
completion.getStatus.code
case None => fail(s"Did not find completion for request with submission id $submissionId")
}
.recover {
case GrpcException(status, _) =>
status.getCode.value()
case otherException => fail("Not a GRPC exception", otherException)
}
.map(codeValue => Status.fromCodeValue(codeValue).getCode)
}
}

View File

@ -69,8 +69,9 @@ object Tests {
new AppendOnlyCompletionDeduplicationInfoIT(CommandService),
new AppendOnlyCompletionDeduplicationInfoIT(CommandSubmissionService),
new AppendOnlyKVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity),
new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity),
new AppendOnlyCommandDeduplicationParallelIT,
new ContractIdIT,
new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity),
new MultiPartySubmissionIT,
new ParticipantPruningIT,
new MonotonicRecordTimeIT,

View File

@ -219,6 +219,7 @@ conformance_test(
"--additional=ParticipantPruningIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
# The following two tests don't actually care about multi-participant but they do need append-only.
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant

View File

@ -321,6 +321,7 @@ conformance_test(
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
@ -347,6 +348,7 @@ conformance_test(
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
@ -372,6 +374,7 @@ conformance_test(
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
@ -419,6 +422,7 @@ conformance_test(
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
@ -445,6 +449,7 @@ conformance_test(
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups

View File

@ -364,6 +364,7 @@ server_conformance_test(
servers = ONLY_POSTGRES_SERVER,
test_tool_args = [
"--open-world",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--exclude=ClosedWorldIT",
@ -394,6 +395,7 @@ server_conformance_test(
servers = ONLY_POSTGRES_SERVER,
test_tool_args = [
"--open-world",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=ParticipantPruningIT",

View File

@ -337,6 +337,7 @@ server_conformance_test(
servers = {"postgresql": NEXT_SERVERS["postgresql"]},
test_tool_args = [
"--open-world",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",