kvutils: Simplify exporting by pushing the mutations outwards. (#7215)

* kvutils: Add `override` annotations for the exporters.

* kvutils: Test the ledger export (v2).

* kvutils: Mark integrity tests as small.

* kvutils: Add assertions to the FileBasedLedgerDataExporter.

Just confirming my understanding of how this works. Verified with the
Ledger API Test Tool and some proprietary code.

* kvutils: Move the definition of `CorrelationId` to the package file.

* kvutils: Pass a submission write set through the batch pipeline.

This removes much of the mutable state that was previously scattered all over the place.

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils: Move export finishing into its own type.

* kvutils: Move some nested types upwards.

* kvutils: Split Deserialization from Serialization.

* kvutils: Extract out common behavior in (De)Serialization.

* kvutils: Don't use a singleton for LedgerDataExporter.

Instead, construct it once.

* kvutils: Make sure we close the export file when we're done.

* kvutils: Simplify `Debug` in the same manner as `LedgerDataExport`.

* kvutils: Fix a test name.

Co-authored-by: fabiotudone-da <fabio.tudone@digitalasset.com>

* kvutils: Remove backticks around "export".

* kvutils: Move the test-only `BatchedSubmissionValidator#apply` into the test.

Co-authored-by: fabiotudone-da <fabio.tudone@digitalasset.com>
Samir Talwar 2020-08-25 19:52:59 +02:00 committed by GitHub
parent 4e265f0c1c
commit aafb4a27f9
33 changed files with 586 additions and 473 deletions

View File

@ -12,6 +12,7 @@ import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{
Bytes,
Fingerprint,
@ -83,36 +84,34 @@ object InMemoryLedgerReaderWriter {
extends ResourceOwner[KeyValueLedger] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[KeyValueLedger] = {
val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
val committer = createBatchedCommitter(
keyValueCommitting,
batchingLedgerWriterConfig,
state,
metrics,
timeProvider,
stateValueCache,
)
val readerWriter = new InMemoryLedgerReaderWriter(
ledgerId,
participantId,
dispatcher,
state,
committer,
metrics,
)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
val ledgerWriter = newLoggingContext { implicit loggingContext =>
BatchingLedgerWriter(batchingLedgerWriterConfig, readerWriter)
}
Resource.successful(createKeyValueLedger(readerWriter, ledgerWriter))
}
): Resource[KeyValueLedger] =
for {
ledgerDataExporter <- LedgerDataExporter.Owner.acquire()
keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
committer = createBatchedCommitter(
keyValueCommitting,
batchingLedgerWriterConfig,
state,
metrics,
timeProvider,
stateValueCache,
ledgerDataExporter,
)
readerWriter = new InMemoryLedgerReaderWriter(
ledgerId,
participantId,
dispatcher,
state,
committer,
metrics,
)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
ledgerWriter = newLoggingContext { implicit loggingContext =>
BatchingLedgerWriter(batchingLedgerWriterConfig, readerWriter)
}
} yield createKeyValueLedger(readerWriter, ledgerWriter)
}
final class SingleParticipantBatchingOwner(
@ -195,13 +194,15 @@ object InMemoryLedgerReaderWriter {
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: Cache[DamlStateKey, DamlStateValue],
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
BatchedSubmissionValidatorFactory.defaultParametersFor(
batchingLedgerWriterConfig.enableBatching),
keyValueCommitting,
new ConflictDetection(metrics),
metrics
metrics,
ledgerDataExporter,
)
val committer = BatchedValidatingCommitter[Index](
() => timeProvider.getCurrentTime,
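
For reference, the shape of this change: anything bound with `<-` in the `Resource` for-comprehension is acquired as a managed resource and released in reverse order when the ledger itself is released, which is what ties the lifetime of the export stream to the lifetime of the ledger. A minimal sketch of the pattern, assuming the `com.daml.resources` API used above plus `Resource#release` for tear-down (the stream and its contents are illustrative):

import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.concurrent.ExecutionContext.Implicits.global
import com.daml.resources.{Resource, ResourceOwner}

val owner: ResourceOwner[DataOutputStream] =
  // forCloseable closes the stream when the resource is released.
  ResourceOwner.forCloseable(() => new DataOutputStream(new ByteArrayOutputStream()))

val resource: Resource[DataOutputStream] =
  for {
    stream <- owner.acquire() // acquired here, closed when `resource` is released
    greeting = "ledger-export" // plain values bind with `=` and need no clean-up
  } yield {
    stream.writeUTF(greeting)
    stream
  }

// Releasing the composed resource closes the underlying stream.
resource.release()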

View File

@ -48,6 +48,7 @@ da_scala_library(
"//ledger/participant-state/protobuf:ledger_configuration_java_proto",
"//libs-scala/contextualized-logging",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
"//libs-scala/timer-utils",
"@maven//:com_github_ghik_silencer_lib_2_12_11",
"@maven//:com_google_guava_guava",
@ -173,35 +174,40 @@ da_java_proto_library(
deps = [":daml_kvutils_proto"],
)
# Builds a ledger dump from running the test tool against a kvutils-based
# in-memory ledger.
client_server_build(
name = "reference-ledger-dump",
testonly = True, # only test targets can depend on this.
client = "//ledger/ledger-api-test-tool",
client_args = [
"--concurrent-test-runs=4",
"--timeout-scale-factor=20",
"localhost:6865",
],
data = [
"//ledger/test-common:dar-files",
],
output_env = "KVUTILS_LEDGER_DUMP",
runner = "@//bazel_tools/client_server/runner_with_port_check:runner",
runner_args = ["6865"],
server = "//ledger/ledger-on-memory:app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=ledger-dump,port=6865",
],
server_files = [
"//ledger/test-common:dar-files",
],
) if not is_windows else None
# Test for checking the integrity of the ledger dump produced above.
integrity_test(
name = "reference-ledger-dump-integrity-test",
dump = ":reference-ledger-dump",
) if not is_windows else None
[
(
# Generates a ledger export by running the test tool against a kvutils-based ledger.
client_server_build(
name = name,
testonly = True, # only test targets can depend on this.
client = "//ledger/ledger-api-test-tool",
client_args = [
"--concurrent-test-runs=4",
"--timeout-scale-factor=20",
"localhost:%d" % port,
],
data = [
"//ledger/test-common:dar-files",
],
output_env = environment_variable,
runner = "@//bazel_tools/client_server/runner_with_port_check:runner",
runner_args = [str(port)],
server = "//ledger/ledger-on-memory:app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=%s,port=%d" % (name, port),
],
) if not is_windows else None,
# Test for checking the integrity of the ledger export produced above.
integrity_test(
name = "%s-integrity-test" % name,
size = "small",
checker = checker,
dump = ":%s" % name,
) if not is_windows else None,
)
for (name, port, environment_variable, checker) in [
("reference-ledger-dump", 65101, "KVUTILS_LEDGER_DUMP", "//ledger/participant-state/kvutils/tools:integrity-check"),
("reference-ledger-export", 65102, "KVUTILS_LEDGER_EXPORT", "//ledger/participant-state/kvutils/tools:integrity-check-v2"),
]
]

View File

@ -3,7 +3,8 @@
package com.daml.ledger.participant.state.kvutils
import java.io.{DataOutputStream, FileOutputStream}
import java.io.DataOutputStream
import java.nio.file.{Files, Paths}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import org.slf4j.LoggerFactory
@ -18,13 +19,14 @@ object Debug {
/** The ledger dump stream is a gzip-compressed stream of `LedgerDumpEntry` messages prefixed
* by their size.
*/
private lazy val optLedgerDumpStream: Option[DataOutputStream] = {
Option(System.getenv("KVUTILS_LEDGER_DUMP"))
.map { filename =>
logger.info(s"Enabled writing ledger entries to $filename")
new DataOutputStream(new FileOutputStream(filename))
private lazy val optLedgerDumpStream: Option[DataOutputStream] =
sys.env
.get("KVUTILS_LEDGER_DUMP")
.map { filePath =>
val path = Paths.get(filePath)
logger.info(s"Enabled writing ledger entries to $path.")
new DataOutputStream(Files.newOutputStream(path))
}
}
/** Dump ledger entry to disk if dumping is enabled.
* Ledger dumps are mostly used to test for backwards compatibility of new releases.

View File

@ -0,0 +1,53 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import java.io.DataInputStream
import java.time.Instant
import com.daml.ledger.participant.state
import com.google.protobuf.ByteString
object Deserialization {
def deserializeEntry(input: DataInputStream): (SubmissionInfo, WriteSet) = {
val submissionInfo = deserializeSubmissionInfo(input)
val writeSet = deserializeWriteSet(input)
(submissionInfo, writeSet)
}
private def deserializeSubmissionInfo(input: DataInputStream): SubmissionInfo = {
val correlationId = input.readUTF()
val submissionEnvelope = readBytes(input)
val recordTimeInstant: Instant = readInstant(input)
val participantId = input.readUTF()
SubmissionInfo(
state.v1.ParticipantId.assertFromString(participantId),
correlationId,
submissionEnvelope,
recordTimeInstant,
)
}
private def deserializeWriteSet(input: DataInputStream): WriteSet = {
val numKeyValuePairs = input.readInt()
(1 to numKeyValuePairs).map { _ =>
val key = readBytes(input)
val value = readBytes(input)
key -> value
}
}
private def readBytes(input: DataInputStream): ByteString = {
val size = input.readInt()
val byteArray = new Array[Byte](size)
input.readFully(byteArray)
ByteString.copyFrom(byteArray)
}
private def readInstant(input: DataInputStream) = {
val epochSecond = input.readLong()
val nano = input.readInt()
Instant.ofEpochSecond(epochSecond, nano.toLong)
}
}
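
`deserializeEntry` consumes exactly one `(SubmissionInfo, WriteSet)` pair, so readers loop until the stream is exhausted; the integrity checker and the replay benchmark further down both use `available() > 0` as the termination test. A hedged sketch of such a read loop (the path handling is illustrative):

import java.io.DataInputStream
import java.nio.file.{Files, Paths}
import com.daml.ledger.participant.state.kvutils.export.{Deserialization, SubmissionInfo, WriteSet}

def readExport(path: String): Vector[(SubmissionInfo, WriteSet)] = {
  val input = new DataInputStream(Files.newInputStream(Paths.get(path)))
  try {
    val entries = Vector.newBuilder[(SubmissionInfo, WriteSet)]
    while (input.available() > 0) {
      // Each iteration reads one complete submission and its write set.
      entries += Deserialization.deserializeEntry(input)
    }
    entries.result()
  } finally input.close()
}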

View File

@ -7,96 +7,40 @@ import java.io.DataOutputStream
import java.time.Instant
import java.util.concurrent.locks.StampedLock
import com.daml.ledger.participant.state.kvutils.CorrelationId
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.google.protobuf.ByteString
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* Enables exporting ledger data to an output stream.
* This class is thread-safe.
*/
class FileBasedLedgerDataExporter(output: DataOutputStream) extends LedgerDataExporter {
import FileBasedLedgerDataExporter._
private val outputLock = new StampedLock
private[export] val correlationIdMapping = mutable.Map.empty[String, String]
private[export] val inProgressSubmissions = mutable.Map.empty[String, SubmissionInfo]
private[export] val bufferedKeyValueDataPerCorrelationId =
mutable.Map.empty[String, mutable.ListBuffer[(Key, Value)]]
def addSubmission(
submissionEnvelope: ByteString,
correlationId: String,
override def addSubmission(
participantId: ParticipantId,
correlationId: CorrelationId,
submissionEnvelope: Key,
recordTimeInstant: Instant,
participantId: ParticipantId): Unit =
): SubmissionAggregator =
this.synchronized {
inProgressSubmissions.put(
correlationId,
SubmissionInfo(submissionEnvelope, correlationId, recordTimeInstant, participantId))
()
val submissionInfo =
SubmissionInfo(participantId, correlationId, submissionEnvelope, recordTimeInstant)
new InMemorySubmissionAggregator(submissionInfo, FileBasedLedgerDataWriter)
}
def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit =
this.synchronized {
correlationIdMapping.put(childCorrelationId, parentCorrelationId)
()
}
def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit =
this.synchronized {
correlationIdMapping
.get(correlationId)
.foreach { parentCorrelationId =>
val keyValuePairs = bufferedKeyValueDataPerCorrelationId
.getOrElseUpdate(parentCorrelationId, ListBuffer.empty)
keyValuePairs.appendAll(data)
bufferedKeyValueDataPerCorrelationId.put(parentCorrelationId, keyValuePairs)
}
}
def finishedProcessing(correlationId: String): Unit = {
val (submissionInfo, bufferedData) = this.synchronized {
(
inProgressSubmissions.get(correlationId),
bufferedKeyValueDataPerCorrelationId.get(correlationId))
}
submissionInfo.foreach { submission =>
bufferedData.foreach(writeSubmissionData(submission, _))
this.synchronized {
inProgressSubmissions.remove(correlationId)
bufferedKeyValueDataPerCorrelationId.remove(correlationId)
correlationIdMapping
.collect {
case (key, value) if value == correlationId => key
}
.foreach(correlationIdMapping.remove)
object FileBasedLedgerDataWriter extends LedgerDataWriter {
override def write(submissionInfo: SubmissionInfo, writeSet: Seq[(Key, Value)]): Unit = {
val stamp = outputLock.writeLock()
try {
Serialization.serializeEntry(submissionInfo, writeSet, output)
output.flush()
} finally {
outputLock.unlock(stamp)
}
}
}
private def writeSubmissionData(
submissionInfo: SubmissionInfo,
writeSet: ListBuffer[(Key, Value)]): Unit = {
val stamp = outputLock.writeLock()
try {
Serialization.serializeEntry(submissionInfo, writeSet, output)
output.flush()
} finally {
outputLock.unlock(stamp)
}
}
}
object FileBasedLedgerDataExporter {
case class SubmissionInfo(
submissionEnvelope: ByteString,
correlationId: String,
recordTimeInstant: Instant,
participantId: ParticipantId)
type WriteSet = Seq[(Key, Value)]
}
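
All remaining mutability now lives in `FileBasedLedgerDataWriter`, which serializes concurrent `write` calls with a `StampedLock` so that interleaved batches cannot corrupt the output stream. The locking idiom in isolation, as a generic sketch (not the exporter itself):

import java.util.concurrent.locks.StampedLock

// Wraps a side-effecting write so at most one thread performs it at a time;
// the lock is released even if the write throws.
final class ExclusiveWriter[A](write: A => Unit) {
  private val lock = new StampedLock

  def apply(value: A): Unit = {
    val stamp = lock.writeLock() // blocks until exclusive access is granted
    try write(value)
    finally lock.unlock(stamp)
  }
}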

View File

@ -0,0 +1,36 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator.WriteSetBuilder
import scala.collection.mutable
final class InMemorySubmissionAggregator(submissionInfo: SubmissionInfo, writer: LedgerDataWriter)
extends SubmissionAggregator {
import InMemorySubmissionAggregator._
private val buffer = mutable.ListBuffer.empty[WriteItem]
override def addChild(): WriteSetBuilder = new InMemoryWriteSetBuilder(buffer)
override def finish(): Unit = writer.write(submissionInfo, buffer)
}
object InMemorySubmissionAggregator {
final class InMemoryWriteSetBuilder(buffer: mutable.Buffer[WriteItem]) extends WriteSetBuilder {
override def +=(data: WriteItem): Unit = buffer.synchronized {
buffer += data
()
}
override def ++=(data: Iterable[WriteItem]): Unit = buffer.synchronized {
buffer ++= data
()
}
}
}

View File

@ -3,39 +3,25 @@
package com.daml.ledger.participant.state.kvutils.export
import java.io.{DataOutputStream, FileOutputStream}
import java.io.DataOutputStream
import java.nio.file.{Files, Paths}
import java.time.Instant
import com.daml.ledger.participant.state.kvutils.CorrelationId
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.resources.{Resource, ResourceOwner}
import com.google.protobuf.ByteString
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
trait LedgerDataExporter {
/**
* Adds given submission and its parameters to the list of in-progress submissions.
*/
def addSubmission(
participantId: ParticipantId,
correlationId: CorrelationId,
submissionEnvelope: ByteString,
correlationId: String,
recordTimeInstant: Instant,
participantId: ParticipantId): Unit
/**
* Establishes parent-child relation between two correlation IDs.
*/
def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit
/**
* Adds given key-value pairs to the write-set belonging to the given correlation ID.
*/
def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit
/**
* Signals that entries for the given top-level (parent) correlation ID may be persisted.
*/
def finishedProcessing(correlationId: String): Unit
): SubmissionAggregator
}
object LedgerDataExporter {
@ -43,17 +29,21 @@ object LedgerDataExporter {
private val logger = LoggerFactory.getLogger(this.getClass)
private lazy val outputStreamMaybe: Option[DataOutputStream] = {
Option(System.getenv(EnvironmentVariableName))
.map { filename =>
logger.info(s"Enabled writing ledger entries to $filename")
new DataOutputStream(new FileOutputStream(filename))
}
object Owner extends ResourceOwner[LedgerDataExporter] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[LedgerDataExporter] =
sys.env
.get(EnvironmentVariableName)
.map(Paths.get(_))
.map { path =>
logger.info(s"Enabled writing ledger entries to $path.")
ResourceOwner
.forCloseable(() => new DataOutputStream(Files.newOutputStream(path)))
.acquire()
.map(new FileBasedLedgerDataExporter(_))
}
.getOrElse(Resource.successful(NoOpLedgerDataExporter))
}
private lazy val instance = outputStreamMaybe
.map(new FileBasedLedgerDataExporter(_))
.getOrElse(NoopLedgerDataExporter)
def apply(): LedgerDataExporter = instance
}

View File

@ -0,0 +1,8 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
trait LedgerDataWriter {
def write(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit
}
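
Splitting the writer out of the exporter makes the destination pluggable: anything that can consume a `(SubmissionInfo, WriteSet)` pair satisfies the interface. As a sketch (hypothetical, not part of this change), a recording writer for tests could look like this; `InMemorySubmissionAggregatorSpec` below achieves the same with a mock instead:

import scala.collection.mutable
import com.daml.ledger.participant.state.kvutils.export.{LedgerDataWriter, SubmissionInfo, WriteSet}

// Collects everything it is asked to write, for later assertions.
final class RecordingLedgerDataWriter extends LedgerDataWriter {
  private val entries = mutable.ListBuffer.empty[(SubmissionInfo, WriteSet)]

  override def write(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit =
    entries.synchronized {
      entries += submissionInfo -> writeSet
      ()
    }

  def recorded: Seq[(SubmissionInfo, WriteSet)] = entries.synchronized(entries.toList)
}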

View File

@ -0,0 +1,19 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import java.time.Instant
import com.daml.ledger.participant.state.kvutils.CorrelationId
import com.daml.ledger.participant.state.v1.ParticipantId
import com.google.protobuf.ByteString
object NoOpLedgerDataExporter extends LedgerDataExporter {
override def addSubmission(
participantId: ParticipantId,
correlationId: CorrelationId,
submissionEnvelope: ByteString,
recordTimeInstant: Instant,
): SubmissionAggregator = NoOpSubmissionAggregator
}

View File

@ -0,0 +1,19 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator.WriteSetBuilder
object NoOpSubmissionAggregator extends SubmissionAggregator {
override def addChild(): WriteSetBuilder = NoOpWriteSetBuilder
override def finish(): Unit = ()
object NoOpWriteSetBuilder extends WriteSetBuilder {
override def +=(data: WriteItem): Unit = ()
override def ++=(data: Iterable[WriteItem]): Unit = ()
}
}

View File

@ -1,24 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import java.time.Instant
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.google.protobuf.ByteString
object NoopLedgerDataExporter extends LedgerDataExporter {
override def addSubmission(
submissionEnvelope: ByteString,
correlationId: String,
recordTimeInstant: Instant,
participantId: ParticipantId): Unit = ()
override def addParentChild(parentCorrelationId: String, childCorrelationId: String): Unit = ()
override def addToWriteSet(correlationId: String, data: Iterable[(Key, Value)]): Unit = ()
override def finishedProcessing(correlationId: String): Unit = ()
}

View File

@ -3,78 +3,46 @@
package com.daml.ledger.participant.state.kvutils.export
import java.io.{DataInputStream, DataOutputStream}
import java.io.DataOutputStream
import java.time.Instant
import com.daml.ledger.participant.state
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.{
SubmissionInfo,
WriteSet
}
import com.google.protobuf.ByteString
object Serialization {
def serializeEntry(
submissionInfo: SubmissionInfo,
writeSet: WriteSet,
out: DataOutputStream): Unit = {
out: DataOutputStream,
): Unit = {
serializeSubmissionInfo(submissionInfo, out)
serializeWriteSet(writeSet, out)
}
def readEntry(input: DataInputStream): (SubmissionInfo, WriteSet) = {
val submissionInfo = readSubmissionInfo(input)
val writeSet = readWriteSet(input)
(submissionInfo, writeSet)
}
private def serializeSubmissionInfo(
submissionInfo: SubmissionInfo,
out: DataOutputStream): Unit = {
out: DataOutputStream,
): Unit = {
out.writeUTF(submissionInfo.correlationId)
out.writeInt(submissionInfo.submissionEnvelope.size())
submissionInfo.submissionEnvelope.writeTo(out)
out.writeLong(submissionInfo.recordTimeInstant.getEpochSecond)
out.writeInt(submissionInfo.recordTimeInstant.getNano)
writeBytes(submissionInfo.submissionEnvelope, out)
writeInstant(submissionInfo.recordTimeInstant, out)
out.writeUTF(submissionInfo.participantId)
}
private def readSubmissionInfo(input: DataInputStream): SubmissionInfo = {
val correlationId = input.readUTF()
val submissionEnvelopeSize = input.readInt()
val submissionEnvelope = new Array[Byte](submissionEnvelopeSize)
input.readFully(submissionEnvelope)
val recordTimeEpochSeconds = input.readLong()
val recordTimeEpochNanos = input.readInt()
val participantId = input.readUTF()
SubmissionInfo(
ByteString.copyFrom(submissionEnvelope),
correlationId,
Instant.ofEpochSecond(recordTimeEpochSeconds, recordTimeEpochNanos.toLong),
state.v1.ParticipantId.assertFromString(participantId)
)
}
private def serializeWriteSet(writeSet: WriteSet, out: DataOutputStream): Unit = {
out.writeInt(writeSet.size)
for ((key, value) <- writeSet.sortBy(_._1.asReadOnlyByteBuffer())) {
out.writeInt(key.size())
key.writeTo(out)
out.writeInt(value.size())
value.writeTo(out)
writeBytes(key, out)
writeBytes(value, out)
}
}
private def readWriteSet(input: DataInputStream): WriteSet = {
val numKeyValuePairs = input.readInt()
(1 to numKeyValuePairs).map { _ =>
val keySize = input.readInt()
val keyBytes = new Array[Byte](keySize)
input.readFully(keyBytes)
val valueSize = input.readInt()
val valueBytes = new Array[Byte](valueSize)
input.readFully(valueBytes)
(ByteString.copyFrom(keyBytes), ByteString.copyFrom(valueBytes))
}
private def writeBytes(bytes: ByteString, out: DataOutputStream): Unit = {
out.writeInt(bytes.size())
bytes.writeTo(out)
}
private def writeInstant(instant: Instant, out: DataOutputStream): Unit = {
out.writeLong(instant.getEpochSecond)
out.writeInt(instant.getNano)
}
}
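
Note that `serializeWriteSet` sorts the pairs by the raw key bytes before writing, so the on-disk representation is deterministic even though child write sets may have been appended in any order. A round-trip sketch through in-memory streams (the participant ID and payloads are made up):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.time.Instant
import com.daml.ledger.participant.state.kvutils.export.{Deserialization, Serialization, SubmissionInfo}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.google.protobuf.ByteString

val info = SubmissionInfo(
  ParticipantId.assertFromString("a-participant"),
  "a-correlation-id",
  ByteString.copyFromUtf8("an envelope"),
  Instant.EPOCH,
)
val out = new ByteArrayOutputStream()
Serialization.serializeEntry(
  info,
  // Deliberately out of order: "b" before "a".
  Seq(
    ByteString.copyFromUtf8("b") -> ByteString.copyFromUtf8("2"),
    ByteString.copyFromUtf8("a") -> ByteString.copyFromUtf8("1"),
  ),
  new DataOutputStream(out),
)
val (_, writeSet) =
  Deserialization.deserializeEntry(new DataInputStream(new ByteArrayInputStream(out.toByteArray)))
// writeSet comes back sorted by key: ("a" -> "1") precedes ("b" -> "2").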

View File

@ -0,0 +1,22 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator._
trait SubmissionAggregator {
def addChild(): WriteSetBuilder
def finish(): Unit
}
object SubmissionAggregator {
trait WriteSetBuilder {
def +=(data: WriteItem): Unit
def ++=(data: Iterable[WriteItem]): Unit
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import java.time.Instant
import com.daml.ledger.participant.state.kvutils.CorrelationId
import com.daml.ledger.participant.state.v1.ParticipantId
import com.google.protobuf.ByteString
case class SubmissionInfo(
participantId: ParticipantId,
correlationId: CorrelationId,
submissionEnvelope: ByteString,
recordTimeInstant: Instant,
)

View File

@ -0,0 +1,14 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
package object export {
type WriteItem = (Key, Value)
type WriteSet = Seq[WriteItem]
}

View File

@ -32,6 +32,8 @@ package object kvutils {
type Bytes = ByteString
type DamlStateMap = Map[DamlStateKey, Option[DamlStateValue]]
type CorrelationId = String
type Fingerprint = Bytes
type DamlStateMapWithFingerprints = Map[DamlStateKey, (Option[DamlStateValue], Fingerprint)]
val FingerprintPlaceholder: Fingerprint = ByteString.EMPTY

View File

@ -9,6 +9,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateKey,
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.v1.ParticipantId
import scala.concurrent.Future
@ -23,5 +24,7 @@ trait CommitStrategy[Result] {
entryId: DamlLogEntryId,
entry: DamlLogEntry,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue]): Future[Result]
outputState: Map[DamlStateKey, DamlStateValue],
exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Result]
}

View File

@ -10,7 +10,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.Envelope
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.v1.ParticipantId
import scala.collection.breakOut
@ -19,8 +19,7 @@ import scala.concurrent.{ExecutionContext, Future}
class LogAppendingCommitStrategy[Index](
ledgerStateOperations: LedgerStateOperations[Index],
keySerializationStrategy: StateKeySerializationStrategy,
ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())(
implicit executionContext: ExecutionContext)
)(implicit executionContext: ExecutionContext)
extends CommitStrategy[Index] {
override def commit(
participantId: ParticipantId,
@ -28,21 +27,26 @@ class LogAppendingCommitStrategy[Index](
entryId: DamlLogEntryId,
entry: DamlLogEntry,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue]): Future[Index] =
outputState: Map[DamlStateKey, DamlStateValue],
exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Index] =
for {
serializedKeyValuePairs <- Future.successful(outputState.map {
case (key, value) =>
(keySerializationStrategy.serializeStateKey(key), Envelope.enclose(value))
}(breakOut))
_ = ledgerDataExporter.addToWriteSet(correlationId, serializedKeyValuePairs)
_ = exporterWriteSet.foreach {
_ ++= serializedKeyValuePairs
}
_ <- if (serializedKeyValuePairs.nonEmpty) {
ledgerStateOperations.writeState(serializedKeyValuePairs)
} else {
Future.unit
}
envelopedLogEntry <- Future.successful(Envelope.enclose(entry))
_ = ledgerDataExporter
.addToWriteSet(correlationId, List((entryId.toByteString, envelopedLogEntry)))
_ = exporterWriteSet.foreach {
_ += entryId.toByteString -> envelopedLogEntry
}
index <- ledgerStateOperations
.appendToLog(
entryId.toByteString,

View File

@ -11,15 +11,14 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.kvutils.export.{LedgerDataExporter, SubmissionAggregator}
import com.daml.ledger.participant.state.kvutils.{CorrelationId, Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator
import com.daml.ledger.validator.SubmissionValidator.LogEntryAndState
import com.daml.ledger.validator._
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{Metrics, Timed}
@ -36,28 +35,16 @@ object BatchedSubmissionValidator {
committer: KeyValueCommitting,
conflictDetection: ConflictDetection,
metrics: Metrics,
ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())
: BatchedSubmissionValidator[CommitResult] =
ledgerDataExporter: LedgerDataExporter,
): BatchedSubmissionValidator[CommitResult] =
new BatchedSubmissionValidator[CommitResult](
params,
committer,
conflictDetection,
metrics,
ledgerDataExporter
ledgerDataExporter,
)
private[validator] def apply[CommitResult](
params: BatchedSubmissionValidatorParameters,
engine: Engine,
metrics: Metrics): BatchedSubmissionValidator[CommitResult] =
new BatchedSubmissionValidator[CommitResult](
params,
new KeyValueCommitting(engine, metrics),
new ConflictDetection(metrics),
metrics)
private type CorrelationId = String
/** A [[DamlSubmission]] with an associated correlation id and a log entry id computed
* from the envelope. */
private case class CorrelatedSubmission(
@ -104,7 +91,8 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
committer: KeyValueCommitting,
conflictDetection: ConflictDetection,
damlMetrics: Metrics,
ledgerDataExporter: LedgerDataExporter = LedgerDataExporter()) {
ledgerDataExporter: LedgerDataExporter,
) {
import BatchedSubmissionValidator._
@ -126,11 +114,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
commitStrategy: CommitStrategy[CommitResult]
)(implicit materializer: Materializer, executionContext: ExecutionContext): Future[Unit] =
withCorrelationIdLogged(correlationId) { implicit loggingContext =>
ledgerDataExporter.addSubmission(
submissionEnvelope,
val exporterAggregator = ledgerDataExporter.addSubmission(
participantId,
correlationId,
submissionEnvelope,
recordTimeInstant,
participantId)
)
val recordTime = Time.Timestamp.assertFromInstant(recordTimeInstant)
Timed.future(
metrics.validateAndCommit, {
@ -138,11 +127,11 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
case Right(Envelope.SubmissionMessage(submission)) =>
processBatch(
participantId,
correlationId,
recordTime,
singleSubmissionSource(submissionEnvelope, submission, correlationId),
ledgerStateReader,
commitStrategy
commitStrategy,
exporterAggregator,
)
case Right(Envelope.SubmissionBatchMessage(batch)) =>
@ -151,11 +140,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
metrics.receivedBatchSubmissionBytes.update(batch.getSerializedSize)
processBatch(
participantId,
correlationId,
recordTime,
batchSubmissionSource(batch),
ledgerStateReader,
commitStrategy)
commitStrategy,
exporterAggregator,
)
case Right(other) =>
Future.failed(
@ -231,7 +221,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
private case class ValidatedSubmission(
correlatedSubmission: CorrelatedSubmission,
inputState: DamlInputState,
logEntryAndState: LogEntryAndState)
logEntryAndState: LogEntryAndState,
exporterWriteSet: SubmissionAggregator.WriteSetBuilder,
)
private type Outputs2 = Indexed[ValidatedSubmission]
@ -253,13 +245,15 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
*/
private def processBatch(
participantId: ParticipantId,
batchCorrelationId: CorrelationId,
recordTime: Timestamp,
indexedSubmissions: Source[Inputs, NotUsed],
damlLedgerStateReader: DamlLedgerStateReader,
commitStrategy: CommitStrategy[CommitResult])(
commitStrategy: CommitStrategy[CommitResult],
exporterAggregator: SubmissionAggregator,
)(
implicit materializer: Materializer,
executionContext: ExecutionContext): Future[Unit] =
executionContext: ExecutionContext,
): Future[Unit] =
indexedSubmissions
// Fetch the submission inputs in parallel.
.mapAsyncUnordered[Outputs1](params.readParallelism) {
@ -269,10 +263,14 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
.mapAsyncUnordered[Outputs2](params.cpuParallelism) {
_.mapFuture {
case (correlatedSubmission, inputState) =>
ledgerDataExporter.addParentChild(
batchCorrelationId,
correlatedSubmission.correlationId)
validateSubmission(participantId, recordTime, correlatedSubmission, inputState)
val exporterWriteSet = exporterAggregator.addChild()
validateSubmission(
participantId,
recordTime,
correlatedSubmission,
inputState,
exporterWriteSet,
)
}
}
// Collect the results.
@ -289,26 +287,40 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
val invalidatedKeys = mutable.Set.empty[DamlStateKey]
{
case ValidatedSubmission(correlatedSubmission, inputState, logEntryAndOutputState) =>
case ValidatedSubmission(
correlatedSubmission,
inputState,
logEntryAndOutputState,
exporterWriteSet,
) =>
detectConflictsAndRecover(
correlatedSubmission,
inputState,
logEntryAndOutputState,
invalidatedKeys)
invalidatedKeys,
exporterWriteSet,
)
}
}
// Commit the results.
.mapAsync[Outputs6](params.commitParallelism) {
case ValidatedSubmission(correlatedSubmission, inputState, logEntryAndOutputState) =>
case ValidatedSubmission(
correlatedSubmission,
inputState,
logEntryAndOutputState,
exporterWriteSet,
) =>
commitResult(
participantId,
correlatedSubmission,
inputState,
logEntryAndOutputState,
commitStrategy)
commitStrategy,
exporterWriteSet,
)
}
.runWith(Sink.ignore)
.map(_ => ledgerDataExporter.finishedProcessing(batchCorrelationId))
.map(_ => exporterAggregator.finish())
private def fetchSubmissionInputs(
correlatedSubmission: CorrelatedSubmission,
@ -332,8 +344,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
participantId: ParticipantId,
recordTime: Timestamp,
correlatedSubmission: CorrelatedSubmission,
inputState: DamlInputState)(
implicit executionContext: ExecutionContext): Future[ValidatedSubmission] =
inputState: DamlInputState,
exporterWriteSet: SubmissionAggregator.WriteSetBuilder,
)(implicit executionContext: ExecutionContext): Future[ValidatedSubmission] =
withSubmissionLoggingContext(correlatedSubmission) { _ =>
Timed.timedAndTrackedFuture(
metrics.validate,
@ -347,7 +360,7 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
participantId,
inputState
)
ValidatedSubmission(correlatedSubmission, inputState, logEntryAndState)
ValidatedSubmission(correlatedSubmission, inputState, logEntryAndState, exporterWriteSet)
}
)
}
@ -356,8 +369,9 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
correlatedSubmission: CorrelatedSubmission,
inputState: DamlInputState,
logEntryAndState: LogEntryAndState,
invalidatedKeys: mutable.Set[DamlStateKey])
: scala.collection.immutable.Iterable[ValidatedSubmission] = {
invalidatedKeys: mutable.Set[DamlStateKey],
exporterWriteSet: SubmissionAggregator.WriteSetBuilder,
): scala.collection.immutable.Iterable[ValidatedSubmission] = {
val (logEntry, outputState) = logEntryAndState
withSubmissionLoggingContext(correlatedSubmission) { implicit loggingContext =>
Timed.value(
@ -372,7 +386,12 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
.map {
case (newInvalidatedKeys, (newLogEntry, newState)) =>
invalidatedKeys ++= newInvalidatedKeys
ValidatedSubmission(correlatedSubmission, inputState, (newLogEntry, newState)) :: Nil
ValidatedSubmission(
correlatedSubmission,
inputState,
(newLogEntry, newState),
exporterWriteSet,
) :: Nil
}
.getOrElse {
logger.info(
@ -389,23 +408,27 @@ class BatchedSubmissionValidator[CommitResult] private[validator] (
correlatedSubmission: CorrelatedSubmission,
inputState: DamlInputState,
logEntryAndState: LogEntryAndState,
commitStrategy: CommitStrategy[CommitResult])(
implicit executionContext: ExecutionContext): Future[Unit] = {
commitStrategy: CommitStrategy[CommitResult],
exporterWriteSet: SubmissionAggregator.WriteSetBuilder,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
val (logEntry, outputState) = logEntryAndState
withSubmissionLoggingContext(correlatedSubmission) { _ =>
Timed.timedAndTrackedFuture(
metrics.commit,
metrics.commitRunning,
commitStrategy
.commit(
participantId,
correlatedSubmission.correlationId,
correlatedSubmission.logEntryId,
logEntry,
inputState,
outputState)
.map(_ => ())
)
Timed
.timedAndTrackedFuture(
metrics.commit,
metrics.commitRunning,
commitStrategy
.commit(
participantId,
correlatedSubmission.correlationId,
correlatedSubmission.logEntryId,
logEntry,
inputState,
outputState,
Some(exporterWriteSet),
)
)
.map(_ => ())
}
}
}
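
Stripped of the Akka Streams stages, the aggregator's life-cycle through `processBatch` is: one `SubmissionAggregator` per envelope, one `WriteSetBuilder` per correlated submission (created where `addParentChild` used to be called), and a single `finish()` once the whole batch has committed. A sequential sketch of that flow (correlation IDs and payloads are illustrative; `NoOpLedgerDataExporter` stands in for a real exporter):

import java.time.Instant
import com.daml.ledger.participant.state.kvutils.export.{LedgerDataExporter, NoOpLedgerDataExporter}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.google.protobuf.ByteString

def exportFlowSketch(exporter: LedgerDataExporter): Unit = {
  // One aggregator per incoming envelope (addSubmission).
  val aggregator = exporter.addSubmission(
    ParticipantId.assertFromString("a-participant"),
    "batch-correlation-id",
    ByteString.copyFromUtf8("the batch envelope"),
    Instant.now(),
  )
  Seq("child-1", "child-2").foreach { childCorrelationId =>
    // One builder per correlated submission (addChild replaces addParentChild).
    val writeSet = aggregator.addChild()
    // The commit strategy appends serialized state and the log entry here,
    // in place of the old addToWriteSet calls.
    writeSet += ByteString.copyFromUtf8(childCorrelationId) ->
      ByteString.copyFromUtf8("an enveloped log entry")
  }
  // A single write once everything has committed (replaces finishedProcessing).
  aggregator.finish()
}

exportFlowSketch(NoOpLedgerDataExporter)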

View File

@ -5,7 +5,6 @@ package com.daml.ledger.validator.batch
import com.daml.caching.Cache
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.caching.{
CacheUpdatePolicy,
@ -22,8 +21,6 @@ import com.daml.ledger.validator.{
LogAppendingCommitStrategy,
StateKeySerializationStrategy
}
import com.daml.lf.engine.Engine
import com.daml.metrics.Metrics
import scala.concurrent.{ExecutionContext, Future}
@ -50,17 +47,16 @@ object BatchedSubmissionValidatorFactory {
def readerAndCommitStrategyFrom[LogResult](
ledgerStateOperations: LedgerStateOperations[LogResult],
keySerializationStrategy: StateKeySerializationStrategy = DefaultStateKeySerializationStrategy,
ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())(
implicit executionContext: ExecutionContext)
)(implicit executionContext: ExecutionContext)
: (DamlLedgerStateReader, CommitStrategy[LogResult]) = {
val ledgerStateReader = DamlLedgerStateReader.from(
new LedgerStateReaderAdapter[LogResult](ledgerStateOperations),
keySerializationStrategy)
val commitStrategy =
new LogAppendingCommitStrategy[LogResult](
ledgerStateOperations,
keySerializationStrategy,
ledgerDataExporter)
keySerializationStrategy,
)
val commitStrategy = new LogAppendingCommitStrategy[LogResult](
ledgerStateOperations,
keySerializationStrategy,
)
(ledgerStateReader, commitStrategy)
}
@ -69,8 +65,7 @@ object BatchedSubmissionValidatorFactory {
stateCache: Cache[DamlStateKey, DamlStateValue],
cacheUpdatePolicy: CacheUpdatePolicy,
keySerializationStrategy: StateKeySerializationStrategy = DefaultStateKeySerializationStrategy,
ledgerDataExporter: LedgerDataExporter = LedgerDataExporter())(
implicit executionContext: ExecutionContext)
)(implicit executionContext: ExecutionContext)
: (DamlLedgerStateReader with QueryableReadSet, CommitStrategy[LogResult]) = {
val ledgerStateReader = new CachingDamlLedgerStateReader(
stateCache,
@ -78,7 +73,8 @@ object BatchedSubmissionValidatorFactory {
keySerializationStrategy,
DamlLedgerStateReader.from(
new LedgerStateReaderAdapter[LogResult](ledgerStateOperations),
keySerializationStrategy)
keySerializationStrategy,
),
)
val commitStrategy = new CachingCommitStrategy(
stateCache,
@ -86,31 +82,8 @@ object BatchedSubmissionValidatorFactory {
new LogAppendingCommitStrategy[LogResult](
ledgerStateOperations,
keySerializationStrategy,
ledgerDataExporter)
)
)
(ledgerStateReader, commitStrategy)
}
case class CachingEnabledComponents[LogResult](
ledgerStateReader: DamlLedgerStateReader with QueryableReadSet,
commitStrategy: CommitStrategy[LogResult],
batchValidator: BatchedSubmissionValidator[LogResult])
def componentsEnabledForCaching[LogResult](
params: BatchedSubmissionValidatorParameters,
ledgerStateOperations: LedgerStateOperations[LogResult],
stateCache: Cache[DamlStateKey, DamlStateValue],
cacheUpdatePolicy: CacheUpdatePolicy,
metrics: Metrics,
engine: Engine
)(implicit executionContext: ExecutionContext): CachingEnabledComponents[LogResult] = {
val (ledgerStateReader, commitStrategy) =
cachingReaderAndCommitStrategyFrom(ledgerStateOperations, stateCache, cacheUpdatePolicy)
val batchValidator = BatchedSubmissionValidator[LogResult](
params,
engine,
metrics
)
CachingEnabledComponents(ledgerStateReader, commitStrategy, batchValidator)
}
}

View File

@ -6,6 +6,7 @@ package com.daml.ledger.validator.caching
import com.daml.caching.Cache
import com.daml.ledger.participant.state.kvutils.DamlKvutils
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.CommitStrategy
@ -22,7 +23,9 @@ class CachingCommitStrategy[Result](
entryId: DamlKvutils.DamlLogEntryId,
entry: DamlKvutils.DamlLogEntry,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue]): Future[Result] =
outputState: Map[DamlStateKey, DamlStateValue],
exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder],
): Future[Result] =
for {
_ <- Future {
outputState.view.filter { case (key, _) => shouldCache(key) }.foreach {
@ -35,6 +38,8 @@ class CachingCommitStrategy[Result](
entryId,
entry,
inputState,
outputState)
outputState,
exporterWriteSet,
)
} yield result
}

View File

@ -3,13 +3,12 @@
package com.daml.ledger
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.kvutils.{Bytes, CorrelationId}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import scala.concurrent.Future
package object validator {
type CorrelationId = String
type SubmissionEnvelope = Bytes
type SubmittingParticipantId = ParticipantId

View File

@ -11,56 +11,7 @@ import com.google.protobuf.ByteString
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoSugar {
// XXX SC remove in Scala 2.13; see notes in ConfSpec
import scala.collection.GenTraversable, org.scalatest.enablers.Containing
private[this] implicit def `fixed sig containingNatureOfGenTraversable`[
E: org.scalactic.Equality,
TRAV]: Containing[TRAV with GenTraversable[E]] =
Containing.containingNatureOfGenTraversable[E, GenTraversable]
"addParentChild" should {
"add entry to correlation ID mapping" in {
val instance = new FileBasedLedgerDataExporter(mock[DataOutputStream])
instance.addParentChild("parent", "child")
instance.correlationIdMapping should contain("child" -> "parent")
}
}
"addToWriteSet" should {
"append to existing data" in {
val instance = new FileBasedLedgerDataExporter(mock[DataOutputStream])
instance.addParentChild("parent", "child")
instance.addToWriteSet("child", Seq(keyValuePairOf("a", "b")))
instance.addToWriteSet("child", Seq(keyValuePairOf("c", "d")))
instance.bufferedKeyValueDataPerCorrelationId should contain(
"parent" ->
Seq(keyValuePairOf("a", "b"), keyValuePairOf("c", "d")))
}
}
"finishedProcessing" should {
"remove all data such as submission info, write-set and child correlation IDs" in {
val dataOutputStream = new DataOutputStream(new ByteArrayOutputStream())
val instance = new FileBasedLedgerDataExporter(dataOutputStream)
instance.addSubmission(
ByteString.copyFromUtf8("an envelope"),
"parent",
Instant.now(),
v1.ParticipantId.assertFromString("id"))
instance.addParentChild("parent", "parent")
instance.addToWriteSet("parent", Seq(keyValuePairOf("a", "b")))
instance.finishedProcessing("parent")
instance.inProgressSubmissions shouldBe empty
instance.bufferedKeyValueDataPerCorrelationId shouldBe empty
instance.correlationIdMapping shouldBe empty
}
}
final class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoSugar {
"serialized submission" should {
"be readable back" in {
val baos = new ByteArrayOutputStream()
@ -68,23 +19,33 @@ class FileBasedLedgerDataExportSpec extends WordSpec with Matchers with MockitoS
val instance = new FileBasedLedgerDataExporter(dataOutputStream)
val expectedRecordTimeInstant = Instant.ofEpochSecond(123456, 123456789)
val expectedParticipantId = v1.ParticipantId.assertFromString("id")
instance.addSubmission(
ByteString.copyFromUtf8("an envelope"),
"parent",
expectedRecordTimeInstant,
v1.ParticipantId.assertFromString("id"))
instance.addParentChild("parent", "parent")
instance.addToWriteSet("parent", Seq(keyValuePairOf("a", "b")))
instance.finishedProcessing("parent")
val submission = instance.addSubmission(
v1.ParticipantId.assertFromString("id"),
"parent",
ByteString.copyFromUtf8("an envelope"),
expectedRecordTimeInstant,
)
val writeSetA1 = submission.addChild()
writeSetA1 ++= Seq(keyValuePairOf("a", "b"), keyValuePairOf("c", "d"))
val writeSetA2 = submission.addChild()
writeSetA2 ++= Seq(keyValuePairOf("e", "f"), keyValuePairOf("g", "h"))
submission.finish()
val dataInputStream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
val (actualSubmissionInfo, actualWriteSet) = Serialization.readEntry(dataInputStream)
val (actualSubmissionInfo, actualWriteSet) = Deserialization.deserializeEntry(dataInputStream)
actualSubmissionInfo.submissionEnvelope should be(ByteString.copyFromUtf8("an envelope"))
actualSubmissionInfo.correlationId should be("parent")
actualSubmissionInfo.recordTimeInstant should be(expectedRecordTimeInstant)
actualSubmissionInfo.participantId should be(expectedParticipantId)
actualWriteSet should be(Seq(keyValuePairOf("a", "b")))
actualWriteSet should be(
Seq(
keyValuePairOf("a", "b"),
keyValuePairOf("c", "d"),
keyValuePairOf("e", "f"),
keyValuePairOf("g", "h"),
))
}
}

View File

@ -0,0 +1,47 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.export
import java.time.Instant
import com.daml.ledger.participant.state.v1.ParticipantId
import com.google.protobuf.ByteString
import org.mockito.Mockito
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
final class InMemorySubmissionAggregatorSpec extends WordSpec with Matchers with MockitoSugar {
"InMemorySubmissionAggregator" should {
"aggregate data" in {
val submissionInfo = SubmissionInfo(
ParticipantId.assertFromString("participant-id"),
"correlation ID",
ByteString.copyFromUtf8("the envelope"),
Instant.now(),
)
val writer = mock[LedgerDataWriter]
val submission = new InMemorySubmissionAggregator(submissionInfo, writer)
val writeSetA = submission.addChild()
writeSetA += keyValuePairOf("a", "b")
writeSetA += keyValuePairOf("c", "d")
val writeSetB = submission.addChild()
writeSetB += keyValuePairOf("e", "f")
writeSetB += keyValuePairOf("g", "h")
submission.finish()
val expected = Seq(
keyValuePairOf("a", "b"),
keyValuePairOf("c", "d"),
keyValuePairOf("e", "f"),
keyValuePairOf("g", "h"),
)
Mockito.verify(writer).write(submissionInfo, expected)
}
}
private def keyValuePairOf(key: String, value: String): (ByteString, ByteString) =
ByteString.copyFromUtf8(key) -> ByteString.copyFromUtf8(value)
}

View File

@ -9,7 +9,11 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlSubmissionBatch.CorrelatedSubmission
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.Envelope
import com.daml.ledger.participant.state.kvutils.export.{
NoOpLedgerDataExporter,
SubmissionAggregator
}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.TestHelper.{aParticipantId, anInvalidEnvelope, makePartySubmission}
import com.daml.ledger.validator.{CommitStrategy, DamlLedgerStateReader, ValidationFailed}
@ -20,8 +24,8 @@ import com.google.protobuf.ByteString
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, argThat}
import org.mockito.Mockito._
import org.scalatest.{Assertion, AsyncWordSpec, Inside, Matchers}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{Assertion, AsyncWordSpec, Inside, Matchers}
import scala.collection.JavaConverters._
import scala.concurrent.Future
@ -36,13 +40,24 @@ class BatchedSubmissionValidatorSpec
private val engine = Engine.DevEngine()
private val metrics = new Metrics(new MetricRegistry)
private def newBatchedSubmissionValidator[CommitResult](
params: BatchedSubmissionValidatorParameters,
metrics: Metrics = this.metrics,
): BatchedSubmissionValidator[CommitResult] =
new BatchedSubmissionValidator[CommitResult](
params,
new KeyValueCommitting(engine, metrics),
new ConflictDetection(metrics),
metrics,
NoOpLedgerDataExporter,
)
"validateAndCommit" should {
"return validation failure for invalid envelope" in {
val validator = BatchedSubmissionValidator[Unit](
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
)
validator
.validateAndCommit(
@ -60,10 +75,9 @@ class BatchedSubmissionValidatorSpec
}
"return validation failure for invalid message type in envelope" in {
val validator = BatchedSubmissionValidator[Unit](
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
)
val notASubmission = Envelope.enclose(DamlStateValue.getDefaultInstance)
validator
@ -82,10 +96,9 @@ class BatchedSubmissionValidatorSpec
}
"return validation failure for invalid envelope in batch" in {
val validator = BatchedSubmissionValidator[Unit](
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
)
val batchSubmission = DamlSubmissionBatch.newBuilder
.addSubmissions(
CorrelatedSubmission.newBuilder
@ -124,12 +137,13 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
logEntryCaptor.capture(),
any[Map[DamlStateKey, Option[DamlStateValue]]],
outputStateCaptor.capture()))
outputStateCaptor.capture(),
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
val validator = BatchedSubmissionValidator[Unit](
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
)
validator
.validateAndCommit(
@ -179,12 +193,13 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
logEntryCaptor.capture(),
any[Map[DamlStateKey, Option[DamlStateValue]]],
any[Map[DamlStateKey, DamlStateValue]]
any[Map[DamlStateKey, DamlStateValue]],
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
val validatorConfig =
BatchedSubmissionValidatorParameters.reasonableDefault.copy(commitParallelism = 1)
val validator = BatchedSubmissionValidator[Unit](validatorConfig, engine, metrics)
val validator = newBatchedSubmissionValidator[Unit](validatorConfig)
validator
.validateAndCommit(
@ -202,7 +217,9 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
any[DamlLogEntry],
any[DamlInputState],
any[DamlOutputState])
any[DamlOutputState],
any[Option[SubmissionAggregator.WriteSetBuilder]],
)
// Verify that the log entries have been committed in the right order.
val logEntries = logEntryCaptor.getAllValues.asScala.map(_.asInstanceOf[DamlLogEntry])
logEntries.map(_.getPartyAllocationEntry) should be(
@ -233,13 +250,13 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
any[DamlLogEntry],
any[Map[DamlStateKey, Option[DamlStateValue]]],
any[Map[DamlStateKey, DamlStateValue]]
any[Map[DamlStateKey, DamlStateValue]],
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
val validator = BatchedSubmissionValidator[Unit](
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
)
validator
.validateAndCommit(
@ -258,7 +275,9 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
any[DamlLogEntry],
any[DamlInputState],
any[DamlOutputState])
any[DamlOutputState],
any[Option[SubmissionAggregator.WriteSetBuilder]],
)
succeed
}
}
@ -279,14 +298,14 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
any[DamlLogEntry],
any[Map[DamlStateKey, Option[DamlStateValue]]],
any[Map[DamlStateKey, DamlStateValue]]
any[Map[DamlStateKey, DamlStateValue]],
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
val validator =
BatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
engine,
metrics)
val validator = newBatchedSubmissionValidator[Unit](
BatchedSubmissionValidatorParameters.reasonableDefault,
metrics = metrics,
)
validator
.validateAndCommit(
@ -335,12 +354,14 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
logEntryCaptor.capture(),
any[Map[DamlStateKey, Option[DamlStateValue]]],
outputStateCaptor.capture()))
outputStateCaptor.capture(),
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
val validatorConfig =
BatchedSubmissionValidatorParameters.reasonableDefault.copy(
commitParallelism = commitParallelism)
val validator = BatchedSubmissionValidator[Unit](validatorConfig, engine, metrics)
val validator = newBatchedSubmissionValidator[Unit](validatorConfig)
validator
.validateAndCommit(
@ -360,7 +381,9 @@ class BatchedSubmissionValidatorSpec
any[DamlLogEntryId],
any[DamlLogEntry],
any[DamlInputState],
any[DamlOutputState])
any[DamlOutputState],
any[Option[SubmissionAggregator.WriteSetBuilder]],
)
// Verify we have all the expected log entries.
val logEntries = logEntryCaptor.getAllValues.asScala.map(_.asInstanceOf[DamlLogEntry])
logEntries.map(_.getPartyAllocationEntry) should contain allElementsOf

View File

@ -11,6 +11,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.caching.`Message Weight`
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.CommitStrategy
import com.daml.ledger.validator.TestHelper._
@ -64,7 +65,8 @@ class CachingCommitStrategySpec extends AsyncWordSpec with Matchers with Mockito
any[DamlLogEntryId](),
any[DamlLogEntry](),
any[Map[DamlStateKey, Option[DamlStateValue]]](),
any[Map[DamlStateKey, DamlStateValue]]()
any[Map[DamlStateKey, DamlStateValue]](),
any[Option[SubmissionAggregator.WriteSetBuilder]],
))
.thenReturn(Future.unit)
new CachingCommitStrategy[Unit](cache, _ => shouldCache, mockCommitStrategy)

View File

@ -9,7 +9,7 @@ def _integrity_test_impl(ctx):
set -eux
{checker} $(rlocation "$TEST_WORKSPACE/{dump}")
""".format(
checker = ctx.executable._checker.short_path,
checker = ctx.executable.checker.short_path,
dump = ctx.file.dump.short_path,
),
is_executable = True,
@ -17,7 +17,7 @@ set -eux
runfiles = ctx.runfiles(
files = [wrapper, ctx.file.dump],
)
runfiles = runfiles.merge(ctx.attr._checker[DefaultInfo].default_runfiles)
runfiles = runfiles.merge(ctx.attr.checker[DefaultInfo].default_runfiles)
return DefaultInfo(
executable = wrapper,
files = depset([wrapper]),
@ -29,11 +29,7 @@ integrity_test = rule(
test = True,
executable = True,
attrs = {
"_checker": attr.label(
cfg = "host",
executable = True,
default = Label("@//ledger/participant-state/kvutils/tools:integrity-check"),
),
"checker": attr.label(cfg = "host", executable = True),
"dump": attr.label(allow_single_file = True),
},
)

View File

@ -3,7 +3,7 @@
package com.daml.ledger.participant.state.kvutils.tools.export
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet
import com.daml.ledger.participant.state.kvutils.export.WriteSet
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.{
CommitStrategy,

View File

@ -11,11 +11,12 @@ import akka.stream.Materializer
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.{
import com.daml.ledger.participant.state.kvutils.export.{
Deserialization,
NoOpLedgerDataExporter,
SubmissionInfo,
WriteSet
}
import com.daml.ledger.participant.state.kvutils.export.{NoopLedgerDataExporter, Serialization}
import com.daml.ledger.participant.state.kvutils.tools._
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.batch.{
@ -47,7 +48,7 @@ class IntegrityChecker[LogResult](commitStrategySupport: CommitStrategySupport[L
new KeyValueCommitting(engine, metrics),
new ConflictDetection(metrics),
metrics,
NoopLedgerDataExporter,
NoOpLedgerDataExporter,
)
val ComponentsForReplay(reader, commitStrategy, queryableWriteSet) =
commitStrategySupport.createComponentsForReplay()
@ -165,7 +166,7 @@ class IntegrityChecker[LogResult](commitStrategySupport: CommitStrategySupport[L
.orElse(commitStrategySupport.explainMismatchingValue(key, expectedValue, actualValue))
private def readSubmissionAndOutputs(input: DataInputStream): (SubmissionInfo, WriteSet) = {
val (submissionInfo, writeSet) = Serialization.readEntry(input)
val (submissionInfo, writeSet) = Deserialization.deserializeEntry(input)
println(
s"Read submission correlationId=${submissionInfo.correlationId} submissionEnvelopeSize=${submissionInfo.submissionEnvelope
.size()} writeSetSize=${writeSet.size}")

View File

@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.export
import com.daml.ledger.on.memory.{InMemoryLedgerStateOperations, Index}
import com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.export.NoopLedgerDataExporter
import com.daml.ledger.participant.state.kvutils.tools.export.IntegrityChecker.bytesAsHexString
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.StateKeySerializationStrategy
@ -26,7 +25,7 @@ object LogAppendingCommitStrategySupport extends CommitStrategySupport[Index] {
BatchedSubmissionValidatorFactory.readerAndCommitStrategyFrom(
writeRecordingLedgerStateOperations,
stateKeySerializationStrategy,
NoopLedgerDataExporter)
)
ComponentsForReplay(reader, commitStrategy, writeRecordingLedgerStateOperations)
}

View File

@ -3,7 +3,7 @@
package com.daml.ledger.participant.state.kvutils.tools.export
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet
import com.daml.ledger.participant.state.kvutils.export.WriteSet
import com.daml.ledger.validator.LedgerStateOperations
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}

View File

@ -9,8 +9,7 @@ import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.TimeUnit
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.SubmissionInfo
import com.daml.ledger.participant.state.kvutils.export.Serialization
import com.daml.ledger.participant.state.kvutils.export.{Deserialization, SubmissionInfo}
import com.daml.ledger.participant.state.kvutils.{Envelope, DamlKvutils => Proto}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.lf.archive.{Decode, UniversalArchiveReader}
@ -18,8 +17,9 @@ import com.daml.lf.crypto
import com.daml.lf.data._
import com.daml.lf.engine.Engine
import com.daml.lf.language.{Ast, LanguageVersion, Util => AstUtil}
import com.daml.lf.transaction.{GlobalKey, GlobalKeyWithMaintainers}
import com.daml.lf.transaction.{
GlobalKey,
GlobalKeyWithMaintainers,
Node,
SubmittedTransaction,
Transaction => Tx,
@ -164,7 +164,7 @@ object Replay {
def go: Stream[SubmissionInfo] =
if (ledgerExportStream.available() > 0)
Serialization.readEntry(ledgerExportStream)._1 #:: go
Deserialization.deserializeEntry(ledgerExportStream)._1 #:: go
else {
ledgerExportStream.close()
Stream.empty

View File

@ -3,7 +3,7 @@
package com.daml.ledger.participant.state.kvutils.tools
import com.daml.ledger.participant.state.kvutils.export.FileBasedLedgerDataExporter.WriteSet
import com.daml.ledger.participant.state.kvutils.export.WriteSet
import com.daml.ledger.participant.state.kvutils.tools.export.{
CommitStrategySupport,
IntegrityChecker