Command deduplication - better support for different deduplication modes in conformance tests [KVL-1099] (#10864)

* Extract deduplication "features" into a configuration to be used around the tests.
Better naming for assertions that support sync and async deduplication

CHANGELOG_BEGIN

CHANGELOG_END

* Fix broken test and use consistency for tests
This commit is contained in:
nicu-da 2021-09-13 12:05:50 -07:00 committed by GitHub
parent 6f151e287e
commit dfae9f600f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 57 deletions

View File

@ -9,6 +9,7 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertGrpcError, assertSingleton, _}
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
@ -37,10 +38,7 @@ private[testtool] abstract class CommandDeduplicationBase(
val ledgerWaitInterval: FiniteDuration = ledgerTimeInterval * 2
val defaultDeduplicationWindowWait: FiniteDuration = deduplicationDuration + ledgerWaitInterval
/** For [[Completion]], the submission id and deduplication period are filled only for append only schemas
* Therefore, we need to assert on those fields only if it's an append only schema
*/
protected def isAppendOnly: Boolean
def deduplicationFeatures: DeduplicationFeatures
protected def runGivenDeduplicationWait(
participants: Seq[ParticipantTestContext]
@ -78,8 +76,8 @@ private[testtool] abstract class CommandDeduplicationBase(
// Submit command A (first deduplication window)
// Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server,
// only one submission is therefore sent to the ledger.
completion1 <- requestHasOkCompletion(ledger)(requestA1, party)
_ <- submitRequestAndAssertFailure(ledger)(requestA1, Code.ALREADY_EXISTS)
completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA1, party)
_ <- submitRequestAndAssertDeduplication(ledger)(requestA1)
// Wait until the end of first deduplication window
_ <- Delayed.by(deduplicationWait)(())
@ -88,8 +86,8 @@ private[testtool] abstract class CommandDeduplicationBase(
// the ledger API server and the ledger itself, since the test waited more than
// `deduplicationSeconds` after receiving the first command *completion*.
// The first submit() in this block should therefore lead to an accepted transaction.
completion2 <- requestHasOkCompletion(ledger)(requestA2, party)
_ <- submitRequestAndAssertFailure(ledger)(requestA2, Code.ALREADY_EXISTS)
completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA2, party)
_ <- submitRequestAndAssertDeduplication(ledger)(requestA2, party)
// Inspect created contracts
activeContracts <- ledger.activeContracts(party)
} yield {
@ -122,10 +120,10 @@ private[testtool] abstract class CommandDeduplicationBase(
for {
// Submit an invalid command (should fail with INVALID_ARGUMENT)
_ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT)
_ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT)
// Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS)
_ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT)
_ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT)
} yield {}
})
@ -332,54 +330,71 @@ private[testtool] abstract class CommandDeduplicationBase(
}
})
def requestHasOkCompletion(
def submitRequestAndAssertCompletionAccepted(
ledger: ParticipantTestContext
)(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = {
requestHasCompletionWithStatusCode(ledger)(request, party, Code.OK)
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = {
submitRequestAndAssertCompletionStatus(ledger)(request, Code.OK, parties: _*)
}
def requestHasCompletionWithStatusCode(
protected def submitRequestAndAssertDeduplication(
ledger: ParticipantTestContext
)(request: SubmitRequest, party: Party, code: Code)(implicit
)(request: SubmitRequest, parties: Party*)(implicit
ec: ExecutionContext
): Future[Completion] = {
submitRequestAndFindCompletion(ledger)(request, party).map(completion => {
): Future[Unit] = {
if (deduplicationFeatures.participantDeduplication) {
submitRequestAndAssertSyncDeduplication(ledger, request)
} else {
submitRequestAndAssertAsyncDeduplication(ledger)(request, parties: _*)
.map(_ => ())
}
}
protected def submitRequestAndAssertSyncDeduplication(
ledger: ParticipantTestContext,
request: SubmitRequest,
)(implicit ec: ExecutionContext): Future[Unit] =
submitRequestAndAssertSyncFailure(ledger)(request, Code.ALREADY_EXISTS)
private def submitRequestAndAssertSyncFailure(ledger: ParticipantTestContext)(
request: SubmitRequest,
code: Code,
)(implicit ec: ExecutionContext) = ledger
.submit(request)
.mustFail(s"Request expected to fail with code $code")
.map(assertGrpcError(_, code, None, checkDefiniteAnswerMetadata = true))
protected def submitRequestAndAssertAsyncDeduplication(ledger: ParticipantTestContext)(
request: SubmitRequest,
parties: Party*
)(implicit ec: ExecutionContext): Future[Completion] = submitRequestAndAssertCompletionStatus(
ledger
)(request, Code.ALREADY_EXISTS, parties: _*)
private def submitRequestAndAssertCompletionStatus(
ledger: ParticipantTestContext
)(request: SubmitRequest, statusCode: Code, parties: Party*)(implicit
ec: ExecutionContext
): Future[Completion] =
submitRequestAndFindCompletion(ledger)(request, parties: _*).map(completion => {
assert(
completion.getStatus.code == code.value(),
s"Expecting completion with status code $code but completion has status ${completion.status}",
completion.getStatus.code == statusCode.value(),
s"Expecting completion with status code $statusCode but completion has status ${completion.status}",
)
completion
})
}
protected def submitRequestAndAssertFailure(
ledger: ParticipantTestContext
)(request: SubmitRequest, statusCode: Code)(implicit ec: ExecutionContext): Future[Unit] = {
ledger
.submit(request)
.mustFail(s"Request expected to fail with status $statusCode")
.map(
assertGrpcError(
_,
statusCode,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
)
)
}
protected def submitRequestAndFindCompletion(
ledger: ParticipantTestContext
)(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = {
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = {
val submissionId = UUID.randomUUID().toString
submitRequest(ledger)(request.update(_.commands.submissionId := submissionId))
.flatMap(ledgerEnd => {
ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(party))
ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(parties: _*))
})
.map { completions =>
val completion = assertSingleton("Expected only one completion", completions)
// The [[Completion.submissionId]] is set only for append-only ledgers
if (isAppendOnly)
if (deduplicationFeatures.appendOnlySchema)
assert(
completion.submissionId == submissionId,
s"Submission id is different for completion. Completion has submission id [${completion.submissionId}], request has submission id [$submissionId]",
@ -407,3 +422,15 @@ private[testtool] abstract class CommandDeduplicationBase(
throw new IllegalArgumentException(s"Invalid timeout scale factor: $timeoutScaleFactor")
}
}
object CommandDeduplicationBase {
/** @param participantDeduplication If participant deduplication is enabled then we will receive synchronous rejections
* @param appendOnlySchema For [[Completion]], the submission id and deduplication period are filled only for append only schemas
* Therefore, we need to assert on those fields only if it's an append only schema
*/
case class DeduplicationFeatures(
participantDeduplication: Boolean,
appendOnlySchema: Boolean,
)
}

View File

@ -16,7 +16,6 @@ import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import com.daml.timer.Delayed
import io.grpc.Status.Code
import org.slf4j.LoggerFactory
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
@ -50,22 +49,18 @@ abstract class KVCommandDeduplicationBase(
)
runWithConfig(configuredParticipants) { (maxDeduplicationDuration, minSkew) =>
for {
completion1 <- requestHasOkCompletion(ledger)(request, party)
completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
// participant side deduplication, sync result
_ <- submitRequestAndAssertFailure(ledger)(request, Code.ALREADY_EXISTS)
_ <- submitRequestAndAssertSyncDeduplication(ledger, request)
// Wait for the end of participant deduplication
// We also add min skew to also validate the committer deduplication duration
_ <- Delayed.by(deduplicationDuration.plus(minSkew))(())
// Validate committer deduplication
duplicateCompletion <- requestHasCompletionWithStatusCode(ledger)(
request,
party,
Code.ALREADY_EXISTS,
)
duplicateCompletion <- submitRequestAndAssertAsyncDeduplication(ledger)(request, party)
// Wait for the end of committer deduplication, we already waited for minSkew
_ <- Delayed.by(maxDeduplicationDuration.minus(deduplicationDuration))(())
// Deduplication has finished
completion2 <- requestHasOkCompletion(ledger)(request, party)
completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
// Inspect created contracts
activeContracts <- ledger.activeContracts(party)
} yield {
@ -82,7 +77,7 @@ abstract class KVCommandDeduplicationBase(
"The command ID of the duplicate completion does not match the command ID of the submission",
)
// The [[Completion.deduplicationPeriod]] is set only for append-only ledgers
if (isAppendOnly) {
if (deduplicationFeatures.appendOnlySchema) {
val expectedCompletionDeduplicationPeriod =
Completion.DeduplicationPeriod.DeduplicationTime(
maxDeduplicationDuration.asProtobuf

View File

@ -3,7 +3,11 @@
package com.daml.ledger.api.testtool.suites
import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures
import com.daml.ledger.api.testtool.infrastructure.deduplication.{
CommandDeduplicationBase,
KVCommandDeduplicationBase,
}
import scala.concurrent.duration.FiniteDuration
@ -14,7 +18,12 @@ class AppendOnlyKVCommandDeduplicationIT(
timeoutScaleFactor: Double,
ledgerTimeInterval: FiniteDuration,
) extends KVCommandDeduplicationBase(timeoutScaleFactor, ledgerTimeInterval) {
override protected def isAppendOnly: Boolean = true
override protected def testNamingPrefix: String = "AppendOnlyKVCommandDeduplication"
override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures =
DeduplicationFeatures(
participantDeduplication = true,
appendOnlySchema = true,
)
}

View File

@ -4,10 +4,11 @@
package com.daml.ledger.api.testtool.suites
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
/** Command deduplication tests for participant side deduplication
* Should be disabled for ledgers that have committer side deduplication enabled (KV)
@ -23,7 +24,9 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
override def testNamingPrefix: String = "ParticipantCommandDeduplication"
/** Assertions for append-only schema are important for KV ledgers
*/
override protected def isAppendOnly: Boolean = false
override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures =
DeduplicationFeatures(
participantDeduplication = true,
appendOnlySchema = false,
)
}

View File

@ -3,7 +3,11 @@
package com.daml.ledger.api.testtool.suites
import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures
import com.daml.ledger.api.testtool.infrastructure.deduplication.{
CommandDeduplicationBase,
KVCommandDeduplicationBase,
}
import scala.concurrent.duration.FiniteDuration
@ -17,5 +21,9 @@ class KVCommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: F
override def testNamingPrefix: String = "KVCommandDeduplication"
override protected def isAppendOnly: Boolean = false
override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures =
DeduplicationFeatures(
participantDeduplication = true,
appendOnlySchema = false,
)
}