Simplify IndexService hierarchy [DPP-932] (#13043)

* Squash LedgerBackedIndexService, SpannedIndexService and MeteredReadOnlyLedger
into ReadOnlyLedgerImpl

changelog_begin
changelog_end

* Rename ReadOnlyLedgerImpl to IndexServiceImpl

* Metrics cleanup
* Remove redundant metered layer stemming from MeteredReadOnlyLedger

* Simplified getLedgerId

* Addressed Marton's review comments
This commit is contained in:
tudor-da 2022-03-01 14:30:30 +01:00 committed by GitHub
parent e1ccc2f4f4
commit 6da1cde1b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 310 additions and 1065 deletions

View File

@ -352,25 +352,6 @@ final class Metrics(val registry: MetricRegistry) {
private val Prefix = daml.Prefix :+ "index"
val decodeStateEvent: Timer = registry.timer(Prefix :+ "decode_state_event")
val lookupContract: Timer = registry.timer(Prefix :+ "lookup_contract")
val lookupKey: Timer = registry.timer(Prefix :+ "lookup_key")
val lookupFlatTransactionById: Timer =
registry.timer(Prefix :+ "lookup_flat_transaction_by_id")
val lookupTransactionTreeById: Timer =
registry.timer(Prefix :+ "lookup_transaction_tree_by_id")
val lookupLedgerConfiguration: Timer = registry.timer(Prefix :+ "lookup_ledger_configuration")
val lookupMaximumLedgerTime: Timer = registry.timer(Prefix :+ "lookup_maximum_ledger_time")
val getParties: Timer = registry.timer(Prefix :+ "get_parties")
val listKnownParties: Timer = registry.timer(Prefix :+ "list_known_parties")
val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages")
val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")
val publishTransaction: Timer = registry.timer(Prefix :+ "publish_transaction")
val publishPartyAllocation: Timer = registry.timer(Prefix :+ "publish_party_allocation")
val uploadPackages: Timer = registry.timer(Prefix :+ "upload_packages")
val publishConfiguration: Timer = registry.timer(Prefix :+ "publish_configuration")
val decodeTransactionLogUpdate: Timer =
registry.timer(Prefix :+ "transaction_log_update_decode")
@ -545,9 +526,7 @@ final class Metrics(val registry: MetricRegistry) {
registry.histogram(Prefix :+ "exercise_result_compressed")
val exerciseResultUncompressed: Histogram =
registry.histogram(Prefix :+ "exercise_result_uncompressed")
}
}
}
@ -645,7 +624,6 @@ final class Metrics(val registry: MetricRegistry) {
val lookupActiveContract: Timer = registry.timer(Prefix :+ "lookup_active_contract")
val lookupContractKey: Timer = registry.timer(Prefix :+ "lookup_contract_key")
val lookupMaximumLedgerTime: Timer = registry.timer(Prefix :+ "lookup_maximum_ledger_time")
val getLedgerId: Timer = registry.timer(Prefix :+ "get_ledger_id")
val getParticipantId: Timer = registry.timer(Prefix :+ "get_participant_id")
val getParties: Timer = registry.timer(Prefix :+ "get_parties")
val listKnownParties: Timer = registry.timer(Prefix :+ "list_known_parties")

View File

@ -116,14 +116,13 @@ private[daml] object ApiServices {
override def acquire()(implicit context: ResourceContext): Resource[ApiServices] = {
logger.info(engine.info.toString)
for {
ledgerId <- Resource.fromFuture(indexService.getLedgerId())
currentLedgerConfiguration <- configurationInitializer.initialize(
initialLedgerConfiguration = initialLedgerConfiguration,
configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos),
)
services <- Resource(
Future(
createServices(ledgerId, currentLedgerConfiguration, checkOverloaded)(
createServices(identityService.ledgerId, currentLedgerConfiguration, checkOverloaded)(
servicesExecutionContext
)
)
@ -147,7 +146,7 @@ private[daml] object ApiServices {
ApiTransactionService.create(ledgerId, transactionsService, metrics)
val apiLedgerIdentityService =
ApiLedgerIdentityService.create(() => identityService.getLedgerId())
ApiLedgerIdentityService.create(ledgerId)
val apiVersionService =
ApiVersionService.create(

View File

@ -1,196 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.api.{TraceIdentifiers, domain}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.MeteringStore.ReportData
import com.daml.ledger.participant.state.index.v2.{IndexService, MaximumLedgerTime}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value
import com.daml.logging.LoggingContext
import com.daml.telemetry.{Event, Spans}
import scala.concurrent.Future
private[daml] final class SpannedIndexService(delegate: IndexService) extends IndexService {
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, v2.PackageDetails]] =
delegate.listLfPackages()
override def getLfArchive(packageId: Ref.PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[DamlLf.Archive]] =
delegate.getLfArchive(packageId)
override def packageEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[domain.PackageEntry, NotUsed] =
delegate.packageEntries(startExclusive)
override def getLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Source[v2.LedgerConfiguration, NotUsed] =
delegate.getLedgerConfiguration()
override def currentLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[LedgerOffset.Absolute] =
delegate.currentLedgerEnd()
override def getCompletions(
begin: domain.LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] =
delegate.getCompletions(begin, applicationId, parties)
override def transactions(
begin: domain.LedgerOffset,
endAt: Option[domain.LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetTransactionsResponse, NotUsed] =
delegate
.transactions(begin, endAt, filter, verbose)
.wireTap(
_.transactions
.map(transaction =>
Event(transaction.commandId, TraceIdentifiers.fromTransaction(transaction))
)
.foreach(Spans.addEventToCurrentSpan)
)
override def transactionTrees(
begin: domain.LedgerOffset,
endAt: Option[domain.LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetTransactionTreesResponse, NotUsed] =
delegate
.transactionTrees(begin, endAt, filter, verbose)
.wireTap(
_.transactions
.map(transaction =>
Event(transaction.commandId, TraceIdentifiers.fromTransactionTree(transaction))
)
.foreach(Spans.addEventToCurrentSpan)
)
override def getTransactionById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
delegate.getTransactionById(transactionId, requestingParties)
override def getTransactionTreeById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
delegate.getTransactionTreeById(transactionId, requestingParties)
override def getActiveContracts(
filter: domain.TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetActiveContractsResponse, NotUsed] =
delegate.getActiveContracts(filter, verbose)
override def lookupActiveContract(
readers: Set[Ref.Party],
contractId: Value.ContractId,
)(implicit
loggingContext: LoggingContext
): Future[Option[Value.VersionedContractInstance]] =
delegate.lookupActiveContract(readers, contractId)
override def lookupContractKey(
readers: Set[Ref.Party],
key: GlobalKey,
)(implicit loggingContext: LoggingContext): Future[Option[Value.ContractId]] =
delegate.lookupContractKey(readers, key)
override def lookupMaximumLedgerTimeAfterInterpretation(
ids: Set[Value.ContractId]
)(implicit loggingContext: LoggingContext): Future[MaximumLedgerTime] =
delegate.lookupMaximumLedgerTimeAfterInterpretation(ids)
override def getLedgerId()(implicit loggingContext: LoggingContext): Future[LedgerId] =
delegate.getLedgerId()
override def getParticipantId()(implicit
loggingContext: LoggingContext
): Future[Ref.ParticipantId] =
delegate.getParticipantId()
override def getParties(parties: Seq[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
delegate.getParties(parties)
override def listKnownParties()(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
delegate.listKnownParties()
override def partyEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[domain.PartyEntry, NotUsed] =
delegate.partyEntries(startExclusive)
override def lookupConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(LedgerOffset.Absolute, Configuration)]] =
delegate.lookupConfiguration()
override def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit
loggingContext: LoggingContext
): Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed] =
delegate.configurationEntries(startExclusive)
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =
delegate.prune(pruneUpToInclusive, pruneAllDivulgedContracts)
override def getCompletions(
startExclusive: LedgerOffset,
endInclusive: LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] =
delegate.getCompletions(startExclusive, endInclusive, applicationId, parties)
override def currentHealth(): HealthStatus =
delegate.currentHealth()
override def getMeteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData] = {
delegate.getMeteringReportData(from, to, applicationId)
}
}

View File

@ -3,8 +3,6 @@
package com.daml.platform.apiserver
import java.io.File
import akka.stream.Materializer
import com.daml.ledger.api.domain
import com.daml.ledger.configuration.LedgerId
@ -15,10 +13,12 @@ import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Engine, ValueEnricher}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.index.JdbcIndex
import com.daml.platform.index.IndexServiceBuilder
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.{DbSupport, LfValueTranslationCache}
import java.io.File
import scala.concurrent.ExecutionContextExecutor
object StandaloneIndexService {
@ -71,28 +71,29 @@ object StandaloneIndexService {
val packageStore = loadDamlPackages()
preloadPackages(packageStore)
})
indexService <- JdbcIndex
.owner(
dbSupport = dbSupport,
ledgerId = domain.LedgerId(ledgerId),
participantId = participantId,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
acsGlobalParallelism = config.acsGlobalParallelism,
acsIdQueueLimit = config.acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = valueEnricher,
maxContractStateCacheSize = config.maxContractStateCacheSize,
maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize,
maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize,
enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi,
)
.map(index => new SpannedIndexService(new TimedIndexService(index, metrics)))
indexService <- IndexServiceBuilder(
dbSupport = dbSupport,
initialLedgerId = domain.LedgerId(ledgerId),
participantId = participantId,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
acsGlobalParallelism = config.acsGlobalParallelism,
acsIdQueueLimit = config.acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = valueEnricher,
maxContractStateCacheSize = config.maxContractStateCacheSize,
maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize,
maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize,
enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi,
errorFactories = ErrorFactories(),
)(materializer, loggingContext, servicesExecutionContext)
.owner()
.map(index => new TimedIndexService(index, metrics))
} yield indexService
}
}

View File

@ -35,6 +35,8 @@ import scala.concurrent.Future
private[daml] final class TimedIndexService(delegate: IndexService, metrics: Metrics)
extends IndexService {
override def ledgerId: LedgerId = delegate.ledgerId
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, v2.PackageDetails]] =
@ -153,9 +155,6 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met
delegate.lookupMaximumLedgerTimeAfterInterpretation(ids),
)
override def getLedgerId()(implicit loggingContext: LoggingContext): Future[LedgerId] =
Timed.future(metrics.daml.services.index.getLedgerId, delegate.getLedgerId())
override def getParticipantId()(implicit
loggingContext: LoggingContext
): Future[Ref.ParticipantId] =

