sandbox: Inject the dispatcher into the BaseLedger. (#6497)

* sandbox: Create proper `ResourceOwner` implementations.

This allows us to use the resource acquisition execution context rather
than DirectExecutionContext.
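
For reference, a minimal sketch of the interface this builds on (abridged;
the real definitions live in //libs-scala/resources, and only the parts used
by this change are shown):

trait ResourceOwner[A] {
  // The caller supplies the execution context, so acquisition and release
  // no longer need to run on DirectExecutionContext.
  def acquire()(implicit executionContext: ExecutionContext): Resource[A]
}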

CHANGELOG_BEGIN
CHANGELOG_END

* sandbox: Simplify the construction of SqlLedger.

* sandbox: Inject the dispatcher into the BaseLedger.
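
Abridged from the BaseLedger hunk below: the constructor now receives the
dispatcher instead of building one from `headAtInitialization` and owning it.

// before: BaseLedger constructed (and closed) its own dispatcher
abstract class BaseLedger(
    val ledgerId: LedgerId,
    headAtInitialization: Offset,
    ledgerDao: LedgerReadDao,
) extends ReadOnlyLedger

// after: the dispatcher is injected; its lifecycle belongs to a ResourceOwner
abstract class BaseLedger(
    val ledgerId: LedgerId,
    ledgerDao: LedgerReadDao,
    dispatcher: Dispatcher[Offset],
) extends ReadOnlyLedger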

* sandbox: Make sure the SqlLedger objects are closed.

Tiny regression. No one noticed. It's OK.
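
The fix: SqlLedger is now constructed through ResourceOwner.forCloseable, so
releasing the resource is what invokes close(). Abridged from sqlLedgerOwner
below:

ResourceOwner.forCloseable(() => new SqlLedger(/* ... */))

Resources acquired in a for-comprehension are released in reverse order of
acquisition; the `Resource(Future.unit)(_ => Future.successful(dispatcher.close()))`
line in SqlLedger.Owner.acquire relies on this to close the dispatcher before
the persistence queue.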

* sandbox: Simplify ReadOnlySqlLedger.

Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* sandbox: Pull out functions to make SqlLedger.Owner easier to read.

Co-Authored-By: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* ledger-api-common: Factor out wrapping Dispatcher in a ResourceOwner.
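
Callers now compose the dispatcher like any other owner; abridged from the
SqlLedgerReaderWriter hunk below:

dispatcher <- Dispatcher
  .owner(
    name = "sql-participant-state",
    zeroIndex = StartIndex,
    headAtInitialization = head,
  )
  .acquire()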

* sandbox: Move the PersistenceQueue into a ResourceOwner.
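
A sketch of the resulting shape, abridged from PersistenceQueueOwner below:
Resource(acquire)(release) pairs the queue's materialization with its
completion, so shutting the queue down can no longer be forgotten.
(`materializeQueue()` is a stand-in for the batching stream the real code runs.)

Resource(Future.successful(materializeQueue()))(queue =>
  Future.successful(queue.complete()))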

* ledger-api-common: Add a comma.

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>
Samir Talwar 2020-06-29 15:24:07 +02:00 committed by GitHub
parent 77c0b879e6
commit 8b972fb1d6
11 changed files with 400 additions and 407 deletions

@@ -32,6 +32,7 @@ da_scala_library(
"//ledger/ledger-api-health",
"//ledger/metrics",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
"@maven//:com_github_ghik_silencer_lib_2_12_11",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",

@@ -5,6 +5,7 @@ package com.daml.platform.akkastreams.dispatcher
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.resources.ResourceOwner
/**
* A fanout signaller, representing a stream of external updates,
@@ -43,6 +44,15 @@ object Dispatcher {
def apply[Index: Ordering](
name: String,
zeroIndex: Index,
headAtInitialization: Index): Dispatcher[Index] =
headAtInitialization: Index,
): Dispatcher[Index] =
new DispatcherImpl[Index](name: String, zeroIndex, headAtInitialization)
def owner[Index: Ordering](
name: String,
zeroIndex: Index,
headAtInitialization: Index,
): ResourceOwner[Dispatcher[Index]] =
ResourceOwner.forCloseable(() => apply(name, zeroIndex, headAtInitialization))
}

@@ -12,11 +12,9 @@ package object memory {
private[memory] val StartIndex: Index = 0
private[memory] def dispatcherOwner: ResourceOwner[Dispatcher[Index]] =
ResourceOwner.forCloseable(
() =>
Dispatcher(
"in-memory-key-value-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
))
Dispatcher.owner(
name = "in-memory-key-value-participant-state",
zeroIndex = StartIndex,
headAtInitialization = StartIndex,
)
}

@@ -139,7 +139,7 @@ object SqlLedgerReaderWriter {
if (resetOnStartup) uninitializedDatabase.migrateAndReset()
else Future.successful(uninitializedDatabase.migrate()))
ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(initialLedgerId, database))
dispatcher <- ResourceOwner.forFutureCloseable(() => newDispatcher(database)).acquire()
dispatcher <- new DispatcherOwner(database).acquire()
} yield
new SqlLedgerReaderWriter(
ledgerId,
@@ -177,13 +177,26 @@
})
}
private def newDispatcher(database: Database)(
implicit executionContext: ExecutionContext,
logCtx: LoggingContext,
): Future[Dispatcher[Index]] =
database
.inReadTransaction("read_head") { queries =>
Future.fromTry(queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex)))
}
.map(head => Dispatcher("sql-participant-state", StartIndex, head))
private final class DispatcherOwner(database: Database)(implicit logCtx: LoggingContext)
extends ResourceOwner[Dispatcher[Index]] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[Dispatcher[Index]] =
for {
head <- Resource.fromFuture(
database
.inReadTransaction("read_head") { queries =>
Future.fromTry(
queries.selectLatestLogEntryId().map(_.map(_ + 1).getOrElse(StartIndex)))
})
dispatcher <- Dispatcher
.owner(
name = "sql-participant-state",
zeroIndex = StartIndex,
headAtInitialization = head,
)
.acquire()
} yield dispatcher
}
}

@@ -27,13 +27,18 @@ object JdbcIndex {
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexService] =
ReadOnlySqlLedger
.owner(serverRole, jdbcUrl, ledgerId, eventsPageSize, metrics, lfValueTranslationCache)
.map { ledger =>
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) {
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =
// FIXME(JM): The indexer should on start set the default configuration.
Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime))
}
new ReadOnlySqlLedger.Owner(
serverRole,
jdbcUrl,
ledgerId,
eventsPageSize,
metrics,
lfValueTranslationCache,
).map { ledger =>
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) {
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =
// FIXME(JM): The indexer should on start set the default configuration.
Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime))
}
}
}

@@ -9,88 +9,92 @@ import akka.actor.Cancellable
import akka.stream._
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.{Done, NotUsed}
import com.daml.dec.{DirectExecutionContext => DEC}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.v1.Offset
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.LedgerIdMismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.{BaseLedger, ReadOnlyLedger}
import com.daml.resources.ProgramResource.StartupException
import com.daml.resources.ResourceOwner
import com.daml.resources.{Resource, ResourceOwner}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object ReadOnlySqlLedger {
private val logger = ContextualizedLogger.get(this.getClass)
//jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret"
def owner(
final class Owner(
serverRole: ServerRole,
jdbcUrl: String,
ledgerId: LedgerId,
initialLedgerId: LedgerId,
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] =
for {
ledgerReadDao <- JdbcLedgerDao.readOwner(
)(implicit mat: Materializer, logCtx: LoggingContext)
extends ResourceOwner[ReadOnlyLedger] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[ReadOnlyLedger] =
for {
ledgerDao <- ledgerDaoOwner().acquire()
ledgerId <- Resource.fromFuture(verifyOrSetLedgerId(ledgerDao, initialLedgerId))
ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
dispatcher <- dispatcherOwner(ledgerEnd).acquire()
ledger <- ResourceOwner
.forCloseable(() => new ReadOnlySqlLedger(ledgerId, ledgerDao, dispatcher))
.acquire()
} yield ledger
private def verifyOrSetLedgerId(
ledgerDao: LedgerReadDao,
initialLedgerId: LedgerId,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] =
ledgerDao
.lookupLedgerId()
.flatMap {
case Some(`initialLedgerId`) =>
logger.info(s"Found existing ledger with ID: $initialLedgerId")
Future.successful(initialLedgerId)
case Some(foundLedgerId) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException)
case None =>
Future.successful(initialLedgerId)
}
private def ledgerDaoOwner(): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
jdbcUrl,
eventsPageSize,
metrics,
lfValueTranslationCache,
)
factory = new Factory(ledgerReadDao)
ledger <- ResourceOwner.forFutureCloseable(() => factory.createReadOnlySqlLedger(ledgerId))
} yield ledger
private class Factory(ledgerDao: LedgerReadDao)(implicit logCtx: LoggingContext) {
private val logger = ContextualizedLogger.get(this.getClass)
/**
* Creates a DB backed Ledger implementation.
*
* @return a compliant read-only Ledger implementation
*/
def createReadOnlySqlLedger(initialLedgerId: LedgerId)(
implicit mat: Materializer
): Future[ReadOnlySqlLedger] = {
implicit val ec: ExecutionContext = DEC
for {
ledgerId <- initialize(initialLedgerId)
ledgerEnd <- ledgerDao.lookupLedgerEnd()
} yield new ReadOnlySqlLedger(ledgerId, ledgerEnd, ledgerDao)
}
private def initialize(initialLedgerId: LedgerId): Future[LedgerId] =
ledgerDao
.lookupLedgerId()
.flatMap {
case Some(foundLedgerId @ `initialLedgerId`) =>
logger.info(s"Found existing ledger with ID: $foundLedgerId")
Future.successful(foundLedgerId)
case Some(foundLedgerId) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException)
case None =>
Future.successful(initialLedgerId)
}(DEC)
private def dispatcherOwner(ledgerEnd: Offset): ResourceOwner[Dispatcher[Offset]] =
Dispatcher.owner(
name = "sql-ledger",
zeroIndex = Offset.beforeBegin,
headAtInitialization = ledgerEnd,
)
}
}
private class ReadOnlySqlLedger(
private final class ReadOnlySqlLedger(
ledgerId: LedgerId,
headAtInitialization: Offset,
ledgerDao: LedgerReadDao,
dispatcher: Dispatcher[Offset],
)(implicit mat: Materializer)
extends BaseLedger(ledgerId, headAtInitialization, ledgerDao) {
extends BaseLedger(ledgerId, ledgerDao, dispatcher) {
private val (ledgerEndUpdateKillSwitch, ledgerEndUpdateDone) =
RestartSource
@@ -124,11 +128,12 @@ private class ReadOnlySqlLedger(
override def close(): Unit = {
// Terminate the dispatcher first so that it doesn't trigger new queries.
super.close()
dispatcher.close()
deduplicationCleanupKillSwitch.shutdown()
ledgerEndUpdateKillSwitch.shutdown()
Await.result(deduplicationCleanupDone, 10.seconds)
Await.result(ledgerEndUpdateDone, 10.seconds)
()
super.close()
}
}

@@ -53,7 +53,7 @@ object SandboxIndexAndWriteService {
private val logger = LoggerFactory.getLogger(SandboxIndexAndWriteService.getClass)
def postgres(
ledgerId: LedgerIdMode,
initialLedgerId: LedgerIdMode,
participantId: ParticipantId,
jdbcUrl: String,
initialConfig: ParticipantState.Configuration,
@@ -68,28 +68,26 @@
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexAndWriteService] =
SqlLedger
.owner(
serverRole = ServerRole.Sandbox,
jdbcUrl = jdbcUrl,
ledgerId = ledgerId,
participantId = participantId,
timeProvider = timeProvider,
acs = acs,
packages = templateStore,
initialLedgerEntries = ledgerEntries,
queueDepth = queueDepth,
transactionCommitter = transactionCommitter,
startMode = startMode,
eventsPageSize = eventsPageSize,
metrics = metrics,
lfValueTranslationCache
)
.flatMap(ledger =>
owner(MeteredLedger(ledger, metrics), participantId, initialConfig, timeProvider))
new SqlLedger.Owner(
serverRole = ServerRole.Sandbox,
jdbcUrl = jdbcUrl,
initialLedgerId = initialLedgerId,
participantId = participantId,
timeProvider = timeProvider,
acs = acs,
packages = templateStore,
initialLedgerEntries = ledgerEntries,
queueDepth = queueDepth,
transactionCommitter = transactionCommitter,
startMode = startMode,
eventsPageSize = eventsPageSize,
metrics = metrics,
lfValueTranslationCache
).flatMap(ledger =>
owner(MeteredLedger(ledger, metrics), participantId, initialConfig, timeProvider))
def inMemory(
ledgerId: LedgerIdMode,
initialLedgerId: LedgerIdMode,
participantId: ParticipantId,
intialConfig: ParticipantState.Configuration,
timeProvider: TimeProvider,
@@ -101,7 +99,7 @@
)(implicit mat: Materializer): ResourceOwner[IndexAndWriteService] = {
val ledger =
new InMemoryLedger(
ledgerId.or(LedgerIdGenerator.generateRandomId()),
initialLedgerId.or(LedgerIdGenerator.generateRandomId()),
participantId,
timeProvider,
acs,

@@ -23,19 +23,21 @@ import com.daml.lf.transaction.TransactionCommitter
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.{LedgerIdMismatchException, LedgerIdMode}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.sandbox.LedgerIdGenerator
import com.daml.platform.sandbox.stores.InMemoryActiveLedgerState
import com.daml.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.daml.platform.sandbox.stores.ledger.sql.SqlLedger._
import com.daml.platform.sandbox.stores.ledger.{Ledger, SandboxOffset}
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.{BaseLedger, FlywayMigrations}
import com.daml.resources.ProgramResource.StartupException
import com.daml.resources.ResourceOwner
import com.daml.resources.{Resource, ResourceOwner}
import scala.collection.immutable.Queue
import scala.concurrent.{ExecutionContext, Future}
@@ -44,13 +46,13 @@ import scala.util.{Failure, Success}
object SqlLedger {
private type PersistentQueue = SourceQueueWithComplete[Offset => Future[Unit]]
private type PersistenceQueue = SourceQueueWithComplete[Offset => Future[Unit]]
def owner(
final class Owner(
serverRole: ServerRole,
// jdbcUrl must have the user/password encoded in form of: "jdbc:postgresql://localhost/test?user=fred&password=secret"
jdbcUrl: String,
ledgerId: LedgerIdMode,
initialLedgerId: LedgerIdMode,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
@@ -61,80 +63,223 @@
startMode: SqlStartMode = SqlStartMode.ContinueIfExists,
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache
)(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[Ledger] =
for {
_ <- ResourceOwner.forFuture(() => new FlywayMigrations(jdbcUrl).migrate()(DEC))
ledgerDao <- JdbcLedgerDao.validatingWriteOwner(
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, logCtx: LoggingContext)
extends ResourceOwner[Ledger] {
private val logger = ContextualizedLogger.get(this.getClass)
override def acquire()(implicit executionContext: ExecutionContext): Resource[Ledger] =
for {
_ <- Resource.fromFuture(new FlywayMigrations(jdbcUrl).migrate())
ledgerDao <- ledgerDaoOwner().acquire()
_ <- startMode match {
case SqlStartMode.AlwaysReset =>
Resource.fromFuture(ledgerDao.reset())
case SqlStartMode.ContinueIfExists =>
Resource.unit
}
ledgerId <- Resource.fromFuture(initialize(ledgerDao))
ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
ledgerConfig <- Resource.fromFuture(ledgerDao.lookupLedgerConfiguration())
dispatcher <- dispatcherOwner(ledgerEnd).acquire()
persistenceQueue <- new PersistenceQueueOwner(dispatcher).acquire()
// Close the dispatcher before the persistence queue.
_ <- Resource(Future.unit)(_ => Future.successful(dispatcher.close()))
ledger <- sqlLedgerOwner(
ledgerId,
ledgerConfig.map(_._2),
ledgerDao,
dispatcher,
persistenceQueue).acquire()
} yield ledger
private def initialize(
ledgerDao: LedgerDao,
)(implicit executionContext: ExecutionContext): Future[LedgerId] = {
// Note that here we only store the ledger entry and we do not update anything else, such as the
// headRef. This is OK since this initialization
// step happens before we start up the sql ledger at all, so it's running in isolation.
for {
currentLedgerId <- ledgerDao.lookupLedgerId()
initializationRequired = currentLedgerId.isEmpty
ledgerId <- (currentLedgerId, initialLedgerId) match {
case (Some(foundLedgerId), LedgerIdMode.Static(initialId))
if foundLedgerId == initialId =>
ledgerFound(foundLedgerId, initialLedgerEntries, packages)
case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialId) with StartupException)
case (Some(foundLedgerId), LedgerIdMode.Dynamic) =>
ledgerFound(foundLedgerId, initialLedgerEntries, packages)
case (None, LedgerIdMode.Static(initialId)) =>
Future.successful(initialId)
case (None, LedgerIdMode.Dynamic) =>
val randomLedgerId = LedgerIdGenerator.generateRandomId()
Future.successful(randomLedgerId)
}
_ <- if (initializationRequired) {
logger.info(s"Initializing ledger with ID: $ledgerId")
for {
_ <- ledgerDao.initializeLedger(ledgerId)
_ <- initializeLedgerEntries(
initialLedgerEntries,
timeProvider,
packages,
acs,
ledgerDao,
participantId,
)
} yield ()
} else {
Future.unit
}
} yield ledgerId
}
private def ledgerFound(
foundLedgerId: LedgerId,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
packages: InMemoryPackageStore,
): Future[LedgerId] = {
logger.info(s"Found existing ledger with ID: $foundLedgerId")
if (initialLedgerEntries.nonEmpty) {
logger.warn(
s"Initial ledger entries provided, presumably from scenario, but there is an existing database, and thus they will not be used.")
}
if (packages.listLfPackagesSync().nonEmpty) {
logger.warn(
s"Initial packages provided, presumably as command line arguments, but there is an existing database, and thus they will not be used.")
}
Future.successful(foundLedgerId)
}
private def initializeLedgerEntries(
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
acs: InMemoryActiveLedgerState,
ledgerDao: LedgerDao,
participantId: ParticipantId,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
if (initialLedgerEntries.nonEmpty) {
logger.info(s"Initializing ledger with ${initialLedgerEntries.length} ledger entries.")
}
val (ledgerEnd, ledgerEntries) =
initialLedgerEntries.foldLeft((1L, Vector.empty[(Offset, LedgerEntry)])) {
case ((offset, entries), entryOrBump) =>
entryOrBump match {
case LedgerEntryOrBump.Entry(entry) =>
(offset + 1, entries :+ SandboxOffset.toOffset(offset + 1) -> entry)
case LedgerEntryOrBump.Bump(increment) =>
(offset + increment, entries)
}
}
for {
_ <- copyPackages(
packages,
ledgerDao,
timeProvider.getCurrentTime,
SandboxOffset.toOffset(ledgerEnd),
)
_ <- ledgerDao.storeInitialState(ledgerEntries, SandboxOffset.toOffset(ledgerEnd))
} yield ()
}
private def copyPackages(
store: InMemoryPackageStore,
ledgerDao: LedgerDao,
knownSince: Instant,
newLedgerEnd: Offset,
): Future[Unit] = {
val packageDetails = store.listLfPackagesSync()
if (packageDetails.nonEmpty) {
logger.info(s"Copying initial packages ${packageDetails.keys.mkString(",")}")
val packages = packageDetails.toList.map(pkg => {
val archive =
store.getLfArchiveSync(pkg._1).getOrElse(sys.error(s"Package ${pkg._1} not found"))
archive -> PackageDetails(archive.getPayload.size.toLong, knownSince, None)
})
ledgerDao
.storePackageEntry(newLedgerEnd, packages, None)
.transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))(
DEC)
} else {
Future.successful(())
}
}
private def ledgerDaoOwner(): ResourceOwner[LedgerDao] =
JdbcLedgerDao.validatingWriteOwner(
serverRole,
jdbcUrl,
eventsPageSize,
metrics,
lfValueTranslationCache)
ledger <- ResourceOwner.forFutureCloseable(
lfValueTranslationCache,
)
private def dispatcherOwner(ledgerEnd: Offset): ResourceOwner[Dispatcher[Offset]] =
Dispatcher.owner(
name = "sql-ledger",
zeroIndex = Offset.beforeBegin,
headAtInitialization = ledgerEnd,
)
private def sqlLedgerOwner(
ledgerId: LedgerId,
ledgerConfig: Option[Configuration],
ledgerDao: LedgerDao,
dispatcher: Dispatcher[Offset],
persistenceQueue: PersistenceQueue,
): ResourceOwner[SqlLedger] =
ResourceOwner.forCloseable(
() =>
new SqlLedgerFactory(ledgerDao).createSqlLedger(
new SqlLedger(
ledgerId,
participantId,
ledgerConfig,
ledgerDao,
dispatcher,
timeProvider,
startMode,
acs,
packages,
initialLedgerEntries,
queueDepth,
persistenceQueue,
transactionCommitter,
))
} yield ledger
}
private final class SqlLedger(
ledgerId: LedgerId,
participantId: ParticipantId,
headAtInitialization: Offset,
configAtInitialization: Option[Configuration],
ledgerDao: LedgerDao,
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
queueDepth: Int,
transactionCommitter: TransactionCommitter,
)(implicit mat: Materializer, logCtx: LoggingContext)
extends BaseLedger(ledgerId, headAtInitialization, ledgerDao)
with Ledger {
private final class PersistenceQueueOwner(dispatcher: Dispatcher[Offset])
extends ResourceOwner[PersistenceQueue] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[PersistenceQueue] =
Resource(Future.successful {
val queue =
Source.queue[Offset => Future[Unit]](queueDepth, OverflowStrategy.dropNew)
import SqlLedger._
// By default we process the requests in batches when under pressure (see semantics of `batch`). Note
// that this is safe on the read end because the readers rely on the dispatchers to know the
// ledger end, and not the database itself. This means that they will not start reading from the new
// ledger end until we tell them so, which we do when _all_ the entries have been committed.
val persistenceQueue = queue
.batch(1, Queue(_))(_.enqueue(_))
.mapAsync(1)(persistAll)
.toMat(Sink.ignore)(Keep.left[PersistenceQueue, Future[Done]])
.run()
watchForFailures(persistenceQueue)
persistenceQueue
})(queue => Future.successful(queue.complete()))
private val logger = ContextualizedLogger.get(this.getClass)
// the reason for modelling persistence as a reactive pipeline is to avoid having race-conditions between the
// moving ledger-end, the async persistence operation and the dispatcher head notification
private val persistenceQueue: PersistentQueue = createQueue()
watchForFailures(persistenceQueue, "persistence")
private def watchForFailures(queue: SourceQueueWithComplete[_], name: String): Unit =
queue
.watchCompletion()
.failed
.foreach { throwable =>
logger.error(s"$name queue has been closed with a failure!", throwable)
}(DEC)
private def createQueue(): PersistentQueue = {
implicit val ec: ExecutionContext = DEC
val persistenceQueue =
Source.queue[Offset => Future[Unit]](queueDepth, OverflowStrategy.dropNew)
// By default we process the requests in batches when under pressure (see semantics of `batch`). Note
// that this is safe on the read end because the readers rely on the dispatchers to know the
// ledger end, and not the database itself. This means that they will not start reading from the new
// ledger end until we tell them so, which we do when _all_ the entries have been committed.
persistenceQueue
.batch(1, Queue(_))(_.enqueue(_))
.mapAsync(1) { queue =>
private def persistAll(queue: Queue[Offset => Future[Unit]]): Future[Unit] = {
implicit val ec: ExecutionContext = DEC
val startOffset = SandboxOffset.fromOffset(dispatcher.getHead())
// we can only do this because there is no parallelism here!
//shooting the SQL queries in parallel
// This will attempt to run the SQL queries concurrently, but there is no parallelism here,
// so they will still run sequentially.
Future
.sequence(queue.toIterator.zipWithIndex.map {
case (persist, i) =>
@@ -142,20 +287,45 @@ private final class SqlLedger(
persist(SandboxOffset.toOffset(offset))
})
.map { _ =>
//note that we can have holes in offsets in case of the storing of an entry failed for some reason
dispatcher.signalNewHead(SandboxOffset.toOffset(startOffset + queue.length)) //signalling downstream subscriptions
// note that we can have holes in offsets in case of the storing of an entry failed for some reason
dispatcher.signalNewHead(
SandboxOffset
.toOffset(startOffset + queue.length)) // signalling downstream subscriptions
}
}
.toMat(Sink.ignore)(Keep.left[PersistentQueue, Future[Done]])
.run()
}
private def watchForFailures(queue: SourceQueueWithComplete[_]): Unit =
queue
.watchCompletion()
.failed
.foreach { throwable =>
logger.error("Persistence queue has been closed with a failure.", throwable)
}(DEC)
}
}
private final class SqlLedger(
ledgerId: LedgerId,
participantId: ParticipantId,
configAtInitialization: Option[Configuration],
ledgerDao: LedgerDao,
dispatcher: Dispatcher[Offset],
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
persistenceQueue: PersistenceQueue,
transactionCommitter: TransactionCommitter,
)(implicit mat: Materializer, logCtx: LoggingContext)
extends BaseLedger(ledgerId, ledgerDao, dispatcher)
with Ledger {
private val logger = ContextualizedLogger.get(this.getClass)
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def close(): Unit = {
override def close(): Unit =
super.close()
persistenceQueue.complete()
}
// Note: ledger entries are written in batches, and this variable is updated while writing a ledger configuration
// changed entry. Transactions written around the same time as a configuration change entry might not use the correct
@@ -335,200 +505,3 @@
}(DEC)
}
}
private final class SqlLedgerFactory(ledgerDao: LedgerDao)(implicit logCtx: LoggingContext) {
private val logger = ContextualizedLogger.get(this.getClass)
/** *
* Creates a DB backed Ledger implementation.
*
* @param initialLedgerId a random ledger id is generated if none given, if set it's used to initialize the ledger.
* In case the ledger had already been initialized, the given ledger id must not be set or must
* be equal to the one in the database.
* @param participantId the participant identifier
* @param timeProvider to get the current time when sequencing transactions
* @param startMode whether we should start with a clean state or continue where we left off
* @param initialLedgerEntries The initial ledger entries -- usually provided by the scenario runner. Will only be
* used if starting from a fresh database.
* @param queueDepth the depth of the buffer for persisting entries. When gets full, the system will signal back-pressure
* upstream
* @return a compliant Ledger implementation
*/
def createSqlLedger(
initialLedgerId: LedgerIdMode,
participantId: ParticipantId,
timeProvider: TimeProvider,
startMode: SqlStartMode,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
queueDepth: Int,
transactionCommitter: TransactionCommitter,
)(implicit mat: Materializer): Future[SqlLedger] = {
implicit val ec: ExecutionContext = DEC
def init(): Future[LedgerId] = startMode match {
case SqlStartMode.AlwaysReset =>
for {
_ <- reset()
ledgerId <- initialize(
initialLedgerId,
participantId,
timeProvider,
acs,
packages,
initialLedgerEntries,
)
} yield ledgerId
case SqlStartMode.ContinueIfExists =>
initialize(
initialLedgerId,
participantId,
timeProvider,
acs,
packages,
initialLedgerEntries,
)
}
for {
ledgerId <- init()
ledgerEnd <- ledgerDao.lookupLedgerEnd()
ledgerConfig <- ledgerDao.lookupLedgerConfiguration()
} yield
new SqlLedger(
ledgerId,
participantId,
ledgerEnd,
ledgerConfig.map(_._2),
ledgerDao,
timeProvider,
packages,
queueDepth,
transactionCommitter,
)
}
private def reset(): Future[Unit] =
ledgerDao.reset()
private def initialize(
initialLedgerId: LedgerIdMode,
participantId: ParticipantId,
timeProvider: TimeProvider,
acs: InMemoryActiveLedgerState,
packages: InMemoryPackageStore,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
): Future[LedgerId] = {
// Note that here we only store the ledger entry and we do not update anything else, such as the
// headRef. This is OK since this initialization
// step happens before we start up the sql ledger at all, so it's running in isolation.
implicit val ec: ExecutionContext = DEC
for {
currentLedgerId <- ledgerDao.lookupLedgerId()
initializationRequired = currentLedgerId.isEmpty
ledgerId <- (currentLedgerId, initialLedgerId) match {
case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) if foundLedgerId == initialId =>
ledgerFound(foundLedgerId, initialLedgerEntries, packages)
case (Some(foundLedgerId), LedgerIdMode.Static(initialId)) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialId) with StartupException)
case (Some(foundLedgerId), LedgerIdMode.Dynamic) =>
ledgerFound(foundLedgerId, initialLedgerEntries, packages)
case (None, LedgerIdMode.Static(initialId)) =>
Future.successful(initialId)
case (None, LedgerIdMode.Dynamic) =>
val randomLedgerId = LedgerIdGenerator.generateRandomId()
Future.successful(randomLedgerId)
}
_ <- if (initializationRequired) {
logger.info(s"Initializing ledger with ID: $ledgerId")
for {
_ <- ledgerDao.initializeLedger(ledgerId)
_ <- initializeLedgerEntries(
initialLedgerEntries,
timeProvider,
packages,
acs,
participantId)
} yield ()
} else {
Future.unit
}
} yield ledgerId
}
private def ledgerFound(
foundLedgerId: LedgerId,
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
packages: InMemoryPackageStore,
): Future[LedgerId] = {
logger.info(s"Found existing ledger with ID: $foundLedgerId")
if (initialLedgerEntries.nonEmpty) {
logger.warn(
s"Initial ledger entries provided, presumably from scenario, but there is an existing database, and thus they will not be used.")
}
if (packages.listLfPackagesSync().nonEmpty) {
logger.warn(
s"Initial packages provided, presumably as command line arguments, but there is an existing database, and thus they will not be used.")
}
Future.successful(foundLedgerId)
}
private def initializeLedgerEntries(
initialLedgerEntries: ImmArray[LedgerEntryOrBump],
timeProvider: TimeProvider,
packages: InMemoryPackageStore,
acs: InMemoryActiveLedgerState,
participantId: ParticipantId,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
if (initialLedgerEntries.nonEmpty) {
logger.info(s"Initializing ledger with ${initialLedgerEntries.length} ledger entries.")
}
val (ledgerEnd, ledgerEntries) =
initialLedgerEntries.foldLeft((1L, Vector.empty[(Offset, LedgerEntry)])) {
case ((offset, entries), entryOrBump) =>
entryOrBump match {
case LedgerEntryOrBump.Entry(entry) =>
(offset + 1, entries :+ SandboxOffset.toOffset(offset + 1) -> entry)
case LedgerEntryOrBump.Bump(increment) =>
(offset + increment, entries)
}
}
for {
_ <- copyPackages(packages, timeProvider.getCurrentTime, SandboxOffset.toOffset(ledgerEnd))
_ <- ledgerDao.storeInitialState(ledgerEntries, SandboxOffset.toOffset(ledgerEnd))
} yield ()
}
private def copyPackages(
store: InMemoryPackageStore,
knownSince: Instant,
newLedgerEnd: Offset): Future[Unit] = {
val packageDetails = store.listLfPackagesSync()
if (packageDetails.nonEmpty) {
logger.info(s"Copying initial packages ${packageDetails.keys.mkString(",")}")
val packages = packageDetails.toList.map(pkg => {
val archive =
store.getLfArchiveSync(pkg._1).getOrElse(sys.error(s"Package ${pkg._1} not found"))
archive -> PackageDetails(archive.getPayload.size.toLong, knownSince, None)
})
ledgerDao
.storePackageEntry(newLedgerEnd, packages, None)
.transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))(DEC)
} else {
Future.successful(())
}
}
}

@@ -7,16 +7,6 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.CommandDeduplicationResult
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
import com.daml.lf.archive.Decode
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, PackageId, Party}
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{ContractId, ContractInst}
import com.daml.daml_lf_dev.DamlLf
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.TransactionId
@@ -31,6 +21,16 @@ import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse
}
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.CommandDeduplicationResult
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
import com.daml.lf.archive.Decode
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, PackageId, Party}
import com.daml.lf.language.Ast
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{ContractId, ContractInst}
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.store.dao.LedgerReadDao
@@ -42,18 +42,12 @@ import scala.util.Try
abstract class BaseLedger(
val ledgerId: LedgerId,
headAtInitialization: Offset,
ledgerDao: LedgerReadDao)
extends ReadOnlyLedger {
ledgerDao: LedgerReadDao,
dispatcher: Dispatcher[Offset],
) extends ReadOnlyLedger {
implicit private val DEC: ExecutionContext = DirectExecutionContext
protected final val dispatcher: Dispatcher[Offset] = Dispatcher[Offset](
"sql-ledger",
Offset.beforeBegin,
headAtInitialization
)
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def lookupKey(key: Node.GlobalKey, forParty: Party): Future[Option[ContractId]] =
@@ -174,7 +168,5 @@
override def stopDeduplicatingCommand(commandId: CommandId, submitter: Party): Future[Unit] =
ledgerDao.stopDeduplicatingCommand(commandId, submitter)
override def close(): Unit = {
dispatcher.close()
}
override def close(): Unit = ()
}

@@ -64,10 +64,10 @@ object LedgerResource {
new OwnedResource(
for {
database <- PostgresResource.owner()
ledger <- SqlLedger.owner(
ledger <- new SqlLedger.Owner(
serverRole = ServerRole.Testing(testClass),
jdbcUrl = database.url,
ledgerId = LedgerIdMode.Static(ledgerId),
initialLedgerId = LedgerIdMode.Static(ledgerId),
participantId = participantId,
timeProvider = timeProvider,
acs = InMemoryActiveLedgerState.empty,

@@ -175,26 +175,24 @@
): Future[Ledger] = {
metrics.getNames.forEach(name => { val _ = metrics.remove(name) })
val ledger = newLoggingContext { implicit logCtx =>
SqlLedger
.owner(
serverRole = ServerRole.Testing(getClass),
jdbcUrl = postgresDatabase.url,
ledgerId = ledgerId.fold[LedgerIdMode](LedgerIdMode.Dynamic)(LedgerIdMode.Static),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty
.withPackages(Instant.EPOCH, None, packages)
.fold(sys.error, identity),
initialLedgerEntries = ImmArray.empty,
queueDepth = queueDepth,
transactionCommitter = LegacyTransactionCommitter,
startMode = SqlStartMode.ContinueIfExists,
eventsPageSize = 100,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslation.Cache.none,
)
.acquire()(system.dispatcher)
new SqlLedger.Owner(
serverRole = ServerRole.Testing(getClass),
jdbcUrl = postgresDatabase.url,
initialLedgerId = ledgerId.fold[LedgerIdMode](LedgerIdMode.Dynamic)(LedgerIdMode.Static),
participantId = participantId,
timeProvider = TimeProvider.UTC,
acs = InMemoryActiveLedgerState.empty,
packages = InMemoryPackageStore.empty
.withPackages(Instant.EPOCH, None, packages)
.fold(sys.error, identity),
initialLedgerEntries = ImmArray.empty,
queueDepth = queueDepth,
transactionCommitter = LegacyTransactionCommitter,
startMode = SqlStartMode.ContinueIfExists,
eventsPageSize = 100,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslation.Cache.none,
).acquire()(system.dispatcher)
}
createdLedgers += ledger
ledger.asFuture