Integrate new active contracts reader in Ledger API (#5397)

changelog_begin
changelog_end
Stefano Baghino 2020-04-03 10:17:42 +02:00 committed by GitHub
parent 1d9c7d2574
commit e2cab1c162
14 changed files with 120 additions and 509 deletions
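In short: the commit removes the snapshot-based active contract set (ACS) read path (getActiveContractSetSnapshot, LedgerSnapshot, getActiveContractSnapshot and the per-backend SQL_SELECT_ACTIVE_CONTRACTS queries) and wires the Ledger API to the new streaming reader instead. The SQL-backed ledger now delegates to TransactionsReader.getActiveContracts, while the in-memory sandbox ledger builds the responses itself. Condensed from the hunks below, the signature-level change on the index service is:

// Before (removed by this commit)
def getActiveContractSetSnapshot(
    filter: TransactionFilter
): Future[ActiveContractSetSnapshot]

// After (introduced by this commit)
def getActiveContracts(
    filter: TransactionFilter,
    verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed]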

View File

@@ -3,9 +3,10 @@
package com.daml.ledger.participant.state.index.v2
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.ledger.api.domain.TransactionFilter
import scala.concurrent.Future
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
/**
* Serves as a backend to implement
@@ -13,7 +14,8 @@ import scala.concurrent.Future
**/
trait IndexActiveContractsService {
def getActiveContractSetSnapshot(
filter: TransactionFilter
): Future[ActiveContractSetSnapshot]
def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed]
}
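For orientation, a hypothetical caller of the new trait could drain the stream like this (a minimal sketch; fetchAcs and acsService are illustrative names, not part of this commit, and an Akka Materializer is assumed to be in scope):

import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.participant.state.index.v2.IndexActiveContractsService
import com.digitalasset.ledger.api.domain.TransactionFilter
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import scala.concurrent.Future

// Collects every response; with the implementation added in this commit the
// last element carries only the resume offset, the earlier ones the contracts.
def fetchAcs(acsService: IndexActiveContractsService, filter: TransactionFilter)(
    implicit mat: Materializer): Future[Seq[GetActiveContractsResponse]] =
  acsService.getActiveContracts(filter, verbose = false).runWith(Sink.seq)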

View File

@@ -26,6 +26,7 @@ import com.digitalasset.ledger.api.domain.{
TransactionId
}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
@@ -95,10 +96,11 @@ final class TimedIndexService(delegate: IndexService, metrics: MetricRegistry, p
"get_transaction_tree_by_id",
delegate.getTransactionTreeById(transactionId, requestingParties))
override def getActiveContractSetSnapshot(
filter: domain.TransactionFilter
): Future[v2.ActiveContractSetSnapshot] =
time("get_active_contract_set_snapshot", delegate.getActiveContractSetSnapshot(filter))
override def getActiveContracts(
filter: domain.TransactionFilter,
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] =
time("get_active_contracts", delegate.getActiveContracts(filter, verbose))
override def lookupActiveContract(
submitter: Party,

View File

@@ -6,35 +6,28 @@ package com.digitalasset.platform.apiserver.services
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2.{
ActiveContractSetSnapshot,
IndexActiveContractsService => ACSBackend
}
import com.daml.ledger.participant.state.index.v2.{IndexActiveContractsService => ACSBackend}
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.v1.active_contracts_service.ActiveContractsServiceGrpc.ActiveContractsService
import com.digitalasset.ledger.api.v1.active_contracts_service._
import com.digitalasset.ledger.api.v1.event.CreatedEvent
import com.digitalasset.ledger.api.validation.TransactionFilterValidator
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.logging.LoggingContext.withEnrichedLoggingContext
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.participant.util.LfEngineToApi
import com.digitalasset.platform.server.api.validation.ActiveContractsServiceValidation
import io.grpc.{BindableService, ServerServiceDefinition}
import scalaz.syntax.tag._
import scala.concurrent.ExecutionContext
final class ApiActiveContractsService private (
backend: ACSBackend,
parallelism: Int = Runtime.getRuntime.availableProcessors)(
)(
implicit executionContext: ExecutionContext,
protected val mat: Materializer,
protected val esf: ExecutionSequencerFactory,
logCtx: LoggingContext)
extends ActiveContractsServiceAkkaGrpc
logCtx: LoggingContext,
) extends ActiveContractsServiceAkkaGrpc
with GrpcApiService {
private val logger = ContextualizedLogger.get(this.getClass)
@@ -45,50 +38,7 @@ final class ApiActiveContractsService private (
TransactionFilterValidator
.validate(request.getFilter, "filter")
.fold(
Source.failed, { filter =>
withEnrichedLoggingContext(logging.parties(filter.filtersByParty.keys)) {
implicit logCtx =>
Source
.future(backend.getActiveContractSetSnapshot(filter))
.flatMapConcat {
case ActiveContractSetSnapshot(offset, acsStream) =>
acsStream
.map {
case (wfId, create) =>
GetActiveContractsResponse(
workflowId = wfId.map(_.unwrap).getOrElse(""),
activeContracts = List(
CreatedEvent(
create.eventId.unwrap,
create.contractId.coid,
Some(LfEngineToApi.toApiIdentifier(create.templateId)),
create.contractKey.map(ck =>
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfVersionedValueToApiValue(verbose = request.verbose, ck))),
Some(
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfValueToApiRecord(
verbose = request.verbose,
create.argument.value))),
create.stakeholders.toSeq,
signatories =
create.signatories.map(_.toString)(collection.breakOut),
observers = create.observers.map(_.toString)(collection.breakOut),
agreementText = Some(create.agreementText)
)
)
)
}
.concat(Source.single(GetActiveContractsResponse(offset = offset.value)))
}
}
}
)
.fold(Source.failed, backend.getActiveContracts(_, request.verbose))
.via(logger.logErrorsOnStream)
}
@@ -97,8 +47,6 @@ final class ApiActiveContractsService private (
}
object ApiActiveContractsService {
type TransactionId = String
type WorkflowId = String
def create(ledgerId: LedgerId, backend: ACSBackend)(
implicit ec: ExecutionContext,

View File

@@ -31,6 +31,7 @@ import com.digitalasset.ledger.api.domain.{
TransactionId
}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
@@ -39,9 +40,8 @@ import com.digitalasset.ledger.api.v1.transaction_service.{
GetTransactionsResponse
}
import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.entries.PartyLedgerEntry
import com.digitalasset.platform.store.{LedgerSnapshot, ReadOnlyLedger}
import com.digitalasset.platform.store.ReadOnlyLedger
import com.digitalasset.platform.ApiOffset
import com.digitalasset.platform.ApiOffset.ApiOffsetConverter
import scalaz.syntax.tag.ToTagOps
@@ -57,40 +57,16 @@ abstract class LedgerBackedIndexService(
override def currentHealth(): HealthStatus = ledger.currentHealth()
override def getActiveContractSetSnapshot(
filter: TransactionFilter): Future[ActiveContractSetSnapshot] = {
override def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] = {
val ledgerEnd = ledger.ledgerEnd
ledger
.snapshot(filter)
.map {
case LedgerSnapshot(offset, acsStream) =>
ActiveContractSetSnapshot(
toAbsolute(offset),
acsStream
.mapConcat { ac =>
EventFilter(ac)(filter)
.map(create =>
create.workflowId.map(domain.WorkflowId(_)) -> toUpdateEvent(create))
.toList
}
)
}(mat.executionContext)
.activeContracts(ledgerEnd, convertFilter(filter), verbose)
.concat(Source.single(GetActiveContractsResponse(offset = ApiOffset.toApiString(ledgerEnd))))
}
private def toUpdateEvent(ac: ActiveContract): AcsUpdateEvent.Create =
AcsUpdateEvent.Create(
// we use absolute contract ids as event ids throughout the sandbox
domain.TransactionId(ac.transactionId),
domain.EventId(ac.eventId),
ac.id,
ac.contract.template,
ac.contract.arg,
ac.witnesses,
ac.key.map(_.key),
ac.signatories,
ac.observers,
ac.agreementText
)
override def transactionTrees(
startExclusive: LedgerOffset,
endInclusive: Option[LedgerOffset],
@@ -121,10 +97,7 @@ abstract class LedgerBackedIndexService(
.flatTransactions(
startExclusive = from,
endInclusive = to,
filter = filter.filtersByParty.map {
case (party, filters) =>
party -> filters.inclusive.fold(Set.empty[Identifier])(_.templateIds)
},
filter = convertFilter(filter),
verbose = verbose,
)
.map(_._2)
@@ -165,6 +138,12 @@ abstract class LedgerBackedIndexService(
}
}
private def convertFilter(filter: TransactionFilter): Map[Party, Set[Identifier]] =
filter.filtersByParty.map {
case (party, filters) =>
party -> filters.inclusive.fold(Set.empty[Identifier])(_.templateIds)
}
override def currentLedgerEnd(): Future[LedgerOffset.Absolute] =
Future.successful(toAbsolute(ledger.ledgerEnd))
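The private convertFilter helper above maps the Ledger API TransactionFilter onto the Map[Party, Set[Identifier]] shape that the new ReadOnlyLedger.activeContracts expects. A party without inclusive filters ends up with an empty template set, which the readers treat as a wildcard (the in-memory ledger, for instance, turns it back into Filters(None)). A small illustrative example; the party and template identifiers are made up, and Ref.Party.assertFromString / Ref.Identifier.assertFromString are assumed from the daml-lf Ref API:

import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.api.domain.{Filters, InclusiveFilters, TransactionFilter}

val alice = Ref.Party.assertFromString("Alice")
val bob = Ref.Party.assertFromString("Bob")
val iou = Ref.Identifier.assertFromString("somePackageId:Iou:Iou")

val filter = TransactionFilter(
  Map(
    alice -> Filters(Some(InclusiveFilters(Set(iou)))), // Alice: only Iou contracts
    bob -> Filters(None), // Bob: no inclusive filter, i.e. all templates
  ))

// convertFilter(filter) yields Map(alice -> Set(iou), bob -> Set.empty),
// where Bob's empty set means "no template restriction".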

View File

@@ -19,29 +19,24 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.TransactionId
import com.digitalasset.ledger.api.domain.{
ApplicationId,
CommandId,
LedgerId,
PartyDetails,
TransactionFilter
}
import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId, LedgerId, PartyDetails}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse
GetTransactionsResponse,
}
import com.digitalasset.platform.metrics.timedFuture
import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
PackageLedgerEntry,
PartyLedgerEntry
PartyLedgerEntry,
}
import com.digitalasset.platform.store.{LedgerSnapshot, ReadOnlyLedger}
import com.digitalasset.platform.store.ReadOnlyLedger
import scala.concurrent.Future
@@ -103,8 +98,12 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
parties: Set[Party]): Source[(Offset, CompletionStreamResponse), NotUsed] =
ledger.completions(startExclusive, endInclusive, applicationId, parties)
override def snapshot(filter: TransactionFilter): Future[LedgerSnapshot] =
ledger.snapshot(filter)
override def activeContracts(
activeAt: Offset,
filter: Map[Party, Set[Identifier]],
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] =
ledger.activeContracts(activeAt, filter, verbose)
override def lookupContract(
contractId: Value.AbsoluteContractId,

View File

@@ -42,7 +42,9 @@ import com.digitalasset.ledger.api.domain.{
TransactionFilter
}
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.event.CreatedEvent
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
@@ -51,6 +53,7 @@ import com.digitalasset.ledger.api.v1.transaction_service.{
}
import com.digitalasset.platform.index.TransactionConversion
import com.digitalasset.platform.packages.InMemoryPackageStore
import com.digitalasset.platform.participant.util.LfEngineToApi
import com.digitalasset.platform.sandbox.stores.InMemoryActiveLedgerState
import com.digitalasset.platform.sandbox.stores.ledger.Ledger
import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
@@ -61,7 +64,7 @@ import com.digitalasset.platform.store.entries.{
PackageLedgerEntry,
PartyLedgerEntry
}
import com.digitalasset.platform.store.{CompletionFromTransaction, LedgerSnapshot}
import com.digitalasset.platform.store.CompletionFromTransaction
import com.digitalasset.platform.{ApiOffset, index}
import org.slf4j.LoggerFactory
import scalaz.syntax.tag.ToTagOps
@@ -199,16 +202,46 @@ class InMemoryLedger(
override def ledgerEnd: Offset = entries.ledgerEnd
// need to take the lock to make sure the two pieces of data are consistent.
override def snapshot(filter: TransactionFilter): Future[LedgerSnapshot] =
Future.successful(this.synchronized {
LedgerSnapshot(
entries.ledgerEnd,
Source
.fromIterator[ActiveContract](() =>
acs.activeContracts.valuesIterator.flatMap(index.EventFilter(_)(filter).toList))
)
})
override def activeContracts(
activeAt: Offset,
filter: Map[Party, Set[Ref.Identifier]],
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] =
Source
.fromIterator[ActiveContract](() =>
acs.activeContracts.valuesIterator.flatMap(index
.EventFilter(_)(TransactionFilter(filter.map {
case (party, templates) =>
party -> Filters(if (templates.nonEmpty) Some(InclusiveFilters(templates)) else None)
}))
.toList))
.map { contract =>
GetActiveContractsResponse(
workflowId = contract.workflowId.getOrElse(""),
activeContracts = List(
CreatedEvent(
contract.eventId,
contract.id.coid,
Some(LfEngineToApi.toApiIdentifier(contract.contract.template)),
contractKey = contract.key.map(
ck =>
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfContractKeyToApiValue(verbose = verbose, ck))),
createArguments = Some(
LfEngineToApi.assertOrRuntimeEx(
"converting stored contract",
LfEngineToApi
.lfValueToApiRecord(verbose = verbose, contract.contract.arg.value))),
contract.signatories.union(contract.observers).intersect(filter.keySet).toSeq,
signatories = contract.signatories.toSeq,
observers = contract.observers.toSeq,
agreementText = Some(contract.agreementText)
)
)
)
}
override def lookupContract(
contractId: AbsoluteContractId,
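The in-memory reader keeps using the existing EventFilter by rebuilding a domain TransactionFilter from the map-based filter it receives. Pulled out into a standalone sketch for readability (the helper name is illustrative; the body mirrors the inline conversion above):

import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.api.domain.{Filters, InclusiveFilters, TransactionFilter}

// Inverse of convertFilter on the index side: an empty template set for a
// party becomes a wildcard Filters(None) for that party.
def toTransactionFilter(filter: Map[Ref.Party, Set[Ref.Identifier]]): TransactionFilter =
  TransactionFilter(filter.map {
    case (party, templates) =>
      party -> Filters(if (templates.nonEmpty) Some(InclusiveFilters(templates)) else None)
  })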

View File

@@ -21,8 +21,9 @@ import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.TransactionId
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId, LedgerId, TransactionFilter}
import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId, LedgerId}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
@@ -110,18 +111,12 @@ abstract class BaseLedger(
endInclusive
)
override def snapshot(filter: TransactionFilter): Future[LedgerSnapshot] =
// instead of looking up the latest ledger end, we can only take the latest known ledgerEnd in the scope of SqlLedger.
// If we don't do that, we can miss contracts from a partially inserted batch insert of ledger entries
// scenario:
// 1. batch insert transactions A and B at offsets 5 and 6 respectively; A is a huge transaction, B is a small transaction
// 2. B is inserted earlier than A and the ledger_end column in the parameters table is updated
// 3. A GetActiveContractsRequest comes in and we look at the latest ledger_end offset in the database. We will see 6 (from transaction B).
// 4. If we finish streaming the active contracts up to offset 6 before transaction A is properly inserted into the DB, the client will not see the contracts from transaction A
// The fix to that is to use the latest known headRef, which is updated AFTER a batch has been inserted completely.
ledgerDao
.getActiveContractSnapshot(ledgerEnd, filter)
.map(s => LedgerSnapshot(s.offset, s.acs))(DEC)
override def activeContracts(
activeAt: Offset,
filter: Map[Party, Set[Identifier]],
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] =
ledgerDao.transactionsReader.getActiveContracts(activeAt, filter, verbose)
override def lookupContract(
contractId: AbsoluteContractId,
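The comment removed above explained why the ACS has to be anchored at the in-memory ledger end (headRef, which only advances after a batch of entries is fully inserted) rather than at a freshly queried database offset. With the new reader that anchoring happens in the caller; a minimal sketch of the pattern, mirroring the getActiveContracts added to LedgerBackedIndexService in this commit (the wrapper function is illustrative):

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.daml.lf.data.Ref.{Identifier, Party}
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.platform.ApiOffset
import com.digitalasset.platform.store.ReadOnlyLedger

def streamActiveContracts(
    ledger: ReadOnlyLedger,
    filter: Map[Party, Set[Identifier]],
    verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed] = {
  // Read the ledger end once, up front: a partially inserted batch can then
  // never hide contracts behind the offset reported to the client.
  val activeAt = ledger.ledgerEnd
  ledger
    .activeContracts(activeAt, filter, verbose)
    .concat(Source.single(GetActiveContractsResponse(offset = ApiOffset.toApiString(activeAt))))
}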

View File

@@ -17,26 +17,21 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.TransactionId
import com.digitalasset.ledger.api.domain.{
ApplicationId,
CommandId,
LedgerId,
PartyDetails,
TransactionFilter
}
import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId, LedgerId, PartyDetails}
import com.digitalasset.ledger.api.health.ReportsHealth
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse
GetTransactionsResponse,
}
import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
PackageLedgerEntry,
PartyLedgerEntry
PartyLedgerEntry,
}
import scala.concurrent.Future
@@ -72,7 +67,11 @@ trait ReadOnlyLedger extends ReportsHealth with AutoCloseable {
applicationId: ApplicationId,
parties: Set[Ref.Party]): Source[(Offset, CompletionStreamResponse), NotUsed]
def snapshot(filter: TransactionFilter): Future[LedgerSnapshot]
def activeContracts(
activeAt: Offset,
filter: Map[Party, Set[Identifier]],
verbose: Boolean,
): Source[GetActiveContractsResponse, NotUsed]
def lookupContract(
contractId: Value.AbsoluteContractId,

View File

@@ -23,10 +23,10 @@ import com.daml.ledger.participant.state.index.v2.{
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.{LedgerString, PackageId, Party}
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party}
import com.digitalasset.daml.lf.data.Relation.Relation
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.transaction.Node.{GlobalKey, KeyWithMaintainers}
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, NodeId}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
@@ -46,7 +46,7 @@ import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.ApiOffset.ApiOffsetConverter
import com.digitalasset.platform.configuration.ServerRole
import com.digitalasset.platform.events.EventIdFormatter.split
import com.digitalasset.platform.store.Contract.{ActiveContract, DivulgedContract}
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.Conversions._
import com.digitalasset.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.digitalasset.platform.store._
@@ -1155,16 +1155,6 @@ private class JdbcLedgerDao(
.map(_.map((toLedgerEntry _).tupled(_)._2))(executionContext)
}
private val ContractDataParser = (contractId("id")
~ ledgerString("transaction_id").?
~ ledgerString("create_event_id").?
~ ledgerString("workflow_id").?
~ date("effective_at").?
~ binaryStream("contract")
~ binaryStream("key").?
~ str("signatories").?
~ str("observers").? map flatten)
private val SQL_SELECT_CONTRACT =
SQL("""select cd.contract
|from contract_data cd
@@ -1259,81 +1249,6 @@ private class JdbcLedgerDao(
.getOrElse(sys.error(s"failed to deserialize contract! cid:${contractId.coid}"))
})(executionContext)
private def mapContractDetails(
contractResult: (
Value.AbsoluteContractId,
Option[LedgerString],
Option[EventId],
Option[WorkflowId],
Option[Date],
InputStream,
Option[InputStream],
Option[String],
Option[String]))(implicit conn: Connection): Contract =
contractResult match {
case (coid, None, None, None, None, contractStream, None, None, None) =>
val divulgences = lookupDivulgences(coid.coid)
DivulgedContract(
coid,
contractSerializer
.deserializeContractInstance(contractStream)
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")),
divulgences
)
case (
coid,
Some(transactionId),
Some(eventId),
workflowId,
Some(ledgerEffectiveTime),
contractStream,
keyStreamO,
Some(signatoriesRaw),
observersRaw) =>
val witnesses = lookupWitnesses(coid.coid)
val divulgences = lookupDivulgences(coid.coid)
val contractInstance = contractSerializer
.deserializeContractInstance(contractStream)
.getOrElse(sys.error(s"failed to deserialize contract! cid:${coid.coid}"))
val signatories =
signatoriesRaw.split(JdbcLedgerDao.PARTY_SEPARATOR).toSet.map(Party.assertFromString)
val observers = observersRaw
.map(_.split(JdbcLedgerDao.PARTY_SEPARATOR).toSet.map(Party.assertFromString))
.getOrElse(Set.empty)
ActiveContract(
coid,
ledgerEffectiveTime.toInstant,
transactionId,
eventId,
workflowId,
contractInstance,
witnesses,
divulgences,
keyStreamO.map(keyStream => {
val keyMaintainers = lookupKeyMaintainers(coid.coid)
val keyValue = ValueSerializer
.deserializeValue(keyStream, s"Failed to deserialize key for contract $coid")
.ensureNoCid
.fold(
coid => sys.error(s"Found contract ID $coid in a contract key"),
identity,
)
KeyWithMaintainers(keyValue, keyMaintainers)
}),
signatories,
observers,
contractInstance.agreementText
)
case (_, _, _, _, _, _, _, _, _) =>
sys.error(
"mapContractDetails called with partial data, cannot map to either active or divulged contract")
}
private def lookupWitnesses(coid: String)(implicit conn: Connection): Set[Party] =
SQL_SELECT_WITNESS
.on("contract_id" -> coid)
@@ -1392,9 +1307,6 @@ private class JdbcLedgerDao(
}
}
// this query pre-filters the active contracts. this avoids loading data that anyway will be dismissed later
private val SQL_SELECT_ACTIVE_CONTRACTS = SQL(queries.SQL_SELECT_ACTIVE_CONTRACTS)
private def orEmptyStringList(xs: Iterable[String]): List[String] =
if (xs.nonEmpty) xs.toList else List("")
@@ -1414,42 +1326,6 @@
case _ => Seq.empty
})
override def getActiveContractSnapshot(
endInclusive: Offset,
filter: TransactionFilter): Future[LedgerSnapshot] = {
val contractStream =
PaginatingAsyncStream(PageSize) { queryOffset =>
dbDispatcher.executeSql(
"load_active_contracts",
Some(s"bounds: ]0, ${endInclusive.toApiString}] queryOffset $queryOffset")) {
implicit conn =>
SQL_SELECT_ACTIVE_CONTRACTS
.on(
"endInclusive" -> endInclusive,
"queryOffset" -> queryOffset,
"pageSize" -> PageSize,
"template_parties" -> byPartyAndTemplate(filter),
"wildcard_parties" -> justByParty(filter),
)
.asVectorOf(ContractDataParser)
}
}.mapAsync(1) { contractResult =>
dbDispatcher
.executeSql("load_contract_details", Some(s"contract details: ${contractResult._1}")) {
implicit conn =>
mapContractDetails(contractResult) match {
case ac: ActiveContract => ac
case _: DivulgedContract =>
sys.error("Impossible: SQL_SELECT_ACTIVE_CONTRACTS returned a divulged contract")
}
}
}
Future.successful(LedgerSnapshot(endInclusive, contractStream))
}
private val SQL_SELECT_MULTIPLE_PARTIES =
SQL(
"select party, display_name, ledger_offset, explicit from parties where party in ({parties})")
@@ -1807,8 +1683,6 @@ object JdbcLedgerDao {
protected[JdbcLedgerDao] def SQL_INSERT_COMMAND: String
protected[JdbcLedgerDao] def SQL_SELECT_ACTIVE_CONTRACTS: String
// Note: the SQL backend may receive divulgence information for the same (contract, party) tuple
// more than once through BlindingInfo.globalDivulgence.
// The ledger offsets for the same (contract, party) tuple should always be increasing, and the database
@@ -1862,43 +1736,6 @@
override protected[JdbcLedgerDao] val DUPLICATE_KEY_ERROR: String = "duplicate key"
override protected[JdbcLedgerDao] val SQL_SELECT_ACTIVE_CONTRACTS: String =
// the distinct keyword is required, because a single contract can be visible by 2 parties,
// thus resulting in multiple output rows
s"""
|with stakeholders as (
|select signatory as party, contract_id from contract_signatories
| union
| select observer as party, contract_id from contract_observers
|)
|select distinct
| c.create_offset,
| cd.id,
| cd.contract,
| c.transaction_id,
| c.create_event_id,
| c.workflow_id,
| c.key,
| le.effective_at,
| string_agg(distinct sigs.signatory, '$PARTY_SEPARATOR') as signatories,
| string_agg(distinct obs.observer, '$PARTY_SEPARATOR') as observers
|from contracts c
|inner join contract_data cd on c.id = cd.id
|inner join ledger_entries le on c.transaction_id = le.transaction_id
|inner join stakeholders s on c.id = s.contract_id
|left join contract_signatories sigs on sigs.contract_id = c.id
|left join contract_observers obs on obs.contract_id = c.id
|where create_offset <= {endInclusive} and (archive_offset is null or archive_offset > {endInclusive})
|and
| (
| concat(c.name,'&',s.party) in ({template_parties})
| OR s.party in ({wildcard_parties})
| )
|group by c.create_offset, cd.id, cd.contract, c.transaction_id, c.create_event_id, c.workflow_id, c.key, le.effective_at
|order by c.create_offset
|limit {pageSize} offset {queryOffset}
|""".stripMargin
override protected[JdbcLedgerDao] val SQL_TRUNCATE_TABLES: String =
"""
|truncate table configuration_entries cascade;
@@ -1965,43 +1802,6 @@
override protected[JdbcLedgerDao] val DUPLICATE_KEY_ERROR: String =
"Unique index or primary key violation"
override protected[JdbcLedgerDao] val SQL_SELECT_ACTIVE_CONTRACTS: String =
// the distinct keyword is required, because a single contract can be visible by 2 parties,
// thus resulting in multiple output rows
s"""
|with stakeholders as (
|select signatory as party, contract_id from contract_signatories
| union
| select observer as party, contract_id from contract_observers
|)
|select distinct
| c.create_offset,
| cd.id,
| cd.contract,
| c.transaction_id,
| c.create_event_id,
| c.workflow_id,
| c.key,
| le.effective_at,
| listagg(distinct sigs.signatory, '$PARTY_SEPARATOR') as signatories,
| listagg(distinct obs.observer, '$PARTY_SEPARATOR') as observers
|from contracts c
|inner join contract_data cd on c.id = cd.id
|inner join ledger_entries le on c.transaction_id = le.transaction_id
|inner join stakeholders s on c.id = s.contract_id
|left join contract_signatories sigs on sigs.contract_id = c.id
|left join contract_observers obs on obs.contract_id = c.id
|where c.create_offset <= {endInclusive} and (archive_offset is null or archive_offset > {endInclusive})
|and
| (
| concat(c.name,'&',s.party) in ({template_parties})
| OR s.party in ({wildcard_parties})
| )
|group by c.create_offset, cd.id, cd.contract, c.transaction_id, c.create_event_id, c.workflow_id, c.key, le.effective_at
|order by c.create_offset
|limit {pageSize} offset {queryOffset}
|""".stripMargin
override protected[JdbcLedgerDao] val SQL_TRUNCATE_TABLES: String =
"""
|set referential_integrity false;

View File

@@ -16,7 +16,7 @@ import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.domain.{CommandId, LedgerId, PartyDetails, TransactionFilter}
import com.digitalasset.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.digitalasset.ledger.api.health.ReportsHealth
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.dao.events.{TransactionsReader, TransactionsWriter}
@@ -26,7 +26,7 @@ import com.digitalasset.platform.store.entries.{
PackageLedgerEntry,
PartyLedgerEntry
}
import com.digitalasset.platform.store.{LedgerSnapshot, PersistenceEntry}
import com.digitalasset.platform.store.PersistenceEntry
import scala.collection.immutable
import scala.concurrent.Future
@@ -101,15 +101,6 @@ trait LedgerReadDao extends ReportsHealth {
startExclusive: Offset,
endInclusive: Offset): Source[(Offset, LedgerEntry), NotUsed]
/**
* Returns a snapshot of the ledger.
* The snapshot consists of an offset, and a stream of contracts that were active at that offset.
*/
def getActiveContractSnapshot(
endInclusive: Offset,
filter: TransactionFilter
): Future[LedgerSnapshot]
/** Returns a list of party details for the parties specified. */
def getParties(parties: Seq[Party]): Future[List[PartyDetails]]

View File

@@ -17,7 +17,7 @@ import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{CommandId, LedgerId, PartyDetails, TransactionFilter}
import com.digitalasset.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.metrics.timedFuture
import com.digitalasset.platform.store.Contract.ActiveContract
@@ -26,9 +26,9 @@ import com.digitalasset.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
PackageLedgerEntry,
PartyLedgerEntry
PartyLedgerEntry,
}
import com.digitalasset.platform.store.{LedgerSnapshot, PersistenceEntry}
import com.digitalasset.platform.store.PersistenceEntry
import scala.collection.immutable
import scala.concurrent.Future
@@ -93,12 +93,6 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
forParty: Party): Future[Option[Value.AbsoluteContractId]] =
timedFuture(Metrics.lookupKey, ledgerDao.lookupKey(key, forParty))
override def getActiveContractSnapshot(
endInclusive: Offset,
filter: TransactionFilter
): Future[LedgerSnapshot] =
ledgerDao.getActiveContractSnapshot(endInclusive, filter)
override def getLedgerEntries(
startExclusive: Offset,
endInclusive: Offset

View File

@@ -37,9 +37,6 @@ private[dao] final class TransactionsReader(
private def offsetFor(response: GetTransactionTreesResponse): Offset =
ApiOffset.assertFromString(response.transactions.head.offset)
private def offsetFor(response: GetActiveContractsResponse): Offset =
ApiOffset.assertFromString(response.offset)
def getFlatTransactions(
startExclusive: Offset,
endInclusive: Offset,
@@ -134,13 +131,13 @@
activeAt: Offset,
filter: FilterRelation,
verbose: Boolean,
): Source[(Offset, GetActiveContractsResponse), NotUsed] = {
): Source[GetActiveContractsResponse, NotUsed] = {
val events =
PaginatingAsyncStream(pageSize) { offset =>
val query =
EventsTable
.preparePagedGetActiveContracts(
activeAt: Offset,
activeAt = activeAt,
filter = filter,
pageSize = pageSize,
rowOffset = offset,
@@ -153,8 +150,7 @@
groupContiguous(events)(by = _.transactionId)
.flatMapConcat { events =>
val response = EventsTable.Entry.toGetActiveContractsResponse(events)
Source(response.map(r => offsetFor(r) -> r))
Source(EventsTable.Entry.toGetActiveContractsResponse(events))
}
}

View File

@@ -7,7 +7,6 @@ import java.util.UUID
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.v1.Offset
import com.digitalasset.daml.lf.data.Ref.{Identifier, Party}
import com.digitalasset.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.digitalasset.ledger.api.v1.event.CreatedEvent
@@ -261,11 +260,8 @@
}
private def activeContractsOf(
source: Source[(Offset, GetActiveContractsResponse), NotUsed],
source: Source[GetActiveContractsResponse, NotUsed],
): Future[Seq[CreatedEvent]] =
source
.map(_._2)
.runWith(Sink.seq)
.map(_.flatMap(_.activeContracts))
source.runWith(Sink.seq).map(_.flatMap(_.activeContracts))
}

View File

@@ -5,24 +5,17 @@ package com.digitalasset.platform.store.dao
import java.time.Instant
import akka.stream.scaladsl.{Sink, Source}
import com.digitalasset.daml.lf.data.{ImmArray, Ref}
import com.digitalasset.daml.lf.transaction.GenTransaction
import com.digitalasset.daml.lf.transaction.Node.{KeyWithMaintainers, NodeCreate, NodeFetch}
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ValueText, VersionedValue}
import com.digitalasset.daml.lf.value.ValueVersions
import com.digitalasset.ledger.api.domain.{
Filters,
InclusiveFilters,
RejectionReason,
TransactionFilter
}
import com.digitalasset.ledger.api.domain.RejectionReason
import com.digitalasset.platform.store.PersistenceEntry
import com.digitalasset.platform.store.entries.LedgerEntry
import org.scalatest.{AsyncFlatSpec, LoneElement, Matchers}
import scala.collection.immutable.HashMap
import scala.concurrent.Future
private[dao] trait JdbcLedgerDaoLedgerEntriesSpec extends LoneElement {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
@@ -157,120 +150,4 @@ private[dao] trait JdbcLedgerDaoLedgerEntriesSpec extends LoneElement {
}
}
it should "be able to produce a valid snapshot" in {
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val N = 1000
val M = 10
def runSequentially[U](n: Int, f: Int => Future[U]): Future[Seq[U]] =
Source(1 to n).mapAsync(1)(f).runWith(Sink.seq)
// Perform the following operations:
// - Create N contracts
// - Archive 1 contract
// - Take a snapshot
// - Create another M contracts
// The resulting snapshot should contain N-1 contracts
val aliceWildcardFilter =
TransactionFilter(Map(alice -> Filters(None)))
val aliceSpecificTemplatesFilter =
TransactionFilter(Map(alice -> Filters(InclusiveFilters(Set(someTemplateId)))))
val charlieWildcardFilter =
TransactionFilter(Map(charlie -> Filters(None)))
val charlieSpecificFilter =
TransactionFilter(Map(charlie -> Filters(InclusiveFilters(Set(someTemplateId)))))
val mixedFilter =
TransactionFilter(
Map(
alice -> Filters(InclusiveFilters(Set(someTemplateId))),
bob -> Filters(None),
charlie -> Filters(None),
))
for {
startingOffset <- ledgerDao.lookupLedgerEnd()
aliceStartingSnapshot <- ledgerDao.getActiveContractSnapshot(
startingOffset,
aliceWildcardFilter)
charlieStartingSnapshot <- ledgerDao.getActiveContractSnapshot(
startingOffset,
charlieWildcardFilter)
mixedStartingSnapshot <- ledgerDao.getActiveContractSnapshot(startingOffset, mixedFilter)
created <- runSequentially(N, _ => store(singleCreate))
_ <- store(singleExercise(nonTransient(created.head._2).loneElement))
snapshotOffset <- ledgerDao.lookupLedgerEnd()
aliceWildcardSnapshot <- ledgerDao.getActiveContractSnapshot(
snapshotOffset,
aliceWildcardFilter)
aliceSpecificTemplatesSnapshot <- ledgerDao.getActiveContractSnapshot(
snapshotOffset,
aliceSpecificTemplatesFilter)
charlieWildcardSnapshot <- ledgerDao.getActiveContractSnapshot(
snapshotOffset,
charlieWildcardFilter)
charlieSpecificTemplateSnapshot <- ledgerDao.getActiveContractSnapshot(
snapshotOffset,
charlieSpecificFilter)
mixedSnapshot <- ledgerDao.getActiveContractSnapshot(snapshotOffset, mixedFilter)
_ <- runSequentially(M, _ => store(singleCreate))
endingOffset <- ledgerDao.lookupLedgerEnd()
aliceStartingSnapshotSize <- aliceStartingSnapshot.acs.map(_ => 1).runWith(sumSink)
aliceWildcardSnapshotSize <- aliceWildcardSnapshot.acs.map(_ => 1).runWith(sumSink)
aliceSpecificTemplatesSnapshotSize <- aliceSpecificTemplatesSnapshot.acs
.map(_ => 1)
.runWith(sumSink)
charlieStartingSnapshotSize <- charlieStartingSnapshot.acs.map(_ => 1).runWith(sumSink)
charlieWildcardSnapshotSize <- charlieWildcardSnapshot.acs.map(_ => 1).runWith(sumSink)
charlieSpecificTemplateSnapshotSize <- charlieSpecificTemplateSnapshot.acs
.map(_ => 1)
.runWith(sumSink)
mixedStartingSnapshotSize <- mixedStartingSnapshot.acs.map(_ => 1).runWith(sumSink)
mixedSnapshotSize <- mixedSnapshot.acs.map(_ => 1).runWith(sumSink)
} yield {
withClue("starting offset: ") {
aliceStartingSnapshot.offset shouldEqual startingOffset
}
withClue("snapshot offset: ") {
aliceWildcardSnapshot.offset shouldEqual snapshotOffset
aliceSpecificTemplatesSnapshot.offset shouldEqual snapshotOffset
}
withClue("snapshot offset (2): ") {
snapshotOffset.toLong shouldEqual (startingOffset.toLong + N + 1)
}
withClue("ending offset: ") {
endingOffset.toLong shouldEqual (snapshotOffset.toLong + M)
}
withClue("alice wildcard snapshot size: ") {
(aliceWildcardSnapshotSize - aliceStartingSnapshotSize) shouldEqual (N - 1)
}
withClue("alice specific template snapshot size: ") {
(aliceSpecificTemplatesSnapshotSize - aliceStartingSnapshotSize) shouldEqual (N - 1)
}
withClue("charlie wildcard snapshot size: ") {
(charlieWildcardSnapshotSize - charlieStartingSnapshotSize) shouldEqual 0
}
withClue("charlie specific template snapshot size: ") {
(charlieSpecificTemplateSnapshotSize - charlieStartingSnapshotSize) shouldEqual 0
}
withClue("mixed snapshot size: ") {
(mixedSnapshotSize - mixedStartingSnapshotSize) shouldEqual (N - 1)
}
}
}
}