View File

@ -57,12 +57,10 @@ private[apiserver] final class ApiLedgerIdentityService private (
}
private[apiserver] object ApiLedgerIdentityService {
def create(
getLedgerId: () => Future[LedgerId]
)(implicit
def create(ledgerId: LedgerId)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): ApiLedgerIdentityService with BindableService = {
new ApiLedgerIdentityService(getLedgerId)
new ApiLedgerIdentityService(() => Future.successful(ledgerId))
}
}

View File

@ -6,6 +6,7 @@ import akka.stream._
import com.daml.error.definitions.IndexErrors.IndexDbException
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.engine.ValueEnricher
@ -42,7 +43,7 @@ import com.daml.timer.RetryStrategy
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
private[index] case class ReadOnlyLedgerBuilder(
private[platform] case class IndexServiceBuilder(
dbSupport: DbSupport,
initialLedgerId: LedgerId,
eventsPageSize: Int,
@ -69,7 +70,7 @@ private[index] case class ReadOnlyLedgerBuilder(
) {
private val logger = ContextualizedLogger.get(getClass)
def owner(): ResourceOwner[ReadOnlyLedger] = {
def owner(): ResourceOwner[IndexService] = {
val ledgerEndCache = MutableLedgerEndCache()
val stringInterningView = createStringInterningView()
val ledgerDao = createLedgerReadDao(ledgerEndCache, stringInterningView)
@ -103,13 +104,15 @@ private[index] case class ReadOnlyLedgerBuilder(
instrumentedSignalNewLedgerHead,
prefetchingDispatcher,
)
} yield new ReadOnlyLedgerImpl(
} yield new IndexServiceImpl(
ledgerId,
participantId,
ledgerDao,
transactionsReader,
contractStore,
pruneBuffers,
generalDispatcher,
errorFactories,
)
}

View File

@ -5,15 +5,14 @@ package com.daml.platform.index
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.daml_lf_dev.DamlLf
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain
import com.daml.ledger.api.{TraceIdentifiers, domain}
import com.daml.ledger.api.domain.ConfigurationEntry.Accepted
import com.daml.ledger.api.domain.{
LedgerId,
LedgerOffset,
PackageEntry,
PartyDetails,
PartyEntry,
TransactionFilter,
TransactionId,
@ -29,66 +28,55 @@ import com.daml.ledger.api.v1.transaction_service.{
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.{ReportData}
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.MeteringStore.ReportData
import com.daml.ledger.participant.state.index.v2.{
ContractStore,
IndexService,
MaximumLedgerTime,
_,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{ApplicationId, Identifier, PackageId, Party}
import com.daml.lf.data.Ref.{ApplicationId, Identifier, Party}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.ApiOffset
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.{ApiOffset, PruneBuffers}
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{LedgerDaoTransactionsReader, LedgerReadDao}
import com.daml.platform.store.entries.PartyLedgerEntry
import com.daml.telemetry.{SpanAttribute, Spans}
import com.daml.telemetry.{Event, SpanAttribute, Spans}
import scalaz.syntax.tag.ToTagOps
import scala.concurrent.{ExecutionContext, Future}
private[index] final class LedgerBackedIndexService(
ledger: ReadOnlyLedger,
private[index] class IndexServiceImpl(
val ledgerId: LedgerId,
participantId: Ref.ParticipantId,
ledgerDao: LedgerReadDao,
transactionsReader: LedgerDaoTransactionsReader,
contractStore: ContractStore,
pruneBuffers: PruneBuffers,
dispatcher: Dispatcher[Offset],
errorFactories: ErrorFactories,
) extends IndexService {
private val logger = ContextualizedLogger.get(getClass)
override def getLedgerId()(implicit loggingContext: LoggingContext): Future[LedgerId] =
Future.successful(ledger.ledgerId)
override def getParticipantId()(implicit
loggingContext: LoggingContext
): Future[Ref.ParticipantId] =
Future.successful(participantId)
override def currentHealth(): HealthStatus = ledger.currentHealth()
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetActiveContractsResponse, NotUsed] = {
val (acs, ledgerEnd) = ledger
.activeContracts(convertFilter(filter), verbose)
acs.concat(Source.single(GetActiveContractsResponse(offset = ApiOffset.toApiString(ledgerEnd))))
}
override def transactionTrees(
startExclusive: LedgerOffset,
endInclusive: Option[LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetTransactionTreesResponse, NotUsed] =
between(startExclusive, endInclusive)((from, to) => {
from.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetFrom, offset.toHexString)
)
to.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetTo, offset.toHexString)
)
ledger
.transactionTrees(
startExclusive = from,
endInclusive = to,
requestingParties = filter.filtersByParty.keySet,
verbose = verbose,
)
.map(_._2)
})
override def lookupContractKey(readers: Set[Ref.Party], key: GlobalKey)(implicit
loggingContext: LoggingContext
): Future[Option[ContractId]] =
contractStore.lookupContractKey(readers, key)
override def transactions(
startExclusive: domain.LedgerOffset,
@ -103,23 +91,247 @@ private[index] final class LedgerBackedIndexService(
to.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetTo, offset.toHexString)
)
ledger
.flatTransactions(
startExclusive = from,
endInclusive = to,
filter = convertFilter(filter),
verbose = verbose,
dispatcher
.startingAt(
from.getOrElse(Offset.beforeBegin),
RangeSource(transactionsReader.getFlatTransactions(_, _, convertFilter(filter), verbose)),
to,
)
.map(_._2)
})
}).wireTap(
_.transactions.view
.map(transaction =>
Event(transaction.commandId, TraceIdentifiers.fromTransaction(transaction))
)
.foreach(Spans.addEventToCurrentSpan)
)
override def transactionTrees(
startExclusive: LedgerOffset,
endInclusive: Option[LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetTransactionTreesResponse, NotUsed] =
between(startExclusive, endInclusive)((from, to) => {
from.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetFrom, offset.toHexString)
)
to.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetTo, offset.toHexString)
)
dispatcher
.startingAt(
from.getOrElse(Offset.beforeBegin),
RangeSource(
transactionsReader.getTransactionTrees(_, _, filter.filtersByParty.keySet, verbose)
),
to,
)
.map(_._2)
}).wireTap(
_.transactions.view
.map(transaction =>
Event(transaction.commandId, TraceIdentifiers.fromTransactionTree(transaction))
)
.foreach(Spans.addEventToCurrentSpan)
)
override def getCompletions(
startExclusive: LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] = {
convertOffset(startExclusive).flatMapConcat { beginOpt =>
dispatcher
.startingAt(
beginOpt,
RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId, parties)),
None,
)
.map(_._2)
}
}
override def getCompletions(
startExclusive: LedgerOffset,
endInclusive: LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] =
between(startExclusive, Some(endInclusive))((start, end) =>
dispatcher
.startingAt(
start.getOrElse(Offset.beforeBegin),
RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId, parties)),
end,
)
.map(_._2)
)
override def getActiveContracts(
filter: TransactionFilter,
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[GetActiveContractsResponse, NotUsed] = {
val currentLedgerEnd = ledgerEnd()
val acs =
ledgerDao.transactionsReader.getActiveContracts(
currentLedgerEnd,
convertFilter(filter),
verbose,
)
acs.concat(
Source.single(GetActiveContractsResponse(offset = ApiOffset.toApiString(currentLedgerEnd)))
)
}
override def lookupActiveContract(
forParties: Set[Ref.Party],
contractId: ContractId,
)(implicit
loggingContext: LoggingContext
): Future[Option[VersionedContractInstance]] =
contractStore.lookupActiveContract(forParties, contractId)
override def getTransactionById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
ledgerDao.transactionsReader
.lookupFlatTransactionById(transactionId.unwrap, requestingParties)
override def getTransactionTreeById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
ledgerDao.transactionsReader
.lookupTransactionTreeById(transactionId.unwrap, requestingParties)
override def lookupMaximumLedgerTimeAfterInterpretation(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[MaximumLedgerTime] =
contractStore.lookupMaximumLedgerTimeAfterInterpretation(contractIds)
override def getParties(parties: Seq[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
ledgerDao.getParties(parties)
override def listKnownParties()(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
ledgerDao.listKnownParties()
override def partyEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[PartyEntry, NotUsed] = {
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(dispatcher.startingAt(_, RangeSource(ledgerDao.getPartyEntries)))
.map {
case (_, PartyLedgerEntry.AllocationRejected(subId, _, reason)) =>
PartyEntry.AllocationRejected(subId, reason)
case (_, PartyLedgerEntry.AllocationAccepted(subId, _, details)) =>
PartyEntry.AllocationAccepted(subId, details)
}
}
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, v2.PackageDetails]] =
ledgerDao.listLfPackages()
override def getLfArchive(packageId: Ref.PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[DamlLf.Archive]] =
ledgerDao.getLfArchive(packageId)
override def packageEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[PackageEntry, NotUsed] =
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(dispatcher.startingAt(_, RangeSource(ledgerDao.getPackageEntries)))
.map(_._2.toDomain)
/** Looks up the current configuration, if set, and the offset from which
* to subscribe to further configuration changes.
* The offset is internal and not exposed over Ledger API.
*/
override def lookupConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(LedgerOffset.Absolute, Configuration)]] =
ledgerDao
.lookupLedgerConfiguration()
.map(
_.map { case (offset, config) => (toAbsolute(offset), config) }
)(ExecutionContext.parasitic)
/** Looks up the current configuration, if set, and continues to stream configuration changes.
*/
override def getLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Source[LedgerConfiguration, NotUsed] = {
Source
.future(lookupConfiguration())
.flatMapConcat { optResult =>
val offset = optResult.map(_._1)
val foundConfig = optResult.map(_._2)
val initialConfig = Source(foundConfig.toList)
val configStream = configurationEntries(offset).collect {
case (_, Accepted(_, configuration)) => configuration
}
initialConfig
.concat(configStream)
.map(cfg => LedgerConfiguration(cfg.maxDeduplicationDuration))
}
}
/** Retrieve configuration entries. */
override def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])(implicit
loggingContext: LoggingContext
): Source[(domain.LedgerOffset.Absolute, domain.ConfigurationEntry), NotUsed] =
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(dispatcher.startingAt(_, RangeSource(ledgerDao.getConfigurationEntries)).map {
case (offset, config) =>
toAbsolute(offset) -> config.toDomain
})
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] = {
pruneBuffers(pruneUpToInclusive)
ledgerDao.prune(pruneUpToInclusive, pruneAllDivulgedContracts)
}
override def getMeteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData] =
ledgerDao.meteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)
override def currentLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[LedgerOffset.Absolute] = {
val offset =
if (ledgerEnd() == Offset.beforeBegin) ApiOffset.begin
else ledgerEnd()
Future.successful(toAbsolute(offset))
}
private def ledgerEnd(): Offset = dispatcher.getHead()
// Returns a function that memoizes the current end
// Can be used directly or shared throughout a request processing
private def convertOffset(implicit
loggingContext: LoggingContext
): LedgerOffset => Source[Offset, NotUsed] = {
private def convertOffset: LedgerOffset => Source[Offset, NotUsed] = {
case LedgerOffset.LedgerBegin => Source.single(Offset.beforeBegin)
case LedgerOffset.LedgerEnd => Source.single(ledger.ledgerEnd())
case LedgerOffset.LedgerEnd => Source.single(ledgerEnd())
case LedgerOffset.Absolute(offset) =>
ApiOffset.fromString(offset).fold(Source.failed, off => Source.single(off))
}
@ -155,185 +367,11 @@ private[index] final class LedgerBackedIndexService(
party -> filters.inclusive.fold(Set.empty[Identifier])(_.templateIds)
}
override def currentLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[LedgerOffset.Absolute] = {
val offset =
if (ledger.ledgerEnd() == Offset.beforeBegin) ApiOffset.begin
else ledger.ledgerEnd()
Future.successful(toAbsolute(offset))
}
override def getTransactionById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
ledger.lookupFlatTransactionById(transactionId.unwrap, requestingParties)
override def getTransactionTreeById(
transactionId: TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
ledger.lookupTransactionTreeById(transactionId.unwrap, requestingParties)
def toAbsolute(offset: Offset): LedgerOffset.Absolute =
LedgerOffset.Absolute(offset.toApiString)
override def getCompletions(
startExclusive: LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] = {
val convert = convertOffset
convert(startExclusive).flatMapConcat { beginOpt =>
ledger.completions(Some(beginOpt), None, applicationId, parties).map(_._2)
}
}
override def getCompletions(
startExclusive: LedgerOffset,
endInclusive: LedgerOffset,
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[CompletionStreamResponse, NotUsed] = {
between(startExclusive, Some(endInclusive))((start, end) =>
ledger.completions(start, end, applicationId, parties).map(_._2)
)
}
// IndexPackagesService
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[PackageId, PackageDetails]] =
ledger.listLfPackages()
override def getLfArchive(packageId: PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[Archive]] =
ledger.getLfArchive(packageId)
override def lookupActiveContract(
readers: Set[Party],
contractId: ContractId,
)(implicit
loggingContext: LoggingContext
): Future[Option[VersionedContractInstance]] =
ledger.lookupContract(contractId, readers)
override def lookupMaximumLedgerTimeAfterInterpretation(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[MaximumLedgerTime] =
ledger.lookupMaximumLedgerTime(ids)
override def lookupContractKey(
readers: Set[Party],
key: GlobalKey,
)(implicit loggingContext: LoggingContext): Future[Option[ContractId]] =
ledger.lookupKey(key, readers)
// PartyManagementService
override def getParticipantId()(implicit
loggingContext: LoggingContext
): Future[Ref.ParticipantId] =
Future.successful(participantId)
override def getParties(parties: Seq[Party])(implicit
loggingContext: LoggingContext
): Future[List[PartyDetails]] =
ledger.getParties(parties)
override def listKnownParties()(implicit
loggingContext: LoggingContext
): Future[List[PartyDetails]] =
ledger.listKnownParties()
override def partyEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[PartyEntry, NotUsed] = {
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(ledger.partyEntries)
.map {
case (_, PartyLedgerEntry.AllocationRejected(subId, _, reason)) =>
PartyEntry.AllocationRejected(subId, reason)
case (_, PartyLedgerEntry.AllocationAccepted(subId, _, details)) =>
PartyEntry.AllocationAccepted(subId, details)
}
}
override def packageEntries(
startExclusive: Option[LedgerOffset.Absolute]
)(implicit loggingContext: LoggingContext): Source[PackageEntry, NotUsed] =
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(ledger.packageEntries)
.map(_._2.toDomain)
/** Looks up the current configuration, if set, and the offset from which
* to subscribe to further configuration changes.
* The offset is internal and not exposed over Ledger API.
*/
override def lookupConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(LedgerOffset.Absolute, Configuration)]] =
ledger
.lookupLedgerConfiguration()
.map(
_.map { case (offset, config) => (toAbsolute(offset), config) }
)(ExecutionContext.parasitic)
/** Looks up the current configuration, if set, and continues to stream configuration changes.
*/
override def getLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Source[LedgerConfiguration, NotUsed] = {
Source
.future(lookupConfiguration())
.flatMapConcat { optResult =>
val offset = optResult.map(_._1)
val foundConfig = optResult.map(_._2)
val initialConfig = Source(foundConfig.toList)
val configStream = configurationEntries(offset).collect {
case (_, Accepted(_, configuration)) => configuration
}
initialConfig
.concat(configStream)
.map(cfg => LedgerConfiguration(cfg.maxDeduplicationDuration))
}
}
/** Retrieve configuration entries. */
override def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])(implicit
loggingContext: LoggingContext
): Source[(domain.LedgerOffset.Absolute, domain.ConfigurationEntry), NotUsed] =
Source
.future(concreteOffset(startExclusive))
.flatMapConcat(ledger.configurationEntries(_).map { case (offset, config) =>
toAbsolute(offset) -> config.toDomain
})
/** Participant pruning command */
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =
ledger.prune(pruneUpToInclusive, pruneAllDivulgedContracts)
private def concreteOffset(startExclusive: Option[LedgerOffset.Absolute]): Future[Offset] =
startExclusive
.map(off => Future.fromTry(ApiOffset.fromString(off.value)))
.getOrElse(Future.successful(Offset.beforeBegin))
override def getMeteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData] = {
ledger.meteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)
}
private def toAbsolute(offset: Offset): LedgerOffset.Absolute =
LedgerOffset.Absolute(offset.toApiString)
}

