kvutils: Make Step a trait to carry the logging context more easily. (#9525)

Implicits and lambdas don't work well together.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2021-04-29 10:05:59 +02:00 committed by GitHub
parent 977b23ac9f
commit 9527b2e2ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 600 additions and 443 deletions

View File

@ -0,0 +1,13 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committer
import com.daml.logging.LoggingContext
private[committer] trait CommitStep[PartialResult] {
def apply(
context: CommitContext,
input: PartialResult,
)(implicit loggingContext: LoggingContext): StepResult[PartialResult]
}

View File

@ -15,7 +15,6 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
}
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
import com.daml.ledger.participant.state.kvutils._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
@ -75,7 +74,7 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
submission: DamlSubmission,
)(implicit loggingContext: LoggingContext): PartialResult
protected def steps: Iterable[(StepInfo, Step[PartialResult])]
protected def steps: Iterable[(StepInfo, CommitStep[PartialResult])]
protected val metrics: Metrics
@ -159,7 +158,7 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
state match {
case StepContinue(state) =>
withEnrichedLoggingContext(extraLoggingContext(state)) { implicit loggingContext =>
stepTimers(info).time(() => step(commitContext, state)(loggingContext))
stepTimers(info).time(() => step(commitContext, state))
}
case result @ StepStop(_) => result
}
@ -170,15 +169,8 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
}
object Committer {
type StepInfo = String
private final val logger = ContextualizedLogger.get(getClass)
private[committer] type Step[PartialResult] =
(CommitContext, PartialResult) => LoggingContext => StepResult[PartialResult]
private[committer] type Steps[PartialResult] = Iterable[(StepInfo, Step[PartialResult])]
def getCurrentConfiguration(
defaultConfig: Configuration,
commitContext: CommitContext,

View File

@ -16,12 +16,13 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
private[kvutils] object ConfigCommitter {
private type Step = Committer.Step[ConfigCommitter.Result]
case class Result(
final case class Result(
submission: DamlConfigurationSubmission,
currentConfig: (Option[DamlConfigurationEntry], Configuration),
)
type Step = CommitStep[Result]
}
private[kvutils] class ConfigCommitter(
@ -52,135 +53,152 @@ private[kvutils] class ConfigCommitter(
private def rejectionTraceLog(message: String)(implicit loggingContext: LoggingContext): Unit =
logger.trace(s"Configuration rejected: $message.")
private[committer] val checkTtl: Step = { (ctx, result) => implicit loggingContext =>
// Check the maximum record time against the record time of the commit.
// This mechanism allows the submitter to detect lost submissions and retry
// with a submitter controlled rate.
if (ctx.recordTime.exists(_ > maximumRecordTime)) {
rejectionTraceLog(s"submission timed out (${ctx.recordTime} > $maximumRecordTime)")
reject(
ctx.recordTime,
result.submission,
_.setTimedOut(
TimedOut.newBuilder
.setMaximumRecordTime(buildTimestamp(maximumRecordTime))
),
)
} else {
if (ctx.preExecute) {
// Propagate the time bounds and defer the checks to post-execution.
ctx.maximumRecordTime = Some(maximumRecordTime.toInstant)
setOutOfTimeBoundsLogEntry(result.submission, ctx)
private[committer] val checkTtl: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] =
// Check the maximum record time against the record time of the commit.
// This mechanism allows the submitter to detect lost submissions and retry
// with a submitter controlled rate.
if (ctx.recordTime.exists(_ > maximumRecordTime)) {
rejectionTraceLog(s"submission timed out (${ctx.recordTime} > $maximumRecordTime)")
reject(
ctx.recordTime,
result.submission,
_.setTimedOut(TimedOut.newBuilder.setMaximumRecordTime(buildTimestamp(maximumRecordTime))),
)
} else {
if (ctx.preExecute) {
// Propagate the time bounds and defer the checks to post-execution.
ctx.maximumRecordTime = Some(maximumRecordTime.toInstant)
setOutOfTimeBoundsLogEntry(result.submission, ctx)
}
StepContinue(result)
}
}
private val authorizeSubmission: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
// Submission is authorized when:
// the provided participant id matches source participant id
// AND (
// there exists no current configuration
// OR the current configuration's participant matches the submitting participant.
// )
val authorized = result.submission.getParticipantId == ctx.participantId
val wellFormed =
result.currentConfig._1.forall(_.getParticipantId == ctx.participantId)
if (!authorized) {
val message =
s"participant id ${result.submission.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
} else if (!wellFormed) {
val message = s"${ctx.participantId} is not authorized to change configuration."
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
} else {
StepContinue(result)
}
StepContinue(result)
}
}
private val authorizeSubmission: Step = { (ctx, result) => implicit loggingContext =>
// Submission is authorized when:
// the provided participant id matches source participant id
// AND (
// there exists no current configuration
// OR the current configuration's participant matches the submitting participant.
// )
val authorized = result.submission.getParticipantId == ctx.participantId
val wellFormed =
result.currentConfig._1.forall(_.getParticipantId == ctx.participantId)
if (!authorized) {
val message =
s"participant id ${result.submission.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
} else if (!wellFormed) {
val message = s"${ctx.participantId} is not authorized to change configuration."
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
} else {
StepContinue(result)
}
}
private val validateSubmission: Step = { (ctx, result) => _ =>
Configuration
.decode(result.submission.getConfiguration)
.fold(
err =>
reject(
ctx.recordTime,
result.submission,
_.setInvalidConfiguration(
Invalid.newBuilder
.setDetails(err)
),
),
config =>
if (config.generation != (1 + result.currentConfig._2.generation))
private val validateSubmission: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] =
Configuration
.decode(result.submission.getConfiguration)
.fold(
err =>
reject(
ctx.recordTime,
result.submission,
_.setGenerationMismatch(
GenerationMismatch.newBuilder
.setExpectedGeneration(1 + result.currentConfig._2.generation)
),
)
else
StepContinue(result),
)
_.setInvalidConfiguration(Invalid.newBuilder.setDetails(err)),
),
config =>
if (config.generation != (1 + result.currentConfig._2.generation))
reject(
ctx.recordTime,
result.submission,
_.setGenerationMismatch(
GenerationMismatch.newBuilder
.setExpectedGeneration(1 + result.currentConfig._2.generation)
),
)
else
StepContinue(result),
)
}
private val deduplicateSubmission: Step = { (ctx, result) => implicit loggingContext =>
val submissionKey = configDedupKey(ctx.participantId, result.submission.getSubmissionId)
if (ctx.get(submissionKey).isEmpty) {
StepContinue(result)
} else {
val message = s"duplicate submission='${result.submission.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
private val deduplicateSubmission: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val submissionKey = configDedupKey(ctx.participantId, result.submission.getSubmissionId)
if (ctx.get(submissionKey).isEmpty) {
StepContinue(result)
} else {
val message = s"duplicate submission='${result.submission.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
}
}
}
private[committer] def buildLogEntry: Step = { (ctx, result) => implicit loggingContext =>
metrics.daml.kvutils.committer.config.accepts.inc()
logger.trace("Configuration accepted.")
private[committer] def buildLogEntry: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
metrics.daml.kvutils.committer.config.accepts.inc()
logger.trace("Configuration accepted.")
val configurationEntry = DamlConfigurationEntry.newBuilder
.setSubmissionId(result.submission.getSubmissionId)
.setParticipantId(result.submission.getParticipantId)
.setConfiguration(result.submission.getConfiguration)
.build
ctx.set(
configurationStateKey,
DamlStateValue.newBuilder
.setConfigurationEntry(configurationEntry)
.build,
)
val configurationEntry = DamlConfigurationEntry.newBuilder
.setSubmissionId(result.submission.getSubmissionId)
.setParticipantId(result.submission.getParticipantId)
.setConfiguration(result.submission.getConfiguration)
.build
ctx.set(
configurationStateKey,
DamlStateValue.newBuilder
.setConfigurationEntry(configurationEntry)
.build,
)
ctx.set(
configDedupKey(ctx.participantId, result.submission.getSubmissionId),
DamlStateValue.newBuilder
.setSubmissionDedup(DamlSubmissionDedupValue.newBuilder)
.build,
)
ctx.set(
configDedupKey(ctx.participantId, result.submission.getSubmissionId),
DamlStateValue.newBuilder
.setSubmissionDedup(DamlSubmissionDedupValue.newBuilder)
.build,
)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setConfigurationEntry(configurationEntry),
)
StepStop(successLogEntry)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setConfigurationEntry(configurationEntry),
)
StepStop(successLogEntry)
}
}
private def reject[PartialResult](

View File

@ -9,11 +9,7 @@ import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.participant.state.kvutils.Conversions.packageUploadDedupKey
import com.daml.ledger.participant.state.kvutils.DamlKvutils
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.Committer.{
StepInfo,
Steps,
buildLogEntryWithOptionalRecordTime,
}
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.lf
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.PackageId
@ -27,9 +23,13 @@ import com.google.protobuf.ByteString
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
object PackageCommitter {
private[committer] type Result = (DamlPackageUploadEntry.Builder, Map[Ref.PackageId, Ast.Package])
private[committer] type Step = Committer.Step[Result]
private[committer] object PackageCommitter {
final case class Result(
uploadEntry: DamlPackageUploadEntry.Builder,
packagesCache: Map[Ref.PackageId, Ast.Package],
)
type Step = CommitStep[Result]
}
final private[kvutils] class PackageCommitter(
@ -46,17 +46,15 @@ final private[kvutils] class PackageCommitter(
override protected val committerName: String = "package_upload"
override protected def extraLoggingContext(result: Result): Map[String, String] = Map(
"packages" -> result._1.getArchivesList.asScala.map(_.getHash).mkString("[", ", ", "]")
"packages" -> result.uploadEntry.getArchivesList.asScala.map(_.getHash).mkString("[", ", ", "]")
)
/** The initial internal state passed to first step. */
override protected def init(
ctx: CommitContext,
submission: DamlSubmission,
)(implicit
loggingContext: LoggingContext
): (DamlPackageUploadEntry.Builder, Map[Ref.PackageId, Ast.Package]) =
(submission.getPackageUploadEntry.toBuilder, Map.empty)
)(implicit loggingContext: LoggingContext): Result =
Result(submission.getPackageUploadEntry.toBuilder, Map.empty)
private def rejectionTraceLog(message: String)(implicit loggingContext: LoggingContext): Unit =
logger.trace(s"Package upload rejected: $message.")
@ -101,8 +99,12 @@ final private[kvutils] class PackageCommitter(
)
)
private def authorizeSubmission: Step = { case (ctx, partialResult @ (uploadEntry, _)) =>
implicit loggingContext =>
private def authorizeSubmission: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val uploadEntry = partialResult.uploadEntry
if (ctx.participantId == uploadEntry.getParticipantId) {
StepContinue(partialResult)
} else {
@ -116,10 +118,15 @@ final private[kvutils] class PackageCommitter(
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
}
}
}
private def deduplicateSubmission: Step = { case (ctx, partialResult @ (uploadEntry, _)) =>
implicit loggingContext =>
private def deduplicateSubmission: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val uploadEntry = partialResult.uploadEntry
val submissionKey = packageUploadDedupKey(ctx.participantId, uploadEntry.getSubmissionId)
if (ctx.get(submissionKey).isEmpty) {
StepContinue(partialResult)
@ -133,11 +140,16 @@ final private[kvutils] class PackageCommitter(
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
}
}
}
// Checks that packages are not repeated in the submission.
private def checkForDuplicates: Step = { case (ctx, partialResult @ (uploadEntry, _)) =>
implicit loggingContext =>
private def checkForDuplicates: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val uploadEntry = partialResult.uploadEntry
val (seenOnce, duplicates) = uploadEntry.getArchivesList
.iterator()
.asScala
@ -168,6 +180,7 @@ final private[kvutils] class PackageCommitter(
} else {
StepContinue(partialResult)
}
}
}
private def decodePackages(
@ -222,12 +235,16 @@ final private[kvutils] class PackageCommitter(
}
// Strict validation
private def strictlyValidatePackages: Step = { case (ctx, (uploadEntry, pkgsCache)) =>
implicit loggingContext =>
private def strictlyValidatePackages: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val Result(uploadEntry, packagesCache) = partialResult
val result = for {
pkgs <- decodePackagesIfNeeded(pkgsCache, uploadEntry.getArchivesList.asScala)
_ <- validatePackages(uploadEntry, pkgs)
} yield StepContinue((uploadEntry, pkgs))
packages <- decodePackagesIfNeeded(packagesCache, uploadEntry.getArchivesList.asScala)
_ <- validatePackages(uploadEntry, packages)
} yield StepContinue(Result(uploadEntry, packages))
result match {
case Right(result) => result
@ -240,12 +257,17 @@ final private[kvutils] class PackageCommitter(
_.setInvalidPackage(DamlKvutils.Invalid.newBuilder.setDetails(message)),
)
}
}
}
// Minimal validation.
// Checks that package IDs are valid and package payloads are non-empty.
private def looselyValidatePackages: Step = { case (ctx, partialResult @ (uploadEntry, _)) =>
implicit loggingContext =>
private def looselyValidatePackages: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val uploadEntry = partialResult.uploadEntry
val archives = uploadEntry.getArchivesList.asScala
val errors =
archives.foldLeft(List.empty[String]) { (errors, archive) =>
@ -269,14 +291,15 @@ final private[kvutils] class PackageCommitter(
_.setInvalidPackage(Invalid.newBuilder.setDetails(message)),
)
}
}
}
private def uploadPackages(pkgs: Map[Ref.PackageId, Ast.Package]): Either[String, Unit] =
private def uploadPackages(packages: Map[Ref.PackageId, Ast.Package]): Either[String, Unit] =
metrics.daml.kvutils.committer.packageUpload.preloadTimer.time { () =>
val errors = pkgs.flatMap { case (pkgId, pkg) =>
val errors = packages.flatMap { case (pkgId, pkg) =>
engine
.preloadPackage(pkgId, pkg)
.consume(_ => None, pkgs.get, _ => None, _ => VisibleByKey.Visible)
.consume(_ => None, packages.get, _ => None, _ => VisibleByKey.Visible)
.fold(err => List(err.detailMsg), _ => List.empty)
}.toList
metrics.daml.kvutils.committer.packageUpload.loadedPackages(() =>
@ -289,12 +312,16 @@ final private[kvutils] class PackageCommitter(
)
}
private def preloadSynchronously: Step = { case (ctx, (uploadEntry, pkgsCache)) =>
implicit loggingContext =>
private def preloadSynchronously: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val Result(uploadEntry, packagesCache) = partialResult
val result = for {
pkgs <- decodePackagesIfNeeded(pkgsCache, uploadEntry.getArchivesList.asScala)
_ <- uploadPackages(pkgs)
} yield StepContinue((uploadEntry, pkgs))
packages <- decodePackagesIfNeeded(packagesCache, uploadEntry.getArchivesList.asScala)
_ <- uploadPackages(packages)
} yield StepContinue(Result(uploadEntry, packages))
result match {
case Right(partialResult) =>
@ -308,6 +335,7 @@ final private[kvutils] class PackageCommitter(
_.setInvalidPackage(DamlKvutils.Invalid.newBuilder.setDetails(message)),
)
}
}
}
private lazy val preloadExecutor =
@ -325,15 +353,19 @@ final private[kvutils] class PackageCommitter(
*
* This assumes the engine validates the archive it receives.
*/
private def preloadAsynchronously: Step = { case (_, partialResult @ (uploadEntry, pkgsCache)) =>
implicit loggingContext =>
private def preloadAsynchronously: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val Result(uploadEntry, packagesCache) = partialResult
// we need to extract the archives synchronously as other steps may modify uploadEntry
val archives = uploadEntry.getArchivesList.iterator().asScala.toList
preloadExecutor.execute { () =>
logger.trace(s"Uploading ${uploadEntry.getArchivesCount} archive(s).")
val result = for {
pkgs <- decodePackagesIfNeeded(pkgsCache, archives)
_ <- uploadPackages(pkgs)
packages <- decodePackagesIfNeeded(packagesCache, archives)
_ <- uploadPackages(packages)
} yield ()
result.fold(
@ -342,27 +374,39 @@ final private[kvutils] class PackageCommitter(
)
}
StepContinue(partialResult)
}
}
// Filter out packages already on the ledger.
// Should be done after decoding, validation or preloading, as those step may
// require packages on the ledger by not loaded by the engine.
private def filterKnownPackages: Step = { case (ctx, (uploadEntry, pkgs)) =>
_ =>
private def filterKnownPackages: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val Result(uploadEntry, packagesCache) = partialResult
val archives = uploadEntry.getArchivesList.asScala.filter { archive =>
val stateKey = DamlStateKey.newBuilder
.setPackageId(archive.getHash)
.build
ctx.get(stateKey).isEmpty
}
StepContinue(uploadEntry.clearArchives().addAllArchives(archives.asJava) -> pkgs)
StepContinue(
Result(uploadEntry.clearArchives().addAllArchives(archives.asJava), packagesCache)
)
}
}
private[committer] def buildLogEntry: Step = { case (ctx, (uploadEntry, _)) =>
implicit loggingContext =>
private[committer] def buildLogEntry: Step = new Step {
def apply(
ctx: CommitContext,
partialResult: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
metrics.daml.kvutils.committer.packageUpload.accepts.inc()
logger.trace("Packages committed.")
val uploadEntry = partialResult.uploadEntry
uploadEntry.getArchivesList.forEach { archive =>
ctx.set(
DamlStateKey.newBuilder.setPackageId(archive.getHash).build,
@ -381,8 +425,8 @@ final private[kvutils] class PackageCommitter(
setOutOfTimeBoundsLogEntry(uploadEntry, ctx)
}
StepStop(successLogEntry)
}
}
override protected val steps: Steps[Result] = {
val builder = List.newBuilder[(StepInfo, Step)]

View File

@ -5,10 +5,7 @@ package com.daml.ledger.participant.state.kvutils.committer
import com.daml.ledger.participant.state.kvutils.Conversions.partyAllocationDedupKey
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.Committer.{
Steps,
buildLogEntryWithOptionalRecordTime,
}
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
@ -16,7 +13,8 @@ import com.daml.metrics.Metrics
private[kvutils] object PartyAllocationCommitter {
type Result = DamlPartyAllocationEntry.Builder
type Step = Committer.Step[Result]
type Step = CommitStep[Result]
}
private[kvutils] class PartyAllocationCommitter(
@ -42,99 +40,123 @@ private[kvutils] class PartyAllocationCommitter(
private def rejectionTraceLog(msg: String)(implicit loggingContext: LoggingContext): Unit =
logger.trace(s"Party allocation rejected: $msg.")
private val authorizeSubmission: Step = { (ctx, result) => implicit loggingContext =>
if (ctx.participantId == result.getParticipantId) {
StepContinue(result)
} else {
val message =
s"participant id ${result.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
}
}
private val validateParty: Step = { (ctx, result) => implicit loggingContext =>
val party = result.getParty
if (Ref.Party.fromString(party).isRight) {
StepContinue(result)
} else {
val message = s"party string '$party' invalid"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setInvalidName(Invalid.newBuilder.setDetails(message)),
)
}
}
private val deduplicateParty: Step = { (ctx, result) => implicit loggingContext =>
val party = result.getParty
val partyKey = DamlStateKey.newBuilder.setParty(party).build
if (ctx.get(partyKey).isEmpty) {
StepContinue(result)
} else {
val message = s"party already exists party='$party'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setAlreadyExists(AlreadyExists.newBuilder.setDetails(message)),
)
}
}
private val deduplicateSubmission: Step = { (ctx, result) => implicit loggingContext =>
val submissionKey = partyAllocationDedupKey(ctx.participantId, result.getSubmissionId)
if (ctx.get(submissionKey).isEmpty) {
StepContinue(result)
} else {
val message = s"duplicate submission='${result.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
}
}
private[committer] val buildLogEntry: Step = { (ctx, result) => implicit loggingContext =>
val party = result.getParty
val partyKey = DamlStateKey.newBuilder.setParty(party).build
metrics.daml.kvutils.committer.partyAllocation.accepts.inc()
logger.trace("Party allocated.")
ctx.set(
partyKey,
DamlStateValue.newBuilder
.setParty(
DamlPartyAllocation.newBuilder
.setParticipantId(ctx.participantId)
private val authorizeSubmission: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] =
if (ctx.participantId == result.getParticipantId) {
StepContinue(result)
} else {
val message =
s"participant id ${result.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
.build,
)
}
}
ctx.set(
partyAllocationDedupKey(ctx.participantId, result.getSubmissionId),
DamlStateValue.newBuilder
.setSubmissionDedup(DamlSubmissionDedupValue.newBuilder)
.build,
)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setPartyAllocationEntry(result),
)
if (ctx.preExecute) {
setOutOfTimeBoundsLogEntry(result, ctx)
private val validateParty: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val party = result.getParty
if (Ref.Party.fromString(party).isRight) {
StepContinue(result)
} else {
val message = s"party string '$party' invalid"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setInvalidName(Invalid.newBuilder.setDetails(message)),
)
}
}
}
private val deduplicateParty: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val party = result.getParty
val partyKey = DamlStateKey.newBuilder.setParty(party).build
if (ctx.get(partyKey).isEmpty) {
StepContinue(result)
} else {
val message = s"party already exists party='$party'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setAlreadyExists(AlreadyExists.newBuilder.setDetails(message)),
)
}
}
}
private val deduplicateSubmission: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val submissionKey = partyAllocationDedupKey(ctx.participantId, result.getSubmissionId)
if (ctx.get(submissionKey).isEmpty) {
StepContinue(result)
} else {
val message = s"duplicate submission='${result.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
}
}
}
private[committer] val buildLogEntry: Step = new Step {
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
val party = result.getParty
val partyKey = DamlStateKey.newBuilder.setParty(party).build
metrics.daml.kvutils.committer.partyAllocation.accepts.inc()
logger.trace("Party allocated.")
ctx.set(
partyKey,
DamlStateValue.newBuilder
.setParty(
DamlPartyAllocation.newBuilder
.setParticipantId(ctx.participantId)
)
.build,
)
ctx.set(
partyAllocationDedupKey(ctx.participantId, result.getSubmissionId),
DamlStateValue.newBuilder
.setSubmissionDedup(DamlSubmissionDedupValue.newBuilder)
.build,
)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setPartyAllocationEntry(result),
)
if (ctx.preExecute) {
setOutOfTimeBoundsLogEntry(result, ctx)
}
StepStop(successLogEntry)
}
StepStop(successLogEntry)
}
private def reject[PartialResult](

View File

@ -0,0 +1,10 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
package object committer {
private[committer] type StepInfo = String
private[committer] type Steps[PartialResult] = Iterable[(StepInfo, CommitStep[PartialResult])]
}

View File

@ -0,0 +1,49 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committer.transaction
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.Conversions.parseTimestamp
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlSubmitterInfo,
DamlTransactionEntry,
}
import com.daml.lf.crypto
import com.daml.lf.data.Ref.Party
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.{Transaction => Tx}
import scala.jdk.CollectionConverters._
private[transaction] final class DamlTransactionEntrySummary(
val submission: DamlTransactionEntry,
tx: => Tx.Transaction,
) {
val ledgerEffectiveTime: Timestamp = parseTimestamp(submission.getLedgerEffectiveTime)
val submitterInfo: DamlSubmitterInfo = submission.getSubmitterInfo
val commandId: String = submitterInfo.getCommandId
val submitters: List[Party] =
submitterInfo.getSubmittersList.asScala.toList.map(Party.assertFromString)
lazy val transaction: Tx.Transaction = tx
val submissionTime: Timestamp =
Conversions.parseTimestamp(submission.getSubmissionTime)
val submissionSeed: crypto.Hash =
Conversions.parseHash(submission.getSubmissionSeed)
// On copy, avoid decoding the transaction again if not needed
def copyPreservingDecodedTransaction(
submission: DamlTransactionEntry
): DamlTransactionEntrySummary =
new DamlTransactionEntrySummary(submission, transaction)
}
private[transaction] object DamlTransactionEntrySummary {
def apply(
submission: DamlTransactionEntry
): DamlTransactionEntrySummary =
new DamlTransactionEntrySummary(
submission,
Conversions.decodeTransaction(submission.getTransaction),
)
}

