Support divulged contracts in a distributed ledger (#2739)

Fixes #2488

Includes some refactoring of ActiveContracts (now ActiveLedgerState).
This commit is contained in:
Robert Autenrieth 2019-09-11 14:21:41 +02:00 committed by GitHub
parent 03962da4e3
commit e288437dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 682 additions and 391 deletions

View File

@ -14,7 +14,7 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.ContractId
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.domain.PartyDetails
import com.digitalasset.platform.sandbox.stores.InMemoryActiveContracts
import com.digitalasset.platform.sandbox.stores.InMemoryActiveLedgerState
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.immutable.TreeMap
@ -28,7 +28,7 @@ final case class IndexState(
private val beginning: Option[Offset],
// Accepted transactions indexed by offset.
txs: TreeMap[Offset, (Update.TransactionAccepted, BlindingInfo)],
activeContracts: InMemoryActiveContracts,
activeContracts: InMemoryActiveLedgerState,
// Rejected commands indexed by offset.
commandRejections: TreeMap[Offset, Update.CommandRejected],
// Uploaded packages.
@ -120,7 +120,8 @@ final case class IndexState(
transaction = u.transaction,
explicitDisclosure = blindingInfo.explicitDisclosure,
localImplicitDisclosure = blindingInfo.localImplicitDisclosure,
globalImplicitDisclosure = blindingInfo.globalImplicitDisclosure
globalImplicitDisclosure = blindingInfo.globalImplicitDisclosure,
referencedContracts = u.divulgedContracts.map(c => c.contractId -> c.contractInst)
)
.fold(_ => Left(SequencingError), { newActiveContracts =>
Right(
@ -171,7 +172,7 @@ object IndexState {
configuration = lic.config,
recordTime = lic.initialRecordTime,
txs = TreeMap.empty,
activeContracts = InMemoryActiveContracts.empty,
activeContracts = InMemoryActiveLedgerState.empty,
commandRejections = TreeMap.empty,
packages = Map.empty,
packageDetails = Map.empty,

View File

@ -19,7 +19,7 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml_lf.DamlLf
import com.digitalasset.ledger.api.domain.{LedgerOffset, PartyDetails, TransactionFilter}
import com.digitalasset.platform.akkastreams.dispatcher.SignalDispatcher
import com.digitalasset.platform.sandbox.stores.ActiveContracts
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
@ -219,7 +219,7 @@ final case class ReferenceIndexService(
}
.collect {
case (workflowId, create: AcsUpdateEvent.Create)
if state.activeContracts.contracts.contains(create.contractId) =>
if state.activeContracts.activeContracts.contains(create.contractId) =>
(workflowId, create)
}
.toIterator)
@ -328,10 +328,15 @@ final case class ReferenceIndexService(
})
}
private def canSeeContract(submitter: Party, ac: ActiveContracts.ActiveContract): Boolean = {
// ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Right(submitter) exists (p => ac.witnesses(p) || ac.divulgences.contains(p))
private def canSeeContract(submitter: Party, c: ActiveLedgerState.Contract): Boolean = c match {
case ac: ActiveLedgerState.ActiveContract =>
// ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Right(submitter) exists (p => ac.witnesses(p) || ac.divulgences.contains(p))
case dc: ActiveLedgerState.DivulgedContract =>
// ^ only parties divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Right(submitter) exists (p => dc.divulgences.contains(p))
}
override def lookupActiveContract(submitter: Party, contractId: Value.AbsoluteContractId)

View File

@ -0,0 +1 @@
4cace68fd69753d0a5ace8e92d3ff3b50058351692c384ab5f47b3ba43923bd7

View File

@ -0,0 +1,25 @@
-- Copyright (c) 2019 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
---------------------------------------------------------------------------------------------------
-- V3: Contract divulgence
--
-- This schema version splits the contracts table into:
-- contracts_data, only holding contract data
-- contracts, only holding contract metadata
--
-- This is done because for divulged contracts, we only know the contract data,
-- but no other metadata.
---------------------------------------------------------------------------------------------------
-- Move the `contract` column (the serialized contract data) from contracts to contract_data.
CREATE TABLE contract_data (
id varchar primary key not null,
-- the serialized contract value, using the definition in
-- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto`
-- and the encoder in `ContractSerializer.scala`.
contract bytea not null
);
INSERT INTO contract_data (id, contract) SELECT id, contract FROM contracts;
ALTER TABLE contracts DROP COLUMN contract;

View File

@ -0,0 +1 @@
53fce47c203871a2ca80fd647d5d4a6351b321d0591acc25cd24396bcee01b47

View File

@ -0,0 +1,24 @@
-- Copyright (c) 2019 The DAML Authors. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
---------------------------------------------------------------------------------------------------
-- V8: Contract divulgence
--
-- This schema version splits the contracts table into:
-- contracts_data, only holding contract data
-- contracts, only holding contract metadata
--
-- This is done because for divulged contracts, we only know the contract data,
-- but no other metadata.
---------------------------------------------------------------------------------------------------
-- Move the `contract` column (the serialized contract data) from contracts to contract_data.
CREATE TABLE contract_data (
id varchar primary key not null,
-- the serialized contract value, using the definition in
-- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto`
-- and the encoder in `ContractSerializer.scala`.
contract bytea not null
);
INSERT INTO contract_data (id, contract) SELECT id, contract FROM contracts;
ALTER TABLE contracts DROP COLUMN contract;

View File

@ -225,6 +225,8 @@ class JdbcIndexer private[index] (
SandboxEventIdFormatter.fromTransactionId(transactionId, nodeId) -> parties
}
assert(blindingInfo.localImplicitDisclosure.isEmpty)
val pt = PersistenceEntry.Transaction(
LedgerEntry.Transaction(
optSubmitterInfo.map(_.commandId),
@ -240,7 +242,8 @@ class JdbcIndexer private[index] (
mappedDisclosure
),
mappedLocalImplicitDisclosure,
blindingInfo.globalImplicitDisclosure
blindingInfo.globalImplicitDisclosure,
divulgedContracts.map(c => c.contractId -> c.contractInst)
)
ledgerDao
.storeLedgerEntry(headRef, headRef + 1, externalOffset, pt)

View File

@ -25,7 +25,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntr
import com.digitalasset.platform.sandbox.stores.ledger._
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.sandbox.stores.{
InMemoryActiveContracts,
InMemoryActiveLedgerState,
InMemoryPackageStore,
SandboxIndexAndWriteService
}
@ -53,7 +53,7 @@ object SandboxServer {
// if requested, initialize the ledger state with the given scenario
private def createInitialState(config: SandboxConfig, packageStore: InMemoryPackageStore)
: (InMemoryActiveContracts, ImmArray[LedgerEntryOrBump], Option[Instant]) = {
: (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Option[Instant]) = {
// [[ScenarioLoader]] needs all the packages to be already compiled --
// make sure that that's the case
if (config.eagerPackageLoading || config.scenario.nonEmpty) {
@ -72,7 +72,7 @@ object SandboxServer {
}
}
config.scenario match {
case None => (InMemoryActiveContracts.empty, ImmArray.empty, None)
case None => (InMemoryActiveLedgerState.empty, ImmArray.empty, None)
case Some(scenario) =>
val (acs, records, ledgerTime) =
ScenarioLoader.fromScenario(packageStore, engine.compiledPackages(), scenario)

View File

@ -0,0 +1,110 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.stores
import java.time.Instant
import com.daml.ledger.participant.state.v1.AbsoluteContractInst
import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue}
import com.digitalasset.ledger.WorkflowId
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._
/**
* An abstract representation of the active ledger state:
* - Active contracts
* - Divulged contracts
* - Contract keys
* - Known parties
*
* The active ledger state is used for validating transactions,
* see [[ActiveLedgerStateManager]].
*
* The active ledger state could be derived from the transaction stream,
* we keep track of it explicitly for performance reasons.
*/
trait ActiveLedgerState[+Self] { this: ActiveLedgerState[Self] =>
/** Callback to query an active or divulged contract, used for transaction validation */
def lookupContract(cid: AbsoluteContractId): Option[Contract]
/** Callback to query a contract key, used for transaction validation */
def keyExists(key: GlobalKey): Boolean
/** Called when a new contract is created */
def addContract(c: ActiveContract, keyO: Option[GlobalKey]): Self
/** Called when the given contract is archived */
def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]): Self
/** Called once for each transaction with the set of parties found in that transaction.
* As the sandbox has an open world of parties, any party name mentioned in a transaction
* will implicitly add that name to the list of known parties.
*/
def addParties(parties: Set[Party]): Self
/** Note that this method is about divulging contracts _that have already been
* committed_. Implementors of [[ActiveLedgerState]] must take care to also store
* divulgence information already present in `ActiveContract#divulgences` in the `addContract`
* method.
*/
def divulgeAlreadyCommittedContracts(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party],
referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]): Self
}
object ActiveLedgerState {
/** A contract that is part of the [[ActiveLedgerState]].
* Depending on where the contract came from, other metadata may be available.
*/
sealed abstract class Contract {
def id: Value.AbsoluteContractId
def contract: ContractInst[VersionedValue[AbsoluteContractId]]
/** For each party, the transaction id at which the contract was divulged */
def divulgences: Map[Party, TransactionIdString]
/** Returns the new divulgences after the contract has been divulged to the given parties at the given transaction */
def divulgeTo(
parties: Set[Party],
transactionId: TransactionIdString): Map[Party, TransactionIdString] =
parties.foldLeft(divulgences)((m, e) => if (m.contains(e)) m else m + (e -> transactionId))
}
/**
* For divulged contracts, we only know their contract argument, but no other metadata.
* Note also that a ledger node may not be notified when a divulged contract gets archived.
*
* These contracts are only used for transaction validation, they are not part of the active contract set.
*/
final case class DivulgedContract(
id: Value.AbsoluteContractId,
contract: ContractInst[VersionedValue[AbsoluteContractId]],
/** For each party, the transaction id at which the contract was divulged */
divulgences: Map[Party, TransactionIdString],
) extends Contract
/**
* For active contracts, we know all metadata.
*/
final case class ActiveContract(
id: Value.AbsoluteContractId,
let: Instant, // time when the contract was committed
transactionId: TransactionIdString, // transaction id where the contract originates
workflowId: Option[WorkflowId], // workflow id from where the contract originates
contract: ContractInst[VersionedValue[AbsoluteContractId]],
witnesses: Set[Party],
divulgences: Map[Party, TransactionIdString], // for each party, the transaction id at which the contract was divulged
key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]],
signatories: Set[Party],
observers: Set[Party],
agreementText: String)
extends Contract
}

View File

@ -5,14 +5,16 @@ package com.digitalasset.platform.sandbox.stores
import java.time.Instant
import com.daml.ledger.participant.state.v1.AbsoluteContractInst
import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers}
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.transaction.{GenTransaction, Node => N}
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.ledger.WorkflowId
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
import com.digitalasset.platform.sandbox.stores.ActiveContracts._
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._
import com.digitalasset.platform.sandbox.stores.ledger.SequencingError
import com.digitalasset.platform.sandbox.stores.ledger.SequencingError.PredicateType.{
Exercise,
@ -25,16 +27,23 @@ import com.digitalasset.platform.sandbox.stores.ledger.SequencingError.{
TimeBeforeError
}
class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => ActiveContracts[ACS]) {
/**
* A helper for updating an [[ActiveLedgerState]] with new transactions:
* - Validates the transaction against the [[ActiveLedgerState]].
* - Updates the [[ActiveLedgerState].
*/
class ActiveLedgerStateManager[ALS](initialState: => ALS)(
implicit ACS: ALS => ActiveLedgerState[ALS]) {
private case class AddTransactionState(
acc: Option[ACS],
acc: Option[ALS],
errs: Set[SequencingError],
parties: Set[Party]) {
parties: Set[Party],
archivedIds: Set[AbsoluteContractId]) {
def mapAcs(f: ACS => ACS): AddTransactionState = copy(acc = acc map f)
def mapAcs(f: ALS => ALS): AddTransactionState = copy(acc = acc map f)
def result: Either[Set[SequencingError], ACS] = {
def result: Either[Set[SequencingError], ALS] = {
acc match {
case None =>
if (errs.isEmpty) {
@ -52,12 +61,12 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
}
private object AddTransactionState {
def apply(acs: ACS): AddTransactionState =
AddTransactionState(Some(acs), Set(), Set.empty)
def apply(acs: ALS): AddTransactionState =
AddTransactionState(Some(acs), Set(), Set.empty, Set.empty)
}
/**
* A higher order function to update an abstract active contract set (ACS) with the effects of the given transaction.
* A higher order function to update an abstract active ledger state (ALS) with the effects of the given transaction.
* Makes sure that there are no double spends or timing errors.
*/
def addTransaction[Nid](
@ -67,26 +76,47 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId],
explicitDisclosure: Relation[Nid, Party],
localImplicitDisclosure: Relation[Nid, Party],
globalImplicitDisclosure: Relation[AbsoluteContractId, Party])
: Either[Set[SequencingError], ACS] = {
globalImplicitDisclosure: Relation[AbsoluteContractId, Party],
referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)])
: Either[Set[SequencingError], ALS] = {
// NOTE(RC): `globalImplicitDisclosure` was meant to refer to contracts created in previous transactions.
// However, because we have translated relative to absolute IDs at this point, `globalImplicitDisclosure`
// will also point to contracts created in the same transaction.
//
// This is dealt with as follows:
// - First, all transaction nodes are traversed without updating divulgence info.
// - When validating a fetch/exercise node, both the set of previously divulged contracts and
// the newly divulged contracts is used.
// - While traversing consuming exercise nodes, the set of all contracts archived in this transaction is collected.
// - Finally, divulgence information is updated using `globalImplicitDisclosure` minus the set of contracts
// archived in this transaction.
val st =
transaction
.fold[AddTransactionState](GenTransaction.TopDown, AddTransactionState(initialState)) {
case (ats @ AddTransactionState(None, _, _), _) => ats
case (ats @ AddTransactionState(Some(acc), errs, parties), (nodeId, node)) =>
// if some node requires a contract, check that we have that contract, and check that that contract is not
case (ats @ AddTransactionState(None, _, _, _), _) => ats
case (ats @ AddTransactionState(Some(acc), errs, parties, archivedIds), (nodeId, node)) =>
// If some node requires a contract, check that we have that contract, and check that that contract is not
// created after the current let.
def contractCheck(
cid: AbsoluteContractId,
predType: PredicateType): Option[SequencingError] =
acc lookupContract cid match {
case None => Some(InactiveDependencyError(cid, predType))
case Some(otherTx) =>
if (otherTx.let.isAfter(let)) {
Some(TimeBeforeError(cid, otherTx.let, let, predType))
case Some(otherContract: ActiveContract) =>
// Existing active contract, check its LET
if (otherContract.let.isAfter(let)) {
Some(TimeBeforeError(cid, otherContract.let, let, predType))
} else {
None
}
case Some(_: DivulgedContract) =>
// Contract divulged in the past
None
case None if referencedContracts.exists(_._1 == cid) =>
// Contract is going to be divulged in this transaction
None
case None =>
// Contract not known
Some(InactiveDependencyError(cid, predType))
}
node match {
@ -98,14 +128,23 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
AddTransactionState(
Some(acc),
contractCheck(absCoid, Fetch).fold(errs)(errs + _),
parties.union(nodeParties)
parties.union(nodeParties),
archivedIds
)
case nc: N.NodeCreate.WithTxValue[AbsoluteContractId] =>
val nodeParties = nc.signatories
.union(nc.stakeholders)
.union(nc.key.map(_.maintainers).getOrElse(Set.empty))
val absCoid = SandboxEventIdFormatter.makeAbsCoid(transactionId)(nc.coid)
val withoutStakeHolders = localImplicitDisclosure
.getOrElse(nodeId, Set.empty) diff nc.stakeholders
val withStakeHolders = localImplicitDisclosure
.getOrElse(nodeId, Set.empty)
assert(withoutStakeHolders == withStakeHolders)
val activeContract = ActiveContract(
id = absCoid,
let = let,
transactionId = transactionId,
workflowId = workflowId,
@ -125,14 +164,18 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
)
activeContract.key match {
case None =>
ats.copy(acc = Some(acc.addContract(absCoid, activeContract, None)))
ats.copy(acc = Some(acc.addContract(activeContract, None)))
case Some(key) =>
val gk = GlobalKey(activeContract.contract.template, key.key)
if (acc keyExists gk) {
AddTransactionState(None, errs + DuplicateKey(gk), parties.union(nodeParties))
AddTransactionState(
None,
errs + DuplicateKey(gk),
parties.union(nodeParties),
archivedIds)
} else {
ats.copy(
acc = Some(acc.addContract(absCoid, activeContract, Some(gk))),
acc = Some(acc.addContract(activeContract, Some(gk))),
parties = parties.union(nodeParties)
)
}
@ -145,14 +188,16 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
ats.copy(
errs = contractCheck(absCoid, Exercise).fold(errs)(errs + _),
acc = Some(if (ne.consuming) {
acc.removeContract(absCoid, (acc lookupContract absCoid).flatMap(_.key) match {
case None => None
case Some(key) => Some(GlobalKey(ne.templateId, key.key))
})
val keyO = (acc lookupContract absCoid)
.collect({ case c: ActiveContract => c })
.flatMap(_.key)
.map(key => GlobalKey(ne.templateId, key.key))
acc.removeContract(absCoid, keyO)
} else {
acc
}),
parties = parties.union(nodeParties)
parties = parties.union(nodeParties),
archivedIds = if (ne.consuming) archivedIds + absCoid else archivedIds
)
case nlkup: N.NodeLookupByKey.WithTxValue[AbsoluteContractId] =>
// NOTE(FM) we do not need to check anything, since
@ -162,47 +207,11 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act
}
}
st.mapAcs(_ divulgeAlreadyCommittedContract (transactionId, globalImplicitDisclosure))
val divulgedContracts = globalImplicitDisclosure -- st.archivedIds
st.mapAcs(
_ divulgeAlreadyCommittedContracts (transactionId, divulgedContracts, referencedContracts))
.mapAcs(_ addParties st.parties)
.result
}
}
trait ActiveContracts[+Self] { this: ActiveContracts[Self] =>
def lookupContract(cid: AbsoluteContractId): Option[ActiveContract]
def keyExists(key: GlobalKey): Boolean
def addContract(cid: AbsoluteContractId, c: ActiveContract, keyO: Option[GlobalKey]): Self
def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]): Self
/** Called once for each transaction with the set of parties found in that transaction.
* As the sandbox has an open world of parties, any party name mentioned in a transaction
* will implicitly add that name to the list of known parties.
*/
def addParties(parties: Set[Party]): Self
/** Note that this method is about disclosing contracts _that have already been
* committed_. Implementors of `ActiveContracts` must take care to also store
* divulgence information already present in `ActiveContract#divulgences` in the `addContract`
* method.
*/
def divulgeAlreadyCommittedContract(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party]): Self
}
object ActiveContracts {
case class ActiveContract(
let: Instant, // time when the contract was committed
transactionId: TransactionIdString, // transaction id where the contract originates
workflowId: Option[WorkflowId], // workflow id from where the contract originates
contract: ContractInst[VersionedValue[AbsoluteContractId]],
witnesses: Set[Party],
divulgences: Map[Party, TransactionIdString], // for each party, the transaction id at which the contract was divulged
key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]],
signatories: Set[Party],
observers: Set[Party],
agreementText: String)
}

