Push down completion requests to data access layer (#4609)

* Push down completion requests to data access layer

This is largely a refactoring. The externally observable behavior is unchanged, but:

- a sub-dao is created for command completions (with the intent of breaking up the dao completely in future commits)
- the command completions dao can, in theory, directly fetch completions off the index
- in practice this is not implemented here to keep this PR as small as possible

Filtering ledger entries to get completions is moved to a function that is in turn used by:
- the ledger dao
- the in-memory sandbox

The plan for the former is to add a new table where completion-relevant data is stored so that it can be fetched quickly.

The plan for the latter is to get rid of it once DAML-on-SQL ships.

CHANGELOG_BEGIN
CHANGELOG_END

* Fix off-by-one error in the in-memory sandbox
This commit is contained in:
Stefano Baghino 2020-02-19 18:37:07 +01:00 committed by GitHub
parent 183a2ea9fa
commit 9eb28924ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 169 additions and 52 deletions

View File

@ -22,11 +22,6 @@ import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.dec.{DirectExecutionContext => DEC}
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.CompletionEvent.{
Checkpoint,
CommandAccepted,
CommandRejected
}
import com.digitalasset.ledger.api.domain.{
ApplicationId,
CompletionEvent,
@ -44,8 +39,6 @@ import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.digitalasset.platform.store.Contract.ActiveContract
import com.digitalasset.platform.store.entries.{LedgerEntry, PartyLedgerEntry}
import com.digitalasset.platform.store.{LedgerSnapshot, ReadOnlyLedger}
import scalaz.Tag
import scalaz.syntax.tag._
import scala.concurrent.Future
@ -224,48 +217,11 @@ abstract class LedgerBackedIndexService(
begin: LedgerOffset,
applicationId: ApplicationId,
parties: Set[Ref.Party]
): Source[CompletionEvent, NotUsed] = {
val converter = new OffsetConverter()
converter.toAbsolute(begin).flatMapConcat {
): Source[CompletionEvent, NotUsed] =
new OffsetConverter().toAbsolute(begin).flatMapConcat {
case LedgerOffset.Absolute(absBegin) =>
ledger
.ledgerEntries(Some(absBegin.toLong), endExclusive = None)
.map {
case (offset, entry) =>
(offset + 1, entry) //doing the same as above with transactions. The ledger api has to return a non-inclusive offset
}
.collect {
case (offset, t: LedgerEntry.Transaction)
// We only send out completions for transactions for which we have the full submitter information (appId, submitter, cmdId).
//
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
if t.applicationId.contains(applicationId.unwrap) &&
t.submittingParty.exists(parties.contains) &&
t.commandId.nonEmpty =>
CommandAccepted(
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)),
t.recordedAt,
Tag.subst(t.commandId).get,
domain.TransactionId(t.transactionId)
)
case (offset, c: LedgerEntry.Checkpoint) =>
Checkpoint(
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)),
c.recordedAt)
case (offset, r: LedgerEntry.Rejection)
if r.commandId.nonEmpty && r.applicationId.contains(applicationId.unwrap) =>
CommandRejected(
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString)),
r.recordTime,
domain.CommandId(r.commandId),
r.rejectionReason)
}
ledger.completions(Option(absBegin.toLong), None, applicationId, parties).map(_._2)
}
}
// IndexPackagesService
override def listLfPackages(): Future[Map[PackageId, PackageDetails]] =

View File

@ -14,7 +14,8 @@ import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionId}
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.{ApplicationId, LedgerId, PartyDetails, TransactionId}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.metrics.timedFuture
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
@ -35,7 +36,7 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
val lookupContract: Timer = metrics.timer("daml.index.lookup_contract")
val lookupKey: Timer = metrics.timer("daml.index.lookup_key")
val lookupTransaction: Timer = metrics.timer("daml.index.lookup_transaction")
val lookupLedgerConfiguration: Timer = metrics.timer("daml.index.lookup_ledger_configuration ")
val lookupLedgerConfiguration: Timer = metrics.timer("daml.index.lookup_ledger_configuration")
val parties: Timer = metrics.timer("daml.index.parties")
val listLfPackages: Timer = metrics.timer("daml.index.list_lf_packages")
val getLfArchive: Timer = metrics.timer("daml.index.get_lf_archive")
@ -53,6 +54,13 @@ class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: MetricRegistry)
override def ledgerEnd: Long = ledger.ledgerEnd
override def completions(
beginInclusive: Option[Long],
endExclusive: Option[Long],
applicationId: ApplicationId,
parties: Set[Party]): Source[(Long, domain.CompletionEvent), NotUsed] =
ledger.completions(beginInclusive, endExclusive, applicationId, parties)
override def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot] =
ledger.snapshot(filter)

