diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index 583bd023ee..a5476f3882 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -98,6 +98,7 @@ da_scala_library( "@maven//:com_typesafe_akka_akka_stream_2_12", "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:org_mockito_mockito_core", + "@maven//:org_mockito_mockito_scala_2_12", "@maven//:org_scala_lang_modules_scala_java8_compat_2_12", "@maven//:org_scalactic_scalactic_2_12", "@maven//:org_scalatest_scalatest_2_12", diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/DamlLedgerStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/DamlLedgerStateReader.scala index ded07fa675..5c70ea7b28 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/DamlLedgerStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/DamlLedgerStateReader.scala @@ -3,27 +3,12 @@ package com.daml.ledger.validator -import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} - -import scala.concurrent.{ExecutionContext, Future} - -/** Defines how we read from an underlying ledger, using internal kvutils types. - * This is the interface that the validator works against. However, ledger - * integrations need not implement this directly. - * You can create a DamlLedgerStateReader instance via the factory methods - * available in [[com.daml.ledger.validator.batch.BatchedSubmissionValidatorFactory]] - * We're required to work at this level of abstraction in order to implement - * efficient caching (e.g. package DamlStateValue is too large to be always - * decompressed and deserialized from bytes). - */ -trait DamlLedgerStateReader { - def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]] -} +import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} object DamlLedgerStateReader { def from( ledgerStateReader: LedgerStateReader, keySerializationStrategy: StateKeySerializationStrategy, - )(implicit executionContext: ExecutionContext): DamlLedgerStateReader = + ): DamlLedgerStateReader = new RawToDamlLedgerStateReaderAdapter(ledgerStateReader, keySerializationStrategy) } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapter.scala index a42017c93d..d57cc84c0e 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapter.scala @@ -5,18 +5,20 @@ package com.daml.ledger.validator import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.kvutils.Envelope +import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} import scala.concurrent.{ExecutionContext, Future} final class RawToDamlLedgerStateReaderAdapter( ledgerStateReader: LedgerStateReader, keySerializationStrategy: StateKeySerializationStrategy, -)(implicit executionContext: ExecutionContext) - extends DamlLedgerStateReader { +) extends DamlLedgerStateReader { import RawToDamlLedgerStateReaderAdapter.deserializeDamlStateValue - override def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]] = + override def read( + keys: Seq[DamlStateKey] + )(implicit executionContext: ExecutionContext): Future[Seq[Option[DamlStateValue]]] = ledgerStateReader .read(keys.map(keySerializationStrategy.serializeStateKey)) .map(_.map(_.map(deserializeDamlStateValue))) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala index fced212ce3..dd10419019 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala @@ -21,6 +21,7 @@ import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator import com.daml.ledger.validator.SubmissionValidator.LogEntryAndState import com.daml.ledger.validator._ +import com.daml.ledger.validator.reading.DamlLedgerStateReader import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp import com.daml.logging.LoggingContext.newLoggingContext @@ -115,7 +116,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( recordTimeInstant: Instant, participantId: ParticipantId, ledgerStateReader: DamlLedgerStateReader, - commitStrategy: CommitStrategy[CommitResult] + commitStrategy: CommitStrategy[CommitResult], )(implicit materializer: Materializer, executionContext: ExecutionContext): Future[Unit] = withCorrelationIdLogged(correlationId) { implicit loggingContext => val recordTime = Time.Timestamp.assertFromInstant(recordTimeInstant) @@ -333,7 +334,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( metrics.fetchInputs, metrics.fetchInputsRunning, ledgerStateReader - .readState(inputKeys) + .read(inputKeys) .map { values => (correlatedSubmission, inputKeys.zip(values).toMap) } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala index c3f1d64be0..05090d5a69 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala @@ -12,12 +12,12 @@ import com.daml.ledger.validator.caching.{ CachingDamlLedgerStateReader, QueryableReadSet } +import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} import com.daml.ledger.validator.{ CommitStrategy, DamlLedgerStateReader, DefaultStateKeySerializationStrategy, LedgerStateOperations, - LedgerStateReader, LogAppendingCommitStrategy, StateKeySerializationStrategy } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitter.scala index 64f0a43d43..45babaa04d 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitter.scala @@ -10,8 +10,9 @@ import com.daml.caching.Cache import com.daml.ledger.participant.state.kvutils.Bytes import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} -import com.daml.ledger.validator.caching.{CacheUpdatePolicy, ImmutablesOnlyCacheUpdatePolicy} import com.daml.ledger.validator._ +import com.daml.ledger.validator.caching.{CacheUpdatePolicy, ImmutablesOnlyCacheUpdatePolicy} +import com.daml.ledger.validator.reading.DamlLedgerStateReader import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReader.scala index 0fc77416bc..3d29b137a5 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReader.scala @@ -7,11 +7,8 @@ import com.daml.caching.Cache import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.validator.LedgerStateOperations.Key import com.daml.ledger.validator.caching.CachingDamlLedgerStateReader.StateCache -import com.daml.ledger.validator.{ - DamlLedgerStateReader, - LedgerStateReader, - StateKeySerializationStrategy -} +import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} +import com.daml.ledger.validator.{DamlLedgerStateReader, StateKeySerializationStrategy} import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} @@ -33,17 +30,22 @@ final class CachingDamlLedgerStateReader( shouldCache: DamlStateKey => Boolean, keySerializationStrategy: StateKeySerializationStrategy, delegate: DamlLedgerStateReader, -)(implicit executionContext: ExecutionContext) - extends DamlLedgerStateReader +) extends DamlLedgerStateReader with QueryableReadSet { private val readSet = mutable.Set.empty[DamlStateKey] override def getReadSet: Set[Key] = - this.synchronized { readSet.map(keySerializationStrategy.serializeStateKey).toSet } + this.synchronized { + readSet.map(keySerializationStrategy.serializeStateKey).toSet + } - override def readState(keys: Seq[DamlStateKey]): Future[Seq[Option[DamlStateValue]]] = { - this.synchronized { readSet ++= keys } + override def read( + keys: Seq[DamlStateKey] + )(implicit executionContext: ExecutionContext): Future[Seq[Option[DamlStateValue]]] = { + this.synchronized { + readSet ++= keys + } @SuppressWarnings(Array("org.wartremover.warts.Any")) // Required to make `.view` work. val cachedValues = keys.view .map(key => key -> cache.getIfPresent(key)) @@ -52,7 +54,7 @@ final class CachingDamlLedgerStateReader( val keysToRead = keys.toSet -- cachedValues.keySet if (keysToRead.nonEmpty) { delegate - .readState(keysToRead.toSeq) + .read(keysToRead.toSeq) .map { readStateValues => val readValues = keysToRead.zip(readStateValues).toMap readValues.collect { @@ -78,7 +80,7 @@ object CachingDamlLedgerStateReader { cachingPolicy: CacheUpdatePolicy, ledgerStateOperations: LedgerStateReader, keySerializationStrategy: StateKeySerializationStrategy, - )(implicit executionContext: ExecutionContext): CachingDamlLedgerStateReader = + ): CachingDamlLedgerStateReader = new CachingDamlLedgerStateReader( cache, cachingPolicy.shouldCacheOnRead, diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/StateReader.scala similarity index 62% rename from ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateReader.scala rename to ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/StateReader.scala index 56433fe649..c7cf03de32 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LedgerStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/StateReader.scala @@ -1,18 +1,22 @@ // Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -package com.daml.ledger.validator - -import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} +package com.daml.ledger.validator.reading import scala.concurrent.{ExecutionContext, Future} -trait LedgerStateReader { +/** + * Generic interface for reading from the ledger. + * + * @tparam Key The type of the key expected. + * @tparam Value The type of the value returned. + */ +trait StateReader[Key, Value] { /** * Reads values of a set of keys from the backing store. * - * @param keys list of keys to look up data for + * @param keys list of keys to look up * @return values corresponding to the requested keys, in the same order as requested */ def read(keys: Seq[Key])(implicit executionContext: ExecutionContext): Future[Seq[Option[Value]]] diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/package.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/package.scala new file mode 100644 index 0000000000..036618ab5c --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/reading/package.scala @@ -0,0 +1,14 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.validator + +import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} + +package object reading { + + type LedgerStateReader = StateReader[LedgerStateOperations.Key, LedgerStateOperations.Value] + + type DamlLedgerStateReader = StateReader[DamlStateKey, DamlStateValue] + +} diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/validator/ArgumentMatchers.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/validator/ArgumentMatchers.scala new file mode 100644 index 0000000000..d4ca1630fe --- /dev/null +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/validator/ArgumentMatchers.scala @@ -0,0 +1,24 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.validator + +import org.mockito.{ArgumentMatcher, ArgumentMatchersSugar} + +import scala.concurrent.ExecutionContext + +trait ArgumentMatchers { + + import ArgumentMatchersSugar._ + + def anyExecutionContext: ExecutionContext = any[ExecutionContext] + + def seqOf[T](size: Int): Seq[T] = + argThat[Seq[T]](new ArgumentMatcher[Seq[T]] { + override def matches(argument: Seq[T]): Boolean = argument.size == size + + override def toString: String = s"seq of size $size" + }) +} + +object ArgumentMatchers extends ArgumentMatchers diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/LogAppendingCommitStrategySpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/LogAppendingCommitStrategySpec.scala index 4126e2ca79..e056e2e03b 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/LogAppendingCommitStrategySpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/LogAppendingCommitStrategySpec.scala @@ -5,6 +5,7 @@ package com.daml.ledger.validator import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.kvutils.Envelope +import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.LogAppendingCommitStrategySpec._ import com.daml.ledger.validator.TestHelper._ @@ -13,7 +14,7 @@ import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future final class LogAppendingCommitStrategySpec extends AsyncWordSpec @@ -77,11 +78,6 @@ final class LogAppendingCommitStrategySpec } object LogAppendingCommitStrategySpec { - - import ArgumentMatchersSugar._ - - private def anyExecutionContext = any[ExecutionContext] - private val aStateKey: DamlStateKey = DamlStateKey .newBuilder() .setContractId(1.toString) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapterSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapterSpec.scala index 156b362650..94a982c5d8 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapterSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/RawToDamlLedgerStateReaderAdapterSpec.scala @@ -9,14 +9,16 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlStateValue } import com.daml.ledger.participant.state.kvutils.Envelope +import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext import com.daml.ledger.validator.LedgerStateOperations.Key import com.daml.ledger.validator.RawToDamlLedgerStateReaderAdapterSpec._ import com.daml.ledger.validator.TestHelper.{anInvalidEnvelope, makePartySubmission} +import com.daml.ledger.validator.reading.LedgerStateReader import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future class RawToDamlLedgerStateReaderAdapterSpec extends AsyncWordSpec @@ -35,7 +37,7 @@ class RawToDamlLedgerStateReaderAdapterSpec val instance = new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy) - instance.readState(Seq(aDamlStateKey())).map { actual => + instance.read(Seq(aDamlStateKey())).map { actual => verify(mockReader, times(1)).read(Seq(expectedKey)) actual shouldBe Seq(Some(expectedValue)) } @@ -48,7 +50,7 @@ class RawToDamlLedgerStateReaderAdapterSpec val instance = new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy) - instance.readState(Seq(aDamlStateKey())).failed.map { actual => + instance.read(Seq(aDamlStateKey())).failed.map { actual => actual shouldBe a[RuntimeException] actual.getLocalizedMessage should include("Opening enveloped") } @@ -62,7 +64,7 @@ class RawToDamlLedgerStateReaderAdapterSpec val instance = new RawToDamlLedgerStateReaderAdapter(mockReader, DefaultStateKeySerializationStrategy) - instance.readState(Seq(aDamlStateKey())).failed.map { actual => + instance.read(Seq(aDamlStateKey())).failed.map { actual => actual shouldBe a[RuntimeException] actual.getLocalizedMessage should include("Opening enveloped") } @@ -71,11 +73,6 @@ class RawToDamlLedgerStateReaderAdapterSpec } object RawToDamlLedgerStateReaderAdapterSpec { - - import ArgumentMatchersSugar._ - - private def anyExecutionContext = any[ExecutionContext] - private def aDamlStateKey(): DamlStateKey = DamlStateKey.newBuilder .setContractId("aContractId") diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/SubmissionValidatorSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/SubmissionValidatorSpec.scala index 38f6fa5dbe..1f8b9f71ff 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/SubmissionValidatorSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/SubmissionValidatorSpec.scala @@ -11,6 +11,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._ import com.daml.ledger.participant.state.kvutils.MockitoHelpers.captor import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting} import com.daml.ledger.participant.state.v1.ParticipantId +import com.daml.ledger.validator.ArgumentMatchers.anyExecutionContext import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.SubmissionValidator.RawKeyValuePairs import com.daml.ledger.validator.SubmissionValidatorSpec._ @@ -291,12 +292,8 @@ class SubmissionValidatorSpec } object SubmissionValidatorSpec { - - import ArgumentMatchersSugar._ import MockitoSugar._ - private def anyExecutionContext = any[ExecutionContext] - private def aLogEntry(): DamlLogEntry = DamlLogEntry .newBuilder() diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala index 2ccc741707..5584d4311a 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala @@ -15,15 +15,16 @@ import com.daml.ledger.participant.state.kvutils.export.{ } import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting} import com.daml.ledger.participant.state.v1.ParticipantId +import com.daml.ledger.validator.ArgumentMatchers.{anyExecutionContext, seqOf} import com.daml.ledger.validator.TestHelper.{aParticipantId, anInvalidEnvelope, makePartySubmission} -import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, ValidationFailed} +import com.daml.ledger.validator.batch.BatchedSubmissionValidatorSpec._ +import com.daml.ledger.validator.reading.DamlLedgerStateReader +import com.daml.ledger.validator.{CommitStrategy, ValidationFailed} import com.daml.lf.data.Time.Timestamp import com.daml.lf.engine.Engine import com.daml.metrics.Metrics import com.google.protobuf.ByteString -import org.mockito.ArgumentCaptor -import org.mockito.ArgumentMatchers.{any, argThat} -import org.mockito.MockitoSugar +import org.mockito.{ArgumentCaptor, ArgumentMatchersSugar, MockitoSugar} import org.scalatest.Inside import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec @@ -36,7 +37,8 @@ class BatchedSubmissionValidatorSpec with Matchers with Inside with AkkaBeforeAndAfterAll - with MockitoSugar { + with MockitoSugar + with ArgumentMatchersSugar { private val engine = Engine.DevEngine() private val metrics = new Metrics(new MetricRegistry) @@ -127,7 +129,7 @@ class BatchedSubmissionValidatorSpec val mockCommit = mock[CommitStrategy[Unit]] val partySubmission = makePartySubmission("foo") // Expect two keys, i.e., to retrieve the party and submission dedup values. - when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2))) + when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext)) .thenReturn(Future.successful(Seq(None, None))) val logEntryCaptor = ArgumentCaptor.forClass(classOf[DamlLogEntry]) val outputStateCaptor = ArgumentCaptor.forClass(classOf[Map[DamlStateKey, DamlStateValue]]) @@ -174,7 +176,7 @@ class BatchedSubmissionValidatorSpec val (submissions, _, batchSubmissionBytes) = createBatchSubmissionOf(1000) val mockLedgerStateReader = mock[DamlLedgerStateReader] // Expect two keys, i.e., to retrieve the party and submission dedup values. - when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2))) + when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext)) .thenReturn(Future.successful(Seq(None, None))) val logEntryCaptor = ArgumentCaptor.forClass(classOf[DamlLogEntry]) val outputStateCaptor = ArgumentCaptor.forClass(classOf[Map[DamlStateKey, DamlStateValue]]) @@ -204,7 +206,8 @@ class BatchedSubmissionValidatorSpec ) .map { _ => // We expected two state fetches and two commits. - verify(mockLedgerStateReader, times(1000)).readState(any[Seq[DamlStateKey]]()) + verify(mockLedgerStateReader, times(1000)) + .read(any[Seq[DamlStateKey]])(anyExecutionContext) verify(mockCommit, times(1000)).commit( any[ParticipantId], any[String], @@ -240,7 +243,7 @@ class BatchedSubmissionValidatorSpec .build() val mockLedgerStateReader = mock[DamlLedgerStateReader] // Expect two keys, i.e., to retrieve the party and submission dedup values. - when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2))) + when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext)) .thenReturn(Future.successful(Seq(None, None))) val mockCommit = mock[CommitStrategy[Unit]] when( @@ -288,7 +291,7 @@ class BatchedSubmissionValidatorSpec val (submissions, batchSubmission, batchSubmissionBytes) = createBatchSubmissionOf(2) val mockLedgerStateReader = mock[DamlLedgerStateReader] // Expect two keys, i.e., to retrieve the party and submission dedup values. - when(mockLedgerStateReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 2))) + when(mockLedgerStateReader.read(seqOf(size = 2))(anyExecutionContext)) .thenReturn(Future.successful(Seq(None, None))) val mockCommit = mock[CommitStrategy[Unit]] when( @@ -327,15 +330,18 @@ class BatchedSubmissionValidatorSpec } } } +} + +object BatchedSubmissionValidatorSpec { type DamlInputState = Map[DamlStateKey, Option[DamlStateValue]] type DamlOutputState = Map[DamlStateKey, DamlStateValue] + private lazy val aCorrelationId: String = "aCorrelationId" + private def newRecordTime(): Timestamp = Timestamp.assertFromInstant(Clock.systemUTC().instant()) - private lazy val aCorrelationId: String = "aCorrelationId" - private def createBatchSubmissionOf( nSubmissions: Int): (Seq[DamlSubmission], DamlSubmissionBatch, ByteString) = { val submissions = (1 to nSubmissions).map { n => diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitterSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitterSpec.scala index e67af9fdd4..c9cea6351f 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitterSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedValidatingCommitterSpec.scala @@ -9,12 +9,12 @@ import akka.stream.Materializer import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} import com.daml.ledger.validator.TestHelper.aParticipantId -import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, LedgerStateOperations} +import com.daml.ledger.validator.reading.DamlLedgerStateReader +import com.daml.ledger.validator.{CommitStrategy, LedgerStateOperations} import com.google.protobuf.ByteString import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.MockitoSugar import org.mockito.stubbing.ScalaFirstStubbing -// import org.mockito.stubbing.OngoingStubbing import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderSpec.scala index 2861385cf3..84de62af0a 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderSpec.scala @@ -6,13 +6,16 @@ package com.daml.ledger.validator.caching import com.daml.caching.WeightedCache import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.kvutils.caching.`Message Weight` -import com.daml.ledger.validator.{DamlLedgerStateReader, DefaultStateKeySerializationStrategy} +import com.daml.ledger.validator.ArgumentMatchers.{anyExecutionContext, seqOf} +import com.daml.ledger.validator.DefaultStateKeySerializationStrategy +import com.daml.ledger.validator.caching.CachingDamlLedgerStateReaderSpec._ +import com.daml.ledger.validator.reading.DamlLedgerStateReader import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.Inside import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future class CachingDamlLedgerStateReaderSpec extends AsyncWordSpec @@ -20,15 +23,14 @@ class CachingDamlLedgerStateReaderSpec with Inside with MockitoSugar with ArgumentMatchersSugar { - - "readState" should { + "read" should { "record read keys" in { val mockReader = mock[DamlLedgerStateReader] - when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) - .thenReturn(Future.successful(Seq(Some(aDamlStateValue())))) + when(mockReader.read(seqOf(size = 1))(anyExecutionContext)) + .thenReturn(Future.successful(Seq(Some(aDamlStateValue)))) val instance = newInstance(mockReader, shouldCache = false) - instance.readState(Seq(aDamlStateKey)).map { actual => + instance.read(Seq(aDamlStateKey)).map { actual => actual should have size 1 instance.getReadSet should be( Set(keySerializationStrategy.serializeStateKey(aDamlStateKey))) @@ -37,51 +39,56 @@ class CachingDamlLedgerStateReaderSpec "update cache upon read if policy allows" in { val mockReader = mock[DamlLedgerStateReader] - when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) - .thenReturn(Future.successful(Seq(Some(aDamlStateValue())))) + when(mockReader.read(seqOf(size = 1))(anyExecutionContext)) + .thenReturn(Future.successful(Seq(Some(aDamlStateValue)))) val instance = newInstance(mockReader, shouldCache = true) - instance.readState(Seq(aDamlStateKey)).map { _ => + instance.read(Seq(aDamlStateKey)).map { _ => instance.cache.getIfPresent(aDamlStateKey) shouldBe defined } } "do not update cache upon read if policy does not allow" in { val mockReader = mock[DamlLedgerStateReader] - when(mockReader.readState(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) - .thenReturn(Future.successful(Seq(Some(aDamlStateValue())))) + when(mockReader.read(seqOf(size = 1))(anyExecutionContext)) + .thenReturn(Future.successful(Seq(Some(aDamlStateValue)))) val instance = newInstance(mockReader, shouldCache = false) - instance.readState(Seq(aDamlStateKey)).map { _ => + instance.read(Seq(aDamlStateKey)).map { _ => instance.cache.getIfPresent(aDamlStateKey) should not be defined } } "serve request from cache for seen key (if policy allows)" in { val mockReader = mock[DamlLedgerStateReader] - when(mockReader.readState(any[Seq[DamlStateKey]])).thenReturn(Future.successful(Seq(None))) + when(mockReader.read(any[Seq[DamlStateKey]])(anyExecutionContext)) + .thenReturn(Future.successful(Seq(Some(aDamlStateValue)))) val instance = newInstance(mockReader, shouldCache = true) for { - originalReadState <- instance.readState(Seq(aDamlStateKey)) - readAgain <- instance.readState(Seq(aDamlStateKey)) + originalReadState <- instance.read(Seq(aDamlStateKey)) + readAgain <- instance.read(Seq(aDamlStateKey)) } yield { - verify(mockReader, times(1)).readState(_) + verify(mockReader, times(1)).read(eqTo(Seq(aDamlStateKey)))(anyExecutionContext) readAgain shouldEqual originalReadState } } } +} +object CachingDamlLedgerStateReaderSpec { private val keySerializationStrategy = DefaultStateKeySerializationStrategy private lazy val aDamlStateKey = DamlStateKey.newBuilder .setContractId("aContractId") .build - private def aDamlStateValue(): DamlStateValue = DamlStateValue.getDefaultInstance + private val aDamlStateValue: DamlStateValue = DamlStateValue.getDefaultInstance - private def newInstance(damlLedgerStateReader: DamlLedgerStateReader, shouldCache: Boolean)( - implicit executionContext: ExecutionContext): CachingDamlLedgerStateReader = { + private def newInstance( + damlLedgerStateReader: DamlLedgerStateReader, + shouldCache: Boolean, + ): CachingDamlLedgerStateReader = { val cache = WeightedCache.from[DamlStateKey, DamlStateValue](WeightedCache.Configuration(1024)) new CachingDamlLedgerStateReader( cache, diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderWithFingerprintsSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderWithFingerprintsSpec.scala index f051554018..c5ccf2f77c 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderWithFingerprintsSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingDamlLedgerStateReaderWithFingerprintsSpec.scala @@ -13,9 +13,9 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ } import com.daml.ledger.participant.state.kvutils.caching.`Message Weight` import com.daml.ledger.participant.state.kvutils.{Fingerprint, FingerprintPlaceholder} +import com.daml.ledger.validator.ArgumentMatchers.seqOf import com.daml.ledger.validator.caching.CachingDamlLedgerStateReaderWithFingerprints.`Message-Fingerprint Pair Weight` import com.daml.ledger.validator.preexecution.DamlLedgerStateReaderWithFingerprints -import org.mockito.ArgumentMatchers.argThat import org.mockito.MockitoSugar import org.scalatest.Inside import org.scalatest.matchers.should.Matchers @@ -31,7 +31,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec "read" should { "update cache upon read if policy allows" in { val mockReader = mock[DamlLedgerStateReaderWithFingerprints] - when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) + when(mockReader.read(seqOf(size = 1))) .thenReturn(Future.successful(Seq((Some(aDamlStateValue()), FingerprintPlaceholder)))) val instance = newInstance(mockReader, shouldCache = true) @@ -42,7 +42,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec "do not update cache upon read if policy does not allow" in { val mockReader = mock[DamlLedgerStateReaderWithFingerprints] - when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) + when(mockReader.read(seqOf(size = 1))) .thenReturn(Future.successful(Seq((Some(aDamlStateValue()), FingerprintPlaceholder)))) val instance = newInstance(mockReader, shouldCache = false) @@ -77,7 +77,7 @@ class CachingDamlLedgerStateReaderWithFingerprintsSpec "do not cache None value returned from delegate" in { val mockReader = mock[DamlLedgerStateReaderWithFingerprints] - when(mockReader.read(argThat((keys: Seq[DamlStateKey]) => keys.size == 1))) + when(mockReader.read(seqOf(size = 1))) .thenReturn(Future.successful(Seq((None, FingerprintPlaceholder)))) val instance = newInstance(mockReader, shouldCache = true) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala index b48d513fdb..43f8355855 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala @@ -16,11 +16,8 @@ import com.daml.ledger.participant.state.kvutils.Envelope import com.daml.ledger.participant.state.kvutils.tools.integritycheck.IntegrityChecker.bytesAsHexString import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.batch.BatchedSubmissionValidatorFactory -import com.daml.ledger.validator.{ - CommitStrategy, - DamlLedgerStateReader, - StateKeySerializationStrategy -} +import com.daml.ledger.validator.reading.DamlLedgerStateReader +import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy} import com.daml.metrics.Metrics import scala.concurrent.ExecutionContext diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala index 84f431e452..f7021bac35 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala @@ -7,11 +7,8 @@ import akka.stream.Materializer import com.daml.ledger.participant.state.kvutils.export.WriteSet import com.daml.ledger.participant.state.v1.ReadService import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} -import com.daml.ledger.validator.{ - CommitStrategy, - DamlLedgerStateReader, - StateKeySerializationStrategy -} +import com.daml.ledger.validator.reading.DamlLedgerStateReader +import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy} trait QueryableWriteSet { def getAndClearRecordedWriteSet(): WriteSet