View File

@ -1,69 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.index
import akka.stream.Materializer
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.engine.ValueEnricher
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.{DbSupport, LfValueTranslationCache}
import scala.concurrent.ExecutionContext
private[platform] object JdbcIndex {
def owner(
dbSupport: DbSupport,
ledgerId: LedgerId,
participantId: Ref.ParticipantId,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
acsIdPageSize: Int,
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
enricher: ValueEnricher,
maxContractStateCacheSize: Long,
maxContractKeyStateCacheSize: Long,
maxTransactionsInMemoryFanOutBufferSize: Long,
enableInMemoryFanOutForLedgerApi: Boolean,
)(implicit mat: Materializer, loggingContext: LoggingContext): ResourceOwner[IndexService] =
ReadOnlyLedgerBuilder(
dbSupport = dbSupport,
initialLedgerId = ledgerId,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
acsIdPageSize = acsIdPageSize,
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = enricher,
maxContractStateCacheSize = maxContractStateCacheSize,
maxContractKeyStateCacheSize = maxContractKeyStateCacheSize,
enableInMemoryFanOutForLedgerApi = enableInMemoryFanOutForLedgerApi,
participantId = participantId,
maxTransactionsInMemoryFanOutBufferSize = maxTransactionsInMemoryFanOutBufferSize,
errorFactories = ErrorFactories(),
)(mat, loggingContext, servicesExecutionContext)
.owner()
.map { ledger =>
new LedgerBackedIndexService(
MeteredReadOnlyLedger(ledger, metrics),
participantId,
errorFactories = ErrorFactories(),
)
}
}

