From 9eb28924efe90c33d374a03ef8376c1451550f88 Mon Sep 17 00:00:00 2001 From: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> Date: Wed, 19 Feb 2020 18:37:07 +0100 Subject: [PATCH] Push down completion requests to data access layer (#4609) * Push down completion requests to data access layer This is largely a refactoring. The externally observable behavior is unchanged, but: - a sub-dao is created for command completions (with the intent of breaking up the dao completely in future commits) - the command completions dao can, in theory, directly fetch completions off the index - in practice this is not implemented here to keep this PR as small as possible Filtering ledger entries to get completions is moved to a function that is in turn used by: - the ledger dao - the in-memory sandbox The plan for the former is to add a new table where completion-relevant data is stored so that it can be fetched quickly. The plan for the latter is to get rid of it once DAML-on-SQL ships. CHANGELOG_BEGIN CHANGELOG_END * Fix off-by-one error in the in-memory sandbox --- .../index/LedgerBackedIndexService.scala | 50 +------------- .../index/MeteredReadOnlyLedger.scala | 12 +++- .../ledger/inmemory/InMemoryLedger.scala | 14 +++- .../platform/store/BaseLedger.scala | 14 +++- .../store/CompletionFromTransaction.scala | 66 +++++++++++++++++++ .../platform/store/ReadOnlyLedger.scala | 15 ++++- .../store/dao/CommandCompletionsReader.scala | 45 +++++++++++++ .../platform/store/dao/JdbcLedgerDao.scala | 1 + .../platform/store/dao/LedgerDao.scala | 2 + .../platform/store/dao/MeteredLedgerDao.scala | 2 + 10 files changed, 169 insertions(+), 52 deletions(-) create mode 100644 ledger/sandbox/src/main/scala/com/digitalasset/platform/store/CompletionFromTransaction.scala create mode 100644 ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/CommandCompletionsReader.scala diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala index d704be131f1..4e5eab4c776 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala @@ -22,11 +22,6 @@ import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst} import com.digitalasset.daml_lf_dev.DamlLf.Archive import com.digitalasset.dec.{DirectExecutionContext => DEC} import com.digitalasset.ledger.api.domain -import com.digitalasset.ledger.api.domain.CompletionEvent.{ - Checkpoint, - CommandAccepted, - CommandRejected -} import com.digitalasset.ledger.api.domain.{ ApplicationId, CompletionEvent, @@ -44,8 +39,6 @@ import com.digitalasset.platform.server.api.validation.ErrorFactories import com.digitalasset.platform.store.Contract.ActiveContract import com.digitalasset.platform.store.entries.{LedgerEntry, PartyLedgerEntry} import com.digitalasset.platform.store.{LedgerSnapshot, ReadOnlyLedger} -import scalaz.Tag -import scalaz.syntax.tag._ import scala.concurrent.Future @@ -224,48 +217,11 @@ abstract class LedgerBackedIndexService( begin: LedgerOffset, applicationId: ApplicationId, parties: Set[Ref.Party] - ): Source[CompletionEvent, NotUsed] = { - val converter = new OffsetConverter() - converter.toAbsolute(begin).flatMapConcat { + ): Source[CompletionEvent, NotUsed] = + new OffsetConverter().toAbsolute(begin).flatMapConcat { case LedgerOffset.Absolute(absBegin) => - ledger - .ledgerEntries(Some(absBegin.toLong), endExclusive = None) - .map { - case (offset, entry) => - (offset + 1, entry) //doing the same as above with transactions. The ledger api has to return a non-inclusive offset - } - .collect { - case (offset, t: LedgerEntry.Transaction) - // We only send out completions for transactions for which we have the full submitter information (appId, submitter, cmdId). - // - // This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package). - // But for an api server that is part of a distributed ledger network, we might see - // transactions that originated from some other api server. These transactions don't contain the submitter information, - // and therefore we don't emit CommandAccepted completions for those - if t.applicationId.contains(applicationId.unwrap) && - t.submittingParty.exists(parties.contains) && - t.commandId.nonEmpty => - CommandAccepted( - domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)), - t.recordedAt, - Tag.subst(t.commandId).get, - domain.TransactionId(t.transactionId) - ) - - case (offset, c: LedgerEntry.Checkpoint) => - Checkpoint( - domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)), - c.recordedAt) - case (offset, r: LedgerEntry.Rejection) - if r.commandId.nonEmpty && r.applicationId.contains(applicationId.unwrap) => - CommandRejected( - domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)), - r.recordTime, - domain.CommandId(r.commandId), - r.rejectionReason) - } + ledger.completions(Option(absBegin.toLong), None, applicationId, parties).map(_._2) } - } // IndexPackagesService override def listLfPackages(): Future[Map[PackageId, PackageDetails]] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/MeteredReadOnlyLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/MeteredReadOnlyLedger.scala index ac88c98151f..e9b81ee62ca 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/MeteredReadOnlyLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/MeteredReadOnlyLedger.scala @@ -14,7 +14,8 @@ import com.digitalasset.daml.lf.transaction.Node.GlobalKey import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst} import com.digitalasset.daml_lf_dev.DamlLf.Archive -import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionId} +import com.digitalasset.ledger.api.domain +import com.digitalasset.ledger.api.domain.{ApplicationId, LedgerId, PartyDetails, TransactionId} import com.digitalasset.ledger.api.health.HealthStatus import com.digitalasset.platform.metrics.timedFuture import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter @@ -35,7 +36,7 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry) val lookupContract: Timer = metrics.timer("daml.index.lookup_contract") val lookupKey: Timer = metrics.timer("daml.index.lookup_key") val lookupTransaction: Timer = metrics.timer("daml.index.lookup_transaction") - val lookupLedgerConfiguration: Timer = metrics.timer("daml.index.lookup_ledger_configuration ") + val lookupLedgerConfiguration: Timer = metrics.timer("daml.index.lookup_ledger_configuration") val parties: Timer = metrics.timer("daml.index.parties") val listLfPackages: Timer = metrics.timer("daml.index.list_lf_packages") val getLfArchive: Timer = metrics.timer("daml.index.get_lf_archive") @@ -53,6 +54,13 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry) override def ledgerEnd: Long = ledger.ledgerEnd + override def completions( + beginInclusive: Option[Long], + endExclusive: Option[Long], + applicationId: ApplicationId, + parties: Set[Party]): Source[(Long, domain.CompletionEvent), NotUsed] = + ledger.completions(beginInclusive, endExclusive, applicationId, parties) + override def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot] = ledger.snapshot(filter) diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index e20001a7a80..6dfd466e7ca 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -26,6 +26,7 @@ import com.digitalasset.daml_lf_dev.DamlLf.Archive import com.digitalasset.ledger.api.domain.{ ApplicationId, CommandId, + CompletionEvent, LedgerId, PartyDetails, RejectionReason, @@ -45,9 +46,10 @@ import com.digitalasset.platform.store.entries.{ PackageLedgerEntry, PartyLedgerEntry } -import com.digitalasset.platform.store.LedgerSnapshot +import com.digitalasset.platform.store.{CompletionFromTransaction, LedgerSnapshot} import org.slf4j.LoggerFactory import scalaz.Tag +import scalaz.syntax.tag.ToTagOps import scala.concurrent.Future import scala.util.{Failure, Success, Try} @@ -101,6 +103,16 @@ class InMemoryLedger( private var deduplicator = Deduplicator() private var ledgerConfiguration: Option[Configuration] = None + override def completions( + beginInclusive: Option[Long], + endExclusive: Option[Long], + applicationId: ApplicationId, + parties: Set[Party]): Source[(Long, CompletionEvent), NotUsed] = + entries + .getSource(beginInclusive, endExclusive) + .collect { case (offset, InMemoryLedgerEntry(entry)) => (offset + 1, entry) } + .collect(CompletionFromTransaction(applicationId.unwrap, parties)) + override def ledgerEnd: Long = entries.ledgerEnd // need to take the lock to make sure the two pieces of data are consistent. diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala index c8274d84792..c4ef0076031 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala @@ -16,7 +16,7 @@ import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst} import com.digitalasset.daml_lf_dev.DamlLf import com.digitalasset.dec.DirectExecutionContext import com.digitalasset.ledger.api.domain -import com.digitalasset.ledger.api.domain.{LedgerId, TransactionId} +import com.digitalasset.ledger.api.domain.{ApplicationId, LedgerId, TransactionId} import com.digitalasset.ledger.api.health.HealthStatus import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource @@ -31,6 +31,7 @@ import com.digitalasset.platform.store.entries.{ import scala.concurrent.{ExecutionContext, Future} import scala.util.Try +import scalaz.syntax.tag.ToTagOps class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao: LedgerReadDao) extends ReadOnlyLedger { @@ -60,6 +61,17 @@ class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao: override def ledgerEnd: Long = dispatcher.getHead() + override def completions( + beginInclusive: Option[Long], + endExclusive: Option[Long], + applicationId: ApplicationId, + parties: Set[Party]): Source[(Long, domain.CompletionEvent), NotUsed] = + dispatcher.startingAt( + beginInclusive.getOrElse(0), + RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId.unwrap, parties)), + endExclusive + ) + override def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot] = // instead of looking up the latest ledger end, we can only take the latest known ledgerEnd in the scope of SqlLedger. // If we don't do that, we can miss contracts from a partially inserted batch insert of ledger entries diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/CompletionFromTransaction.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/CompletionFromTransaction.scala new file mode 100644 index 00000000000..d847b6d70d0 --- /dev/null +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/CompletionFromTransaction.scala @@ -0,0 +1,66 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.platform.store + +import com.daml.ledger.participant.state.v1.TransactionId +import com.digitalasset.daml.lf.data.Ref +import com.digitalasset.ledger.{ApplicationId, CommandId} +import com.digitalasset.ledger.api.domain +import com.digitalasset.ledger.api.domain.CompletionEvent +import com.digitalasset.ledger.api.domain.CompletionEvent.{CommandAccepted, CommandRejected} +import com.digitalasset.platform.store.dao.LedgerDao +import com.digitalasset.platform.store.entries.LedgerEntry + +// Turn a stream of transactions into a stream of completions for a given application and set of parties +// TODO Remove this when: +// TODO - the participant can read completions off the index directly AND +// TODO - the in-memory sandbox is gone +private[platform] object CompletionFromTransaction { + + private def toApiOffset(offset: LedgerDao#LedgerOffset): domain.LedgerOffset.Absolute = + domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)) + + private def toApiCommandId(commandId: CommandId): domain.CommandId = + domain.CommandId(commandId) + + private def toApiTransactionId(transactionId: TransactionId): domain.TransactionId = + domain.TransactionId(transactionId) + + // Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId + // This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package). + // But for an api server that is part of a distributed ledger network, we might see + // transactions that originated from some other api server. These transactions don't contain the submitter information, + // and therefore we don't emit CommandAccepted completions for those + def apply(appId: ApplicationId, parties: Set[Ref.Party]): PartialFunction[ + (LedgerDao#LedgerOffset, LedgerEntry), + (LedgerDao#LedgerOffset, CompletionEvent)] = { + case ( + offset, + LedgerEntry.Transaction( + Some(cmdId), + transactionId, + Some(`appId`), + Some(submitter), + _, + _, + recordTime, + _, + _)) if parties(submitter) => + offset -> CommandAccepted( + toApiOffset(offset), + recordTime, + toApiCommandId(cmdId), + toApiTransactionId(transactionId)) + case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, submitter, rejectionReason)) + if parties(submitter) => + offset -> CommandRejected( + toApiOffset(offset), + recordTime, + toApiCommandId(commandId), + rejectionReason) + case (offset, LedgerEntry.Checkpoint(recordedAt)) => + offset -> CompletionEvent.Checkpoint(toApiOffset(offset), recordedAt) + } + +} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/ReadOnlyLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/ReadOnlyLedger.scala index c9ff083bc7b..50191d1da03 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/ReadOnlyLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/ReadOnlyLedger.scala @@ -7,13 +7,20 @@ import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.ledger.participant.state.v1.Configuration +import com.digitalasset.daml.lf.data.Ref import com.digitalasset.daml.lf.data.Ref.{PackageId, Party} import com.digitalasset.daml.lf.language.Ast import com.digitalasset.daml.lf.transaction.Node.GlobalKey import com.digitalasset.daml.lf.value.Value import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst} import com.digitalasset.daml_lf_dev.DamlLf.Archive -import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionId} +import com.digitalasset.ledger.api.domain.{ + ApplicationId, + CompletionEvent, + LedgerId, + PartyDetails, + TransactionId +} import com.digitalasset.ledger.api.health.ReportsHealth import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter import com.digitalasset.platform.store.entries.{ @@ -36,6 +43,12 @@ trait ReadOnlyLedger extends ReportsHealth with AutoCloseable { def ledgerEnd: Long + def completions( + beginInclusive: Option[Long], + endExclusive: Option[Long], + applicationId: ApplicationId, + parties: Set[Ref.Party]): Source[(Long, CompletionEvent), NotUsed] + def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot] def lookupContract( diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/CommandCompletionsReader.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/CommandCompletionsReader.scala new file mode 100644 index 00000000000..5e00b0d0d60 --- /dev/null +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/CommandCompletionsReader.scala @@ -0,0 +1,45 @@ +// Copyright (c) 2020 The DAML Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.platform.store.dao + +import akka.NotUsed +import akka.stream.scaladsl.Source +import com.digitalasset.daml.lf.data.Ref +import com.digitalasset.ledger.ApplicationId +import com.digitalasset.ledger.api.domain.CompletionEvent +import com.digitalasset.platform.store.CompletionFromTransaction + +private[dao] object CommandCompletionsReader { + + // Uses an existing LedgerReadDao to read completions from the ledger entries stream + // TODO Replace this to tap directly into the index + def apply(reader: LedgerReadDao): CommandCompletionsReader[LedgerDao#LedgerOffset] = + (from: Long, to: Long, appId: ApplicationId, parties: Set[Ref.Party]) => + reader + .getLedgerEntries(from, to) + .map { case (offset, entry) => (offset + 1, entry) } + .collect(CompletionFromTransaction(appId, parties)) +} + +trait CommandCompletionsReader[LedgerOffset] { + + /** + * Returns a stream of command completions + * + * TODO The current type parameter is to differentiate between checkpoints + * TODO and actual completions, it will change when we drop checkpoints + * + * TODO Drop the LedgerOffset from the source when we replace the Dispatcher mechanism + * + * @param startInclusive starting offset inclusive + * @param endExclusive ending offset exclusive + * @return a stream of command completions tupled with their offset + */ + def getCommandCompletions( + startInclusive: LedgerOffset, + endExclusive: LedgerOffset, + applicationId: ApplicationId, + parties: Set[Ref.Party]): Source[(LedgerOffset, CompletionEvent), NotUsed] + +} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/JdbcLedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/JdbcLedgerDao.scala index 58a1e9be26b..7700fa4562a 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/JdbcLedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/JdbcLedgerDao.scala @@ -1646,6 +1646,7 @@ private class JdbcLedgerDao( BatchSql(query, params.head, params.drop(1).toArray: _*).execute() } + override val completions: CommandCompletionsReader[LedgerOffset] = CommandCompletionsReader(this) } object JdbcLedgerDao { diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/LedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/LedgerDao.scala index 6e3c42af658..247435058f0 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/LedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/LedgerDao.scala @@ -138,6 +138,8 @@ trait LedgerReadDao extends ReportsHealth { startInclusive: LedgerOffset, endExclusive: LedgerOffset): Source[(LedgerOffset, PackageLedgerEntry), NotUsed] + def completions: CommandCompletionsReader[LedgerOffset] + } trait LedgerWriteDao extends ReportsHealth { diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/MeteredLedgerDao.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/MeteredLedgerDao.scala index 58987b3a5b7..07d5488ee94 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/MeteredLedgerDao.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/dao/MeteredLedgerDao.scala @@ -117,6 +117,8 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry) startInclusive: LedgerOffset, endExclusive: LedgerOffset): Source[(LedgerOffset, ConfigurationEntry), NotUsed] = ledgerDao.getConfigurationEntries(startInclusive, endExclusive) + + override val completions: CommandCompletionsReader[LedgerOffset] = ledgerDao.completions } class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)