View File

@ -10,13 +10,11 @@ import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.kvutils.committer._
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.kvutils.committer.transaction.keys.ContractKeysValidation.validateKeys
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.v1.{Configuration, RejectionReason, TimeModel}
import com.daml.lf.archive.Decode
import com.daml.lf.archive.Reader.ParseError
import com.daml.lf.crypto
import com.daml.lf.data.Ref.{PackageId, Party}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Blinding, Engine, ReplayMismatch, VisibleByKey}
@ -30,7 +28,6 @@ import com.daml.lf.transaction.{
SubmittedTransaction,
TransactionOuterClass,
VersionedTransaction,
Transaction => Tx,
}
import com.daml.lf.value.Value
import com.daml.lf.value.Value.ContractId
@ -99,8 +96,11 @@ private[kvutils] class TransactionCommitter(
/** Reject duplicate commands
*/
private[committer] def deduplicateCommand: Step = {
(commitContext, transactionEntry) => implicit loggingContext =>
private[transaction] def deduplicateCommand: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime
.map { recordTime =>
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
@ -120,6 +120,7 @@ private[kvutils] class TransactionCommitter(
}
}
.getOrElse(StepContinue(transactionEntry))
}
}
// Checks that the submission time of the command is after the
@ -140,14 +141,11 @@ private[kvutils] class TransactionCommitter(
/** Authorize the submission by looking up the party allocation and verifying
* that all of the submitting parties are indeed hosted by the submitting participant.
*/
private[committer] def authorizeSubmitters: Step = {
(commitContext, transactionEntry) => implicit loggingContext =>
def rejection(reason: RejectionReason) =
reject[DamlTransactionEntrySummary](
commitContext.recordTime,
buildRejectionLogEntry(transactionEntry, reason),
)
private[transaction] def authorizeSubmitters: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
@scala.annotation.tailrec
def authorizeAll(submitters: List[Party]): StepResult[DamlTransactionEntrySummary] =
submitters match {
@ -183,12 +181,22 @@ private[kvutils] class TransactionCommitter(
)
}
def rejection(reason: RejectionReason): StepResult[DamlTransactionEntrySummary] =
reject[DamlTransactionEntrySummary](
commitContext.recordTime,
buildRejectionLogEntry(transactionEntry, reason),
)
authorizeAll(transactionEntry.submitters)
}
}
/** Validate ledger effective time and the command's time-to-live. */
private[committer] def validateLedgerTime: Step = {
(commitContext, transactionEntry) => implicit loggingContext =>
private[transaction] def validateLedgerTime: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel
@ -239,11 +247,15 @@ private[kvutils] class TransactionCommitter(
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
StepContinue(transactionEntry)
}
}
}
/** Validate the submission's conformance to the DAML model */
private def validateModelConformance: Step = (commitContext, transactionEntry) =>
implicit loggingContext =>
private def validateModelConformance: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] =
metrics.daml.kvutils.committer.transaction.interpretTimer.time(() => {
// Pull all keys from referenced contracts. We require this for 'fetchByKey' calls
// which are not evidenced in the transaction itself and hence the contract key state is
@ -293,8 +305,9 @@ private[kvutils] class TransactionCommitter(
)
}
})
}
private[committer] def rejectionReasonForValidationError(
private[transaction] def rejectionReasonForValidationError(
validationError: com.daml.lf.engine.Error
): RejectionReason = {
def disputed: RejectionReason =
@ -347,8 +360,11 @@ private[kvutils] class TransactionCommitter(
}
/** Validate the submission's conformance to the DAML model */
private[committer] def blind: Step = {
(commitContext, transactionEntry) => implicit loggingContext =>
private[transaction] def blind: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val blindingInfo = Blinding.blind(transactionEntry.transaction)
setDedupEntryAndUpdateContractState(
commitContext,
@ -359,63 +375,78 @@ private[kvutils] class TransactionCommitter(
),
blindingInfo,
)
}
}
/** Removes `Fetch` and `LookupByKey` nodes from the transactionEntry.
*/
private[committer] def trimUnnecessaryNodes: Step = { (_, transactionEntry) => _ =>
val transaction = transactionEntry.submission.getTransaction
val nodes = transaction.getNodesList.asScala
val nodesToKeep = nodes.iterator.collect {
case node if node.hasCreate || node.hasExercise => node.getNodeId
}.toSet
private[transaction] def trimUnnecessaryNodes: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val transaction = transactionEntry.submission.getTransaction
val nodes = transaction.getNodesList.asScala
val nodesToKeep = nodes.iterator.collect {
case node if node.hasCreate || node.hasExercise => node.getNodeId
}.toSet
val filteredRoots = transaction.getRootsList.asScala.filter(nodesToKeep)
val filteredRoots = transaction.getRootsList.asScala.filter(nodesToKeep)
def stripUnnecessaryNodes(node: TransactionOuterClass.Node): TransactionOuterClass.Node =
if (node.hasExercise) {
val exerciseNode = node.getExercise
val keptChildren =
exerciseNode.getChildrenList.asScala.filter(nodesToKeep)
val newExerciseNode = exerciseNode.toBuilder
.clearChildren()
.addAllChildren(keptChildren.asJavaCollection)
.build()
def stripUnnecessaryNodes(node: TransactionOuterClass.Node): TransactionOuterClass.Node =
if (node.hasExercise) {
val exerciseNode = node.getExercise
val keptChildren =
exerciseNode.getChildrenList.asScala.filter(nodesToKeep)
val newExerciseNode = exerciseNode.toBuilder
.clearChildren()
.addAllChildren(keptChildren.asJavaCollection)
.build()
node.toBuilder
.setExercise(newExerciseNode)
.build()
} else {
node
}
node.toBuilder
.setExercise(newExerciseNode)
.build()
} else {
node
}
val filteredNodes = nodes
.collect {
case node if nodesToKeep(node.getNodeId) => stripUnnecessaryNodes(node)
}
val filteredNodes = nodes
.collect {
case node if nodesToKeep(node.getNodeId) => stripUnnecessaryNodes(node)
}
val newTransaction = transaction
.newBuilderForType()
.addAllRoots(filteredRoots.asJavaCollection)
.addAllNodes(filteredNodes.asJavaCollection)
.setVersion(transaction.getVersion)
.build()
val newTransaction = transaction
.newBuilderForType()
.addAllRoots(filteredRoots.asJavaCollection)
.addAllNodes(filteredNodes.asJavaCollection)
.setVersion(transaction.getVersion)
.build()
val newTransactionEntry = transactionEntry.submission.toBuilder
.setTransaction(newTransaction)
.build()
val newTransactionEntry = transactionEntry.submission.toBuilder
.setTransaction(newTransaction)
.build()
StepContinue(DamlTransactionEntrySummary(newTransactionEntry))
StepContinue(DamlTransactionEntrySummary(newTransactionEntry))
}
}
/** Builds the log entry as the final step.
*/
private def buildFinalLogEntry: Step = (commitContext, transactionEntry) =>
_ => StepStop(buildLogEntry(transactionEntry, commitContext))
private def buildFinalLogEntry: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = StepStop(
buildLogEntry(transactionEntry, commitContext)
)
}
/** Check that all informee parties mentioned of a transaction are allocated. */
private def checkInformeePartiesAllocation: Step = {
(commitContext, transactionEntry) => implicit loggingContext =>
private def checkInformeePartiesAllocation: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val parties = transactionEntry.transaction.informees
if (parties.forall(party => commitContext.get(partyStateKey(party)).isDefined))
StepContinue(transactionEntry)
@ -427,6 +458,7 @@ private[kvutils] class TransactionCommitter(
RejectionReason.PartyNotKnownOnLedger("Not all parties known"),
),
)
}
}
/** Produce the log entry and contract state updates. */
@ -517,7 +549,7 @@ private[kvutils] class TransactionCommitter(
}
}
private[committer] def buildLogEntry(
private[transaction] def buildLogEntry(
transactionEntry: DamlTransactionEntrySummary,
commitContext: CommitContext,
): DamlLogEntry = {
@ -698,37 +730,6 @@ private[kvutils] class TransactionCommitter(
}
private[kvutils] object TransactionCommitter {
class DamlTransactionEntrySummary(val submission: DamlTransactionEntry, tx: => Tx.Transaction) {
val ledgerEffectiveTime: Timestamp = parseTimestamp(submission.getLedgerEffectiveTime)
val submitterInfo: DamlSubmitterInfo = submission.getSubmitterInfo
val commandId: String = submitterInfo.getCommandId
val submitters: List[Party] =
submitterInfo.getSubmittersList.asScala.toList.map(Party.assertFromString)
lazy val transaction: Tx.Transaction = tx
val submissionTime: Timestamp =
Conversions.parseTimestamp(submission.getSubmissionTime)
val submissionSeed: crypto.Hash =
Conversions.parseHash(submission.getSubmissionSeed)
// On copy, avoid decoding the transaction again if not needed
def copyPreservingDecodedTransaction(
submission: DamlTransactionEntry
): DamlTransactionEntrySummary =
new DamlTransactionEntrySummary(submission, transaction)
}
private[transaction] type Step = Committer.Step[DamlTransactionEntrySummary]
object DamlTransactionEntrySummary {
def apply(
submission: DamlTransactionEntry
): DamlTransactionEntrySummary =
new DamlTransactionEntrySummary(
submission,
Conversions.decodeTransaction(submission.getTransaction),
)
}
// Helper to read the _current_ contract state.
// NOTE(JM): Important to fetch from the state that is currently being built up since
// we mark some contracts as archived and may later change their disclosure and do not

View File

@ -8,13 +8,15 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.committer.Committer.Step
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
Step,
TransactionCommitter,
}
import com.daml.ledger.participant.state.kvutils.committer.transaction.keys.KeyConsistencyValidation.checkNodeKeyConsistency
import com.daml.ledger.participant.state.kvutils.committer.transaction.keys.KeyMonotonicityValidation.checkContractKeysCausalMonotonicity
import com.daml.ledger.participant.state.kvutils.committer.transaction.keys.KeyUniquenessValidation.checkNodeKeyUniqueness
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepResult}
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.ledger.participant.state.v1.RejectionReason
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.{Node, NodeId}
@ -22,11 +24,11 @@ import com.daml.lf.value.Value.ContractId
import com.daml.logging.LoggingContext
private[transaction] object ContractKeysValidation {
def validateKeys(
transactionCommitter: TransactionCommitter
): Step[DamlTransactionEntrySummary] = {
(commitContext, transactionEntry) => implicit loggingContext =>
def validateKeys(transactionCommitter: TransactionCommitter): Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
val damlState = commitContext
.collectInputs[(DamlStateKey, DamlStateValue), Map[DamlStateKey, DamlStateValue]] {
case (key, Some(value)) if key.hasContractKey => key -> value
@ -58,6 +60,7 @@ private[transaction] object ContractKeysValidation {
stateAfterMonotonicityCheck,
)
} yield finalState
}
}
private def performTraversalContractKeysChecks(

View File

@ -5,9 +5,11 @@ package com.daml.ledger.participant.state.kvutils.committer.transaction.keys
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
TransactionCommitter,
}
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepResult}
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.v1.RejectionReason
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext

View File

@ -5,4 +5,6 @@ package com.daml.ledger.participant.state.kvutils.committer
package object transaction {
private[transaction] type RawContractId = String
private[transaction] type Step = CommitStep[DamlTransactionEntrySummary]
}

View File

@ -9,7 +9,6 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, configurationStateKey}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.TestHelpers.{createCommitContext, theDefaultConfig}
import com.daml.ledger.participant.state.kvutils.committer.Committer.StepInfo
import com.daml.ledger.participant.state.kvutils.committer.CommitterSpec._
import com.daml.ledger.participant.state.kvutils.{DamlKvutils, Err}
import com.daml.ledger.participant.state.protobuf.LedgerConfiguration
@ -166,11 +165,12 @@ class CommitterSpec
submission: DamlSubmission,
)(implicit loggingContext: LoggingContext): Int = 0
override protected def steps: Iterable[(StepInfo, Step)] = Iterable[(StepInfo, Step)](
("first", (_, _) => _ => StepContinue(1)),
("second", (_, _) => _ => StepStop(expectedLogEntry)),
("third", (_, _) => _ => StepStop(DamlLogEntry.getDefaultInstance)),
)
override protected def steps: Iterable[(StepInfo, Step)] =
Iterable(
"first" -> stepReturning(StepContinue(1)),
"second" -> stepReturning(StepStop(expectedLogEntry)),
"third" -> stepReturning(StepStop(DamlLogEntry.getDefaultInstance)),
)
override protected val metrics: Metrics = newMetrics()
}
@ -189,10 +189,11 @@ class CommitterSpec
submission: DamlSubmission,
)(implicit loggingContext: LoggingContext): Int = 0
override protected def steps: Iterable[(StepInfo, Step)] = Iterable(
("first", (_, _) => _ => StepContinue(1)),
("second", (_, _) => _ => StepContinue(2)),
)
override protected def steps: Iterable[(StepInfo, Step)] =
Iterable(
"first" -> stepReturning(StepContinue(1)),
"second" -> stepReturning(StepContinue(2)),
)
override protected val metrics: Metrics = newMetrics()
}
@ -290,7 +291,9 @@ object CommitterSpec {
)(implicit loggingContext: LoggingContext): Int = 0
override protected def steps: Iterable[(StepInfo, Step)] =
Iterable(("result", (_, _) => _ => StepStop(aLogEntry)))
Iterable(
"result" -> stepReturning(StepStop(aLogEntry))
)
override protected val metrics: Metrics = newMetrics()
}
@ -303,5 +306,13 @@ object CommitterSpec {
)
.build
private type Step = Committer.Step[Int]
private type Step = CommitStep[Int]
def stepReturning[PartialResult](result: StepResult[PartialResult]): CommitStep[PartialResult] =
new CommitStep[PartialResult] {
override def apply(
context: CommitContext,
input: PartialResult,
)(implicit loggingContext: LoggingContext): StepResult[PartialResult] = result
}
}