View File

@ -1,91 +0,0 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.stores
import java.time.Instant
import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.transaction.GenTransaction
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.ledger.WorkflowId
import com.digitalasset.ledger.api.domain.PartyDetails
import com.digitalasset.platform.sandbox.stores.ActiveContracts._
import com.digitalasset.platform.sandbox.stores.ledger.SequencingError
import scalaz.syntax.std.map._
case class InMemoryActiveContracts(
contracts: Map[AbsoluteContractId, ActiveContract],
keys: Map[GlobalKey, AbsoluteContractId],
parties: Map[Party, PartyDetails])
extends ActiveContracts[InMemoryActiveContracts] {
override def lookupContract(cid: AbsoluteContractId) = contracts.get(cid)
override def keyExists(key: GlobalKey) = keys.contains(key)
override def addContract(cid: AbsoluteContractId, c: ActiveContract, keyO: Option[GlobalKey]) =
keyO match {
case None => copy(contracts = contracts + (cid -> c))
case Some(key) =>
copy(contracts = contracts + (cid -> c), keys = keys + (key -> cid))
}
override def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]) = keyO match {
case None => copy(contracts = contracts - cid)
case Some(key) => copy(contracts = contracts - cid, keys = keys - key)
}
override def addParties(newParties: Set[Party]): InMemoryActiveContracts =
copy(parties = newParties.map(p => p -> PartyDetails(p, None, true)).toMap ++ parties)
override def divulgeAlreadyCommittedContract(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party]): InMemoryActiveContracts =
if (global.nonEmpty)
copy(
contracts = contracts ++
contracts.intersectWith(global) { (ac, parties) =>
ac copy (divulgences = parties.foldLeft(ac.divulgences)((m, e) =>
if (m.contains(e)) m else m + (e -> transactionId)))
})
else this
private val acManager =
new ActiveContractsManager(this)
/** adds a transaction to the ActiveContracts, make sure that there are no double spends or
* timing errors. this check is leveraged to achieve higher concurrency, see LedgerState
*/
def addTransaction[Nid](
let: Instant,
transactionId: TransactionIdString,
workflowId: Option[WorkflowId],
transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId],
explicitDisclosure: Relation[Nid, Party],
localImplicitDisclosure: Relation[Nid, Party],
globalImplicitDisclosure: Relation[AbsoluteContractId, Party]
): Either[Set[SequencingError], InMemoryActiveContracts] =
acManager.addTransaction(
let,
transactionId,
workflowId,
transaction,
explicitDisclosure,
localImplicitDisclosure,
globalImplicitDisclosure)
/**
* Adds a new party to the list of known parties.
*/
def addParty(details: PartyDetails): InMemoryActiveContracts = {
assert(!parties.contains(details.party))
copy(parties = parties + (details.party -> details))
}
}
object InMemoryActiveContracts {
def empty: InMemoryActiveContracts = InMemoryActiveContracts(Map(), Map(), Map.empty)
}