View File

@ -1,180 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.index
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{LedgerId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.ReportData
import com.daml.ledger.participant.state.index.v2.{MaximumLedgerTime, PackageDetails}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import scala.concurrent.Future
private[index] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: Metrics)
extends ReadOnlyLedger {
override def ledgerId: LedgerId = ledger.ledgerId
override def currentHealth(): HealthStatus = ledger.currentHealth()
override def flatTransactions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionsResponse), NotUsed] =
ledger.flatTransactions(startExclusive, endInclusive, filter, verbose)
override def transactionTrees(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
requestingParties: Set[Ref.Party],
verbose: Boolean,
)(implicit
loggingContext: LoggingContext
): Source[(Offset, GetTransactionTreesResponse), NotUsed] =
ledger.transactionTrees(startExclusive, endInclusive, requestingParties, verbose)
override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = ledger.ledgerEnd()
override def completions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] =
ledger.completions(startExclusive, endInclusive, applicationId, parties)
override def activeContracts(
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit
loggingContext: LoggingContext
): (Source[GetActiveContractsResponse, NotUsed], Offset) =
ledger.activeContracts(filter, verbose)
override def lookupContract(
contractId: ContractId,
forParties: Set[Ref.Party],
)(implicit
loggingContext: LoggingContext
): Future[Option[VersionedContractInstance]] =
Timed.future(metrics.daml.index.lookupContract, ledger.lookupContract(contractId, forParties))
override def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[Option[ContractId]] =
Timed.future(metrics.daml.index.lookupKey, ledger.lookupKey(key, forParties))
override def lookupFlatTransactionById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
Timed.future(
metrics.daml.index.lookupFlatTransactionById,
ledger.lookupFlatTransactionById(transactionId, requestingParties),
)
override def lookupTransactionTreeById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
Timed.future(
metrics.daml.index.lookupTransactionTreeById,
ledger.lookupTransactionTreeById(transactionId, requestingParties),
)
override def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[MaximumLedgerTime] =
Timed.future(
metrics.daml.index.lookupMaximumLedgerTime,
ledger.lookupMaximumLedgerTime(contractIds),
)
override def getParties(parties: Seq[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[List[PartyDetails]] =
Timed.future(metrics.daml.index.getParties, ledger.getParties(parties))
override def listKnownParties()(implicit
loggingContext: LoggingContext
): Future[List[PartyDetails]] =
Timed.future(metrics.daml.index.listKnownParties, ledger.listKnownParties())
override def partyEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, PartyLedgerEntry), NotUsed] =
ledger.partyEntries(startExclusive)
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, PackageDetails]] =
Timed.future(metrics.daml.index.listLfPackages, ledger.listLfPackages())
override def getLfArchive(packageId: Ref.PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[Archive]] =
Timed.future(metrics.daml.index.getLfArchive, ledger.getLfArchive(packageId))
override def packageEntries(
startExclusive: Offset
)(implicit loggingContext: LoggingContext): Source[(Offset, PackageLedgerEntry), NotUsed] =
ledger.packageEntries(startExclusive)
override def lookupLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(Offset, Configuration)]] =
Timed.future(metrics.daml.index.lookupLedgerConfiguration, ledger.lookupLedgerConfiguration())
override def configurationEntries(
startExclusive: Offset
)(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed] =
ledger.configurationEntries(startExclusive)
override def prune(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
)(implicit loggingContext: LoggingContext): Future[Unit] =
Timed.future(
metrics.daml.index.prune,
ledger.prune(pruneUpToInclusive, pruneAllDivulgedContracts),
)
override def meteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData] = {
Timed.future(
metrics.daml.index.getTransactionMetering,
ledger.meteringReportData(from, to, applicationId),
)
}
}
private[platform] object MeteredReadOnlyLedger {
def apply(ledger: ReadOnlyLedger, metrics: Metrics): ReadOnlyLedger =
new MeteredReadOnlyLedger(ledger, metrics)
}

