Implemented JdbcLedgerDaoTransactionLogUpdatesSpec (#9813)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
tudor-da 2021-06-02 14:17:11 +02:00 committed by GitHub
parent f36f556b73
commit 4a4dde0f19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 0 deletions

View File

@ -0,0 +1,174 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao
import java.util.concurrent.atomic.AtomicLong
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.lf.ledger.EventId
import com.daml.lf.transaction.Node
import com.daml.lf.transaction.Node.{KeyWithMaintainers, NodeCreate, NodeExercises}
import com.daml.lf.value.Value
import com.daml.platform.store.appendonlydao.events.{ContractId, NodeId}
import com.daml.platform.store.entries.LedgerEntry
import com.daml.platform.store.interfaces.TransactionLogUpdate
import org.scalatest._
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.Future
private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec
extends OptionValues
with Inside
with LoneElement {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
behavior of "JdbcLedgerDao (getTransactionLogUpdates)"
it should "return only the ledger end marker if no new transactions in range" in {
for {
ledgerEnd @ (offset, eventSequentialId) <- ledgerDao.lookupLedgerEndOffsetAndSequentialId()
result <- transactionsOf(
ledgerDao.transactionsReader
.getTransactionLogUpdates(
startExclusive = ledgerEnd,
endInclusive = ledgerEnd,
)
)
} yield {
result should contain theSameElementsAs Seq(
TransactionLogUpdate.LedgerEndMarker(offset, eventSequentialId)
)
Succeeded
}
}
it should "return the correct transaction log updates" in {
for {
from <- ledgerDao.lookupLedgerEndOffsetAndSequentialId()
(offset1, t1) <- store(singleCreate)
(offset2, t2) <- store(txCreateContractWithKey(alice, "some-key"))
(offset3, t3) <- store(
singleExercise(
nonTransient(t2).loneElement,
Some(KeyWithMaintainers(someContractKey(alice, "some-key"), Set(alice))),
)
)
(offset4, t4) <- store(fullyTransient())
(offset5, t5) <- store(singleCreate)
(offset6, t6) <- store(singleNonConsumingExercise(nonTransient(t5).loneElement))
to <- ledgerDao.lookupLedgerEndOffsetAndSequentialId()
result <- transactionsOf(
ledgerDao.transactionsReader
.getTransactionLogUpdates(
startExclusive = from,
endInclusive = to,
)
)
} yield {
val (
Seq(
actualTx1: TransactionLogUpdate.Transaction,
actualTx2: TransactionLogUpdate.Transaction,
actualTx3: TransactionLogUpdate.Transaction,
actualTx4: TransactionLogUpdate.Transaction,
actualTx5: TransactionLogUpdate.Transaction,
actualTx6: TransactionLogUpdate.Transaction,
),
Seq(endMarker: TransactionLogUpdate.LedgerEndMarker),
) = result.splitAt(6)
val contractKey =
t2.transaction.nodes.head._2.asInstanceOf[NodeCreate[ContractId]].key.get.key
val exercisedContractKey = Map(offset2 -> contractKey, offset3 -> contractKey)
val eventSequentialIdGen = new AtomicLong(from._2 + 1L)
assertExpectedEquality(actualTx1, t1, offset1, exercisedContractKey, eventSequentialIdGen)
assertExpectedEquality(actualTx2, t2, offset2, exercisedContractKey, eventSequentialIdGen)
assertExpectedEquality(actualTx3, t3, offset3, exercisedContractKey, eventSequentialIdGen)
assertExpectedEquality(actualTx4, t4, offset4, exercisedContractKey, eventSequentialIdGen)
assertExpectedEquality(actualTx5, t5, offset5, exercisedContractKey, eventSequentialIdGen)
assertExpectedEquality(actualTx6, t6, offset6, exercisedContractKey, eventSequentialIdGen)
endMarker.eventOffset shouldBe to._1
endMarker.eventSequentialId shouldBe to._2
}
}
private def assertExpectedEquality(
actual: TransactionLogUpdate.Transaction,
expected: LedgerEntry.Transaction,
expectedOffset: Offset,
exercisedContractKey: Map[Offset, Value[ContractId]],
eventSequentialIdRef: AtomicLong,
): Unit = {
actual.transactionId shouldBe expected.transactionId
actual.workflowId shouldBe expected.workflowId.value
actual.commandId shouldBe expected.commandId.value
actual.effectiveAt shouldBe expected.ledgerEffectiveTime
actual.offset shouldBe expectedOffset
val actualEventsById = actual.events.map(ev => ev.eventId -> ev).toMap
actualEventsById.size shouldBe expected.transaction.nodes.size
expected.transaction.nodes.toVector.sortBy(_._1.index).foreach { case (nodeId, value) =>
value match {
case nodeCreate: NodeCreate[ContractId] =>
val expectedEventId = EventId(expected.transactionId, nodeId)
val Some(actualCreated: TransactionLogUpdate.CreatedEvent) =
actualEventsById.get(expectedEventId)
actualCreated.contractId shouldBe nodeCreate.coid
actualCreated.templateId shouldBe nodeCreate.templateId
actualCreated.treeEventWitnesses shouldBe nodeCreate.informeesOfNode
actualCreated.flatEventWitnesses shouldBe nodeCreate.stakeholders
actualCreated.createSignatories shouldBe nodeCreate.signatories
actualCreated.createObservers shouldBe (nodeCreate.stakeholders diff nodeCreate.signatories)
actualCreated.createArgument.value shouldBe nodeCreate.arg
actualCreated.createAgreementText.value shouldBe nodeCreate.agreementText
actualCreated.nodeIndex shouldBe nodeId.index
actualCreated.eventSequentialId shouldBe eventSequentialIdRef.getAndIncrement()
case nodeExercises: NodeExercises[NodeId, ContractId] =>
val expectedEventId = EventId(expected.transactionId, nodeId)
val Some(actualExercised: TransactionLogUpdate.ExercisedEvent) =
actualEventsById.get(expectedEventId)
actualExercised.contractId shouldBe nodeExercises.targetCoid
actualExercised.templateId shouldBe nodeExercises.templateId
if (actualExercised.consuming)
actualExercised.flatEventWitnesses shouldBe nodeExercises.stakeholders
else
actualExercised.flatEventWitnesses shouldBe empty
actualExercised.treeEventWitnesses shouldBe nodeExercises.informeesOfNode
actualExercised.exerciseArgument.value shouldBe nodeExercises.chosenValue
actualExercised.exerciseResult.map(_.value) shouldBe nodeExercises.exerciseResult
actualExercised.consuming shouldBe nodeExercises.consuming
actualExercised.choice shouldBe nodeExercises.choiceId
actualExercised.children should contain theSameElementsAs nodeExercises.children
.map(_.toString)
.toIndexedSeq
actualExercised.actingParties shouldBe nodeExercises.actingParties
actualExercised.nodeIndex shouldBe nodeId.index
actualExercised.contractKey.map(_.value) shouldBe exercisedContractKey.get(
actual.offset
)
actualExercised.eventSequentialId shouldBe eventSequentialIdRef.getAndIncrement()
case Node.NodeRollback(_) => ()
case _ => ()
}
}
()
}
private def transactionsOf(
source: Source[((Offset, Long), TransactionLogUpdate), NotUsed]
): Future[Seq[TransactionLogUpdate]] =
source
.map(_._2)
.runWith(Sink.seq)
}

View File

@ -24,4 +24,5 @@ final class JdbcLedgerDaoPostgresqlAppendOnlySpec
with JdbcLedgerDaoTransactionTreesSpec
with JdbcLedgerDaoContractEventsStreamSpec
with JdbcLedgerDaoTransactionsWriterSpec
with JdbcLedgerDaoTransactionLogUpdatesSpec
with JdbcAppendOnlyTransactionInsertion