View File

@ -0,0 +1,155 @@
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.stores
import java.time.Instant
import com.daml.ledger.participant.state.v1.AbsoluteContractInst
import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.transaction.GenTransaction
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.ledger.WorkflowId
import com.digitalasset.ledger.api.domain.PartyDetails
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._
import com.digitalasset.platform.sandbox.stores.ledger.SequencingError
import scalaz.syntax.std.map._
case class InMemoryActiveLedgerState(
activeContracts: Map[AbsoluteContractId, ActiveContract],
divulgedContracts: Map[AbsoluteContractId, DivulgedContract],
keys: Map[GlobalKey, AbsoluteContractId],
parties: Map[Party, PartyDetails])
extends ActiveLedgerState[InMemoryActiveLedgerState] {
override def lookupContract(cid: AbsoluteContractId): Option[Contract] =
activeContracts.get(cid).orElse[Contract](divulgedContracts.get(cid))
override def keyExists(key: GlobalKey) = keys.contains(key)
/**
* Updates divulgence information on the given active contract with information
* from the already existing divulged contract.
*/
private def copyDivulgences(ac: ActiveContract, dc: DivulgedContract): ActiveContract =
ac.copy(divulgences = ac.divulgences.unionWith(dc.divulgences)((l, _) => l))
override def addContract(
c: ActiveContract,
keyO: Option[GlobalKey]): InMemoryActiveLedgerState = {
val newKeys = keyO match {
case None => keys
case Some(key) => keys + (key -> c.id)
}
divulgedContracts.get(c.id) match {
case None =>
copy(
activeContracts = activeContracts + (c.id -> c),
keys = newKeys
)
case Some(dc) =>
copy(
activeContracts = activeContracts + (c.id -> copyDivulgences(c, dc)),
divulgedContracts = divulgedContracts - c.id,
keys = newKeys
)
}
}
override def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]) = {
val newKeys = keyO match {
case None => keys
case Some(key) => keys - key
}
copy(
activeContracts = activeContracts - cid,
divulgedContracts = divulgedContracts - cid,
keys = newKeys
)
}
override def addParties(newParties: Set[Party]): InMemoryActiveLedgerState =
copy(parties = newParties.map(p => p -> PartyDetails(p, None, true)).toMap ++ parties)
override def divulgeAlreadyCommittedContracts(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party],
referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)])
: InMemoryActiveLedgerState =
if (global.nonEmpty) {
val referencedContractsM = referencedContracts.toMap
// Note: each entry in `global` can refer to either:
// - a known active contract, in which case its divulgence info is updated
// - a previously divulged contract, in which case its divulgence info is updated
// - an unknown contract, in which case a new divulged contract is created from the corresponding info in `referencedContracts`
val updatedAcs = activeContracts.intersectWith(global) { (ac, parties) =>
ac copy (divulgences = ac.divulgeTo(parties, transactionId))
}
val updatedDcs = divulgedContracts.intersectWith(global) { (dc, parties) =>
dc copy (divulgences = dc.divulgeTo(parties, transactionId))
}
val newDcs = global.foldLeft(Map.empty[AbsoluteContractId, DivulgedContract]) {
case (m, (cid, divulgeTo)) =>
if (divulgeTo.isEmpty || updatedAcs.contains(cid) || updatedDcs.contains(cid))
m
else
m + (cid -> DivulgedContract(
id = cid,
contract = referencedContractsM
.getOrElse(
cid,
sys.error(
s"Transaction $transactionId says it divulges contract ${cid.coid} to parties ${divulgeTo
.mkString(",")}, but that contract does not exist.")
),
divulgences = Map.empty ++ divulgeTo.map(p => p -> transactionId)
))
}
copy(
activeContracts = activeContracts ++ updatedAcs,
divulgedContracts = divulgedContracts ++ updatedDcs ++ newDcs
)
} else this
private val acManager =
new ActiveLedgerStateManager(this)
/** adds a transaction to the ActiveContracts, make sure that there are no double spends or
* timing errors. this check is leveraged to achieve higher concurrency, see LedgerState
*/
def addTransaction[Nid](
let: Instant,
transactionId: TransactionIdString,
workflowId: Option[WorkflowId],
transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId],
explicitDisclosure: Relation[Nid, Party],
localImplicitDisclosure: Relation[Nid, Party],
globalImplicitDisclosure: Relation[AbsoluteContractId, Party],
referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]
): Either[Set[SequencingError], InMemoryActiveLedgerState] =
acManager.addTransaction(
let,
transactionId,
workflowId,
transaction,
explicitDisclosure,
localImplicitDisclosure,
globalImplicitDisclosure,
referencedContracts)
/**
* Adds a new party to the list of known parties.
*/
def addParty(details: PartyDetails): InMemoryActiveLedgerState = {
assert(!parties.contains(details.party))
copy(parties = parties + (details.party -> details))
}
}
object InMemoryActiveLedgerState {
def empty: InMemoryActiveLedgerState =
InMemoryActiveLedgerState(Map.empty, Map.empty, Map.empty, Map.empty)
}

