Add tests for completion reader in JdbcLedgerDaoSpec (#5072)

* Add tests for completion reader in JdbcLedgerDaoSpec

CHANGELOG_BEGIN
CHANGELOG_END

* Add test for completion, make offset checks more strict

* Fix badly typed check

* Comment reason for weird matcher use

* Address https://github.com/digital-asset/daml/pull/5072#discussion_r394447235

* Address https://github.com/digital-asset/daml/pull/5072#discussion_r394447981

* Address https://github.com/digital-asset/daml/pull/5072#discussion_r394448506

* Address https://github.com/digital-asset/daml/pull/5072#discussion_r394451069
This commit is contained in:
Stefano Baghino 2020-03-18 17:34:36 +01:00 committed by GitHub
parent 76a37b9560
commit 856ed80728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 3 deletions

View File

@ -0,0 +1,151 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.store.dao
import java.time.Instant
import java.util.UUID
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.v1.Offset
import com.digitalasset.daml.lf.data.Ref.Party
import com.digitalasset.ledger.{ApplicationId, CommandId}
import com.digitalasset.ledger.api.domain.RejectionReason
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.platform.ApiOffset
import com.digitalasset.platform.store.dao.JdbcLedgerDaoCompletionsSpec._
import com.digitalasset.platform.store.entries.LedgerEntry
import com.digitalasset.platform.store.{CompletionFromTransaction, PersistenceEntry}
import org.scalatest.{AsyncFlatSpec, LoneElement, Matchers, OptionValues}
import scala.concurrent.Future
private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneElement {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
behavior of "JdbcLedgerDao (completions)"
it should "return the expected completion for an accepted transaction" in {
for {
from <- ledgerDao.lookupLedgerEnd()
(offset, tx) <- storeCreateTransaction()
to <- ledgerDao.lookupLedgerEnd()
(_, response) <- ledgerDao.completions
.getCommandCompletions(from, to, tx.applicationId.get, Set(tx.submittingParty.get))
.runWith(Sink.head)
} yield {
offsetOf(response) shouldBe offset
val completion = response.completions.loneElement
completion.transactionId shouldBe tx.transactionId
completion.commandId shouldBe tx.commandId.get
completion.status.value.code shouldBe io.grpc.Status.Code.OK.value()
}
}
it should "return the expected completion for a rejection" in {
val offset = nextOffset()
val rejection = rejectWith(RejectionReason.Inconsistent(""))
for {
from <- ledgerDao.lookupLedgerEnd()
_ <- ledgerDao.storeLedgerEntry(offset, rejection)
to <- ledgerDao.lookupLedgerEnd()
(_, response) <- ledgerDao.completions
.getCommandCompletions(from, to, applicationId, parties)
.runWith(Sink.head)
} yield {
offsetOf(response) shouldBe offset
val completion = response.completions.loneElement
completion.transactionId shouldBe empty
completion.commandId shouldBe rejection.entry.commandId
completion.status.value.code shouldNot be(io.grpc.Status.Code.OK.value())
}
}
it should "not return completions if the application id is wrong" in {
val offset = nextOffset()
val rejection = rejectWith(RejectionReason.Inconsistent(""))
for {
from <- ledgerDao.lookupLedgerEnd()
_ <- ledgerDao.storeLedgerEntry(offset, rejection)
to <- ledgerDao.lookupLedgerEnd()
response <- ledgerDao.completions
.getCommandCompletions(from, to, applicationId = "WRONG", parties)
.runWith(Sink.seq)
} yield {
response should have size 0 // `shouldBe empty` upsets WartRemover
}
}
it should "not return completions if the parties do not match" in {
val offset = nextOffset()
val rejection = rejectWith(RejectionReason.Inconsistent(""))
for {
from <- ledgerDao.lookupLedgerEnd()
_ <- ledgerDao.storeLedgerEntry(offset, rejection)
to <- ledgerDao.lookupLedgerEnd()
response <- ledgerDao.completions
.getCommandCompletions(from, to, applicationId, Set("WRONG"))
.runWith(Sink.seq)
} yield {
response should have size 0 // `shouldBe empty` upsets WartRemover
}
}
it should "return the expected status for each rejection reason" in {
val reasons = Seq[RejectionReason](
RejectionReason.Disputed(""),
RejectionReason.Inconsistent(""),
RejectionReason.InvalidLedgerTime(""),
RejectionReason.OutOfQuota(""),
RejectionReason.PartyNotKnownOnLedger(""),
RejectionReason.SubmitterCannotActViaParticipant(""),
RejectionReason.TimedOut(""),
)
for {
from <- ledgerDao.lookupLedgerEnd()
_ <- Future.sequence(
reasons.map(reason => ledgerDao.storeLedgerEntry(nextOffset(), rejectWith(reason))))
to <- ledgerDao.lookupLedgerEnd()
responses <- ledgerDao.completions
.getCommandCompletions(from, to, applicationId, parties)
.map(_._2)
.runWith(Sink.seq)
} yield {
responses should have length reasons.length.toLong
val returnedCodes = responses.flatMap(_.completions.map(_.status.get.code))
for ((reason, code) <- reasons.zip(returnedCodes)) {
code shouldBe CompletionFromTransaction.toErrorCode(reason).value()
}
succeed
}
}
}
private[dao] object JdbcLedgerDaoCompletionsSpec {
private val applicationId: ApplicationId =
"JdbcLedgerDaoCompletionsSpec".asInstanceOf[ApplicationId]
private val party: Party = "JdbcLedgerDaoCompletionsSpec".asInstanceOf[Party]
private val parties: Set[Party] = Set(party)
private def offsetOf(response: CompletionStreamResponse): Offset =
ApiOffset.assertFromString(response.checkpoint.get.offset.get.value.absolute.get)
private def rejectWith(reason: RejectionReason): PersistenceEntry.Rejection =
PersistenceEntry.Rejection(
LedgerEntry.Rejection(
recordTime = Instant.now,
commandId = UUID.randomUUID().toString.asInstanceOf[CommandId],
applicationId = applicationId,
submitter = party,
rejectionReason = reason,
)
)
}

View File

@ -177,7 +177,7 @@ private[dao] trait JdbcLedgerDaoLedgerEntriesSpec {
val N = 1000
val M = 10
def runSequentially(n: Int, f: Int => Future[Unit]) =
def runSequentially[U](n: Int, f: Int => Future[U]): Future[akka.Done] =
Source(1 to n).mapAsync(1)(f).runWith(Sink.ignore)
// Perform the following operations:

View File

@ -136,12 +136,13 @@ private[dao] trait JdbcLedgerDaoSuite extends AkkaBeforeAndAfterAll with JdbcLed
)
}
protected final def storeCreateTransaction()(implicit ec: ExecutionContext): Future[Unit] = {
protected final def storeCreateTransaction()(
implicit ec: ExecutionContext): Future[(Offset, LedgerEntry.Transaction)] = {
val offset = nextOffset()
val t = genCreateTransaction(offset)
ledgerDao
.storeLedgerEntry(offset, PersistenceEntry.Transaction(t, Map.empty, List.empty))
.map(_ => ())
.map(_ => offset -> t)
}
private def genExerciseTransaction(

View File

@ -11,6 +11,7 @@ final class JdbcLedgerDaoH2DatabaseSpec
with Matchers
with JdbcLedgerDaoSuite
with JdbcLedgerDaoBackendH2Database
with JdbcLedgerDaoCompletionsSpec
with JdbcLedgerDaoConfigurationSpec
with JdbcLedgerDaoContractKeysSpec
with JdbcLedgerDaoContractsSpec

View File

@ -11,6 +11,7 @@ final class JdbcLedgerDaoPostgresqlSpec
with Matchers
with JdbcLedgerDaoSuite
with JdbcLedgerDaoBackendPostgresql
with JdbcLedgerDaoCompletionsSpec
with JdbcLedgerDaoConfigurationSpec
with JdbcLedgerDaoContractKeysSpec
with JdbcLedgerDaoContractsSpec