participant-integration-api: Reuse the services execution context for data munging. (#8415)

* participant-integration-api: Reuse the services EC for data munging.

Rather than spawning a separate thread pool for manipulating data before
and after index queries, we can simply reuse the services execution
context (EC), which is already a work-stealing thread pool.

* kvutils: Share the services EC with the indexer.

CHANGELOG_BEGIN
CHANGELOG_END
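
The wiring pattern, condensed into a minimal sketch rather than a verbatim excerpt: one work-stealing pool is acquired once as a ResourceOwner and handed to every component that previously spawned its own. Here readService, indexerConfig, metrics, and lfValueTranslationCache stand in for values built elsewhere, and an implicit Materializer and LoggingContext are assumed in scope.

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val owner = for {
  // One shared pool for all data munging, managed as a resource so it is
  // shut down together with the components that use it.
  servicesExecutionContext <- ResourceOwner
    .forExecutorService(() => Executors.newWorkStealingPool())
    .map(ExecutionContext.fromExecutorService)
  // The indexer (and, in the same way, the API server) receives the shared
  // context instead of creating its own thread pool.
  _ <- new StandaloneIndexerServer(
    readService = readService,
    config = indexerConfig,
    servicesExecutionContext = servicesExecutionContext,
    metrics = metrics,
    lfValueTranslationCache = lfValueTranslationCache,
  )
} yield ()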
Samir Talwar 2021-01-29 17:54:58 +01:00 committed by GitHub
parent ecf5f9b9c4
commit a325d00468
19 changed files with 162 additions and 101 deletions

View File

@@ -25,12 +25,11 @@ import com.daml.platform.configuration.{
PartyConfiguration,
ServerRole,
}
import com.daml.ports.{PortFiles}
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.ports.Port
import com.daml.ports.{Port, PortFiles}
import io.grpc.{BindableService, ServerInterceptor}
import scala.collection.immutable
@@ -69,13 +68,14 @@ final class StandaloneApiServer(
val owner = for {
indexService <- JdbcIndex
.owner(
ServerRole.ApiServer,
domain.LedgerId(ledgerId),
participantId,
config.jdbcUrl,
config.eventsPageSize,
metrics,
lfValueTranslationCache,
serverRole = ServerRole.ApiServer,
ledgerId = domain.LedgerId(ledgerId),
participantId = participantId,
jdbcUrl = config.jdbcUrl,
eventsPageSize = config.eventsPageSize,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
)
.map(index => new SpannedIndexService(new TimedIndexService(index, metrics)))
authorizer = new Authorizer(Clock.systemUTC.instant _, ledgerId, participantId)

View File

@@ -13,6 +13,8 @@ import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.events.LfValueTranslation
import scala.concurrent.ExecutionContext
private[platform] object JdbcIndex {
def owner(
serverRole: ServerRole,
@@ -20,16 +22,18 @@ private[platform] object JdbcIndex {
participantId: ParticipantId,
jdbcUrl: String,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, loggingContext: LoggingContext): ResourceOwner[IndexService] =
new ReadOnlySqlLedger.Owner(
serverRole,
jdbcUrl,
ledgerId,
eventsPageSize,
metrics,
lfValueTranslationCache,
serverRole = serverRole,
jdbcUrl = jdbcUrl,
initialLedgerId = ledgerId,
eventsPageSize = eventsPageSize,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).map { ledger =>
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId)
}

View File

@@ -38,13 +38,14 @@ private[platform] object ReadOnlySqlLedger {
jdbcUrl: String,
initialLedgerId: LedgerId,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[ReadOnlyLedger] {
override def acquire()(implicit context: ResourceContext): Resource[ReadOnlyLedger] =
for {
ledgerDao <- ledgerDaoOwner().acquire()
ledgerDao <- ledgerDaoOwner(servicesExecutionContext).acquire()
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
dispatcher <- dispatcherOwner(ledgerEnd).acquire()
@@ -94,11 +95,14 @@ private[platform] object ReadOnlySqlLedger {
}
private def ledgerDaoOwner(): ResourceOwner[LedgerReadDao] =
private def ledgerDaoOwner(
servicesExecutionContext: ExecutionContext
): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
jdbcUrl,
eventsPageSize,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
)

View File

@@ -23,13 +23,14 @@ import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao}
import com.daml.platform.store.{DbType, FlywayMigrations}
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
object JdbcIndexer {
private[daml] final class Factory private[indexer] (
config: IndexerConfig,
readService: ReadService,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
updateFlowOwnerBuilder: ExecuteUpdate.FlowOwnerBuilder,
ledgerDaoOwner: ResourceOwner[LedgerDao],
@@ -40,18 +41,21 @@ object JdbcIndexer {
serverRole: ServerRole,
config: IndexerConfig,
readService: ReadService,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit materializer: Materializer, loggingContext: LoggingContext) =
this(
config,
readService,
servicesExecutionContext,
metrics,
ExecuteUpdate.owner,
JdbcLedgerDao.writeOwner(
serverRole,
config.jdbcUrl,
config.eventsPageSize,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = true,

View File

@@ -11,9 +11,12 @@ import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.events.LfValueTranslation
import scala.concurrent.ExecutionContext
final class StandaloneIndexerServer(
readService: ReadService,
config: IndexerConfig,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit materializer: Materializer, loggingContext: LoggingContext)
@@ -26,6 +29,7 @@ final class StandaloneIndexerServer(
ServerRole.Indexer,
config,
readService,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
)

View File

@@ -33,11 +33,15 @@ object IndexMetadata {
} yield metadata(ledgerId, participantId, ledgerEnd)
}
private def ownDao(jdbcUrl: String)(implicit loggingContext: LoggingContext) =
private def ownDao(jdbcUrl: String)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) =
JdbcLedgerDao.readOwner(
serverRole = ServerRole.ReadIndexMetadata,
jdbcUrl = jdbcUrl,
eventsPageSize = 1000,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
)

View File

@@ -4,7 +4,6 @@ package com.daml.platform.store.dao
import java.sql.Connection
import java.time.Instant
import java.util.concurrent.Executors
import java.util.{Date, UUID}
import akka.NotUsed
@@ -80,7 +79,7 @@ private class JdbcLedgerDao(
override val maxConcurrentConnections: Int,
dbDispatcher: DbDispatcher,
dbType: DbType,
executionContext: ExecutionContext,
servicesExecutionContext: ExecutionContext,
eventsPageSize: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
@@ -456,7 +455,7 @@ private class JdbcLedgerDao(
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics)(
preparedInsert.writeState(metrics)(_)
)
.map(_ => Ok)(executionContext)
.map(_ => Ok)(servicesExecutionContext)
override def storeTransactionEvents(
preparedInsert: PreparedInsert
@@ -465,7 +464,7 @@ private class JdbcLedgerDao(
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics)(
preparedInsert.writeEvents(metrics)(_)
)
.map(_ => Ok)(executionContext)
.map(_ => Ok)(servicesExecutionContext)
override def completeTransaction(
submitterInfo: Option[SubmitterInfo],
@@ -635,7 +634,7 @@ private class JdbcLedgerDao(
.executeSql(metrics.daml.index.db.loadParties) { implicit conn =>
selectParties(parties)
}
.map(_.map(constructPartyDetails))(executionContext)
.map(_.map(constructPartyDetails))(servicesExecutionContext)
override def listKnownParties()(implicit
loggingContext: LoggingContext
@@ -645,7 +644,7 @@ private class JdbcLedgerDao(
SQL_SELECT_ALL_PARTIES
.as(PartyDataParser.*)
}
.map(_.map(constructPartyDetails))(executionContext)
.map(_.map(constructPartyDetails))(servicesExecutionContext)
private val SQL_INSERT_PARTY =
SQL("""insert into parties(party, display_name, ledger_offset, explicit, is_local)
@@ -686,7 +685,7 @@ private class JdbcLedgerDao(
d.sourceDescription,
)
).toMap
)(executionContext)
)(servicesExecutionContext)
override def getLfArchive(packageId: PackageId)(implicit
loggingContext: LoggingContext
@@ -700,7 +699,7 @@ private class JdbcLedgerDao(
.as[Option[Array[Byte]]](SqlParser.byteArray("package").singleOpt)
}
.map(_.map(data => Archive.parseFrom(Decode.damlLfCodedInputStreamFromBytes(data))))(
executionContext
servicesExecutionContext
)
private val SQL_INSERT_PACKAGE_ENTRY_ACCEPT =
@@ -940,14 +939,16 @@ private class JdbcLedgerDao(
override val transactionsReader: TransactionsReader =
new TransactionsReader(dbDispatcher, dbType, eventsPageSize, metrics, translation)(
executionContext
servicesExecutionContext
)
private val contractsReader: ContractsReader =
ContractsReader(dbDispatcher, dbType, metrics, lfValueTranslationCache)(executionContext)
ContractsReader(dbDispatcher, dbType, metrics, lfValueTranslationCache)(
servicesExecutionContext
)
override val completions: CommandCompletionsReader =
new CommandCompletionsReader(dbDispatcher, dbType, metrics, executionContext)
new CommandCompletionsReader(dbDispatcher, dbType, metrics, servicesExecutionContext)
private val postCommitValidation =
if (performPostCommitValidation)
@@ -971,6 +972,7 @@ private[platform] object JdbcLedgerDao {
serverRole: ServerRole,
jdbcUrl: String,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] = {
@@ -981,6 +983,7 @@ private[platform] object JdbcLedgerDao {
maxConnections,
eventsPageSize,
validate = false,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = false,
@@ -992,6 +995,7 @@ private[platform] object JdbcLedgerDao {
serverRole: ServerRole,
jdbcUrl: String,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
jdbcAsyncCommits: Boolean,
@@ -1005,6 +1009,7 @@ private[platform] object JdbcLedgerDao {
maxConnections,
eventsPageSize,
validate = false,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = jdbcAsyncCommits && dbType.supportsAsynchronousCommits,
@@ -1016,6 +1021,7 @@ private[platform] object JdbcLedgerDao {
serverRole: ServerRole,
jdbcUrl: String,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
@@ -1029,6 +1035,7 @@ private[platform] object JdbcLedgerDao {
maxConnections,
eventsPageSize,
validate = true,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
validatePartyAllocation,
@@ -1067,6 +1074,7 @@ private[platform] object JdbcLedgerDao {
maxConnections: Int,
eventsPageSize: Int,
validate: Boolean,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
@@ -1082,12 +1090,11 @@ private[platform] object JdbcLedgerDao {
metrics,
jdbcAsyncCommits,
)
executor <- ResourceOwner.forExecutorService(() => Executors.newWorkStealingPool())
} yield new JdbcLedgerDao(
maxConnections,
dbDispatcher,
DbType.jdbcType(jdbcUrl),
ExecutionContext.fromExecutor(executor),
servicesExecutionContext,
eventsPageSize,
validate,
metrics,

View File

@@ -44,6 +44,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
eventsPageSize = eventsPageSize,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,

View File

@@ -27,6 +27,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
eventsPageSize = eventsPageSize,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
)

View File

@@ -175,6 +175,7 @@ final class JdbcIndexerSpec
ServerRole.Indexer,
config.jdbcUrl,
config.eventsPageSize,
materializer.executionContext,
metrics,
LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
@@ -182,6 +183,7 @@ final class JdbcIndexerSpec
new indexer.JdbcIndexer.Factory(
config = config,
readService = mockedReadService(),
servicesExecutionContext = materializer.executionContext,
metrics = metrics,
updateFlowOwnerBuilder =
mockedUpdateFlowOwnerBuilder(metrics, config.participantId, mockFlow),

View File

@@ -128,6 +128,7 @@ final class Runner[T <: ReadWriteService, Extra](
new StandaloneIndexerServer(
readService = readService,
config = factory.indexerConfig(participantConfig, config),
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()

View File

@@ -145,16 +145,12 @@ class IntegrityChecker[LogResult](
println(s"Starting to index ${readService.updateCount()} updates.".white)
newLoggingContext { implicit loggingContext =>
val feedHandleResourceOwner = for {
indexerFactory <- ResourceOwner
.forFuture(() =>
migrateAndStartIndexer(
createIndexerConfig(config),
readService,
metrics,
LfValueTranslation.Cache.none,
)
)
indexer <- indexerFactory
indexer <- migrateAndStartIndexer(
createIndexerConfig(config),
readService,
metrics,
LfValueTranslation.Cache.none,
)
feedHandle <- indexer.subscription(readService)
} yield (feedHandle, System.nanoTime())
@@ -335,16 +331,24 @@ class IntegrityChecker[LogResult](
resourceContext: ResourceContext,
materializer: Materializer,
loggingContext: LoggingContext,
): Future[ResourceOwner[JdbcIndexer]] = {
val indexerFactory = new JdbcIndexer.Factory(
ServerRole.Indexer,
config,
readService,
metrics,
lfValueTranslationCache,
)
indexerFactory.migrateSchema(allowExistingSchema = false)
}
): ResourceOwner[JdbcIndexer] =
for {
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
indexerFactory = new JdbcIndexer.Factory(
ServerRole.Indexer,
config,
readService,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
)
migrating <- ResourceOwner.forFuture(() =>
indexerFactory.migrateSchema(allowExistingSchema = false)
)
migrated <- migrating
} yield migrated
}
object IntegrityChecker {

View File

@@ -6,6 +6,7 @@ package com.daml.platform.indexer
import java.time.Instant
import java.time.temporal.ChronoUnit.SECONDS
import java.util.UUID
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.Materializer
@@ -38,8 +39,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
class RecoveringIndexerIntegrationSpec
@@ -198,15 +199,19 @@ class RecoveringIndexerIntegrationSpec
for {
actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem())
materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem))
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
participantState <- newParticipantState(ledgerId, participantId)(materializer, loggingContext)
_ <- new StandaloneIndexerServer(
readService = participantState,
config = IndexerConfig(
participantId,
jdbcUrl,
participantId = participantId,
jdbcUrl = jdbcUrl,
startupMode = IndexerStartupMode.MigrateAndStart,
restartDelay = restartDelay,
),
servicesExecutionContext = servicesExecutionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
)(materializer, loggingContext)
@@ -220,6 +225,7 @@ class RecoveringIndexerIntegrationSpec
serverRole = ServerRole.Testing(getClass),
jdbcUrl = jdbcUrl,
eventsPageSize = 100,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,

View File

@@ -288,41 +288,48 @@ final class SandboxServer(
metrics = metrics,
)
val (ledgerType, indexAndWriteServiceResourceOwner) = config.jdbcUrl match {
case Some(jdbcUrl) =>
"postgres" -> SandboxIndexAndWriteService.postgres(
name = name,
providedLedgerId = config.ledgerIdMode,
participantId = config.participantId,
jdbcUrl = jdbcUrl,
timeProvider = timeProvider,
ledgerEntries = ledgerEntries,
startMode = startMode,
queueDepth = config.commandConfig.maxParallelSubmissions,
transactionCommitter = transactionCommitter,
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
validatePartyAllocation = !config.implicitPartyAllocation,
)
case None =>
"in-memory" -> SandboxIndexAndWriteService.inMemory(
name,
config.ledgerIdMode,
config.participantId,
timeProvider,
acs,
ledgerEntries,
transactionCommitter,
packageStore,
metrics,
)
val ledgerType = config.jdbcUrl match {
case Some(_) => "postgres"
case None => "in-memory"
}
for {
indexAndWriteService <- indexAndWriteServiceResourceOwner.acquire()
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
.acquire()
indexAndWriteService <- (config.jdbcUrl match {
case Some(jdbcUrl) =>
SandboxIndexAndWriteService.postgres(
name = name,
providedLedgerId = config.ledgerIdMode,
participantId = config.participantId,
jdbcUrl = jdbcUrl,
timeProvider = timeProvider,
ledgerEntries = ledgerEntries,
startMode = startMode,
queueDepth = config.commandConfig.maxParallelSubmissions,
transactionCommitter = transactionCommitter,
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
validatePartyAllocation = !config.implicitPartyAllocation,
)
case None =>
SandboxIndexAndWriteService.inMemory(
name,
config.ledgerIdMode,
config.participantId,
timeProvider,
acs,
ledgerEntries,
transactionCommitter,
packageStore,
metrics,
)
}).acquire()
ledgerId <- Resource.fromFuture(indexAndWriteService.indexService.getLedgerId())
authorizer = new Authorizer(
() => java.time.Clock.systemUTC.instant(),
@@ -341,10 +348,6 @@ final class SandboxServer(
)
ledgerConfiguration = config.ledgerConfig
executionSequencerFactory <- new ExecutionSequencerFactoryOwner().acquire()
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
.acquire()
apiServicesOwner = new ApiServices.Owner(
participantId = config.participantId,
optWriteService = Some(new TimedWriteService(indexAndWriteService.writeService, metrics)),

View File

@@ -29,8 +29,8 @@ import com.daml.platform.sandbox.stores.ledger.{Ledger, MeteredLedger}
import com.daml.platform.store.dao.events.LfValueTranslation
import org.slf4j.LoggerFactory
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
private[sandbox] trait IndexAndWriteService {
def indexService: IndexService
@@ -54,6 +54,7 @@ private[sandbox] object SandboxIndexAndWriteService {
transactionCommitter: TransactionCommitter,
templateStore: InMemoryPackageStore,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
@@ -74,9 +75,10 @@ private[sandbox] object SandboxIndexAndWriteService {
transactionCommitter = transactionCommitter,
startMode = startMode,
eventsPageSize = eventsPageSize,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache,
validatePartyAllocation,
lfValueTranslationCache = lfValueTranslationCache,
validatePartyAllocation = validatePartyAllocation,
).flatMap(ledger => owner(MeteredLedger(ledger, metrics), participantId, timeProvider))
def inMemory(

View File

@@ -28,6 +28,7 @@ import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.{LedgerIdMode, MismatchException}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.CurrentOffset
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.sandbox.LedgerIdGenerator
import com.daml.platform.sandbox.config.LedgerName
@@ -40,7 +41,6 @@ import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLe
import com.daml.platform.store.{BaseLedger, FlywayMigrations}
import com.daml.resources.ProgramResource.StartupException
import scalaz.Tag
import com.daml.platform.indexer.CurrentOffset
import scala.collection.immutable.Queue
import scala.concurrent.{ExecutionContext, Future}
@@ -75,6 +75,7 @@ private[sandbox] object SqlLedger {
transactionCommitter: TransactionCommitter,
startMode: SqlStartMode = SqlStartMode.ContinueIfExists,
eventsPageSize: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
@@ -86,7 +87,7 @@ private[sandbox] object SqlLedger {
override def acquire()(implicit context: ResourceContext): Resource[Ledger] =
for {
_ <- Resource.fromFuture(new FlywayMigrations(jdbcUrl).migrate())
dao <- ledgerDaoOwner().acquire()
dao <- ledgerDaoOwner(servicesExecutionContext).acquire()
_ <- startMode match {
case SqlStartMode.AlwaysReset =>
Resource.fromFuture(dao.reset())
@@ -221,11 +222,14 @@ private[sandbox] object SqlLedger {
}
}
private def ledgerDaoOwner(): ResourceOwner[LedgerDao] =
private def ledgerDaoOwner(
servicesExecutionContext: ExecutionContext
): ResourceOwner[LedgerDao] =
JdbcLedgerDao.validatingWriteOwner(
serverRole,
jdbcUrl,
eventsPageSize,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
validatePartyAllocation,

View File

@@ -3,6 +3,8 @@
package com.daml.platform.sandbox
import java.util.concurrent.Executors
import akka.stream.Materializer
import com.codahale.metrics.MetricRegistry
import com.daml.api.util.TimeProvider
@@ -26,6 +28,8 @@ import com.daml.platform.sandbox.stores.ledger.sql.{SqlLedger, SqlStartMode}
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.testing.postgresql.PostgresResource
import scala.concurrent.ExecutionContext
private[sandbox] object LedgerResource {
def inMemory(
@@ -64,6 +68,9 @@ private[sandbox] object LedgerResource {
): Resource[Ledger] =
new OwnedResource(
for {
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
database <- PostgresResource.owner[ResourceContext]()
ledger <- new SqlLedger.Owner(
name = LedgerName(testClass.getSimpleName),
@@ -78,6 +85,7 @@ private[sandbox] object LedgerResource {
transactionCommitter = StandardTransactionCommitter,
startMode = SqlStartMode.AlwaysReset,
eventsPageSize = 100,
servicesExecutionContext = servicesExecutionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslation.Cache.none,
)

View File

@@ -31,8 +31,8 @@ import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.testing.LogCollector
import com.daml.testing.postgresql.PostgresAroundEach
import org.scalatest.concurrent.{AsyncTimeLimitedTests, Eventually, ScaledTimeSpans}
import org.scalatest.time.{Minute, Seconds, Span}
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Minute, Seconds, Span}
import org.scalatest.wordspec.AsyncWordSpec
import scala.collection.mutable
@@ -300,6 +300,7 @@ final class SqlLedgerSpec
transactionCommitter = LegacyTransactionCommitter,
startMode = SqlStartMode.ContinueIfExists,
eventsPageSize = 100,
servicesExecutionContext = executionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslation.Cache.none,
validatePartyAllocation = validatePartyAllocation,

View File

@@ -180,6 +180,9 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
)
.map(_ => ())
}
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
_ <- new StandaloneIndexerServer(
readService = readService,
config = IndexerConfig(
@@ -191,6 +194,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
eventsPageSize = config.eventsPageSize,
allowExistingSchema = true,
),
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
)
@@ -212,9 +216,6 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
authorizer,
)
}
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
apiServer <- new StandaloneApiServer(
ledgerId = ledgerId,
config = ApiServerConfig(