Remove committer support for non pre-execution [kvl-734] (#13598)

CHANGELOG_BEGIN
[kvutils] - Pre-execution is the only supported execution mode
CHANGELOG_END
This commit is contained in:
Nicu Reut 2022-04-20 11:52:53 +02:00 committed by GitHub
parent 23feb9c513
commit a37dd63675
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 491 additions and 1056 deletions

View File

@ -9,7 +9,7 @@ import com.daml.lf.data.Ref
/** Errors thrown by kvutils.
*
* Validation and consistency errors are turned into command rejections.
* Note that [[KeyValueCommitting.processSubmission]] can also fail with a protobuf exception,
* Note that [[KeyValueCommitting.preExecuteSubmission]] can also fail with a protobuf exception,
* e.g. https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/InvalidProtocolBufferException.
*/
sealed abstract class Err extends RuntimeException with Product with Serializable {

View File

@ -14,12 +14,7 @@ import com.daml.ledger.participant.state.kvutils.committer.{
SubmissionExecutor,
}
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionEntry
import com.daml.ledger.participant.state.kvutils.store.{
DamlLogEntry,
DamlLogEntryId,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.archive
import com.daml.lf.data.Ref
@ -27,7 +22,7 @@ import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.Engine
import com.daml.lf.kv.archives.{ArchiveConversions, RawArchive}
import com.daml.lf.kv.transactions.{ContractIdOrKey, RawTransaction, TransactionConversions}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.google.protobuf.ByteString
@ -43,9 +38,6 @@ class KeyValueCommitting private[daml] (
engine: Engine,
metrics: Metrics,
) {
import KeyValueCommitting.submissionOutputs
private val logger = ContextualizedLogger.get(getClass)
/** Processes a Daml submission, given the allocated log entry id, the submission and its resolved inputs.
* Produces the log entry to be committed, and Daml state updates.
@ -58,8 +50,6 @@ class KeyValueCommitting private[daml] (
* that can be accessed through `entryId`. The Daml state updates may create new entries or update
* existing entries in the key-value store.
*
* @param entryId: Log entry id to which this submission is committed.
* @param recordTime: Record time at which this log entry is committed.
* @param defaultConfig: The default configuration that is to be used if no configuration has been committed to state.
* @param submission: Submission to commit to the ledger.
* @param participantId: The participant from which the submission originates. Expected to be authenticated.
@ -73,43 +63,6 @@ class KeyValueCommitting private[daml] (
* the submission.
* @return Log entry to be committed and the Daml state updates to be applied.
*/
@throws(classOf[Err])
def processSubmission(
entryId: DamlLogEntryId,
recordTime: Timestamp,
defaultConfig: Configuration,
submission: DamlSubmission,
participantId: Ref.ParticipantId,
inputState: DamlStateMap,
)(implicit loggingContext: LoggingContext): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) = {
metrics.daml.kvutils.committer.processing.inc()
metrics.daml.kvutils.committer.last.lastRecordTimeGauge.updateValue(recordTime.toString)
metrics.daml.kvutils.committer.last.lastEntryIdGauge.updateValue(Pretty.prettyEntryId(entryId))
metrics.daml.kvutils.committer.last.lastParticipantIdGauge.updateValue(participantId)
val ctx = metrics.daml.kvutils.committer.runTimer.time()
try {
val committer = createCommitter(engine, defaultConfig, submission)
val (logEntry, outputState) = committer.run(
Some(recordTime),
submission,
participantId,
inputState,
)
verifyStateUpdatesAgainstPreDeclaredOutputs(outputState, submission)
(logEntry, outputState)
} catch {
case scala.util.control.NonFatal(exception) =>
logger.warn("Exception while processing submission.", exception)
metrics.daml.kvutils.committer.last.lastExceptionGauge.updateValue(
s"${Pretty.prettyEntryId(entryId)}[${submission.getPayloadCase}]: $exception"
)
throw exception
} finally {
val _ = ctx.stop()
metrics.daml.kvutils.committer.processing.dec()
}
}
@throws(classOf[Err])
def preExecuteSubmission(
defaultConfig: Configuration,
@ -148,18 +101,6 @@ class KeyValueCommitting private[daml] (
throw Err.InvalidSubmission("DamlSubmission payload not set")
}
private def verifyStateUpdatesAgainstPreDeclaredOutputs(
actualStateUpdates: Map[DamlStateKey, DamlStateValue],
submission: DamlSubmission,
): Unit = {
val expectedStateUpdates = submissionOutputs(submission)
if (!(actualStateUpdates.keySet subsetOf expectedStateUpdates)) {
val unaccountedKeys = actualStateUpdates.keySet diff expectedStateUpdates
sys.error(
s"State updates not a subset of expected updates! Keys [$unaccountedKeys] are unaccounted for!"
)
}
}
}
object KeyValueCommitting {

View File

@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
/** Methods to produce the [[DamlSubmission]] message.
*
* [[DamlSubmission]] is processed for committing with [[KeyValueCommitting.processSubmission]].
* [[DamlSubmission]] is processed for committing with [[KeyValueCommitting.preExecuteSubmission]].
*
* These methods are the only acceptable way of producing the submission messages.
* The protocol buffer messages must not be embedded in other protocol buffer messages,

View File

@ -51,7 +51,7 @@ package com.daml.ledger.participant.state.kvutils
* - Configuration extended with "Open World" flag that defines whether
* submissions from unallocated parties are accepted.
* - Support for authenticating submissions based on participant id. The
* [[KeyValueCommitting.processSubmission]] method now takes the participant id as
* [[KeyValueCommitting.preExecuteSubmission]] method now takes the participant id as
* argument.
* - Support for submitting authenticated configuration changes.
* - Bug in command deduplication fixed: rejected commands are now deduplicated correctly.

View File

@ -11,12 +11,11 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import scala.collection.{Factory, mutable}
/** Commit context provides access to state inputs, commit parameters (e.g. record time) and
/** Commit context provides access to state inputs, commit parameters and
* allows committer to set state outputs.
*/
private[kvutils] case class CommitContext(
private val inputs: DamlStateMap,
recordTime: Option[Timestamp],
participantId: Ref.ParticipantId,
) {
private[this] val logger = ContextualizedLogger.get(getClass)
@ -36,8 +35,6 @@ private[kvutils] case class CommitContext(
// pre-execution.
var outOfTimeBoundsLogEntry: Option[DamlLogEntry] = None
def preExecute: Boolean = recordTime.isEmpty
/** Retrieve value from output state, or if not found, from input state.
* Throws an exception if the key is not found in either.
*/

View File

@ -8,16 +8,10 @@ import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
import com.daml.ledger.participant.state.kvutils._
import com.daml.ledger.participant.state.kvutils.store.{
DamlLogEntry,
DamlOutOfTimeBoundsEntry,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.store.events.DamlConfigurationEntry
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlOutOfTimeBoundsEntry}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext.withEnrichedLoggingContextFrom
import com.daml.logging.entries.LoggingEntries
import com.daml.logging.{ContextualizedLogger, LoggingContext}
@ -56,7 +50,6 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
// These are lazy because they rely on `committerName`, which is defined in the subclass and
// therefore not set at object initialization.
private lazy val runTimer: Timer = metrics.daml.kvutils.committer.runTimer(committerName)
private lazy val preExecutionRunTimer: Timer =
metrics.daml.kvutils.committer.preExecutionRunTimer(committerName)
private lazy val stepTimers: Map[StepInfo, Timer] =
@ -79,26 +72,13 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
protected val metrics: Metrics
/** A committer can `run` a submission and produce a log entry and output states. */
def run(
recordTime: Option[Time.Timestamp],
submission: DamlSubmission,
participantId: Ref.ParticipantId,
inputState: DamlStateMap,
)(implicit loggingContext: LoggingContext): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) =
runTimer.time { () =>
val commitContext = CommitContext(inputState, recordTime, participantId)
val logEntry = runSteps(commitContext, submission)
logEntry -> commitContext.getOutputs.toMap
}
def runWithPreExecution(
submission: DamlSubmission,
participantId: Ref.ParticipantId,
inputState: DamlStateMap,
)(implicit loggingContext: LoggingContext): PreExecutionResult =
preExecutionRunTimer.time { () =>
val commitContext = CommitContext(inputState, recordTime = None, participantId)
val commitContext = CommitContext(inputState, participantId)
preExecute(submission, commitContext)
}
@ -187,21 +167,4 @@ object Committer {
}
.getOrElse(None -> defaultConfig)
def buildLogEntryWithOptionalRecordTime(
recordTime: Option[Timestamp],
addSubmissionSpecificEntry: DamlLogEntry.Builder => DamlLogEntry.Builder,
): DamlLogEntry = {
val logEntryBuilder = DamlLogEntry.newBuilder
addSubmissionSpecificEntry(logEntryBuilder)
setRecordTimeIfAvailable(recordTime, logEntryBuilder)
logEntryBuilder.build
}
private def setRecordTimeIfAvailable(
recordTime: Option[Timestamp],
logEntryBuilder: DamlLogEntry.Builder,
): DamlLogEntry.Builder =
recordTime.fold(logEntryBuilder)(timestamp =>
logEntryBuilder.setRecordTime(buildTimestamp(timestamp))
)
}

View File

@ -4,11 +4,7 @@
package com.daml.ledger.participant.state.kvutils.committer
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.{
buildTimestamp,
configDedupKey,
configurationStateKey,
}
import com.daml.ledger.participant.state.kvutils.Conversions.{configDedupKey, configurationStateKey}
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
@ -17,7 +13,6 @@ import com.daml.ledger.participant.state.kvutils.store.events.{
GenerationMismatch,
Invalid,
ParticipantNotAuthorized,
TimedOut,
}
import com.daml.ledger.participant.state.kvutils.store.{
DamlLogEntry,
@ -71,25 +66,12 @@ private[kvutils] class ConfigCommitter(
def apply(
ctx: CommitContext,
result: Result,
)(implicit loggingContext: LoggingContext): StepResult[Result] =
// Check the maximum record time against the record time of the commit.
// This mechanism allows the submitter to detect lost submissions and retry
// with a submitter controlled rate.
if (ctx.recordTime.exists(_ > maximumRecordTime)) {
rejectionTraceLog(s"submission timed out (${ctx.recordTime} > $maximumRecordTime)")
reject(
ctx.recordTime,
result.submission,
_.setTimedOut(TimedOut.newBuilder.setMaximumRecordTime(buildTimestamp(maximumRecordTime))),
)
} else {
if (ctx.preExecute) {
// Propagate the time bounds and defer the checks to post-execution.
ctx.maximumRecordTime = Some(maximumRecordTime)
setOutOfTimeBoundsLogEntry(result.submission, ctx)
}
StepContinue(result)
}
)(implicit loggingContext: LoggingContext): StepResult[Result] = {
// Propagate the time bounds and defer the checks to post-execution.
ctx.maximumRecordTime = Some(maximumRecordTime)
setOutOfTimeBoundsLogEntry(result.submission, ctx)
StepContinue(result)
}
}
private val authorizeSubmission: Step = new Step {
@ -113,7 +95,6 @@ private[kvutils] class ConfigCommitter(
s"participant id ${result.submission.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
@ -121,7 +102,6 @@ private[kvutils] class ConfigCommitter(
val message = s"${ctx.participantId} is not authorized to change configuration."
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
@ -141,14 +121,12 @@ private[kvutils] class ConfigCommitter(
.fold(
err =>
reject(
ctx.recordTime,
result.submission,
_.setInvalidConfiguration(Invalid.newBuilder.setDetails(err)),
),
config =>
if (config.generation != (1 + result.currentConfig._2.generation))
reject(
ctx.recordTime,
result.submission,
_.setGenerationMismatch(
GenerationMismatch.newBuilder
@ -172,7 +150,6 @@ private[kvutils] class ConfigCommitter(
val message = s"duplicate submission='${result.submission.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result.submission,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
@ -207,21 +184,20 @@ private[kvutils] class ConfigCommitter(
.build,
)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setConfigurationEntry(configurationEntry),
)
val successLogEntry = DamlLogEntry
.newBuilder()
.setConfigurationEntry(configurationEntry)
.build()
StepStop(successLogEntry)
}
}
private def reject[PartialResult](
recordTime: Option[Timestamp],
submission: DamlConfigurationSubmission,
addErrorDetails: DamlConfigurationRejectionEntry.Builder => DamlConfigurationRejectionEntry.Builder,
): StepResult[PartialResult] = {
metrics.daml.kvutils.committer.config.rejections.inc()
StepStop(buildRejectionLogEntry(recordTime, submission, addErrorDetails))
StepStop(buildRejectionLogEntry(submission, addErrorDetails))
}
private def setOutOfTimeBoundsLogEntry(
@ -229,26 +205,25 @@ private[kvutils] class ConfigCommitter(
commitContext: CommitContext,
): Unit = {
commitContext.outOfTimeBoundsLogEntry = Some(
buildRejectionLogEntry(recordTime = None, submission, identity)
buildRejectionLogEntry(submission, identity)
)
}
private def buildRejectionLogEntry(
recordTime: Option[Timestamp],
submission: DamlConfigurationSubmission,
addErrorDetails: DamlConfigurationRejectionEntry.Builder => DamlConfigurationRejectionEntry.Builder,
): DamlLogEntry = {
buildLogEntryWithOptionalRecordTime(
recordTime,
_.setConfigurationRejectionEntry(
DamlLogEntry
.newBuilder()
.setConfigurationRejectionEntry(
addErrorDetails(
DamlConfigurationRejectionEntry.newBuilder
.setSubmissionId(submission.getSubmissionId)
.setParticipantId(submission.getParticipantId)
.setConfiguration(submission.getConfiguration)
)
),
)
)
.build()
}
override protected val steps: Steps[Result] = Iterable(

View File

@ -6,7 +6,6 @@ package com.daml.ledger.participant.state.kvutils.committer
import java.util.concurrent.Executors
import com.daml.ledger.participant.state.kvutils.Conversions.packageUploadDedupKey
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{
DamlPackageUploadEntry,
DamlPackageUploadRejectionEntry,
@ -26,7 +25,6 @@ import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.archive.ArchiveParser
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.Engine
import com.daml.lf.kv.archives.{ArchiveConversions, RawArchive}
import com.daml.lf.language.Ast
@ -76,31 +74,29 @@ final private[kvutils] class PackageCommitter(
logger.trace(s"Package upload rejected: $message.")
private def reject(
recordTime: Option[Timestamp],
submissionId: String,
participantId: String,
addErrorDetails: DamlPackageUploadRejectionEntry.Builder => DamlPackageUploadRejectionEntry.Builder,
): StepStop = {
metrics.daml.kvutils.committer.packageUpload.rejections.inc()
StepStop(buildRejectionLogEntry(recordTime, submissionId, participantId, addErrorDetails))
StepStop(buildRejectionLogEntry(submissionId, participantId, addErrorDetails))
}
private def buildRejectionLogEntry(
recordTime: Option[Timestamp],
submissionId: String,
participantId: String,
addErrorDetails: DamlPackageUploadRejectionEntry.Builder => DamlPackageUploadRejectionEntry.Builder,
): DamlLogEntry =
buildLogEntryWithOptionalRecordTime(
recordTime,
_.setPackageUploadRejectionEntry(
DamlLogEntry
.newBuilder()
.setPackageUploadRejectionEntry(
addErrorDetails(
DamlPackageUploadRejectionEntry.newBuilder
.setSubmissionId(submissionId)
.setParticipantId(participantId)
)
),
)
)
.build()
private def setOutOfTimeBoundsLogEntry(
uploadEntry: DamlPackageUploadEntry.Builder,
@ -108,7 +104,6 @@ final private[kvutils] class PackageCommitter(
): Unit =
commitContext.outOfTimeBoundsLogEntry = Some(
buildRejectionLogEntry(
recordTime = None,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
identity,
@ -130,7 +125,6 @@ final private[kvutils] class PackageCommitter(
val uploadEntry = partialResult.uploadEntry
rejectionTraceLog(errors.map(_.msg).mkString("[", ",", "]"))
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setInvalidPackage(Invalid.newBuilder.setDetails("Cannot parse package ID")),
@ -152,7 +146,6 @@ final private[kvutils] class PackageCommitter(
s"Participant ID '${uploadEntry.getParticipantId}' did not match authorized participant ID '${ctx.participantId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
@ -174,7 +167,6 @@ final private[kvutils] class PackageCommitter(
val message = s"duplicate submission='${uploadEntry.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
@ -209,7 +201,6 @@ final private[kvutils] class PackageCommitter(
.mkString(", ")
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setInvalidPackage(Invalid.newBuilder.setDetails(message)),
@ -266,7 +257,6 @@ final private[kvutils] class PackageCommitter(
case Left(message) =>
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setInvalidPackage(Invalid.newBuilder.setDetails(message)),
@ -301,7 +291,6 @@ final private[kvutils] class PackageCommitter(
val message = errors.mkString(", ")
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setInvalidPackage(Invalid.newBuilder.setDetails(message)),
@ -348,7 +337,6 @@ final private[kvutils] class PackageCommitter(
case Left(message) =>
rejectionTraceLog(message)
reject(
ctx.recordTime,
uploadEntry.getSubmissionId,
uploadEntry.getParticipantId,
_.setInvalidPackage(Invalid.newBuilder.setDetails(message)),
@ -447,10 +435,8 @@ final private[kvutils] class PackageCommitter(
.build,
)
val successLogEntry =
buildLogEntryWithOptionalRecordTime(ctx.recordTime, _.setPackageUploadEntry(uploadEntry))
if (ctx.preExecute) {
setOutOfTimeBoundsLogEntry(uploadEntry, ctx)
}
DamlLogEntry.newBuilder().setPackageUploadEntry(uploadEntry).build()
setOutOfTimeBoundsLogEntry(uploadEntry, ctx)
StepStop(successLogEntry)
}
}

View File

@ -4,7 +4,6 @@
package com.daml.ledger.participant.state.kvutils.committer
import com.daml.ledger.participant.state.kvutils.Conversions.partyAllocationDedupKey
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.ledger.participant.state.kvutils.store.events.{
AlreadyExists,
DamlPartyAllocationEntry,
@ -22,7 +21,6 @@ import com.daml.ledger.participant.state.kvutils.store.{
}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.entries.LoggingEntries
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
@ -67,7 +65,6 @@ private[kvutils] class PartyAllocationCommitter(
s"participant id ${result.getParticipantId} did not match authenticated participant id ${ctx.participantId}"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setParticipantNotAuthorized(ParticipantNotAuthorized.newBuilder.setDetails(message)),
)
@ -86,7 +83,6 @@ private[kvutils] class PartyAllocationCommitter(
val message = s"party string '$party' invalid"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setInvalidName(Invalid.newBuilder.setDetails(message)),
)
@ -107,7 +103,6 @@ private[kvutils] class PartyAllocationCommitter(
val message = s"party already exists party='$party'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setAlreadyExists(AlreadyExists.newBuilder.setDetails(message)),
)
@ -127,7 +122,6 @@ private[kvutils] class PartyAllocationCommitter(
val message = s"duplicate submission='${result.getSubmissionId}'"
rejectionTraceLog(message)
reject(
ctx.recordTime,
result,
_.setDuplicateSubmission(Duplicate.newBuilder.setDetails(message)),
)
@ -163,46 +157,42 @@ private[kvutils] class PartyAllocationCommitter(
.build,
)
val successLogEntry = buildLogEntryWithOptionalRecordTime(
ctx.recordTime,
_.setPartyAllocationEntry(result),
)
if (ctx.preExecute) {
setOutOfTimeBoundsLogEntry(result, ctx)
}
val successLogEntry = DamlLogEntry
.newBuilder()
.setPartyAllocationEntry(result)
.build()
setOutOfTimeBoundsLogEntry(result, ctx)
StepStop(successLogEntry)
}
}
private def reject[PartialResult](
recordTime: Option[Timestamp],
result: Result,
addErrorDetails: DamlPartyAllocationRejectionEntry.Builder => DamlPartyAllocationRejectionEntry.Builder,
): StepResult[PartialResult] = {
metrics.daml.kvutils.committer.partyAllocation.rejections.inc()
StepStop(buildRejectionLogEntry(recordTime, result, addErrorDetails))
StepStop(buildRejectionLogEntry(result, addErrorDetails))
}
private def buildRejectionLogEntry(
recordTime: Option[Timestamp],
result: Result,
addErrorDetails: DamlPartyAllocationRejectionEntry.Builder => DamlPartyAllocationRejectionEntry.Builder,
): DamlLogEntry = {
buildLogEntryWithOptionalRecordTime(
recordTime,
_.setPartyAllocationRejectionEntry(
DamlLogEntry
.newBuilder()
.setPartyAllocationRejectionEntry(
addErrorDetails(
DamlPartyAllocationRejectionEntry.newBuilder
.setSubmissionId(result.getSubmissionId)
.setParticipantId(result.getParticipantId)
)
),
)
)
.build()
}
private def setOutOfTimeBoundsLogEntry(result: Result, commitContext: CommitContext): Unit = {
commitContext.outOfTimeBoundsLogEntry = Some(
buildRejectionLogEntry(recordTime = None, result, identity)
buildRejectionLogEntry(result, identity)
)
}

View File

@ -5,18 +5,11 @@ package com.daml.ledger.participant.state.kvutils.committer
import com.daml.ledger.participant.state.kvutils.DamlStateMap
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
trait SubmissionExecutor {
def run(
recordTime: Option[Time.Timestamp],
submission: DamlSubmission,
participantId: Ref.ParticipantId,
inputState: DamlStateMap,
)(implicit loggingContext: LoggingContext): (DamlLogEntry, Map[DamlStateKey, DamlStateValue])
def runWithPreExecution(
submission: DamlSubmission,

View File

@ -69,16 +69,12 @@ private[transaction] object CommandDeduplication {
if (isNotADuplicate) {
StepContinue(transactionEntry)
} else {
if (commitContext.preExecute) {
preExecutionDuplicateRejection(
commitContext,
transactionEntry,
commandDeduplicationDuration,
maybeDedupValue,
)
} else {
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
}
preExecutionDuplicateRejection(
commitContext,
transactionEntry,
commandDeduplicationDuration,
maybeDedupValue,
)
}
}
@ -87,22 +83,16 @@ private[transaction] object CommandDeduplication {
commandDeduplicationDuration: Duration,
maybeDedupValue: Option[DamlCommandDedupValue],
): Boolean = {
val minimumRecordTime = commitContext.recordTime match {
case Some(recordTime) =>
// During the normal execution, in the deduplication state value we stored the record time
// This allows us to compare the record times directly
recordTime.toInstant
case None =>
// We select minimum record time for pre-execution
// During pre-execution in the deduplication state value we stored the maximum record time
// To guarantee the deduplication duration, we basically compare the maximum record time of the previous transaction
// with the minimum record time of the current transaction. This gives us the smallest possible interval between two transactions.
commitContext.minimumRecordTime
.getOrElse(
throw Err.InternalError("Minimum record time is not set for pre-execution")
)
.toInstant
}
// We select minimum record time for pre-execution
// During pre-execution in the deduplication state value we stored the maximum record time
// To guarantee the deduplication duration, we basically compare the maximum record time of the previous transaction
// with the minimum record time of the current transaction. This gives us the smallest possible interval between two transactions.
val minimumRecordTime = commitContext.minimumRecordTime
.getOrElse(
throw Err.InternalError("Minimum record time is not set for pre-execution")
)
.toInstant
val maybeDeduplicatedUntil = maybeDedupValue.flatMap(commandDeduplication =>
commandDeduplication.getTimeCase match {
// Backward-compatibility, will not be set for new entries
@ -152,7 +142,6 @@ private[transaction] object CommandDeduplication {
val rejectionDeduplicationDuration =
Seq(maxDurationBetweenRecords, commandDeduplicationDuration).max
duplicateRejection(
commitContext,
transactionEntry.submitterInfo.toBuilder
.setDeduplicationDuration(
buildDuration(rejectionDeduplicationDuration)
@ -161,12 +150,11 @@ private[transaction] object CommandDeduplication {
maybeDedupValue,
)
case None =>
duplicateRejection(commitContext, transactionEntry.submitterInfo, maybeDedupValue)
duplicateRejection(transactionEntry.submitterInfo, maybeDedupValue)
}
}
private def duplicateRejection(
commitContext: CommitContext,
submitterInfo: DamlSubmitterInfo,
dedupValue: Option[DamlCommandDedupValue],
)(implicit loggingContext: LoggingContext) = {
@ -181,7 +169,6 @@ private[transaction] object CommandDeduplication {
.setSubmissionId(dedupValue.map(_.getSubmissionId).getOrElse(""))
),
"the command is a duplicate",
commitContext.recordTime,
)
}
}
@ -204,28 +191,20 @@ private[transaction] object CommandDeduplication {
config.maxDeduplicationDuration
.plus(config.timeModel.maxSkew)
.plus(config.timeModel.minSkew)
commitContext.recordTime match {
case Some(recordTime) =>
val prunableFrom = recordTime.add(pruningInterval)
commandDedupBuilder
.setRecordTime(Conversions.buildTimestamp(recordTime))
.setPrunableFrom(Conversions.buildTimestamp(prunableFrom))
case None =>
val maxRecordTime = commitContext.maximumRecordTime.getOrElse(
throw Err.InternalError("Maximum record time is not set for pre-execution")
)
val minRecordTime = commitContext.minimumRecordTime.getOrElse(
throw Err.InternalError("Minimum record time is not set for pre-execution")
)
val prunableFrom = maxRecordTime.add(pruningInterval)
commandDedupBuilder
.setRecordTimeBounds(
PreExecutionDeduplicationBounds.newBuilder
.setMaxRecordTime(Conversions.buildTimestamp(maxRecordTime))
.setMinRecordTime(Conversions.buildTimestamp(minRecordTime))
)
.setPrunableFrom(Conversions.buildTimestamp(prunableFrom))
}
val maxRecordTime = commitContext.maximumRecordTime.getOrElse(
throw Err.InternalError("Maximum record time is not set for pre-execution")
)
val minRecordTime = commitContext.minimumRecordTime.getOrElse(
throw Err.InternalError("Minimum record time is not set for pre-execution")
)
val prunableFrom = maxRecordTime.add(pruningInterval)
commandDedupBuilder
.setRecordTimeBounds(
PreExecutionDeduplicationBounds.newBuilder
.setMaxRecordTime(Conversions.buildTimestamp(maxRecordTime))
.setMinRecordTime(Conversions.buildTimestamp(minRecordTime))
)
.setPrunableFrom(Conversions.buildTimestamp(prunableFrom))
// Set a deduplication entry.
commitContext.set(

View File

@ -5,8 +5,8 @@ package com.daml.ledger.participant.state.kvutils.committer.transaction
import com.codahale.metrics.Counter
import com.daml.ledger.participant.state.kvutils.Conversions
import com.daml.ledger.participant.state.kvutils.committer.Committer.buildLogEntryWithOptionalRecordTime
import com.daml.ledger.participant.state.kvutils.committer.{StepResult, StepStop}
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlSubmitterInfo,
DamlTransactionRejectionEntry,
@ -22,26 +22,20 @@ private[transaction] class Rejections(metrics: Metrics) {
def reject[A](
transactionEntry: DamlTransactionEntrySummary,
rejection: Rejection,
recordTime: Option[Timestamp],
)(implicit loggingContext: LoggingContext): StepResult[A] =
reject(
Conversions.encodeTransactionRejectionEntry(transactionEntry.submitterInfo, rejection),
rejection.description,
recordTime,
)
def reject[A](
rejectionEntry: DamlTransactionRejectionEntry.Builder,
rejectionDescription: String,
recordTime: Option[Timestamp],
)(implicit loggingContext: LoggingContext): StepResult[A] = {
Metrics.rejections(rejectionEntry.getReasonCase.getNumber).inc()
logger.trace(s"Transaction rejected, $rejectionDescription.")
StepStop(
buildLogEntryWithOptionalRecordTime(
recordTime,
_.setTransactionRejectionEntry(rejectionEntry),
)
DamlLogEntry.newBuilder().setTransactionRejectionEntry(rejectionEntry).build()
)
}

View File

@ -18,20 +18,18 @@ object TimeBoundBindingStep {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel
if (commitContext.preExecute) {
val minimumRecordTime = transactionMinRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
val maximumRecordTime = transactionMaxRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
commitContext.minimumRecordTime = Some(minimumRecordTime)
commitContext.maximumRecordTime = Some(maximumRecordTime)
}
val minimumRecordTime = transactionMinRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
val maximumRecordTime = transactionMaxRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
commitContext.minimumRecordTime = Some(minimumRecordTime)
commitContext.maximumRecordTime = Some(maximumRecordTime)
StepContinue(transactionEntry)
}
}

View File

@ -8,7 +8,6 @@ package com.daml.ledger.participant.state.kvutils.committer.transaction
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.kvutils.committer._
import com.daml.ledger.participant.state.kvutils.committer.transaction.validation.{
CommitterModelConformanceValidator,
@ -65,7 +64,7 @@ private[kvutils] class TransactionCommitter(
DamlTransactionEntrySummary(submission.getTransactionEntry)
private val rejections = new Rejections(metrics)
private val ledgerTimeValidator = new LedgerTimeValidator(defaultConfig)
private val ledgerTimeValidator = new LedgerTimeValidator
private val committerModelConformanceValidator =
new CommitterModelConformanceValidator(engine, metrics)
@ -129,7 +128,6 @@ private[kvutils] class TransactionCommitter(
rejections.reject(
transactionEntry,
reason,
commitContext.recordTime,
)
authorizeAll(transactionEntry.submitters)
@ -213,7 +211,6 @@ private[kvutils] class TransactionCommitter(
rejections.reject(
transactionEntry,
Rejection.PartiesNotKnownOnLedger(missingParties),
commitContext.recordTime,
)
}
}
@ -326,20 +323,18 @@ private[kvutils] object TransactionCommitter {
transactionEntry: DamlTransactionEntrySummary,
commitContext: CommitContext,
): DamlLogEntry = {
if (commitContext.preExecute) {
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
DamlTransactionRejectionEntry.newBuilder
.setDefiniteAnswer(false)
.setSubmitterInfo(transactionEntry.submitterInfo)
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
}
buildLogEntryWithOptionalRecordTime(
commitContext.recordTime,
_.setTransactionEntry(transactionEntry.submission),
)
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
DamlTransactionRejectionEntry.newBuilder
.setDefiniteAnswer(false)
.setSubmitterInfo(transactionEntry.submitterInfo)
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
DamlLogEntry
.newBuilder()
.setTransactionEntry(transactionEntry.submission)
.build()
}
// Helper to read the _current_ contract state.

View File

@ -18,7 +18,6 @@ import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepC
import com.daml.ledger.participant.state.kvutils.store.{DamlContractState, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Engine, Result}
import com.daml.lf.kv.archives.{ArchiveConversions, RawArchive}
import com.daml.lf.kv.contracts.{ContractConversions, RawContractInstance}
@ -95,7 +94,7 @@ private[transaction] class CommitterModelConformanceValidator(engine: Engine, me
try {
val stepResult = for {
contractKeyInputs <- transactionEntry.transaction.contractKeyInputs.left
.map(rejectionForKeyInputError(transactionEntry, commitContext.recordTime, rejections))
.map(rejectionForKeyInputError(transactionEntry, rejections))
_ <- validationResult
.consume(
lookupContract(commitContext),
@ -107,7 +106,6 @@ private[transaction] class CommitterModelConformanceValidator(engine: Engine, me
rejections.reject(
transactionEntry,
Rejection.ValidationFailure(error),
commitContext.recordTime,
)
)
} yield ()
@ -122,7 +120,6 @@ private[transaction] class CommitterModelConformanceValidator(engine: Engine, me
rejections.reject(
transactionEntry,
Rejection.MissingInputState(key),
commitContext.recordTime,
)
// Archive decoding error or other bug
@ -134,7 +131,6 @@ private[transaction] class CommitterModelConformanceValidator(engine: Engine, me
rejections.reject(
transactionEntry,
Rejection.InvalidParticipantState(err),
commitContext.recordTime,
)
}
}
@ -238,7 +234,6 @@ private[transaction] class CommitterModelConformanceValidator(engine: Engine, me
rejections.reject(
transactionEntry,
Rejection.CausalMonotonicityViolated,
commitContext.recordTime,
)
}
}
@ -247,7 +242,6 @@ private[transaction] object CommitterModelConformanceValidator {
private def rejectionForKeyInputError(
transactionEntry: DamlTransactionEntrySummary,
recordTime: Option[Timestamp],
rejections: Rejections,
)(
error: KeyInputError
@ -258,6 +252,6 @@ private[transaction] object CommitterModelConformanceValidator {
case InconsistentKeys(_) =>
Rejection.InternallyInconsistentTransaction.InconsistentKeys
}
rejections.reject(transactionEntry, rejection, recordTime)
rejections.reject(transactionEntry, rejection)
}
}

View File

@ -3,12 +3,9 @@
package com.daml.ledger.participant.state.kvutils.committer.transaction.validation
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Err
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
Rejection,
Rejections,
Step,
}
@ -16,8 +13,7 @@ import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepC
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry
import com.daml.logging.LoggingContext
private[transaction] class LedgerTimeValidator(defaultConfig: Configuration)
extends TransactionValidator {
private[transaction] class LedgerTimeValidator extends TransactionValidator {
/** Creates a committer step that validates ledger effective time and the command's time-to-live. */
override def createValidationStep(rejections: Rejections): Step =
@ -27,42 +23,23 @@ private[transaction] class LedgerTimeValidator(defaultConfig: Configuration)
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime match {
case Some(recordTime) =>
val givenLedgerTime = transactionEntry.ledgerEffectiveTime
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
val timeModel = config.timeModel
timeModel
.checkTime(ledgerTime = givenLedgerTime, recordTime = recordTime)
.fold(
outOfRange =>
rejections.reject(
transactionEntry,
Rejection.LedgerTimeOutOfRange(outOfRange),
commitContext.recordTime,
),
_ => StepContinue(transactionEntry),
)
case None => // Pre-execution: propagate the time bounds and defer the checks to post-execution.
(commitContext.minimumRecordTime, commitContext.maximumRecordTime) match {
case (Some(minimumRecordTime), Some(maximumRecordTime)) =>
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
rejections.preExecutionOutOfTimeBoundsRejectionEntry(
transactionEntry.submitterInfo,
minimumRecordTime,
maximumRecordTime,
)
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
StepContinue(transactionEntry)
case _ =>
throw Err.InternalError(
"Minimum and maximum record times were not set in the committer context"
(commitContext.minimumRecordTime, commitContext.maximumRecordTime) match {
case (Some(minimumRecordTime), Some(maximumRecordTime)) =>
val outOfTimeBoundsLogEntry = DamlLogEntry.newBuilder
.setTransactionRejectionEntry(
rejections.preExecutionOutOfTimeBoundsRejectionEntry(
transactionEntry.submitterInfo,
minimumRecordTime,
maximumRecordTime,
)
}
)
.build
commitContext.outOfTimeBoundsLogEntry = Some(outOfTimeBoundsLogEntry)
StepContinue(transactionEntry)
case _ =>
throw Err.InternalError(
"Minimum and maximum record times were not set in the committer context"
)
}
}
}

View File

@ -109,7 +109,6 @@ private[transaction] object TransactionConsistencyValidator extends TransactionV
rejections.reject(
transactionEntry,
rejection,
commitContext.recordTime,
)
}
}
@ -135,7 +134,6 @@ private[transaction] object TransactionConsistencyValidator extends TransactionV
rejections.reject(
transactionEntry,
Rejection.ExternallyInconsistentTransaction.InconsistentContracts,
commitContext.recordTime,
)
}

View File

@ -110,10 +110,11 @@ object KVTest {
simplePackage: SimplePackage
)(implicit loggingContext: LoggingContext): KVTest[Unit] =
for {
archiveLogEntry <- submitArchives(
result <- preExecuteArchives(
"simple-archive-submission",
simplePackage.archives.values.toSeq: _*
).map(_._2)
archiveLogEntry = result.successfulLogEntry
_ = assert(archiveLogEntry.getPayloadCase == DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY)
_ <- modify[KVTestState](state =>
state.copy(uploadedPackages = state.uploadedPackages ++ simplePackage.packages)
@ -166,23 +167,15 @@ object KVTest {
def getDamlState(key: DamlStateKey): KVTest[Option[DamlStateValue]] =
gets(s => s.damlState.get(key))
def submitArchives(
submissionId: String,
archives: DamlLf.Archive*
)(implicit loggingContext: LoggingContext): KVTest[(DamlLogEntryId, DamlLogEntry)] =
get.flatMap { testState =>
submit(
createArchiveSubmission(submissionId, testState, archives: _*)
)
}
def preExecuteArchives(
submissionId: String,
archives: DamlLf.Archive*
)(implicit loggingContext: LoggingContext): KVTest[(DamlLogEntryId, PreExecutionResult)] =
get.flatMap { testState =>
preExecute(
createArchiveSubmission(submissionId, testState, archives: _*)
damlSubmission = createArchiveSubmission(submissionId, testState, archives: _*),
validateOutOfTimeBoundsWriteSet =
false, // no record time bounds are set, therefore the out of time bounds write set is meaningless
)
}
@ -229,23 +222,6 @@ object KVTest {
)(implicit loggingContext: LoggingContext): KVTest[(SubmittedTransaction, Transaction.Metadata)] =
runCommand(submitter, submissionSeed, command)
def submitTransaction(
submitter: Ref.Party,
transaction: (SubmittedTransaction, Transaction.Metadata),
submissionSeed: crypto.Hash,
letDelta: Duration = Duration.ZERO,
commandId: Ref.CommandId = randomLedgerString,
deduplicationDuration: Duration = Duration.ofDays(1),
)(implicit loggingContext: LoggingContext): KVTest[(DamlLogEntryId, DamlLogEntry)] =
prepareTransactionSubmission(
submitter,
transaction,
submissionSeed,
letDelta,
commandId,
deduplicationDuration,
).flatMap(submit)
def preExecuteTransaction(
submitter: Ref.Party,
transaction: (SubmittedTransaction, Transaction.Metadata),
@ -261,7 +237,7 @@ object KVTest {
letDelta,
commandId,
deduplicationDuration,
).flatMap(preExecute)
).flatMap(preExecute(_, validateOutOfTimeBoundsWriteSet = true))
def prepareTransactionSubmission(
submitter: Ref.Party,
@ -292,25 +268,6 @@ object KVTest {
)
}
def submitConfig(
configModify: Configuration => Configuration,
submissionId: Ref.SubmissionId = randomLedgerString,
minMaxRecordTimeDelta: Duration = MinMaxRecordTimeDelta,
)(implicit loggingContext: LoggingContext): KVTest[DamlLogEntry] =
for {
testState <- get[KVTestState]
oldConf <- getConfiguration
result <- submit(
createConfigurationSubmission(
configModify,
submissionId,
minMaxRecordTimeDelta,
testState,
oldConf,
)
)
} yield result._2
def preExecuteConfig(
configModify: Configuration => Configuration,
submissionId: Ref.SubmissionId = randomLedgerString,
@ -320,25 +277,17 @@ object KVTest {
testState <- get[KVTestState]
oldConf <- getConfiguration
result <- preExecute(
createConfigurationSubmission(
damlSubmission = createConfigurationSubmission(
configModify,
submissionId,
minMaxRecordTimeDelta,
testState,
oldConf,
)
),
validateOutOfTimeBoundsWriteSet = true,
)
} yield result._2
def submitPartyAllocation(
subId: String,
hint: String,
participantId: Ref.ParticipantId,
)(implicit loggingContext: LoggingContext): KVTest[DamlLogEntry] =
get[KVTestState]
.flatMap(testState => submit(createPartySubmission(subId, hint, participantId, testState)))
.map(_._2)
def preExecutePartyAllocation(
subId: String,
hint: String,
@ -346,7 +295,11 @@ object KVTest {
)(implicit loggingContext: LoggingContext): KVTest[PreExecutionResult] =
get[KVTestState]
.flatMap(testState =>
preExecute(createPartySubmission(subId, hint, participantId, testState))
preExecute(
damlSubmission = createPartySubmission(subId, hint, participantId, testState),
validateOutOfTimeBoundsWriteSet =
false, // no record time bounds are set, therefore the out of time bounds write set is meaningless
)
)
.map(_._2)
@ -356,45 +309,16 @@ object KVTest {
)(implicit loggingContext: LoggingContext): KVTest[Ref.Party] =
for {
testState <- get[KVTestState]
result <- submitPartyAllocation(subId, hint, testState.participantId).map { logEntry =>
result <- preExecutePartyAllocation(subId, hint, testState.participantId).map { result =>
val logEntry = result.successfulLogEntry
assert(logEntry.getPayloadCase == DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY)
Ref.Party.assertFromString(logEntry.getPartyAllocationEntry.getParty)
}
} yield result
private def submit(
submission: DamlSubmission
)(implicit loggingContext: LoggingContext): KVTest[(DamlLogEntryId, DamlLogEntry)] =
for {
testState <- get[KVTestState]
entryId <- freshEntryId
(logEntry, newState) = testState.keyValueCommitting.processSubmission(
entryId = entryId,
recordTime = testState.recordTime,
defaultConfig = testState.defaultConfig,
submission = submission,
participantId = testState.participantId,
inputState = submission.getInputDamlStateList.asScala.map { key =>
key -> testState.damlState.get(key)
}.toMap,
)
_ <- addDamlState(newState)
} yield {
// Verify that all state touched matches with "submissionOutputs".
assert(
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
)(loggingContext)
entryId -> logEntry
}
def preExecute(
damlSubmission: DamlSubmission
damlSubmission: DamlSubmission,
validateOutOfTimeBoundsWriteSet: Boolean = true,
)(implicit loggingContext: LoggingContext): KVTest[(DamlLogEntryId, PreExecutionResult)] =
for {
testState <- get[KVTestState]
@ -420,11 +344,12 @@ object KVTest {
successfulLogEntry,
recordTimeFromTimeUpdateLogEntry,
)(loggingContext)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
recordTimeFromTimeUpdateLogEntry,
)(loggingContext)
if (validateOutOfTimeBoundsWriteSet)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
recordTimeFromTimeUpdateLogEntry,
)(loggingContext)
entryId -> preExecutionResult
}

View File

@ -54,11 +54,10 @@ object TestHelpers {
Ref.LedgerString.assertFromString(UUID.randomUUID().toString)
def createCommitContext(
recordTime: Option[Timestamp],
inputs: DamlStateMap = Map.empty,
participantId: Int = 0,
): CommitContext =
CommitContext(inputs, recordTime, mkParticipantId(participantId))
CommitContext(inputs, mkParticipantId(participantId))
def createEmptyTransactionEntry(submitters: List[String]): DamlTransactionEntry =
createTransactionEntry(submitters, TransactionBuilder.EmptySubmitted)

View File

@ -54,22 +54,24 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
"check generation" in KVTest.runTest {
for {
logEntry <- submitConfig(
result1 <- preExecuteConfig(
configModify = c => c.copy(generation = c.generation + 1),
submissionId = Ref.LedgerString.assertFromString("submission0"),
)
newConfig <- getConfiguration
// Change again, but without bumping generation.
logEntry2 <- submitConfig(
result2 <- preExecuteConfig(
configModify = c => c.copy(generation = c.generation),
submissionId = Ref.LedgerString.assertFromString("submission1"),
)
newConfig2 <- getConfiguration
} yield {
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
logEntry.getConfigurationEntry.getSubmissionId shouldEqual "submission0"
val logEntry1 = result1.successfulLogEntry
val logEntry2 = result2.successfulLogEntry
logEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
logEntry1.getConfigurationEntry.getSubmissionId shouldEqual "submission0"
newConfig.generation shouldEqual 1
logEntry2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY
@ -80,7 +82,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
"reject expired submissions" in KVTest.runTest {
for {
logEntry <- submitConfig(
result <- preExecuteConfig(
minMaxRecordTimeDelta = Duration.ofMinutes(-1),
configModify = { c =>
c.copy(generation = c.generation + 1)
@ -88,8 +90,9 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
submissionId = Ref.LedgerString.assertFromString("some-submission-id"),
)
} yield {
val logEntry = result.outOfTimeBoundsLogEntry.getOutOfTimeBoundsEntry.getEntry
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY
logEntry.getConfigurationRejectionEntry.getReasonCase shouldEqual DamlConfigurationRejectionEntry.ReasonCase.TIMED_OUT
logEntry.getConfigurationRejectionEntry.getReasonCase shouldEqual DamlConfigurationRejectionEntry.ReasonCase.REASON_NOT_SET
}
}
@ -99,7 +102,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
for {
// Set a configuration with an authorized participant id
logEntry0 <- submitConfig(
result0 <- preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -112,8 +115,8 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
// A well authorized submission
//
logEntry1 <- withParticipantId(p0) {
submitConfig(
result1 <- withParticipantId(p0) {
preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -127,8 +130,8 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
// A badly authorized submission
//
logEntry2 <- withParticipantId(p1) {
submitConfig(
result2 <- withParticipantId(p1) {
preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -139,6 +142,9 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
}
} yield {
val logEntry0 = result0.successfulLogEntry
val logEntry1 = result1.successfulLogEntry
val logEntry2 = result2.successfulLogEntry
logEntry0.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
logEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
@ -152,7 +158,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
"reject duplicate" in KVTest.runTest {
for {
logEntry0 <- submitConfig(
result0 <- preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -161,7 +167,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
submissionId = Ref.LedgerString.assertFromString("submission-id-1"),
)
logEntry1 <- submitConfig(
result1 <- preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -171,6 +177,8 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
)
} yield {
val logEntry0 = result0.successfulLogEntry
val logEntry1 = result1.successfulLogEntry
logEntry0.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
logEntry1.getPayloadCase shouldEqual
DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY
@ -183,7 +191,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
"update metrics" in KVTest.runTest {
for {
//Submit config twice to force one acceptance and one rejection on duplicate
_ <- submitConfig(
_ <- preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -192,7 +200,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
submissionId = Ref.LedgerString.assertFromString("submission-id-1"),
)
_ <- submitConfig(
_ <- preExecuteConfig(
{ c =>
c.copy(
generation = c.generation + 1
@ -204,7 +212,7 @@ class KVUtilsConfigSpec extends AnyWordSpec with Matchers {
// Check that we're updating the metrics (assuming this test at least has been run)
metrics.daml.kvutils.committer.config.accepts.getCount should be >= 1L
metrics.daml.kvutils.committer.config.rejections.getCount should be >= 1L
metrics.daml.kvutils.committer.runTimer("config").getCount should be >= 1L
metrics.daml.kvutils.committer.preExecutionRunTimer("config").getCount should be >= 1L
}
}
}

View File

@ -29,16 +29,18 @@ class KVUtilsPackageSpec extends AnyWordSpec with Matchers with BazelRunfiles {
"be able to submit simple package" in KVTest.runTest {
for {
// NOTE(JM): 'runTest' always uploads 'simpleArchive' by default.
logEntry <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
result <- preExecuteArchives("simple-archive-submission-1", simpleArchive).map(_._2)
archiveState <- getDamlState(
Conversions.packageStateKey(simplePackage.mainPackageId)
)
// Submit again and verify that the uploaded archive didn't appear again.
logEntry2 <- submitArchives("simple-archive-submission-2", simpleArchive)
result2 <- preExecuteArchives("simple-archive-submission-2", simpleArchive)
.map(_._2)
} yield {
val logEntry = result.successfulLogEntry
val logEntry2 = result2.successfulLogEntry
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY
logEntry.getPackageUploadEntry.getArchivesCount shouldEqual 1
@ -53,40 +55,30 @@ class KVUtilsPackageSpec extends AnyWordSpec with Matchers with BazelRunfiles {
"be able to submit model-test.dar" in KVTest.runTest {
for {
logEntry <- submitArchives("model-test-submission", testStablePackages.all: _*).map(_._2)
result <- preExecuteArchives("model-test-submission", testStablePackages.all: _*).map(_._2)
} yield {
val logEntry = result.successfulLogEntry
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY
logEntry.getPackageUploadEntry.getArchivesCount shouldEqual testStablePackages.all.length
}
}
"be able to pre-execute model-test.dar" in KVTest.runTest {
for {
preExecutionResult <- preExecuteArchives(
"model-test-submission",
testStablePackages.all: _*
)
.map(_._2)
actualLogEntry = preExecutionResult.successfulLogEntry
} yield {
actualLogEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY
actualLogEntry.getPackageUploadEntry.getArchivesCount shouldEqual testStablePackages.all.length
}
}
"reject invalid packages" in KVTest.runTest {
for {
logEntry <- submitArchives("bad-archive-submission", badArchive).map(_._2)
result <- preExecuteArchives("bad-archive-submission", badArchive).map(_._2)
} yield {
val logEntry = result.successfulLogEntry
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_REJECTION_ENTRY
}
}
"reject duplicate" in KVTest.runTest {
for {
logEntry0 <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
logEntry1 <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
result0 <- preExecuteArchives("simple-archive-submission-1", simpleArchive).map(_._2)
result1 <- preExecuteArchives("simple-archive-submission-1", simpleArchive).map(_._2)
} yield {
val logEntry0 = result0.successfulLogEntry
val logEntry1 = result1.successfulLogEntry
logEntry0.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY
logEntry1.getPayloadCase shouldEqual
DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_REJECTION_ENTRY
@ -98,13 +90,15 @@ class KVUtilsPackageSpec extends AnyWordSpec with Matchers with BazelRunfiles {
"update metrics" in KVTest.runTest {
for {
//Submit archive twice to force one acceptance and one rejection on duplicate
_ <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
_ <- submitArchives("simple-archive-submission-1", simpleArchive).map(_._2)
_ <- preExecuteArchives("simple-archive-submission-1", simpleArchive).map(_._2)
_ <- preExecuteArchives("simple-archive-submission-1", simpleArchive).map(_._2)
} yield {
// Check that we're updating the metrics (assuming this test at least has been run)
metrics.daml.kvutils.committer.packageUpload.accepts.getCount should be >= 1L
metrics.daml.kvutils.committer.packageUpload.rejections.getCount should be >= 1L
metrics.daml.kvutils.committer.runTimer("package_upload").getCount should be >= 1L
metrics.daml.kvutils.committer
.preExecutionRunTimer("package_upload")
.getCount should be >= 1L
}
}
}

View File

@ -10,10 +10,6 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class KVUtilsPartySpec extends AnyWordSpec with Matchers {
// TESTS:
// - party allocation rejected if participant id does not match
// - party allocation rejected with bad party string
// - party allocation succeeds
import KVTest._
import TestHelpers._
@ -24,14 +20,6 @@ class KVUtilsPartySpec extends AnyWordSpec with Matchers {
val p0 = mkParticipantId(0)
val p1 = mkParticipantId(1)
"be able to submit a party allocation" in KVTest.runTest {
withParticipantId(p0) {
submitPartyAllocation("ok", "alice", p0).map { logEntry =>
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY
}
}
}
"be able to pre-execute a party allocation" in KVTest.runTest {
withParticipantId(p0) {
preExecutePartyAllocation("ok", "alice", p0).map { preExecutionResult =>
@ -42,17 +30,19 @@ class KVUtilsPartySpec extends AnyWordSpec with Matchers {
"reject when participant id does not match" in KVTest.runTest {
withParticipantId(p0) {
submitPartyAllocation("mismatch", "alice", p1)
preExecutePartyAllocation("mismatch", "alice", p1)
}.map { logEntry =>
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
logEntry.successfulLogEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
}
}
"reject on bad party string" in KVTest.runTest {
for {
logEntry1 <- submitPartyAllocation("empty party", "", p0)
logEntry2 <- submitPartyAllocation("bad party", "%", p0)
result1 <- preExecutePartyAllocation("empty party", "", p0)
result2 <- preExecutePartyAllocation("bad party", "%", p0)
} yield {
val logEntry1 = result1.successfulLogEntry
val logEntry2 = result2.successfulLogEntry
logEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
logEntry1.getPartyAllocationRejectionEntry.getReasonCase shouldEqual DamlPartyAllocationRejectionEntry.ReasonCase.INVALID_NAME
logEntry2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
@ -63,9 +53,11 @@ class KVUtilsPartySpec extends AnyWordSpec with Matchers {
"reject on duplicate party" in KVTest.runTest {
for {
logEntry1 <- submitPartyAllocation("alice", "alice", p0)
logEntry2 <- submitPartyAllocation("alice again", "alice", p0)
result1 <- preExecutePartyAllocation("alice", "alice", p0)
result2 <- preExecutePartyAllocation("alice again", "alice", p0)
} yield {
val logEntry1 = result1.successfulLogEntry
val logEntry2 = result2.successfulLogEntry
logEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY
logEntry2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
logEntry2.getPartyAllocationRejectionEntry.getReasonCase shouldEqual DamlPartyAllocationRejectionEntry.ReasonCase.ALREADY_EXISTS
@ -74,9 +66,11 @@ class KVUtilsPartySpec extends AnyWordSpec with Matchers {
"reject duplicate submission" in KVTest.runTest {
for {
logEntry0 <- submitPartyAllocation("submission-1", "alice", p0)
logEntry1 <- submitPartyAllocation("submission-1", "bob", p0)
result0 <- preExecutePartyAllocation("submission-1", "alice", p0)
result1 <- preExecutePartyAllocation("submission-1", "bob", p0)
} yield {
val logEntry0 = result0.successfulLogEntry
val logEntry1 = result1.successfulLogEntry
logEntry0.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY
logEntry1.getPayloadCase shouldEqual
DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY
@ -88,13 +82,15 @@ class KVUtilsPartySpec extends AnyWordSpec with Matchers {
"update metrics" in KVTest.runTest {
for {
//Submit party twice to force one acceptance and one rejection on duplicate
_ <- submitPartyAllocation("submission-1", "alice", p0)
_ <- submitPartyAllocation("submission-1", "bob", p0)
_ <- preExecutePartyAllocation("submission-1", "alice", p0)
_ <- preExecutePartyAllocation("submission-1", "bob", p0)
} yield {
// Check that we're updating the metrics (assuming this test at least has been run)
metrics.daml.kvutils.committer.partyAllocation.accepts.getCount should be >= 1L
metrics.daml.kvutils.committer.partyAllocation.rejections.getCount should be >= 1L
metrics.daml.kvutils.committer.runTimer("party_allocation").getCount should be >= 1L
metrics.daml.kvutils.committer
.preExecutionRunTimer("party_allocation")
.getCount should be >= 1L
}
}
}

View File

@ -4,6 +4,7 @@
package com.daml.ledger.participant.state.kvutils
import java.time.Duration
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue}
@ -27,6 +28,7 @@ import com.daml.lf.value.Value.{
ValueVariant,
}
import com.daml.logging.LoggingContext
import com.google.protobuf.Timestamp
import org.scalatest.Inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@ -86,7 +88,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
val seed1 = seed(1)
for {
transaction1 <- runSimpleCommand(alice, seed0, simpleCreateCmd(simplePackage))
result <- submitTransaction(
result <- preExecuteTransaction(
submitter = alice,
transaction = transaction1,
submissionSeed = seed0,
@ -95,7 +97,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
logEntryWithRecordTime(logEntry.successfulLogEntry),
)(loggingContext)
)
@ -114,22 +116,6 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
}
}
"be able to submit a transaction" in KVTest.runTestWithSimplePackage(alice, bob, eve) {
simplePackage =>
val seed = hash(this.getClass.getName)
for {
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
logEntry <- submitTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
).map(_._2)
} yield {
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
logEntry.getTransactionEntry.hasBlindingInfo shouldBe true
}
}
"be able to pre-execute a transaction" in KVTest.runTestWithSimplePackage(alice, bob, eve) {
simplePackage =>
val seed = hash(this.getClass.getName)
@ -264,7 +250,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
}
}
"reject transaction with out of bounds LET" in KVTest.runTestWithSimplePackage(
"populate out of time bounds write set" in KVTest.runTestWithSimplePackage(
alice,
bob,
eve,
@ -273,7 +259,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
for {
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
conf <- getDefaultConfiguration
logEntry <- submitTransaction(
result <- preExecuteTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
@ -281,8 +267,13 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
.map(_._2)
} yield {
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
logEntry.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.INVALID_LEDGER_TIME
val logEntry = result.outOfTimeBoundsLogEntry
logEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.OUT_OF_TIME_BOUNDS_ENTRY
val outOfTimeBoundsEntry = logEntry.getOutOfTimeBoundsEntry
outOfTimeBoundsEntry.hasEntry shouldBe true
outOfTimeBoundsEntry.getEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
// FIXME KVL-1413
outOfTimeBoundsEntry.getEntry.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.REASON_NOT_SET
}
}
@ -297,7 +288,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
.map(i => seed(i))
for {
transaction1 <- runSimpleCommand(alice, seeds.head, simpleCreateCmd(simplePackage))
result <- submitTransaction(
result <- preExecuteTransaction(
submitter = alice,
transaction = transaction1,
submissionSeed = seeds.head,
@ -306,7 +297,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
logEntryWithRecordTime(logEntry.successfulLogEntry),
)(loggingContext)
)
@ -315,20 +306,22 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
seeds(1),
simplePackage.simpleExerciseArchiveCmd(contractId),
)
logEntry2 <- submitTransaction(
result2 <- preExecuteTransaction(
submitter = alice,
transaction = transaction2,
submissionSeed = seeds(1),
).map(_._2)
// Try to double consume.
logEntry3 <- submitTransaction(
result3 <- preExecuteTransaction(
submitter = alice,
transaction = transaction2,
submissionSeed = seeds(1),
).map(_._2)
} yield {
val logEntry2 = result2.successfulLogEntry
val logEntry3 = result3.successfulLogEntry
logEntry2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
logEntry3.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
logEntry3.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.EXTERNALLY_INCONSISTENT_KEYS
@ -339,16 +332,18 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
simplePackage =>
val seed = hash(this.getClass.getName)
for {
configEntry <- submitConfig { c =>
configResult <- preExecuteConfig { c =>
c.copy(generation = c.generation + 1)
}
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
txEntry <- submitTransaction(
txResult <- preExecuteTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
).map(_._2)
} yield {
val configEntry = configResult.successfulLogEntry
val txEntry = txResult.successfulLogEntry
configEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
txEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
txEntry.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.SUBMITTING_PARTY_NOT_KNOWN_ON_LEDGER
@ -365,13 +360,13 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
val seed = hash(this.getClass.getName)
for {
transaction <- runCommand(alice, seed, command)
txEntry <- submitTransaction(
txEntry <- preExecuteTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
).map(_._2)
} yield {
txEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
txEntry.successfulLogEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
}
}
}
@ -382,13 +377,14 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
val seed = hash(this.getClass.getName)
for {
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
txEntry1 <- submitTransaction(
txResult <- preExecuteTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
)
.map(_._2)
} yield {
val txEntry1 = txResult.successfulLogEntry
txEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
txEntry1.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.PARTIES_NOT_KNOWN_ON_LEDGER
}
@ -399,7 +395,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
simplePackage =>
val seed = hash(this.getClass.getName)
for {
configEntry <- submitConfig { c =>
configResult <- preExecuteConfig { c =>
c.copy(generation = c.generation + 1)
}
createTx <- withParticipantId(p1)(
@ -407,15 +403,22 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
newParty <- withParticipantId(p1)(allocateParty("unhosted", alice))
txEntry1 <- withParticipantId(p0)(
submitTransaction(submitter = newParty, createTx, seed).map(_._2)
txResult1 <- withParticipantId(p0)(
preExecuteTransaction(submitter = newParty, createTx, seed).map(_._2)
)
txEntry2 <- withParticipantId(p1)(
submitTransaction(submitter = newParty, transaction = createTx, submissionSeed = seed)
txResult2 <- withParticipantId(p1)(
preExecuteTransaction(
submitter = newParty,
transaction = createTx,
submissionSeed = seed,
)
.map(_._2)
)
} yield {
val configEntry = configResult.successfulLogEntry
val txEntry1 = txResult1.successfulLogEntry
val txEntry2 = txResult2.successfulLogEntry
configEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY
txEntry1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
txEntry1.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.SUBMITTER_CANNOT_ACT_VIA_PARTICIPANT
@ -430,13 +433,14 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
// Submit a creation of a contract with owner 'Alice', but submit it as 'Bob'.
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
bob <- allocateParty("bob", bob)
txEntry <- submitTransaction(
txResult <- preExecuteTransaction(
submitter = bob,
transaction = transaction,
submissionSeed = seed,
)
.map(_._2)
} yield {
val txEntry = txResult.successfulLogEntry
txEntry.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY
txEntry.getTransactionRejectionEntry.getReasonCase shouldEqual DamlTransactionRejectionEntry.ReasonCase.VALIDATION_FAILURE
}
@ -448,16 +452,19 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
// Submit a creation of a contract with owner 'Alice', but submit it as 'Bob'.
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
bob <- allocateParty("bob", bob)
_ <- submitTransaction(submitter = bob, transaction = transaction, submissionSeed = seed)
_ <- preExecuteTransaction(
submitter = bob,
transaction = transaction,
submissionSeed = seed,
)
.map(_._2)
} yield {
val disputed = DamlTransactionRejectionEntry.ReasonCase.VALIDATION_FAILURE
// Check that we're updating the metrics (assuming this test at least has been run)
metrics.daml.kvutils.committer.transaction.accepts.getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.rejection(disputed.name).getCount should be >= 1L
metrics.daml.kvutils.committer.runTimer("transaction").getCount should be >= 1L
metrics.daml.kvutils.committer.preExecutionRunTimer("transaction").getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.interpretTimer.getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.runTimer.getCount should be >= 1L
}
}
@ -476,13 +483,15 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
for {
tx1 <- runSimpleCommand(alice, seeds.head, simpleCreateAndExerciseCmd)
createAndExerciseTx1 <- submitTransaction(alice, tx1, seeds.head).map(_._2)
createAndExerciseTx1Result <- preExecuteTransaction(alice, tx1, seeds.head).map(_._2)
tx2 <- runSimpleCommand(alice, seeds(1), simpleCreateAndExerciseCmd)
createAndExerciseTx2 <- submitTransaction(alice, tx2, seeds(1)).map(_._2)
createAndExerciseTx2Result <- preExecuteTransaction(alice, tx2, seeds(1)).map(_._2)
finalState <- scalaz.State.get[KVTestState]
} yield {
val createAndExerciseTx1 = createAndExerciseTx1Result.successfulLogEntry
val createAndExerciseTx2 = createAndExerciseTx2Result.successfulLogEntry
createAndExerciseTx1.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
createAndExerciseTx2.getPayloadCase shouldEqual DamlLogEntry.PayloadCase.TRANSACTION_ENTRY
@ -511,22 +520,22 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
val seed = hash(this.getClass.getName)
for {
transaction <- runSimpleCommand(alice, seed, simpleCreateCmd(simplePackage))
result <- submitTransaction(
result <- preExecuteTransaction(
submitter = alice,
transaction = transaction,
submissionSeed = seed,
)
} yield {
val (entryId, entry) = result
val (entryId, output) = result
// Clear the submitter info from the log entry
val strippedEntry = entry.toBuilder
val strippedEntry = output.successfulLogEntry.toBuilder
strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo
// Process into updates and verify
val updates =
KeyValueConsumption.logEntryToUpdate(
entryId,
strippedEntry.build,
logEntryWithRecordTime(strippedEntry.build),
)(loggingContext)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
@ -564,6 +573,13 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
}
}
private def logEntryWithRecordTime(
entry: DamlLogEntry
): DamlLogEntry =
entry.toBuilder
.setRecordTime(Timestamp.getDefaultInstance)
.build()
private def preExecuteCreateSimpleContract(
submitter: Ref.Party,
seed: Hash,

View File

@ -10,7 +10,6 @@ import com.daml.ledger.participant.state.kvutils.store.{
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.{DamlStateMap, Err, TestHelpers}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@ -146,18 +145,6 @@ class CommitContextSpec extends AnyWordSpec with Matchers {
context.getOutputs should have size 0
}
}
"preExecute" should {
"return false in case record time is set" in {
val context = newInstance(recordTime = Some(Time.Timestamp.now()))
context.preExecute shouldBe false
}
"return true in case record time is not set" in {
val context = newInstance(recordTime = None)
context.preExecute shouldBe true
}
}
}
object CommitContextSpec {
@ -174,10 +161,9 @@ object CommitContextSpec {
.build
private def newInstance(
recordTime: Option[Time.Timestamp] = Some(Time.Timestamp.now()),
inputs: DamlStateMap = Map.empty,
inputs: DamlStateMap = Map.empty
) =
CommitContext(inputs, recordTime, TestHelpers.mkParticipantId(1))
CommitContext(inputs, TestHelpers.mkParticipantId(1))
private def newDamlStateMap(keyAndValues: (DamlStateKey, DamlStateValue)*): DamlStateMap =
(for ((key, value) <- keyAndValues)

View File

@ -163,23 +163,6 @@ class CommitterSpec
}
}
"buildLogEntryWithOptionalRecordTime" should {
"set record time in log entry if record time is available" in {
val actualLogEntry =
Committer.buildLogEntryWithOptionalRecordTime(recordTime = Some(aRecordTime), identity)
actualLogEntry.hasRecordTime shouldBe true
actualLogEntry.getRecordTime shouldBe buildTimestamp(aRecordTime)
}
"skip setting record time in log entry when it is not available" in {
val actualLogEntry =
Committer.buildLogEntryWithOptionalRecordTime(recordTime = None, identity)
actualLogEntry.hasRecordTime shouldBe false
}
}
"run" should {
"run init" in {
val initialized = new AtomicBoolean(false)
@ -192,7 +175,11 @@ class CommitterSpec
}
}
committer.run(None, aDamlSubmission, Ref.ParticipantId.assertFromString("test"), Map.empty)
committer.runWithPreExecution(
aDamlSubmission,
Ref.ParticipantId.assertFromString("test"),
Map.empty,
)
initialized.get() shouldBe true
}
@ -229,7 +216,7 @@ class CommitterSpec
"getCurrentConfiguration" should {
"return configuration in case there is one available on the ledger" in {
val inputState = Map(configurationStateKey -> Some(aConfigurationStateValue))
val commitContext = createCommitContext(recordTime = None, inputState)
val commitContext = createCommitContext(inputState)
val (Some(actualConfigurationEntry), actualConfiguration) =
Committer.getCurrentConfiguration(theDefaultConfig, commitContext)
@ -240,7 +227,7 @@ class CommitterSpec
"return default configuration in case there is no configuration on the ledger" in {
val inputState = Map(configurationStateKey -> None)
val commitContext = createCommitContext(recordTime = None, inputState)
val commitContext = createCommitContext(inputState)
val (actualConfigurationEntry, actualConfiguration) =
Committer.getCurrentConfiguration(theDefaultConfig, commitContext)
@ -250,7 +237,7 @@ class CommitterSpec
}
"throw in case configuration key is not declared in the input" in {
val commitContext = createCommitContext(recordTime = None, Map.empty)
val commitContext = createCommitContext(Map.empty)
assertThrows[Err.MissingInputState] {
Committer.getCurrentConfiguration(theDefaultConfig, commitContext)
@ -266,8 +253,7 @@ class CommitterSpec
)
.build
val commitContext = createCommitContext(
recordTime = None,
Map(configurationStateKey -> Some(invalidConfigurationEntry)),
Map(configurationStateKey -> Some(invalidConfigurationEntry))
)
val (actualConfigurationEntry, actualConfiguration) =
@ -280,7 +266,6 @@ class CommitterSpec
}
object CommitterSpec {
private val aRecordTime = Timestamp(100)
private val aDamlSubmission = DamlSubmission.getDefaultInstance
private val aLogEntry = DamlLogEntry.newBuilder
.setPartyAllocationEntry(

View File

@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.committer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.TestHelpers
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.wire.DamlConfigurationSubmission
@ -29,40 +28,10 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
ConfigCommitter.Result(aConfigurationSubmission, (None, theDefaultConfig))
"checkTtl" should {
"produce rejection log entry if maximum record time is before record time" in {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime.addMicros(1)))
val actual = instance.checkTtl(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasConfigurationRejectionEntry shouldBe true
actualLogEntry.getConfigurationRejectionEntry.hasTimedOut shouldBe true
actualLogEntry.getConfigurationRejectionEntry.getTimedOut.getMaximumRecordTime shouldBe buildTimestamp(
aRecordTime
)
}
}
"continue if maximum record time is on or after record time" in {
for (maximumRecordTime <- Iterable(aRecordTime, aRecordTime.addMicros(1))) {
val instance = createConfigCommitter(maximumRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime))
val actual = instance.checkTtl(context, anEmptyResult)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
}
"skip checking against maximum record time if record time is not available" in {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
val actual = instance.checkTtl(context, anEmptyResult)
@ -72,9 +41,9 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
}
}
"set the maximum record time and out-of-time-bounds log entry in the context if record time is not available" in {
"set the maximum record time and out-of-time-bounds log entry in the context" in {
val instance = createConfigCommitter(aRecordTime)
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
instance.checkTtl(context, anEmptyResult)
@ -89,63 +58,25 @@ class ConfigCommitterSpec extends AnyWordSpec with Matchers {
actualConfigurationRejectionEntry.getConfiguration shouldBe aConfigurationSubmission.getConfiguration
}
}
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = Some(aRecordTime))
instance.checkTtl(context, anEmptyResult)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
}
}
"buildLogEntry" should {
"set record time in log entry when it is available" in {
val instance = createConfigCommitter(theRecordTime.addMicros(1000))
val context = createCommitContext(recordTime = Some(theRecordTime))
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasRecordTime shouldBe true
actualLogEntry.getRecordTime shouldBe buildTimestamp(theRecordTime)
}
}
"skip setting record time in log entry when it is not available" in {
"produce a log entry" in {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasRecordTime shouldBe false
}
}
"produce a log entry (pre-execution disabled or enabled)" in {
for (recordTimeMaybe <- Iterable(Some(aRecordTime), None)) {
val instance = createConfigCommitter(theRecordTime)
val context = createCommitContext(recordTime = recordTimeMaybe)
val actual = instance.buildLogEntry(context, anEmptyResult)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasConfigurationEntry shouldBe true
val actualConfigurationEntry = actualLogEntry.getConfigurationEntry
actualConfigurationEntry.getSubmissionId shouldBe aConfigurationSubmission.getSubmissionId
actualConfigurationEntry.getParticipantId shouldBe aConfigurationSubmission.getParticipantId
actualConfigurationEntry.getConfiguration shouldBe aConfigurationSubmission.getConfiguration
context.outOfTimeBoundsLogEntry should not be defined
}
actualLogEntry.hasConfigurationEntry shouldBe true
val actualConfigurationEntry = actualLogEntry.getConfigurationEntry
actualConfigurationEntry.getSubmissionId shouldBe aConfigurationSubmission.getSubmissionId
actualConfigurationEntry.getParticipantId shouldBe aConfigurationSubmission.getParticipantId
actualConfigurationEntry.getConfiguration shouldBe aConfigurationSubmission.getConfiguration
context.outOfTimeBoundsLogEntry should not be defined
}
}
}

View File

@ -4,16 +4,17 @@
package com.daml.ledger.participant.state.kvutils.committer
import java.util.UUID
import com.codahale.metrics.MetricRegistry
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.DamlPackageUploadRejectionEntry.ReasonCase.INVALID_PACKAGE
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{
DamlPackageUploadEntry,
DamlPackageUploadRejectionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.store.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.lf.archive.Decode
import com.daml.lf.archive.testing.Encode
@ -107,18 +108,17 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
packageCommitter = new PackageCommitter(engine, metrics, validationMode, preloadingMode)
}
def submit(submission: DamlSubmission): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) = {
val result @ (log2, output1) =
packageCommitter.run(
Some(com.daml.lf.data.Time.Timestamp.now()),
def submit(submission: DamlSubmission): PreExecutionResult = {
val result =
packageCommitter.runWithPreExecution(
submission,
participantId,
Compat.wrapMap(state),
)
if (log2.hasPackageUploadRejectionEntry)
assert(output1.isEmpty)
if (result.successfulLogEntry.hasPackageUploadRejectionEntry)
assert(result.stateUpdates.isEmpty)
else
state ++= output1
state ++= result.stateUpdates
result
}
}
@ -152,26 +152,27 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
}
private[this] def shouldFailWith(
output: (DamlLogEntry, _),
result: PreExecutionResult,
reason: DamlPackageUploadRejectionEntry.ReasonCase,
msg: String = "",
) = {
output._1.hasPackageUploadRejectionEntry shouldBe true
output._1.getPackageUploadRejectionEntry.getReasonCase shouldBe reason
details(output._1.getPackageUploadRejectionEntry) should include(msg)
val output = result.successfulLogEntry
output.hasPackageUploadRejectionEntry shouldBe true
output.getPackageUploadRejectionEntry.getReasonCase shouldBe reason
details(output.getPackageUploadRejectionEntry) should include(msg)
}
private[this] def shouldSucceed(output: (DamlLogEntry, Map[DamlStateKey, DamlStateValue])) = {
output._1.hasPackageUploadRejectionEntry shouldBe false
output._2 shouldBe Symbol("nonEmpty")
private[this] def shouldSucceed(output: PreExecutionResult) = {
output.successfulLogEntry.hasPackageUploadRejectionEntry shouldBe false
output.stateUpdates shouldBe Symbol("nonEmpty")
}
private[this] def shouldSucceedWith(
output: (DamlLogEntry, Map[DamlStateKey, DamlStateValue]),
output: PreExecutionResult,
committedPackages: Set[Ref.PackageId],
) = {
shouldSucceed(output)
val archives = output._1.getPackageUploadEntry.getArchivesList
val archives = output.successfulLogEntry.getPackageUploadEntry.getArchivesList
archives.size() shouldBe committedPackages.size
val packageIds = archives
.iterator()
@ -196,29 +197,17 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
.newBuilder()
.setPackageUploadEntry(packageUploadEntryBuilder)
.build()
val output = newCommitter.packageCommitter.run(None, submission, participantId, emptyState)
val output =
newCommitter.packageCommitter.runWithPreExecution(submission, participantId, emptyState)
shouldFailWith(output, INVALID_PACKAGE, "Cannot parse package ID")
}
// Don't need to run the below test cases for all instances of PackageCommitter.
"set record time in log entry if record time is available" in {
val submission1 = buildSubmission(archive1)
val output = newCommitter.packageCommitter.run(
Some(theRecordTime),
submission1,
participantId,
emptyState,
)
shouldSucceed(output)
output._1.hasRecordTime shouldBe true
output._1.getRecordTime shouldBe buildTimestamp(theRecordTime)
}
"skip setting record time in log entry when it is not available" in {
val submission1 = buildSubmission(archive1)
val output = newCommitter.packageCommitter.run(None, submission1, participantId, emptyState)
val output =
newCommitter.packageCommitter.runWithPreExecution(submission1, participantId, emptyState)
shouldSucceed(output)
output._1.hasRecordTime shouldBe false
output.successfulLogEntry.hasRecordTime shouldBe false
}
"filter out already known packages" in {
@ -253,8 +242,7 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
val committer = newCommitter
val submission1 = buildSubmission(archive1)
val output = committer.packageCommitter.run(
None,
val output = committer.packageCommitter.runWithPreExecution(
submission1,
Ref.ParticipantId.assertFromString("authorizedParticipant"),
emptyState,
@ -537,12 +525,11 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
def newCommitter = new CommitterWrapper(PackageValidationMode.No, PackagePreloadingMode.No)
"produce an out-of-time-bounds rejection log entry in case pre-execution is enabled" in {
val context = createCommitContext(recordTime = None)
"produce an out-of-time-bounds rejection log entry" in {
val context = createCommitContext()
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
context.outOfTimeBoundsLogEntry.foreach { actual =>
actual.hasRecordTime shouldBe false
@ -551,14 +538,5 @@ class PackageCommitterSpec extends AnyWordSpec with Matchers with ParallelTestEx
actual.getPackageUploadRejectionEntry.getParticipantId shouldBe anEmptyResult.uploadEntry.getParticipantId
}
}
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val context = createCommitContext(recordTime = Some(theRecordTime))
newCommitter.packageCommitter.buildLogEntry(context, anEmptyResult)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
}
}
}

View File

@ -4,7 +4,7 @@
package com.daml.ledger.participant.state.kvutils.committer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.kvutils.TestHelpers.{createCommitContext, theRecordTime}
import com.daml.ledger.participant.state.kvutils.TestHelpers.createCommitContext
import com.daml.ledger.participant.state.kvutils.store.events.DamlPartyAllocationEntry
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
@ -20,13 +20,12 @@ class PartyAllocationCommitterSpec extends AnyWordSpec with Matchers {
.setParticipantId("a participant")
"buildLogEntry" should {
"produce an out-of-time-bounds rejection log entry in case pre-execution is enabled" in {
"produce an out-of-time-bounds rejection log entry" in {
val instance = new PartyAllocationCommitter(metrics)
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
instance.buildLogEntry(context, aPartyAllocationEntry)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
context.outOfTimeBoundsLogEntry.foreach { actual =>
actual.hasRecordTime shouldBe false
@ -36,14 +35,5 @@ class PartyAllocationCommitterSpec extends AnyWordSpec with Matchers {
}
}
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val instance = new PartyAllocationCommitter(metrics)
val context = createCommitContext(recordTime = Some(theRecordTime))
instance.buildLogEntry(context, aPartyAllocationEntry)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
}
}
}

View File

@ -56,40 +56,81 @@ class CommandDeduplicationSpec
CommandDeduplication.setDeduplicationEntryStep(theDefaultConfig)
private val timestamp: Timestamp = Timestamp.now()
"deduplicateCommand" when {
Map[String, (Timestamp => Option[DamlStateValue]) => (Timestamp, CommitContext)](
"pre-execution" -> (dedupValueBuilder => {
val dedupValue = dedupValueBuilder(timestamp)
val commitContext = createCommitContext(None, Map(aDedupKey -> dedupValue))
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(Timestamp.Epoch)
timestamp -> commitContext
}),
"normal-execution" -> (dedupValueBuilder => {
val dedupValue = dedupValueBuilder(timestamp)
val commitContext = createCommitContext(Some(timestamp), Map(aDedupKey -> dedupValue))
timestamp -> commitContext
}),
).foreach { case (key, contextBuilder) =>
key should {
"continue if no deduplication entry could be found" in {
val (_, context) = contextBuilder(_ => None)
"deduplicating command" when {
deduplicationStepContinues(context)
"having no deduplication entry" should {
"continue if no deduplication entry could be found" in {
val (_, context) = contextBuilder(_ => None)
deduplicationStepContinues(context)
}
"continue if deduplication entry has no value set" in {
val (_, context) = contextBuilder(_ => Some(newDedupValue(identity)))
deduplicationStepContinues(context)
}
}
"using deduplicate until" should {
"continue if record time is after deduplication time in case a deduplication entry is found" in {
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
_.setDeduplicatedUntil(buildTimestamp(timestamp.subtract(Duration.ofSeconds(1))))
)
)
)
deduplicationStepContinues(context)
}
"produce rejection log entry in case record time is on or before deduplication time" in {
for (
durationToAdd <- Iterable(
Duration.ZERO,
Duration.ofSeconds(1),
)
) {
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
_.setDeduplicatedUntil(buildTimestamp(timestamp.add(durationToAdd)))
)
)
)
deduplicateStepHasTransactionRejectionEntry(context)
}
}
}
"continue if deduplication entry has no value set" in {
val (_, context) = contextBuilder(_ => Some(newDedupValue(identity)))
deduplicationStepContinues(context)
}
"using deduplicate until" should {
"using record time" should {
forAll(
Table[
String,
Timestamp => DamlCommandDedupValue.Builder => DamlCommandDedupValue.Builder,
](
"identifier" -> "time setter",
"record time" -> (timestamp =>
builder => builder.setRecordTime(buildTimestamp(timestamp))
),
"record time bounds" -> (timestamp =>
builder =>
builder.setRecordTimeBounds(
buildPreExecutionDeduplicationBounds(
timestamp.subtract(deduplicationDuration),
timestamp,
)
)
),
)
) { case (identifier, timeSetter) =>
identifier should {
"continue if record time is after deduplication time in case a deduplication entry is found" in {
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
_.setDeduplicatedUntil(buildTimestamp(timestamp.subtract(Duration.ofSeconds(1))))
timeSetter(timestamp.subtract(deduplicationDuration.plusMillis(1)))
)
)
)
@ -97,9 +138,9 @@ class CommandDeduplicationSpec
deduplicationStepContinues(context)
}
"produce rejection log entry in case record time is on or before deduplication time" in {
"produce rejection log entry in case transaction timestamp is on or before deduplication time" in {
for (
durationToAdd <- Iterable(
durationToSubstractFromDeduplicationDuration <- Iterable(
Duration.ZERO,
Duration.ofSeconds(1),
)
@ -107,136 +148,87 @@ class CommandDeduplicationSpec
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
_.setDeduplicatedUntil(buildTimestamp(timestamp.add(durationToAdd)))
timeSetter(
timestamp.subtract(
deduplicationDuration.minus(
durationToSubstractFromDeduplicationDuration
)
)
)
)
)
)
deduplicateStepHasTransactionRejectionEntry(context)
}
}
}
"using record time" should {
forAll(
Table[
String,
Timestamp => DamlCommandDedupValue.Builder => DamlCommandDedupValue.Builder,
](
"identifier" -> "time setter",
"record time" -> (timestamp =>
builder => builder.setRecordTime(buildTimestamp(timestamp))
),
"record time bounds" -> (timestamp =>
builder =>
builder.setRecordTimeBounds(
buildPreExecutionDeduplicationBounds(
timestamp.subtract(deduplicationDuration),
timestamp,
)
)
),
"include the submission id in the rejection" in {
val submissionId = "submissionId"
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(builder =>
timeSetter(timestamp)(builder.setSubmissionId(submissionId))
)
)
)
) { case (identifier, timeSetter) =>
identifier should {
"continue if record time is after deduplication time in case a deduplication entry is found" in {
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
timeSetter(timestamp.subtract(deduplicationDuration.plusMillis(1)))
)
)
)
deduplicationStepContinues(context)
}
"produce rejection log entry in case transaction timestamp is on or before deduplication time" in {
for (
durationToSubstractFromDeduplicationDuration <- Iterable(
Duration.ZERO,
Duration.ofSeconds(1),
)
) {
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(
timeSetter(
timestamp.subtract(
deduplicationDuration.minus(
durationToSubstractFromDeduplicationDuration
)
)
)
)
)
)
deduplicateStepHasTransactionRejectionEntry(context)
}
}
"include the submission id in the rejection" in {
val submissionId = "submissionId"
val (_, context) = contextBuilder(timestamp =>
Some(
newDedupValue(builder =>
timeSetter(timestamp)(builder.setSubmissionId(submissionId))
)
)
)
val rejection = deduplicateStepHasTransactionRejectionEntry(context)
rejection.getDuplicateCommand.getSubmissionId shouldBe submissionId
}
}
val rejection = deduplicateStepHasTransactionRejectionEntry(context)
rejection.getDuplicateCommand.getSubmissionId shouldBe submissionId
}
}
}
}
"using pre-execution" should {
"produce rejection log entry when there's an overlap between previous transaction max-record-time and current transaction min-record-time" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(buildPreExecutionDeduplicationBounds(timestamp, timestamp))
)
val commitContext = createCommitContext(Map(aDedupKey -> Some(dedupValue)))
commitContext.minimumRecordTime = Some(timestamp.subtract(Duration.ofMillis(1)))
commitContext.maximumRecordTime = Some(Timestamp.Epoch)
"produce rejection log entry when there's an overlap between previous transaction max-record-time and current transaction min-record-time" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(buildPreExecutionDeduplicationBounds(timestamp, timestamp))
deduplicateStepHasTransactionRejectionEntry(commitContext)
}
"return the command deduplication duration as deduplication duration when this exceeds the max delta between record times" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(buildPreExecutionDeduplicationBounds(timestamp, timestamp))
)
val commitContext = createCommitContext(Map(aDedupKey -> Some(dedupValue)))
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(timestamp.add(Duration.ofSeconds(1)))
val rejectionEntry = deduplicateStepHasTransactionRejectionEntry(commitContext)
rejectionEntry.getSubmitterInfo.getDeduplicationDuration shouldBe buildDuration(
deduplicationDuration
)
}
"the deduplication duration is the delta between records when this exceeds the deduplication duration sent in the command" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(
buildPreExecutionDeduplicationBounds(timestamp, timestamp.add(Duration.ofSeconds(1)))
)
val commitContext = createCommitContext(None, Map(aDedupKey -> Some(dedupValue)))
commitContext.minimumRecordTime = Some(timestamp.subtract(Duration.ofMillis(1)))
commitContext.maximumRecordTime = Some(Timestamp.Epoch)
)
val commitContext = createCommitContext(Map(aDedupKey -> Some(dedupValue)))
val deltaBetweenRecords = deduplicationDuration.plusMillis(1)
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(timestamp.add(deltaBetweenRecords))
deduplicateStepHasTransactionRejectionEntry(commitContext)
}
"return the command deduplication duration as deduplication duration when this exceeds the max delta between record times" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(buildPreExecutionDeduplicationBounds(timestamp, timestamp))
)
val commitContext = createCommitContext(None, Map(aDedupKey -> Some(dedupValue)))
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(timestamp.add(Duration.ofSeconds(1)))
val rejectionEntry = deduplicateStepHasTransactionRejectionEntry(commitContext)
rejectionEntry.getSubmitterInfo.getDeduplicationDuration shouldBe buildDuration(
deduplicationDuration
)
}
"the deduplication duration is the delta between records when this exceeds the deduplication duration sent in the command" in {
val dedupValue = newDedupValue(
_.setRecordTimeBounds(
buildPreExecutionDeduplicationBounds(timestamp, timestamp.add(Duration.ofSeconds(1)))
)
)
val commitContext = createCommitContext(None, Map(aDedupKey -> Some(dedupValue)))
val deltaBetweenRecords = deduplicationDuration.plusMillis(1)
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(timestamp.add(deltaBetweenRecords))
val rejectionEntry = deduplicateStepHasTransactionRejectionEntry(commitContext)
rejectionEntry.getSubmitterInfo.getDeduplicationDuration shouldBe buildDuration(
deltaBetweenRecords
)
}
val rejectionEntry = deduplicateStepHasTransactionRejectionEntry(commitContext)
rejectionEntry.getSubmitterInfo.getDeduplicationDuration shouldBe buildDuration(
deltaBetweenRecords
)
}
def contextBuilder(
dedupValueBuilder: Timestamp => Option[DamlStateValue]
): (Timestamp, CommitContext) = {
val dedupValue = dedupValueBuilder(timestamp)
val commitContext = createCommitContext(Map(aDedupKey -> dedupValue))
commitContext.minimumRecordTime = Some(timestamp)
commitContext.maximumRecordTime = Some(Timestamp.Epoch)
timestamp -> commitContext
}
}
@ -311,49 +303,6 @@ class CommandDeduplicationSpec
}
}
"using normal execution" should {
"set the record time in the committer context values" in {
val recordTime = timestamp
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
Some(recordTime),
)
setDeduplicationEntryStep(context, transactionEntrySummary)
parseTimestamp(
deduplicateValueStoredInContext(context, transactionEntrySummary)
.map(
_.getRecordTime
)
.value
) shouldBe recordTime
}
"set pruning time based on record time" in {
val recordTime = timestamp
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
identity,
Some(recordTime),
)
setDeduplicationEntryStep(context, transactionEntrySummary)
parseTimestamp(
deduplicateValueStoredInContext(context, transactionEntrySummary)
.map(
_.getPrunableFrom
)
.value
) shouldBe recordTime
.add(theDefaultConfig.maxDeduplicationDuration)
.add(theDefaultConfig.timeModel.minSkew)
.add(theDefaultConfig.timeModel.maxSkew)
}
}
"set the submission id in the dedup value" in {
val submissionId = "submissionId"
val (context, transactionEntrySummary) =
@ -361,8 +310,9 @@ class CommandDeduplicationSpec
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration))
.setSubmissionId(submissionId),
Some(timestamp),
)
context.minimumRecordTime = Some(timestamp)
context.maximumRecordTime = Some(timestamp)
setDeduplicationEntryStep(context, transactionEntrySummary)
deduplicateValueStoredInContext(context, transactionEntrySummary)
.map(
@ -446,9 +396,8 @@ object CommandDeduplicationSpec {
private def buildContextAndTransaction(
submissionTime: protobuf.Timestamp,
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
recordTime: Option[Timestamp] = None,
) = {
val context = createCommitContext(recordTime)
val context = createCommitContext()
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
val transactionEntrySummary = DamlTransactionEntrySummary(
aDamlTransactionEntry.toBuilder

View File

@ -85,7 +85,7 @@ class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
"mark config as accessed in context" in {
val commitContext =
createCommitContext(recordTime = None, inputWithConfiguration)
createCommitContext(inputWithConfiguration)
step.apply(commitContext, aTransactionEntrySummary)
@ -94,7 +94,7 @@ class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
}
private def contextWithTimeModelAndEmptyCommandDeduplication() =
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndEmptyCommandDeduplication)
createCommitContext(inputs = inputWithTimeModelAndEmptyCommandDeduplication)
private def createProtobufTimestamp(seconds: Long): timestamp.Timestamp = {
timestamp.Timestamp(seconds)

View File

@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.committer.transaction
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
@ -23,14 +22,13 @@ import com.daml.ledger.participant.state.kvutils.store.{
}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmission
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, KeyValueCommitting, committer}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.data.{ImmArray, Ref}
import com.daml.lf.engine.Engine
import com.daml.lf.kv.contracts.ContractConversions
import com.daml.lf.transaction._
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.google.protobuf.{ByteString, Duration}
@ -59,7 +57,6 @@ class TransactionCommitterSpec
"authorizeSubmitters" should {
"reject a submission when any of the submitters keys is not present in the input state" in {
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice)),
Bob -> Some(hostedParty(Bob)),
@ -76,7 +73,6 @@ class TransactionCommitterSpec
"reject a submission when any of the submitters is not known" in {
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice)),
Bob -> None,
@ -95,7 +91,6 @@ class TransactionCommitterSpec
"reject a submission when any of the submitters' participant id is incorrect" in {
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice)),
Bob -> Some(notHostedParty(Bob)),
@ -116,7 +111,6 @@ class TransactionCommitterSpec
"allow a submission when all of the submitters are hosted on the participant" in {
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice)),
Bob -> Some(hostedParty(Bob)),
@ -133,7 +127,7 @@ class TransactionCommitterSpec
"trimUnnecessaryNodes" should {
"remove `Fetch`, `LookupByKey`, and `Rollback` nodes from the transaction tree" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
val actual = transactionCommitter.trimUnnecessaryNodes(
context,
@ -168,7 +162,7 @@ class TransactionCommitterSpec
}
"fail on a non-parsable transaction" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
val brokenEntry =
aDamlTransactionEntry.toBuilder.setRawTransaction(ByteString.copyFromUtf8("wrong")).build()
@ -179,7 +173,7 @@ class TransactionCommitterSpec
}
"fail on a transaction with invalid roots" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
val brokenEntry = aDamlTransactionEntry.toBuilder
.setRawTransaction(
aRichNodeTreeTransaction.toBuilder.addRoots("non-existent").build().toByteString
@ -193,34 +187,12 @@ class TransactionCommitterSpec
}
"buildLogEntry" should {
"set record time in log entry when it is available" in {
val context = createCommitContext(recordTime = Some(theRecordTime))
val actual = TransactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
actual.hasRecordTime shouldBe true
actual.getRecordTime shouldBe buildTimestamp(theRecordTime)
actual.hasTransactionEntry shouldBe true
actual.getTransactionEntry shouldBe aTransactionEntrySummary.submission
}
"skip setting record time in log entry when it is not available" in {
val context = createCommitContext(recordTime = None)
val actual =
TransactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
actual.hasRecordTime shouldBe false
actual.hasTransactionEntry shouldBe true
actual.getTransactionEntry shouldBe aTransactionEntrySummary.submission
}
"produce an out-of-time-bounds rejection log entry in case pre-execution is enabled" in {
val context = createCommitContext(recordTime = None)
"produce an out-of-time-bounds rejection log entry" in {
val context = createCommitContext()
TransactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
context.preExecute shouldBe true
context.outOfTimeBoundsLogEntry should not be empty
context.outOfTimeBoundsLogEntry.foreach { actual =>
actual.hasRecordTime shouldBe false
@ -231,19 +203,11 @@ class TransactionCommitterSpec
}
}
"not set an out-of-time-bounds rejection log entry in case pre-execution is disabled" in {
val context = createCommitContext(recordTime = Some(aRecordTime))
TransactionCommitter.buildLogEntry(aTransactionEntrySummary, context)
context.preExecute shouldBe false
context.outOfTimeBoundsLogEntry shouldBe empty
}
}
"blind" should {
"always set blindingInfo" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
val builder = TransactionBuilder()
@ -299,7 +263,6 @@ class TransactionCommitterSpec
"a submitting party is not known" in {
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice)),
Bob -> None,
@ -336,7 +299,6 @@ class TransactionCommitterSpec
.build
)
val context = createCommitContext(
recordTime = None,
inputs = createInputs(
Alice -> Some(hostedParty(Alice))
) + configurationInput + commandDeduplicationInput,
@ -414,7 +376,6 @@ object TransactionCommitterSpec {
private val OtherParticipantId = 1
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
private val aRecordTime = Timestamp(100)
private val aDummyValue = TransactionBuilder.record("field" -> "value")
private val aKey = "key"
private val aKeyMaintainer = "maintainer"

View File

@ -129,11 +129,10 @@ class CommitterModelConformanceValidatorSpec
validator.createValidationStep(rejections)(
createCommitContext(
None,
Map(
inputContractIdStateKey -> Some(makeContractIdStateValue()),
contractIdStateKey1 -> Some(makeContractIdStateValue()),
),
)
),
aTransactionEntry,
) shouldBe StepContinue(aTransactionEntry)
@ -163,11 +162,10 @@ class CommitterModelConformanceValidatorSpec
val step = validator
.createValidationStep(rejections)(
createCommitContext(
None,
Map(
inputContractIdStateKey -> Some(makeContractIdStateValue()),
contractIdStateKey1 -> Some(makeContractIdStateValue()),
),
)
),
aTransactionEntry,
)
@ -181,7 +179,7 @@ class CommitterModelConformanceValidatorSpec
val step = validator
.createValidationStep(rejections)(
createCommitContext(None),
createCommitContext(),
aTransactionEntry,
)
inside(step) { case StepStop(logEntry) =>
@ -195,7 +193,7 @@ class CommitterModelConformanceValidatorSpec
val step = validator
.createValidationStep(rejections)(
createCommitContext(None),
createCommitContext(),
aTransactionEntry,
)
inside(step) { case StepStop(logEntry) =>
@ -238,12 +236,11 @@ class CommitterModelConformanceValidatorSpec
"lookupContract" should {
"return Some when a contract is present in the current state" in {
val commitContext = createCommitContext(
None,
Map(
inputContractIdStateKey -> Some(
aContractIdStateValue
)
),
)
)
val contractInstance = defaultValidator.lookupContract(commitContext)(
@ -256,8 +253,7 @@ class CommitterModelConformanceValidatorSpec
"throw if a contract does not exist in the current state" in {
an[Err.MissingInputState] should be thrownBy defaultValidator.lookupContract(
createCommitContext(
None,
Map.empty,
Map.empty
)
)(Conversions.decodeContractId(inputContractId.coid))
}
@ -290,8 +286,7 @@ class CommitterModelConformanceValidatorSpec
.setArchive(anArchive.byteString)
.build()
val commitContext = createCommitContext(
None,
Map(stateKey -> Some(stateValue)),
Map(stateKey -> Some(stateValue))
)
val maybePackage = defaultValidator.lookupPackage(commitContext)(aPackageId)
@ -302,8 +297,7 @@ class CommitterModelConformanceValidatorSpec
"fail when the package is missing" in {
an[Err.MissingInputState] should be thrownBy defaultValidator.lookupPackage(
createCommitContext(
None,
Map.empty,
Map.empty
)
)(Ref.PackageId.assertFromString("nonexistentPackageId"))
}
@ -320,8 +314,7 @@ class CommitterModelConformanceValidatorSpec
forAll(stateValues) { stateValue =>
an[Err.ArchiveDecodingFailed] should be thrownBy defaultValidator.lookupPackage(
createCommitContext(
None,
Map(stateKey -> Some(stateValue)),
Map(stateKey -> Some(stateValue))
)
)(Ref.PackageId.assertFromString("invalidPackage"))
}
@ -334,11 +327,10 @@ class CommitterModelConformanceValidatorSpec
.validateCausalMonotonicity(
aTransactionEntry,
createCommitContext(
None,
Map(
inputContractIdStateKey -> Some(makeContractIdStateValue()),
contractIdStateKey1 -> Some(aStateValueActiveAt(ledgerEffectiveTime.minusSeconds(1))),
),
)
),
rejections,
) shouldBe StepContinue(aTransactionEntry)
@ -349,11 +341,10 @@ class CommitterModelConformanceValidatorSpec
.validateCausalMonotonicity(
aTransactionEntry,
createCommitContext(
None,
Map(
inputContractIdStateKey -> Some(makeContractIdStateValue()),
contractIdStateKey1 -> Some(aStateValueActiveAt(ledgerEffectiveTime.plusSeconds(1))),
),
)
),
rejections,
)
@ -373,7 +364,7 @@ class CommitterModelConformanceValidatorSpec
"accept a transaction in case an input contract is non-existent (possibly because it has been pruned)" in {
val step = defaultValidator.validateCausalMonotonicity(
aTransactionEntry,
createCommitContext(None, Map.empty), // No contract is present in the commit context.
createCommitContext(Map.empty), // No contract is present in the commit context.
rejections,
)

View File

@ -4,23 +4,16 @@
package com.daml.ledger.participant.state.kvutils.committer.transaction.validation
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Err
import com.daml.ledger.participant.state.kvutils.TestHelpers.{
createCommitContext,
createEmptyTransactionEntry,
theDefaultConfig,
}
import com.daml.ledger.participant.state.kvutils.committer.transaction.{
DamlTransactionEntrySummary,
Rejections,
}
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
import com.daml.ledger.participant.state.kvutils.store.DamlStateValue
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
DamlTransactionEntry,
}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionEntry
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
@ -34,18 +27,16 @@ class LedgerTimeValidatorSpec extends AnyWordSpec with Matchers {
private val metrics = new Metrics(new MetricRegistry)
private val rejections = new Rejections(metrics)
private val ledgerTimeValidationStep =
new LedgerTimeValidator(theDefaultConfig).createValidationStep(rejections)
new LedgerTimeValidator().createValidationStep(rejections)
private val aDamlTransactionEntry: DamlTransactionEntry = createEmptyTransactionEntry(
List("aSubmitter")
)
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
private val emptyConfigurationStateValue: DamlStateValue =
defaultConfigurationStateValueBuilder().build
"LedgerTimeValidator" can {
"when the record time is not available" should {
"compute and correctly set out-of-time-bounds log entry with min/max record time available in the committer context" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
context.minimumRecordTime = Some(Timestamp.now())
context.maximumRecordTime = Some(Timestamp.now())
ledgerTimeValidationStep.apply(
@ -61,7 +52,7 @@ class LedgerTimeValidatorSpec extends AnyWordSpec with Matchers {
}
"fail if minimum record time is not set" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
context.maximumRecordTime = Some(Timestamp.now())
an[Err.InternalError] shouldBe thrownBy(
ledgerTimeValidationStep.apply(
@ -72,7 +63,7 @@ class LedgerTimeValidatorSpec extends AnyWordSpec with Matchers {
}
"fail if maximum record time is not set" in {
val context = createCommitContext(recordTime = None)
val context = createCommitContext()
context.minimumRecordTime = Some(Timestamp.now())
an[Err.InternalError] shouldBe thrownBy(
ledgerTimeValidationStep.apply(
@ -82,47 +73,5 @@ class LedgerTimeValidatorSpec extends AnyWordSpec with Matchers {
)
}
}
"produce rejection log entry if record time is outside of ledger effective time bounds" in {
val recordTime = Timestamp.now()
val recordTimeInstant = recordTime.toInstant
val lowerBound =
recordTimeInstant
.minus(theDefaultConfig.timeModel.minSkew)
.minusMillis(1)
val upperBound =
recordTimeInstant.plus(theDefaultConfig.timeModel.maxSkew).plusMillis(1)
val inputWithDeclaredConfig =
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue))
for (ledgerEffectiveTime <- Iterable(lowerBound, upperBound)) {
val context =
createCommitContext(recordTime = Some(recordTime), inputs = inputWithDeclaredConfig)
val transactionEntrySummary = DamlTransactionEntrySummary(
aDamlTransactionEntry.toBuilder
.setLedgerEffectiveTime(
com.google.protobuf.Timestamp.newBuilder
.setSeconds(ledgerEffectiveTime.getEpochSecond)
.setNanos(ledgerEffectiveTime.getNano)
)
.build
)
val actual = ledgerTimeValidationStep.apply(context, transactionEntrySummary)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasTransactionRejectionEntry shouldBe true
}
}
}
}
private def defaultConfigurationStateValueBuilder(): DamlStateValue.Builder =
DamlStateValue.newBuilder
.setConfigurationEntry(
DamlConfigurationEntry.newBuilder
.setConfiguration(Configuration.encode(theDefaultConfig))
)
}

View File

@ -131,11 +131,10 @@ class TransactionConsistencyValidatorSpec extends AnyWordSpec with Matchers {
val globalCid = freshContractId
val globalCreate = newCreateNodeWithFixedKey(globalCid)
val context = createCommitContext(
recordTime = None,
inputs = Map(
makeContractIdStateKey(globalCid.coid) -> Some(makeContractIdStateValue()),
contractStateKey(conflictingKey) -> Some(contractKeyStateValue(globalCid.coid)),
),
)
)
val builder = TransactionBuilder()
builder.add(archive(globalCreate, Set("Alice")))
@ -243,7 +242,6 @@ class TransactionConsistencyValidatorSpec extends AnyWordSpec with Matchers {
val globalCid = freshContractId
val globalCreate = newCreateNodeWithFixedKey(globalCid)
val context = createCommitContext(
recordTime = None,
inputs = Map(
makeContractIdStateKey(globalCid.coid) -> Some(
makeContractIdStateValue().toBuilder
@ -252,7 +250,7 @@ class TransactionConsistencyValidatorSpec extends AnyWordSpec with Matchers {
)
.build()
)
),
)
)
val builder = TransactionBuilder()
builder.add(archive(globalCreate, Set("Alice")))
@ -334,10 +332,9 @@ object TransactionConsistencyValidatorSpec {
contractKeyIdPairs: (DamlContractKey, Option[String])*
): CommitContext =
createCommitContext(
recordTime = None,
inputs = contractKeyIdPairs.map { case (key, id) =>
contractStateKey(key) -> id.map(contractKeyStateValue)
}.toMap,
}.toMap
)
private def contractStateKey(contractKey: DamlContractKey): DamlStateKey =