mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
kvutils: Extract out a common type for the ledger state readers. (#8264)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
44c5b8a777
commit
c15071e8ca
@ -98,6 +98,7 @@ da_scala_library(
|
||||
"@maven//:com_typesafe_akka_akka_stream_2_12",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_mockito_mockito_core",
|
||||
"@maven//:org_mockito_mockito_scala_2_12",
|
||||
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
|
||||
"@maven//:org_scalactic_scalactic_2_12",
|
||||
"@maven//:org_scalatest_scalatest_2_12",
|
||||
|
@ -3,27 +3,12 @@
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
/** Defines how we read from an underlying ledger, using internal kvutils types.
|
||||
* This is the interface that the validator works against. However, ledger
|
||||
* integrations need not implement this directly.
|
||||
* You can create a DamlLedgerStateReader instance via the factory methods
|
||||
* available in [[com.daml.ledger.validator.batch.BatchedSubmissionValidatorFactory]]
|
||||
* We're required to work at this level of abstraction in order to implement
|
||||
* efficient caching (e.g. package DamlStateValue is too large to be always
|
||||
* decompressed and deserialized from bytes).
|
||||
*/
|
||||
trait DamlLedgerStateReader {
|
||||
def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]]
|
||||
}
|
||||
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
|
||||
|
||||
object DamlLedgerStateReader {
|
||||
def from(
|
||||
ledgerStateReader: LedgerStateReader,
|
||||
keySerializationStrategy: StateKeySerializationStrategy,
|
||||
)(implicit executionContext: ExecutionContext): DamlLedgerStateReader =
|
||||
): DamlLedgerStateReader =
|
||||
new RawToDamlLedgerStateReaderAdapter(ledgerStateReader, keySerializationStrategy)
|
||||
}
|
||||
|
@ -5,18 +5,20 @@ package com.daml.ledger.validator
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.Envelope
|
||||
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
final class RawToDamlLedgerStateReaderAdapter(
|
||||
ledgerStateReader: LedgerStateReader,
|
||||
keySerializationStrategy: StateKeySerializationStrategy,
|
||||
)(implicit executionContext: ExecutionContext)
|
||||
extends DamlLedgerStateReader {
|
||||
) extends DamlLedgerStateReader {
|
||||
|
||||
import RawToDamlLedgerStateReaderAdapter.deserializeDamlStateValue
|
||||
|
||||
override def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]] =
|
||||
override def read(
|
||||
keys: Seq[DamlStateKey]
|
||||
)(implicit executionContext: ExecutionContext): Future[Seq[Option[DamlStateValue]]] =
|
||||
ledgerStateReader
|
||||
.read(keys.map(keySerializationStrategy.serializeStateKey))
|
||||
.map(_.map(_.map(deserializeDamlStateValue)))
|
||||
|
@ -21,6 +21,7 @@ import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator
|
||||
import com.daml.ledger.validator.SubmissionValidator.LogEntryAndState
|
||||
import com.daml.ledger.validator._
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import com.daml.lf.data.Time
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.logging.LoggingContext.newLoggingContext
|
||||
@ -115,7 +116,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
|
||||
recordTimeInstant: Instant,
|
||||
participantId: ParticipantId,
|
||||
ledgerStateReader: DamlLedgerStateReader,
|
||||
commitStrategy: CommitStrategy[CommitResult]
|
||||
commitStrategy: CommitStrategy[CommitResult],
|
||||
)(implicit materializer: Materializer, executionContext: ExecutionContext): Future[Unit] =
|
||||
withCorrelationIdLogged(correlationId) { implicit loggingContext =>
|
||||
val recordTime = Time.Timestamp.assertFromInstant(recordTimeInstant)
|
||||
@ -333,7 +334,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
|
||||
metrics.fetchInputs,
|
||||
metrics.fetchInputsRunning,
|
||||
ledgerStateReader
|
||||
.readState(inputKeys)
|
||||
.read(inputKeys)
|
||||
.map { values =>
|
||||
(correlatedSubmission, inputKeys.zip(values).toMap)
|
||||
}
|
||||
|
@ -12,12 +12,12 @@ import com.daml.ledger.validator.caching.{
|
||||
CachingDamlLedgerStateReader,
|
||||
QueryableReadSet
|
||||
}
|
||||
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
|
||||
import com.daml.ledger.validator.{
|
||||
CommitStrategy,
|
||||
DamlLedgerStateReader,
|
||||
DefaultStateKeySerializationStrategy,
|
||||
LedgerStateOperations,
|
||||
LedgerStateReader,
|
||||
LogAppendingCommitStrategy,
|
||||
StateKeySerializationStrategy
|
||||
}
|
||||
|
@ -10,8 +10,9 @@ import com.daml.caching.Cache
|
||||
import com.daml.ledger.participant.state.kvutils.Bytes
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
|
||||
import com.daml.ledger.validator.caching.{CacheUpdatePolicy, ImmutablesOnlyCacheUpdatePolicy}
|
||||
import com.daml.ledger.validator._
|
||||
import com.daml.ledger.validator.caching.{CacheUpdatePolicy, ImmutablesOnlyCacheUpdatePolicy}
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
@ -7,11 +7,8 @@ import com.daml.caching.Cache
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
import com.daml.ledger.validator.LedgerStateOperations.Key
|
||||
import com.daml.ledger.validator.caching.CachingDamlLedgerStateReader.StateCache
|
||||
import com.daml.ledger.validator.{
|
||||
DamlLedgerStateReader,
|
||||
LedgerStateReader,
|
||||
StateKeySerializationStrategy
|
||||
}
|
||||
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
|
||||
import com.daml.ledger.validator.{DamlLedgerStateReader, StateKeySerializationStrategy}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -33,17 +30,22 @@ final class CachingDamlLedgerStateReader(
|
||||
shouldCache: DamlStateKey => Boolean,
|
||||
keySerializationStrategy: StateKeySerializationStrategy,
|
||||
delegate: DamlLedgerStateReader,
|
||||
)(implicit executionContext: ExecutionContext)
|
||||
extends DamlLedgerStateReader
|
||||
) extends DamlLedgerStateReader
|
||||
with QueryableReadSet {
|
||||
|
||||
private val readSet = mutable.Set.empty[DamlStateKey]
|
||||
|
||||
override def getReadSet: Set[Key] =
|
||||
this.synchronized { readSet.map(keySerializationStrategy.serializeStateKey).toSet }
|
||||
this.synchronized {
|
||||
readSet.map(keySerializationStrategy.serializeStateKey).toSet
|
||||
}
|
||||
|
||||
override def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]] = {
|
||||
this.synchronized { readSet ++= keys }
|
||||
override def read(
|
||||
keys: Seq[DamlStateKey]
|
||||
)(implicit executionContext: ExecutionContext): Future[Seq[Option[DamlStateValue]]] = {
|
||||
this.synchronized {
|
||||
readSet ++= keys
|
||||
}
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any")) // Required to make `.view` work.
|
||||
val cachedValues = keys.view
|
||||
.map(key => key -> cache.getIfPresent(key))
|
||||
@ -52,7 +54,7 @@ final class CachingDamlLedgerStateReader(
|
||||
val keysToRead = keys.toSet -- cachedValues.keySet
|
||||
if (keysToRead.nonEmpty) {
|
||||
delegate
|
||||
.readState(keysToRead.toSeq)
|
||||
.read(keysToRead.toSeq)
|
||||
.map { readStateValues =>
|
||||
val readValues = keysToRead.zip(readStateValues).toMap
|
||||
readValues.collect {
|
||||
@ -78,7 +80,7 @@ object CachingDamlLedgerStateReader {
|
||||
cachingPolicy: CacheUpdatePolicy,
|
||||
ledgerStateOperations: LedgerStateReader,
|
||||
keySerializationStrategy: StateKeySerializationStrategy,
|
||||
)(implicit executionContext: ExecutionContext): CachingDamlLedgerStateReader =
|
||||
): CachingDamlLedgerStateReader =
|
||||
new CachingDamlLedgerStateReader(
|
||||
cache,
|
||||
cachingPolicy.shouldCacheOnRead,
|
||||
|
@ -1,18 +1,22 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
package com.daml.ledger.validator.reading
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait LedgerStateReader {
|
||||
/**
|
||||
* Generic interface for reading from the ledger.
|
||||
*
|
||||
* @tparam Key The type of the key expected.
|
||||
* @tparam Value The type of the value returned.
|
||||
*/
|
||||
trait StateReader[Key, Value] {
|
||||
|
||||
/**
|
||||
* Reads values of a set of keys from the backing store.
|
||||
*
|
||||
* @param keys list of keys to look up data for
|
||||
* @param keys list of keys to look up
|
||||
* @return values corresponding to the requested keys, in the same order as requested
|
||||
*/
|
||||
def read(keys: Seq[Key])(implicit executionContext: ExecutionContext): Future[Seq[Option[Value]]]
|
@ -0,0 +1,14 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
|
||||
package object reading {
|
||||
|
||||
type LedgerStateReader = StateReader[LedgerStateOperations.Key, LedgerStateOperations.Value]
|
||||
|
||||
type DamlLedgerStateReader = StateReader[DamlStateKey, DamlStateValue]
|
||||
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
import org.mockito.{ArgumentMatcher, ArgumentMatchersSugar}
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
trait ArgumentMatchers {
|
||||
|
||||
import ArgumentMatchersSugar._
|
||||
|
||||
def anyExecutionContext: ExecutionContext = any[ExecutionContext]
|
||||
|
||||
def seqOf[T](size: Int): Seq[T] =
|
||||
argThat[Seq[T]](new ArgumentMatcher[Seq[T]] {
|
||||
override def matches(argument: Seq[T]): Boolean = argument.size == size
|
||||
|
||||
override def toString: String = s"seq of size $size"
|
||||
})
|
||||
}
|
||||
|
||||
object ArgumentMatchers extends ArgumentMatchers
|
@ -5,6 +5,7 @@ package com.daml.ledger.validator
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.Envelope
|
||||
import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.LogAppendingCommitStrategySpec._
|
||||
import com.daml.ledger.validator.TestHelper._
|
||||
@ -13,7 +14,7 @@ import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.Future
|
||||
|
||||
final class LogAppendingCommitStrategySpec
|
||||
extends AsyncWordSpec
|
||||
@ -77,11 +78,6 @@ final class LogAppendingCommitStrategySpec
|
||||
}
|
||||
|
||||
object LogAppendingCommitStrategySpec {
|
||||
|
||||
import ArgumentMatchersSugar._
|
||||
|
||||
private def anyExecutionContext = any[ExecutionContext]
|
||||
|
||||
private val aStateKey: DamlStateKey = DamlStateKey
|
||||
.newBuilder()
|
||||
.setContractId(1.toString)
|
||||
|
@ -9,14 +9,16 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlStateValue
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.Envelope
|
||||
import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext
|
||||
import com.daml.ledger.validator.LedgerStateOperations.Key
|
||||
import com.daml.ledger.validator.RawToDamlLedgerStateReaderAdapterSpec._
|
||||
import com.daml.ledger.validator.TestHelper.{anInvalidEnvelope, makePartySubmission}
|
||||
import com.daml.ledger.validator.reading.LedgerStateReader
|
||||
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.Future
|
||||
|
||||
class RawToDamlLedgerStateReaderAdapterSpec
|
||||
extends AsyncWordSpec
|
||||
@ -35,7 +37,7 @@ class RawToDamlLedgerStateReaderAdapterSpec
|
||||
val instance =
|
||||
new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey())).map { actual =>
|
||||
instance.read(Seq(aDamlStateKey())).map { actual =>
|
||||
verify(mockReader, times(1)).read(Seq(expectedKey))
|
||||
actual shouldBe Seq(Some(expectedValue))
|
||||
}
|
||||
@ -48,7 +50,7 @@ class RawToDamlLedgerStateReaderAdapterSpec
|
||||
val instance =
|
||||
new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey())).failed.map { actual =>
|
||||
instance.read(Seq(aDamlStateKey())).failed.map { actual =>
|
||||
actual shouldBe a[RuntimeException]
|
||||
actual.getLocalizedMessage should include("Opening enveloped")
|
||||
}
|
||||
@ -62,7 +64,7 @@ class RawToDamlLedgerStateReaderAdapterSpec
|
||||
val instance =
|
||||
new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey())).failed.map { actual =>
|
||||
instance.read(Seq(aDamlStateKey())).failed.map { actual =>
|
||||
actual shouldBe a[RuntimeException]
|
||||
actual.getLocalizedMessage should include("Opening enveloped")
|
||||
}
|
||||
@ -71,11 +73,6 @@ class RawToDamlLedgerStateReaderAdapterSpec
|
||||
}
|
||||
|
||||
object RawToDamlLedgerStateReaderAdapterSpec {
|
||||
|
||||
import ArgumentMatchersSugar._
|
||||
|
||||
private def anyExecutionContext = any[ExecutionContext]
|
||||
|
||||
private def aDamlStateKey(): DamlStateKey =
|
||||
DamlStateKey.newBuilder
|
||||
.setContractId("aContractId")
|
||||
|
@ -11,6 +11,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.MockitoHelpers.captor
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.SubmissionValidator.RawKeyValuePairs
|
||||
import com.daml.ledger.validator.SubmissionValidatorSpec._
|
||||
@ -291,12 +292,8 @@ class SubmissionValidatorSpec
|
||||
}
|
||||
|
||||
object SubmissionValidatorSpec {
|
||||
|
||||
import ArgumentMatchersSugar._
|
||||
import MockitoSugar._
|
||||
|
||||
private def anyExecutionContext = any[ExecutionContext]
|
||||
|
||||
private def aLogEntry(): DamlLogEntry =
|
||||
DamlLogEntry
|
||||
.newBuilder()
|
||||
|
@ -15,15 +15,16 @@ import com.daml.ledger.participant.state.kvutils.export.{
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.ArgumentMatchers.{anyExecutionContext, seqOf}
|
||||
import com.daml.ledger.validator.TestHelper.{aParticipantId, anInvalidEnvelope, makePartySubmission}
|
||||
import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, ValidationFailed}
|
||||
import com.daml.ledger.validator.batch.BatchedSubmissionValidatorSpec._
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import com.daml.ledger.validator.{CommitStrategy, ValidationFailed}
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.lf.engine.Engine
|
||||
import com.daml.metrics.Metrics
|
||||
import com.google.protobuf.ByteString
|
||||
import org.mockito.ArgumentCaptor
|
||||
import org.mockito.ArgumentMatchers.{any, argThat}
|
||||
import org.mockito.MockitoSugar
|
||||
import org.mockito.{ArgumentCaptor, ArgumentMatchersSugar, MockitoSugar}
|
||||
import org.scalatest.Inside
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
@ -36,7 +37,8 @@ class BatchedSubmissionValidatorSpec
|
||||
with Matchers
|
||||
with Inside
|
||||
with AkkaBeforeAndAfterAll
|
||||
with MockitoSugar {
|
||||
with MockitoSugar
|
||||
with ArgumentMatchersSugar {
|
||||
|
||||
private val engine = Engine.DevEngine()
|
||||
private val metrics = new Metrics(new MetricRegistry)
|
||||
@ -127,7 +129,7 @@ class BatchedSubmissionValidatorSpec
|
||||
val mockCommit = mock[CommitStrategy[Unit]]
|
||||
val partySubmission = makePartySubmission("foo")
|
||||
// Expect two keys, i.e., to retrieve the party and submission dedup values.
|
||||
when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2)))
|
||||
when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(None, None)))
|
||||
val logEntryCaptor = ArgumentCaptor.forClass(classOf[DamlLogEntry])
|
||||
val outputStateCaptor = ArgumentCaptor.forClass(classOf[Map[DamlStateKey, DamlStateValue]])
|
||||
@ -174,7 +176,7 @@ class BatchedSubmissionValidatorSpec
|
||||
val (submissions, _, batchSubmissionBytes) = createBatchSubmissionOf(1000)
|
||||
val mockLedgerStateReader = mock[DamlLedgerStateReader]
|
||||
// Expect two keys, i.e., to retrieve the party and submission dedup values.
|
||||
when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2)))
|
||||
when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(None, None)))
|
||||
val logEntryCaptor = ArgumentCaptor.forClass(classOf[DamlLogEntry])
|
||||
val outputStateCaptor = ArgumentCaptor.forClass(classOf[Map[DamlStateKey, DamlStateValue]])
|
||||
@ -204,7 +206,8 @@ class BatchedSubmissionValidatorSpec
|
||||
)
|
||||
.map { _ =>
|
||||
// We expected two state fetches and two commits.
|
||||
verify(mockLedgerStateReader, times(1000)).readState(any[Seq[DamlStateKey]]())
|
||||
verify(mockLedgerStateReader, times(1000))
|
||||
.read(any[Seq[DamlStateKey]])(anyExecutionContext)
|
||||
verify(mockCommit, times(1000)).commit(
|
||||
any[ParticipantId],
|
||||
any[String],
|
||||
@ -240,7 +243,7 @@ class BatchedSubmissionValidatorSpec
|
||||
.build()
|
||||
val mockLedgerStateReader = mock[DamlLedgerStateReader]
|
||||
// Expect two keys, i.e., to retrieve the party and submission dedup values.
|
||||
when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2)))
|
||||
when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(None, None)))
|
||||
val mockCommit = mock[CommitStrategy[Unit]]
|
||||
when(
|
||||
@ -288,7 +291,7 @@ class BatchedSubmissionValidatorSpec
|
||||
val (submissions, batchSubmission, batchSubmissionBytes) = createBatchSubmissionOf(2)
|
||||
val mockLedgerStateReader = mock[DamlLedgerStateReader]
|
||||
// Expect two keys, i.e., to retrieve the party and submission dedup values.
|
||||
when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2)))
|
||||
when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(None, None)))
|
||||
val mockCommit = mock[CommitStrategy[Unit]]
|
||||
when(
|
||||
@ -327,15 +330,18 @@ class BatchedSubmissionValidatorSpec
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object BatchedSubmissionValidatorSpec {
|
||||
|
||||
type DamlInputState = Map[DamlStateKey, Option[DamlStateValue]]
|
||||
type DamlOutputState = Map[DamlStateKey, DamlStateValue]
|
||||
|
||||
private lazy val aCorrelationId: String = "aCorrelationId"
|
||||
|
||||
private def newRecordTime(): Timestamp =
|
||||
Timestamp.assertFromInstant(Clock.systemUTC().instant())
|
||||
|
||||
private lazy val aCorrelationId: String = "aCorrelationId"
|
||||
|
||||
private def createBatchSubmissionOf(
|
||||
nSubmissions: Int): (Seq[DamlSubmission], DamlSubmissionBatch, ByteString) = {
|
||||
val submissions = (1 to nSubmissions).map { n =>
|
||||
|
@ -9,12 +9,12 @@ import akka.stream.Materializer
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
|
||||
import com.daml.ledger.validator.TestHelper.aParticipantId
|
||||
import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, LedgerStateOperations}
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import com.daml.ledger.validator.{CommitStrategy, LedgerStateOperations}
|
||||
import com.google.protobuf.ByteString
|
||||
import org.mockito.ArgumentMatchers.{any, anyString}
|
||||
import org.mockito.MockitoSugar
|
||||
import org.mockito.stubbing.ScalaFirstStubbing
|
||||
// import org.mockito.stubbing.OngoingStubbing
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
|
@ -6,13 +6,16 @@ package com.daml.ledger.validator.caching
|
||||
import com.daml.caching.WeightedCache
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.caching.`Message Weight`
|
||||
import com.daml.ledger.validator.{DamlLedgerStateReader, DefaultStateKeySerializationStrategy}
|
||||
import com.daml.ledger.validator.ArgumentMatchers.{anyExecutionContext, seqOf}
|
||||
import com.daml.ledger.validator.DefaultStateKeySerializationStrategy
|
||||
import com.daml.ledger.validator.caching.CachingDamlLedgerStateReaderSpec._
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
|
||||
import org.scalatest.Inside
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.Future
|
||||
|
||||
class CachingDamlLedgerStateReaderSpec
|
||||
extends AsyncWordSpec
|
||||
@ -20,15 +23,14 @@ class CachingDamlLedgerStateReaderSpec
|
||||
with Inside
|
||||
with MockitoSugar
|
||||
with ArgumentMatchersSugar {
|
||||
|
||||
"readState" should {
|
||||
"read" should {
|
||||
"record read keys" in {
|
||||
val mockReader = mock[DamlLedgerStateReader]
|
||||
when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue()))))
|
||||
when(mockReader.read(seqOf(size = 1))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue))))
|
||||
val instance = newInstance(mockReader, shouldCache = false)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey)).map { actual =>
|
||||
instance.read(Seq(aDamlStateKey)).map { actual =>
|
||||
actual should have size 1
|
||||
instance.getReadSet should be(
|
||||
Set(keySerializationStrategy.serializeStateKey(aDamlStateKey)))
|
||||
@ -37,51 +39,56 @@ class CachingDamlLedgerStateReaderSpec
|
||||
|
||||
"update cache upon read if policy allows" in {
|
||||
val mockReader = mock[DamlLedgerStateReader]
|
||||
when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue()))))
|
||||
when(mockReader.read(seqOf(size = 1))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue))))
|
||||
val instance = newInstance(mockReader, shouldCache = true)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey)).map { _ =>
|
||||
instance.read(Seq(aDamlStateKey)).map { _ =>
|
||||
instance.cache.getIfPresent(aDamlStateKey) shouldBe defined
|
||||
}
|
||||
}
|
||||
|
||||
"do not update cache upon read if policy does not allow" in {
|
||||
val mockReader = mock[DamlLedgerStateReader]
|
||||
when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue()))))
|
||||
when(mockReader.read(seqOf(size = 1))(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue))))
|
||||
val instance = newInstance(mockReader, shouldCache = false)
|
||||
|
||||
instance.readState(Seq(aDamlStateKey)).map { _ =>
|
||||
instance.read(Seq(aDamlStateKey)).map { _ =>
|
||||
instance.cache.getIfPresent(aDamlStateKey) should not be defined
|
||||
}
|
||||
}
|
||||
|
||||
"serve request from cache for seen key (if policy allows)" in {
|
||||
val mockReader = mock[DamlLedgerStateReader]
|
||||
when(mockReader.readState(any[Seq[DamlStateKey]])).thenReturn(Future.successful(Seq(None)))
|
||||
when(mockReader.read(any[Seq[DamlStateKey]])(anyExecutionContext))
|
||||
.thenReturn(Future.successful(Seq(Some(aDamlStateValue))))
|
||||
val instance = newInstance(mockReader, shouldCache = true)
|
||||
|
||||
for {
|
||||
originalReadState <- instance.readState(Seq(aDamlStateKey))
|
||||
readAgain <- instance.readState(Seq(aDamlStateKey))
|
||||
originalReadState <- instance.read(Seq(aDamlStateKey))
|
||||
readAgain <- instance.read(Seq(aDamlStateKey))
|
||||
} yield {
|
||||
verify(mockReader, times(1)).readState(_)
|
||||
verify(mockReader, times(1)).read(eqTo(Seq(aDamlStateKey)))(anyExecutionContext)
|
||||
readAgain shouldEqual originalReadState
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object CachingDamlLedgerStateReaderSpec {
|
||||
private val keySerializationStrategy = DefaultStateKeySerializationStrategy
|
||||
|
||||
private lazy val aDamlStateKey = DamlStateKey.newBuilder
|
||||
.setContractId("aContractId")
|
||||
.build
|
||||
|
||||
private def aDamlStateValue(): DamlStateValue = DamlStateValue.getDefaultInstance
|
||||
private val aDamlStateValue: DamlStateValue = DamlStateValue.getDefaultInstance
|
||||
|
||||
private def newInstance(damlLedgerStateReader: DamlLedgerStateReader, shouldCache: Boolean)(
|
||||
implicit executionContext: ExecutionContext): CachingDamlLedgerStateReader = {
|
||||
private def newInstance(
|
||||
damlLedgerStateReader: DamlLedgerStateReader,
|
||||
shouldCache: Boolean,
|
||||
): CachingDamlLedgerStateReader = {
|
||||
val cache = WeightedCache.from[DamlStateKey, DamlStateValue](WeightedCache.Configuration(1024))
|
||||
new CachingDamlLedgerStateReader(
|
||||
cache,
|
||||
|
@ -13,9 +13,9 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.caching.`Message Weight`
|
||||
import com.daml.ledger.participant.state.kvutils.{Fingerprint, FingerprintPlaceholder}
|
||||
import com.daml.ledger.validator.ArgumentMatchers.seqOf
|
||||
import com.daml.ledger.validator.caching.CachingDamlLedgerStateReaderWithFingerprints.`Message-Fingerprint Pair Weight`
|
||||
import com.daml.ledger.validator.preexecution.DamlLedgerStateReaderWithFingerprints
|
||||
import org.mockito.ArgumentMatchers.argThat
|
||||
import org.mockito.MockitoSugar
|
||||
import org.scalatest.Inside
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -31,7 +31,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec
|
||||
"read" should {
|
||||
"update cache upon read if policy allows" in {
|
||||
val mockReader = mock[DamlLedgerStateReaderWithFingerprints]
|
||||
when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
when(mockReader.read(seqOf(size = 1)))
|
||||
.thenReturn(Future.successful(Seq((Some(aDamlStateValue()), FingerprintPlaceholder))))
|
||||
val instance = newInstance(mockReader, shouldCache = true)
|
||||
|
||||
@ -42,7 +42,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec
|
||||
|
||||
"do not update cache upon read if policy does not allow" in {
|
||||
val mockReader = mock[DamlLedgerStateReaderWithFingerprints]
|
||||
when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
when(mockReader.read(seqOf(size = 1)))
|
||||
.thenReturn(Future.successful(Seq((Some(aDamlStateValue()), FingerprintPlaceholder))))
|
||||
val instance = newInstance(mockReader, shouldCache = false)
|
||||
|
||||
@ -77,7 +77,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec
|
||||
|
||||
"do not cache None value returned from delegate" in {
|
||||
val mockReader = mock[DamlLedgerStateReaderWithFingerprints]
|
||||
when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1)))
|
||||
when(mockReader.read(seqOf(size = 1)))
|
||||
.thenReturn(Future.successful(Seq((None, FingerprintPlaceholder))))
|
||||
val instance = newInstance(mockReader, shouldCache = true)
|
||||
|
||||
|
@ -16,11 +16,8 @@ import com.daml.ledger.participant.state.kvutils.Envelope
|
||||
import com.daml.ledger.participant.state.kvutils.tools.integritycheck.IntegrityChecker.bytesAsHexString
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.batch.BatchedSubmissionValidatorFactory
|
||||
import com.daml.ledger.validator.{
|
||||
CommitStrategy,
|
||||
DamlLedgerStateReader,
|
||||
StateKeySerializationStrategy
|
||||
}
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy}
|
||||
import com.daml.metrics.Metrics
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
@ -7,11 +7,8 @@ import akka.stream.Materializer
|
||||
import com.daml.ledger.participant.state.kvutils.export.WriteSet
|
||||
import com.daml.ledger.participant.state.v1.ReadService
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.{
|
||||
CommitStrategy,
|
||||
DamlLedgerStateReader,
|
||||
StateKeySerializationStrategy
|
||||
}
|
||||
import com.daml.ledger.validator.reading.DamlLedgerStateReader
|
||||
import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy}
|
||||
|
||||
trait QueryableWriteSet {
|
||||
def getAndClearRecordedWriteSet(): WriteSet
|
||||
|
Loading…
Reference in New Issue
Block a user