View File

@ -66,7 +66,7 @@ object SandboxIndexAndWriteService {
jdbcUrl: String,
timeModel: TimeModel,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
ledgerEntries: ImmArray[LedgerEntryOrBump],
startMode: SqlStartMode,
queueDepth: Int,
@ -92,7 +92,7 @@ object SandboxIndexAndWriteService {
participantId: ParticipantId,
timeModel: TimeModel,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
ledgerEntries: ImmArray[LedgerEntryOrBump],
templateStore: InMemoryPackageStore)(
implicit mat: Materializer,
@ -171,21 +171,20 @@ abstract class LedgerBackedIndexService(
ActiveContractSetSnapshot(
LedgerOffset.Absolute(LedgerString.fromLong(offset)),
acsStream
.mapConcat {
case (cId, ac) =>
val create = toUpdateEvent(cId, ac)
EventFilter
.byTemplates(filter)
.filterActiveContractWitnesses(create)
.map(create => ac.workflowId.map(domain.WorkflowId(_)) -> create)
.toList
.mapConcat { ac =>
val create = toUpdateEvent(ac.id, ac)
EventFilter
.byTemplates(filter)
.filterActiveContractWitnesses(create)
.map(create => ac.workflowId.map(domain.WorkflowId(_)) -> create)
.toList
}
)
}(mat.executionContext)
private def toUpdateEvent(
cId: Value.AbsoluteContractId,
ac: ActiveContracts.ActiveContract): AcsUpdateEvent.Create =
ac: ActiveLedgerState.ActiveContract): AcsUpdateEvent.Create =
AcsUpdateEvent.Create(
// we use absolute contract ids as event ids throughout the sandbox
domain.TransactionId(ac.transactionId),

View File

@ -20,8 +20,8 @@ import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails}
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.Contract
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger
import com.digitalasset.platform.sandbox.stores.ledger.sql.{
@ -64,7 +64,7 @@ trait ReadOnlyLedger extends AutoCloseable {
def snapshot(): Future[LedgerSnapshot]
def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]]
def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[Contract]]
def lookupKey(key: GlobalKey): Future[Option[AbsoluteContractId]]
@ -85,7 +85,7 @@ trait ReadOnlyLedger extends AutoCloseable {
object Ledger {
type LedgerFactory = (InMemoryActiveContracts, Seq[LedgerEntry]) => Ledger
type LedgerFactory = (InMemoryActiveLedgerState, Seq[LedgerEntry]) => Ledger
/**
* Creates an in-memory ledger
@ -99,7 +99,7 @@ object Ledger {
def inMemory(
ledgerId: LedgerId,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
ledgerEntries: ImmArray[LedgerEntryOrBump]): Ledger =
new InMemoryLedger(ledgerId, timeProvider, acs, packages, ledgerEntries)
@ -120,7 +120,7 @@ object Ledger {
jdbcUrl: String,
ledgerId: LedgerId,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
ledgerEntries: ImmArray[LedgerEntryOrBump],
queueDepth: Int,

View File

@ -5,7 +5,6 @@ package com.digitalasset.platform.sandbox.stores.ledger
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract
case class LedgerSnapshot(offset: Long, acs: Source[(AbsoluteContractId, ActiveContract), NotUsed])
case class LedgerSnapshot(offset: Long, acs: Source[ActiveContract, NotUsed])

View File

@ -17,7 +17,7 @@ import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails}
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.Contract
import scala.concurrent.Future
@ -33,8 +33,7 @@ private class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, mm: MetricsManager)
override def snapshot(): Future[LedgerSnapshot] =
ledger.snapshot()
override def lookupContract(
contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]] =
override def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[Contract]] =
mm.timedFuture("Ledger:lookupContract", ledger.lookupContract(contractId))
override def lookupKey(key: GlobalKey): Future[Option[AbsoluteContractId]] =

View File

@ -9,18 +9,22 @@ import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.transaction.Transaction.{Value => TxValue}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
import com.digitalasset.platform.sandbox.stores.ActiveContracts
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState
import scala.concurrent.{ExecutionContext, Future}
class SandboxContractStore(ledger: ReadOnlyLedger) extends ContractStore {
private[this] def canSeeContract(
submitter: Party,
ac: ActiveContracts.ActiveContract): Boolean = {
// ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Party fromString submitter exists (p => ac.witnesses(p) || ac.divulgences.contains(p))
}
private[this] def canSeeContract(submitter: Party, c: ActiveLedgerState.Contract): Boolean =
c match {
case ac: ActiveLedgerState.ActiveContract =>
// ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Party fromString submitter exists (p => ac.witnesses(p) || ac.divulgences.contains(p))
case dc: ActiveLedgerState.DivulgedContract =>
// ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10
// and https://github.com/digital-asset/daml/issues/751 .
Party fromString submitter exists (p => dc.divulgences.contains(p))
}
override def lookupActiveContract(submitter: Party, contractId: Value.AbsoluteContractId)
: Future[Option[Value.ContractInst[TxValue[Value.AbsoluteContractId]]]] =

View File

@ -12,7 +12,7 @@ import com.digitalasset.daml.lf.language.Ast.{DDataType, DValue, Definition}
import com.digitalasset.daml.lf.speedy.{ScenarioRunner, Speedy}
import com.digitalasset.daml.lf.types.{Ledger => L}
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore}
import org.slf4j.LoggerFactory
import com.digitalasset.daml.lf.transaction.GenTransaction
import com.digitalasset.daml.lf.types.Ledger.ScenarioTransactionId
@ -60,16 +60,16 @@ object ScenarioLoader {
def fromScenario(
packages: InMemoryPackageStore,
compiledPackages: CompiledPackages,
scenario: String): (InMemoryActiveContracts, ImmArray[LedgerEntryOrBump], Instant) = {
scenario: String): (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Instant) = {
val (scenarioLedger, scenarioRef) = buildScenarioLedger(packages, compiledPackages, scenario)
// we store the tx id since later we need to recover how much to bump the
// ledger end by, and here the transaction id _is_ the ledger end.
val ledgerEntries =
new ArrayBuffer[(ScenarioTransactionId, LedgerEntry)](scenarioLedger.scenarioSteps.size)
type Acc = (InMemoryActiveContracts, Time.Timestamp, Option[ScenarioTransactionId])
type Acc = (InMemoryActiveLedgerState, Time.Timestamp, Option[ScenarioTransactionId])
val (acs, time, txId) =
scenarioLedger.scenarioSteps.iterator
.foldLeft[Acc]((InMemoryActiveContracts.empty, Time.Timestamp.Epoch, None)) {
.foldLeft[Acc]((InMemoryActiveLedgerState.empty, Time.Timestamp.Epoch, None)) {
case ((acs, time, mbOldTxId), (stepId @ _, step)) =>
executeScenarioStep(ledgerEntries, scenarioRef, acs, time, mbOldTxId, stepId, step)
}
@ -215,12 +215,12 @@ object ScenarioLoader {
private def executeScenarioStep(
ledger: ArrayBuffer[(ScenarioTransactionId, LedgerEntry)],
scenarioRef: Ref.DefinitionRef,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
time: Time.Timestamp,
mbOldTxId: Option[ScenarioTransactionId],
stepId: Int,
step: L.ScenarioStep
): (InMemoryActiveContracts, Time.Timestamp, Option[ScenarioTransactionId]) = {
): (InMemoryActiveLedgerState, Time.Timestamp, Option[ScenarioTransactionId]) = {
step match {
case L.Commit(txId: ScenarioTransactionId, richTransaction: L.RichTransaction, _) =>
mbOldTxId match {
@ -252,7 +252,8 @@ object ScenarioLoader {
tx,
explicitDisclosure,
implicitDisclosure,
globalizedImplicitDisclosure) match {
globalizedImplicitDisclosure,
List.empty) match {
case Right(newAcs) =>
val recordTx = tx.mapNodeId(nodeIdWithHash)
val recordDisclosure = explicitDisclosure.map {

View File

@ -33,13 +33,14 @@ import com.digitalasset.ledger.api.domain.{
RejectionReason
}
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract
import com.digitalasset.platform.sandbox.stores.deduplicator.Deduplicator
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.{Checkpoint, Rejection}
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry, LedgerSnapshot}
import com.digitalasset.platform.sandbox.stores.{
ActiveContracts,
InMemoryActiveContracts,
ActiveLedgerState,
InMemoryActiveLedgerState,
InMemoryPackageStore
}
import org.slf4j.LoggerFactory
@ -53,7 +54,7 @@ import scala.util.{Failure, Success, Try}
class InMemoryLedger(
val ledgerId: LedgerId,
timeProvider: TimeProvider,
acs0: InMemoryActiveContracts,
acs0: InMemoryActiveLedgerState,
packageStoreInit: InMemoryPackageStore,
ledgerEntries: ImmArray[LedgerEntryOrBump])
extends Ledger {
@ -87,13 +88,15 @@ class InMemoryLedger(
// need to take the lock to make sure the two pieces of data are consistent.
override def snapshot(): Future[LedgerSnapshot] =
Future.successful(this.synchronized {
LedgerSnapshot(entries.ledgerEnd, Source(acs.contracts))
LedgerSnapshot(
entries.ledgerEnd,
Source.fromIterator[ActiveContract](() => acs.activeContracts.valuesIterator))
})
override def lookupContract(
contractId: AbsoluteContractId): Future[Option[ActiveContracts.ActiveContract]] =
contractId: AbsoluteContractId): Future[Option[ActiveLedgerState.Contract]] =
Future.successful(this.synchronized {
acs.contracts.get(contractId)
acs.activeContracts.get(contractId)
})
override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] =
@ -162,6 +165,7 @@ class InMemoryLedger(
blindingInfo.explicitDisclosure,
blindingInfo.localImplicitDisclosure,
blindingInfo.globalImplicitDisclosure,
List.empty
)
acsRes match {
case Left(err) =>

View File

@ -18,7 +18,7 @@ import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.sandbox.stores.ActiveContracts
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.LedgerReadDao
import com.digitalasset.platform.sandbox.stores.ledger.{LedgerEntry, LedgerSnapshot, ReadOnlyLedger}
@ -53,16 +53,14 @@ class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao:
// 3. A GetActiveContractsRequest comes in and we look at the latest ledger_end offset in the database. We will see 6 (from transaction B).
// 4. If we finish streaming the active contracts up to offset 6 before transaction A is properly inserted into the DB, the client will not see the contracts from transaction A
// The fix to that is to use the latest known headRef, which is updated AFTER a batch has been inserted completely.
//TODO (robert): SQL DAO does not know about ActiveContract, this method does a (trivial) mapping from DAO Contract to Ledger ActiveContract. Intended? The DAO layer was introduced its own Contract abstraction so it can also reason read archived ones if it's needed. In hindsight, this might be necessary at all so we could probably collapse the two
ledgerDao
.getActiveContractSnapshot(ledgerEnd)
.map(s => LedgerSnapshot(s.offset, s.acs.map(c => (c.contractId, c.toActiveContract))))(DEC)
.map(s => LedgerSnapshot(s.offset, s.acs))(DEC)
override def lookupContract(
contractId: AbsoluteContractId): Future[Option[ActiveContracts.ActiveContract]] =
contractId: AbsoluteContractId): Future[Option[ActiveLedgerState.Contract]] =
ledgerDao
.lookupActiveContract(contractId)
.map(_.map(c => c.toActiveContract))(DEC)
.lookupActiveOrDivulgedContract(contractId)
override def lookupTransaction(
transactionId: TransactionIdString): Future[Option[(Long, LedgerEntry.Transaction)]] =

View File

@ -38,7 +38,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
}
import com.digitalasset.platform.sandbox.stores.ledger.sql.util.DbDispatcher
import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore}
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._
@ -69,7 +69,7 @@ object SqlLedger {
jdbcUrl: String,
ledgerId: Option[LedgerId],
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
queueDepth: Int,
@ -265,7 +265,8 @@ private class SqlLedger(
mappedDisclosure
),
mappedLocalImplicitDisclosure,
blindingInfo.globalImplicitDisclosure
blindingInfo.globalImplicitDisclosure,
List.empty
)
}
}
@ -343,7 +344,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
initialLedgerId: Option[LedgerId],
timeProvider: TimeProvider,
startMode: SqlStartMode,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
queueDepth: Int,
@ -382,7 +383,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
private def initialize(
initialLedgerId: Option[LedgerId],
timeProvider: TimeProvider,
acs: InMemoryActiveContracts,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump]): Future[LedgerId] = {
// Note that here we only store the ledger entry and we do not update anything else, such as the
@ -415,9 +416,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
s"Initializing ledger with ${initialLedgerEntries.length} ledger entries")
}
val contracts = acs.contracts
.map(f => Contract.fromActiveContract(f._1, f._2))
.toList
val contracts = acs.activeContracts.values.toList
val initialLedgerEnd = 0L
val entriesWithOffset = initialLedgerEntries.foldLeft(

View File

@ -13,18 +13,24 @@ import anorm.SqlParser._
import anorm.ToStatement.optionToStatement
import anorm.{AkkaStream, BatchSql, Macro, NamedParameter, RowParser, SQL, SqlParser}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.TransactionId
import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.data.Ref._
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers, NodeCreate}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger._
import com.digitalasset.ledger.api.domain.RejectionReason._
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, RejectionReason}
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{
ActiveContract,
Contract,
DivulgedContract
}
import com.digitalasset.platform.sandbox.stores._
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry._
@ -172,7 +178,7 @@ private class JdbcLedgerDao(
override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] =
dbDispatcher.executeSql("lookup contract by key")(implicit conn => selectContractKey(key))
private def storeContract(offset: Long, contract: Contract)(
private def storeContract(offset: Long, contract: ActiveContract)(
implicit connection: Connection): Unit = storeContracts(offset, List(contract))
private def archiveContract(offset: Long, cid: AbsoluteContractId)(
@ -185,8 +191,11 @@ private class JdbcLedgerDao(
.execute()
private val SQL_INSERT_CONTRACT =
"""insert into contracts(id, transaction_id, workflow_id, package_id, name, create_offset, contract, key)
|values({id}, {transaction_id}, {workflow_id}, {package_id}, {name}, {create_offset}, {contract}, {key})""".stripMargin
"""insert into contracts(id, transaction_id, workflow_id, package_id, name, create_offset, key)
|values({id}, {transaction_id}, {workflow_id}, {package_id}, {name}, {create_offset}, {key})""".stripMargin
private val SQL_INSERT_CONTRACT_DATA =
"insert into contract_data(id, contract) values({id}, {contract})"
private val SQL_INSERT_CONTRACT_WITNESS =
"insert into contract_witnesses(contract_id, witness) values({contract_id}, {witness})"
@ -194,7 +203,7 @@ private class JdbcLedgerDao(
private val SQL_INSERT_CONTRACT_KEY_MAINTAINERS =
"insert into contract_key_maintainers(contract_id, maintainer) values({contract_id}, {maintainer})"
private def storeContracts(offset: Long, contracts: immutable.Seq[Contract])(
private def storeContracts(offset: Long, contracts: immutable.Seq[ActiveContract])(
implicit connection: Connection): Unit = {
// An ACS contract contains several collections (e.g., witnesses or divulgences).
@ -206,22 +215,19 @@ private class JdbcLedgerDao(
.map(
c =>
Seq[NamedParameter](
"id" -> c.contractId.coid,
"id" -> c.id.coid,
"transaction_id" -> c.transactionId,
"workflow_id" -> c.workflowId,
"package_id" -> c.coinst.template.packageId,
"name" -> c.coinst.template.qualifiedName.toString,
"package_id" -> c.contract.template.packageId,
"name" -> c.contract.template.qualifiedName.toString,
"create_offset" -> offset,
"contract" -> contractSerializer
.serializeContractInstance(c.coinst)
.getOrElse(sys.error(s"failed to serialize contract! cid:${c.contractId.coid}")),
"key" -> c.key
.map(
k =>
valueSerializer
.serializeValue(k.key)
.getOrElse(sys.error(
s"failed to serialize contract key value! cid:${c.contractId.coid}")))
.getOrElse(
sys.error(s"failed to serialize contract key value! cid:${c.id.coid}")))
)
)
@ -230,6 +236,22 @@ private class JdbcLedgerDao(
namedContractParams
)
val namedContractDataParams = contracts
.map(
c =>
Seq[NamedParameter](
"id" -> c.id.coid,
"contract" -> contractSerializer
.serializeContractInstance(c.contract)
.getOrElse(sys.error(s"failed to serialize contract! cid:${c.id.coid}"))
)
)
executeBatchSql(
SQL_INSERT_CONTRACT_DATA,
namedContractDataParams
)
// Part 2: insert witnesses into the 'contract_witnesses' table
val namedWitnessesParams = contracts
.flatMap(
@ -237,7 +259,7 @@ private class JdbcLedgerDao(
c.witnesses.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"witness" -> w
))
)
@ -264,7 +286,7 @@ private class JdbcLedgerDao(
c.divulgences.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"party" -> w._1,
"transaction_id" -> w._2
))
@ -284,7 +306,7 @@ private class JdbcLedgerDao(
c.divulgences.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"party" -> w._1,
"ledger_offset" -> offset,
"transaction_id" -> c.transactionId
@ -310,7 +332,7 @@ private class JdbcLedgerDao(
k.maintainers.map(
p =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"maintainer" -> p
)))
.getOrElse(Set.empty)
@ -356,7 +378,8 @@ private class JdbcLedgerDao(
offset: Long,
tx: Transaction,
localImplicitDisclosure: Relation[EventId, Party],
globalImplicitDisclosure: Relation[AbsoluteContractId, Party])(
globalImplicitDisclosure: Relation[AbsoluteContractId, Party],
divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)])(
implicit connection: Connection): Option[RejectionReason] = tx match {
case Transaction(
_,
@ -368,19 +391,16 @@ private class JdbcLedgerDao(
_,
transaction,
disclosure) =>
final class AcsStoreAcc extends ActiveContracts[AcsStoreAcc] {
final class AcsStoreAcc extends ActiveLedgerState[AcsStoreAcc] {
override def lookupContract(cid: AbsoluteContractId) =
lookupActiveContractSync(cid).map(_.toActiveContract)
lookupContractSync(cid)
override def keyExists(key: GlobalKey): Boolean = selectContractKey(key).isDefined
override def addContract(
cid: AbsoluteContractId,
c: ActiveContracts.ActiveContract,
keyO: Option[GlobalKey]) = {
storeContract(offset, Contract.fromActiveContract(cid, c))
keyO.foreach(key => storeContractKey(key, cid))
override def addContract(c: ActiveContract, keyO: Option[GlobalKey]): AcsStoreAcc = {
storeContract(offset, c)
keyO.foreach(key => storeContractKey(key, c.id))
this
}
@ -404,9 +424,10 @@ private class JdbcLedgerDao(
this
}
override def divulgeAlreadyCommittedContract(
override def divulgeAlreadyCommittedContracts(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party]) = {
global: Relation[AbsoluteContractId, Party],
divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) = {
val divulgenceParams = global
.flatMap {
case (cid, parties) =>
@ -430,7 +451,7 @@ private class JdbcLedgerDao(
}
// this should be a class member field, we can't move it out yet as the functions above are closing over to the implicit Connection
val acsManager = new ActiveContractsManager(new AcsStoreAcc)
val acsManager = new ActiveLedgerStateManager(new AcsStoreAcc)
// Note: ACS is typed as Unit here, as the ACS is given implicitly by the current database state
// within the current SQL transaction. All of the given functions perform side effects to update the database.
@ -441,7 +462,8 @@ private class JdbcLedgerDao(
transaction,
disclosure,
localImplicitDisclosure,
globalImplicitDisclosure
globalImplicitDisclosure,
divulgedContracts
)
atr match {
@ -531,11 +553,20 @@ private class JdbcLedgerDao(
def insertEntry(le: PersistenceEntry)(implicit conn: Connection): PersistenceResponse =
le match {
case PersistenceEntry.Transaction(tx, localImplicitDisclosure, globalImplicitDisclosure) =>
case PersistenceEntry.Transaction(
tx,
localImplicitDisclosure,
globalImplicitDisclosure,
divulgedContracts) =>
Try {
storeTransaction(offset, tx)
updateActiveContractSet(offset, tx, localImplicitDisclosure, globalImplicitDisclosure)
updateActiveContractSet(
offset,
tx,
localImplicitDisclosure,
globalImplicitDisclosure,
divulgedContracts)
.flatMap { rejectionReason =>
// we need to rollback the existing sql transaction
conn.rollback()
@ -586,7 +617,7 @@ private class JdbcLedgerDao(
}
override def storeInitialState(
activeContracts: immutable.Seq[Contract],
activeContracts: immutable.Seq[ActiveContract],
ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)],
newLedgerEnd: LedgerOffset
): Future[Unit] = {
@ -779,16 +810,21 @@ private class JdbcLedgerDao(
}
private val ContractDataParser = (ledgerString("id")
~ ledgerString("transaction_id")
~ ledgerString("transaction_id").?
~ ledgerString("workflow_id").?
~ date("effective_at")
~ date("effective_at").?
~ binaryStream("contract")
~ binaryStream("key").?
~ binaryStream("transaction") map (flatten))
~ binaryStream("transaction").? map (flatten))
private val SQL_SELECT_CONTRACT =
SQL(
"select c.*, le.effective_at, le.transaction from contracts c inner join ledger_entries le on c.transaction_id = le.transaction_id where id={contract_id} and archive_offset is null ")
"""
|select cd.id, cd.contract, c.transaction_id, c.workflow_id, c.key, le.effective_at, le.transaction
|from contract_data cd
|left join contracts c on cd.id=c.id
|left join ledger_entries le on c.transaction_id = le.transaction_id
|where cd.id={contract_id} and c.archive_offset is null""".stripMargin)
private val SQL_SELECT_WITNESS =
SQL("select witness from contract_witnesses where contract_id={contract_id}")
@ -804,32 +840,55 @@ private class JdbcLedgerDao(
private val SQL_SELECT_KEY_MAINTAINERS =
SQL("select maintainer from contract_key_maintainers where contract_id={contract_id}")
private def lookupActiveContractSync(contractId: AbsoluteContractId)(
private def lookupContractSync(contractId: AbsoluteContractId)(
implicit conn: Connection): Option[Contract] =
SQL_SELECT_CONTRACT
.on("contract_id" -> contractId.coid)
.as(ContractDataParser.singleOpt)
.map(mapContractDetails)
override def lookupActiveContract(contractId: AbsoluteContractId): Future[Option[Contract]] =
override def lookupActiveOrDivulgedContract(
contractId: AbsoluteContractId): Future[Option[Contract]] =
dbDispatcher.executeSql(s"lookup active contract [${contractId.coid}]") { implicit conn =>
lookupActiveContractSync(contractId)
lookupContractSync(contractId)
}
private def mapContractDetails(
contractResult: (
ContractIdString,
TransactionIdString,
Option[TransactionIdString],
Option[WorkflowId],
Date,
Option[Date],
InputStream,
Option[InputStream],
InputStream))(implicit conn: Connection) =
Option[InputStream]))(implicit conn: Connection): Contract =
contractResult match {
case (coid, transactionId, workflowId, ledgerEffectiveTime, contractStream, keyStreamO, tx) =>
case (coid, None, None, None, contractStream, None, None) =>
val divulgences = lookupDivulgences(coid)
val absoluteCoid = AbsoluteContractId(coid)
DivulgedContract(
absoluteCoid,
contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")),
divulgences
)
case (
coid,
Some(transactionId),
workflowId,
Some(ledgerEffectiveTime),
contractStream,
keyStreamO,
Some(tx)) =>
val witnesses = lookupWitnesses(coid)
val divulgences = lookupDivulgences(coid)
val absoluteCoid = AbsoluteContractId(coid)
val contractInstance = contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid"))
val (signatories, observers) =
transactionSerializer
@ -842,16 +901,14 @@ private class JdbcLedgerDao(
(signatories, stakeholders diff signatories)
} getOrElse sys.error(s"no create node in contract creating transaction! cid:$coid")
Contract(
ActiveContract(
absoluteCoid,
ledgerEffectiveTime.toInstant,
transactionId,
workflowId,
contractInstance,
witnesses,
divulgences,
contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")),
keyStreamO.map(keyStream => {
val keyMaintainers = lookupKeyMaintainers(coid)
val keyValue = valueSerializer
@ -860,8 +917,13 @@ private class JdbcLedgerDao(
KeyWithMaintainers(keyValue, keyMaintainers)
}),
signatories,
observers
observers,
contractInstance.agreementText
)
case (_, _, _, _, _, _, _) =>
sys.error(
"mapContractDetails called with partial data, can not map to either active or divulged contract")
}
private def lookupWitnesses(coid: String)(implicit conn: Connection): Set[Party] =
@ -932,7 +994,12 @@ private class JdbcLedgerDao(
private val SQL_SELECT_ACTIVE_CONTRACTS =
SQL(
"select c.*, le.effective_at, le.transaction from contracts c inner join ledger_entries le on c.transaction_id = le.transaction_id where create_offset <= {offset} and (archive_offset is null or archive_offset > {offset})")
"""
|select cd.id, cd.contract, c.transaction_id, c.workflow_id, c.key, le.effective_at, le.transaction
|from contracts c
|inner join contract_data cd on c.id = cd.id
|inner join ledger_entries le on c.transaction_id = le.transaction_id
|where create_offset <= {offset} and (archive_offset is null or archive_offset > {offset})""".stripMargin)
override def getActiveContractSnapshot(untilExclusive: LedgerOffset)(
implicit mat: Materializer): Future[LedgerSnapshot] = {
@ -945,7 +1012,11 @@ private class JdbcLedgerDao(
// it's ok to not have query isolation as witnesses cannot change once we saved them
dbDispatcher
.executeSql(s"load contract details ${contractResult._1}") { implicit conn =>
mapContractDetails(contractResult)
mapContractDetails(contractResult) match {
case ac: ActiveContract => ac
case _: DivulgedContract =>
sys.error("Impossible: SQL_SELECT_ACTIVE_CONTRACTS returned a divulged contract")
}
}
}
}.mapMaterializedValue(_.map(_ => Done)(DirectExecutionContext))
@ -1103,6 +1174,7 @@ private class JdbcLedgerDao(
|truncate ledger_entries cascade;
|truncate disclosures cascade;
|truncate contracts cascade;
|truncate contract_data cascade;
|truncate contract_witnesses cascade;
|truncate contract_key_maintainers cascade;
|truncate parameters cascade;

View File

@ -3,70 +3,28 @@
package com.digitalasset.platform.sandbox.stores.ledger.sql.dao
import java.time.Instant
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.TransactionId
import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId}
import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.transaction.Node.KeyWithMaintainers
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger._
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails}
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.Transaction
import scala.collection.immutable
import scala.concurrent.Future
final case class Contract(
contractId: AbsoluteContractId,
let: Instant,
transactionId: TransactionId,
workflowId: Option[WorkflowId],
witnesses: Set[Party],
divulgences: Map[Party, TransactionId],
coinst: ContractInst[VersionedValue[AbsoluteContractId]],
key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]],
signatories: Set[Party],
observers: Set[Party]) {
def toActiveContract: ActiveContract =
ActiveContract(
let,
transactionId,
workflowId,
coinst,
witnesses,
divulgences,
key,
signatories,
observers,
coinst.agreementText)
}
object Contract {
def fromActiveContract(cid: AbsoluteContractId, ac: ActiveContract): Contract =
Contract(
cid,
ac.let,
ac.transactionId,
ac.workflowId,
ac.witnesses,
ac.divulgences,
ac.contract,
ac.key,
ac.signatories,
ac.observers)
}
/**
* Every time the ledger persists a transactions, the active contract set (ACS) is updated.
* Updating the ACS requires knowledge of blinding info, which is not included in LedgerEntry.Transaction.
@ -79,7 +37,8 @@ object PersistenceEntry {
final case class Transaction(
entry: LedgerEntry.Transaction,
localImplicitDisclosure: Relation[EventId, Party],
globalImplicitDisclosure: Relation[AbsoluteContractId, Party]
globalImplicitDisclosure: Relation[AbsoluteContractId, Party],
divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]
) extends PersistenceEntry
final case class Checkpoint(entry: LedgerEntry.Checkpoint) extends PersistenceEntry
}
@ -94,7 +53,7 @@ object PersistenceResponse {
}
case class LedgerSnapshot(offset: Long, acs: Source[Contract, NotUsed])
case class LedgerSnapshot(offset: Long, acs: Source[ActiveContract, NotUsed])
trait LedgerReadDao extends AutoCloseable {
@ -111,8 +70,8 @@ trait LedgerReadDao extends AutoCloseable {
/** Looks up the current external ledger end offset*/
def lookupExternalLedgerEnd(): Future[Option[LedgerString]]
/** Looks up an active contract. Archived contracts must not be returned by this method */
def lookupActiveContract(contractId: AbsoluteContractId): Future[Option[Contract]]
/** Looks up an active or divulged contract. Archived contracts must not be returned by this method */
def lookupActiveOrDivulgedContract(contractId: AbsoluteContractId): Future[Option[Contract]]
/**
* Looks up a LedgerEntry at a given offset
@ -217,7 +176,7 @@ trait LedgerWriteDao extends AutoCloseable {
* @return Ok when the operation was successful
*/
def storeInitialState(
activeContracts: immutable.Seq[Contract],
activeContracts: immutable.Seq[ActiveContract],
ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)],
newLedgerEnd: LedgerOffset
): Future[Unit]

View File

@ -14,6 +14,7 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml_lf.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails}
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract}
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import scala.collection.immutable
@ -31,9 +32,11 @@ private class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, mm: MetricsManager)
override def lookupExternalLedgerEnd(): Future[Option[LedgerString]] =
mm.timedFuture("LedgerDao:lookupExternalLedgerEnd", ledgerDao.lookupExternalLedgerEnd())
override def lookupActiveContract(
override def lookupActiveOrDivulgedContract(
contractId: Value.AbsoluteContractId): Future[Option[Contract]] =
mm.timedFuture("LedgerDao:lookupActiveContract", ledgerDao.lookupActiveContract(contractId))
mm.timedFuture(
"LedgerDao:lookupActiveContract",
ledgerDao.lookupActiveOrDivulgedContract(contractId))
override def lookupLedgerEntry(offset: Long): Future[Option[LedgerEntry]] =
mm.timedFuture("LedgerDao:lookupLedgerEntry", ledgerDao.lookupLedgerEntry(offset))
@ -81,7 +84,7 @@ private class MeteredLedgerDao(ledgerDao: LedgerDao, mm: MetricsManager)
ledgerDao.storeLedgerEntry(offset, newLedgerEnd, externalOffset, ledgerEntry))
override def storeInitialState(
activeContracts: immutable.Seq[Contract],
activeContracts: immutable.Seq[ActiveContract],
ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)],
newLedgerEnd: LedgerOffset
): Future[Unit] =