View File

@ -33,7 +33,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime.addMicros(1)))
val actual = instance.checkTtl(context, anEmptyResult)(loggingContext)
val actual = instance.checkTtl(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
@ -51,7 +51,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(maximumRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime))
val actual = instance.checkTtl(context, anEmptyResult)(loggingContext)
val actual = instance.checkTtl(context, anEmptyResult)
actual match {
case StepContinue(_) => succeed
@ -64,7 +64,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = None)
val actual = instance.checkTtl(context, anEmptyResult)(loggingContext)
val actual = instance.checkTtl(context, anEmptyResult)
actual match {
case StepContinue(_) => succeed
@ -76,7 +76,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = None)
instance.checkTtl(context, anEmptyResult)(loggingContext)
instance.checkTtl(context, anEmptyResult)
context.maximumRecordTime shouldEqual Some(aRecordTime.toInstant)
context.outOfTimeBoundsLogEntry should not be empty
@ -94,7 +94,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime))
instance.checkTtl(context, anEmptyResult)(loggingContext)
instance.checkTtl(context, anEmptyResult)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
@ -106,7 +106,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(theRecordTime.addMicros(1000))
val context = createCommitContext(recordTime = Some(theRecordTime))
val actual = instance.buildLogEntry(context, anEmptyResult)(loggingContext)
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
@ -120,7 +120,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = None)
val actual = instance.buildLogEntry(context, anEmptyResult)(loggingContext)
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
@ -134,7 +134,7 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = recordTimeMaybe)
val actual = instance.buildLogEntry(context, anEmptyResult)(loggingContext)
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()

