Convert transaction committer to applicative style (#5901)

* Convert transaction committer to applicative style

* few renames following code review

* Test that `TransactionCommitter` won't overflow when performing many reads

* Remove useless `TransactionCommitterSpec`

* Apply suggestions from code review

Apply suggestions

Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com>

* address more review comments

Co-authored-by: Fabio Tudone <fabio.tudone@digitalasset.com>
Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com>
This commit is contained in:
mziolekda 2020-05-09 11:03:05 +02:00 committed by GitHub
parent 75ed957c63
commit e0ab70df64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 290 additions and 485 deletions

View File

@ -9,9 +9,9 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.{
ConfigCommitter,
PackageCommitter,
PartyAllocationCommitter
PartyAllocationCommitter,
TransactionCommitter
}
import com.daml.ledger.participant.state.kvutils.committing._
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.Engine
@ -157,7 +157,6 @@ class KeyValueCommitting private[daml] (
case DamlSubmission.PayloadCase.PARTY_ALLOCATION_ENTRY =>
new PartyAllocationCommitter(metrics).run(
entryId,
//TODO replace this call with an explicit maxRecordTime from the request once available
estimateMaximumRecordTime(recordTime),
recordTime,
submission.getPartyAllocationEntry,
@ -176,12 +175,13 @@ class KeyValueCommitting private[daml] (
)
case DamlSubmission.PayloadCase.TRANSACTION_ENTRY =>
new ProcessTransactionSubmission(defaultConfig, engine, metrics, inStaticTimeMode)
new TransactionCommitter(defaultConfig, engine, metrics, inStaticTimeMode)
.run(
entryId,
estimateMaximumRecordTime(recordTime),
recordTime,
participantId,
submission.getTransactionEntry,
participantId,
inputState,
)

View File

@ -5,14 +5,15 @@ package com.daml.ledger.participant.state.kvutils.committer
import com.codahale.metrics.Timer
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlConfigurationEntry,
DamlLogEntry,
DamlLogEntryId,
DamlStateKey,
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.DamlStateMap
import com.daml.ledger.participant.state.kvutils.{Conversions, DamlStateMap, Err}
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.lf.data.Time
import com.daml.metrics.Metrics
import org.slf4j.{Logger, LoggerFactory}
@ -93,4 +94,27 @@ private[committer] trait Committer[Submission, PartialResult] {
object Committer {
type StepInfo = String
def getCurrentConfiguration(
defaultConfig: Configuration,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
logger: Logger): (Option[DamlConfigurationEntry], Configuration) =
inputState
.getOrElse(
Conversions.configurationStateKey,
/* If we're retrieving configuration, we require it to at least
* have been declared as an input by the submitter as it is used
* to authorize configuration changes. */
throw Err.MissingInputState(Conversions.configurationStateKey)
)
.flatMap { v =>
val entry = v.getConfigurationEntry
Configuration
.decode(entry.getConfiguration)
.fold({ err =>
logger.error(s"Failed to parse configuration: $err, using default configuration.")
None
}, conf => Some(Some(entry) -> conf))
}
.getOrElse(None -> defaultConfig)
}

View File

@ -9,8 +9,7 @@ import com.daml.ledger.participant.state.kvutils.Conversions.{
configurationStateKey
}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committer.Committer.StepInfo
import com.daml.ledger.participant.state.kvutils.committing.Common.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.v1.Configuration
import com.daml.metrics.Metrics

View File

@ -1,18 +1,17 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committing
package com.daml.ledger.participant.state.kvutils.committer
import java.time.Instant
import com.codahale.metrics.Counter
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.committing.Common.Commit._
import com.daml.ledger.participant.state.kvutils.committing.Common._
import com.daml.ledger.participant.state.kvutils.committing.ProcessTransactionSubmission._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
import com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter._
import com.daml.ledger.participant.state.kvutils.{Conversions, DamlStateMap, Err, InputsAndEffects}
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId, RejectionReason}
import com.daml.ledger.participant.state.v1.{Configuration, RejectionReason}
import com.daml.lf.archive.Decode
import com.daml.lf.archive.Reader.ParseError
import com.daml.lf.crypto
@ -20,16 +19,17 @@ import com.daml.lf.data.Ref.{PackageId, Party}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Blinding, Engine}
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Transaction.AbsTransaction
import com.daml.lf.transaction.{BlindingInfo, GenTransaction, Node}
import com.daml.lf.transaction.Transaction.AbsTransaction
import com.daml.lf.value.Value
import com.daml.lf.value.Value.AbsoluteContractId
import com.daml.metrics.Metrics
import com.google.protobuf.{Timestamp => ProtoTimestamp}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import TransactionCommitter._
// The parameter inStaticTimeMode indicates that the ledger is running in static time mode.
//
// Command deduplication is always based on wall clock time and not ledger time. In static time mode,
@ -41,47 +41,34 @@ import scala.collection.JavaConverters._
// * Adding and additional submission field commandDedupSubmissionTime field. While having participants
// provide this field *could* lead to possible exploits, they are not exploits that could do any harm.
// The bigger concern is adding a public API for the specific use case of Sandbox with static time.
private[kvutils] class ProcessTransactionSubmission(
private[kvutils] class TransactionCommitter(
defaultConfig: Configuration,
engine: Engine,
metrics: Metrics,
override protected val metrics: Metrics,
inStaticTimeMode: Boolean
) {
) extends Committer[DamlTransactionEntry, DamlTransactionEntrySummary] {
override protected val committerName = "transaction"
private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
override protected def init(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntry,
): DamlTransactionEntrySummary =
DamlTransactionEntrySummary(transactionEntry)
def run(
entryId: DamlLogEntryId,
recordTime: Timestamp,
participantId: ParticipantId,
txEntry: DamlTransactionEntry,
inputState: DamlStateMap,
): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) =
metrics.daml.kvutils.committer.transaction.runTimer.time { () =>
val transactionEntry = TransactionEntry(txEntry)
runSequence(
inputState = inputState,
"Authorize submitter" -> authorizeSubmitter(recordTime, participantId, transactionEntry),
"Check Informee Parties Allocation" ->
checkInformeePartiesAllocation(recordTime, transactionEntry),
"Deduplicate" -> deduplicateCommand(recordTime, transactionEntry),
"Validate Ledger Time" -> validateLedgerTime(recordTime, transactionEntry, inputState),
"Validate Contract Keys" ->
validateContractKeys(recordTime, transactionEntry),
"Validate Model Conformance" -> timed(
metrics.daml.kvutils.committer.transaction.interpretTimer,
validateModelConformance(engine, recordTime, participantId, transactionEntry, inputState),
),
"Authorize and build result" ->
authorizeAndBlind(recordTime, transactionEntry).flatMap(
buildFinalResult(entryId, recordTime, transactionEntry))
)
}
override protected val steps: Iterable[(StepInfo, Step)] = Iterable(
"authorize_submitter" -> authorizeSubmitter,
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
"deduplicate" -> deduplicateCommand,
"validate_ledger_time" -> validateLedgerTime,
"validate_contract_keys" -> validateContractKeys,
"validate_model_conformance" -> validateModelConformance,
"authorize_and_blind" -> authorizeAndBlind
)
// -------------------------------------------------------------------------------
private def contractIsActiveAndVisibleToSubmitter(
transactionEntry: TransactionEntry,
transactionEntry: DamlTransactionEntrySummary,
contractState: DamlContractState,
): Boolean = {
val locallyDisclosedTo = contractState.getLocallyDisclosedToList.asScala
@ -97,25 +84,22 @@ private[kvutils] class ProcessTransactionSubmission(
/** Reject duplicate commands
*/
private def deduplicateCommand(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
): Commit[Unit] = {
private def deduplicateCommand: Step = (commitContext, transactionEntry) => {
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
get(dedupKey).flatMap { dedupEntry =>
val submissionTime = if (inStaticTimeMode) Instant.now() else recordTime.toInstant
if (dedupEntry.forall(isAfterDeduplicationTime(submissionTime, _))) {
pass
} else {
logger.trace(
s"Transaction rejected, duplicate command, correlationId=${transactionEntry.commandId}")
reject(
recordTime,
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
.setDuplicateCommand(Duplicate.newBuilder.setDetails(""))
)
}
val dedupEntry = commitContext.get(dedupKey)
val submissionTime =
if (inStaticTimeMode) Instant.now() else commitContext.getRecordTime.toInstant
if (dedupEntry.forall(isAfterDeduplicationTime(submissionTime, _))) {
StepContinue(transactionEntry)
} else {
logger.trace(
s"Transaction rejected, duplicate command, correlationId=${transactionEntry.commandId}")
reject(
commitContext.getRecordTime,
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
.setDuplicateCommand(Duplicate.newBuilder.setDetails(""))
)
}
}
@ -139,134 +123,127 @@ private[kvutils] class ProcessTransactionSubmission(
* If the "open world" setting is enabled we allow the submission even if the
* party is unallocated.
*/
private def authorizeSubmitter(
recordTime: Timestamp,
participantId: ParticipantId,
transactionEntry: TransactionEntry,
): Commit[Unit] =
get(partyStateKey(transactionEntry.submitter)).flatMap {
private def authorizeSubmitter: Step = (commitContext, transactionEntry) => {
commitContext.get(partyStateKey(transactionEntry.submitter)) match {
case Some(partyAllocation) =>
if (partyAllocation.getParty.getParticipantId == participantId)
pass
if (partyAllocation.getParty.getParticipantId == commitContext.getParticipantId)
StepContinue(transactionEntry)
else
reject(
recordTime,
commitContext.getRecordTime,
buildRejectionLogEntry(
transactionEntry,
RejectionReason.SubmitterCannotActViaParticipant(
s"Party '${transactionEntry.submitter}' not hosted by participant $participantId"))
s"Party '${transactionEntry.submitter}' not hosted by participant ${commitContext.getParticipantId}")
)
)
case None =>
reject(
recordTime,
commitContext.getRecordTime,
buildRejectionLogEntry(
transactionEntry,
RejectionReason.PartyNotKnownOnLedger(
s"Submitting party '${transactionEntry.submitter}' not known"))
)
}
}
/** Validate ledger effective time and the command's time-to-live. */
private def validateLedgerTime(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
inputState: DamlStateMap,
): Commit[Unit] = delay {
val (_, config) = Common.getCurrentConfiguration(defaultConfig, inputState, logger)
private def validateLedgerTime: Step = (commitContext, transactionEntry) => {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext.inputs, logger)
val timeModel = config.timeModel
val givenLedgerTime = transactionEntry.ledgerEffectiveTime.toInstant
timeModel
.checkTime(ledgerTime = givenLedgerTime, recordTime = recordTime.toInstant)
.checkTime(ledgerTime = givenLedgerTime, recordTime = commitContext.getRecordTime.toInstant)
.fold(
reason =>
reject(
recordTime,
commitContext.getRecordTime,
buildRejectionLogEntry(transactionEntry, RejectionReason.InvalidLedgerTime(reason))),
_ => pass)
_ => StepContinue(transactionEntry)
)
}
/** Validate the submission's conformance to the DAML model */
private def validateModelConformance(
engine: Engine,
recordTime: Timestamp,
participantId: ParticipantId,
transactionEntry: TransactionEntry,
inputState: DamlStateMap,
): Commit[Unit] = delay {
// Pull all keys from referenced contracts. We require this for 'fetchByKey' calls
// which are not evidenced in the transaction itself and hence the contract key state is
// not included in the inputs.
lazy val knownKeys: Map[DamlContractKey, Value.AbsoluteContractId] =
inputState.collect {
case (key, Some(value))
if value.hasContractState
&& value.getContractState.hasContractKey
&& contractIsActiveAndVisibleToSubmitter(transactionEntry, value.getContractState) =>
value.getContractState.getContractKey -> Conversions.stateKeyToContractId(key)
}
private def validateModelConformance: Step =
(commitContext, transactionEntry) =>
metrics.daml.kvutils.committer.transaction.interpretTimer.time(() => {
// Pull all keys from referenced contracts. We require this for 'fetchByKey' calls
// which are not evidenced in the transaction itself and hence the contract key state is
// not included in the inputs.
lazy val knownKeys: Map[DamlContractKey, Value.AbsoluteContractId] =
commitContext.inputs.collect {
case (key, Some(value))
if value.hasContractState
&& value.getContractState.hasContractKey
&& contractIsActiveAndVisibleToSubmitter(
transactionEntry,
value.getContractState) =>
value.getContractState.getContractKey -> Conversions.stateKeyToContractId(key)
}
engine
.validate(
transactionEntry.abs,
transactionEntry.ledgerEffectiveTime,
participantId,
transactionEntry.submissionTime,
transactionEntry.submissionSeed,
)
.consume(
lookupContract(transactionEntry, inputState),
lookupPackage(transactionEntry, inputState),
lookupKey(transactionEntry, inputState, knownKeys),
)
.fold(
err =>
reject(
recordTime,
buildRejectionLogEntry(transactionEntry, RejectionReason.Disputed(err.msg))),
_ => pass)
}
engine
.validate(
transactionEntry.absoluteTransaction,
transactionEntry.ledgerEffectiveTime,
commitContext.getParticipantId,
transactionEntry.submissionTime,
transactionEntry.submissionSeed,
)
.consume(
lookupContract(transactionEntry, commitContext.inputs),
lookupPackage(transactionEntry, commitContext.inputs),
lookupKey(transactionEntry, commitContext.inputs, knownKeys),
)
.fold(
err =>
reject[DamlTransactionEntrySummary](
commitContext.getRecordTime,
buildRejectionLogEntry(transactionEntry, RejectionReason.Disputed(err.msg))),
_ => StepContinue[DamlTransactionEntrySummary](transactionEntry)
)
})
/** Validate the submission's conformance to the DAML model */
private def authorizeAndBlind(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
): Commit[BlindingInfo] = delay {
Blinding
.checkAuthorizationAndBlind(
transactionEntry.abs,
initialAuthorizers = Set(transactionEntry.submitter),
private def authorizeAndBlind: Step =
(commitContext, transactionEntry) =>
Blinding
.checkAuthorizationAndBlind(
transactionEntry.absoluteTransaction,
initialAuthorizers = Set(transactionEntry.submitter),
)
.fold(
err =>
reject(
commitContext.getRecordTime,
buildRejectionLogEntry(transactionEntry, RejectionReason.Disputed(err.msg))),
succ => buildFinalResult(commitContext, transactionEntry, succ)
)
.fold(
err =>
reject(
recordTime,
buildRejectionLogEntry(transactionEntry, RejectionReason.Disputed(err.msg))),
pure)
}
private def validateContractKeys(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
): Commit[Unit] =
for {
damlState <- getDamlState
startingKeys = damlState.collect {
case (k, v) if k.hasContractKey && v.getContractKeyState.getContractId.nonEmpty => k
}.toSet
_ <- validateContractKeyUniqueness(recordTime, transactionEntry, startingKeys)
_ <- validateContractKeyCausalMonotonicity(
recordTime,
transactionEntry,
startingKeys,
damlState)
} yield ()
private def validateContractKeys: Step = (commitContext, transactionEntry) => {
val damlState = commitContext.inputs
.collect { case (k, Some(v)) => k -> v } ++ commitContext.getOutputs
val startingKeys = damlState.collect {
case (k, v) if k.hasContractKey && v.getContractKeyState.getContractId.nonEmpty => k
}.toSet
validateContractKeyUniqueness(commitContext.getRecordTime, transactionEntry, startingKeys) match {
case StepContinue(transactionEntry) =>
validateContractKeyCausalMonotonicity(
commitContext.getRecordTime,
transactionEntry,
startingKeys,
damlState)
case err => err
}
}
private def validateContractKeyUniqueness(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
keys: Set[DamlStateKey]) = {
val allUnique = transactionEntry.abs
transactionEntry: DamlTransactionEntrySummary,
keys: Set[DamlStateKey]): StepResult[DamlTransactionEntrySummary] = {
val allUnique = transactionEntry.absoluteTransaction
.fold((true, keys)) {
case (
(allUnique, existingKeys),
@ -290,7 +267,7 @@ private[kvutils] class ProcessTransactionSubmission(
._1
if (allUnique)
pass
StepContinue(transactionEntry)
else
reject(
recordTime,
@ -308,9 +285,9 @@ private[kvutils] class ProcessTransactionSubmission(
*/
private def validateContractKeyCausalMonotonicity(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
transactionEntry: DamlTransactionEntrySummary,
keys: Set[DamlStateKey],
damlState: DamlOutputStateMap) = {
damlState: Map[DamlStateKey, DamlStateValue]): StepResult[DamlTransactionEntrySummary] = {
val causalKeyMonotonicity = keys.forall { key =>
val state = damlState(key)
val keyActiveAt =
@ -318,7 +295,7 @@ private[kvutils] class ProcessTransactionSubmission(
!keyActiveAt.isAfter(transactionEntry.ledgerEffectiveTime.toInstant)
}
if (causalKeyMonotonicity)
pass
StepContinue(transactionEntry)
else
reject(
recordTime,
@ -328,136 +305,129 @@ private[kvutils] class ProcessTransactionSubmission(
}
/** Check that all informee parties mentioned of a transaction are allocated. */
private def checkInformeePartiesAllocation(
recordTime: Timestamp,
transactionEntry: TransactionEntry,
): Commit[Unit] = {
def foldInformeeParties[T](tx: GenTransaction.WithTxValue[_, _], init: T)(
f: (T, String) => T
): T =
private def checkInformeePartiesAllocation: Step = (commitContext, transactionEntry) => {
def foldInformeeParties(tx: GenTransaction.WithTxValue[_, _], init: Boolean)(
f: (Boolean, String) => Boolean
): Boolean =
tx.fold(init) {
case (accum, (_, node)) =>
node.informeesOfNode.foldLeft(accum)(f)
}
for {
allExist <- foldInformeeParties(transactionEntry.abs, pure(true)) { (accum, party) =>
get(partyStateKey(party)).flatMap(_.fold(pure(false))(_ => accum))
}
val allExist = foldInformeeParties(transactionEntry.absoluteTransaction, init = true) {
(accum, party) =>
commitContext.get(partyStateKey(party)).fold(false)(_ => accum)
}
result <- if (allExist)
pass
else
reject(
recordTime,
buildRejectionLogEntry(
transactionEntry,
RejectionReason.PartyNotKnownOnLedger("Not all parties known"))
)
} yield result
if (allExist)
StepContinue(transactionEntry)
else
reject(
commitContext.getRecordTime,
buildRejectionLogEntry(
transactionEntry,
RejectionReason.PartyNotKnownOnLedger("Not all parties known"))
)
}
/** All checks passed. Produce the log entry and contract state updates. */
private def buildFinalResult(
entryId: DamlLogEntryId,
recordTime: Timestamp,
transactionEntry: TransactionEntry,
)(blindingInfo: BlindingInfo): Commit[Unit] = delay {
val effects = InputsAndEffects.computeEffects(transactionEntry.abs)
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
blindingInfo: BlindingInfo
): StepResult[DamlTransactionEntrySummary] = {
val effects = InputsAndEffects.computeEffects(transactionEntry.absoluteTransaction)
val cid2nid: Value.AbsoluteContractId => Value.NodeId = transactionEntry.abs.localContracts
val cid2nid: Value.AbsoluteContractId => Value.NodeId =
transactionEntry.absoluteTransaction.localContracts
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val ledgerEffectiveTime = transactionEntry.txEntry.getLedgerEffectiveTime
val ledgerEffectiveTime = transactionEntry.submission.getLedgerEffectiveTime
// Helper to read the _current_ contract state.
// NOTE(JM): Important to fetch from the state that is currently being built up since
// we mark some contracts as archived and may later change their disclosure and do not
// want to "unarchive" them.
def getContractState(key: DamlStateKey): Commit[DamlContractState] =
get(key).map {
_.getOrElse(throw Err.MissingInputState(key)).getContractState
}
// Set a deduplication entry
commitContext.set(
dedupKey,
DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder
.setRecordTime(buildTimestamp(commitContext.getRecordTime))
.setDeduplicatedUntil(transactionEntry.submitterInfo.getDeduplicateUntil)
.build)
.build
)
sequence2(
// Set a deduplication entry
set(
dedupKey -> DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder
.setRecordTime(buildTimestamp(recordTime))
.setDeduplicatedUntil(transactionEntry.submitterInfo.getDeduplicateUntil)
.build)
.build),
// Add contract state entries to mark contract activeness (checked by 'validateModelConformance')
set(effects.createdContracts.map {
case (key, createNode) =>
val cs = DamlContractState.newBuilder
cs.setActiveAt(buildTimestamp(transactionEntry.ledgerEffectiveTime))
val localDisclosure =
blindingInfo.localDisclosure(cid2nid(decodeContractId(key.getContractId)))
cs.addAllLocallyDisclosedTo((localDisclosure: Iterable[String]).asJava)
cs.setContractInstance(
Conversions.encodeContractInstance(createNode.coinst)
// Add contract state entries to mark contract activeness (checked by 'validateModelConformance')
effects.createdContracts.foreach {
case (key, createNode) =>
val cs = DamlContractState.newBuilder
cs.setActiveAt(buildTimestamp(transactionEntry.ledgerEffectiveTime))
val localDisclosure =
blindingInfo.localDisclosure(cid2nid(decodeContractId(key.getContractId)))
cs.addAllLocallyDisclosedTo((localDisclosure: Iterable[String]).asJava)
cs.setContractInstance(
Conversions.encodeContractInstance(createNode.coinst)
)
createNode.key.foreach { keyWithMaintainers =>
cs.setContractKey(
Conversions.encodeGlobalKey(
Node.GlobalKey
.build(
createNode.coinst.template,
keyWithMaintainers.key.value
)
.fold(
_ => throw Err.InvalidSubmission("Unexpected contract id in contract key."),
identity))
)
createNode.key.foreach { keyWithMaintainers =>
cs.setContractKey(
Conversions.encodeGlobalKey(
Node.GlobalKey
.build(
createNode.coinst.template,
keyWithMaintainers.key.value
)
.fold(
_ => throw Err.InvalidSubmission("Unexpected contract id in contract key."),
identity))
)
}
key -> DamlStateValue.newBuilder.setContractState(cs).build
}),
// Update contract state entries to mark contracts as consumed (checked by 'validateModelConformance')
sequence2(effects.consumedContracts.map { key =>
for {
cs <- getContractState(key).map { cs =>
}
commitContext.set(key, DamlStateValue.newBuilder.setContractState(cs).build)
}
// Update contract state entries to mark contracts as consumed (checked by 'validateModelConformance')
effects.consumedContracts.foreach { key =>
val cs = getContractState(commitContext, key)
commitContext.set(
key,
DamlStateValue.newBuilder
.setContractState(
cs.toBuilder
.setArchivedAt(buildTimestamp(transactionEntry.ledgerEffectiveTime))
.setArchivedByEntry(entryId)
}
r <- set(key -> DamlStateValue.newBuilder.setContractState(cs).build)
} yield r
}: _*),
// Update contract state of divulged contracts
sequence2(blindingInfo.globalDivulgence.map {
case (coid, parties) =>
val key = contractIdToStateKey(coid)
getContractState(key).flatMap { cs =>
val divulged: Set[String] = cs.getDivulgedToList.asScala.toSet
val newDivulgences: Set[String] = parties.toSet[String] -- divulged
if (newDivulgences.isEmpty)
pass
else {
val cs2 = cs.toBuilder
.addAllDivulgedTo(newDivulgences.asJava)
set(key -> DamlStateValue.newBuilder.setContractState(cs2).build)
}
}
}.toList: _*),
// Update contract keys
set(effects.updatedContractKeys.map {
case (key, contractKeyState) =>
.setArchivedByEntry(commitContext.getEntryId)
)
.build
)
}
// Update contract state of divulged contracts
blindingInfo.globalDivulgence.foreach {
case (coid, parties) =>
val key = contractIdToStateKey(coid)
val cs = getContractState(commitContext, key)
val divulged: Set[String] = cs.getDivulgedToList.asScala.toSet
val newDivulgences: Set[String] = parties.toSet[String] -- divulged
if (newDivulgences.nonEmpty) {
val cs2 = cs.toBuilder
.addAllDivulgedTo(newDivulgences.asJava)
commitContext.set(key, DamlStateValue.newBuilder.setContractState(cs2).build)
}
}
// Update contract keys
effects.updatedContractKeys.foreach {
case (key, contractKeyState) =>
val (k, v) =
updateContractKeyWithContractKeyState(ledgerEffectiveTime, key, contractKeyState)
}),
delay {
metrics.daml.kvutils.committer.transaction.accepts.inc()
logger.trace(s"Transaction accepted, correlationId=${transactionEntry.commandId}")
done(
DamlLogEntry.newBuilder
.setRecordTime(buildTimestamp(recordTime))
.setTransactionEntry(transactionEntry.txEntry)
.build
)
}
commitContext.set(k, v)
}
metrics.daml.kvutils.committer.transaction.accepts.inc()
logger.trace(s"Transaction accepted, correlationId=${transactionEntry.commandId}")
StepStop(
DamlLogEntry.newBuilder
.setRecordTime(buildTimestamp(commitContext.getRecordTime))
.setTransactionEntry(transactionEntry.submission)
.build
)
}
@ -483,7 +453,9 @@ private[kvutils] class ProcessTransactionSubmission(
// Helper to lookup contract instances. We verify the activeness of
// contract instances here. Since we look up every contract that was
// an input to a transaction, we do not need to verify the inputs separately.
private def lookupContract(transactionEntry: TransactionEntry, inputState: DamlStateMap)(
private def lookupContract(
transactionEntry: DamlTransactionEntrySummary,
inputState: DamlStateMap)(
coid: Value.AbsoluteContractId,
): Option[Value.ContractInst[Value.VersionedValue[Value.AbsoluteContractId]]] = {
val stateKey = contractIdToStateKey(coid)
@ -505,7 +477,7 @@ private[kvutils] class ProcessTransactionSubmission(
// are stored in the [[DamlLogEntry]], which we find by looking up
// the DAML state entry at `DamlStateKey(packageId = pkgId)`.
private def lookupPackage(
transactionEntry: TransactionEntry,
transactionEntry: DamlTransactionEntrySummary,
inputState: DamlStateMap,
)(pkgId: PackageId): Option[Ast.Package] = {
val stateKey = packageStateKey(pkgId)
@ -542,7 +514,7 @@ private[kvutils] class ProcessTransactionSubmission(
}
private def lookupKey(
transactionEntry: TransactionEntry,
transactionEntry: DamlTransactionEntrySummary,
inputState: DamlStateMap,
knownKeys: Map[DamlContractKey, Value.AbsoluteContractId],
)(key: Node.GlobalKey): Option[Value.AbsoluteContractId] = {
@ -572,7 +544,7 @@ private[kvutils] class ProcessTransactionSubmission(
}
private def buildRejectionLogEntry(
transactionEntry: TransactionEntry,
transactionEntry: DamlTransactionEntrySummary,
reason: RejectionReason,
): DamlTransactionRejectionEntry.Builder = {
logger.trace(
@ -603,9 +575,9 @@ private[kvutils] class ProcessTransactionSubmission(
private def reject[A](
recordTime: Timestamp,
rejectionEntry: DamlTransactionRejectionEntry.Builder,
): Commit[A] = {
): StepResult[A] = {
Metrics.rejections(rejectionEntry.getReasonCase.getNumber).inc()
Commit.done(
StepStop(
DamlLogEntry.newBuilder
.setRecordTime(buildTimestamp(recordTime))
.setTransactionRejectionEntry(rejectionEntry)
@ -621,16 +593,24 @@ private[kvutils] class ProcessTransactionSubmission(
}
}
object ProcessTransactionSubmission {
private[kvutils] object TransactionCommitter {
private case class TransactionEntry(txEntry: DamlTransactionEntry) {
val ledgerEffectiveTime: Timestamp = parseTimestamp(txEntry.getLedgerEffectiveTime)
val submitterInfo: DamlSubmitterInfo = txEntry.getSubmitterInfo
case class DamlTransactionEntrySummary(submission: DamlTransactionEntry) {
val ledgerEffectiveTime: Timestamp = parseTimestamp(submission.getLedgerEffectiveTime)
val submitterInfo: DamlSubmitterInfo = submission.getSubmitterInfo
val commandId: String = submitterInfo.getCommandId
val submitter: Party = Party.assertFromString(submitterInfo.getSubmitter)
lazy val abs: AbsTransaction = Conversions.decodeTransaction(txEntry.getTransaction)
val submissionTime: Timestamp = Conversions.parseTimestamp(txEntry.getSubmissionTime)
val submissionSeed: Option[crypto.Hash] = Conversions.parseOptHash(txEntry.getSubmissionSeed)
lazy val absoluteTransaction: AbsTransaction =
Conversions.decodeTransaction(submission.getTransaction)
val submissionTime: Timestamp = Conversions.parseTimestamp(submission.getSubmissionTime)
val submissionSeed: Option[crypto.Hash] = Conversions.parseOptHash(submission.getSubmissionSeed)
}
// Helper to read the _current_ contract state.
// NOTE(JM): Important to fetch from the state that is currently being built up since
// we mark some contracts as archived and may later change their disclosure and do not
// want to "unarchive" them.
def getContractState(commitContext: CommitContext, key: DamlStateKey): DamlContractState =
commitContext.get(key).getOrElse(throw Err.MissingInputState(key)).getContractState
}

View File

@ -1,200 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committing
import java.util.concurrent.TimeUnit
import com.codahale.metrics
import com.daml.ledger.participant.state.kvutils.DamlStateMap
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntry,
DamlStateKey,
DamlStateValue,
DamlConfigurationEntry
}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.v1.Configuration
import com.daml.lf.data.InsertOrdMap
import org.slf4j.Logger
import scala.annotation.tailrec
private[kvutils] object Common {
type DamlOutputStateMap = Map[DamlStateKey, DamlStateValue]
final case class CommitContext private (
/* The input state as declared by the submission. */
inputState: DamlStateMap,
/* The intermediate and final state that is committed. */
resultState: DamlOutputStateMap,
)
/** A monadic computation that represents the process of committing which accumulates
* ledger state and finishes with the final state and a log entry.
* This is essentially State + Either.
*/
final case class Commit[A](run: CommitContext => Either[CommitDone, (A, CommitContext)]) {
def flatMap[A1](f: A => Commit[A1]): Commit[A1] =
Commit { state =>
run(state) match {
case Left(done) => Left(done)
case Right((x, state2)) =>
f(x).run(state2)
}
}
def pure[A1](a: A1): Commit[A1] =
Commit { state =>
Right(a -> state)
}
def map[A1](f: A => A1): Commit[A1] =
flatMap(a => pure(f(a)))
}
/** The terminal state for the commit computation. */
final case class CommitDone(logEntry: DamlLogEntry, state: DamlOutputStateMap)
object Commit {
def sequence(acts: Iterable[(String, Commit[Unit])])(implicit logger: Logger): Commit[Unit] = {
@tailrec
def go(
state: CommitContext,
act: (String, Commit[Unit]),
rest: Iterable[(String, Commit[Unit])]
): Either[CommitDone, (Unit, CommitContext)] = {
val result =
if (act._1.isEmpty || !logger.isTraceEnabled)
act._2.run(state)
else {
val t0 = System.nanoTime()
val r = act._2.run(state)
val t1 = System.nanoTime()
if (!act._1.isEmpty)
logger.trace(s"${act._1}: ${TimeUnit.NANOSECONDS.toMillis(t1 - t0)}ms")
r
}
result match {
case Left(done) =>
Left(done)
case Right(((), state2)) =>
if (rest.isEmpty)
Right(() -> state2)
else
go(state2, rest.head, rest.tail)
}
}
if (acts.isEmpty)
pass
else
Commit { state0 =>
go(state0, acts.head, acts.tail)
}
}
/** Sequence commit actions which produces no intermediate values. */
def sequence(acts: (String, Commit[Unit])*)(implicit logger: Logger): Commit[Unit] =
sequence(acts)
/** Sequence commit actions which produces no intermediate values. */
def sequence2(acts: Commit[Unit]*)(implicit logger: Logger): Commit[Unit] =
sequence(acts.map { act =>
"" -> act
})
/** Run a sequence of commit computations, producing a log entry and the state. */
def runSequence(inputState: DamlStateMap, acts: (String, Commit[Unit])*)(
implicit logger: Logger): (DamlLogEntry, DamlOutputStateMap) =
sequence(acts).run(CommitContext(inputState, InsertOrdMap.empty)) match {
case Left(done) => done.logEntry -> done.state
case Right(_) =>
throw Err.InternalError("Commit.runSequence: The commit processing did not terminate!")
}
/** A no-op computation that produces no result. Useful when validating,
* e.g. if (somethingIsCorrect) pass else done(someFailure). */
val pass: Commit[Unit] =
Commit { state =>
Right(() -> state)
}
/** Lift a pure value into the computation. */
def pure[A](a: A): Commit[A] =
Commit { state =>
Right(a -> state)
}
/** Delay a commit. Useful for delaying expensive computation, e.g.
* delay { val foo = someExpensiveComputation; if (foo) done(err) else pass }
*/
def delay[A](act: => Commit[A]): Commit[A] =
Commit { state =>
act.run(state)
}
/** Time a commit */
def timed[A](timer: metrics.Timer, act: Commit[A]): Commit[A] = Commit { state =>
timer.time { () =>
act.run(state)
}
}
/** Set value(s) in the state. */
def set(additionalState: (DamlStateKey, DamlStateValue)*): Commit[Unit] =
set(additionalState)
/** Set value(s) in the state. */
def set(additionalState: Iterable[(DamlStateKey, DamlStateValue)]): Commit[Unit] =
Commit { state =>
Right(() -> (state.copy(resultState = state.resultState ++ additionalState)))
}
/** Get a value from the state built up thus far, or if not found then from input state. */
def get(key: DamlStateKey): Commit[Option[DamlStateValue]] =
Commit { state =>
Right(
state.resultState
.get(key)
.orElse(state.inputState.getOrElse(key, throw Err.MissingInputState(key)))
-> state)
}
def getDamlState: Commit[DamlOutputStateMap] =
Commit { state =>
Right(
(state.inputState.collect { case (k, Some(v)) => k -> v } ++ state.resultState) -> state)
}
/** Finish the computation and produce a log entry, along with the
* state built thus far by the computation. */
def done[A](logEntry: DamlLogEntry): Commit[A] =
Commit { state =>
Left(CommitDone(logEntry, state.resultState))
}
}
def getCurrentConfiguration(
defaultConfig: Configuration,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
logger: Logger): (Option[DamlConfigurationEntry], Configuration) =
inputState
.getOrElse(
Conversions.configurationStateKey,
/* If we're retrieving configuration, we require it to at least
* have been declared as an input by the submitter as it is used
* to authorize configuration changes. */
throw Err.MissingInputState(Conversions.configurationStateKey)
)
.flatMap { v =>
val entry = v.getConfigurationEntry
Configuration
.decode(entry.getConfiguration)
.fold({ err =>
logger.error(s"Failed to parse configuration: $err, using default configuration.")
None
}, conf => Some(Some(entry) -> conf))
}
.getOrElse(None -> defaultConfig)
}

View File

@ -267,6 +267,8 @@ class KVUtilsTransactionSpec extends WordSpec with Matchers {
metrics.daml.kvutils.committer.transaction.accepts.getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.rejection(disputed.name).getCount should be >= 1L
metrics.daml.kvutils.committer.runTimer("transaction").getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.interpretTimer.getCount should be >= 1L
metrics.daml.kvutils.committer.transaction.runTimer.getCount should be >= 1L
}
}