View File

@ -26,6 +26,7 @@ import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{
ApplicationId,
CommandId,
CompletionEvent,
LedgerId,
PartyDetails,
RejectionReason,
@ -45,9 +46,10 @@ import com.digitalasset.platform.store.entries.{
PackageLedgerEntry,
PartyLedgerEntry
}
import com.digitalasset.platform.store.LedgerSnapshot
import com.digitalasset.platform.store.{CompletionFromTransaction, LedgerSnapshot}
import org.slf4j.LoggerFactory
import scalaz.Tag
import scalaz.syntax.tag.ToTagOps
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
@ -101,6 +103,16 @@ class InMemoryLedger(
private var deduplicator = Deduplicator()
private var ledgerConfiguration: Option[Configuration] = None
override def completions(
beginInclusive: Option[Long],
endExclusive: Option[Long],
applicationId: ApplicationId,
parties: Set[Party]): Source[(Long, CompletionEvent), NotUsed] =
entries
.getSource(beginInclusive, endExclusive)
.collect { case (offset, InMemoryLedgerEntry(entry)) => (offset + 1, entry) }
.collect(CompletionFromTransaction(applicationId.unwrap, parties))
override def ledgerEnd: Long = entries.ledgerEnd
// need to take the lock to make sure the two pieces of data are consistent.

View File

@ -16,7 +16,7 @@ import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.{LedgerId, TransactionId}
import com.digitalasset.ledger.api.domain.{ApplicationId, LedgerId, TransactionId}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource
@ -31,6 +31,7 @@ import com.digitalasset.platform.store.entries.{
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scalaz.syntax.tag.ToTagOps
class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao: LedgerReadDao)
extends ReadOnlyLedger {
@ -60,6 +61,17 @@ class BaseLedger(val ledgerId: LedgerId, headAtInitialization: Long, ledgerDao:
override def ledgerEnd: Long = dispatcher.getHead()
override def completions(
beginInclusive: Option[Long],
endExclusive: Option[Long],
applicationId: ApplicationId,
parties: Set[Party]): Source[(Long, domain.CompletionEvent), NotUsed] =
dispatcher.startingAt(
beginInclusive.getOrElse(0),
RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId.unwrap, parties)),
endExclusive
)
override def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot] =
// instead of looking up the latest ledger end, we can only take the latest known ledgerEnd in the scope of SqlLedger.
// If we don't do that, we can miss contracts from a partially inserted batch insert of ledger entries

View File

@ -0,0 +1,66 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.store
import com.daml.ledger.participant.state.v1.TransactionId
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.{ApplicationId, CommandId}
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.CompletionEvent
import com.digitalasset.ledger.api.domain.CompletionEvent.{CommandAccepted, CommandRejected}
import com.digitalasset.platform.store.dao.LedgerDao
import com.digitalasset.platform.store.entries.LedgerEntry
// Turn a stream of transactions into a stream of completions for a given application and set of parties
// TODO Remove this when:
// TODO - the participant can read completions off the index directly AND
// TODO - the in-memory sandbox is gone
private[platform] object CompletionFromTransaction {
private def toApiOffset(offset: LedgerDao#LedgerOffset): domain.LedgerOffset.Absolute =
domain.LedgerOffset.Absolute(Ref.LedgerString.assertFromString(offset.toString))
private def toApiCommandId(commandId: CommandId): domain.CommandId =
domain.CommandId(commandId)
private def toApiTransactionId(transactionId: TransactionId): domain.TransactionId =
domain.TransactionId(transactionId)
// Filter completions for transactions for which we have the full submitter information: appId, submitter, cmdId
// This doesn't make a difference for the sandbox (because it represents the ledger backend + api server in single package).
// But for an api server that is part of a distributed ledger network, we might see
// transactions that originated from some other api server. These transactions don't contain the submitter information,
// and therefore we don't emit CommandAccepted completions for those
def apply(appId: ApplicationId, parties: Set[Ref.Party]): PartialFunction[
(LedgerDao#LedgerOffset, LedgerEntry),
(LedgerDao#LedgerOffset, CompletionEvent)] = {
case (
offset,
LedgerEntry.Transaction(
Some(cmdId),
transactionId,
Some(`appId`),
Some(submitter),
_,
_,
recordTime,
_,
_)) if parties(submitter) =>
offset -> CommandAccepted(
toApiOffset(offset),
recordTime,
toApiCommandId(cmdId),
toApiTransactionId(transactionId))
case (offset, LedgerEntry.Rejection(recordTime, commandId, `appId`, submitter, rejectionReason))
if parties(submitter) =>
offset -> CommandRejected(
toApiOffset(offset),
recordTime,
toApiCommandId(commandId),
rejectionReason)
case (offset, LedgerEntry.Checkpoint(recordedAt)) =>
offset -> CompletionEvent.Checkpoint(toApiOffset(offset), recordedAt)
}
}

View File

@ -7,13 +7,20 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.v1.Configuration
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.{PackageId, Party}
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst}
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.domain.{LedgerId, PartyDetails, TransactionId}
import com.digitalasset.ledger.api.domain.{
ApplicationId,
CompletionEvent,
LedgerId,
PartyDetails,
TransactionId
}
import com.digitalasset.ledger.api.health.ReportsHealth
import com.digitalasset.platform.participant.util.EventFilter.TemplateAwareFilter
import com.digitalasset.platform.store.entries.{
@ -36,6 +43,12 @@ trait ReadOnlyLedger extends ReportsHealth with AutoCloseable {
def ledgerEnd: Long
def completions(
beginInclusive: Option[Long],
endExclusive: Option[Long],
applicationId: ApplicationId,
parties: Set[Ref.Party]): Source[(Long, CompletionEvent), NotUsed]
def snapshot(filter: TemplateAwareFilter): Future[LedgerSnapshot]
def lookupContract(

View File

@ -0,0 +1,45 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.store.dao
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.ledger.ApplicationId
import com.digitalasset.ledger.api.domain.CompletionEvent
import com.digitalasset.platform.store.CompletionFromTransaction
private[dao] object CommandCompletionsReader {
// Uses an existing LedgerReadDao to read completions from the ledger entries stream
// TODO Replace this to tap directly into the index
def apply(reader: LedgerReadDao): CommandCompletionsReader[LedgerDao#LedgerOffset] =
(from: Long, to: Long, appId: ApplicationId, parties: Set[Ref.Party]) =>
reader
.getLedgerEntries(from, to)
.map { case (offset, entry) => (offset + 1, entry) }
.collect(CompletionFromTransaction(appId, parties))
}
trait CommandCompletionsReader[LedgerOffset] {
/**
* Returns a stream of command completions
*
* TODO The current type parameter is to differentiate between checkpoints
* TODO and actual completions, it will change when we drop checkpoints
*
* TODO Drop the LedgerOffset from the source when we replace the Dispatcher mechanism
*
* @param startInclusive starting offset inclusive
* @param endExclusive ending offset exclusive
* @return a stream of command completions tupled with their offset
*/
def getCommandCompletions(
startInclusive: LedgerOffset,
endExclusive: LedgerOffset,
applicationId: ApplicationId,
parties: Set[Ref.Party]): Source[(LedgerOffset, CompletionEvent), NotUsed]
}

View File

@ -1646,6 +1646,7 @@ private class JdbcLedgerDao(
BatchSql(query, params.head, params.drop(1).toArray: _*).execute()
}
override val completions: CommandCompletionsReader[LedgerOffset] = CommandCompletionsReader(this)
}
object JdbcLedgerDao {

View File

@ -138,6 +138,8 @@ trait LedgerReadDao extends ReportsHealth {
startInclusive: LedgerOffset,
endExclusive: LedgerOffset): Source[(LedgerOffset, PackageLedgerEntry), NotUsed]
def completions: CommandCompletionsReader[LedgerOffset]
}
trait LedgerWriteDao extends ReportsHealth {

View File

@ -117,6 +117,8 @@ class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: MetricRegistry)
startInclusive: LedgerOffset,
endExclusive: LedgerOffset): Source[(LedgerOffset, ConfigurationEntry), NotUsed] =
ledgerDao.getConfigurationEntries(startInclusive, endExclusive)
override val completions: CommandCompletionsReader[LedgerOffset] = ledgerDao.completions
}
class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: MetricRegistry)