View File

@ -497,31 +497,32 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
"buildLogEntry" should {
val anEmptyResult = DamlPackageUploadEntry.newBuilder
.setSubmissionId("an ID")
.setParticipantId("a participant") -> Map.empty[Ref.PackageId, Ast.Package]
val anEmptyResult = PackageCommitter.Result(
DamlPackageUploadEntry.newBuilder.setSubmissionId("an ID").setParticipantId("a participant"),
Map.empty,
)
def newCommitter = new CommitterWrapper(PackageValidationMode.No, PackagePreloadingMode.No)
"produce an out-of-time-bounds rejection log entry in case pre-execution is enabled" in {
val context = createCommitContext(recordTime = None)
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)(loggingContext)
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
context.outOfTimeBoundsLogEntry.foreach { actual =>
actual.hasRecordTime shouldBe false
actual.hasPackageUploadRejectionEntry shouldBe true
actual.getPackageUploadRejectionEntry.getSubmissionId shouldBe anEmptyResult._1.getSubmissionId
actual.getPackageUploadRejectionEntry.getParticipantId shouldBe anEmptyResult._1.getParticipantId
actual.getPackageUploadRejectionEntry.getSubmissionId shouldBe anEmptyResult.uploadEntry.getSubmissionId
actual.getPackageUploadRejectionEntry.getParticipantId shouldBe anEmptyResult.uploadEntry.getParticipantId
}
}
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val context = createCommitContext(recordTime = Some(theRecordTime))
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)(loggingContext)
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty

View File

@ -24,7 +24,7 @@ class PartyAllocationCommitterSpec extends AnyWordSpec with Matchers {
val instance = new PartyAllocationCommitter(metrics)
val context = createCommitContext(recordTime = None)
instance.buildLogEntry(context, aPartyAllocationEntry)(loggingContext)
instance.buildLogEntry(context, aPartyAllocationEntry)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
@ -40,7 +40,7 @@ class PartyAllocationCommitterSpec extends AnyWordSpec with Matchers {
val instance = new PartyAllocationCommitter(metrics)
val context = createCommitContext(recordTime = Some(theRecordTime))
instance.buildLogEntry(context, aPartyAllocationEntry)(loggingContext)
instance.buildLogEntry(context, aPartyAllocationEntry)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty

View File

@ -11,7 +11,6 @@ import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, co
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.kvutils.committer.transaction.keys.ContractKeysValidation
import com.daml.ledger.participant.state.kvutils.committer.{
CommitContext,
@ -135,7 +134,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
val actual = transactionCommitter.trimUnnecessaryNodes(
context,
aRichTransactionTreeSummary,
)(loggingContext)
)
actual match {
case StepContinue(logEntry) =>
@ -168,8 +167,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
"continue if record time is not available" in {
val context = createCommitContext(recordTime = None)
val actual =
transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)(loggingContext)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
@ -182,8 +180,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
val context =
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
val actual =
transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)(loggingContext)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
@ -197,8 +194,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
val context =
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
val actual =
transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)(loggingContext)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
@ -218,8 +214,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
val context =
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
val actual =
transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)(loggingContext)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => fail()
@ -236,7 +231,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
val result = transactionCommitter.validateLedgerTime(
contextWithTimeModelAndEmptyCommandDeduplication(),
aTransactionEntrySummary,
)(loggingContext)
)
result match {
case StepContinue(_) => succeed
@ -250,7 +245,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
transactionCommitter.validateLedgerTime(
context,
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes,
)(loggingContext)
)
context.minimumRecordTime shouldEqual Some(Instant.ofEpochSecond(-28))
context.maximumRecordTime shouldEqual Some(Instant.ofEpochSecond(31))
@ -268,7 +263,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
transactionCommitter.validateLedgerTime(
context,
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes,
)(loggingContext)
)
context.minimumRecordTime shouldEqual Some(
Instant.ofEpochSecond(3).plus(Timestamp.Resolution)
@ -309,8 +304,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
)
.build
)
val actual =
transactionCommitter.validateLedgerTime(context, transactionEntrySummary)(loggingContext)
val actual = transactionCommitter.validateLedgerTime(context, transactionEntrySummary)
actual match {
case StepContinue(_) => fail()
@ -327,7 +321,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
transactionCommitter.validateLedgerTime(
commitContext,
aTransactionEntrySummary,
)(loggingContext)
)
commitContext.getAccessedInputKeys should contain(configurationStateKey)
}
@ -337,8 +331,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
"set record time in log entry when it is available" in {
val context = createCommitContext(recordTime = Some(theRecordTime))
val actual =
transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
val actual = transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
actual.hasRecordTime shouldBe true
actual.getRecordTime shouldBe buildTimestamp(theRecordTime)
@ -360,8 +353,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
"produce an out-of-time-bounds rejection log entry in case pre-execution is enabled" in {
val context = createCommitContext(recordTime = None)
val _ =
transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
@ -375,8 +367,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val context = createCommitContext(recordTime = Some(aRecordTime))
val _ =
transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
transactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
@ -387,7 +378,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
"always set blindingInfo" in {
val context = createCommitContext(recordTime = None)
val actual = transactionCommitter.blind(context, aTransactionEntrySummary)(loggingContext)
val actual = transactionCommitter.blind(context, aTransactionEntrySummary)
actual match {
case StepContinue(partialResult) =>
@ -529,7 +520,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
a[MissingInputState] should be thrownBy transactionCommitter.authorizeSubmitters(
context,
tx,
)(loggingContext)
)
}
"reject a submission when any of the submitters is not known" in {
@ -543,7 +534,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
)
val tx = DamlTransactionEntrySummary(createEmptyTransactionEntry(List(Alice, Bob)))
val result = transactionCommitter.authorizeSubmitters(context, tx)(loggingContext)
val result = transactionCommitter.authorizeSubmitters(context, tx)
result shouldBe a[StepStop]
val rejectionReason =
@ -562,7 +553,7 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
)
val tx = DamlTransactionEntrySummary(createEmptyTransactionEntry(List(Alice, Bob)))
val result = transactionCommitter.authorizeSubmitters(context, tx)(loggingContext)
val result = transactionCommitter.authorizeSubmitters(context, tx)
result shouldBe a[StepStop]
val rejectionReason =
@ -584,8 +575,8 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
)
val tx = DamlTransactionEntrySummary(createEmptyTransactionEntry(List(Alice, Bob, Emma)))
transactionCommitter
.authorizeSubmitters(context, tx)(loggingContext) shouldBe a[StepContinue[_]]
val result = transactionCommitter.authorizeSubmitters(context, tx)
result shouldBe a[StepContinue[_]]
}
lazy val Alice = "alice"
@ -778,11 +769,10 @@ class TransactionCommitterSpec extends AnyWordSpec with Matchers with MockitoSug
ctx: CommitContext,
transaction: SubmittedTransaction,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] =
ContractKeysValidation
.validateKeys(transactionCommitter)(
ctx,
DamlTransactionEntrySummary(createTransactionEntry(List("Alice"), transaction)),
)(loggingContext)
ContractKeysValidation.validateKeys(transactionCommitter)(
ctx,
DamlTransactionEntrySummary(createTransactionEntry(List("Alice"), transaction)),
)
def contractKeyState(contractId: String): DamlContractKeyState =
DamlContractKeyState

