Introduce DAML-LF value caching for transaction service (#6052)

* Introduce DAML-LF value caching for transaction service

Allows keeping the DAML-LF values in the most recently indexed events in memory,
so that they don't have to be deserialized from their Protobuf encoding every time
they are served to a client.

Closes #6044

CHANGELOG_BEGIN
[Sandbox] The --max-lf-value-translation-cache-entries option allows setting the
number of events for which DAML-LF values are cached. This can reduce the latency
of serving transactions to consumers that are reasonably fast.
CHANGELOG_END
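
For reference, a minimal sketch of what the new option amounts to programmatically,
using the caching API touched by this change (the entry count of 100000 is purely
illustrative):

    // By default (caching.Configuration.none) nothing is cached; overriding
    // maximumWeight admits up to that many entries, since each cached value
    // weighs 1 (see the Weight instances in LfValueTranslation below).
    val cacheConfiguration =
      com.daml.caching.Configuration.none.copy(maximumWeight = 100000L)
    // Build the LF value translation cache from that configuration.
    val lfValueTranslationCache =
      LfValueTranslation.Cache.newInstance(cacheConfiguration)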

* Add missing dependency

* Address https://github.com/digital-asset/daml/pull/6052#discussion_r428076003

* Update ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>

* Address https://github.com/digital-asset/daml/pull/6052#discussion_r428071324

* Address https://github.com/digital-asset/daml/pull/6052#discussion_r428076905

* Address https://github.com/digital-asset/daml/pull/6052#discussion_r428081294

* Fix fatal warnings

* //ledger/caching has to be a dependency whenever the sandbox is run

Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
Stefano Baghino, 2020-05-26 10:33:53 +02:00, committed by GitHub
parent 9e456a1016
commit 9af85e56e9
31 changed files with 400 additions and 168 deletions

@@ -25,6 +25,7 @@ da_scala_library(
"//ledger-api/rs-grpc-bridge",
"//ledger-service/jwt",
"//ledger-service/lf-value-json",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",

@@ -111,6 +111,7 @@ da_scala_test_suite(
"//ledger-api/testing-utils",
"//ledger-service/http-json",
"//ledger-service/jwt",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/participant-state",

@@ -214,6 +214,7 @@ da_scala_test_suite(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger-service/utils",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",

@@ -268,6 +268,7 @@ da_scala_test(
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/data",
"//language-support/java/bindings:bindings-java",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/participant-state",

@@ -130,6 +130,7 @@ da_scala_test(
"//language-support/scala/codegen-testing",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/participant-state",

@@ -140,6 +140,7 @@ da_scala_test(
"//ledger-service/jwt",
"//ledger-service/lf-value-json",
"//ledger-service/utils",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/participant-state",

@@ -39,6 +39,7 @@ da_scala_test_suite(
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//libs-scala/direct-execution-context",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
@@ -62,6 +63,7 @@ da_scala_test_suite(
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/participant-state",

@@ -25,6 +25,7 @@ final case class Config[Extra](
participants: Seq[ParticipantConfig],
eventsPageSize: Int,
stateValueCache: caching.Configuration,
lfValueTranslationCache: caching.Configuration,
seeding: Seeding,
metricsReporter: Option[MetricsReporter],
metricsReportingInterval: Duration,
@@ -62,6 +63,7 @@ object Config {
participants = Vector.empty,
eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
stateValueCache = caching.Configuration.none,
lfValueTranslationCache = caching.Configuration.none,
seeding = Seeding.Strong,
metricsReporter = None,
metricsReportingInterval = Duration.ofSeconds(10),
@@ -156,6 +158,14 @@ object Config {
config.copy(stateValueCache =
config.stateValueCache.copy(maximumWeight = maximumStateValueCacheSize * 1024 * 1024)))
opt[Long]("max-lf-value-translation-cache-entries")
.optional()
.text(
s"The maximum size of the cache used to deserialize DAML-LF values, in number of allowed entries. By default, nothing is cached.")
.action((maximumLfValueTranslationCacheEntries, config) =>
config.copy(lfValueTranslationCache = config.lfValueTranslationCache.copy(
maximumWeight = maximumLfValueTranslationCacheEntries)))
private val seedingMap =
Map[String, Seeding]("testing-weak" -> Seeding.Weak, "strong" -> Seeding.Strong)
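
The option's action above is equivalent to overriding the defaults in code; a
hedged sketch, where defaultConfig stands in for the default Config value
assembled earlier in this file:

    // Hypothetical: effect of "--max-lf-value-translation-cache-entries 100000".
    val configured = defaultConfig.copy(
      lfValueTranslationCache =
        defaultConfig.lfValueTranslationCache.copy(maximumWeight = 100000L)
    )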

@@ -18,6 +18,7 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.JvmMetricSet
import com.daml.platform.apiserver.{StandaloneApiServer, TimedIndexService}
import com.daml.platform.indexer.StandaloneIndexerServer
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.resources.akka.AkkaResourceOwner
import com.daml.resources.{Resource, ResourceOwner}
@@ -57,6 +58,11 @@ final class Runner[T <: ReadWriteService, Extra](
_ <- Resource.sequence(config.participants.map { participantConfig =>
val metrics = factory.createMetrics(participantConfig, config)
metrics.registry.registerAll(new JvmMetricSet)
val lfValueTranslationCache =
LfValueTranslation.Cache.newInstrumentedInstance(
configuration = config.lfValueTranslationCache,
metrics = metrics,
)
for {
_ <- config.metricsReporter.fold(Resource.unit)(
reporter =>
@@ -75,6 +81,7 @@
readService = readService,
config = factory.indexerConfig(participantConfig, config),
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- new StandaloneApiServer(
config = factory.apiServerConfig(participantConfig, config),
@@ -88,6 +95,7 @@
metrics = metrics,
timeServiceBackend = factory.timeServiceBackend(config),
engine = sharedEngine,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
} yield ()
})

@@ -12,6 +12,7 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import ch.qos.logback.classic.Level
import com.codahale.metrics.MetricRegistry
import com.daml.caching.Cache
import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
@@ -194,6 +195,7 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
restartDelay = restartDelay,
),
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = Cache.none,
)(materializer, logCtx)
} yield participantState
}
@@ -206,6 +208,7 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
jdbcUrl = jdbcUrl,
eventsPageSize = 100,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = Cache.none,
)
}
}

@@ -25,6 +25,7 @@ da_scala_library(
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",

@@ -230,6 +230,7 @@ da_scala_library(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger-service/jwt",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-auth-client",
"//ledger/ledger-api-client",
@@ -281,6 +282,7 @@ test_deps = [
"//ledger-api/rs-grpc-bridge",
"//ledger-api/sample-service",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",

@@ -30,6 +30,7 @@ import com.daml.platform.configuration.{
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.ports.Port
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.{BindableService, ServerInterceptor}
@@ -53,7 +54,8 @@ final class StandaloneApiServer(
timeServiceBackend: Option[TimeServiceBackend] = None,
otherServices: immutable.Seq[BindableService] = immutable.Seq.empty,
otherInterceptors: List[ServerInterceptor] = List.empty,
engine: Engine
engine: Engine,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit actorSystem: ActorSystem, materializer: Materializer, logCtx: LoggingContext)
extends ResourceOwner[ApiServer] {
@@ -82,6 +84,7 @@
config.jdbcUrl,
config.eventsPageSize,
metrics,
lfValueTranslationCache,
)
.map(transformIndexService)
healthChecks = new HealthChecks(

@@ -13,6 +13,7 @@ import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.resources.ResourceOwner
object JdbcIndex {
@@ -24,9 +25,10 @@ object JdbcIndex {
jdbcUrl: String,
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexService] =
ReadOnlySqlLedger
.owner(serverRole, jdbcUrl, ledgerId, eventsPageSize, metrics)
.owner(serverRole, jdbcUrl, ledgerId, eventsPageSize, metrics, lfValueTranslationCache)
.map { ledger =>
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) {
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =

@@ -18,6 +18,7 @@ import com.daml.metrics.Metrics
import com.daml.platform.common.LedgerIdMismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.{BaseLedger, ReadOnlyLedger}
import com.daml.resources.ProgramResource.StartupException
import com.daml.resources.ResourceOwner
@@ -34,9 +35,16 @@ object ReadOnlySqlLedger {
ledgerId: LedgerId,
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] =
for {
ledgerReadDao <- JdbcLedgerDao.readOwner(serverRole, jdbcUrl, eventsPageSize, metrics)
ledgerReadDao <- JdbcLedgerDao.readOwner(
serverRole,
jdbcUrl,
eventsPageSize,
metrics,
lfValueTranslationCache,
)
factory = new Factory(ledgerReadDao)
ledger <- ResourceOwner.forFutureCloseable(() => factory.createReadOnlySqlLedger(ledgerId))
} yield ledger

@@ -22,6 +22,7 @@ import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.store.entries.{PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.FlywayMigrations
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.resources.{Resource, ResourceOwner}
import scala.concurrent.{ExecutionContext, Future}
@@ -32,6 +33,7 @@ final class JdbcIndexerFactory(
config: IndexerConfig,
readService: ReadService,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit materializer: Materializer, logCtx: LoggingContext) {
private val logger = ContextualizedLogger.get(this.getClass)
@@ -59,6 +61,7 @@ final class JdbcIndexerFactory(
config.jdbcUrl,
config.eventsPageSize,
metrics,
lfValueTranslationCache,
)
_ <- ResourceOwner.forFuture(() => ledgerDao.reset())
initialLedgerEnd <- ResourceOwner.forFuture(() => initializeLedger(ledgerDao))
@@ -73,6 +76,7 @@
config.jdbcUrl,
config.eventsPageSize,
metrics,
lfValueTranslationCache,
)
initialLedgerEnd <- ResourceOwner.forFuture(() => initializeLedger(ledgerDao))
} yield new JdbcIndexer(initialLedgerEnd, config.participantId, ledgerDao, metrics)

@@ -8,6 +8,7 @@ import com.daml.ledger.participant.state.v1.ReadService
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.resources.{Resource, ResourceOwner}
import scala.concurrent.ExecutionContext
@@ -16,6 +17,7 @@ final class StandaloneIndexerServer(
readService: ReadService,
config: IndexerConfig,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit materializer: Materializer, logCtx: LoggingContext)
extends ResourceOwner[Unit] {
@@ -27,6 +29,7 @@
config,
readService,
metrics,
lfValueTranslationCache,
)
val indexer = new RecoveringIndexer(materializer.system.scheduler, config.restartDelay)
config.startupMode match {

@@ -271,6 +271,18 @@ object Cli {
.text(
"The maximum number of parallel command submissions. Only applicable to sandbox-classic.")
opt[Long]("max-lf-value-translation-cache-entries")
.optional()
.text(
s"The maximum size of the cache used to deserialize DAML-LF values, in number of allowed entries. By default, nothing is cached.")
.action(
(maximumLfValueTranslationCacheEntries, config) =>
config.copy(
lfValueTranslationCacheConfiguration = config.lfValueTranslationCacheConfiguration
.copy(maximumWeight = maximumLfValueTranslationCacheEntries)
)
)
help("help").text("Print the usage text")
checkConfig(c => {
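
As a usage sketch, this is roughly how the new sandbox flag surfaces in the
parsed configuration; the Cli.parse entry point and its exact signature are
assumptions made for illustration:

    // Hypothetical parse call; the real sandbox CLI entry point may differ.
    val maybeConfig: Option[SandboxConfig] =
      Cli.parse(Array("--max-lf-value-translation-cache-entries", "100000"))
    // On success, the cache configuration carries the requested entry count:
    // maybeConfig.exists(_.lfValueTranslationCacheConfiguration.maximumWeight == 100000L)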

@@ -8,6 +8,7 @@ import java.nio.file.Path
import java.time.Duration
import ch.qos.logback.classic.Level
import com.daml.caching
import com.daml.ledger.api.auth.AuthService
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.participant.state.v1.SeedService.Seeding
@@ -40,6 +41,7 @@ final case class SandboxConfig(
metricsReporter: Option[MetricsReporter],
metricsReportingInterval: Duration,
eventsPageSize: Int,
lfValueTranslationCacheConfiguration: caching.Configuration,
)
object SandboxConfig {
@@ -51,6 +53,8 @@ object SandboxConfig {
val DefaultTimeProviderType: TimeProviderType = TimeProviderType.WallClock
val DefaultLfValueTranslationCacheConfiguration = caching.Configuration.none
lazy val nextDefault: SandboxConfig =
SandboxConfig(
address = None,
@@ -73,6 +77,7 @@
metricsReporter = None,
metricsReportingInterval = Duration.ofSeconds(10),
eventsPageSize = DefaultEventsPageSize,
lfValueTranslationCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
)
lazy val default: SandboxConfig =

@@ -38,6 +38,7 @@ import com.daml.platform.sandbox.metrics.MetricsReporting
import com.daml.platform.sandbox.services.SandboxResetService
import com.daml.platform.sandboxnext.Runner._
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.ports.Port
import com.daml.resources.akka.AkkaResourceOwner
import com.daml.resources.{ResettableResourceOwner, Resource, ResourceOwner}
@@ -121,6 +122,10 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
config.metricsReporter,
config.metricsReportingInterval,
)
lfValueTranslationCache = LfValueTranslation.Cache.newInstrumentedInstance(
configuration = config.lfValueTranslationCacheConfiguration,
metrics = metrics,
)
timeServiceBackend = timeProviderType match {
case TimeProviderType.Static =>
Some(TimeServiceBackend.simple(Instant.EPOCH))
@@ -167,6 +172,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
allowExistingSchema = true,
),
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
)
authService = config.authService.getOrElse(AuthServiceWildcard)
promise = Promise[Unit]
@@ -213,6 +219,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
timeServiceBackend = timeServiceBackend,
otherServices = List(resetService),
otherInterceptors = List(resetService),
lfValueTranslationCache = lfValueTranslationCache,
)
_ = promise.completeWith(apiServer.servicesClosed())
} yield {

@@ -12,6 +12,7 @@ import akka.stream.scaladsl.Source
import anorm.SqlParser._
import anorm.ToStatement.optionToStatement
import anorm.{BatchSql, Macro, NamedParameter, ResultSetParser, RowParser, SQL, SqlParser}
import com.daml.caching.Cache
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{LedgerId, PartyDetails}
@@ -40,6 +41,7 @@ import com.daml.platform.store._
import com.daml.platform.store.dao.JdbcLedgerDao.{H2DatabaseQueries, PostgresQueries}
import com.daml.platform.store.dao.events.{
ContractsReader,
LfValueTranslation,
PostCommitValidation,
TransactionsReader,
TransactionsWriter
@@ -84,6 +86,7 @@ private class JdbcLedgerDao(
eventsPageSize: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit logCtx: LoggingContext)
extends LedgerDao {
@@ -873,11 +876,14 @@ private class JdbcLedgerDao(
val _ = SQL(queries.SQL_TRUNCATE_TABLES).execute()
}
private val translation: LfValueTranslation =
new LfValueTranslation(lfValueTranslationCache)
private val transactionsWriter: TransactionsWriter =
new TransactionsWriter(dbType, metrics)
new TransactionsWriter(dbType, metrics, translation)
override val transactionsReader: TransactionsReader =
new TransactionsReader(dbDispatcher, eventsPageSize, metrics)(executionContext)
new TransactionsReader(dbDispatcher, eventsPageSize, metrics, translation)(executionContext)
private val contractsReader: ContractsReader =
ContractsReader(dbDispatcher, dbType, metrics)(executionContext)
@@ -907,23 +913,39 @@ object JdbcLedgerDao {
jdbcUrl: String,
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerReadDao] = {
val maxConnections = DefaultNumberOfShortLivedConnections
owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, validate = false, metrics)
.map(new MeteredLedgerReadDao(_, metrics))
owner(
serverRole,
jdbcUrl,
maxConnections,
eventsPageSize,
validate = false,
metrics,
lfValueTranslationCache,
).map(new MeteredLedgerReadDao(_, metrics))
}
def writeOwner(
serverRole: ServerRole,
jdbcUrl: String,
eventsPageSize: Int,
metrics: Metrics
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val maxConnections =
if (dbType.supportsParallelWrites) DefaultNumberOfShortLivedConnections else 1
owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, validate = false, metrics)
.map(new MeteredLedgerDao(_, metrics))
owner(
serverRole,
jdbcUrl,
maxConnections,
eventsPageSize,
validate = false,
metrics,
lfValueTranslationCache,
).map(new MeteredLedgerDao(_, metrics))
}
def validatingWriteOwner(
@@ -935,7 +957,7 @@ object JdbcLedgerDao {
val dbType = DbType.jdbcType(jdbcUrl)
val maxConnections =
if (dbType.supportsParallelWrites) DefaultNumberOfShortLivedConnections else 1
owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, validate = true, metrics)
owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, validate = true, metrics, Cache.none)
.map(new MeteredLedgerDao(_, metrics))
}
@@ -946,6 +968,7 @@ object JdbcLedgerDao {
eventsPageSize: Int,
validate: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] =
for {
dbDispatcher <- DbDispatcher.owner(serverRole, jdbcUrl, maxConnections, metrics)
@ -958,7 +981,8 @@ object JdbcLedgerDao {
ExecutionContext.fromExecutor(executor),
eventsPageSize,
validate,
metrics
metrics,
lfValueTranslationCache,
)
sealed trait Queries {

@@ -86,9 +86,11 @@ private[events] sealed abstract class ContractsTable extends PostCommitValidatio
val deletions: Option[(Set[ContractId], BatchSql)],
val transientContracts: Set[ContractId],
) {
def applySerialization(): SerializedBatches =
def applySerialization(lfValueTranslation: LfValueTranslation): SerializedBatches =
new SerializedBatches(
insertions = insertions.map { case (ids, rawBatch) => (ids, rawBatch.applySerialization()) },
insertions = insertions.map {
case (ids, rawBatch) => (ids, rawBatch.applySerialization(lfValueTranslation))
},
deletions = deletions,
transientContracts = transientContracts,
)

@@ -76,10 +76,12 @@ private[events] trait EventsTableInsert { this: EventsTable =>
exercises: Option[RawBatch],
archives: Option[BatchSql],
) {
def applySerialization(): SerializedBatches =
def applySerialization(
lfValueTranslation: LfValueTranslation,
): SerializedBatches =
new SerializedBatches(
creates = creates.map(_.applySerialization()),
exercises = exercises.map(_.applySerialization()),
creates = creates.map(_.applySerialization(lfValueTranslation)),
exercises = exercises.map(_.applySerialization(lfValueTranslation)),
archives = archives,
)
}

@@ -0,0 +1,220 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao.events
import anorm.NamedParameter
import com.daml.caching
import com.daml.ledger.api.v1.value.{Record => ApiRecord, Value => ApiValue}
import com.daml.ledger.EventId
import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.metrics.Metrics
import com.daml.platform.participant.util.LfEngineToApi
import com.daml.platform.store.dao.events.{Value => LfValue}
import com.daml.platform.store.serialization.ValueSerializer
final class LfValueTranslation(val cache: LfValueTranslation.Cache) {
private def cantSerialize(attribute: String, forContract: ContractId): String =
s"Cannot serialize $attribute for ${forContract.coid}"
private def serializeCreateArgOrThrow(contractId: ContractId, arg: LfValue): Array[Byte] =
ValueSerializer.serializeValue(
value = arg,
errorContext = cantSerialize(attribute = "create argument", forContract = contractId),
)
private def serializeCreateArgOrThrow(c: Create): Array[Byte] =
serializeCreateArgOrThrow(c.coid, c.coinst.arg)
private def serializeNullableKeyOrThrow(c: Create): Option[Array[Byte]] =
c.key.map(
k =>
ValueSerializer.serializeValue(
value = k.key,
errorContext = cantSerialize(attribute = "key", forContract = c.coid),
)
)
private def serializeExerciseArgOrThrow(e: Exercise): Array[Byte] =
ValueSerializer.serializeValue(
value = e.chosenValue,
errorContext = cantSerialize(attribute = "exercise argument", forContract = e.targetCoid),
)
private def serializeNullableExerciseResultOrThrow(e: Exercise): Option[Array[Byte]] =
e.exerciseResult.map(
exerciseResult =>
ValueSerializer.serializeValue(
value = exerciseResult,
errorContext = cantSerialize(attribute = "exercise result", forContract = e.targetCoid),
)
)
// Doesn't go through caching, for now caching is limited to events
def serialize(contractId: ContractId, createArgument: LfValue): NamedParameter =
("create_argument", serializeCreateArgOrThrow(contractId, createArgument))
def serialize(eventId: EventId, create: Create): Vector[NamedParameter] = {
cache.put(
key = LfValueTranslation.Cache.Key(eventId),
value = LfValueTranslation.Cache.Value.Create(create.coinst.arg, create.key.map(_.key)),
)
Vector[NamedParameter](
"create_argument" -> serializeCreateArgOrThrow(create),
"create_key_value" -> serializeNullableKeyOrThrow(create),
)
}
def serialize(eventId: EventId, exercise: Exercise): Vector[NamedParameter] = {
cache.put(
key = LfValueTranslation.Cache.Key(eventId),
value = LfValueTranslation.Cache.Value.Exercise(exercise.chosenValue, exercise.exerciseResult),
)
Vector[NamedParameter](
"exercise_argument" -> serializeExerciseArgOrThrow(exercise),
"exercise_result" -> serializeNullableExerciseResultOrThrow(exercise),
)
}
private def toApiValue(
value: LfValue,
verbose: Boolean,
attribute: => String,
): ApiValue =
LfEngineToApi.assertOrRuntimeEx(
failureContext = s"attempting to deserialize persisted $attribute to value",
LfEngineToApi
.lfVersionedValueToApiValue(
verbose = verbose,
value = value,
),
)
private def toApiRecord(
value: LfValue,
verbose: Boolean,
attribute: => String,
): ApiRecord =
LfEngineToApi.assertOrRuntimeEx(
failureContext = s"attempting to deserialize persisted $attribute to record",
LfEngineToApi
.lfVersionedValueToApiRecord(
verbose = verbose,
recordValue = value,
),
)
def deserialize[E](raw: Raw.Created[E], verbose: Boolean): CreatedEvent = {
val key = LfValueTranslation.Cache.Key(raw.partial.eventId)
val create =
cache
.getIfPresent(key)
.getOrElse(
LfValueTranslation.Cache.Value.Create(
argument = ValueSerializer.deserializeValue(raw.createArgument),
key = raw.createKeyValue.map(ValueSerializer.deserializeValue)
)
)
.assertCreate()
raw.partial.copy(
createArguments = Some(
toApiRecord(
value = create.argument,
verbose = verbose,
attribute = "create argument",
)
),
contractKey = create.key.map(
key =>
toApiValue(
value = key,
verbose = verbose,
attribute = "create key",
)
),
)
}
def deserialize(raw: Raw.TreeEvent.Exercised, verbose: Boolean): ExercisedEvent = {
val key = LfValueTranslation.Cache.Key(raw.partial.eventId)
val exercise =
cache
.getIfPresent(key)
.getOrElse(
LfValueTranslation.Cache.Value.Exercise(
argument = ValueSerializer.deserializeValue(raw.exerciseArgument),
result = raw.exerciseResult.map(ValueSerializer.deserializeValue)
)
)
.assertExercise()
raw.partial.copy(
choiceArgument = Some(
toApiValue(
value = exercise.argument,
verbose = verbose,
attribute = "exercise argument",
)
),
exerciseResult = exercise.result.map(
result =>
toApiValue(
value = result,
verbose = verbose,
attribute = "exercise result",
)
),
)
}
}
object LfValueTranslation {
type Cache = caching.Cache[Cache.Key, Cache.Value]
object Cache {
private implicit object `Key Weight` extends caching.Weight[Key] {
override def weigh(value: Key): caching.Cache.Size =
0 // make sure that only the value is counted
}
private implicit object `Value Weight` extends caching.Weight[Value] {
override def weigh(value: Value): caching.Cache.Size =
1 // TODO replace this with something to avoid weights entirely
}
def newInstance(configuration: caching.Configuration): Cache =
caching.Cache.from(configuration)
def newInstrumentedInstance(configuration: caching.Configuration, metrics: Metrics): Cache =
caching.Cache.from(
configuration = configuration,
metrics = metrics.daml.index.db.translation.cache,
)
final class UnexpectedTypeException(value: Value)
extends RuntimeException(s"Unexpected value $value")
final case class Key(eventId: String)
sealed abstract class Value {
def assertCreate(): Value.Create
def assertExercise(): Value.Exercise
}
object Value {
final case class Create(argument: LfValue, key: Option[LfValue]) extends Value {
override def assertCreate(): Create = this
override def assertExercise(): Exercise = throw new UnexpectedTypeException(this)
}
final case class Exercise(argument: LfValue, result: Option[LfValue]) extends Value {
override def assertCreate(): Create = throw new UnexpectedTypeException(this)
override def assertExercise(): Exercise = this
}
}
}
}
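
A short usage sketch of the cache defined above. Note the Weight instances: keys
weigh 0 and values weigh 1, so the configured maximumWeight effectively caps the
number of cached events. The event ID and someLfValue below are placeholders:

    // Create a cache capped at 1000 entries (weight == entry count here).
    val cache: LfValueTranslation.Cache =
      LfValueTranslation.Cache.newInstance(
        com.daml.caching.Configuration.none.copy(maximumWeight = 1000L))
    // Storing the LF values of a create event under its event ID...
    cache.put(
      key = LfValueTranslation.Cache.Key("#someTransactionId:0"),
      value = LfValueTranslation.Cache.Value.Create(argument = someLfValue, key = None),
    )
    // ...lets a later read skip Protobuf deserialization on a cache hit.
    val hit: Option[LfValueTranslation.Cache.Value] =
      cache.getIfPresent(LfValueTranslation.Cache.Key("#someTransactionId:0"))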

@@ -12,9 +12,7 @@ import com.daml.ledger.api.v1.event.{
ExercisedEvent => PbExercisedEvent
}
import com.daml.ledger.api.v1.transaction.{TreeEvent => PbTreeEvent}
import com.daml.ledger.api.v1.value.{Record => ApiRecord, Value => ApiValue}
import com.daml.platform.participant.util.LfEngineToApi
import com.daml.platform.store.serialization.ValueSerializer
/**
* An event as it's fetched from the participant index, before
@@ -28,73 +26,31 @@ sealed trait Raw[+E] {
* Fill the blanks left in the raw event by running
* the deserialization on contained values.
*
* @param lfValueTranslation The delegate in charge of applying deserialization
* @param verbose If true, field names of records will be included
*/
def applyDeserialization(verbose: Boolean): E
def applyDeserialization(lfValueTranslation: LfValueTranslation, verbose: Boolean): E
}
object Raw {
private def toApiValue(
value: InputStream,
verbose: Boolean,
attribute: => String,
): ApiValue =
LfEngineToApi.assertOrRuntimeEx(
failureContext = s"attempting to deserialize persisted $attribute to value",
LfEngineToApi
.lfVersionedValueToApiValue(
verbose = verbose,
value = ValueSerializer.deserializeValue(value),
),
)
private def toApiRecord(
value: InputStream,
verbose: Boolean,
attribute: => String,
): ApiRecord =
LfEngineToApi.assertOrRuntimeEx(
failureContext = s"attempting to deserialize persisted $attribute to record",
LfEngineToApi
.lfVersionedValueToApiRecord(
verbose = verbose,
recordValue = ValueSerializer.deserializeValue(value),
),
)
/**
* Since created events can be both a flat event or a tree event
* we share common code between the two variants here. What's left
* out is wrapping the result in the proper envelope.
*/
private[events] sealed abstract class Created[E] private[Raw] (
raw: PbCreatedEvent,
createArgument: InputStream,
createKeyValue: Option[InputStream],
private[events] sealed abstract class Created[E](
val partial: PbCreatedEvent,
val createArgument: InputStream,
val createKeyValue: Option[InputStream],
) extends Raw[E] {
protected def wrapInEvent(event: PbCreatedEvent): E
final override def applyDeserialization(verbose: Boolean): E =
wrapInEvent(
raw.copy(
contractKey = createKeyValue.map(
key =>
toApiValue(
value = key,
verbose = verbose,
attribute = "create key",
)
),
createArguments = Some(
toApiRecord(
value = createArgument,
verbose = verbose,
attribute = "create argument",
)
),
)
)
final override def applyDeserialization(
lfValueTranslation: LfValueTranslation,
verbose: Boolean,
): E =
wrapInEvent(lfValueTranslation.deserialize(this, verbose))
}
private object Created {
@@ -120,15 +76,7 @@ object Raw {
)
}
/**
* Defines a couple of methods on flat events to allow the events
* to be manipulated before running deserialization (specifically,
* removing transient contracts).
*/
sealed trait FlatEvent extends Raw[PbFlatEvent] {
def isCreated: Boolean
def contractId: String
}
sealed trait FlatEvent extends Raw[PbFlatEvent]
object FlatEvent {
@@ -138,8 +86,6 @@ object Raw {
createKeyValue: Option[InputStream],
) extends Raw.Created[PbFlatEvent](raw, createArgument, createKeyValue)
with FlatEvent {
override val isCreated: Boolean = true
override val contractId: String = raw.contractId
override protected def wrapInEvent(event: PbCreatedEvent): PbFlatEvent =
PbFlatEvent(PbFlatEvent.Event.Created(event))
}
@@ -177,9 +123,10 @@ object Raw {
final class Archived private[Raw] (
raw: PbArchivedEvent,
) extends FlatEvent {
override val isCreated: Boolean = false
override val contractId: String = raw.contractId
override def applyDeserialization(verbose: Boolean): PbFlatEvent =
override def applyDeserialization(
lfValueTranslation: LfValueTranslation,
verbose: Boolean,
): PbFlatEvent =
PbFlatEvent(PbFlatEvent.Event.Archived(raw))
}
@@ -207,7 +154,7 @@ object Raw {
object TreeEvent {
final class Created private[Raw] (
final class Created(
raw: PbCreatedEvent,
createArgument: InputStream,
createKeyValue: Option[InputStream],
@@ -245,31 +192,15 @@ object Raw {
}
final class Exercised(
base: PbExercisedEvent,
exerciseArgument: InputStream,
exerciseResult: Option[InputStream],
val partial: PbExercisedEvent,
val exerciseArgument: InputStream,
val exerciseResult: Option[InputStream],
) extends TreeEvent {
override def applyDeserialization(verbose: Boolean): PbTreeEvent =
PbTreeEvent(
PbTreeEvent.Kind.Exercised(
base.copy(
choiceArgument = Some(
toApiValue(
value = exerciseArgument,
verbose = verbose,
attribute = "exercise argument",
)
),
exerciseResult = exerciseResult.map(
result =>
toApiValue(
value = result,
verbose = verbose,
attribute = "exercise result",
)
),
)
))
override def applyDeserialization(
lfValueTranslation: LfValueTranslation,
verbose: Boolean,
): PbTreeEvent =
PbTreeEvent(PbTreeEvent.Kind.Exercised(lfValueTranslation.deserialize(this, verbose)))
}
object Exercised {
@@ -286,7 +217,7 @@ object Raw {
eventWitnesses: Array[String],
): Raw.TreeEvent.Exercised =
new Raw.TreeEvent.Exercised(
base = PbExercisedEvent(
partial = PbExercisedEvent(
eventId = eventId,
contractId = contractId,
templateId = Some(LfEngineToApi.toApiIdentifier(templateId)),
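
With this change a raw event no longer deserializes itself: applyDeserialization
delegates to LfValueTranslation, which consults the cache before falling back to
the serialized payloads. A hedged sketch of a call site, where rawEvent and
translation are placeholders for values obtained elsewhere:

    // Turn a raw flat event into its Ledger API representation; on a cache
    // hit the stored LF values are reused instead of being re-deserialized.
    val apiEvent: PbFlatEvent =
      rawEvent.applyDeserialization(translation, verbose = true)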

@@ -11,17 +11,20 @@ import com.daml.ledger.{ApplicationId, CommandId, TransactionId, WorkflowId}
import com.daml.platform.events.EventIdFormatter.fromTransactionId
import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.RawBatch.PartialParameters
import com.daml.platform.store.serialization.ValueSerializer
private[events] final class RawBatch(query: String, parameters: Vector[PartialParameters]) {
def applySerialization(): Vector[Vector[NamedParameter]] =
parameters.map(_.applySerialization())
def applySerialization(
lfValueTranslation: LfValueTranslation,
): Vector[Vector[NamedParameter]] =
parameters.map(_.applySerialization(lfValueTranslation))
}
private[events] object RawBatch {
sealed abstract class PartialParameters {
def applySerialization(): Vector[NamedParameter]
def applySerialization(
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter]
}
final class Contract(
@@ -42,12 +45,10 @@ private[events] object RawBatch {
"create_key_hash" -> key.map(_.hash),
)
override def applySerialization(): Vector[NamedParameter] =
partial :+
("create_argument" -> ValueSerializer.serializeValue(
value = createArgument,
errorContext = s"Cannot serialize create argument for ${contractId.coid}",
): NamedParameter)
override def applySerialization(
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial :+ lfValueTranslation.serialize(contractId, createArgument)
}
sealed abstract class Event(
@@ -60,9 +61,10 @@ private[events] object RawBatch {
ledgerEffectiveTime: Instant,
offset: Offset,
) extends PartialParameters {
final protected val eventId = fromTransactionId(transactionId, nodeId)
final protected val base: Vector[NamedParameter] =
Vector[NamedParameter](
"event_id" -> fromTransactionId(transactionId, nodeId),
"event_id" -> eventId,
"event_offset" -> offset,
"transaction_id" -> transactionId,
"workflow_id" -> workflowId,
@@ -104,11 +106,10 @@ private[events] object RawBatch {
"create_observers" -> create.stakeholders.diff(create.signatories).toArray[String],
"create_agreement_text" -> Some(create.coinst.agreementText).filter(_.nonEmpty),
)
override def applySerialization(): Vector[NamedParameter] =
partial ++ Vector[NamedParameter](
"create_argument" -> serializeCreateArgOrThrow(create),
"create_key_value" -> serializeNullableKeyOrThrow(create),
)
override def applySerialization(
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, create)
}
final class Exercised(
@@ -142,46 +143,12 @@ private[events] object RawBatch {
.map(fromTransactionId(transactionId, _))
.toArray[String],
)
override def applySerialization(): Vector[NamedParameter] =
partial ++ Vector[NamedParameter](
"exercise_argument" -> serializeExerciseArgOrThrow(exercise),
"exercise_result" -> serializeNullableExerciseResultOrThrow(exercise),
)
override def applySerialization(
lfValueTranslation: LfValueTranslation,
): Vector[NamedParameter] =
partial ++ lfValueTranslation.serialize(eventId, exercise)
}
private def cantSerialize(attribute: String, forContract: ContractId): String =
s"Cannot serialize $attribute for ${forContract.coid}"
private def serializeCreateArgOrThrow(c: Create): Array[Byte] =
ValueSerializer.serializeValue(
value = c.coinst.arg,
errorContext = cantSerialize(attribute = "create argument", forContract = c.coid),
)
private def serializeNullableKeyOrThrow(c: Create): Option[Array[Byte]] =
c.key.map(
k =>
ValueSerializer.serializeValue(
value = k.key,
errorContext = cantSerialize(attribute = "key", forContract = c.coid),
)
)
private def serializeExerciseArgOrThrow(e: Exercise): Array[Byte] =
ValueSerializer.serializeValue(
value = e.chosenValue,
errorContext = cantSerialize(attribute = "exercise argument", forContract = e.targetCoid),
)
private def serializeNullableExerciseResultOrThrow(e: Exercise): Option[Array[Byte]] =
e.exerciseResult.map(
exerciseResult =>
ValueSerializer.serializeValue(
value = exerciseResult,
errorContext = cantSerialize(attribute = "exercise result", forContract = e.targetCoid),
)
)
}
}

@@ -24,12 +24,14 @@ import scala.concurrent.{ExecutionContext, Future}
* @param dispatcher Executes the queries prepared by this object
* @param executionContext Runs transformations on data fetched from the database, including DAML-LF value deserialization
* @param pageSize The number of events to fetch at a time the database when serving streaming calls
* @param lfValueTranslation The delegate in charge of translating serialized DAML-LF values
* @see [[PaginatingAsyncStream]]
*/
private[dao] final class TransactionsReader(
dispatcher: DbDispatcher,
pageSize: Int,
metrics: Metrics,
lfValueTranslation: LfValueTranslation,
)(implicit executionContext: ExecutionContext) {
private val dbMetrics = metrics.daml.index.db
@@ -41,7 +43,7 @@ private[dao] final class TransactionsReader(
ApiOffset.assertFromString(response.transactions.head.offset)
private def deserializeEvent[E](verbose: Boolean)(entry: EventsTable.Entry[Raw[E]]): Future[E] =
Future(entry.event.applyDeserialization(verbose))
Future(entry.event.applyDeserialization(lfValueTranslation, verbose))
private def deserializeEntry[E](verbose: Boolean)(
entry: EventsTable.Entry[Raw[E]],

@@ -53,6 +53,7 @@ private[dao] object TransactionsWriter {
private[dao] final class TransactionsWriter(
dbType: DbType,
metrics: Metrics,
lfValueTranslation: LfValueTranslation,
) {
private val contractsTable = ContractsTable(dbType)
@@ -193,7 +194,10 @@ private[dao] final class TransactionsWriter(
val (serializedEventBatches, serializedContractBatches) =
Timed.value(
metrics.daml.index.db.storeTransactionDao.translationTimer,
(rawEventBatches.applySerialization(), rawContractBatches.applySerialization())
(
rawEventBatches.applySerialization(lfValueTranslation),
rawContractBatches.applySerialization(lfValueTranslation)
)
)
val eventBatches = serializedEventBatches.applyBatching()

@@ -4,6 +4,7 @@
package com.daml.platform.store.dao
import com.codahale.metrics.MetricRegistry
import com.daml.caching.Cache
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
@@ -30,6 +31,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { this: Su
jdbcUrl = jdbcUrl,
eventsPageSize = 100,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = Cache.none,
)
protected final var ledgerDao: LedgerDao = _

@@ -84,6 +84,7 @@ da_scala_test(
"//language-support/scala/bindings-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-service/jwt",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/participant-state",

@@ -114,6 +114,7 @@ da_scala_test(
"//language-support/scala/bindings-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",