From aafb4a27f9e16557a71cc9d0faa3171a9a262e84 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Tue, 25 Aug 2020 19:52:59 +0200 Subject: [PATCH] kvutils: Simplify exporting by pushing the mutations outwards. (#7215) * kvutils: Add `override` annotations for the exporters. * kvutils: Test the ledger export (v2). * kvutils: Mark integrity tests as small. * kvutils: Add assertions to the FileBasedLedgerDataExporter. Just confirming my understanding of how this works. Verified with the Ledger API Test Tool and some proprietary code. * kvutils: Move the definition of `CorrelationId` to the package file. * kvutils: Pass a submission write set through the batch pipeline. This allows for less mutability all over the place. CHANGELOG_BEGIN CHANGELOG_END * kvutils: Move export finishing into its own type. * kvutils: Move some nested types upwards. * kvutils: Split Deserialization from Serialization. * kvutils: Extract out common behavior in (De)Serialization. * kvutils: Don't use a singleton for LedgerDataExporter. Instead, construct it once. * kvutils: Make sure we close the export file when we're done. * kvutils: Simplify `Debug` in the same manner as `LedgerDataExport`. * kvutils: Fix a test name. Co-authored-by: fabiotudone-da * kvutils: Remove backticks around "export". * kvutils: Move the test `BatchedSubmissionValidator#apply` into the test. Co-authored-by: fabiotudone-da --- .../memory/InMemoryLedgerReaderWriter.scala | 63 ++++---- ledger/participant-state/kvutils/BUILD.bazel | 70 +++++---- .../participant/state/kvutils/Debug.scala | 16 +- .../kvutils/export/Deserialization.scala | 53 +++++++ .../export/FileBasedLedgerDataExporter.scala | 90 +++-------- .../export/InMemorySubmissionAggregator.scala | 36 +++++ .../kvutils/export/LedgerDataExporter.scala | 58 +++---- .../kvutils/export/LedgerDataWriter.scala | 8 + .../export/NoOpLedgerDataExporter.scala | 19 +++ .../export/NoOpSubmissionAggregator.scala | 19 +++ .../export/NoopLedgerDataExporter.scala | 24 --- .../state/kvutils/export/Serialization.scala | 66 +++----- .../kvutils/export/SubmissionAggregator.scala | 22 +++ .../state/kvutils/export/SubmissionInfo.scala | 17 +++ .../state/kvutils/export/package.scala | 14 ++ .../participant/state/kvutils/package.scala | 2 + .../ledger/validator/CommitStrategy.scala | 5 +- .../LogAppendingCommitStrategy.scala | 18 ++- .../batch/BatchedSubmissionValidator.scala | 143 ++++++++++-------- .../BatchedSubmissionValidatorFactory.scala | 49 ++---- .../caching/CachingCommitStrategy.scala | 9 +- .../com/daml/ledger/validator/package.scala | 3 +- .../FileBasedLedgerDataExportSpec.scala | 81 +++------- .../InMemorySubmissionAggregatorSpec.scala | 47 ++++++ .../BatchedSubmissionValidatorSpec.scala | 87 +++++++---- .../caching/CachingCommitStrategySpec.scala | 4 +- .../kvutils/tools/integrity_test.bzl | 10 +- .../tools/export/CommitStrategySupport.scala | 2 +- .../tools/export/IntegrityChecker.scala | 9 +- .../LogAppendingCommitStrategySupport.scala | 3 +- .../WriteRecordingLedgerStateOperations.scala | 2 +- .../state/kvutils/test/Replay.scala | 8 +- .../kvutils/tools/IntegrityCheckerSpec.scala | 2 +- 33 files changed, 586 insertions(+), 473 deletions(-) create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Deserialization.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregator.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataWriter.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpLedgerDataExporter.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpSubmissionAggregator.scala delete mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoopLedgerDataExporter.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionAggregator.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionInfo.scala create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/package.scala create mode 100644 ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregatorSpec.scala diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala index d904c306a8b..2a02cefa073 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala @@ -12,6 +12,7 @@ import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} import com.daml.ledger.participant.state.kvutils.api._ +import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter import com.daml.ledger.participant.state.kvutils.{ Bytes, Fingerprint, @@ -83,36 +84,34 @@ object InMemoryLedgerReaderWriter { extends ResourceOwner[KeyValueLedger] { override def acquire()( implicit executionContext: ExecutionContext - ): Resource[KeyValueLedger] = { - val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) - - val committer = createBatchedCommitter( - keyValueCommitting, - batchingLedgerWriterConfig, - state, - metrics, - timeProvider, - stateValueCache, - ) - - val readerWriter = new InMemoryLedgerReaderWriter( - ledgerId, - participantId, - dispatcher, - state, - committer, - metrics, - ) - - // We need to generate batched submissions for the validator in order to improve throughput. - // Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the - // in-memory committer. - val ledgerWriter = newLoggingContext { implicit loggingContext => - BatchingLedgerWriter(batchingLedgerWriterConfig, readerWriter) - } - - Resource.successful(createKeyValueLedger(readerWriter, ledgerWriter)) - } + ): Resource[KeyValueLedger] = + for { + ledgerDataExporter <- LedgerDataExporter.Owner.acquire() + keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) + committer = createBatchedCommitter( + keyValueCommitting, + batchingLedgerWriterConfig, + state, + metrics, + timeProvider, + stateValueCache, + ledgerDataExporter, + ) + readerWriter = new InMemoryLedgerReaderWriter( + ledgerId, + participantId, + dispatcher, + state, + committer, + metrics, + ) + // We need to generate batched submissions for the validator in order to improve throughput. + // Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the + // in-memory committer. + ledgerWriter = newLoggingContext { implicit loggingContext => + BatchingLedgerWriter(batchingLedgerWriterConfig, readerWriter) + } + } yield createKeyValueLedger(readerWriter, ledgerWriter) } final class SingleParticipantBatchingOwner( @@ -195,13 +194,15 @@ object InMemoryLedgerReaderWriter { metrics: Metrics, timeProvider: TimeProvider, stateValueCache: Cache[DamlStateKey, DamlStateValue], + ledgerDataExporter: LedgerDataExporter, )(implicit materializer: Materializer): ValidateAndCommit = { val validator = BatchedSubmissionValidator[Index]( BatchedSubmissionValidatorFactory.defaultParametersFor( batchingLedgerWriterConfig.enableBatching), keyValueCommitting, new ConflictDetection(metrics), - metrics + metrics, + ledgerDataExporter, ) val committer = BatchedValidatingCommitter[Index]( () => timeProvider.getCurrentTime, diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index d3747c5812d..a062606e088 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -48,6 +48,7 @@ da_scala_library( "//ledger/participant-state/protobuf:ledger_configuration_java_proto", "//libs-scala/contextualized-logging", "//libs-scala/direct-execution-context", + "//libs-scala/resources", "//libs-scala/timer-utils", "@maven//:com_github_ghik_silencer_lib_2_12_11", "@maven//:com_google_guava_guava", @@ -173,35 +174,40 @@ da_java_proto_library( deps = [":daml_kvutils_proto"], ) -# Builds a ledger dump from running the test tool against a kvutils-based -# in-memory ledger. -client_server_build( - name = "reference-ledger-dump", - testonly = True, # only test targets can depend on this. - client = "//ledger/ledger-api-test-tool", - client_args = [ - "--concurrent-test-runs=4", - "--timeout-scale-factor=20", - "localhost:6865", - ], - data = [ - "//ledger/test-common:dar-files", - ], - output_env = "KVUTILS_LEDGER_DUMP", - runner = "@//bazel_tools/client_server/runner_with_port_check:runner", - runner_args = ["6865"], - server = "//ledger/ledger-on-memory:app", - server_args = [ - "--contract-id-seeding=testing-weak", - "--participant participant-id=ledger-dump,port=6865", - ], - server_files = [ - "//ledger/test-common:dar-files", - ], -) if not is_windows else None - -# Test for checking the integrity of the ledger dump produced above. -integrity_test( - name = "reference-ledger-dump-integrity-test", - dump = ":reference-ledger-dump", -) if not is_windows else None +[ + ( + # Generates a ledger export by running the test tool against a kvutils-based ledger. + client_server_build( + name = name, + testonly = True, # only test targets can depend on this. + client = "//ledger/ledger-api-test-tool", + client_args = [ + "--concurrent-test-runs=4", + "--timeout-scale-factor=20", + "localhost:%d" % port, + ], + data = [ + "//ledger/test-common:dar-files", + ], + output_env = environment_variable, + runner = "@//bazel_tools/client_server/runner_with_port_check:runner", + runner_args = [str(port)], + server = "//ledger/ledger-on-memory:app", + server_args = [ + "--contract-id-seeding=testing-weak", + "--participant participant-id=%s,port=%d" % (name, port), + ], + ) if not is_windows else None, + # Test for checking the integrity of the ledger export produced above. + integrity_test( + name = "%s-integrity-test" % name, + size = "small", + checker = checker, + dump = ":%s" % name, + ) if not is_windows else None, + ) + for (name, port, environment_variable, checker) in [ + ("reference-ledger-dump", 65101, "KVUTILS_LEDGER_DUMP", "//ledger/participant-state/kvutils/tools:integrity-check"), + ("reference-ledger-export", 65102, "KVUTILS_LEDGER_EXPORT", "//ledger/participant-state/kvutils/tools:integrity-check-v2"), + ] +] diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Debug.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Debug.scala index a3805447b56..c584b96596d 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Debug.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Debug.scala @@ -3,7 +3,8 @@ package com.daml.ledger.participant.state.kvutils -import java.io.{DataOutputStream, FileOutputStream} +import java.io.DataOutputStream +import java.nio.file.{Files, Paths} import com.daml.ledger.participant.state.kvutils.DamlKvutils._ import org.slf4j.LoggerFactory @@ -18,13 +19,14 @@ object Debug { /** The ledger dump stream is a gzip-compressed stream of `LedgerDumpEntry` messages prefixed * by their size. */ - private lazy val optLedgerDumpStream: Option[DataOutputStream] = { - Option(System.getenv("KVUTILS_LEDGER_DUMP")) - .map { filename => - logger.info(s"Enabled writing ledger entries to $filename") - new DataOutputStream(new FileOutputStream(filename)) + private lazy val optLedgerDumpStream: Option[DataOutputStream] = + sys.env + .get("KVUTILS_LEDGER_DUMP") + .map { filePath => + val path = Paths.get(filePath) + logger.info(s"Enabled writing ledger entries to $path.") + new DataOutputStream(Files.newOutputStream(path)) } - } /** Dump ledger entry to disk if dumping is enabled. * Ledger dumps are mostly used to test for backwards compatibility of new releases. diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Deserialization.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Deserialization.scala new file mode 100644 index 00000000000..ee6f6db8010 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Deserialization.scala @@ -0,0 +1,53 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import java.io.DataInputStream +import java.time.Instant + +import com.daml.ledger.participant.state +import com.google.protobuf.ByteString + +object Deserialization { + def deserializeEntry(input: DataInputStream): (SubmissionInfo, WriteSet) = { + val submissionInfo = deserializeSubmissionInfo(input) + val writeSet = deserializeWriteSet(input) + (submissionInfo, writeSet) + } + + private def deserializeSubmissionInfo(input: DataInputStream): SubmissionInfo = { + val correlationId = input.readUTF() + val submissionEnvelope = readBytes(input) + val recordTimeInstant: Instant = readInstant(input) + val participantId = input.readUTF() + SubmissionInfo( + state.v1.ParticipantId.assertFromString(participantId), + correlationId, + submissionEnvelope, + recordTimeInstant, + ) + } + + private def deserializeWriteSet(input: DataInputStream): WriteSet = { + val numKeyValuePairs = input.readInt() + (1 to numKeyValuePairs).map { _ => + val key = readBytes(input) + val value = readBytes(input) + key -> value + } + } + + private def readBytes(input: DataInputStream): ByteString = { + val size = input.readInt() + val byteArray = new Array[Byte](size) + input.readFully(byteArray) + ByteString.copyFrom(byteArray) + } + + private def readInstant(input: DataInputStream) = { + val epochSecond = input.readLong() + val nano = input.readInt() + Instant.ofEpochSecond(epochSecond, nano.toLong) + } +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExporter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExporter.scala index 8e921fe2b08..8c9e6d896ab 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExporter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExporter.scala @@ -7,96 +7,40 @@ import java.io.DataOutputStream import java.time.Instant import java.util.concurrent.locks.StampedLock +import com.daml.ledger.participant.state.kvutils.CorrelationId import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} -import com.google.protobuf.ByteString - -import scala.collection.mutable -import scala.collection.mutable.ListBuffer /** * Enables exporting ledger data to an output stream. * This class is thread-safe. */ class FileBasedLedgerDataExporter(output: DataOutputStream) extends LedgerDataExporter { - import FileBasedLedgerDataExporter._ private val outputLock = new StampedLock - private[export] val correlationIdMapping = mutable.Map.empty[String, String] - private[export] val inProgressSubmissions = mutable.Map.empty[String, SubmissionInfo] - private[export] val bufferedKeyValueDataPerCorrelationId = - mutable.Map.empty[String, mutable.ListBuffer[(Key, Value)]] - - def addSubmission( - submissionEnvelope: ByteString, - correlationId: String, + override def addSubmission( + participantId: ParticipantId, + correlationId: CorrelationId, + submissionEnvelope: Key, recordTimeInstant: Instant, - participantId: ParticipantId): Unit = + ): SubmissionAggregator = this.synchronized { - inProgressSubmissions.put( - correlationId, - SubmissionInfo(submissionEnvelope, correlationId, recordTimeInstant, participantId)) - () + val submissionInfo = + SubmissionInfo(participantId, correlationId, submissionEnvelope, recordTimeInstant) + new InMemorySubmissionAggregator(submissionInfo, FileBasedLedgerDataWriter) } - def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit = - this.synchronized { - correlationIdMapping.put(childCorrelationId, parentCorrelationId) - () - } - - def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit = - this.synchronized { - correlationIdMapping - .get(correlationId) - .foreach { parentCorrelationId => - val keyValuePairs = bufferedKeyValueDataPerCorrelationId - .getOrElseUpdate(parentCorrelationId, ListBuffer.empty) - keyValuePairs.appendAll(data) - bufferedKeyValueDataPerCorrelationId.put(parentCorrelationId, keyValuePairs) - } - } - - def finishedProcessing(correlationId: String): Unit = { - val (submissionInfo, bufferedData) = this.synchronized { - ( - inProgressSubmissions.get(correlationId), - bufferedKeyValueDataPerCorrelationId.get(correlationId)) - } - submissionInfo.foreach { submission => - bufferedData.foreach(writeSubmissionData(submission, _)) - this.synchronized { - inProgressSubmissions.remove(correlationId) - bufferedKeyValueDataPerCorrelationId.remove(correlationId) - correlationIdMapping - .collect { - case (key, value) if value == correlationId => key - } - .foreach(correlationIdMapping.remove) + object FileBasedLedgerDataWriter extends LedgerDataWriter { + override def write(submissionInfo: SubmissionInfo, writeSet: Seq[(Key, Value)]): Unit = { + val stamp = outputLock.writeLock() + try { + Serialization.serializeEntry(submissionInfo, writeSet, output) + output.flush() + } finally { + outputLock.unlock(stamp) } } } - private def writeSubmissionData( - submissionInfo: SubmissionInfo, - writeSet: ListBuffer[(Key, Value)]): Unit = { - val stamp = outputLock.writeLock() - try { - Serialization.serializeEntry(submissionInfo, writeSet, output) - output.flush() - } finally { - outputLock.unlock(stamp) - } - } -} - -object FileBasedLedgerDataExporter { - case class SubmissionInfo( - submissionEnvelope: ByteString, - correlationId: String, - recordTimeInstant: Instant, - participantId: ParticipantId) - - type WriteSet = Seq[(Key, Value)] } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregator.scala new file mode 100644 index 00000000000..09449031c3b --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregator.scala @@ -0,0 +1,36 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator.WriteSetBuilder + +import scala.collection.mutable + +final class InMemorySubmissionAggregator(submissionInfo: SubmissionInfo, writer: LedgerDataWriter) + extends SubmissionAggregator { + + import InMemorySubmissionAggregator._ + + private val buffer = mutable.ListBuffer.empty[WriteItem] + + override def addChild(): WriteSetBuilder = new InMemoryWriteSetBuilder(buffer) + + override def finish(): Unit = writer.write(submissionInfo, buffer) +} + +object InMemorySubmissionAggregator { + + final class InMemoryWriteSetBuilder(buffer: mutable.Buffer[WriteItem]) extends WriteSetBuilder { + override def +=(data: WriteItem): Unit = buffer.synchronized { + buffer += data + () + } + + override def ++=(data: Iterable[WriteItem]): Unit = buffer.synchronized { + buffer ++= data + () + } + } + +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataExporter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataExporter.scala index e187677a78a..2d19ad7af50 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataExporter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataExporter.scala @@ -3,39 +3,25 @@ package com.daml.ledger.participant.state.kvutils.export -import java.io.{DataOutputStream, FileOutputStream} +import java.io.DataOutputStream +import java.nio.file.{Files, Paths} import java.time.Instant +import com.daml.ledger.participant.state.kvutils.CorrelationId import com.daml.ledger.participant.state.v1.ParticipantId -import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} +import com.daml.resources.{Resource, ResourceOwner} import com.google.protobuf.ByteString import org.slf4j.LoggerFactory +import scala.concurrent.ExecutionContext + trait LedgerDataExporter { - - /** - * Adds given submission and its parameters to the list of in-progress submissions. - */ def addSubmission( + participantId: ParticipantId, + correlationId: CorrelationId, submissionEnvelope: ByteString, - correlationId: String, recordTimeInstant: Instant, - participantId: ParticipantId): Unit - - /** - * Establishes parent-child relation between two correlation IDs. - */ - def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit - - /** - * Adds given key-value pairs to the write-set belonging to the given correlation ID. - */ - def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit - - /** - * Signals that entries for the given top-level (parent) correlation ID may be persisted. - */ - def finishedProcessing(correlationId: String): Unit + ): SubmissionAggregator } object LedgerDataExporter { @@ -43,17 +29,21 @@ object LedgerDataExporter { private val logger = LoggerFactory.getLogger(this.getClass) - private lazy val outputStreamMaybe: Option[DataOutputStream] = { - Option(System.getenv(EnvironmentVariableName)) - .map { filename => - logger.info(s"Enabled writing ledger entries to $filename") - new DataOutputStream(new FileOutputStream(filename)) - } + object Owner extends ResourceOwner[LedgerDataExporter] { + override def acquire()( + implicit executionContext: ExecutionContext + ): Resource[LedgerDataExporter] = + sys.env + .get(EnvironmentVariableName) + .map(Paths.get(_)) + .map { path => + logger.info(s"Enabled writing ledger entries to $path.") + ResourceOwner + .forCloseable(() => new DataOutputStream(Files.newOutputStream(path))) + .acquire() + .map(new FileBasedLedgerDataExporter(_)) + } + .getOrElse(Resource.successful(NoOpLedgerDataExporter)) } - private lazy val instance = outputStreamMaybe - .map(new FileBasedLedgerDataExporter(_)) - .getOrElse(NoopLedgerDataExporter) - - def apply(): LedgerDataExporter = instance } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataWriter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataWriter.scala new file mode 100644 index 00000000000..e9df34df410 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/LedgerDataWriter.scala @@ -0,0 +1,8 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +trait LedgerDataWriter { + def write(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpLedgerDataExporter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpLedgerDataExporter.scala new file mode 100644 index 00000000000..3d13af35ae9 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpLedgerDataExporter.scala @@ -0,0 +1,19 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import java.time.Instant + +import com.daml.ledger.participant.state.kvutils.CorrelationId +import com.daml.ledger.participant.state.v1.ParticipantId +import com.google.protobuf.ByteString + +object NoOpLedgerDataExporter extends LedgerDataExporter { + override def addSubmission( + participantId: ParticipantId, + correlationId: CorrelationId, + submissionEnvelope: ByteString, + recordTimeInstant: Instant, + ): SubmissionAggregator = NoOpSubmissionAggregator +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpSubmissionAggregator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpSubmissionAggregator.scala new file mode 100644 index 00000000000..484b23ae276 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoOpSubmissionAggregator.scala @@ -0,0 +1,19 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator.WriteSetBuilder + +object NoOpSubmissionAggregator extends SubmissionAggregator { + override def addChild(): WriteSetBuilder = NoOpWriteSetBuilder + + override def finish(): Unit = () + + object NoOpWriteSetBuilder extends WriteSetBuilder { + override def +=(data: WriteItem): Unit = () + + override def ++=(data: Iterable[WriteItem]): Unit = () + } + +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoopLedgerDataExporter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoopLedgerDataExporter.scala deleted file mode 100644 index ec106f9ae06..00000000000 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/NoopLedgerDataExporter.scala +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.participant.state.kvutils.export - -import java.time.Instant - -import com.daml.ledger.participant.state.v1.ParticipantId -import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} -import com.google.protobuf.ByteString - -object NoopLedgerDataExporter extends LedgerDataExporter { - override def addSubmission( - submissionEnvelope: ByteString, - correlationId: String, - recordTimeInstant: Instant, - participantId: ParticipantId): Unit = () - - override def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit = () - - override def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit = () - - override def finishedProcessing(correlationId: String): Unit = () -} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Serialization.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Serialization.scala index 1524c124258..605ee78def3 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Serialization.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/Serialization.scala @@ -3,78 +3,46 @@ package com.daml.ledger.participant.state.kvutils.export -import java.io.{DataInputStream, DataOutputStream} +import java.io.DataOutputStream import java.time.Instant -import com.daml.ledger.participant.state -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.{ - SubmissionInfo, - WriteSet -} import com.google.protobuf.ByteString object Serialization { def serializeEntry( submissionInfo: SubmissionInfo, writeSet: WriteSet, - out: DataOutputStream): Unit = { + out: DataOutputStream, + ): Unit = { serializeSubmissionInfo(submissionInfo, out) serializeWriteSet(writeSet, out) } - def readEntry(input: DataInputStream): (SubmissionInfo, WriteSet) = { - val submissionInfo = readSubmissionInfo(input) - val writeSet = readWriteSet(input) - (submissionInfo, writeSet) - } - private def serializeSubmissionInfo( submissionInfo: SubmissionInfo, - out: DataOutputStream): Unit = { + out: DataOutputStream, + ): Unit = { out.writeUTF(submissionInfo.correlationId) - out.writeInt(submissionInfo.submissionEnvelope.size()) - submissionInfo.submissionEnvelope.writeTo(out) - out.writeLong(submissionInfo.recordTimeInstant.getEpochSecond) - out.writeInt(submissionInfo.recordTimeInstant.getNano) + writeBytes(submissionInfo.submissionEnvelope, out) + writeInstant(submissionInfo.recordTimeInstant, out) out.writeUTF(submissionInfo.participantId) } - private def readSubmissionInfo(input: DataInputStream): SubmissionInfo = { - val correlationId = input.readUTF() - val submissionEnvelopeSize = input.readInt() - val submissionEnvelope = new Array[Byte](submissionEnvelopeSize) - input.readFully(submissionEnvelope) - val recordTimeEpochSeconds = input.readLong() - val recordTimeEpochNanos = input.readInt() - val participantId = input.readUTF() - SubmissionInfo( - ByteString.copyFrom(submissionEnvelope), - correlationId, - Instant.ofEpochSecond(recordTimeEpochSeconds, recordTimeEpochNanos.toLong), - state.v1.ParticipantId.assertFromString(participantId) - ) - } - private def serializeWriteSet(writeSet: WriteSet, out: DataOutputStream): Unit = { out.writeInt(writeSet.size) for ((key, value) <- writeSet.sortBy(_._1.asReadOnlyByteBuffer())) { - out.writeInt(key.size()) - key.writeTo(out) - out.writeInt(value.size()) - value.writeTo(out) + writeBytes(key, out) + writeBytes(value, out) } } - private def readWriteSet(input: DataInputStream): WriteSet = { - val numKeyValuePairs = input.readInt() - (1 to numKeyValuePairs).map { _ => - val keySize = input.readInt() - val keyBytes = new Array[Byte](keySize) - input.readFully(keyBytes) - val valueSize = input.readInt() - val valueBytes = new Array[Byte](valueSize) - input.readFully(valueBytes) - (ByteString.copyFrom(keyBytes), ByteString.copyFrom(valueBytes)) - } + private def writeBytes(bytes: ByteString, out: DataOutputStream): Unit = { + out.writeInt(bytes.size()) + bytes.writeTo(out) + } + + private def writeInstant(instant: Instant, out: DataOutputStream): Unit = { + out.writeLong(instant.getEpochSecond) + out.writeInt(instant.getNano) } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionAggregator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionAggregator.scala new file mode 100644 index 00000000000..cfca07df1a8 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionAggregator.scala @@ -0,0 +1,22 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator._ + +trait SubmissionAggregator { + def addChild(): WriteSetBuilder + + def finish(): Unit +} + +object SubmissionAggregator { + + trait WriteSetBuilder { + def +=(data: WriteItem): Unit + + def ++=(data: Iterable[WriteItem]): Unit + } + +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionInfo.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionInfo.scala new file mode 100644 index 00000000000..70d52146781 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/SubmissionInfo.scala @@ -0,0 +1,17 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import java.time.Instant + +import com.daml.ledger.participant.state.kvutils.CorrelationId +import com.daml.ledger.participant.state.v1.ParticipantId +import com.google.protobuf.ByteString + +case class SubmissionInfo( + participantId: ParticipantId, + correlationId: CorrelationId, + submissionEnvelope: ByteString, + recordTimeInstant: Instant, +) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/package.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/package.scala new file mode 100644 index 00000000000..a73cea6a48b --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/export/package.scala @@ -0,0 +1,14 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils + +import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} + +package object export { + + type WriteItem = (Key, Value) + + type WriteSet = Seq[WriteItem] + +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/package.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/package.scala index 8e19a567da5..3b44a84ad08 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/package.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/package.scala @@ -32,6 +32,8 @@ package object kvutils { type Bytes = ByteString type DamlStateMap = Map[DamlStateKey, Option[DamlStateValue]] + type CorrelationId = String + type Fingerprint = Bytes type DamlStateMapWithFingerprints = Map[DamlStateKey, (Option[DamlStateValue], Fingerprint)] val FingerprintPlaceholder: Fingerprint = ByteString.EMPTY diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/CommitStrategy.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/CommitStrategy.scala index 37bd20afd0b..9ebf47d2ba7 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/CommitStrategy.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/CommitStrategy.scala @@ -9,6 +9,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlStateKey, DamlStateValue } +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator import com.daml.ledger.participant.state.v1.ParticipantId import scala.concurrent.Future @@ -23,5 +24,7 @@ trait CommitStrategy[Result] { entryId: DamlLogEntryId, entry: DamlLogEntry, inputState: Map[DamlStateKey, Option[DamlStateValue]], - outputState: Map[DamlStateKey, DamlStateValue]): Future[Result] + outputState: Map[DamlStateKey, DamlStateValue], + exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None, + ): Future[Result] } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LogAppendingCommitStrategy.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LogAppendingCommitStrategy.scala index 6f31f957561..1a9e3d8665b 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LogAppendingCommitStrategy.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/LogAppendingCommitStrategy.scala @@ -10,7 +10,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlStateValue } import com.daml.ledger.participant.state.kvutils.Envelope -import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator import com.daml.ledger.participant.state.v1.ParticipantId import scala.collection.breakOut @@ -19,8 +19,7 @@ import scala.concurrent.{ExecutionContext, Future} class LogAppendingCommitStrategy[Index]( ledgerStateOperations: LedgerStateOperations[Index], keySerializationStrategy: StateKeySerializationStrategy, - ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())( - implicit executionContext: ExecutionContext) +)(implicit executionContext: ExecutionContext) extends CommitStrategy[Index] { override def commit( participantId: ParticipantId, @@ -28,21 +27,26 @@ class LogAppendingCommitStrategy[Index]( entryId: DamlLogEntryId, entry: DamlLogEntry, inputState: Map[DamlStateKey, Option[DamlStateValue]], - outputState: Map[DamlStateKey, DamlStateValue]): Future[Index] = + outputState: Map[DamlStateKey, DamlStateValue], + exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None, + ): Future[Index] = for { serializedKeyValuePairs <- Future.successful(outputState.map { case (key, value) => (keySerializationStrategy.serializeStateKey(key), Envelope.enclose(value)) }(breakOut)) - _ = ledgerDataExporter.addToWriteSet(correlationId, serializedKeyValuePairs) + _ = exporterWriteSet.foreach { + _ ++= serializedKeyValuePairs + } _ <- if (serializedKeyValuePairs.nonEmpty) { ledgerStateOperations.writeState(serializedKeyValuePairs) } else { Future.unit } envelopedLogEntry <- Future.successful(Envelope.enclose(entry)) - _ = ledgerDataExporter - .addToWriteSet(correlationId, List((entryId.toByteString, envelopedLogEntry))) + _ = exporterWriteSet.foreach { + _ += entryId.toByteString -> envelopedLogEntry + } index <- ledgerStateOperations .appendToLog( entryId.toByteString, diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala index 49451672894..df3de15a02f 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidator.scala @@ -11,15 +11,14 @@ import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import com.daml.ledger.participant.state.kvutils.DamlKvutils._ import com.daml.ledger.participant.state.kvutils.api.LedgerReader -import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter -import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting} +import com.daml.ledger.participant.state.kvutils.export.{LedgerDataExporter, SubmissionAggregator} +import com.daml.ledger.participant.state.kvutils.{CorrelationId, Envelope, KeyValueCommitting} import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator import com.daml.ledger.validator.SubmissionValidator.LogEntryAndState import com.daml.ledger.validator._ import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp -import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext.newLoggingContext import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.{Metrics, Timed} @@ -36,28 +35,16 @@ object BatchedSubmissionValidator { committer: KeyValueCommitting, conflictDetection: ConflictDetection, metrics: Metrics, - ledgerDataExporter: LedgerDataExporter = LedgerDataExporter()) - : BatchedSubmissionValidator[CommitResult] = + ledgerDataExporter: LedgerDataExporter, + ): BatchedSubmissionValidator[CommitResult] = new BatchedSubmissionValidator[CommitResult]( params, committer, conflictDetection, metrics, - ledgerDataExporter + ledgerDataExporter, ) - private[validator] def apply[CommitResult]( - params: BatchedSubmissionValidatorParameters, - engine: Engine, - metrics: Metrics): BatchedSubmissionValidator[CommitResult] = - new BatchedSubmissionValidator[CommitResult]( - params, - new KeyValueCommitting(engine, metrics), - new ConflictDetection(metrics), - metrics) - - private type CorrelationId = String - /** A [[DamlSubmission]] with an associated correlation id and a log entry id computed * from the envelope. */ private case class CorrelatedSubmission( @@ -104,7 +91,8 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( committer: KeyValueCommitting, conflictDetection: ConflictDetection, damlMetrics: Metrics, - ledgerDataExporter: LedgerDataExporter = LedgerDataExporter()) { + ledgerDataExporter: LedgerDataExporter, +) { import BatchedSubmissionValidator._ @@ -126,11 +114,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( commitStrategy: CommitStrategy[CommitResult] )(implicit materializer: Materializer, executionContext: ExecutionContext): Future[Unit] = withCorrelationIdLogged(correlationId) { implicit loggingContext => - ledgerDataExporter.addSubmission( - submissionEnvelope, + val exporterAggregator = ledgerDataExporter.addSubmission( + participantId, correlationId, + submissionEnvelope, recordTimeInstant, - participantId) + ) val recordTime = Time.Timestamp.assertFromInstant(recordTimeInstant) Timed.future( metrics.validateAndCommit, { @@ -138,11 +127,11 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( case Right(Envelope.SubmissionMessage(submission)) => processBatch( participantId, - correlationId, recordTime, singleSubmissionSource(submissionEnvelope, submission, correlationId), ledgerStateReader, - commitStrategy + commitStrategy, + exporterAggregator, ) case Right(Envelope.SubmissionBatchMessage(batch)) => @@ -151,11 +140,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( metrics.receivedBatchSubmissionBytes.update(batch.getSerializedSize) processBatch( participantId, - correlationId, recordTime, batchSubmissionSource(batch), ledgerStateReader, - commitStrategy) + commitStrategy, + exporterAggregator, + ) case Right(other) => Future.failed( @@ -231,7 +221,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( private case class ValidatedSubmission( correlatedSubmission: CorrelatedSubmission, inputState: DamlInputState, - logEntryAndState: LogEntryAndState) + logEntryAndState: LogEntryAndState, + exporterWriteSet: SubmissionAggregator.WriteSetBuilder, + ) private type Outputs2 = Indexed[ValidatedSubmission] @@ -253,13 +245,15 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( */ private def processBatch( participantId: ParticipantId, - batchCorrelationId: CorrelationId, recordTime: Timestamp, indexedSubmissions: Source[Inputs, NotUsed], damlLedgerStateReader: DamlLedgerStateReader, - commitStrategy: CommitStrategy[CommitResult])( + commitStrategy: CommitStrategy[CommitResult], + exporterAggregator: SubmissionAggregator, + )( implicit materializer: Materializer, - executionContext: ExecutionContext): Future[Unit] = + executionContext: ExecutionContext, + ): Future[Unit] = indexedSubmissions // Fetch the submission inputs in parallel. .mapAsyncUnordered[Outputs1](params.readParallelism) { @@ -269,10 +263,14 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( .mapAsyncUnordered[Outputs2](params.cpuParallelism) { _.mapFuture { case (correlatedSubmission, inputState) => - ledgerDataExporter.addParentChild( - batchCorrelationId, - correlatedSubmission.correlationId) - validateSubmission(participantId, recordTime, correlatedSubmission, inputState) + val exporterWriteSet = exporterAggregator.addChild() + validateSubmission( + participantId, + recordTime, + correlatedSubmission, + inputState, + exporterWriteSet, + ) } } // Collect the results. @@ -289,26 +287,40 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( val invalidatedKeys = mutable.Set.empty[DamlStateKey] { - case ValidatedSubmission(correlatedSubmission, inputState, logEntryAndOutputState) => + case ValidatedSubmission( + correlatedSubmission, + inputState, + logEntryAndOutputState, + exporterWriteSet, + ) => detectConflictsAndRecover( correlatedSubmission, inputState, logEntryAndOutputState, - invalidatedKeys) + invalidatedKeys, + exporterWriteSet, + ) } } // Commit the results. .mapAsync[Outputs6](params.commitParallelism) { - case ValidatedSubmission(correlatedSubmission, inputState, logEntryAndOutputState) => + case ValidatedSubmission( + correlatedSubmission, + inputState, + logEntryAndOutputState, + exporterWriteSet, + ) => commitResult( participantId, correlatedSubmission, inputState, logEntryAndOutputState, - commitStrategy) + commitStrategy, + exporterWriteSet, + ) } .runWith(Sink.ignore) - .map(_ => ledgerDataExporter.finishedProcessing(batchCorrelationId)) + .map(_ => exporterAggregator.finish()) private def fetchSubmissionInputs( correlatedSubmission: CorrelatedSubmission, @@ -332,8 +344,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( participantId: ParticipantId, recordTime: Timestamp, correlatedSubmission: CorrelatedSubmission, - inputState: DamlInputState)( - implicit executionContext: ExecutionContext): Future[ValidatedSubmission] = + inputState: DamlInputState, + exporterWriteSet: SubmissionAggregator.WriteSetBuilder, + )(implicit executionContext: ExecutionContext): Future[ValidatedSubmission] = withSubmissionLoggingContext(correlatedSubmission) { _ => Timed.timedAndTrackedFuture( metrics.validate, @@ -347,7 +360,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( participantId, inputState ) - ValidatedSubmission(correlatedSubmission, inputState, logEntryAndState) + ValidatedSubmission(correlatedSubmission, inputState, logEntryAndState, exporterWriteSet) } ) } @@ -356,8 +369,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( correlatedSubmission: CorrelatedSubmission, inputState: DamlInputState, logEntryAndState: LogEntryAndState, - invalidatedKeys: mutable.Set[DamlStateKey]) - : scala.collection.immutable.Iterable[ValidatedSubmission] = { + invalidatedKeys: mutable.Set[DamlStateKey], + exporterWriteSet: SubmissionAggregator.WriteSetBuilder, + ): scala.collection.immutable.Iterable[ValidatedSubmission] = { val (logEntry, outputState) = logEntryAndState withSubmissionLoggingContext(correlatedSubmission) { implicit loggingContext => Timed.value( @@ -372,7 +386,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( .map { case (newInvalidatedKeys, (newLogEntry, newState)) => invalidatedKeys ++= newInvalidatedKeys - ValidatedSubmission(correlatedSubmission, inputState, (newLogEntry, newState)) :: Nil + ValidatedSubmission( + correlatedSubmission, + inputState, + (newLogEntry, newState), + exporterWriteSet, + ) :: Nil } .getOrElse { logger.info( @@ -389,23 +408,27 @@ class BatchedSubmissionValidator[CommitResult] private[validator] ( correlatedSubmission: CorrelatedSubmission, inputState: DamlInputState, logEntryAndState: LogEntryAndState, - commitStrategy: CommitStrategy[CommitResult])( - implicit executionContext: ExecutionContext): Future[Unit] = { + commitStrategy: CommitStrategy[CommitResult], + exporterWriteSet: SubmissionAggregator.WriteSetBuilder, + )(implicit executionContext: ExecutionContext): Future[Unit] = { val (logEntry, outputState) = logEntryAndState withSubmissionLoggingContext(correlatedSubmission) { _ => - Timed.timedAndTrackedFuture( - metrics.commit, - metrics.commitRunning, - commitStrategy - .commit( - participantId, - correlatedSubmission.correlationId, - correlatedSubmission.logEntryId, - logEntry, - inputState, - outputState) - .map(_ => ()) - ) + Timed + .timedAndTrackedFuture( + metrics.commit, + metrics.commitRunning, + commitStrategy + .commit( + participantId, + correlatedSubmission.correlationId, + correlatedSubmission.logEntryId, + logEntry, + inputState, + outputState, + Some(exporterWriteSet), + ) + ) + .map(_ => ()) } } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala index 9dc572658b7..8c92e8fcae6 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorFactory.scala @@ -5,7 +5,6 @@ package com.daml.ledger.validator.batch import com.daml.caching.Cache import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} -import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.caching.{ CacheUpdatePolicy, @@ -22,8 +21,6 @@ import com.daml.ledger.validator.{ LogAppendingCommitStrategy, StateKeySerializationStrategy } -import com.daml.lf.engine.Engine -import com.daml.metrics.Metrics import scala.concurrent.{ExecutionContext, Future} @@ -50,17 +47,16 @@ object BatchedSubmissionValidatorFactory { def readerAndCommitStrategyFrom[LogResult]( ledgerStateOperations: LedgerStateOperations[LogResult], keySerializationStrategy: StateKeySerializationStrategy = DefaultStateKeySerializationStrategy, - ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())( - implicit executionContext: ExecutionContext) + )(implicit executionContext: ExecutionContext) : (DamlLedgerStateReader, CommitStrategy[LogResult]) = { val ledgerStateReader = DamlLedgerStateReader.from( new LedgerStateReaderAdapter[LogResult](ledgerStateOperations), - keySerializationStrategy) - val commitStrategy = - new LogAppendingCommitStrategy[LogResult]( - ledgerStateOperations, - keySerializationStrategy, - ledgerDataExporter) + keySerializationStrategy, + ) + val commitStrategy = new LogAppendingCommitStrategy[LogResult]( + ledgerStateOperations, + keySerializationStrategy, + ) (ledgerStateReader, commitStrategy) } @@ -69,8 +65,7 @@ object BatchedSubmissionValidatorFactory { stateCache: Cache[DamlStateKey, DamlStateValue], cacheUpdatePolicy: CacheUpdatePolicy, keySerializationStrategy: StateKeySerializationStrategy = DefaultStateKeySerializationStrategy, - ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())( - implicit executionContext: ExecutionContext) + )(implicit executionContext: ExecutionContext) : (DamlLedgerStateReader with QueryableReadSet, CommitStrategy[LogResult]) = { val ledgerStateReader = new CachingDamlLedgerStateReader( stateCache, @@ -78,7 +73,8 @@ object BatchedSubmissionValidatorFactory { keySerializationStrategy, DamlLedgerStateReader.from( new LedgerStateReaderAdapter[LogResult](ledgerStateOperations), - keySerializationStrategy) + keySerializationStrategy, + ), ) val commitStrategy = new CachingCommitStrategy( stateCache, @@ -86,31 +82,8 @@ object BatchedSubmissionValidatorFactory { new LogAppendingCommitStrategy[LogResult]( ledgerStateOperations, keySerializationStrategy, - ledgerDataExporter) + ) ) (ledgerStateReader, commitStrategy) } - - case class CachingEnabledComponents[LogResult]( - ledgerStateReader: DamlLedgerStateReader with QueryableReadSet, - commitStrategy: CommitStrategy[LogResult], - batchValidator: BatchedSubmissionValidator[LogResult]) - - def componentsEnabledForCaching[LogResult]( - params: BatchedSubmissionValidatorParameters, - ledgerStateOperations: LedgerStateOperations[LogResult], - stateCache: Cache[DamlStateKey, DamlStateValue], - cacheUpdatePolicy: CacheUpdatePolicy, - metrics: Metrics, - engine: Engine - )(implicit executionContext: ExecutionContext): CachingEnabledComponents[LogResult] = { - val (ledgerStateReader, commitStrategy) = - cachingReaderAndCommitStrategyFrom(ledgerStateOperations, stateCache, cacheUpdatePolicy) - val batchValidator = BatchedSubmissionValidator[LogResult]( - params, - engine, - metrics - ) - CachingEnabledComponents(ledgerStateReader, commitStrategy, batchValidator) - } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingCommitStrategy.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingCommitStrategy.scala index 42d87b5c1cc..3fe00446c8b 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingCommitStrategy.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/caching/CachingCommitStrategy.scala @@ -6,6 +6,7 @@ package com.daml.ledger.validator.caching import com.daml.caching.Cache import com.daml.ledger.participant.state.kvutils.DamlKvutils import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator.CommitStrategy @@ -22,7 +23,9 @@ class CachingCommitStrategy[Result]( entryId: DamlKvutils.DamlLogEntryId, entry: DamlKvutils.DamlLogEntry, inputState: Map[DamlStateKey, Option[DamlStateValue]], - outputState: Map[DamlStateKey, DamlStateValue]): Future[Result] = + outputState: Map[DamlStateKey, DamlStateValue], + exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder], + ): Future[Result] = for { _ <- Future { outputState.view.filter { case (key, _) => shouldCache(key) }.foreach { @@ -35,6 +38,8 @@ class CachingCommitStrategy[Result]( entryId, entry, inputState, - outputState) + outputState, + exporterWriteSet, + ) } yield result } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala index b00398d17e8..7cf10871d12 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala @@ -3,13 +3,12 @@ package com.daml.ledger -import com.daml.ledger.participant.state.kvutils.Bytes +import com.daml.ledger.participant.state.kvutils.{Bytes, CorrelationId} import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} import scala.concurrent.Future package object validator { - type CorrelationId = String type SubmissionEnvelope = Bytes type SubmittingParticipantId = ParticipantId diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExportSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExportSpec.scala index 8e6417fd58a..184087c3846 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExportSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/FileBasedLedgerDataExportSpec.scala @@ -11,56 +11,7 @@ import com.google.protobuf.ByteString import org.scalatest.mockito.MockitoSugar import org.scalatest.{Matchers, WordSpec} -class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoSugar { - // XXX SC remove in Scala 2.13; see notes in ConfSpec - import scala.collection.GenTraversable, org.scalatest.enablers.Containing - private[this] implicit def `fixed sig containingNatureOfGenTraversable`[ - E: org.scalactic.Equality, - TRAV]: Containing[TRAV with GenTraversable[E]] = - Containing.containingNatureOfGenTraversable[E, GenTraversable] - - "addParentChild" should { - "add entry to correlation ID mapping" in { - val instance = new FileBasedLedgerDataExporter(mock[DataOutputStream]) - instance.addParentChild("parent", "child") - - instance.correlationIdMapping should contain("child" -> "parent") - } - } - - "addToWriteSet" should { - "append to existing data" in { - val instance = new FileBasedLedgerDataExporter(mock[DataOutputStream]) - instance.addParentChild("parent", "child") - instance.addToWriteSet("child", Seq(keyValuePairOf("a", "b"))) - instance.addToWriteSet("child", Seq(keyValuePairOf("c", "d"))) - - instance.bufferedKeyValueDataPerCorrelationId should contain( - "parent" -> - Seq(keyValuePairOf("a", "b"), keyValuePairOf("c", "d"))) - } - } - - "finishedProcessing" should { - "remove all data such as submission info, write-set and child correlation IDs" in { - val dataOutputStream = new DataOutputStream(new ByteArrayOutputStream()) - val instance = new FileBasedLedgerDataExporter(dataOutputStream) - instance.addSubmission( - ByteString.copyFromUtf8("an envelope"), - "parent", - Instant.now(), - v1.ParticipantId.assertFromString("id")) - instance.addParentChild("parent", "parent") - instance.addToWriteSet("parent", Seq(keyValuePairOf("a", "b"))) - - instance.finishedProcessing("parent") - - instance.inProgressSubmissions shouldBe empty - instance.bufferedKeyValueDataPerCorrelationId shouldBe empty - instance.correlationIdMapping shouldBe empty - } - } - +final class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoSugar { "serialized submission" should { "be readable back" in { val baos = new ByteArrayOutputStream() @@ -68,23 +19,33 @@ class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoS val instance = new FileBasedLedgerDataExporter(dataOutputStream) val expectedRecordTimeInstant = Instant.ofEpochSecond(123456, 123456789) val expectedParticipantId = v1.ParticipantId.assertFromString("id") - instance.addSubmission( - ByteString.copyFromUtf8("an envelope"), - "parent", - expectedRecordTimeInstant, - v1.ParticipantId.assertFromString("id")) - instance.addParentChild("parent", "parent") - instance.addToWriteSet("parent", Seq(keyValuePairOf("a", "b"))) - instance.finishedProcessing("parent") + val submission = instance.addSubmission( + v1.ParticipantId.assertFromString("id"), + "parent", + ByteString.copyFromUtf8("an envelope"), + expectedRecordTimeInstant, + ) + val writeSetA1 = submission.addChild() + writeSetA1 ++= Seq(keyValuePairOf("a", "b"), keyValuePairOf("c", "d")) + val writeSetA2 = submission.addChild() + writeSetA2 ++= Seq(keyValuePairOf("e", "f"), keyValuePairOf("g", "h")) + + submission.finish() val dataInputStream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray)) - val (actualSubmissionInfo, actualWriteSet) = Serialization.readEntry(dataInputStream) + val (actualSubmissionInfo, actualWriteSet) = Deserialization.deserializeEntry(dataInputStream) actualSubmissionInfo.submissionEnvelope should be(ByteString.copyFromUtf8("an envelope")) actualSubmissionInfo.correlationId should be("parent") actualSubmissionInfo.recordTimeInstant should be(expectedRecordTimeInstant) actualSubmissionInfo.participantId should be(expectedParticipantId) - actualWriteSet should be(Seq(keyValuePairOf("a", "b"))) + actualWriteSet should be( + Seq( + keyValuePairOf("a", "b"), + keyValuePairOf("c", "d"), + keyValuePairOf("e", "f"), + keyValuePairOf("g", "h"), + )) } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregatorSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregatorSpec.scala new file mode 100644 index 00000000000..d05adbdb644 --- /dev/null +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/export/InMemorySubmissionAggregatorSpec.scala @@ -0,0 +1,47 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.export + +import java.time.Instant + +import com.daml.ledger.participant.state.v1.ParticipantId +import com.google.protobuf.ByteString +import org.mockito.Mockito +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + +final class InMemorySubmissionAggregatorSpec extends WordSpec with Matchers with MockitoSugar { + "InMemorySubmissionAggregator" should { + "aggregate data" in { + val submissionInfo = SubmissionInfo( + ParticipantId.assertFromString("participant-id"), + "correlation ID", + ByteString.copyFromUtf8("the envelope"), + Instant.now(), + ) + val writer = mock[LedgerDataWriter] + val submission = new InMemorySubmissionAggregator(submissionInfo, writer) + val writeSetA = submission.addChild() + writeSetA += keyValuePairOf("a", "b") + writeSetA += keyValuePairOf("c", "d") + + val writeSetB = submission.addChild() + writeSetB += keyValuePairOf("e", "f") + writeSetB += keyValuePairOf("g", "h") + + submission.finish() + + val expected = Seq( + keyValuePairOf("a", "b"), + keyValuePairOf("c", "d"), + keyValuePairOf("e", "f"), + keyValuePairOf("g", "h"), + ) + Mockito.verify(writer).write(submissionInfo, expected) + } + } + + private def keyValuePairOf(key: String, value: String): (ByteString, ByteString) = + ByteString.copyFromUtf8(key) -> ByteString.copyFromUtf8(value) +} diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala index 777bda2aab8..0dddfb89346 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/batch/BatchedSubmissionValidatorSpec.scala @@ -9,7 +9,11 @@ import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlSubmissionBatch.CorrelatedSubmission import com.daml.ledger.participant.state.kvutils.DamlKvutils._ -import com.daml.ledger.participant.state.kvutils.Envelope +import com.daml.ledger.participant.state.kvutils.export.{ + NoOpLedgerDataExporter, + SubmissionAggregator +} +import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting} import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator.TestHelper.{aParticipantId, anInvalidEnvelope, makePartySubmission} import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, ValidationFailed} @@ -20,8 +24,8 @@ import com.google.protobuf.ByteString import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.{any, argThat} import org.mockito.Mockito._ -import org.scalatest.{Assertion, AsyncWordSpec, Inside, Matchers} import org.scalatest.mockito.MockitoSugar +import org.scalatest.{Assertion, AsyncWordSpec, Inside, Matchers} import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -36,13 +40,24 @@ class BatchedSubmissionValidatorSpec private val engine = Engine.DevEngine() private val metrics = new Metrics(new MetricRegistry) + private def newBatchedSubmissionValidator[CommitResult]( + params: BatchedSubmissionValidatorParameters, + metrics: Metrics = this.metrics, + ): BatchedSubmissionValidator[CommitResult] = + new BatchedSubmissionValidator[CommitResult]( + params, + new KeyValueCommitting(engine, metrics), + new ConflictDetection(metrics), + metrics, + NoOpLedgerDataExporter, + ) + "validateAndCommit" should { "return validation failure for invalid envelope" in { - val validator = BatchedSubmissionValidator[Unit]( + val validator = newBatchedSubmissionValidator[Unit]( BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + ) validator .validateAndCommit( @@ -60,10 +75,9 @@ class BatchedSubmissionValidatorSpec } "return validation failure for invalid message type in envelope" in { - val validator = BatchedSubmissionValidator[Unit]( + val validator = newBatchedSubmissionValidator[Unit]( BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + ) val notASubmission = Envelope.enclose(DamlStateValue.getDefaultInstance) validator @@ -82,10 +96,9 @@ class BatchedSubmissionValidatorSpec } "return validation failure for invalid envelope in batch" in { - val validator = BatchedSubmissionValidator[Unit]( + val validator = newBatchedSubmissionValidator[Unit]( BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + ) val batchSubmission = DamlSubmissionBatch.newBuilder .addSubmissions( CorrelatedSubmission.newBuilder @@ -124,12 +137,13 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], logEntryCaptor.capture(), any[Map[DamlStateKey, Option[DamlStateValue]]], - outputStateCaptor.capture())) + outputStateCaptor.capture(), + any[Option[SubmissionAggregator.WriteSetBuilder]], + )) .thenReturn(Future.unit) - val validator = BatchedSubmissionValidator[Unit]( + val validator = newBatchedSubmissionValidator[Unit]( BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + ) validator .validateAndCommit( @@ -179,12 +193,13 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], logEntryCaptor.capture(), any[Map[DamlStateKey, Option[DamlStateValue]]], - any[Map[DamlStateKey, DamlStateValue]] + any[Map[DamlStateKey, DamlStateValue]], + any[Option[SubmissionAggregator.WriteSetBuilder]], )) .thenReturn(Future.unit) val validatorConfig = BatchedSubmissionValidatorParameters.reasonableDefault.copy(commitParallelism = 1) - val validator = BatchedSubmissionValidator[Unit](validatorConfig, engine, metrics) + val validator = newBatchedSubmissionValidator[Unit](validatorConfig) validator .validateAndCommit( @@ -202,7 +217,9 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], any[DamlLogEntry], any[DamlInputState], - any[DamlOutputState]) + any[DamlOutputState], + any[Option[SubmissionAggregator.WriteSetBuilder]], + ) // Verify that the log entries have been committed in the right order. val logEntries = logEntryCaptor.getAllValues.asScala.map(_.asInstanceOf[DamlLogEntry]) logEntries.map(_.getPartyAllocationEntry) should be( @@ -233,13 +250,13 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], any[DamlLogEntry], any[Map[DamlStateKey, Option[DamlStateValue]]], - any[Map[DamlStateKey, DamlStateValue]] + any[Map[DamlStateKey, DamlStateValue]], + any[Option[SubmissionAggregator.WriteSetBuilder]], )) .thenReturn(Future.unit) - val validator = BatchedSubmissionValidator[Unit]( + val validator = newBatchedSubmissionValidator[Unit]( BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + ) validator .validateAndCommit( @@ -258,7 +275,9 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], any[DamlLogEntry], any[DamlInputState], - any[DamlOutputState]) + any[DamlOutputState], + any[Option[SubmissionAggregator.WriteSetBuilder]], + ) succeed } } @@ -279,14 +298,14 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], any[DamlLogEntry], any[Map[DamlStateKey, Option[DamlStateValue]]], - any[Map[DamlStateKey, DamlStateValue]] + any[Map[DamlStateKey, DamlStateValue]], + any[Option[SubmissionAggregator.WriteSetBuilder]], )) .thenReturn(Future.unit) - val validator = - BatchedSubmissionValidator[Unit]( - BatchedSubmissionValidatorParameters.reasonableDefault, - engine, - metrics) + val validator = newBatchedSubmissionValidator[Unit]( + BatchedSubmissionValidatorParameters.reasonableDefault, + metrics = metrics, + ) validator .validateAndCommit( @@ -335,12 +354,14 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], logEntryCaptor.capture(), any[Map[DamlStateKey, Option[DamlStateValue]]], - outputStateCaptor.capture())) + outputStateCaptor.capture(), + any[Option[SubmissionAggregator.WriteSetBuilder]], + )) .thenReturn(Future.unit) val validatorConfig = BatchedSubmissionValidatorParameters.reasonableDefault.copy( commitParallelism = commitParallelism) - val validator = BatchedSubmissionValidator[Unit](validatorConfig, engine, metrics) + val validator = newBatchedSubmissionValidator[Unit](validatorConfig) validator .validateAndCommit( @@ -360,7 +381,9 @@ class BatchedSubmissionValidatorSpec any[DamlLogEntryId], any[DamlLogEntry], any[DamlInputState], - any[DamlOutputState]) + any[DamlOutputState], + any[Option[SubmissionAggregator.WriteSetBuilder]], + ) // Verify we have all the expected log entries. val logEntries = logEntryCaptor.getAllValues.asScala.map(_.asInstanceOf[DamlLogEntry]) logEntries.map(_.getPartyAllocationEntry) should contain allElementsOf diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingCommitStrategySpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingCommitStrategySpec.scala index 0002399075a..364d93de14b 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingCommitStrategySpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/validator/caching/CachingCommitStrategySpec.scala @@ -11,6 +11,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{ DamlStateValue } import com.daml.ledger.participant.state.kvutils.caching.`Message Weight` +import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.ledger.validator.CommitStrategy import com.daml.ledger.validator.TestHelper._ @@ -64,7 +65,8 @@ class CachingCommitStrategySpec extends AsyncWordSpec with Matchers with Mockito any[DamlLogEntryId](), any[DamlLogEntry](), any[Map[DamlStateKey, Option[DamlStateValue]]](), - any[Map[DamlStateKey, DamlStateValue]]() + any[Map[DamlStateKey, DamlStateValue]](), + any[Option[SubmissionAggregator.WriteSetBuilder]], )) .thenReturn(Future.unit) new CachingCommitStrategy[Unit](cache, _ => shouldCache, mockCommitStrategy) diff --git a/ledger/participant-state/kvutils/tools/integrity_test.bzl b/ledger/participant-state/kvutils/tools/integrity_test.bzl index 2008fea2c96..e3597941028 100644 --- a/ledger/participant-state/kvutils/tools/integrity_test.bzl +++ b/ledger/participant-state/kvutils/tools/integrity_test.bzl @@ -9,7 +9,7 @@ def _integrity_test_impl(ctx): set -eux {checker} $(rlocation "$TEST_WORKSPACE/{dump}") """.format( - checker = ctx.executable._checker.short_path, + checker = ctx.executable.checker.short_path, dump = ctx.file.dump.short_path, ), is_executable = True, @@ -17,7 +17,7 @@ set -eux runfiles = ctx.runfiles( files = [wrapper, ctx.file.dump], ) - runfiles = runfiles.merge(ctx.attr._checker[DefaultInfo].default_runfiles) + runfiles = runfiles.merge(ctx.attr.checker[DefaultInfo].default_runfiles) return DefaultInfo( executable = wrapper, files = depset([wrapper]), @@ -29,11 +29,7 @@ integrity_test = rule( test = True, executable = True, attrs = { - "_checker": attr.label( - cfg = "host", - executable = True, - default = Label("@//ledger/participant-state/kvutils/tools:integrity-check"), - ), + "checker": attr.label(cfg = "host", executable = True), "dump": attr.label(allow_single_file = True), }, ) diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/CommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/CommitStrategySupport.scala index 9b9c65f3e3d..b7c3fc75526 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/CommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/CommitStrategySupport.scala @@ -3,7 +3,7 @@ package com.daml.ledger.participant.state.kvutils.tools.export -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet +import com.daml.ledger.participant.state.kvutils.export.WriteSet import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.{ CommitStrategy, diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/IntegrityChecker.scala b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/IntegrityChecker.scala index c2988fac075..3418a6a6f48 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/IntegrityChecker.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/IntegrityChecker.scala @@ -11,11 +11,12 @@ import akka.stream.Materializer import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import com.daml.ledger.participant.state.kvutils import com.daml.ledger.participant.state.kvutils.KeyValueCommitting -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.{ +import com.daml.ledger.participant.state.kvutils.export.{ + Deserialization, + NoOpLedgerDataExporter, SubmissionInfo, WriteSet } -import com.daml.ledger.participant.state.kvutils.export.{NoopLedgerDataExporter, Serialization} import com.daml.ledger.participant.state.kvutils.tools._ import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.batch.{ @@ -47,7 +48,7 @@ class IntegrityChecker[LogResult](commitStrategySupport: CommitStrategySupport[L new KeyValueCommitting(engine, metrics), new ConflictDetection(metrics), metrics, - NoopLedgerDataExporter, + NoOpLedgerDataExporter, ) val ComponentsForReplay(reader, commitStrategy, queryableWriteSet) = commitStrategySupport.createComponentsForReplay() @@ -165,7 +166,7 @@ class IntegrityChecker[LogResult](commitStrategySupport: CommitStrategySupport[L .orElse(commitStrategySupport.explainMismatchingValue(key, expectedValue, actualValue)) private def readSubmissionAndOutputs(input: DataInputStream): (SubmissionInfo, WriteSet) = { - val (submissionInfo, writeSet) = Serialization.readEntry(input) + val (submissionInfo, writeSet) = Deserialization.deserializeEntry(input) println( s"Read submission correlationId=${submissionInfo.correlationId} submissionEnvelopeSize=${submissionInfo.submissionEnvelope .size()} writeSetSize=${writeSet.size}") diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/LogAppendingCommitStrategySupport.scala index fd37673c6b5..46b81fed51d 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/LogAppendingCommitStrategySupport.scala @@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.export import com.daml.ledger.on.memory.{InMemoryLedgerStateOperations, Index} import com.daml.ledger.participant.state.kvutils -import com.daml.ledger.participant.state.kvutils.export.NoopLedgerDataExporter import com.daml.ledger.participant.state.kvutils.tools.export.IntegrityChecker.bytesAsHexString import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.StateKeySerializationStrategy @@ -26,7 +25,7 @@ object LogAppendingCommitStrategySupport extends CommitStrategySupport[Index] { BatchedSubmissionValidatorFactory.readerAndCommitStrategyFrom( writeRecordingLedgerStateOperations, stateKeySerializationStrategy, - NoopLedgerDataExporter) + ) ComponentsForReplay(reader, commitStrategy, writeRecordingLedgerStateOperations) } diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/WriteRecordingLedgerStateOperations.scala b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/WriteRecordingLedgerStateOperations.scala index 0d42904bcad..7c787e453ff 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/WriteRecordingLedgerStateOperations.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/com/daml/ledger/participant/state/kvutils/tools/export/WriteRecordingLedgerStateOperations.scala @@ -3,7 +3,7 @@ package com.daml.ledger.participant.state.kvutils.tools.export -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet +import com.daml.ledger.participant.state.kvutils.export.WriteSet import com.daml.ledger.validator.LedgerStateOperations import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/test/Replay.scala b/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/test/Replay.scala index 532bf79423f..f099460f97a 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/test/Replay.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/test/Replay.scala @@ -9,8 +9,7 @@ import java.nio.file.{Files, Path, Paths} import java.util.concurrent.TimeUnit import com.daml.ledger.participant.state.kvutils.Conversions._ -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.SubmissionInfo -import com.daml.ledger.participant.state.kvutils.export.Serialization +import com.daml.ledger.participant.state.kvutils.export.{Deserialization, SubmissionInfo} import com.daml.ledger.participant.state.kvutils.{Envelope, DamlKvutils => Proto} import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.lf.archive.{Decode, UniversalArchiveReader} @@ -18,8 +17,9 @@ import com.daml.lf.crypto import com.daml.lf.data._ import com.daml.lf.engine.Engine import com.daml.lf.language.{Ast, LanguageVersion, Util => AstUtil} -import com.daml.lf.transaction.{GlobalKey, GlobalKeyWithMaintainers} import com.daml.lf.transaction.{ + GlobalKey, + GlobalKeyWithMaintainers, Node, SubmittedTransaction, Transaction => Tx, @@ -164,7 +164,7 @@ object Replay { def go: Stream[SubmissionInfo] = if (ledgerExportStream.available() > 0) - Serialization.readEntry(ledgerExportStream)._1 #:: go + Deserialization.deserializeEntry(ledgerExportStream)._1 #:: go else { ledgerExportStream.close() Stream.empty diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/tools/IntegrityCheckerSpec.scala b/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/tools/IntegrityCheckerSpec.scala index 71e8f7b49b9..cfb0f6769bf 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/tools/IntegrityCheckerSpec.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/com/daml/ledger/participant/state/kvutils/tools/IntegrityCheckerSpec.scala @@ -3,7 +3,7 @@ package com.daml.ledger.participant.state.kvutils.tools -import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet +import com.daml.ledger.participant.state.kvutils.export.WriteSet import com.daml.ledger.participant.state.kvutils.tools.export.{ CommitStrategySupport, IntegrityChecker