diff --git a/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/IndexState.scala b/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/IndexState.scala index da11382a56..ee73047aab 100644 --- a/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/IndexState.scala +++ b/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/IndexState.scala @@ -14,7 +14,7 @@ import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml.lf.value.Value.ContractId import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger.api.domain.PartyDetails -import com.digitalasset.platform.sandbox.stores.InMemoryActiveContracts +import com.digitalasset.platform.sandbox.stores.InMemoryActiveLedgerState import org.slf4j.{Logger, LoggerFactory} import scala.collection.immutable.TreeMap @@ -28,7 +28,7 @@ final case class IndexState( private val beginning: Option[Offset], // Accepted transactions indexed by offset. txs: TreeMap[Offset, (Update.TransactionAccepted, BlindingInfo)], - activeContracts: InMemoryActiveContracts, + activeContracts: InMemoryActiveLedgerState, // Rejected commands indexed by offset. commandRejections: TreeMap[Offset, Update.CommandRejected], // Uploaded packages. @@ -120,7 +120,8 @@ final case class IndexState( transaction = u.transaction, explicitDisclosure = blindingInfo.explicitDisclosure, localImplicitDisclosure = blindingInfo.localImplicitDisclosure, - globalImplicitDisclosure = blindingInfo.globalImplicitDisclosure + globalImplicitDisclosure = blindingInfo.globalImplicitDisclosure, + referencedContracts = u.divulgedContracts.map(c => c.contractId -> c.contractInst) ) .fold(_ => Left(SequencingError), { newActiveContracts => Right( @@ -171,7 +172,7 @@ object IndexState { configuration = lic.config, recordTime = lic.initialRecordTime, txs = TreeMap.empty, - activeContracts = InMemoryActiveContracts.empty, + activeContracts = InMemoryActiveLedgerState.empty, commandRejections = TreeMap.empty, packages = Map.empty, packageDetails = Map.empty, diff --git a/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/ReferenceIndexService.scala b/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/ReferenceIndexService.scala index 1b9b925603..13c792b22b 100644 --- a/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/ReferenceIndexService.scala +++ b/ledger/participant-state-index/reference/src/main/scala/com/daml/ledger/participant/state/index/v1/impl/reference/ReferenceIndexService.scala @@ -19,7 +19,7 @@ import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml_lf.DamlLf import com.digitalasset.ledger.api.domain.{LedgerOffset, PartyDetails, TransactionFilter} import com.digitalasset.platform.akkastreams.dispatcher.SignalDispatcher -import com.digitalasset.platform.sandbox.stores.ActiveContracts +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState import org.slf4j.LoggerFactory import scala.concurrent.duration.FiniteDuration @@ -219,7 +219,7 @@ final case class ReferenceIndexService( } .collect { case (workflowId, create: AcsUpdateEvent.Create) - if state.activeContracts.contracts.contains(create.contractId) => + if state.activeContracts.activeContracts.contains(create.contractId) => (workflowId, create) } .toIterator) @@ -328,10 +328,15 @@ final case class ReferenceIndexService( }) } - private def canSeeContract(submitter: Party, ac: ActiveContracts.ActiveContract): Boolean = { - // ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 - // and https://github.com/digital-asset/daml/issues/751 . - Right(submitter) exists (p => ac.witnesses(p) || ac.divulgences.contains(p)) + private def canSeeContract(submitter: Party, c: ActiveLedgerState.Contract): Boolean = c match { + case ac: ActiveLedgerState.ActiveContract => + // ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 + // and https://github.com/digital-asset/daml/issues/751 . + Right(submitter) exists (p => ac.witnesses(p) || ac.divulgences.contains(p)) + case dc: ActiveLedgerState.DivulgedContract => + // ^ only parties divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 + // and https://github.com/digital-asset/daml/issues/751 . + Right(submitter) exists (p => dc.divulgences.contains(p)) } override def lookupActiveContract(submitter: Party, contractId: Value.AbsoluteContractId) diff --git a/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sha256 b/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sha256 new file mode 100644 index 0000000000..041a13e91f --- /dev/null +++ b/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sha256 @@ -0,0 +1 @@ +4cace68fd69753d0a5ace8e92d3ff3b50058351692c384ab5f47b3ba43923bd7 diff --git a/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sql b/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sql new file mode 100644 index 0000000000..f0b0072c61 --- /dev/null +++ b/ledger/sandbox/src/main/resources/db/migration/h2database/V3__Contract_Divulgence.sql @@ -0,0 +1,25 @@ +-- Copyright (c) 2019 The DAML Authors. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +--------------------------------------------------------------------------------------------------- +-- V3: Contract divulgence +-- +-- This schema version splits the contracts table into: +-- contracts_data, only holding contract data +-- contracts, only holding contract metadata +-- +-- This is done because for divulged contracts, we only know the contract data, +-- but no other metadata. +--------------------------------------------------------------------------------------------------- + + +-- Move the `contract` column (the serialized contract data) from contracts to contract_data. +CREATE TABLE contract_data ( + id varchar primary key not null, + -- the serialized contract value, using the definition in + -- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto` + -- and the encoder in `ContractSerializer.scala`. + contract bytea not null +); +INSERT INTO contract_data (id, contract) SELECT id, contract FROM contracts; +ALTER TABLE contracts DROP COLUMN contract; diff --git a/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sha256 b/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sha256 new file mode 100644 index 0000000000..c9f2c27162 --- /dev/null +++ b/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sha256 @@ -0,0 +1 @@ +53fce47c203871a2ca80fd647d5d4a6351b321d0591acc25cd24396bcee01b47 diff --git a/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sql b/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sql new file mode 100644 index 0000000000..ebf39bb84b --- /dev/null +++ b/ledger/sandbox/src/main/resources/db/migration/postgres/V8__Contract_Divulgence.sql @@ -0,0 +1,24 @@ +-- Copyright (c) 2019 The DAML Authors. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +--------------------------------------------------------------------------------------------------- +-- V8: Contract divulgence +-- +-- This schema version splits the contracts table into: +-- contracts_data, only holding contract data +-- contracts, only holding contract metadata +-- +-- This is done because for divulged contracts, we only know the contract data, +-- but no other metadata. +--------------------------------------------------------------------------------------------------- + +-- Move the `contract` column (the serialized contract data) from contracts to contract_data. +CREATE TABLE contract_data ( + id varchar primary key not null, + -- the serialized contract value, using the definition in + -- `daml-lf/transaction/src/main/protobuf/com/digitalasset/daml/lf/value.proto` + -- and the encoder in `ContractSerializer.scala`. + contract bytea not null +); +INSERT INTO contract_data (id, contract) SELECT id, contract FROM contracts; +ALTER TABLE contracts DROP COLUMN contract; diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndexer.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndexer.scala index 16d2b25054..1bd6e09858 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndexer.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndexer.scala @@ -225,6 +225,8 @@ class JdbcIndexer private[index] ( SandboxEventIdFormatter.fromTransactionId(transactionId, nodeId) -> parties } + assert(blindingInfo.localImplicitDisclosure.isEmpty) + val pt = PersistenceEntry.Transaction( LedgerEntry.Transaction( optSubmitterInfo.map(_.commandId), @@ -240,7 +242,8 @@ class JdbcIndexer private[index] ( mappedDisclosure ), mappedLocalImplicitDisclosure, - blindingInfo.globalImplicitDisclosure + blindingInfo.globalImplicitDisclosure, + divulgedContracts.map(c => c.contractId -> c.contractInst) ) ledgerDao .storeLedgerEntry(headRef, headRef + 1, externalOffset, pt) diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala index 592657bd4b..8a51981675 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala @@ -25,7 +25,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntr import com.digitalasset.platform.sandbox.stores.ledger._ import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode import com.digitalasset.platform.sandbox.stores.{ - InMemoryActiveContracts, + InMemoryActiveLedgerState, InMemoryPackageStore, SandboxIndexAndWriteService } @@ -53,7 +53,7 @@ object SandboxServer { // if requested, initialize the ledger state with the given scenario private def createInitialState(config: SandboxConfig, packageStore: InMemoryPackageStore) - : (InMemoryActiveContracts, ImmArray[LedgerEntryOrBump], Option[Instant]) = { + : (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Option[Instant]) = { // [[ScenarioLoader]] needs all the packages to be already compiled -- // make sure that that's the case if (config.eagerPackageLoading || config.scenario.nonEmpty) { @@ -72,7 +72,7 @@ object SandboxServer { } } config.scenario match { - case None => (InMemoryActiveContracts.empty, ImmArray.empty, None) + case None => (InMemoryActiveLedgerState.empty, ImmArray.empty, None) case Some(scenario) => val (acs, records, ledgerTime) = ScenarioLoader.fromScenario(packageStore, engine.compiledPackages(), scenario) diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerState.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerState.scala new file mode 100644 index 0000000000..109290e3e2 --- /dev/null +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerState.scala @@ -0,0 +1,110 @@ +// Copyright (c) 2019 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.platform.sandbox.stores + +import java.time.Instant + +import com.daml.ledger.participant.state.v1.AbsoluteContractInst +import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString} +import com.digitalasset.daml.lf.data.Relation.Relation +import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers} +import com.digitalasset.daml.lf.value.Value +import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue} +import com.digitalasset.ledger.WorkflowId +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._ + +/** + * An abstract representation of the active ledger state: + * - Active contracts + * - Divulged contracts + * - Contract keys + * - Known parties + * + * The active ledger state is used for validating transactions, + * see [[ActiveLedgerStateManager]]. + * + * The active ledger state could be derived from the transaction stream, + * we keep track of it explicitly for performance reasons. + */ +trait ActiveLedgerState[+Self] { this: ActiveLedgerState[Self] => + + /** Callback to query an active or divulged contract, used for transaction validation */ + def lookupContract(cid: AbsoluteContractId): Option[Contract] + + /** Callback to query a contract key, used for transaction validation */ + def keyExists(key: GlobalKey): Boolean + + /** Called when a new contract is created */ + def addContract(c: ActiveContract, keyO: Option[GlobalKey]): Self + + /** Called when the given contract is archived */ + def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]): Self + + /** Called once for each transaction with the set of parties found in that transaction. + * As the sandbox has an open world of parties, any party name mentioned in a transaction + * will implicitly add that name to the list of known parties. + */ + def addParties(parties: Set[Party]): Self + + /** Note that this method is about divulging contracts _that have already been + * committed_. Implementors of [[ActiveLedgerState]] must take care to also store + * divulgence information already present in `ActiveContract#divulgences` in the `addContract` + * method. + */ + def divulgeAlreadyCommittedContracts( + transactionId: TransactionIdString, + global: Relation[AbsoluteContractId, Party], + referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]): Self +} + +object ActiveLedgerState { + + /** A contract that is part of the [[ActiveLedgerState]]. + * Depending on where the contract came from, other metadata may be available. + */ + sealed abstract class Contract { + def id: Value.AbsoluteContractId + def contract: ContractInst[VersionedValue[AbsoluteContractId]] + + /** For each party, the transaction id at which the contract was divulged */ + def divulgences: Map[Party, TransactionIdString] + + /** Returns the new divulgences after the contract has been divulged to the given parties at the given transaction */ + def divulgeTo( + parties: Set[Party], + transactionId: TransactionIdString): Map[Party, TransactionIdString] = + parties.foldLeft(divulgences)((m, e) => if (m.contains(e)) m else m + (e -> transactionId)) + } + + /** + * For divulged contracts, we only know their contract argument, but no other metadata. + * Note also that a ledger node may not be notified when a divulged contract gets archived. + * + * These contracts are only used for transaction validation, they are not part of the active contract set. + */ + final case class DivulgedContract( + id: Value.AbsoluteContractId, + contract: ContractInst[VersionedValue[AbsoluteContractId]], + /** For each party, the transaction id at which the contract was divulged */ + divulgences: Map[Party, TransactionIdString], + ) extends Contract + + /** + * For active contracts, we know all metadata. + */ + final case class ActiveContract( + id: Value.AbsoluteContractId, + let: Instant, // time when the contract was committed + transactionId: TransactionIdString, // transaction id where the contract originates + workflowId: Option[WorkflowId], // workflow id from where the contract originates + contract: ContractInst[VersionedValue[AbsoluteContractId]], + witnesses: Set[Party], + divulgences: Map[Party, TransactionIdString], // for each party, the transaction id at which the contract was divulged + key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]], + signatories: Set[Party], + observers: Set[Party], + agreementText: String) + extends Contract + +} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveContracts.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerStateManager.scala similarity index 56% rename from ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveContracts.scala rename to ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerStateManager.scala index c7815f2297..9500e8530f 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveContracts.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ActiveLedgerStateManager.scala @@ -5,14 +5,16 @@ package com.digitalasset.platform.sandbox.stores import java.time.Instant +import com.daml.ledger.participant.state.v1.AbsoluteContractInst import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString} import com.digitalasset.daml.lf.data.Relation.Relation -import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers} +import com.digitalasset.daml.lf.transaction.Node.GlobalKey import com.digitalasset.daml.lf.transaction.{GenTransaction, Node => N} -import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue} +import com.digitalasset.daml.lf.value.Value +import com.digitalasset.daml.lf.value.Value.AbsoluteContractId import com.digitalasset.ledger.WorkflowId import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter -import com.digitalasset.platform.sandbox.stores.ActiveContracts._ +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._ import com.digitalasset.platform.sandbox.stores.ledger.SequencingError import com.digitalasset.platform.sandbox.stores.ledger.SequencingError.PredicateType.{ Exercise, @@ -25,16 +27,23 @@ import com.digitalasset.platform.sandbox.stores.ledger.SequencingError.{ TimeBeforeError } -class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => ActiveContracts[ACS]) { +/** + * A helper for updating an [[ActiveLedgerState]] with new transactions: + * - Validates the transaction against the [[ActiveLedgerState]]. + * - Updates the [[ActiveLedgerState]. + */ +class ActiveLedgerStateManager[ALS](initialState: => ALS)( + implicit ACS: ALS => ActiveLedgerState[ALS]) { private case class AddTransactionState( - acc: Option[ACS], + acc: Option[ALS], errs: Set[SequencingError], - parties: Set[Party]) { + parties: Set[Party], + archivedIds: Set[AbsoluteContractId]) { - def mapAcs(f: ACS => ACS): AddTransactionState = copy(acc = acc map f) + def mapAcs(f: ALS => ALS): AddTransactionState = copy(acc = acc map f) - def result: Either[Set[SequencingError], ACS] = { + def result: Either[Set[SequencingError], ALS] = { acc match { case None => if (errs.isEmpty) { @@ -52,12 +61,12 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act } private object AddTransactionState { - def apply(acs: ACS): AddTransactionState = - AddTransactionState(Some(acs), Set(), Set.empty) + def apply(acs: ALS): AddTransactionState = + AddTransactionState(Some(acs), Set(), Set.empty, Set.empty) } /** - * A higher order function to update an abstract active contract set (ACS) with the effects of the given transaction. + * A higher order function to update an abstract active ledger state (ALS) with the effects of the given transaction. * Makes sure that there are no double spends or timing errors. */ def addTransaction[Nid]( @@ -67,26 +76,47 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId], explicitDisclosure: Relation[Nid, Party], localImplicitDisclosure: Relation[Nid, Party], - globalImplicitDisclosure: Relation[AbsoluteContractId, Party]) - : Either[Set[SequencingError], ACS] = { + globalImplicitDisclosure: Relation[AbsoluteContractId, Party], + referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) + : Either[Set[SequencingError], ALS] = { + // NOTE(RC): `globalImplicitDisclosure` was meant to refer to contracts created in previous transactions. + // However, because we have translated relative to absolute IDs at this point, `globalImplicitDisclosure` + // will also point to contracts created in the same transaction. + // + // This is dealt with as follows: + // - First, all transaction nodes are traversed without updating divulgence info. + // - When validating a fetch/exercise node, both the set of previously divulged contracts and + // the newly divulged contracts is used. + // - While traversing consuming exercise nodes, the set of all contracts archived in this transaction is collected. + // - Finally, divulgence information is updated using `globalImplicitDisclosure` minus the set of contracts + // archived in this transaction. val st = transaction .fold[AddTransactionState](GenTransaction.TopDown, AddTransactionState(initialState)) { - case (ats @ AddTransactionState(None, _, _), _) => ats - case (ats @ AddTransactionState(Some(acc), errs, parties), (nodeId, node)) => - // if some node requires a contract, check that we have that contract, and check that that contract is not + case (ats @ AddTransactionState(None, _, _, _), _) => ats + case (ats @ AddTransactionState(Some(acc), errs, parties, archivedIds), (nodeId, node)) => + // If some node requires a contract, check that we have that contract, and check that that contract is not // created after the current let. def contractCheck( cid: AbsoluteContractId, predType: PredicateType): Option[SequencingError] = acc lookupContract cid match { - case None => Some(InactiveDependencyError(cid, predType)) - case Some(otherTx) => - if (otherTx.let.isAfter(let)) { - Some(TimeBeforeError(cid, otherTx.let, let, predType)) + case Some(otherContract: ActiveContract) => + // Existing active contract, check its LET + if (otherContract.let.isAfter(let)) { + Some(TimeBeforeError(cid, otherContract.let, let, predType)) } else { None } + case Some(_: DivulgedContract) => + // Contract divulged in the past + None + case None if referencedContracts.exists(_._1 == cid) => + // Contract is going to be divulged in this transaction + None + case None => + // Contract not known + Some(InactiveDependencyError(cid, predType)) } node match { @@ -98,14 +128,23 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act AddTransactionState( Some(acc), contractCheck(absCoid, Fetch).fold(errs)(errs + _), - parties.union(nodeParties) + parties.union(nodeParties), + archivedIds ) case nc: N.NodeCreate.WithTxValue[AbsoluteContractId] => val nodeParties = nc.signatories .union(nc.stakeholders) .union(nc.key.map(_.maintainers).getOrElse(Set.empty)) val absCoid = SandboxEventIdFormatter.makeAbsCoid(transactionId)(nc.coid) + val withoutStakeHolders = localImplicitDisclosure + .getOrElse(nodeId, Set.empty) diff nc.stakeholders + val withStakeHolders = localImplicitDisclosure + .getOrElse(nodeId, Set.empty) + + assert(withoutStakeHolders == withStakeHolders) + val activeContract = ActiveContract( + id = absCoid, let = let, transactionId = transactionId, workflowId = workflowId, @@ -125,14 +164,18 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act ) activeContract.key match { case None => - ats.copy(acc = Some(acc.addContract(absCoid, activeContract, None))) + ats.copy(acc = Some(acc.addContract(activeContract, None))) case Some(key) => val gk = GlobalKey(activeContract.contract.template, key.key) if (acc keyExists gk) { - AddTransactionState(None, errs + DuplicateKey(gk), parties.union(nodeParties)) + AddTransactionState( + None, + errs + DuplicateKey(gk), + parties.union(nodeParties), + archivedIds) } else { ats.copy( - acc = Some(acc.addContract(absCoid, activeContract, Some(gk))), + acc = Some(acc.addContract(activeContract, Some(gk))), parties = parties.union(nodeParties) ) } @@ -145,14 +188,16 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act ats.copy( errs = contractCheck(absCoid, Exercise).fold(errs)(errs + _), acc = Some(if (ne.consuming) { - acc.removeContract(absCoid, (acc lookupContract absCoid).flatMap(_.key) match { - case None => None - case Some(key) => Some(GlobalKey(ne.templateId, key.key)) - }) + val keyO = (acc lookupContract absCoid) + .collect({ case c: ActiveContract => c }) + .flatMap(_.key) + .map(key => GlobalKey(ne.templateId, key.key)) + acc.removeContract(absCoid, keyO) } else { acc }), - parties = parties.union(nodeParties) + parties = parties.union(nodeParties), + archivedIds = if (ne.consuming) archivedIds + absCoid else archivedIds ) case nlkup: N.NodeLookupByKey.WithTxValue[AbsoluteContractId] => // NOTE(FM) we do not need to check anything, since @@ -162,47 +207,11 @@ class ActiveContractsManager[ACS](initialState: => ACS)(implicit ACS: ACS => Act } } - st.mapAcs(_ divulgeAlreadyCommittedContract (transactionId, globalImplicitDisclosure)) + val divulgedContracts = globalImplicitDisclosure -- st.archivedIds + st.mapAcs( + _ divulgeAlreadyCommittedContracts (transactionId, divulgedContracts, referencedContracts)) .mapAcs(_ addParties st.parties) .result } } - -trait ActiveContracts[+Self] { this: ActiveContracts[Self] => - def lookupContract(cid: AbsoluteContractId): Option[ActiveContract] - def keyExists(key: GlobalKey): Boolean - def addContract(cid: AbsoluteContractId, c: ActiveContract, keyO: Option[GlobalKey]): Self - def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]): Self - - /** Called once for each transaction with the set of parties found in that transaction. - * As the sandbox has an open world of parties, any party name mentioned in a transaction - * will implicitly add that name to the list of known parties. - */ - def addParties(parties: Set[Party]): Self - - /** Note that this method is about disclosing contracts _that have already been - * committed_. Implementors of `ActiveContracts` must take care to also store - * divulgence information already present in `ActiveContract#divulgences` in the `addContract` - * method. - */ - def divulgeAlreadyCommittedContract( - transactionId: TransactionIdString, - global: Relation[AbsoluteContractId, Party]): Self -} - -object ActiveContracts { - - case class ActiveContract( - let: Instant, // time when the contract was committed - transactionId: TransactionIdString, // transaction id where the contract originates - workflowId: Option[WorkflowId], // workflow id from where the contract originates - contract: ContractInst[VersionedValue[AbsoluteContractId]], - witnesses: Set[Party], - divulgences: Map[Party, TransactionIdString], // for each party, the transaction id at which the contract was divulged - key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]], - signatories: Set[Party], - observers: Set[Party], - agreementText: String) - -} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveContracts.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveContracts.scala deleted file mode 100644 index 9539dce3ec..0000000000 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveContracts.scala +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) 2019 The DAML Authors. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.digitalasset.platform.sandbox.stores - -import java.time.Instant - -import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString} -import com.digitalasset.daml.lf.data.Relation.Relation -import com.digitalasset.daml.lf.transaction.Node.GlobalKey -import com.digitalasset.daml.lf.transaction.GenTransaction -import com.digitalasset.daml.lf.value.Value.AbsoluteContractId -import com.digitalasset.ledger.WorkflowId -import com.digitalasset.ledger.api.domain.PartyDetails -import com.digitalasset.platform.sandbox.stores.ActiveContracts._ -import com.digitalasset.platform.sandbox.stores.ledger.SequencingError -import scalaz.syntax.std.map._ - -case class InMemoryActiveContracts( - contracts: Map[AbsoluteContractId, ActiveContract], - keys: Map[GlobalKey, AbsoluteContractId], - parties: Map[Party, PartyDetails]) - extends ActiveContracts[InMemoryActiveContracts] { - - override def lookupContract(cid: AbsoluteContractId) = contracts.get(cid) - - override def keyExists(key: GlobalKey) = keys.contains(key) - - override def addContract(cid: AbsoluteContractId, c: ActiveContract, keyO: Option[GlobalKey]) = - keyO match { - case None => copy(contracts = contracts + (cid -> c)) - case Some(key) => - copy(contracts = contracts + (cid -> c), keys = keys + (key -> cid)) - } - - override def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]) = keyO match { - case None => copy(contracts = contracts - cid) - case Some(key) => copy(contracts = contracts - cid, keys = keys - key) - } - - override def addParties(newParties: Set[Party]): InMemoryActiveContracts = - copy(parties = newParties.map(p => p -> PartyDetails(p, None, true)).toMap ++ parties) - - override def divulgeAlreadyCommittedContract( - transactionId: TransactionIdString, - global: Relation[AbsoluteContractId, Party]): InMemoryActiveContracts = - if (global.nonEmpty) - copy( - contracts = contracts ++ - contracts.intersectWith(global) { (ac, parties) => - ac copy (divulgences = parties.foldLeft(ac.divulgences)((m, e) => - if (m.contains(e)) m else m + (e -> transactionId))) - }) - else this - - private val acManager = - new ActiveContractsManager(this) - - /** adds a transaction to the ActiveContracts, make sure that there are no double spends or - * timing errors. this check is leveraged to achieve higher concurrency, see LedgerState - */ - def addTransaction[Nid]( - let: Instant, - transactionId: TransactionIdString, - workflowId: Option[WorkflowId], - transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId], - explicitDisclosure: Relation[Nid, Party], - localImplicitDisclosure: Relation[Nid, Party], - globalImplicitDisclosure: Relation[AbsoluteContractId, Party] - ): Either[Set[SequencingError], InMemoryActiveContracts] = - acManager.addTransaction( - let, - transactionId, - workflowId, - transaction, - explicitDisclosure, - localImplicitDisclosure, - globalImplicitDisclosure) - - /** - * Adds a new party to the list of known parties. - */ - def addParty(details: PartyDetails): InMemoryActiveContracts = { - assert(!parties.contains(details.party)) - copy(parties = parties + (details.party -> details)) - } -} - -object InMemoryActiveContracts { - def empty: InMemoryActiveContracts = InMemoryActiveContracts(Map(), Map(), Map.empty) -} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveLedgerState.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveLedgerState.scala new file mode 100644 index 0000000000..cece66b93e --- /dev/null +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/InMemoryActiveLedgerState.scala @@ -0,0 +1,155 @@ +// Copyright (c) 2019 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.platform.sandbox.stores + +import java.time.Instant + +import com.daml.ledger.participant.state.v1.AbsoluteContractInst +import com.digitalasset.daml.lf.data.Ref.{Party, TransactionIdString} +import com.digitalasset.daml.lf.data.Relation.Relation +import com.digitalasset.daml.lf.transaction.Node.GlobalKey +import com.digitalasset.daml.lf.transaction.GenTransaction +import com.digitalasset.daml.lf.value.Value +import com.digitalasset.daml.lf.value.Value.AbsoluteContractId +import com.digitalasset.ledger.WorkflowId +import com.digitalasset.ledger.api.domain.PartyDetails +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState._ +import com.digitalasset.platform.sandbox.stores.ledger.SequencingError +import scalaz.syntax.std.map._ + +case class InMemoryActiveLedgerState( + activeContracts: Map[AbsoluteContractId, ActiveContract], + divulgedContracts: Map[AbsoluteContractId, DivulgedContract], + keys: Map[GlobalKey, AbsoluteContractId], + parties: Map[Party, PartyDetails]) + extends ActiveLedgerState[InMemoryActiveLedgerState] { + + override def lookupContract(cid: AbsoluteContractId): Option[Contract] = + activeContracts.get(cid).orElse[Contract](divulgedContracts.get(cid)) + + override def keyExists(key: GlobalKey) = keys.contains(key) + + /** + * Updates divulgence information on the given active contract with information + * from the already existing divulged contract. + */ + private def copyDivulgences(ac: ActiveContract, dc: DivulgedContract): ActiveContract = + ac.copy(divulgences = ac.divulgences.unionWith(dc.divulgences)((l, _) => l)) + + override def addContract( + c: ActiveContract, + keyO: Option[GlobalKey]): InMemoryActiveLedgerState = { + val newKeys = keyO match { + case None => keys + case Some(key) => keys + (key -> c.id) + } + divulgedContracts.get(c.id) match { + case None => + copy( + activeContracts = activeContracts + (c.id -> c), + keys = newKeys + ) + case Some(dc) => + copy( + activeContracts = activeContracts + (c.id -> copyDivulgences(c, dc)), + divulgedContracts = divulgedContracts - c.id, + keys = newKeys + ) + } + } + + override def removeContract(cid: AbsoluteContractId, keyO: Option[GlobalKey]) = { + val newKeys = keyO match { + case None => keys + case Some(key) => keys - key + } + copy( + activeContracts = activeContracts - cid, + divulgedContracts = divulgedContracts - cid, + keys = newKeys + ) + } + + override def addParties(newParties: Set[Party]): InMemoryActiveLedgerState = + copy(parties = newParties.map(p => p -> PartyDetails(p, None, true)).toMap ++ parties) + + override def divulgeAlreadyCommittedContracts( + transactionId: TransactionIdString, + global: Relation[AbsoluteContractId, Party], + referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) + : InMemoryActiveLedgerState = + if (global.nonEmpty) { + val referencedContractsM = referencedContracts.toMap + // Note: each entry in `global` can refer to either: + // - a known active contract, in which case its divulgence info is updated + // - a previously divulged contract, in which case its divulgence info is updated + // - an unknown contract, in which case a new divulged contract is created from the corresponding info in `referencedContracts` + val updatedAcs = activeContracts.intersectWith(global) { (ac, parties) => + ac copy (divulgences = ac.divulgeTo(parties, transactionId)) + } + val updatedDcs = divulgedContracts.intersectWith(global) { (dc, parties) => + dc copy (divulgences = dc.divulgeTo(parties, transactionId)) + } + val newDcs = global.foldLeft(Map.empty[AbsoluteContractId, DivulgedContract]) { + case (m, (cid, divulgeTo)) => + if (divulgeTo.isEmpty || updatedAcs.contains(cid) || updatedDcs.contains(cid)) + m + else + m + (cid -> DivulgedContract( + id = cid, + contract = referencedContractsM + .getOrElse( + cid, + sys.error( + s"Transaction $transactionId says it divulges contract ${cid.coid} to parties ${divulgeTo + .mkString(",")}, but that contract does not exist.") + ), + divulgences = Map.empty ++ divulgeTo.map(p => p -> transactionId) + )) + } + copy( + activeContracts = activeContracts ++ updatedAcs, + divulgedContracts = divulgedContracts ++ updatedDcs ++ newDcs + ) + } else this + + private val acManager = + new ActiveLedgerStateManager(this) + + /** adds a transaction to the ActiveContracts, make sure that there are no double spends or + * timing errors. this check is leveraged to achieve higher concurrency, see LedgerState + */ + def addTransaction[Nid]( + let: Instant, + transactionId: TransactionIdString, + workflowId: Option[WorkflowId], + transaction: GenTransaction.WithTxValue[Nid, AbsoluteContractId], + explicitDisclosure: Relation[Nid, Party], + localImplicitDisclosure: Relation[Nid, Party], + globalImplicitDisclosure: Relation[AbsoluteContractId, Party], + referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)] + ): Either[Set[SequencingError], InMemoryActiveLedgerState] = + acManager.addTransaction( + let, + transactionId, + workflowId, + transaction, + explicitDisclosure, + localImplicitDisclosure, + globalImplicitDisclosure, + referencedContracts) + + /** + * Adds a new party to the list of known parties. + */ + def addParty(details: PartyDetails): InMemoryActiveLedgerState = { + assert(!parties.contains(details.party)) + copy(parties = parties + (details.party -> details)) + } +} + +object InMemoryActiveLedgerState { + def empty: InMemoryActiveLedgerState = + InMemoryActiveLedgerState(Map.empty, Map.empty, Map.empty, Map.empty) +} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala index 1dcdaa1357..e2ea4457a3 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala @@ -66,7 +66,7 @@ object SandboxIndexAndWriteService { jdbcUrl: String, timeModel: TimeModel, timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, ledgerEntries: ImmArray[LedgerEntryOrBump], startMode: SqlStartMode, queueDepth: Int, @@ -92,7 +92,7 @@ object SandboxIndexAndWriteService { participantId: ParticipantId, timeModel: TimeModel, timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, ledgerEntries: ImmArray[LedgerEntryOrBump], templateStore: InMemoryPackageStore)( implicit mat: Materializer, @@ -171,21 +171,20 @@ abstract class LedgerBackedIndexService( ActiveContractSetSnapshot( LedgerOffset.Absolute(LedgerString.fromLong(offset)), acsStream - .mapConcat { - case (cId, ac) => - val create = toUpdateEvent(cId, ac) - EventFilter - .byTemplates(filter) - .filterActiveContractWitnesses(create) - .map(create => ac.workflowId.map(domain.WorkflowId(_)) -> create) - .toList + .mapConcat { ac => + val create = toUpdateEvent(ac.id, ac) + EventFilter + .byTemplates(filter) + .filterActiveContractWitnesses(create) + .map(create => ac.workflowId.map(domain.WorkflowId(_)) -> create) + .toList } ) }(mat.executionContext) private def toUpdateEvent( cId: Value.AbsoluteContractId, - ac: ActiveContracts.ActiveContract): AcsUpdateEvent.Create = + ac: ActiveLedgerState.ActiveContract): AcsUpdateEvent.Create = AcsUpdateEvent.Create( // we use absolute contract ids as event ids throughout the sandbox domain.TransactionId(ac.transactionId), diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala index 186de4f84e..5baff8ffe3 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/Ledger.scala @@ -20,8 +20,8 @@ import com.digitalasset.daml.lf.value.Value.AbsoluteContractId import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails} import com.digitalasset.platform.sandbox.metrics.MetricsManager -import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract -import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore} +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.Contract +import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore} import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger import com.digitalasset.platform.sandbox.stores.ledger.sql.{ @@ -64,7 +64,7 @@ trait ReadOnlyLedger extends AutoCloseable { def snapshot(): Future[LedgerSnapshot] - def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]] + def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[Contract]] def lookupKey(key: GlobalKey): Future[Option[AbsoluteContractId]] @@ -85,7 +85,7 @@ trait ReadOnlyLedger extends AutoCloseable { object Ledger { - type LedgerFactory = (InMemoryActiveContracts, Seq[LedgerEntry]) => Ledger + type LedgerFactory = (InMemoryActiveLedgerState, Seq[LedgerEntry]) => Ledger /** * Creates an in-memory ledger @@ -99,7 +99,7 @@ object Ledger { def inMemory( ledgerId: LedgerId, timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, packages: InMemoryPackageStore, ledgerEntries: ImmArray[LedgerEntryOrBump]): Ledger = new InMemoryLedger(ledgerId, timeProvider, acs, packages, ledgerEntries) @@ -120,7 +120,7 @@ object Ledger { jdbcUrl: String, ledgerId: LedgerId, timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, packages: InMemoryPackageStore, ledgerEntries: ImmArray[LedgerEntryOrBump], queueDepth: Int, diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/LedgerSnapshot.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/LedgerSnapshot.scala index 7aea7c0c3e..1435928c3b 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/LedgerSnapshot.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/LedgerSnapshot.scala @@ -5,7 +5,6 @@ package com.digitalasset.platform.sandbox.stores.ledger import akka.NotUsed import akka.stream.scaladsl.Source -import com.digitalasset.daml.lf.value.Value.AbsoluteContractId -import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract -case class LedgerSnapshot(offset: Long, acs: Source[(AbsoluteContractId, ActiveContract), NotUsed]) +case class LedgerSnapshot(offset: Long, acs: Source[ActiveContract, NotUsed]) diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/MeteredLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/MeteredLedger.scala index dc468a6f18..71a2b72563 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/MeteredLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/MeteredLedger.scala @@ -17,7 +17,7 @@ import com.digitalasset.daml.lf.value.Value.AbsoluteContractId import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails} import com.digitalasset.platform.sandbox.metrics.MetricsManager -import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.Contract import scala.concurrent.Future @@ -33,8 +33,7 @@ private class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, mm: MetricsManager) override def snapshot(): Future[LedgerSnapshot] = ledger.snapshot() - override def lookupContract( - contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]] = + override def lookupContract(contractId: Value.AbsoluteContractId): Future[Option[Contract]] = mm.timedFuture("Ledger:lookupContract", ledger.lookupContract(contractId)) override def lookupKey(key: GlobalKey): Future[Option[AbsoluteContractId]] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/SandboxContractStore.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/SandboxContractStore.scala index 231f9f3330..09fc2d83fe 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/SandboxContractStore.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/SandboxContractStore.scala @@ -9,18 +9,22 @@ import com.digitalasset.daml.lf.transaction.Node import com.digitalasset.daml.lf.transaction.Transaction.{Value => TxValue} import com.digitalasset.daml.lf.value.Value import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC} -import com.digitalasset.platform.sandbox.stores.ActiveContracts +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState import scala.concurrent.{ExecutionContext, Future} class SandboxContractStore(ledger: ReadOnlyLedger) extends ContractStore { - private[this] def canSeeContract( - submitter: Party, - ac: ActiveContracts.ActiveContract): Boolean = { - // ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 - // and https://github.com/digital-asset/daml/issues/751 . - Party fromString submitter exists (p => ac.witnesses(p) || ac.divulgences.contains(p)) - } + private[this] def canSeeContract(submitter: Party, c: ActiveLedgerState.Contract): Boolean = + c match { + case ac: ActiveLedgerState.ActiveContract => + // ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 + // and https://github.com/digital-asset/daml/issues/751 . + Party fromString submitter exists (p => ac.witnesses(p) || ac.divulgences.contains(p)) + case dc: ActiveLedgerState.DivulgedContract => + // ^ only parties disclosed or divulged to can lookup; see https://github.com/digital-asset/daml/issues/10 + // and https://github.com/digital-asset/daml/issues/751 . + Party fromString submitter exists (p => dc.divulgences.contains(p)) + } override def lookupActiveContract(submitter: Party, contractId: Value.AbsoluteContractId) : Future[Option[Value.ContractInst[TxValue[Value.AbsoluteContractId]]]] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/ScenarioLoader.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/ScenarioLoader.scala index ccd6f8e756..b77963e740 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/ScenarioLoader.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/ScenarioLoader.scala @@ -12,7 +12,7 @@ import com.digitalasset.daml.lf.language.Ast.{DDataType, DValue, Definition} import com.digitalasset.daml.lf.speedy.{ScenarioRunner, Speedy} import com.digitalasset.daml.lf.types.{Ledger => L} import com.digitalasset.daml.lf.value.Value.AbsoluteContractId -import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore} +import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore} import org.slf4j.LoggerFactory import com.digitalasset.daml.lf.transaction.GenTransaction import com.digitalasset.daml.lf.types.Ledger.ScenarioTransactionId @@ -60,16 +60,16 @@ object ScenarioLoader { def fromScenario( packages: InMemoryPackageStore, compiledPackages: CompiledPackages, - scenario: String): (InMemoryActiveContracts, ImmArray[LedgerEntryOrBump], Instant) = { + scenario: String): (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Instant) = { val (scenarioLedger, scenarioRef) = buildScenarioLedger(packages, compiledPackages, scenario) // we store the tx id since later we need to recover how much to bump the // ledger end by, and here the transaction id _is_ the ledger end. val ledgerEntries = new ArrayBuffer[(ScenarioTransactionId, LedgerEntry)](scenarioLedger.scenarioSteps.size) - type Acc = (InMemoryActiveContracts, Time.Timestamp, Option[ScenarioTransactionId]) + type Acc = (InMemoryActiveLedgerState, Time.Timestamp, Option[ScenarioTransactionId]) val (acs, time, txId) = scenarioLedger.scenarioSteps.iterator - .foldLeft[Acc]((InMemoryActiveContracts.empty, Time.Timestamp.Epoch, None)) { + .foldLeft[Acc]((InMemoryActiveLedgerState.empty, Time.Timestamp.Epoch, None)) { case ((acs, time, mbOldTxId), (stepId @ _, step)) => executeScenarioStep(ledgerEntries, scenarioRef, acs, time, mbOldTxId, stepId, step) } @@ -215,12 +215,12 @@ object ScenarioLoader { private def executeScenarioStep( ledger: ArrayBuffer[(ScenarioTransactionId, LedgerEntry)], scenarioRef: Ref.DefinitionRef, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, time: Time.Timestamp, mbOldTxId: Option[ScenarioTransactionId], stepId: Int, step: L.ScenarioStep - ): (InMemoryActiveContracts, Time.Timestamp, Option[ScenarioTransactionId]) = { + ): (InMemoryActiveLedgerState, Time.Timestamp, Option[ScenarioTransactionId]) = { step match { case L.Commit(txId: ScenarioTransactionId, richTransaction: L.RichTransaction, _) => mbOldTxId match { @@ -252,7 +252,8 @@ object ScenarioLoader { tx, explicitDisclosure, implicitDisclosure, - globalizedImplicitDisclosure) match { + globalizedImplicitDisclosure, + List.empty) match { case Right(newAcs) => val recordTx = tx.mapNodeId(nodeIdWithHash) val recordDisclosure = explicitDisclosure.map { diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index 2f1077a2d1..c2cad21523 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -33,13 +33,14 @@ import com.digitalasset.ledger.api.domain.{ RejectionReason } import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract import com.digitalasset.platform.sandbox.stores.deduplicator.Deduplicator import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.{Checkpoint, Rejection} import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry, LedgerSnapshot} import com.digitalasset.platform.sandbox.stores.{ - ActiveContracts, - InMemoryActiveContracts, + ActiveLedgerState, + InMemoryActiveLedgerState, InMemoryPackageStore } import org.slf4j.LoggerFactory @@ -53,7 +54,7 @@ import scala.util.{Failure, Success, Try} class InMemoryLedger( val ledgerId: LedgerId, timeProvider: TimeProvider, - acs0: InMemoryActiveContracts, + acs0: InMemoryActiveLedgerState, packageStoreInit: InMemoryPackageStore, ledgerEntries: ImmArray[LedgerEntryOrBump]) extends Ledger { @@ -87,13 +88,15 @@ class InMemoryLedger( // need to take the lock to make sure the two pieces of data are consistent. override def snapshot(): Future[LedgerSnapshot] = Future.successful(this.synchronized { - LedgerSnapshot(entries.ledgerEnd, Source(acs.contracts)) + LedgerSnapshot( + entries.ledgerEnd, + Source.fromIterator[ActiveContract](() => acs.activeContracts.valuesIterator)) }) override def lookupContract( - contractId: AbsoluteContractId): Future[Option[ActiveContracts.ActiveContract]] = + contractId: AbsoluteContractId): Future[Option[ActiveLedgerState.Contract]] = Future.successful(this.synchronized { - acs.contracts.get(contractId) + acs.activeContracts.get(contractId) }) override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] = @@ -162,6 +165,7 @@ class InMemoryLedger( blindingInfo.explicitDisclosure, blindingInfo.localImplicitDisclosure, blindingInfo.globalImplicitDisclosure, + List.empty ) acsRes match { case Left(err) => diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/BaseLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/BaseLedger.scala index 3ace74bf6d..5d793073d5 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/BaseLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/BaseLedger.scala @@ -18,7 +18,7 @@ import com.digitalasset.ledger.api.domain.LedgerId import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource import com.digitalasset.platform.common.util.DirectExecutionContext -import com.digitalasset.platform.sandbox.stores.ActiveContracts +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.LedgerReadDao import com.digitalasset.platform.sandbox.stores.ledger.{LedgerEntry, LedgerSnapshot, ReadOnlyLedger} @@ -53,16 +53,14 @@ class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao: // 3. A GetActiveContractsRequest comes in and we look at the latest ledger_end offset in the database. We will see 6 (from transaction B). // 4. If we finish streaming the active contracts up to offset 6 before transaction A is properly inserted into the DB, the client will not see the contracts from transaction A // The fix to that is to use the latest known headRef, which is updated AFTER a batch has been inserted completely. - //TODO (robert): SQL DAO does not know about ActiveContract, this method does a (trivial) mapping from DAO Contract to Ledger ActiveContract. Intended? The DAO layer was introduced its own Contract abstraction so it can also reason read archived ones if it's needed. In hindsight, this might be necessary at all so we could probably collapse the two ledgerDao .getActiveContractSnapshot(ledgerEnd) - .map(s => LedgerSnapshot(s.offset, s.acs.map(c => (c.contractId, c.toActiveContract))))(DEC) + .map(s => LedgerSnapshot(s.offset, s.acs))(DEC) override def lookupContract( - contractId: AbsoluteContractId): Future[Option[ActiveContracts.ActiveContract]] = + contractId: AbsoluteContractId): Future[Option[ActiveLedgerState.Contract]] = ledgerDao - .lookupActiveContract(contractId) - .map(_.map(c => c.toActiveContract))(DEC) + .lookupActiveOrDivulgedContract(contractId) override def lookupTransaction( transactionId: TransactionIdString): Future[Option[(Long, LedgerEntry.Transaction)]] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala index f74d92f183..d8600856a6 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -38,7 +38,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{ } import com.digitalasset.platform.sandbox.stores.ledger.sql.util.DbDispatcher import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry} -import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore} +import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore} import org.slf4j.LoggerFactory import scalaz.syntax.tag._ @@ -69,7 +69,7 @@ object SqlLedger { jdbcUrl: String, ledgerId: Option[LedgerId], timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, packages: InMemoryPackageStore, initialLedgerEntries: ImmArray[LedgerEntryOrBump], queueDepth: Int, @@ -265,7 +265,8 @@ private class SqlLedger( mappedDisclosure ), mappedLocalImplicitDisclosure, - blindingInfo.globalImplicitDisclosure + blindingInfo.globalImplicitDisclosure, + List.empty ) } } @@ -343,7 +344,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) { initialLedgerId: Option[LedgerId], timeProvider: TimeProvider, startMode: SqlStartMode, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, packages: InMemoryPackageStore, initialLedgerEntries: ImmArray[LedgerEntryOrBump], queueDepth: Int, @@ -382,7 +383,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) { private def initialize( initialLedgerId: Option[LedgerId], timeProvider: TimeProvider, - acs: InMemoryActiveContracts, + acs: InMemoryActiveLedgerState, packages: InMemoryPackageStore, initialLedgerEntries: ImmArray[LedgerEntryOrBump]): Future[LedgerId] = { // Note that here we only store the ledger entry and we do not update anything else, such as the @@ -415,9 +416,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) { s"Initializing ledger with ${initialLedgerEntries.length} ledger entries") } - val contracts = acs.contracts - .map(f => Contract.fromActiveContract(f._1, f._2)) - .toList + val contracts = acs.activeContracts.values.toList val initialLedgerEnd = 0L val entriesWithOffset = initialLedgerEntries.foldLeft( diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/JdbcLedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/JdbcLedgerDao.scala index b370ba126d..25b2d23fb1 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/JdbcLedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/JdbcLedgerDao.scala @@ -13,18 +13,24 @@ import anorm.SqlParser._ import anorm.ToStatement.optionToStatement import anorm.{AkkaStream, BatchSql, Macro, NamedParameter, RowParser, SQL, SqlParser} import com.daml.ledger.participant.state.index.v2.PackageDetails -import com.daml.ledger.participant.state.v1.TransactionId +import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId} import com.digitalasset.daml.lf.archive.Decode import com.digitalasset.daml.lf.data.Ref._ import com.digitalasset.daml.lf.data.Relation.Relation import com.digitalasset.daml.lf.transaction.Node import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers, NodeCreate} +import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml.lf.value.Value.AbsoluteContractId import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger._ import com.digitalasset.ledger.api.domain.RejectionReason._ import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, RejectionReason} import com.digitalasset.platform.common.util.DirectExecutionContext +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ + ActiveContract, + Contract, + DivulgedContract +} import com.digitalasset.platform.sandbox.stores._ import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry._ @@ -172,7 +178,7 @@ private class JdbcLedgerDao( override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] = dbDispatcher.executeSql("lookup contract by key")(implicit conn => selectContractKey(key)) - private def storeContract(offset: Long, contract: Contract)( + private def storeContract(offset: Long, contract: ActiveContract)( implicit connection: Connection): Unit = storeContracts(offset, List(contract)) private def archiveContract(offset: Long, cid: AbsoluteContractId)( @@ -185,8 +191,11 @@ private class JdbcLedgerDao( .execute() private val SQL_INSERT_CONTRACT = - """insert into contracts(id, transaction_id, workflow_id, package_id, name, create_offset, contract, key) - |values({id}, {transaction_id}, {workflow_id}, {package_id}, {name}, {create_offset}, {contract}, {key})""".stripMargin + """insert into contracts(id, transaction_id, workflow_id, package_id, name, create_offset, key) + |values({id}, {transaction_id}, {workflow_id}, {package_id}, {name}, {create_offset}, {key})""".stripMargin + + private val SQL_INSERT_CONTRACT_DATA = + "insert into contract_data(id, contract) values({id}, {contract})" private val SQL_INSERT_CONTRACT_WITNESS = "insert into contract_witnesses(contract_id, witness) values({contract_id}, {witness})" @@ -194,7 +203,7 @@ private class JdbcLedgerDao( private val SQL_INSERT_CONTRACT_KEY_MAINTAINERS = "insert into contract_key_maintainers(contract_id, maintainer) values({contract_id}, {maintainer})" - private def storeContracts(offset: Long, contracts: immutable.Seq[Contract])( + private def storeContracts(offset: Long, contracts: immutable.Seq[ActiveContract])( implicit connection: Connection): Unit = { // An ACS contract contains several collections (e.g., witnesses or divulgences). @@ -206,22 +215,19 @@ private class JdbcLedgerDao( .map( c => Seq[NamedParameter]( - "id" -> c.contractId.coid, + "id" -> c.id.coid, "transaction_id" -> c.transactionId, "workflow_id" -> c.workflowId, - "package_id" -> c.coinst.template.packageId, - "name" -> c.coinst.template.qualifiedName.toString, + "package_id" -> c.contract.template.packageId, + "name" -> c.contract.template.qualifiedName.toString, "create_offset" -> offset, - "contract" -> contractSerializer - .serializeContractInstance(c.coinst) - .getOrElse(sys.error(s"failed to serialize contract! cid:${c.contractId.coid}")), "key" -> c.key .map( k => valueSerializer .serializeValue(k.key) - .getOrElse(sys.error( - s"failed to serialize contract key value! cid:${c.contractId.coid}"))) + .getOrElse( + sys.error(s"failed to serialize contract key value! cid:${c.id.coid}"))) ) ) @@ -230,6 +236,22 @@ private class JdbcLedgerDao( namedContractParams ) + val namedContractDataParams = contracts + .map( + c => + Seq[NamedParameter]( + "id" -> c.id.coid, + "contract" -> contractSerializer + .serializeContractInstance(c.contract) + .getOrElse(sys.error(s"failed to serialize contract! cid:${c.id.coid}")) + ) + ) + + executeBatchSql( + SQL_INSERT_CONTRACT_DATA, + namedContractDataParams + ) + // Part 2: insert witnesses into the 'contract_witnesses' table val namedWitnessesParams = contracts .flatMap( @@ -237,7 +259,7 @@ private class JdbcLedgerDao( c.witnesses.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "witness" -> w )) ) @@ -264,7 +286,7 @@ private class JdbcLedgerDao( c.divulgences.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "party" -> w._1, "transaction_id" -> w._2 )) @@ -284,7 +306,7 @@ private class JdbcLedgerDao( c.divulgences.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "party" -> w._1, "ledger_offset" -> offset, "transaction_id" -> c.transactionId @@ -310,7 +332,7 @@ private class JdbcLedgerDao( k.maintainers.map( p => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "maintainer" -> p ))) .getOrElse(Set.empty) @@ -356,7 +378,8 @@ private class JdbcLedgerDao( offset: Long, tx: Transaction, localImplicitDisclosure: Relation[EventId, Party], - globalImplicitDisclosure: Relation[AbsoluteContractId, Party])( + globalImplicitDisclosure: Relation[AbsoluteContractId, Party], + divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)])( implicit connection: Connection): Option[RejectionReason] = tx match { case Transaction( _, @@ -368,19 +391,16 @@ private class JdbcLedgerDao( _, transaction, disclosure) => - final class AcsStoreAcc extends ActiveContracts[AcsStoreAcc] { + final class AcsStoreAcc extends ActiveLedgerState[AcsStoreAcc] { override def lookupContract(cid: AbsoluteContractId) = - lookupActiveContractSync(cid).map(_.toActiveContract) + lookupContractSync(cid) override def keyExists(key: GlobalKey): Boolean = selectContractKey(key).isDefined - override def addContract( - cid: AbsoluteContractId, - c: ActiveContracts.ActiveContract, - keyO: Option[GlobalKey]) = { - storeContract(offset, Contract.fromActiveContract(cid, c)) - keyO.foreach(key => storeContractKey(key, cid)) + override def addContract(c: ActiveContract, keyO: Option[GlobalKey]): AcsStoreAcc = { + storeContract(offset, c) + keyO.foreach(key => storeContractKey(key, c.id)) this } @@ -404,9 +424,10 @@ private class JdbcLedgerDao( this } - override def divulgeAlreadyCommittedContract( + override def divulgeAlreadyCommittedContracts( transactionId: TransactionIdString, - global: Relation[AbsoluteContractId, Party]) = { + global: Relation[AbsoluteContractId, Party], + divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) = { val divulgenceParams = global .flatMap { case (cid, parties) => @@ -430,7 +451,7 @@ private class JdbcLedgerDao( } // this should be a class member field, we can't move it out yet as the functions above are closing over to the implicit Connection - val acsManager = new ActiveContractsManager(new AcsStoreAcc) + val acsManager = new ActiveLedgerStateManager(new AcsStoreAcc) // Note: ACS is typed as Unit here, as the ACS is given implicitly by the current database state // within the current SQL transaction. All of the given functions perform side effects to update the database. @@ -441,7 +462,8 @@ private class JdbcLedgerDao( transaction, disclosure, localImplicitDisclosure, - globalImplicitDisclosure + globalImplicitDisclosure, + divulgedContracts ) atr match { @@ -531,11 +553,20 @@ private class JdbcLedgerDao( def insertEntry(le: PersistenceEntry)(implicit conn: Connection): PersistenceResponse = le match { - case PersistenceEntry.Transaction(tx, localImplicitDisclosure, globalImplicitDisclosure) => + case PersistenceEntry.Transaction( + tx, + localImplicitDisclosure, + globalImplicitDisclosure, + divulgedContracts) => Try { storeTransaction(offset, tx) - updateActiveContractSet(offset, tx, localImplicitDisclosure, globalImplicitDisclosure) + updateActiveContractSet( + offset, + tx, + localImplicitDisclosure, + globalImplicitDisclosure, + divulgedContracts) .flatMap { rejectionReason => // we need to rollback the existing sql transaction conn.rollback() @@ -586,7 +617,7 @@ private class JdbcLedgerDao( } override def storeInitialState( - activeContracts: immutable.Seq[Contract], + activeContracts: immutable.Seq[ActiveContract], ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)], newLedgerEnd: LedgerOffset ): Future[Unit] = { @@ -779,16 +810,21 @@ private class JdbcLedgerDao( } private val ContractDataParser = (ledgerString("id") - ~ ledgerString("transaction_id") + ~ ledgerString("transaction_id").? ~ ledgerString("workflow_id").? - ~ date("effective_at") + ~ date("effective_at").? ~ binaryStream("contract") ~ binaryStream("key").? - ~ binaryStream("transaction") map (flatten)) + ~ binaryStream("transaction").? map (flatten)) private val SQL_SELECT_CONTRACT = SQL( - "select c.*, le.effective_at, le.transaction from contracts c inner join ledger_entries le on c.transaction_id = le.transaction_id where id={contract_id} and archive_offset is null ") + """ + |select cd.id, cd.contract, c.transaction_id, c.workflow_id, c.key, le.effective_at, le.transaction + |from contract_data cd + |left join contracts c on cd.id=c.id + |left join ledger_entries le on c.transaction_id = le.transaction_id + |where cd.id={contract_id} and c.archive_offset is null""".stripMargin) private val SQL_SELECT_WITNESS = SQL("select witness from contract_witnesses where contract_id={contract_id}") @@ -804,32 +840,55 @@ private class JdbcLedgerDao( private val SQL_SELECT_KEY_MAINTAINERS = SQL("select maintainer from contract_key_maintainers where contract_id={contract_id}") - private def lookupActiveContractSync(contractId: AbsoluteContractId)( + private def lookupContractSync(contractId: AbsoluteContractId)( implicit conn: Connection): Option[Contract] = SQL_SELECT_CONTRACT .on("contract_id" -> contractId.coid) .as(ContractDataParser.singleOpt) .map(mapContractDetails) - override def lookupActiveContract(contractId: AbsoluteContractId): Future[Option[Contract]] = + override def lookupActiveOrDivulgedContract( + contractId: AbsoluteContractId): Future[Option[Contract]] = dbDispatcher.executeSql(s"lookup active contract [${contractId.coid}]") { implicit conn => - lookupActiveContractSync(contractId) + lookupContractSync(contractId) } private def mapContractDetails( contractResult: ( ContractIdString, - TransactionIdString, + Option[TransactionIdString], Option[WorkflowId], - Date, + Option[Date], InputStream, Option[InputStream], - InputStream))(implicit conn: Connection) = + Option[InputStream]))(implicit conn: Connection): Contract = contractResult match { - case (coid, transactionId, workflowId, ledgerEffectiveTime, contractStream, keyStreamO, tx) => + case (coid, None, None, None, contractStream, None, None) => + val divulgences = lookupDivulgences(coid) + val absoluteCoid = AbsoluteContractId(coid) + + DivulgedContract( + absoluteCoid, + contractSerializer + .deserializeContractInstance(ByteStreams.toByteArray(contractStream)) + .getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")), + divulgences + ) + + case ( + coid, + Some(transactionId), + workflowId, + Some(ledgerEffectiveTime), + contractStream, + keyStreamO, + Some(tx)) => val witnesses = lookupWitnesses(coid) val divulgences = lookupDivulgences(coid) val absoluteCoid = AbsoluteContractId(coid) + val contractInstance = contractSerializer + .deserializeContractInstance(ByteStreams.toByteArray(contractStream)) + .getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")) val (signatories, observers) = transactionSerializer @@ -842,16 +901,14 @@ private class JdbcLedgerDao( (signatories, stakeholders diff signatories) } getOrElse sys.error(s"no create node in contract creating transaction! cid:$coid") - Contract( + ActiveContract( absoluteCoid, ledgerEffectiveTime.toInstant, transactionId, workflowId, + contractInstance, witnesses, divulgences, - contractSerializer - .deserializeContractInstance(ByteStreams.toByteArray(contractStream)) - .getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")), keyStreamO.map(keyStream => { val keyMaintainers = lookupKeyMaintainers(coid) val keyValue = valueSerializer @@ -860,8 +917,13 @@ private class JdbcLedgerDao( KeyWithMaintainers(keyValue, keyMaintainers) }), signatories, - observers + observers, + contractInstance.agreementText ) + + case (_, _, _, _, _, _, _) => + sys.error( + "mapContractDetails called with partial data, can not map to either active or divulged contract") } private def lookupWitnesses(coid: String)(implicit conn: Connection): Set[Party] = @@ -932,7 +994,12 @@ private class JdbcLedgerDao( private val SQL_SELECT_ACTIVE_CONTRACTS = SQL( - "select c.*, le.effective_at, le.transaction from contracts c inner join ledger_entries le on c.transaction_id = le.transaction_id where create_offset <= {offset} and (archive_offset is null or archive_offset > {offset})") + """ + |select cd.id, cd.contract, c.transaction_id, c.workflow_id, c.key, le.effective_at, le.transaction + |from contracts c + |inner join contract_data cd on c.id = cd.id + |inner join ledger_entries le on c.transaction_id = le.transaction_id + |where create_offset <= {offset} and (archive_offset is null or archive_offset > {offset})""".stripMargin) override def getActiveContractSnapshot(untilExclusive: LedgerOffset)( implicit mat: Materializer): Future[LedgerSnapshot] = { @@ -945,7 +1012,11 @@ private class JdbcLedgerDao( // it's ok to not have query isolation as witnesses cannot change once we saved them dbDispatcher .executeSql(s"load contract details ${contractResult._1}") { implicit conn => - mapContractDetails(contractResult) + mapContractDetails(contractResult) match { + case ac: ActiveContract => ac + case _: DivulgedContract => + sys.error("Impossible: SQL_SELECT_ACTIVE_CONTRACTS returned a divulged contract") + } } } }.mapMaterializedValue(_.map(_ => Done)(DirectExecutionContext)) @@ -1103,6 +1174,7 @@ private class JdbcLedgerDao( |truncate ledger_entries cascade; |truncate disclosures cascade; |truncate contracts cascade; + |truncate contract_data cascade; |truncate contract_witnesses cascade; |truncate contract_key_maintainers cascade; |truncate parameters cascade; diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala index 59a3a56dd4..42416379f9 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/LedgerDao.scala @@ -3,70 +3,28 @@ package com.digitalasset.platform.sandbox.stores.ledger.sql.dao -import java.time.Instant - import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.ledger.participant.state.index.v2.PackageDetails -import com.daml.ledger.participant.state.v1.TransactionId +import com.daml.ledger.participant.state.v1.{AbsoluteContractInst, TransactionId} import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party} import com.digitalasset.daml.lf.data.Relation.Relation import com.digitalasset.daml.lf.transaction.Node -import com.digitalasset.daml.lf.transaction.Node.KeyWithMaintainers -import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue} +import com.digitalasset.daml.lf.value.Value +import com.digitalasset.daml.lf.value.Value.AbsoluteContractId import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger._ import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails} import com.digitalasset.platform.common.util.DirectExecutionContext import com.digitalasset.platform.sandbox.metrics.MetricsManager -import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract} import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.Transaction import scala.collection.immutable import scala.concurrent.Future -final case class Contract( - contractId: AbsoluteContractId, - let: Instant, - transactionId: TransactionId, - workflowId: Option[WorkflowId], - witnesses: Set[Party], - divulgences: Map[Party, TransactionId], - coinst: ContractInst[VersionedValue[AbsoluteContractId]], - key: Option[KeyWithMaintainers[VersionedValue[AbsoluteContractId]]], - signatories: Set[Party], - observers: Set[Party]) { - def toActiveContract: ActiveContract = - ActiveContract( - let, - transactionId, - workflowId, - coinst, - witnesses, - divulgences, - key, - signatories, - observers, - coinst.agreementText) -} - -object Contract { - def fromActiveContract(cid: AbsoluteContractId, ac: ActiveContract): Contract = - Contract( - cid, - ac.let, - ac.transactionId, - ac.workflowId, - ac.witnesses, - ac.divulgences, - ac.contract, - ac.key, - ac.signatories, - ac.observers) -} - /** * Every time the ledger persists a transactions, the active contract set (ACS) is updated. * Updating the ACS requires knowledge of blinding info, which is not included in LedgerEntry.Transaction. @@ -79,7 +37,8 @@ object PersistenceEntry { final case class Transaction( entry: LedgerEntry.Transaction, localImplicitDisclosure: Relation[EventId, Party], - globalImplicitDisclosure: Relation[AbsoluteContractId, Party] + globalImplicitDisclosure: Relation[AbsoluteContractId, Party], + divulgedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)] ) extends PersistenceEntry final case class Checkpoint(entry: LedgerEntry.Checkpoint) extends PersistenceEntry } @@ -94,7 +53,7 @@ object PersistenceResponse { } -case class LedgerSnapshot(offset: Long, acs: Source[Contract, NotUsed]) +case class LedgerSnapshot(offset: Long, acs: Source[ActiveContract, NotUsed]) trait LedgerReadDao extends AutoCloseable { @@ -111,8 +70,8 @@ trait LedgerReadDao extends AutoCloseable { /** Looks up the current external ledger end offset*/ def lookupExternalLedgerEnd(): Future[Option[LedgerString]] - /** Looks up an active contract. Archived contracts must not be returned by this method */ - def lookupActiveContract(contractId: AbsoluteContractId): Future[Option[Contract]] + /** Looks up an active or divulged contract. Archived contracts must not be returned by this method */ + def lookupActiveOrDivulgedContract(contractId: AbsoluteContractId): Future[Option[Contract]] /** * Looks up a LedgerEntry at a given offset @@ -217,7 +176,7 @@ trait LedgerWriteDao extends AutoCloseable { * @return Ok when the operation was successful */ def storeInitialState( - activeContracts: immutable.Seq[Contract], + activeContracts: immutable.Seq[ActiveContract], ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)], newLedgerEnd: LedgerOffset ): Future[Unit] diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/MeteredLedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/MeteredLedgerDao.scala index 740a499dc5..3ef0831d83 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/MeteredLedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/dao/MeteredLedgerDao.scala @@ -14,6 +14,7 @@ import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml_lf.DamlLf.Archive import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails} import com.digitalasset.platform.sandbox.metrics.MetricsManager +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract} import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry import scala.collection.immutable @@ -31,9 +32,11 @@ private class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, mm: MetricsManager) override def lookupExternalLedgerEnd(): Future[Option[LedgerString]] = mm.timedFuture("LedgerDao:lookupExternalLedgerEnd", ledgerDao.lookupExternalLedgerEnd()) - override def lookupActiveContract( + override def lookupActiveOrDivulgedContract( contractId: Value.AbsoluteContractId): Future[Option[Contract]] = - mm.timedFuture("LedgerDao:lookupActiveContract", ledgerDao.lookupActiveContract(contractId)) + mm.timedFuture( + "LedgerDao:lookupActiveContract", + ledgerDao.lookupActiveOrDivulgedContract(contractId)) override def lookupLedgerEntry(offset: Long): Future[Option[LedgerEntry]] = mm.timedFuture("LedgerDao:lookupLedgerEntry", ledgerDao.lookupLedgerEntry(offset)) @@ -81,7 +84,7 @@ private class MeteredLedgerDao(ledgerDao: LedgerDao, mm: MetricsManager) ledgerDao.storeLedgerEntry(offset, newLedgerEnd, externalOffset, ledgerEntry)) override def storeInitialState( - activeContracts: immutable.Seq[Contract], + activeContracts: immutable.Seq[ActiveContract], ledgerEntries: immutable.Seq[(LedgerOffset, LedgerEntry)], newLedgerEnd: LedgerOffset ): Future[Unit] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/migration/postgres/V2_1__Rebuild_Acs.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/migration/postgres/V2_1__Rebuild_Acs.scala index 46e0255699..48b2be58c3 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/migration/postgres/V2_1__Rebuild_Acs.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/migration/postgres/V2_1__Rebuild_Acs.scala @@ -13,20 +13,22 @@ import akka.stream.scaladsl.Source import akka.NotUsed import anorm.SqlParser._ import anorm.{BatchSql, Macro, NamedParameter, RowParser, SQL, SqlParser} +import com.daml.ledger.participant.state.v1.AbsoluteContractInst import com.digitalasset.daml.lf.data.Ref import com.digitalasset.daml.lf.data.Ref._ import com.digitalasset.daml.lf.data.Relation.Relation import com.digitalasset.daml.lf.engine.Blinding import com.digitalasset.daml.lf.transaction.Transaction import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers, NodeCreate} +import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId} import com.digitalasset.ledger._ import com.digitalasset.ledger.api.domain.RejectionReason import com.digitalasset.ledger.api.domain.RejectionReason._ import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.{ActiveContract, Contract} import com.digitalasset.platform.sandbox.stores._ import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry -import com.digitalasset.platform.sandbox.stores.ledger.sql.dao.Contract import com.digitalasset.platform.sandbox.stores.ledger.sql.serialisation.{ ContractSerializer, KeyHasher, @@ -126,7 +128,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { .as(ledgerString("contract_id").singleOpt) .map(AbsoluteContractId) - private def storeContract(offset: Long, contract: Contract)( + private def storeContract(offset: Long, contract: ActiveContract)( implicit connection: Connection): Unit = storeContracts(offset, List(contract)) private def archiveContract(offset: Long, cid: AbsoluteContractId)( @@ -148,7 +150,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { private val SQL_INSERT_CONTRACT_KEY_MAINTAINERS = "insert into contract_key_maintainers(contract_id, maintainer) values({contract_id}, {maintainer})" - private def storeContracts(offset: Long, contracts: immutable.Seq[Contract])( + private def storeContracts(offset: Long, contracts: immutable.Seq[ActiveContract])( implicit connection: Connection): Unit = { // A ACS contract contaixns several collections (e.g., witnesses or divulgences). @@ -160,22 +162,22 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { .map( c => Seq[NamedParameter]( - "id" -> c.contractId.coid, + "id" -> c.id.coid, "transaction_id" -> c.transactionId, "workflow_id" -> c.workflowId.getOrElse(""), - "package_id" -> c.coinst.template.packageId, - "name" -> c.coinst.template.qualifiedName.toString, + "package_id" -> c.contract.template.packageId, + "name" -> c.contract.template.qualifiedName.toString, "create_offset" -> offset, "contract" -> contractSerializer - .serializeContractInstance(c.coinst) - .getOrElse(sys.error(s"failed to serialize contract! cid:${c.contractId.coid}")), + .serializeContractInstance(c.contract) + .getOrElse(sys.error(s"failed to serialize contract! cid:${c.id.coid}")), "key" -> c.key .map( k => valueSerializer .serializeValue(k.key) - .getOrElse(sys.error( - s"failed to serialize contract key value! cid:${c.contractId.coid}"))) + .getOrElse( + sys.error(s"failed to serialize contract key value! cid:${c.id.coid}"))) ) ) @@ -191,7 +193,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { c.witnesses.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "witness" -> w )) ) @@ -218,7 +220,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { c.divulgences.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "party" -> (w._1: String), "transaction_id" -> w._2 )) @@ -238,7 +240,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { c.divulgences.map( w => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "party" -> (w._1: String), "ledger_offset" -> offset, "transaction_id" -> c.transactionId @@ -264,7 +266,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { k.maintainers.map( p => Seq[NamedParameter]( - "contract_id" -> c.contractId.coid, + "contract_id" -> c.id.coid, "maintainer" -> p ))) .getOrElse(Set.empty) @@ -344,19 +346,16 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { val mappedDisclosure = explicitDisclosure .mapValues(parties => parties.map(Party.assertFromString)) - final class AcsStoreAcc extends ActiveContracts[AcsStoreAcc] { + final class AcsStoreAcc extends ActiveLedgerState[AcsStoreAcc] { override def lookupContract(cid: AbsoluteContractId) = - lookupActiveContractSync(cid).map(_.toActiveContract) + lookupActiveContractSync(cid) override def keyExists(key: GlobalKey): Boolean = selectContractKey(key).isDefined - override def addContract( - cid: AbsoluteContractId, - c: ActiveContracts.ActiveContract, - keyO: Option[GlobalKey]) = { - storeContract(offset, Contract.fromActiveContract(cid, c)) - keyO.foreach(key => storeContractKey(key, cid)) + override def addContract(c: ActiveLedgerState.ActiveContract, keyO: Option[GlobalKey]) = { + storeContract(offset, c) + keyO.foreach(key => storeContractKey(key, c.id)) this } @@ -371,9 +370,10 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { this } - override def divulgeAlreadyCommittedContract( + override def divulgeAlreadyCommittedContracts( transactionId: TransactionIdString, - global: Relation[AbsoluteContractId, Party]) = { + global: Relation[AbsoluteContractId, Party], + referencedContracts: List[(Value.AbsoluteContractId, AbsoluteContractInst)]) = { val divulgenceParams = global .flatMap { case (cid, parties) => @@ -397,7 +397,7 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { } // this should be a class member field, we can't move it out yet as the functions above are closing over to the implicit Connection - val acsManager = new ActiveContractsManager(new AcsStoreAcc) + val acsManager = new ActiveLedgerStateManager(new AcsStoreAcc) // Note: ACS is typed as Unit here, as the ACS is given implicitly by the current database state // within the current SQL transaction. All of the given functions perform side effects to update the database. @@ -408,7 +408,8 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { transaction, mappedDisclosure, localImplicitDisclosure, - globalImplicitDisclosure + globalImplicitDisclosure, + List.empty ) atr match { @@ -629,6 +630,9 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { .as(ContractDataParser.singleOpt) .map(mapContractDetails) + /** Note: at the time this migration was written, divulged contracts were not stored separately from active contracts. + * This method therefore always returns an ActiveContract. + */ private def mapContractDetails( contractResult: ( ContractIdString, @@ -637,12 +641,15 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { Date, InputStream, Option[InputStream], - InputStream))(implicit conn: Connection) = + InputStream))(implicit conn: Connection): ActiveContract = contractResult match { case (coid, transactionId, workflowId, createdAt, contractStream, keyStreamO, tx) => val witnesses = lookupWitnesses(coid) val divulgences = lookupDivulgences(coid) val absoluteCoid = AbsoluteContractId(coid) + val contractInstance = contractSerializer + .deserializeContractInstance(ByteStreams.toByteArray(contractStream)) + .getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")) val (signatories: Set[Ref.Party], observers: Set[Ref.Party]) = transactionSerializer @@ -658,16 +665,14 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { (Set.empty, Set.empty) } - Contract( + ActiveContract( absoluteCoid, createdAt.toInstant, transactionId, Some(workflowId), + contractInstance, witnesses, divulgences, - contractSerializer - .deserializeContractInstance(ByteStreams.toByteArray(contractStream)) - .getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")), keyStreamO.map(keyStream => { val keyMaintainers = lookupKeyMaintainers(coid) val keyValue = valueSerializer @@ -676,7 +681,8 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration { KeyWithMaintainers(keyValue, keyMaintainers) }), signatories, - observers + observers, + contractInstance.agreementText ) } diff --git a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala index f0d4fd869e..5b44849825 100644 --- a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala +++ b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala @@ -8,7 +8,7 @@ import com.digitalasset.api.util.TimeProvider import com.digitalasset.ledger.api.testing.utils.Resource import com.digitalasset.platform.sandbox.metrics.MetricsManager import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource} -import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore} +import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore} import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode import com.digitalasset.platform.sandbox.stores.ledger.Ledger import com.digitalasset.daml.lf.data.ImmArray @@ -33,7 +33,7 @@ object LedgerResource { def inMemory( ledgerId: LedgerId, timeProvider: TimeProvider, - acs: InMemoryActiveContracts = InMemoryActiveContracts.empty, + acs: InMemoryActiveLedgerState = InMemoryActiveLedgerState.empty, packages: InMemoryPackageStore = InMemoryPackageStore.empty, entries: ImmArray[LedgerEntryOrBump] = ImmArray.empty): Resource[Ledger] = LedgerResource.resource( @@ -68,7 +68,7 @@ object LedgerResource { postgres.value.jdbcUrl, ledgerId, timeProvider, - InMemoryActiveContracts.empty, + InMemoryActiveLedgerState.empty, packages, ImmArray.empty, 128, diff --git a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/TestHelpers.scala b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/TestHelpers.scala index d584144be7..46977b1134 100644 --- a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/TestHelpers.scala +++ b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/TestHelpers.scala @@ -17,7 +17,7 @@ import com.digitalasset.platform.sandbox.metrics.MetricsManager import com.digitalasset.platform.sandbox.services.ApiSubmissionService import com.digitalasset.platform.sandbox.stores.ledger.CommandExecutorImpl import com.digitalasset.platform.sandbox.stores.{ - InMemoryActiveContracts, + InMemoryActiveLedgerState, InMemoryPackageStore, SandboxIndexAndWriteService } @@ -60,7 +60,7 @@ trait TestHelpers { participantId, TimeModel.reasonableDefault, TimeProvider.Constant(Instant.EPOCH), - InMemoryActiveContracts.empty, + InMemoryActiveLedgerState.empty, ImmArray.empty, packageStore ) diff --git a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/JdbcLedgerDaoSpec.scala b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/JdbcLedgerDaoSpec.scala index e338025509..84b36be1f5 100644 --- a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/JdbcLedgerDaoSpec.scala +++ b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/JdbcLedgerDaoSpec.scala @@ -34,6 +34,7 @@ import com.digitalasset.ledger.EventId import com.digitalasset.ledger.api.domain.{LedgerId, RejectionReason} import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.digitalasset.platform.sandbox.persistence.PostgresAroundAll +import com.digitalasset.platform.sandbox.stores.ActiveLedgerState.ActiveContract import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry import com.digitalasset.platform.sandbox.stores.ledger.sql.dao._ import com.digitalasset.platform.sandbox.stores.ledger.sql.migration.FlywayMigrations @@ -120,17 +121,18 @@ class JdbcLedgerDaoSpec Set(alice) ) - val contract = Contract( + val contract = ActiveContract( absCid, let, txId, Some(workflowId), + contractInstance, Set(alice, bob), Map(alice -> txId, bob -> txId), - contractInstance, Some(keyWithMaintainers), Set(alice, bob), - Set.empty + Set.empty, + contractInstance.agreementText ) val transaction = LedgerEntry.Transaction( @@ -157,7 +159,7 @@ class JdbcLedgerDaoSpec Map(event1 -> Set[Party]("Alice", "Bob"), event2 -> Set[Party]("Alice", "In", "Chains")) ) for { - result1 <- ledgerDao.lookupActiveContract(absCid) + result1 <- ledgerDao.lookupActiveOrDivulgedContract(absCid) _ <- ledgerDao.storeLedgerEntry( offset, offset + 1, @@ -166,10 +168,13 @@ class JdbcLedgerDaoSpec transaction, Map.empty, Map( - absCid -> Set(Ref.Party.assertFromString("Alice"), Ref.Party.assertFromString("Bob"))) + absCid -> Set( + Ref.Party.assertFromString("Alice"), + Ref.Party.assertFromString("Bob"))), + List.empty ) ) - result2 <- ledgerDao.lookupActiveContract(absCid) + result2 <- ledgerDao.lookupActiveOrDivulgedContract(absCid) externalLedgerEnd <- ledgerDao.lookupExternalLedgerEnd() } yield { result1 shouldEqual None @@ -351,7 +356,7 @@ class JdbcLedgerDaoSpec offset, offset + 1, None, - PersistenceEntry.Transaction(transaction, Map.empty, Map.empty)) + PersistenceEntry.Transaction(transaction, Map.empty, Map.empty, List.empty)) entry <- ledgerDao.lookupLedgerEntry(offset) endingOffset <- ledgerDao.lookupLedgerEnd() } yield { @@ -416,7 +421,7 @@ class JdbcLedgerDaoSpec offset, offset + 1, None, - PersistenceEntry.Transaction(transaction, Map.empty, Map.empty)) + PersistenceEntry.Transaction(transaction, Map.empty, Map.empty, List.empty)) entry <- ledgerDao.lookupLedgerEntry(offset) endingOffset <- ledgerDao.lookupLedgerEnd() } yield { @@ -512,7 +517,7 @@ class JdbcLedgerDaoSpec offset, offset + 1, None, - PersistenceEntry.Transaction(t, Map.empty, Map.empty)) + PersistenceEntry.Transaction(t, Map.empty, Map.empty, List.empty)) .map(_ => ()) } @@ -524,7 +529,7 @@ class JdbcLedgerDaoSpec offset, offset + 1, None, - PersistenceEntry.Transaction(t, Map.empty, Map.empty)) + PersistenceEntry.Transaction(t, Map.empty, Map.empty, List.empty)) .map(_ => ()) } diff --git a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala index b02abca2e7..98be8b4952 100644 --- a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala +++ b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala @@ -8,7 +8,7 @@ import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.digitalasset.platform.sandbox.MetricsAround import com.digitalasset.platform.sandbox.persistence.PostgresAroundEach import com.digitalasset.daml.lf.data.{ImmArray, Ref} -import com.digitalasset.platform.sandbox.stores.{InMemoryActiveContracts, InMemoryPackageStore} +import com.digitalasset.platform.sandbox.stores.{InMemoryActiveLedgerState, InMemoryPackageStore} import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScaledTimeSpans} import org.scalatest.time.Span import org.scalatest.{AsyncWordSpec, Matchers} @@ -37,7 +37,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = None, timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -53,7 +53,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = Some(ledgerId), timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -71,7 +71,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = Some(ledgerId), timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -81,7 +81,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = Some(ledgerId), timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -91,7 +91,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = None, timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -111,7 +111,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("TheLedger"))), timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth @@ -120,7 +120,7 @@ class SqlLedgerSpec jdbcUrl = postgresFixture.jdbcUrl, ledgerId = Some(LedgerId(Ref.LedgerString.assertFromString("AnotherLedger"))), timeProvider = TimeProvider.UTC, - acs = InMemoryActiveContracts.empty, + acs = InMemoryActiveLedgerState.empty, packages = InMemoryPackageStore.empty, initialLedgerEntries = ImmArray.empty, queueDepth