From 8b972fb1d6ae520982212381abb3b61306b0e0ab Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Mon, 29 Jun 2020 15:24:07 +0200 Subject: [PATCH] sandbox: Inject the dispatcher into the BaseLedger. (#6497) * sandbox: Create proper `ResourceOwner` implementations. This allows us to use the resource acquisition execution context, rather than using DirectExecutionContext. CHANGELOG_BEGIN CHANGELOG_END * sandbox: Simplify the construction of SqlLedger. * sandbox: Inject the dispatcher into the BaseLedger. * sandbox: Make sure the SqlLedger objects are closed. Tiny regression. No one noticed. It's OK. * sandbox: Simplify ReadOnlySqlLedger. Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> * sandbox: Pull out functions to make SqlLedger.Owner easier to read. Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> * ledger-api-common: Factor out wrapping Dispatcher in a ResourceOwner. * sandbox: Move the PersistenceQueue into a ResourceOwner. * ledger-api-common: Add a comma. Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com> --- ledger/ledger-api-common/BUILD.bazel | 1 + .../akkastreams/dispatcher/Dispatcher.scala | 12 +- .../com/daml/ledger/on/memory/package.scala | 12 +- .../ledger/on/sql/SqlLedgerReaderWriter.scala | 33 +- .../platform/index/JdbcIndex.scala | 21 +- .../platform/index/ReadOnlySqlLedger.scala | 101 ++-- .../stores/SandboxIndexAndWriteService.scala | 42 +- .../sandbox/stores/ledger/sql/SqlLedger.scala | 507 +++++++++--------- .../platform/store/BaseLedger.scala | 36 +- .../platform/sandbox/LedgerResource.scala | 4 +- .../stores/ledger/sql/SqlLedgerSpec.scala | 38 +- 11 files changed, 400 insertions(+), 407 deletions(-) diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index 464c020f37..e3c5a5d738 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -32,6 +32,7 @@ da_scala_library( "//ledger/ledger-api-health", "//ledger/metrics", "//libs-scala/direct-execution-context", + "//libs-scala/resources", "@maven//:com_github_ghik_silencer_lib_2_12_11", "@maven//:com_typesafe_akka_akka_actor_2_12", "@maven//:com_typesafe_akka_akka_stream_2_12", diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/Dispatcher.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/Dispatcher.scala index 57e1f476f6..894609cd9f 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/Dispatcher.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/akkastreams/dispatcher/Dispatcher.scala @@ -5,6 +5,7 @@ package com.daml.platform.akkastreams.dispatcher import akka.NotUsed import akka.stream.scaladsl.Source +import com.daml.resources.ResourceOwner /** * A fanout signaller, representing a stream of external updates, @@ -43,6 +44,15 @@ object Dispatcher { def apply[Index: Ordering]( name: String, zeroIndex: Index, - headAtInitialization: Index): Dispatcher[Index] = + headAtInitialization: Index, + ): Dispatcher[Index] = new DispatcherImpl[Index](name: String, zeroIndex, headAtInitialization) + + def owner[Index: Ordering]( + name: String, + zeroIndex: Index, + headAtInitialization: Index, + ): ResourceOwner[Dispatcher[Index]] = + ResourceOwner.forCloseable(() => apply(name, zeroIndex, headAtInitialization)) + } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala index 17044f098d..1cd0a035a4 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala @@ -12,11 +12,9 @@ package object memory { private[memory] val StartIndex: Index = 0 private[memory] def dispatcherOwner: ResourceOwner[Dispatcher[Index]] = - ResourceOwner.forCloseable( - () => - Dispatcher( - "in-memory-key-value-participant-state", - zeroIndex = StartIndex, - headAtInitialization = StartIndex, - )) + Dispatcher.owner( + name = "in-memory-key-value-participant-state", + zeroIndex = StartIndex, + headAtInitialization = StartIndex, + ) } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala index 94d7c8ab19..c7d938fbf3 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala @@ -139,7 +139,7 @@ object SqlLedgerReaderWriter { if (resetOnStartup) uninitializedDatabase.migrateAndReset() else Future.successful(uninitializedDatabase.migrate())) ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(initialLedgerId, database)) - dispatcher <- ResourceOwner.forFutureCloseable(() => newDispatcher(database)).acquire() + dispatcher <- new DispatcherOwner(database).acquire() } yield new SqlLedgerReaderWriter( ledgerId, @@ -177,13 +177,26 @@ object SqlLedgerReaderWriter { }) } - private def newDispatcher(database: Database)( - implicit executionContext: ExecutionContext, - logCtx: LoggingContext, - ): Future[Dispatcher[Index]] = - database - .inReadTransaction("read_head") { queries => - Future.fromTry(queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex))) - } - .map(head => Dispatcher("sql-participant-state", StartIndex, head)) + private final class DispatcherOwner(database: Database)(implicit logCtx: LoggingContext) + extends ResourceOwner[Dispatcher[Index]] { + override def acquire()( + implicit executionContext: ExecutionContext + ): Resource[Dispatcher[Index]] = + for { + head <- Resource.fromFuture( + database + .inReadTransaction("read_head") { queries => + Future.fromTry( + queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex))) + }) + dispatcher <- Dispatcher + .owner( + name = "sql-participant-state", + zeroIndex = StartIndex, + headAtInitialization = head, + ) + .acquire() + } yield dispatcher + } + } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala index b10fc9976c..e5be373092 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala @@ -27,13 +27,18 @@ object JdbcIndex { metrics: Metrics, lfValueTranslationCache: LfValueTranslation.Cache, )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexService] = - ReadOnlySqlLedger - .owner(serverRole, jdbcUrl, ledgerId, eventsPageSize, metrics, lfValueTranslationCache) - .map { ledger => - new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) { - override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] = - // FIXME(JM): The indexer should on start set the default configuration. - Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime)) - } + new ReadOnlySqlLedger.Owner( + serverRole, + jdbcUrl, + ledgerId, + eventsPageSize, + metrics, + lfValueTranslationCache, + ).map { ledger => + new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) { + override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] = + // FIXME(JM): The indexer should on start set the default configuration. + Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime)) } + } } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala index 93fd7a5680..47a5796bd8 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala @@ -9,88 +9,92 @@ import akka.actor.Cancellable import akka.stream._ import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source} import akka.{Done, NotUsed} -import com.daml.dec.{DirectExecutionContext => DEC} import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.participant.state.v1.Offset import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics +import com.daml.platform.akkastreams.dispatcher.Dispatcher import com.daml.platform.common.LedgerIdMismatchException import com.daml.platform.configuration.ServerRole -import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao} import com.daml.platform.store.dao.events.LfValueTranslation +import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao} import com.daml.platform.store.{BaseLedger, ReadOnlyLedger} import com.daml.resources.ProgramResource.StartupException -import com.daml.resources.ResourceOwner +import com.daml.resources.{Resource, ResourceOwner} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} object ReadOnlySqlLedger { + private val logger = ContextualizedLogger.get(this.getClass) + //jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret" - def owner( + final class Owner( serverRole: ServerRole, jdbcUrl: String, - ledgerId: LedgerId, + initialLedgerId: LedgerId, eventsPageSize: Int, metrics: Metrics, lfValueTranslationCache: LfValueTranslation.Cache, - )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] = - for { - ledgerReadDao <- JdbcLedgerDao.readOwner( + )(implicit mat: Materializer, logCtx: LoggingContext) + extends ResourceOwner[ReadOnlyLedger] { + override def acquire()( + implicit executionContext: ExecutionContext + ): Resource[ReadOnlyLedger] = + for { + ledgerDao <- ledgerDaoOwner().acquire() + ledgerId <- Resource.fromFuture(verifyOrSetLedgerId(ledgerDao, initialLedgerId)) + ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd()) + dispatcher <- dispatcherOwner(ledgerEnd).acquire() + ledger <- ResourceOwner + .forCloseable(() => new ReadOnlySqlLedger(ledgerId, ledgerDao, dispatcher)) + .acquire() + } yield ledger + + private def verifyOrSetLedgerId( + ledgerDao: LedgerReadDao, + initialLedgerId: LedgerId, + )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] = + ledgerDao + .lookupLedgerId() + .flatMap { + case Some(`initialLedgerId`) => + logger.info(s"Found existing ledger with ID: $initialLedgerId") + Future.successful(initialLedgerId) + case Some(foundLedgerId) => + Future.failed( + new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException) + case None => + Future.successful(initialLedgerId) + } + + private def ledgerDaoOwner(): ResourceOwner[LedgerReadDao] = + JdbcLedgerDao.readOwner( serverRole, jdbcUrl, eventsPageSize, metrics, lfValueTranslationCache, ) - factory = new Factory(ledgerReadDao) - ledger <- ResourceOwner.forFutureCloseable(() => factory.createReadOnlySqlLedger(ledgerId)) - } yield ledger - private class Factory(ledgerDao: LedgerReadDao)(implicit logCtx: LoggingContext) { - - private val logger = ContextualizedLogger.get(this.getClass) - - /** - * Creates a DB backed Ledger implementation. - * - * @return a compliant read-only Ledger implementation - */ - def createReadOnlySqlLedger(initialLedgerId: LedgerId)( - implicit mat: Materializer - ): Future[ReadOnlySqlLedger] = { - implicit val ec: ExecutionContext = DEC - for { - ledgerId <- initialize(initialLedgerId) - ledgerEnd <- ledgerDao.lookupLedgerEnd() - } yield new ReadOnlySqlLedger(ledgerId, ledgerEnd, ledgerDao) - } - - private def initialize(initialLedgerId: LedgerId): Future[LedgerId] = - ledgerDao - .lookupLedgerId() - .flatMap { - case Some(foundLedgerId @ `initialLedgerId`) => - logger.info(s"Found existing ledger with ID: $foundLedgerId") - Future.successful(foundLedgerId) - case Some(foundLedgerId) => - Future.failed( - new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException) - case None => - Future.successful(initialLedgerId) - }(DEC) + private def dispatcherOwner(ledgerEnd: Offset): ResourceOwner[Dispatcher[Offset]] = + Dispatcher.owner( + name = "sql-ledger", + zeroIndex = Offset.beforeBegin, + headAtInitialization = ledgerEnd, + ) } } -private class ReadOnlySqlLedger( +private final class ReadOnlySqlLedger( ledgerId: LedgerId, - headAtInitialization: Offset, ledgerDao: LedgerReadDao, + dispatcher: Dispatcher[Offset], )(implicit mat: Materializer) - extends BaseLedger(ledgerId, headAtInitialization, ledgerDao) { + extends BaseLedger(ledgerId, ledgerDao, dispatcher) { private val (ledgerEndUpdateKillSwitch, ledgerEndUpdateDone) = RestartSource @@ -124,11 +128,12 @@ private class ReadOnlySqlLedger( override def close(): Unit = { // Terminate the dispatcher first so that it doesn't trigger new queries. - super.close() + dispatcher.close() + deduplicationCleanupKillSwitch.shutdown() ledgerEndUpdateKillSwitch.shutdown() Await.result(deduplicationCleanupDone, 10.seconds) Await.result(ledgerEndUpdateDone, 10.seconds) - () + super.close() } } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala index 3632e15f3d..61ab908b3e 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala @@ -53,7 +53,7 @@ object SandboxIndexAndWriteService { private val logger = LoggerFactory.getLogger(SandboxIndexAndWriteService.getClass) def postgres( - ledgerId: LedgerIdMode, + initialLedgerId: LedgerIdMode, participantId: ParticipantId, jdbcUrl: String, initialConfig: ParticipantState.Configuration, @@ -68,28 +68,26 @@ object SandboxIndexAndWriteService { metrics: Metrics, lfValueTranslationCache: LfValueTranslation.Cache, )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexAndWriteService] = - SqlLedger - .owner( - serverRole = ServerRole.Sandbox, - jdbcUrl = jdbcUrl, - ledgerId = ledgerId, - participantId = participantId, - timeProvider = timeProvider, - acs = acs, - packages = templateStore, - initialLedgerEntries = ledgerEntries, - queueDepth = queueDepth, - transactionCommitter = transactionCommitter, - startMode = startMode, - eventsPageSize = eventsPageSize, - metrics = metrics, - lfValueTranslationCache - ) - .flatMap(ledger => - owner(MeteredLedger(ledger, metrics), participantId, initialConfig, timeProvider)) + new SqlLedger.Owner( + serverRole = ServerRole.Sandbox, + jdbcUrl = jdbcUrl, + initialLedgerId = initialLedgerId, + participantId = participantId, + timeProvider = timeProvider, + acs = acs, + packages = templateStore, + initialLedgerEntries = ledgerEntries, + queueDepth = queueDepth, + transactionCommitter = transactionCommitter, + startMode = startMode, + eventsPageSize = eventsPageSize, + metrics = metrics, + lfValueTranslationCache + ).flatMap(ledger => + owner(MeteredLedger(ledger, metrics), participantId, initialConfig, timeProvider)) def inMemory( - ledgerId: LedgerIdMode, + initialLedgerId: LedgerIdMode, participantId: ParticipantId, intialConfig: ParticipantState.Configuration, timeProvider: TimeProvider, @@ -101,7 +99,7 @@ object SandboxIndexAndWriteService { )(implicit mat: Materializer): ResourceOwner[IndexAndWriteService] = { val ledger = new InMemoryLedger( - ledgerId.or(LedgerIdGenerator.generateRandomId()), + initialLedgerId.or(LedgerIdGenerator.generateRandomId()), participantId, timeProvider, acs, diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala index c264c9262f..c5a976d68b 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -23,19 +23,21 @@ import com.daml.lf.transaction.TransactionCommitter import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics import com.daml.platform.ApiOffset.ApiOffsetConverter +import com.daml.platform.akkastreams.dispatcher.Dispatcher import com.daml.platform.common.{LedgerIdMismatchException, LedgerIdMode} import com.daml.platform.configuration.ServerRole import com.daml.platform.packages.InMemoryPackageStore import com.daml.platform.sandbox.LedgerIdGenerator import com.daml.platform.sandbox.stores.InMemoryActiveLedgerState import com.daml.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump +import com.daml.platform.sandbox.stores.ledger.sql.SqlLedger._ import com.daml.platform.sandbox.stores.ledger.{Ledger, SandboxOffset} import com.daml.platform.store.dao.events.LfValueTranslation import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao} import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry} import com.daml.platform.store.{BaseLedger, FlywayMigrations} import com.daml.resources.ProgramResource.StartupException -import com.daml.resources.ResourceOwner +import com.daml.resources.{Resource, ResourceOwner} import scala.collection.immutable.Queue import scala.concurrent.{ExecutionContext, Future} @@ -44,13 +46,13 @@ import scala.util.{Failure, Success} object SqlLedger { - private type PersistentQueue = SourceQueueWithComplete[Offset => Future[Unit]] + private type PersistenceQueue = SourceQueueWithComplete[Offset => Future[Unit]] - def owner( + final class Owner( serverRole: ServerRole, // jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret" jdbcUrl: String, - ledgerId: LedgerIdMode, + initialLedgerId: LedgerIdMode, participantId: ParticipantId, timeProvider: TimeProvider, acs: InMemoryActiveLedgerState, @@ -61,80 +63,223 @@ object SqlLedger { startMode: SqlStartMode = SqlStartMode.ContinueIfExists, eventsPageSize: Int, metrics: Metrics, - lfValueTranslationCache: LfValueTranslation.Cache - )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[Ledger] = - for { - _ <- ResourceOwner.forFuture(() => new FlywayMigrations(jdbcUrl).migrate()(DEC)) - ledgerDao <- JdbcLedgerDao.validatingWriteOwner( + lfValueTranslationCache: LfValueTranslation.Cache, + )(implicit mat: Materializer, logCtx: LoggingContext) + extends ResourceOwner[Ledger] { + + private val logger = ContextualizedLogger.get(this.getClass) + + override def acquire()(implicit executionContext: ExecutionContext): Resource[Ledger] = + for { + _ <- Resource.fromFuture(new FlywayMigrations(jdbcUrl).migrate()) + ledgerDao <- ledgerDaoOwner().acquire() + _ <- startMode match { + case SqlStartMode.AlwaysReset => + Resource.fromFuture(ledgerDao.reset()) + case SqlStartMode.ContinueIfExists => + Resource.unit + } + ledgerId <- Resource.fromFuture(initialize(ledgerDao)) + ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd()) + ledgerConfig <- Resource.fromFuture(ledgerDao.lookupLedgerConfiguration()) + dispatcher <- dispatcherOwner(ledgerEnd).acquire() + persistenceQueue <- new PersistenceQueueOwner(dispatcher).acquire() + // Close the dispatcher before the persistence queue. + _ <- Resource(Future.unit)(_ => Future.successful(dispatcher.close())) + ledger <- sqlLedgerOwner( + ledgerId, + ledgerConfig.map(_._2), + ledgerDao, + dispatcher, + persistenceQueue).acquire() + } yield ledger + + private def initialize( + ledgerDao: LedgerDao, + )(implicit executionContext: ExecutionContext): Future[LedgerId] = { + // Note that here we only store the ledger entry and we do not update anything else, such as the + // headRef. This is OK since this initialization + // step happens before we start up the sql ledger at all, so it's running in isolation. + for { + currentLedgerId <- ledgerDao.lookupLedgerId() + initializationRequired = currentLedgerId.isEmpty + ledgerId <- (currentLedgerId, initialLedgerId) match { + case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) + if foundLedgerId == initialId => + ledgerFound(foundLedgerId, initialLedgerEntries, packages) + + case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) => + Future.failed( + new LedgerIdMismatchException(foundLedgerId, initialId) with StartupException) + + case (Some(foundLedgerId), LedgerIdMode.Dynamic) => + ledgerFound(foundLedgerId, initialLedgerEntries, packages) + + case (None, LedgerIdMode.Static(initialId)) => + Future.successful(initialId) + + case (None, LedgerIdMode.Dynamic) => + val randomLedgerId = LedgerIdGenerator.generateRandomId() + Future.successful(randomLedgerId) + } + _ <- if (initializationRequired) { + logger.info(s"Initializing ledger with ID: $ledgerId") + for { + _ <- ledgerDao.initializeLedger(ledgerId) + _ <- initializeLedgerEntries( + initialLedgerEntries, + timeProvider, + packages, + acs, + ledgerDao, + participantId, + ) + } yield () + } else { + Future.unit + } + } yield ledgerId + } + + private def ledgerFound( + foundLedgerId: LedgerId, + initialLedgerEntries: ImmArray[LedgerEntryOrBump], + packages: InMemoryPackageStore, + ): Future[LedgerId] = { + logger.info(s"Found existing ledger with ID: $foundLedgerId") + if (initialLedgerEntries.nonEmpty) { + logger.warn( + s"Initial ledger entries provided, presumably from scenario, but there is an existing database, and thus they will not be used.") + } + if (packages.listLfPackagesSync().nonEmpty) { + logger.warn( + s"Initial packages provided, presumably as command line arguments, but there is an existing database, and thus they will not be used.") + } + Future.successful(foundLedgerId) + } + + private def initializeLedgerEntries( + initialLedgerEntries: ImmArray[LedgerEntryOrBump], + timeProvider: TimeProvider, + packages: InMemoryPackageStore, + acs: InMemoryActiveLedgerState, + ledgerDao: LedgerDao, + participantId: ParticipantId, + )(implicit executionContext: ExecutionContext): Future[Unit] = { + if (initialLedgerEntries.nonEmpty) { + logger.info(s"Initializing ledger with ${initialLedgerEntries.length} ledger entries.") + } + + val (ledgerEnd, ledgerEntries) = + initialLedgerEntries.foldLeft((1L, Vector.empty[(Offset, LedgerEntry)])) { + case ((offset, entries), entryOrBump) => + entryOrBump match { + case LedgerEntryOrBump.Entry(entry) => + (offset + 1, entries :+ SandboxOffset.toOffset(offset + 1) -> entry) + case LedgerEntryOrBump.Bump(increment) => + (offset + increment, entries) + } + } + + for { + _ <- copyPackages( + packages, + ledgerDao, + timeProvider.getCurrentTime, + SandboxOffset.toOffset(ledgerEnd), + ) + _ <- ledgerDao.storeInitialState(ledgerEntries, SandboxOffset.toOffset(ledgerEnd)) + } yield () + } + + private def copyPackages( + store: InMemoryPackageStore, + ledgerDao: LedgerDao, + knownSince: Instant, + newLedgerEnd: Offset, + ): Future[Unit] = { + val packageDetails = store.listLfPackagesSync() + if (packageDetails.nonEmpty) { + logger.info(s"Copying initial packages ${packageDetails.keys.mkString(",")}") + val packages = packageDetails.toList.map(pkg => { + val archive = + store.getLfArchiveSync(pkg._1).getOrElse(sys.error(s"Package ${pkg._1} not found")) + archive -> PackageDetails(archive.getPayload.size.toLong, knownSince, None) + }) + + ledgerDao + .storePackageEntry(newLedgerEnd, packages, None) + .transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))( + DEC) + } else { + Future.successful(()) + } + } + + private def ledgerDaoOwner(): ResourceOwner[LedgerDao] = + JdbcLedgerDao.validatingWriteOwner( serverRole, jdbcUrl, eventsPageSize, metrics, - lfValueTranslationCache) - ledger <- ResourceOwner.forFutureCloseable( + lfValueTranslationCache, + ) + + private def dispatcherOwner(ledgerEnd: Offset): ResourceOwner[Dispatcher[Offset]] = + Dispatcher.owner( + name = "sql-ledger", + zeroIndex = Offset.beforeBegin, + headAtInitialization = ledgerEnd, + ) + + private def sqlLedgerOwner( + ledgerId: LedgerId, + ledgerConfig: Option[Configuration], + ledgerDao: LedgerDao, + dispatcher: Dispatcher[Offset], + persistenceQueue: PersistenceQueue, + ): ResourceOwner[SqlLedger] = + ResourceOwner.forCloseable( () => - new SqlLedgerFactory(ledgerDao).createSqlLedger( + new SqlLedger( ledgerId, participantId, + ledgerConfig, + ledgerDao, + dispatcher, timeProvider, - startMode, - acs, packages, - initialLedgerEntries, - queueDepth, + persistenceQueue, transactionCommitter, )) - } yield ledger -} -private final class SqlLedger( - ledgerId: LedgerId, - participantId: ParticipantId, - headAtInitialization: Offset, - configAtInitialization: Option[Configuration], - ledgerDao: LedgerDao, - timeProvider: TimeProvider, - packages: InMemoryPackageStore, - queueDepth: Int, - transactionCommitter: TransactionCommitter, -)(implicit mat: Materializer, logCtx: LoggingContext) - extends BaseLedger(ledgerId, headAtInitialization, ledgerDao) - with Ledger { + private final class PersistenceQueueOwner(dispatcher: Dispatcher[Offset]) + extends ResourceOwner[PersistenceQueue] { + override def acquire()( + implicit executionContext: ExecutionContext + ): Resource[PersistenceQueue] = + Resource(Future.successful { + val queue = + Source.queue[Offset => Future[Unit]](queueDepth, OverflowStrategy.dropNew) - import SqlLedger._ + // By default we process the requests in batches when under pressure (see semantics of `batch`). Note + // that this is safe on the read end because the readers rely on the dispatchers to know the + // ledger end, and not the database itself. This means that they will not start reading from the new + // ledger end until we tell them so, which we do when _all_ the entries have been committed. + val persistenceQueue = queue + .batch(1, Queue(_))(_.enqueue(_)) + .mapAsync(1)(persistAll) + .toMat(Sink.ignore)(Keep.left[PersistenceQueue, Future[Done]]) + .run() + watchForFailures(persistenceQueue) + persistenceQueue + })(queue => Future.successful(queue.complete())) - private val logger = ContextualizedLogger.get(this.getClass) - - // the reason for modelling persistence as a reactive pipeline is to avoid having race-conditions between the - // moving ledger-end, the async persistence operation and the dispatcher head notification - private val persistenceQueue: PersistentQueue = createQueue() - - watchForFailures(persistenceQueue, "persistence") - - private def watchForFailures(queue: SourceQueueWithComplete[_], name: String): Unit = - queue - .watchCompletion() - .failed - .foreach { throwable => - logger.error(s"$name queue has been closed with a failure!", throwable) - }(DEC) - - private def createQueue(): PersistentQueue = { - implicit val ec: ExecutionContext = DEC - - val persistenceQueue = - Source.queue[Offset => Future[Unit]](queueDepth, OverflowStrategy.dropNew) - - // By default we process the requests in batches when under pressure (see semantics of `batch`). Note - // that this is safe on the read end because the readers rely on the dispatchers to know the - // ledger end, and not the database itself. This means that they will not start reading from the new - // ledger end until we tell them so, which we do when _all_ the entries have been committed. - persistenceQueue - .batch(1, Queue(_))(_.enqueue(_)) - .mapAsync(1) { queue => + private def persistAll(queue: Queue[Offset => Future[Unit]]): Future[Unit] = { + implicit val ec: ExecutionContext = DEC val startOffset = SandboxOffset.fromOffset(dispatcher.getHead()) - // we can only do this because there is no parallelism here! - //shooting the SQL queries in parallel + // This will attempt to run the SQL queries concurrently, but there is no parallelism here, + // so they will still run sequentially. Future .sequence(queue.toIterator.zipWithIndex.map { case (persist, i) => @@ -142,20 +287,45 @@ private final class SqlLedger( persist(SandboxOffset.toOffset(offset)) }) .map { _ => - //note that we can have holes in offsets in case of the storing of an entry failed for some reason - dispatcher.signalNewHead(SandboxOffset.toOffset(startOffset + queue.length)) //signalling downstream subscriptions + // note that we can have holes in offsets in case of the storing of an entry failed for some reason + dispatcher.signalNewHead( + SandboxOffset + .toOffset(startOffset + queue.length)) // signalling downstream subscriptions } } - .toMat(Sink.ignore)(Keep.left[PersistentQueue, Future[Done]]) - .run() + } + + private def watchForFailures(queue: SourceQueueWithComplete[_]): Unit = + queue + .watchCompletion() + .failed + .foreach { throwable => + logger.error("Persistence queue has been closed with a failure.", throwable) + }(DEC) + } +} + +private final class SqlLedger( + ledgerId: LedgerId, + participantId: ParticipantId, + configAtInitialization: Option[Configuration], + ledgerDao: LedgerDao, + dispatcher: Dispatcher[Offset], + timeProvider: TimeProvider, + packages: InMemoryPackageStore, + persistenceQueue: PersistenceQueue, + transactionCommitter: TransactionCommitter, +)(implicit mat: Materializer, logCtx: LoggingContext) + extends BaseLedger(ledgerId, ledgerDao, dispatcher) + with Ledger { + + private val logger = ContextualizedLogger.get(this.getClass) override def currentHealth(): HealthStatus = ledgerDao.currentHealth() - override def close(): Unit = { + override def close(): Unit = super.close() - persistenceQueue.complete() - } // Note: ledger entries are written in batches, and this variable is updated while writing a ledger configuration // changed entry. Transactions written around the same time as a configuration change entry might not use the correct @@ -335,200 +505,3 @@ private final class SqlLedger( }(DEC) } } - -private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: LoggingContext) { - - private val logger = ContextualizedLogger.get(this.getClass) - - /** * - * Creates a DB backed Ledger implementation. - * - * @param initialLedgerId a random ledger id is generated if none given, if set it's used to initialize the ledger. - * In case the ledger had already been initialized, the given ledger id must not be set or must - * be equal to the one in the database. - * @param participantId the participant identifier - * @param timeProvider to get the current time when sequencing transactions - * @param startMode whether we should start with a clean state or continue where we left off - * @param initialLedgerEntries The initial ledger entries -- usually provided by the scenario runner. Will only be - * used if starting from a fresh database. - * @param queueDepth the depth of the buffer for persisting entries. When gets full, the system will signal back-pressure - * upstream - * @return a compliant Ledger implementation - */ - def createSqlLedger( - initialLedgerId: LedgerIdMode, - participantId: ParticipantId, - timeProvider: TimeProvider, - startMode: SqlStartMode, - acs: InMemoryActiveLedgerState, - packages: InMemoryPackageStore, - initialLedgerEntries: ImmArray[LedgerEntryOrBump], - queueDepth: Int, - transactionCommitter: TransactionCommitter, - )(implicit mat: Materializer): Future[SqlLedger] = { - implicit val ec: ExecutionContext = DEC - - def init(): Future[LedgerId] = startMode match { - case SqlStartMode.AlwaysReset => - for { - _ <- reset() - ledgerId <- initialize( - initialLedgerId, - participantId, - timeProvider, - acs, - packages, - initialLedgerEntries, - ) - } yield ledgerId - case SqlStartMode.ContinueIfExists => - initialize( - initialLedgerId, - participantId, - timeProvider, - acs, - packages, - initialLedgerEntries, - ) - } - - for { - ledgerId <- init() - ledgerEnd <- ledgerDao.lookupLedgerEnd() - ledgerConfig <- ledgerDao.lookupLedgerConfiguration() - } yield - new SqlLedger( - ledgerId, - participantId, - ledgerEnd, - ledgerConfig.map(_._2), - ledgerDao, - timeProvider, - packages, - queueDepth, - transactionCommitter, - ) - } - - private def reset(): Future[Unit] = - ledgerDao.reset() - - private def initialize( - initialLedgerId: LedgerIdMode, - participantId: ParticipantId, - timeProvider: TimeProvider, - acs: InMemoryActiveLedgerState, - packages: InMemoryPackageStore, - initialLedgerEntries: ImmArray[LedgerEntryOrBump], - ): Future[LedgerId] = { - // Note that here we only store the ledger entry and we do not update anything else, such as the - // headRef. This is OK since this initialization - // step happens before we start up the sql ledger at all, so it's running in isolation. - - implicit val ec: ExecutionContext = DEC - for { - currentLedgerId <- ledgerDao.lookupLedgerId() - initializationRequired = currentLedgerId.isEmpty - ledgerId <- (currentLedgerId, initialLedgerId) match { - case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) if foundLedgerId == initialId => - ledgerFound(foundLedgerId, initialLedgerEntries, packages) - - case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) => - Future.failed( - new LedgerIdMismatchException(foundLedgerId, initialId) with StartupException) - - case (Some(foundLedgerId), LedgerIdMode.Dynamic) => - ledgerFound(foundLedgerId, initialLedgerEntries, packages) - - case (None, LedgerIdMode.Static(initialId)) => - Future.successful(initialId) - - case (None, LedgerIdMode.Dynamic) => - val randomLedgerId = LedgerIdGenerator.generateRandomId() - Future.successful(randomLedgerId) - } - _ <- if (initializationRequired) { - logger.info(s"Initializing ledger with ID: $ledgerId") - for { - _ <- ledgerDao.initializeLedger(ledgerId) - _ <- initializeLedgerEntries( - initialLedgerEntries, - timeProvider, - packages, - acs, - participantId) - } yield () - } else { - Future.unit - } - } yield ledgerId - } - - private def ledgerFound( - foundLedgerId: LedgerId, - initialLedgerEntries: ImmArray[LedgerEntryOrBump], - packages: InMemoryPackageStore, - ): Future[LedgerId] = { - logger.info(s"Found existing ledger with ID: $foundLedgerId") - if (initialLedgerEntries.nonEmpty) { - logger.warn( - s"Initial ledger entries provided, presumably from scenario, but there is an existing database, and thus they will not be used.") - } - if (packages.listLfPackagesSync().nonEmpty) { - logger.warn( - s"Initial packages provided, presumably as command line arguments, but there is an existing database, and thus they will not be used.") - } - Future.successful(foundLedgerId) - } - - private def initializeLedgerEntries( - initialLedgerEntries: ImmArray[LedgerEntryOrBump], - timeProvider: TimeProvider, - packages: InMemoryPackageStore, - acs: InMemoryActiveLedgerState, - participantId: ParticipantId, - )(implicit executionContext: ExecutionContext): Future[Unit] = { - if (initialLedgerEntries.nonEmpty) { - logger.info(s"Initializing ledger with ${initialLedgerEntries.length} ledger entries.") - } - - val (ledgerEnd, ledgerEntries) = - initialLedgerEntries.foldLeft((1L, Vector.empty[(Offset, LedgerEntry)])) { - case ((offset, entries), entryOrBump) => - entryOrBump match { - case LedgerEntryOrBump.Entry(entry) => - (offset + 1, entries :+ SandboxOffset.toOffset(offset + 1) -> entry) - case LedgerEntryOrBump.Bump(increment) => - (offset + increment, entries) - } - } - - for { - _ <- copyPackages(packages, timeProvider.getCurrentTime, SandboxOffset.toOffset(ledgerEnd)) - _ <- ledgerDao.storeInitialState(ledgerEntries, SandboxOffset.toOffset(ledgerEnd)) - } yield () - } - - private def copyPackages( - store: InMemoryPackageStore, - knownSince: Instant, - newLedgerEnd: Offset): Future[Unit] = { - - val packageDetails = store.listLfPackagesSync() - if (packageDetails.nonEmpty) { - logger.info(s"Copying initial packages ${packageDetails.keys.mkString(",")}") - val packages = packageDetails.toList.map(pkg => { - val archive = - store.getLfArchiveSync(pkg._1).getOrElse(sys.error(s"Package ${pkg._1} not found")) - archive -> PackageDetails(archive.getPayload.size.toLong, knownSince, None) - }) - - ledgerDao - .storePackageEntry(newLedgerEnd, packages, None) - .transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))(DEC) - } else { - Future.successful(()) - } - } - -} diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala index 0079952b74..9217612e00 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/store/BaseLedger.scala @@ -7,16 +7,6 @@ import java.time.Instant import akka.NotUsed import akka.stream.scaladsl.Source -import com.daml.ledger.participant.state.index.v2 -import com.daml.ledger.participant.state.index.v2.CommandDeduplicationResult -import com.daml.ledger.participant.state.v1.{Configuration, Offset} -import com.daml.lf.archive.Decode -import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.{Identifier, PackageId, Party} -import com.daml.lf.language.Ast -import com.daml.lf.transaction.Node -import com.daml.lf.value.Value -import com.daml.lf.value.Value.{ContractId, ContractInst} import com.daml.daml_lf_dev.DamlLf import com.daml.dec.DirectExecutionContext import com.daml.ledger.TransactionId @@ -31,6 +21,16 @@ import com.daml.ledger.api.v1.transaction_service.{ GetTransactionTreesResponse, GetTransactionsResponse } +import com.daml.ledger.participant.state.index.v2 +import com.daml.ledger.participant.state.index.v2.CommandDeduplicationResult +import com.daml.ledger.participant.state.v1.{Configuration, Offset} +import com.daml.lf.archive.Decode +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.{Identifier, PackageId, Party} +import com.daml.lf.language.Ast +import com.daml.lf.transaction.Node +import com.daml.lf.value.Value +import com.daml.lf.value.Value.{ContractId, ContractInst} import com.daml.platform.akkastreams.dispatcher.Dispatcher import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource import com.daml.platform.store.dao.LedgerReadDao @@ -42,18 +42,12 @@ import scala.util.Try abstract class BaseLedger( val ledgerId: LedgerId, - headAtInitialization: Offset, - ledgerDao: LedgerReadDao) - extends ReadOnlyLedger { + ledgerDao: LedgerReadDao, + dispatcher: Dispatcher[Offset], +) extends ReadOnlyLedger { implicit private val DEC: ExecutionContext = DirectExecutionContext - protected final val dispatcher: Dispatcher[Offset] = Dispatcher[Offset]( - "sql-ledger", - Offset.beforeBegin, - headAtInitialization - ) - override def currentHealth(): HealthStatus = ledgerDao.currentHealth() override def lookupKey(key: Node.GlobalKey, forParty: Party): Future[Option[ContractId]] = @@ -174,7 +168,5 @@ abstract class BaseLedger( override def stopDeduplicatingCommand(commandId: CommandId, submitter: Party): Future[Unit] = ledgerDao.stopDeduplicatingCommand(commandId, submitter) - override def close(): Unit = { - dispatcher.close() - } + override def close(): Unit = () } diff --git a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala index 24ee6195ef..500132b1bc 100644 --- a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala +++ b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/sandbox/LedgerResource.scala @@ -64,10 +64,10 @@ object LedgerResource { new OwnedResource( for { database <- PostgresResource.owner() - ledger <- SqlLedger.owner( + ledger <- new SqlLedger.Owner( serverRole = ServerRole.Testing(testClass), jdbcUrl = database.url, - ledgerId = LedgerIdMode.Static(ledgerId), + initialLedgerId = LedgerIdMode.Static(ledgerId), participantId = participantId, timeProvider = timeProvider, acs = InMemoryActiveLedgerState.empty, diff --git a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala index 6bf30300f8..462196e5ae 100644 --- a/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala +++ b/ledger/sandbox/src/test/suite/scala/com/digitalasset/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala @@ -175,26 +175,24 @@ class SqlLedgerSpec ): Future[Ledger] = { metrics.getNames.forEach(name => { val _ = metrics.remove(name) }) val ledger = newLoggingContext { implicit logCtx => - SqlLedger - .owner( - serverRole = ServerRole.Testing(getClass), - jdbcUrl = postgresDatabase.url, - ledgerId = ledgerId.fold[LedgerIdMode](LedgerIdMode.Dynamic)(LedgerIdMode.Static), - participantId = participantId, - timeProvider = TimeProvider.UTC, - acs = InMemoryActiveLedgerState.empty, - packages = InMemoryPackageStore.empty - .withPackages(Instant.EPOCH, None, packages) - .fold(sys.error, identity), - initialLedgerEntries = ImmArray.empty, - queueDepth = queueDepth, - transactionCommitter = LegacyTransactionCommitter, - startMode = SqlStartMode.ContinueIfExists, - eventsPageSize = 100, - metrics = new Metrics(metrics), - lfValueTranslationCache = LfValueTranslation.Cache.none, - ) - .acquire()(system.dispatcher) + new SqlLedger.Owner( + serverRole = ServerRole.Testing(getClass), + jdbcUrl = postgresDatabase.url, + initialLedgerId = ledgerId.fold[LedgerIdMode](LedgerIdMode.Dynamic)(LedgerIdMode.Static), + participantId = participantId, + timeProvider = TimeProvider.UTC, + acs = InMemoryActiveLedgerState.empty, + packages = InMemoryPackageStore.empty + .withPackages(Instant.EPOCH, None, packages) + .fold(sys.error, identity), + initialLedgerEntries = ImmArray.empty, + queueDepth = queueDepth, + transactionCommitter = LegacyTransactionCommitter, + startMode = SqlStartMode.ContinueIfExists, + eventsPageSize = 100, + metrics = new Metrics(metrics), + lfValueTranslationCache = LfValueTranslation.Cache.none, + ).acquire()(system.dispatcher) } createdLedgers += ledger ledger.asFuture