View File

@ -13,20 +13,22 @@ import akka.stream.scaladsl.Source
import akka.NotUsed
import anorm.SqlParser._
import anorm.{BatchSql, Macro, NamedParameter, RowParser, SQL, SqlParser}
import com.daml.ledger.participant.state.v1.AbsoluteContractInst
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref._
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.engine.Blinding
import com.digitalasset.daml.lf.transaction.Transaction
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers, NodeCreate}
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
import com.digitalasset.ledger._
import com.digitalasset.ledger.api.domain.RejectionReason
import com.digitalasset.ledger.api.domain.RejectionReason._
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract}
import com.digitalasset.platform.sandbox.stores._
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.Contract
import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{
ContractSerializer,
KeyHasher,
@ -126,7 +128,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
.as(ledgerString("contract_id").singleOpt)
.map(AbsoluteContractId)
private def storeContract(offset: Long, contract: Contract)(
private def storeContract(offset: Long, contract: ActiveContract)(
implicit connection: Connection): Unit = storeContracts(offset, List(contract))
private def archiveContract(offset: Long, cid: AbsoluteContractId)(
@ -148,7 +150,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
private val SQL_INSERT_CONTRACT_KEY_MAINTAINERS =
"insert into contract_key_maintainers(contract_id, maintainer) values({contract_id}, {maintainer})"
private def storeContracts(offset: Long, contracts: immutable.Seq[Contract])(
private def storeContracts(offset: Long, contracts: immutable.Seq[ActiveContract])(
implicit connection: Connection): Unit = {
// A ACS contract contaixns several collections (e.g., witnesses or divulgences).
@ -160,22 +162,22 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
.map(
c =>
Seq[NamedParameter](
"id" -> c.contractId.coid,
"id" -> c.id.coid,
"transaction_id" -> c.transactionId,
"workflow_id" -> c.workflowId.getOrElse(""),
"package_id" -> c.coinst.template.packageId,
"name" -> c.coinst.template.qualifiedName.toString,
"package_id" -> c.contract.template.packageId,
"name" -> c.contract.template.qualifiedName.toString,
"create_offset" -> offset,
"contract" -> contractSerializer
.serializeContractInstance(c.coinst)
.getOrElse(sys.error(s"failed to serialize contract! cid:${c.contractId.coid}")),
.serializeContractInstance(c.contract)
.getOrElse(sys.error(s"failed to serialize contract! cid:${c.id.coid}")),
"key" -> c.key
.map(
k =>
valueSerializer
.serializeValue(k.key)
.getOrElse(sys.error(
s"failed to serialize contract key value! cid:${c.contractId.coid}")))
.getOrElse(
sys.error(s"failed to serialize contract key value! cid:${c.id.coid}")))
)
)
@ -191,7 +193,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
c.witnesses.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"witness" -> w
))
)
@ -218,7 +220,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
c.divulgences.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"party" -> (w._1: String),
"transaction_id" -> w._2
))
@ -238,7 +240,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
c.divulgences.map(
w =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"party" -> (w._1: String),
"ledger_offset" -> offset,
"transaction_id" -> c.transactionId
@ -264,7 +266,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
k.maintainers.map(
p =>
Seq[NamedParameter](
"contract_id" -> c.contractId.coid,
"contract_id" -> c.id.coid,
"maintainer" -> p
)))
.getOrElse(Set.empty)
@ -344,19 +346,16 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
val mappedDisclosure = explicitDisclosure
.mapValues(parties => parties.map(Party.assertFromString))
final class AcsStoreAcc extends ActiveContracts[AcsStoreAcc] {
final class AcsStoreAcc extends ActiveLedgerState[AcsStoreAcc] {
override def lookupContract(cid: AbsoluteContractId) =
lookupActiveContractSync(cid).map(_.toActiveContract)
lookupActiveContractSync(cid)
override def keyExists(key: GlobalKey): Boolean = selectContractKey(key).isDefined
override def addContract(
cid: AbsoluteContractId,
c: ActiveContracts.ActiveContract,
keyO: Option[GlobalKey]) = {
storeContract(offset, Contract.fromActiveContract(cid, c))
keyO.foreach(key => storeContractKey(key, cid))
override def addContract(c: ActiveLedgerState.ActiveContract, keyO: Option[GlobalKey]) = {
storeContract(offset, c)
keyO.foreach(key => storeContractKey(key, c.id))
this
}
@ -371,9 +370,10 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
this
}
override def divulgeAlreadyCommittedContract(
override def divulgeAlreadyCommittedContracts(
transactionId: TransactionIdString,
global: Relation[AbsoluteContractId, Party]) = {
global: Relation[AbsoluteContractId, Party],
referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) = {
val divulgenceParams = global
.flatMap {
case (cid, parties) =>
@ -397,7 +397,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
}
// this should be a class member field, we can't move it out yet as the functions above are closing over to the implicit Connection
val acsManager = new ActiveContractsManager(new AcsStoreAcc)
val acsManager = new ActiveLedgerStateManager(new AcsStoreAcc)
// Note: ACS is typed as Unit here, as the ACS is given implicitly by the current database state
// within the current SQL transaction. All of the given functions perform side effects to update the database.
@ -408,7 +408,8 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
transaction,
mappedDisclosure,
localImplicitDisclosure,
globalImplicitDisclosure
globalImplicitDisclosure,
List.empty
)
atr match {
@ -629,6 +630,9 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
.as(ContractDataParser.singleOpt)
.map(mapContractDetails)
/** Note: at the time this migration was written, divulged contracts were not stored separately from active contracts.
* This method therefore always returns an ActiveContract.
*/
private def mapContractDetails(
contractResult: (
ContractIdString,
@ -637,12 +641,15 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
Date,
InputStream,
Option[InputStream],
InputStream))(implicit conn: Connection) =
InputStream))(implicit conn: Connection): ActiveContract =
contractResult match {
case (coid, transactionId, workflowId, createdAt, contractStream, keyStreamO, tx) =>
val witnesses = lookupWitnesses(coid)
val divulgences = lookupDivulgences(coid)
val absoluteCoid = AbsoluteContractId(coid)
val contractInstance = contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid"))
val (signatories: Set[Ref.Party], observers: Set[Ref.Party]) =
transactionSerializer
@ -658,16 +665,14 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
(Set.empty, Set.empty)
}
Contract(
ActiveContract(
absoluteCoid,
createdAt.toInstant,
transactionId,
Some(workflowId),
contractInstance,
witnesses,
divulgences,
contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")),
keyStreamO.map(keyStream => {
val keyMaintainers = lookupKeyMaintainers(coid)
val keyValue = valueSerializer
@ -676,7 +681,8 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
KeyWithMaintainers(keyValue, keyMaintainers)
}),
signatories,
observers
observers,
contractInstance.agreementText
)
}

View File

@ -8,7 +8,7 @@ import com.digitalasset.api.util.TimeProvider
import com.digitalasset.ledger.api.testing.utils.Resource
import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
import com.digitalasset.platform.sandbox.stores.ledger.Ledger
import com.digitalasset.daml.lf.data.ImmArray
@ -33,7 +33,7 @@ object LedgerResource {
def inMemory(
ledgerId: LedgerId,
timeProvider: TimeProvider,
acs: InMemoryActiveContracts = InMemoryActiveContracts.empty,
acs: InMemoryActiveLedgerState = InMemoryActiveLedgerState.empty,
packages: InMemoryPackageStore = InMemoryPackageStore.empty,
entries: ImmArray[LedgerEntryOrBump] = ImmArray.empty): Resource[Ledger] =
LedgerResource.resource(
@ -68,7 +68,7 @@ object LedgerResource {
postgres.value.jdbcUrl,
ledgerId,
timeProvider,
InMemoryActiveContracts.empty,
InMemoryActiveLedgerState.empty,
packages,
ImmArray.empty,
128,

View File

@ -17,7 +17,7 @@ import com.digitalasset.platform.sandbox.metrics.MetricsManager
import com.digitalasset.platform.sandbox.services.ApiSubmissionService
import com.digitalasset.platform.sandbox.stores.ledger.CommandExecutorImpl
import com.digitalasset.platform.sandbox.stores.{
InMemoryActiveContracts,
InMemoryActiveLedgerState,
InMemoryPackageStore,
SandboxIndexAndWriteService
}
@ -60,7 +60,7 @@ trait TestHelpers {
participantId,
TimeModel.reasonableDefault,
TimeProvider.Constant(Instant.EPOCH),
InMemoryActiveContracts.empty,
InMemoryActiveLedgerState.empty,
ImmArray.empty,
packageStore
)

View File

@ -34,6 +34,7 @@ import com.digitalasset.ledger.EventId
import com.digitalasset.ledger.api.domain.{LedgerId, RejectionReason}
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.digitalasset.platform.sandbox.persistence.PostgresAroundAll
import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
import com.digitalasset.platform.sandbox.stores.ledger.sql.dao._
import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations
@ -120,17 +121,18 @@ class JdbcLedgerDaoSpec
Set(alice)
)
val contract = Contract(
val contract = ActiveContract(
absCid,
let,
txId,
Some(workflowId),
contractInstance,
Set(alice, bob),
Map(alice -> txId, bob -> txId),
contractInstance,
Some(keyWithMaintainers),
Set(alice, bob),
Set.empty
Set.empty,
contractInstance.agreementText
)
val transaction = LedgerEntry.Transaction(
@ -157,7 +159,7 @@ class JdbcLedgerDaoSpec
Map(event1 -> Set[Party]("Alice", "Bob"), event2 -> Set[Party]("Alice", "In", "Chains"))
)
for {
result1 <- ledgerDao.lookupActiveContract(absCid)
result1 <- ledgerDao.lookupActiveOrDivulgedContract(absCid)
_ <- ledgerDao.storeLedgerEntry(
offset,
offset + 1,
@ -166,10 +168,13 @@ class JdbcLedgerDaoSpec
transaction,
Map.empty,
Map(
absCid -> Set(Ref.Party.assertFromString("Alice"), Ref.Party.assertFromString("Bob")))
absCid -> Set(
Ref.Party.assertFromString("Alice"),
Ref.Party.assertFromString("Bob"))),
List.empty
)
)
result2 <- ledgerDao.lookupActiveContract(absCid)
result2 <- ledgerDao.lookupActiveOrDivulgedContract(absCid)
externalLedgerEnd <- ledgerDao.lookupExternalLedgerEnd()
} yield {
result1 shouldEqual None
@ -351,7 +356,7 @@ class JdbcLedgerDaoSpec
offset,
offset + 1,
None,
PersistenceEntry.Transaction(transaction, Map.empty, Map.empty))
PersistenceEntry.Transaction(transaction, Map.empty, Map.empty, List.empty))
entry <- ledgerDao.lookupLedgerEntry(offset)
endingOffset <- ledgerDao.lookupLedgerEnd()
} yield {
@ -416,7 +421,7 @@ class JdbcLedgerDaoSpec
offset,
offset + 1,
None,
PersistenceEntry.Transaction(transaction, Map.empty, Map.empty))
PersistenceEntry.Transaction(transaction, Map.empty, Map.empty, List.empty))
entry <- ledgerDao.lookupLedgerEntry(offset)
endingOffset <- ledgerDao.lookupLedgerEnd()
} yield {
@ -512,7 +517,7 @@ class JdbcLedgerDaoSpec
offset,
offset + 1,
None,
PersistenceEntry.Transaction(t, Map.empty, Map.empty))
PersistenceEntry.Transaction(t, Map.empty, Map.empty, List.empty))
.map(_ => ())
}
@ -524,7 +529,7 @@ class JdbcLedgerDaoSpec
offset,
offset + 1,
None,
PersistenceEntry.Transaction(t, Map.empty, Map.empty))
PersistenceEntry.Transaction(t, Map.empty, Map.empty, List.empty))
.map(_ => ())
}

View File

@ -8,7 +8,7 @@ import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.digitalasset.platform.sandbox.MetricsAround
import com.digitalasset.platform.sandbox.persistence.PostgresAroundEach
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore}
import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore}
import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScaledTimeSpans}
import org.scalatest.time.Span
import org.scalatest.{AsyncWordSpec, Matchers}
@ -37,7 +37,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = None,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -53,7 +53,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -71,7 +71,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -81,7 +81,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(ledgerId),
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -91,7 +91,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = None,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -111,7 +111,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("TheLedger"))),
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth
@ -120,7 +120,7 @@ class SqlLedgerSpec
jdbcUrl = postgresFixture.jdbcUrl,
ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("AnotherLedger"))),
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveContracts.empty,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty,
initialLedgerEntries = ImmArray.empty,
queueDepth