View File

@ -1,135 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.index
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{LedgerId, PartyDetails}
import com.daml.ledger.api.health.ReportsHealth
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.ReportData
import com.daml.ledger.participant.state.index.v2.{MaximumLedgerTime, PackageDetails}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.LoggingContext
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import scala.concurrent.Future
/** Defines all the functionalities a Ledger needs to provide */
private[index] trait ReadOnlyLedger extends ReportsHealth {
def ledgerId: LedgerId
def flatTransactions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionsResponse), NotUsed]
def transactionTrees(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
requestingParties: Set[Ref.Party],
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionTreesResponse), NotUsed]
def ledgerEnd()(implicit loggingContext: LoggingContext): Offset
def completions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed]
def activeContracts(
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit loggingContext: LoggingContext): (Source[GetActiveContractsResponse, NotUsed], Offset)
def lookupContract(
contractId: ContractId,
forParties: Set[Ref.Party],
)(implicit
loggingContext: LoggingContext
): Future[Option[VersionedContractInstance]]
def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[MaximumLedgerTime]
def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[Option[ContractId]]
def lookupFlatTransactionById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]]
def lookupTransactionTreeById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]]
// Party management
def getParties(parties: Seq[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[List[PartyDetails]]
def listKnownParties()(implicit loggingContext: LoggingContext): Future[List[PartyDetails]]
def partyEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, PartyLedgerEntry), NotUsed]
// Package management
def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, PackageDetails]]
def getLfArchive(packageId: Ref.PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[Archive]]
def packageEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, PackageLedgerEntry), NotUsed]
// Configuration management
def lookupLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(Offset, Configuration)]]
def configurationEntries(
startExclusive: Offset
)(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed]
/** Performs participant ledger pruning up to and including the specified offset.
*/
def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit]
def meteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[Ref.ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData]
}

