From dfae9f600fdc55157aa5685ba0220a3c8d2c7ef0 Mon Sep 17 00:00:00 2001 From: nicu-da Date: Mon, 13 Sep 2021 12:05:50 -0700 Subject: [PATCH] Command deduplication - better support for different deduplication modes in conformance tests [KVL-1099] (#10864) * Extract deduplication "features" into a configuration to be used around the tests. Better naming for assertions that support sync and async deduplication CHANGELOG_BEGIN CHANGELOG_END * Fix broken test and use consistency for tests --- .../CommandDeduplicationBase.scala | 105 +++++++++++------- .../KVCommandDeduplicationBase.scala | 15 +-- .../AppendOnlyKVCommandDeduplicationIT.scala | 13 ++- .../suites/CommandDeduplicationIT.scala | 11 +- .../suites/KVCommandDeduplicationIT.scala | 12 +- 5 files changed, 99 insertions(+), 57 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index c16f3eaa70..fd3aa27312 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -9,6 +9,7 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertGrpcError, assertSingleton, _} import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.ledger.api.v1.command_submission_service.SubmitRequest import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod @@ -37,10 +38,7 @@ private[testtool] abstract class CommandDeduplicationBase( val ledgerWaitInterval: FiniteDuration = ledgerTimeInterval * 2 val defaultDeduplicationWindowWait: FiniteDuration = deduplicationDuration + ledgerWaitInterval - /** For [[Completion]], the submission id and deduplication period are filled only for append only schemas - * Therefore, we need to assert on those fields only if it's an append only schema - */ - protected def isAppendOnly: Boolean + def deduplicationFeatures: DeduplicationFeatures protected def runGivenDeduplicationWait( participants: Seq[ParticipantTestContext] @@ -78,8 +76,8 @@ private[testtool] abstract class CommandDeduplicationBase( // Submit command A (first deduplication window) // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, // only one submission is therefore sent to the ledger. - completion1 <- requestHasOkCompletion(ledger)(requestA1, party) - _ <- submitRequestAndAssertFailure(ledger)(requestA1, Code.ALREADY_EXISTS) + completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA1, party) + _ <- submitRequestAndAssertDeduplication(ledger)(requestA1) // Wait until the end of first deduplication window _ <- Delayed.by(deduplicationWait)(()) @@ -88,8 +86,8 @@ private[testtool] abstract class CommandDeduplicationBase( // the ledger API server and the ledger itself, since the test waited more than // `deduplicationSeconds` after receiving the first command *completion*. // The first submit() in this block should therefore lead to an accepted transaction. - completion2 <- requestHasOkCompletion(ledger)(requestA2, party) - _ <- submitRequestAndAssertFailure(ledger)(requestA2, Code.ALREADY_EXISTS) + completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA2, party) + _ <- submitRequestAndAssertDeduplication(ledger)(requestA2, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) } yield { @@ -122,10 +120,10 @@ private[testtool] abstract class CommandDeduplicationBase( for { // Submit an invalid command (should fail with INVALID_ARGUMENT) - _ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT) // Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS) - _ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT) } yield {} }) @@ -332,54 +330,71 @@ private[testtool] abstract class CommandDeduplicationBase( } }) - def requestHasOkCompletion( + def submitRequestAndAssertCompletionAccepted( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = { - requestHasCompletionWithStatusCode(ledger)(request, party, Code.OK) + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { + submitRequestAndAssertCompletionStatus(ledger)(request, Code.OK, parties: _*) } - def requestHasCompletionWithStatusCode( + protected def submitRequestAndAssertDeduplication( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party, code: Code)(implicit + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext - ): Future[Completion] = { - submitRequestAndFindCompletion(ledger)(request, party).map(completion => { + ): Future[Unit] = { + if (deduplicationFeatures.participantDeduplication) { + submitRequestAndAssertSyncDeduplication(ledger, request) + } else { + submitRequestAndAssertAsyncDeduplication(ledger)(request, parties: _*) + .map(_ => ()) + } + } + + protected def submitRequestAndAssertSyncDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + )(implicit ec: ExecutionContext): Future[Unit] = + submitRequestAndAssertSyncFailure(ledger)(request, Code.ALREADY_EXISTS) + + private def submitRequestAndAssertSyncFailure(ledger: ParticipantTestContext)( + request: SubmitRequest, + code: Code, + )(implicit ec: ExecutionContext) = ledger + .submit(request) + .mustFail(s"Request expected to fail with code $code") + .map(assertGrpcError(_, code, None, checkDefiniteAnswerMetadata = true)) + + protected def submitRequestAndAssertAsyncDeduplication(ledger: ParticipantTestContext)( + request: SubmitRequest, + parties: Party* + )(implicit ec: ExecutionContext): Future[Completion] = submitRequestAndAssertCompletionStatus( + ledger + )(request, Code.ALREADY_EXISTS, parties: _*) + + private def submitRequestAndAssertCompletionStatus( + ledger: ParticipantTestContext + )(request: SubmitRequest, statusCode: Code, parties: Party*)(implicit + ec: ExecutionContext + ): Future[Completion] = + submitRequestAndFindCompletion(ledger)(request, parties: _*).map(completion => { assert( - completion.getStatus.code == code.value(), - s"Expecting completion with status code $code but completion has status ${completion.status}", + completion.getStatus.code == statusCode.value(), + s"Expecting completion with status code $statusCode but completion has status ${completion.status}", ) completion }) - } - - protected def submitRequestAndAssertFailure( - ledger: ParticipantTestContext - )(request: SubmitRequest, statusCode: Code)(implicit ec: ExecutionContext): Future[Unit] = { - ledger - .submit(request) - .mustFail(s"Request expected to fail with status $statusCode") - .map( - assertGrpcError( - _, - statusCode, - exceptionMessageSubstring = None, - checkDefiniteAnswerMetadata = true, - ) - ) - } protected def submitRequestAndFindCompletion( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = { + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { val submissionId = UUID.randomUUID().toString submitRequest(ledger)(request.update(_.commands.submissionId := submissionId)) .flatMap(ledgerEnd => { - ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(party)) + ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(parties: _*)) }) .map { completions => val completion = assertSingleton("Expected only one completion", completions) // The [[Completion.submissionId]] is set only for append-only ledgers - if (isAppendOnly) + if (deduplicationFeatures.appendOnlySchema) assert( completion.submissionId == submissionId, s"Submission id is different for completion. Completion has submission id [${completion.submissionId}], request has submission id [$submissionId]", @@ -407,3 +422,15 @@ private[testtool] abstract class CommandDeduplicationBase( throw new IllegalArgumentException(s"Invalid timeout scale factor: $timeoutScaleFactor") } } + +object CommandDeduplicationBase { + + /** @param participantDeduplication If participant deduplication is enabled then we will receive synchronous rejections + * @param appendOnlySchema For [[Completion]], the submission id and deduplication period are filled only for append only schemas + * Therefore, we need to assert on those fields only if it's an append only schema + */ + case class DeduplicationFeatures( + participantDeduplication: Boolean, + appendOnlySchema: Boolean, + ) +} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala index 92df616f5f..24bbcad014 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala @@ -16,7 +16,6 @@ import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.test.model.Test.DummyWithAnnotation import com.daml.timer.Delayed -import io.grpc.Status.Code import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} @@ -50,22 +49,18 @@ abstract class KVCommandDeduplicationBase( ) runWithConfig(configuredParticipants) { (maxDeduplicationDuration, minSkew) => for { - completion1 <- requestHasOkCompletion(ledger)(request, party) + completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party) // participant side deduplication, sync result - _ <- submitRequestAndAssertFailure(ledger)(request, Code.ALREADY_EXISTS) + _ <- submitRequestAndAssertSyncDeduplication(ledger, request) // Wait for the end of participant deduplication // We also add min skew to also validate the committer deduplication duration _ <- Delayed.by(deduplicationDuration.plus(minSkew))(()) // Validate committer deduplication - duplicateCompletion <- requestHasCompletionWithStatusCode(ledger)( - request, - party, - Code.ALREADY_EXISTS, - ) + duplicateCompletion <- submitRequestAndAssertAsyncDeduplication(ledger)(request, party) // Wait for the end of committer deduplication, we already waited for minSkew _ <- Delayed.by(maxDeduplicationDuration.minus(deduplicationDuration))(()) // Deduplication has finished - completion2 <- requestHasOkCompletion(ledger)(request, party) + completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) } yield { @@ -82,7 +77,7 @@ abstract class KVCommandDeduplicationBase( "The command ID of the duplicate completion does not match the command ID of the submission", ) // The [[Completion.deduplicationPeriod]] is set only for append-only ledgers - if (isAppendOnly) { + if (deduplicationFeatures.appendOnlySchema) { val expectedCompletionDeduplicationPeriod = Completion.DeduplicationPeriod.DeduplicationTime( maxDeduplicationDuration.asProtobuf diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala index 5392c609d9..9b4fb93d1e 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala @@ -3,7 +3,11 @@ package com.daml.ledger.api.testtool.suites -import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures +import com.daml.ledger.api.testtool.infrastructure.deduplication.{ + CommandDeduplicationBase, + KVCommandDeduplicationBase, +} import scala.concurrent.duration.FiniteDuration @@ -14,7 +18,12 @@ class AppendOnlyKVCommandDeduplicationIT( timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration, ) extends KVCommandDeduplicationBase(timeoutScaleFactor, ledgerTimeInterval) { - override protected def isAppendOnly: Boolean = true override protected def testNamingPrefix: String = "AppendOnlyKVCommandDeduplication" + + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = true, + ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index 124665f57f..314f4443ac 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -4,10 +4,11 @@ package com.daml.ledger.api.testtool.suites import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} /** Command deduplication tests for participant side deduplication * Should be disabled for ledgers that have committer side deduplication enabled (KV) @@ -23,7 +24,9 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva override def testNamingPrefix: String = "ParticipantCommandDeduplication" - /** Assertions for append-only schema are important for KV ledgers - */ - override protected def isAppendOnly: Boolean = false + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = false, + ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala index 520687e515..30e4b83123 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala @@ -3,7 +3,11 @@ package com.daml.ledger.api.testtool.suites -import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures +import com.daml.ledger.api.testtool.infrastructure.deduplication.{ + CommandDeduplicationBase, + KVCommandDeduplicationBase, +} import scala.concurrent.duration.FiniteDuration @@ -17,5 +21,9 @@ class KVCommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: F override def testNamingPrefix: String = "KVCommandDeduplication" - override protected def isAppendOnly: Boolean = false + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = false, + ) }