Generate SubmissionEntry from KV export (#13209)

We split kv replay as follow:

- [LF] we introduce a simple protobuf to store submission entry (which
  roughly correspond to TRANSACTION_ENTRY and PACKAGE_UPLOAD_ENTRY KV
  submission)

- [KV] we create a tool to extract form a ledger export a sequence of
  submission entry

- [LF] we move the replay tool from KV to LF, and base it on
  submission entries instead of ledger export

The objectify of this split is double:

- on the one side it will simplify maintenance, as it will make API
  between KV and LF more clear (depends only on a protobuf definition)

- on the other side, it will made the use of the tool handy, as it
  will make possible to benchmark the engine without direct dependency
  between KV and LF.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Remy 2022-03-10 08:47:04 +01:00 committed by GitHub
parent 6b72646807
commit 8abf0bae96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 0 deletions

View File

@ -0,0 +1,20 @@
# Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load("//bazel_tools:proto.bzl", "proto_jars")
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
"da_scala_test",
"lf_scalacopts",
"lf_scalacopts_stricter",
)
proto_jars(
name = "snapshot-proto",
srcs = ["src/main/protobuf/com/daml/lf/snapshot.proto"],
maven_artifact_prefix = "daml-lf-snapshot",
maven_group = "com.daml",
strip_import_prefix = "src/main/protobuf/",
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,25 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package com.daml.lf.testing.snapshot;
option java_package = "com.daml.lf.testing.snapshot";
message TransactionEntry {
bytes rawTransaction = 1;
string participantId = 2;
repeated string submitters = 3;
int64 ledgerTime = 4;
int64 submissionTime = 5;
bytes submissionSeed = 6;
}
message SubmissionEntry {
oneof entry {
TransactionEntry transaction = 1;
bytes archives = 2;
}
}

View File

@ -170,6 +170,20 @@ da_scala_library(
],
)
da_scala_binary(
name = "submission-entries-extractor",
srcs = glob(["submission-entries-extractor/src/**/*.scala"]),
main_class = "com.daml.ledger.participant.state.kvutils.tools.snapshot.SubmissionEntriesExtractor",
scala_deps = ["@maven//:com_github_scopt_scopt"],
deps = [
"//daml-lf/data",
"//daml-lf/snapshot:snapshot-proto_java",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:daml_kvutils_proto_java",
"@maven//:com_google_protobuf_protobuf_java",
],
)
da_scala_benchmark_jmh(
name = "engine-replay-benchmark",
srcs = glob(["engine-replay/src/benchmark/**/*.scala"]),

View File

@ -0,0 +1,127 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.tools.snapshot
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.export.{
ProtobufBasedLedgerDataImporter,
SubmissionInfo,
}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
import com.daml.lf.data._
import com.daml.lf.testing.snapshot.Snapshot
import java.io.BufferedOutputStream
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
object SubmissionEntriesExtractor extends App {
private[this] def decodeSubmissionInfo(submissionInfo: SubmissionInfo) =
decodeEnvelope(submissionInfo.participantId, submissionInfo.submissionEnvelope)
private[this] def decodeEnvelope(
participantId: Ref.ParticipantId,
envelope: Raw.Envelope,
): LazyList[Snapshot.SubmissionEntry] =
assertRight(Envelope.open(envelope)) match {
case Envelope.SubmissionMessage(submission) =>
decodeSubmission(participantId, submission)
case Envelope.SubmissionBatchMessage(batch) =>
batch.getSubmissionsList.asScala
.to(LazyList)
.map(_.getSubmission)
.flatMap(submissionEnvelope =>
decodeEnvelope(participantId, Raw.Envelope(submissionEnvelope))
)
case Envelope.LogEntryMessage(_) | Envelope.StateValueMessage(_) =>
LazyList.empty
}
private[this] def decodeSubmission(
participantId: Ref.ParticipantId,
submission: DamlSubmission,
): LazyList[Snapshot.SubmissionEntry] = {
submission.getPayloadCase match {
case DamlSubmission.PayloadCase.TRANSACTION_ENTRY =>
val entry = submission.getTransactionEntry
LazyList(
Snapshot.SubmissionEntry
.newBuilder()
.setTransaction(
Snapshot.TransactionEntry
.newBuilder()
.setRawTransaction(entry.getRawTransaction)
.setParticipantId(participantId)
.addAllSubmitters(entry.getSubmitterInfo.getSubmittersList)
.setLedgerTime(parseTimestamp(entry.getLedgerEffectiveTime).micros)
.setSubmissionTime(parseTimestamp(entry.getSubmissionTime).micros)
.setSubmissionSeed(entry.getSubmissionSeed)
)
.build()
)
case DamlSubmission.PayloadCase.PACKAGE_UPLOAD_ENTRY =>
val entry = submission.getPackageUploadEntry
entry.getArchivesList.asScala.iterator
.map(
Snapshot.SubmissionEntry
.newBuilder()
.setArchives(_)
.build()
)
.to(LazyList)
case _ =>
LazyList.empty
}
}
case class Config(
input: Option[Path] = None,
output: Option[Path] = None,
)
import scopt.OParser
val builder = OParser.builder[Config]
val parser = {
import builder._
OParser.sequence(
programName("submission-entries-extractor"),
head("extractor", "1.0"),
arg[String]("input")
.action((x, c) => c.copy(input = Some(Paths.get(x))))
.text("path of the ledger export"),
opt[String]('o', "output ")
.action((x, c) => c.copy(output = Some(Paths.get(x))))
.text("path of the submssion entries file"),
)
}
// OParser.parse returns Option[Config]
OParser.parse(parser, args, Config()) match {
case Some(Config(inputFile, outputFile)) =>
val importer = ProtobufBasedLedgerDataImporter(inputFile.get)
val output = outputFile match {
case Some(path) => new BufferedOutputStream(Files.newOutputStream(path))
case None => scala.sys.process.stdout
}
try {
importer.read().map(_._1).flatMap(decodeSubmissionInfo).foreach {
_.writeDelimitedTo(output)
}
} catch {
case NonFatal(e) =>
sys.error("Error: " + e.getMessage)
} finally {
importer.close()
output.close()
}
case None =>
sys.exit(1)
}
}