View File

@ -8,8 +8,10 @@ import java.time.{Instant, ZoneOffset, ZonedDateTime}
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.StepContinue
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter
import com.daml.ledger.participant.state.kvutils.committer.transaction.TransactionCommitter.DamlTransactionEntrySummary
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
TransactionCommitter,
}
import com.daml.ledger.participant.state.v1.RejectionReason
import com.daml.logging.LoggingContext
import com.google.protobuf.ByteString
@ -29,11 +31,10 @@ class KeyMonotonicityValidationSpec
private val ledgerEffectiveTime =
ZonedDateTime.of(2021, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC).toInstant
private val testTransactionEntry = DamlTransactionEntrySummary(
DamlTransactionEntry
.newBuilder()
DamlTransactionEntry.newBuilder
.setSubmissionSeed(testSubmissionSeed)
.setLedgerEffectiveTime(Conversions.buildTimestamp(ledgerEffectiveTime))
.build()
.build
)
"checkContractKeysCausalMonotonicity" should {
@ -71,12 +72,10 @@ class KeyMonotonicityValidationSpec
}
}
private def aStateValueActiveAt(activeAt: Instant) = DamlStateValue
.newBuilder()
.setContractKeyState(
DamlContractKeyState
.newBuilder()
.setActiveAt(Conversions.buildTimestamp(activeAt))
)
.build()
private def aStateValueActiveAt(activeAt: Instant) =
DamlStateValue.newBuilder
.setContractKeyState(
DamlContractKeyState.newBuilder.setActiveAt(Conversions.buildTimestamp(activeAt))
)
.build
}