View File

@ -1,185 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.index
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.MeteringStore.ReportData
import com.daml.ledger.participant.state.index.v2.{ContractStore, MaximumLedgerTime}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.LoggingContext
import com.daml.platform.PruneBuffers
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.store.appendonlydao.{LedgerDaoTransactionsReader, LedgerReadDao}
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import scala.concurrent.Future
private[index] class ReadOnlyLedgerImpl(
val ledgerId: LedgerId,
ledgerDao: LedgerReadDao,
transactionsReader: LedgerDaoTransactionsReader,
contractStore: ContractStore,
pruneBuffers: PruneBuffers,
dispatcher: Dispatcher[Offset],
) extends ReadOnlyLedger {
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[Option[ContractId]] =
contractStore.lookupContractKey(forParties, key)
override def flatTransactions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit loggingContext: LoggingContext): Source[(Offset, GetTransactionsResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(transactionsReader.getFlatTransactions(_, _, filter, verbose)),
endInclusive,
)
override def transactionTrees(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
requestingParties: Set[Ref.Party],
verbose: Boolean,
)(implicit
loggingContext: LoggingContext
): Source[(Offset, GetTransactionTreesResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(
transactionsReader.getTransactionTrees(_, _, requestingParties, verbose)
),
endInclusive,
)
override def ledgerEnd()(implicit loggingContext: LoggingContext): Offset = dispatcher.getHead()
override def completions(
startExclusive: Option[Offset],
endInclusive: Option[Offset],
applicationId: Ref.ApplicationId,
parties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Source[(Offset, CompletionStreamResponse), NotUsed] =
dispatcher.startingAt(
startExclusive.getOrElse(Offset.beforeBegin),
RangeSource(ledgerDao.completions.getCommandCompletions(_, _, applicationId, parties)),
endInclusive,
)
override def activeContracts(
filter: Map[Ref.Party, Set[Ref.Identifier]],
verbose: Boolean,
)(implicit
loggingContext: LoggingContext
): (Source[GetActiveContractsResponse, NotUsed], Offset) = {
val activeAt = ledgerEnd()
(ledgerDao.transactionsReader.getActiveContracts(activeAt, filter, verbose), activeAt)
}
override def lookupContract(
contractId: ContractId,
forParties: Set[Ref.Party],
)(implicit
loggingContext: LoggingContext
): Future[Option[VersionedContractInstance]] =
contractStore.lookupActiveContract(forParties, contractId)
override def lookupFlatTransactionById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] =
ledgerDao.transactionsReader.lookupFlatTransactionById(transactionId, requestingParties)
override def lookupTransactionTreeById(
transactionId: Ref.TransactionId,
requestingParties: Set[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] =
ledgerDao.transactionsReader.lookupTransactionTreeById(transactionId, requestingParties)
override def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[MaximumLedgerTime] =
contractStore.lookupMaximumLedgerTimeAfterInterpretation(contractIds)
override def getParties(parties: Seq[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
ledgerDao.getParties(parties)
override def listKnownParties()(implicit
loggingContext: LoggingContext
): Future[List[domain.PartyDetails]] =
ledgerDao.listKnownParties()
override def partyEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, PartyLedgerEntry), NotUsed] =
dispatcher.startingAt(startExclusive, RangeSource(ledgerDao.getPartyEntries))
override def listLfPackages()(implicit
loggingContext: LoggingContext
): Future[Map[Ref.PackageId, v2.PackageDetails]] =
ledgerDao.listLfPackages()
override def getLfArchive(packageId: Ref.PackageId)(implicit
loggingContext: LoggingContext
): Future[Option[DamlLf.Archive]] =
ledgerDao.getLfArchive(packageId)
override def packageEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, PackageLedgerEntry), NotUsed] =
dispatcher.startingAt(startExclusive, RangeSource(ledgerDao.getPackageEntries))
override def lookupLedgerConfiguration()(implicit
loggingContext: LoggingContext
): Future[Option[(Offset, Configuration)]] =
ledgerDao.lookupLedgerConfiguration()
override def configurationEntries(startExclusive: Offset)(implicit
loggingContext: LoggingContext
): Source[(Offset, ConfigurationEntry), NotUsed] =
dispatcher.startingAt(startExclusive, RangeSource(ledgerDao.getConfigurationEntries))
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] = {
pruneBuffers(pruneUpToInclusive)
ledgerDao.prune(pruneUpToInclusive, pruneAllDivulgedContracts)
}
override def meteringReportData(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[Ref.ApplicationId],
)(implicit loggingContext: LoggingContext): Future[ReportData] = {
ledgerDao.meteringReportData(from, to, applicationId)
}
}

View File

@ -12,7 +12,7 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.GlobalKey
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{Metrics, Timed}
import com.daml.metrics.Metrics
import com.daml.platform.store.appendonlydao.events.ContractStateEvent
import com.daml.platform.store.appendonlydao.events.ContractStateEvent.LedgerEndMarker
import com.daml.platform.store.cache.ContractKeyStateValue._
@ -147,10 +147,7 @@ private[platform] class MutableCacheBackedContractStore(
) = {
val currentCacheSequentialId = cacheIndex()._2
val fetchStateRequest =
Timed.future(
metrics.daml.index.lookupContract,
contractsReader.lookupContractState(contractId, currentCacheSequentialId),
)
contractsReader.lookupContractState(contractId, currentCacheSequentialId)
val eventualValue = fetchStateRequest.map(toContractCacheValue)
for {

View File

@ -4,13 +4,10 @@
package com.daml.ledger.participant.state.index.v2
import com.daml.ledger.api.domain.LedgerId
import com.daml.logging.LoggingContext
import scala.concurrent.Future
/** Serves as a backend to implement
* [[com.daml.ledger.api.v1.ledger_identity_service.LedgerIdentityServiceGrpc.LedgerIdentityService]]
*/
trait IdentityProvider {
def getLedgerId()(implicit loggingContext: LoggingContext): Future[LedgerId]
def ledgerId: LedgerId
}