sandbox: Move server parameters into the Config objects. (#5413)

* sandbox: Move the events page size configuration value into config.

* sandbox: Pass `config` directly into JdbcIndexerFactory.

* sandbox: Reorder `eventsPageSize` before `metrics` in parameters.

* sandbox: Move `seeding` into `ApiServerConfig`.

CHANGELOG_BEGIN
CHANGELOG_END

* sandbox: Name all parameters of `JdbcLedgerDao.writeOwner`.

Co-authored-by: <stefano.baghino@digitalasset.com>
Samir Talwar 2020-04-03 13:20:05 +02:00 committed by GitHub
parent 6d9db3fea3
commit 4de360a47a
20 changed files with 69 additions and 47 deletions
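
Across the diff below, every DAO and ledger constructor settles on taking `eventsPageSize` immediately before `metrics`, and the tests spell the arguments out by name. A minimal sketch of that call shape, mirroring the test changes (the JDBC URL is an arbitrary example, and the imports for `JdbcLedgerDao` and `ServerRole` are assumed):

    import com.codahale.metrics.MetricRegistry

    // Named arguments make the new (eventsPageSize, metrics) ordering explicit.
    val daoOwner =
      JdbcLedgerDao.writeOwner(
        serverRole = ServerRole.Testing(getClass),
        jdbcUrl = "jdbc:h2:mem:example;db_close_delay=-1", // example URL only
        eventsPageSize = 100,
        metrics = new MetricRegistry,
      )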


@@ -10,8 +10,8 @@ import java.time.Duration
 import com.daml.ledger.participant.state.v1.ParticipantId
 import com.daml.ledger.participant.state.v1.SeedService.Seeding
 import com.digitalasset.ledger.api.tls.TlsConfiguration
-import com.digitalasset.platform.configuration.MetricsReporter
 import com.digitalasset.platform.configuration.Readers._
+import com.digitalasset.platform.configuration.{IndexConfiguration, MetricsReporter}
 import com.digitalasset.ports.Port
 import com.digitalasset.resources.ProgramResource.SuppressedStartupException
 import com.digitalasset.resources.ResourceOwner
@@ -51,8 +51,6 @@ object Config {
   val DefaultMaxInboundMessageSize: Int = 4 * 1024 * 1024

-  val DefaultEventsPageSize = 1000
-
   def default[Extra](extra: Extra): Config[Extra] =
     Config(
       ledgerId = None,
@@ -62,7 +60,7 @@ object Config {
       seeding = Seeding.Strong,
       metricsReporter = None,
       metricsReportingInterval = Duration.ofSeconds(10),
-      eventsPageSize = DefaultEventsPageSize,
+      eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
       extra = extra,
     )
@@ -140,7 +138,7 @@ object Config {
       opt[Int]("events-page-size")
         .optional()
         .text(
-          s"Number of events fetched from the index for every round trip when serving streaming calls. Default is ${Config.DefaultEventsPageSize}.")
+          s"Number of events fetched from the index for every round trip when serving streaming calls. Default is ${IndexConfiguration.DefaultEventsPageSize}.")
         .action((eventsPageSize, config) => config.copy(eventsPageSize = eventsPageSize))

   private val seedingMap =


@@ -34,6 +34,7 @@ trait ConfigProvider[ExtraConfig] {
       participantConfig.participantId,
       jdbcUrl = participantConfig.serverJdbcUrl,
       startupMode = IndexerStartupMode.MigrateAndStart,
+      eventsPageSize = config.eventsPageSize,
       allowExistingSchema = participantConfig.allowExistingSchemaForIndex,
     )
@@ -48,7 +49,9 @@ trait ConfigProvider[ExtraConfig] {
       jdbcUrl = participantConfig.serverJdbcUrl,
       tlsConfig = config.tlsConfig,
       maxInboundMessageSize = Config.DefaultMaxInboundMessageSize,
+      eventsPageSize = config.eventsPageSize,
       portFile = participantConfig.portFile,
+      seeding = config.seeding,
     )

   def commandConfig(config: Config[ExtraConfig]): CommandConfiguration =


@@ -73,7 +73,6 @@ final class Runner[T <: ReadWriteService, Extra](
           readService = readService,
           config = factory.indexerConfig(participantConfig, config),
           metrics = metricRegistry,
-          eventsPageSize = config.eventsPageSize,
         ).acquire()
         _ <- new StandaloneApiServer(
           config = factory.apiServerConfig(participantConfig, config),
@@ -82,13 +81,11 @@ final class Runner[T <: ReadWriteService, Extra](
           submissionConfig = factory.submissionConfig(config),
           readService = readService,
           writeService = writeService,
-          eventsPageSize = config.eventsPageSize,
-          authService = factory.authService(config),
           transformIndexService =
             service => new TimedIndexService(service, metricRegistry, IndexServicePrefix),
+          authService = factory.authService(config),
           metrics = metricRegistry,
           timeServiceBackend = factory.timeServiceBackend(config),
-          seeding = Some(config.seeding),
         ).acquire()
       } yield ()
     })


@@ -192,7 +192,6 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
             restartDelay = restartDelay,
           ),
           metrics = new MetricRegistry,
-          eventsPageSize = 100,
         )(materializer, logCtx)
     } yield participantState
   }
@@ -200,7 +199,12 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
   private def index(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] = {
     val jdbcUrl =
       s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false"
-    JdbcLedgerDao.writeOwner(ServerRole.Testing(getClass), jdbcUrl, new MetricRegistry, 100)
+    JdbcLedgerDao.writeOwner(
+      serverRole = ServerRole.Testing(getClass),
+      jdbcUrl = jdbcUrl,
+      eventsPageSize = 100,
+      metrics = new MetricRegistry,
+    )
   }
 }


@@ -7,7 +7,9 @@ import java.io.File
 import java.nio.file.Path

 import com.daml.ledger.participant.state.v1.ParticipantId
+import com.daml.ledger.participant.state.v1.SeedService.Seeding
 import com.digitalasset.ledger.api.tls.TlsConfiguration
+import com.digitalasset.platform.configuration.IndexConfiguration
 import com.digitalasset.ports.Port

 case class ApiServerConfig(
@@ -18,5 +20,7 @@ case class ApiServerConfig(
     jdbcUrl: String,
     tlsConfig: Option[TlsConfiguration],
     maxInboundMessageSize: Int,
+    eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
     portFile: Option[Path],
+    seeding: Seeding,
 )
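
`seeding` is now a required field here (it was previously an `Option[Seeding]` parameter on `StandaloneApiServer`), while `eventsPageSize` carries a default. A sketch of adjusting an existing config, assuming an `ApiServerConfig` instance is in hand; the helper name is hypothetical:

    // Hypothetical helper; copy() leaves all other fields untouched.
    def withStrongSeeding(config: ApiServerConfig): ApiServerConfig =
      config.copy(
        eventsPageSize = IndexConfiguration.DefaultEventsPageSize, // optional: already the default
        seeding = Seeding.Strong, // required: replaces the old `seeding = Some(...)` parameter
      )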


@@ -12,7 +12,6 @@ import akka.stream.Materializer
 import akka.stream.scaladsl.Sink
 import com.codahale.metrics.MetricRegistry
 import com.daml.ledger.participant.state.index.v2.IndexService
-import com.daml.ledger.participant.state.v1.SeedService.Seeding
 import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SeedService, WriteService}
 import com.digitalasset.api.util.TimeProvider
 import com.digitalasset.buildinfo.BuildInfo
@@ -50,11 +49,9 @@ final class StandaloneApiServer(
     readService: ReadService,
     writeService: WriteService,
     authService: AuthService,
-    eventsPageSize: Int,
     transformIndexService: IndexService => IndexService = identity,
     metrics: MetricRegistry,
     timeServiceBackend: Option[TimeServiceBackend] = None,
-    seeding: Option[Seeding],
     otherServices: immutable.Seq[BindableService] = immutable.Seq.empty,
     otherInterceptors: List[ServerInterceptor] = List.empty,
     engine: Engine = sharedEngine // allows sharing DAML engine with DAML-on-X participant
@@ -84,8 +81,8 @@ final class StandaloneApiServer(
           domain.LedgerId(initialConditions.ledgerId),
           participantId,
           config.jdbcUrl,
+          config.eventsPageSize,
           metrics,
-          eventsPageSize,
         )
         .map(transformIndexService)

       healthChecks = new HealthChecks(
@@ -110,7 +107,7 @@ final class StandaloneApiServer(
             optTimeServiceBackend = timeServiceBackend,
             metrics = metrics,
             healthChecks = healthChecks,
-            seedService = seeding.map(SeedService(_)),
+            seedService = Some(SeedService(config.seeding)),
           )(mat, esf, logCtx)
           .map(_.withServices(otherServices))
       },


@@ -0,0 +1,10 @@
+// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.digitalasset.platform.configuration
+
+object IndexConfiguration {
+
+  val DefaultEventsPageSize: Int = 1000
+
+}
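
Both `ApiServerConfig` (above) and `IndexerConfig` (below) now default `eventsPageSize` to this one constant, so the `--events-page-size` flag stays the single override point. A quick sketch against the `Config` object from the first file, using an arbitrary value:

    // Equivalent to passing --events-page-size 500 on the command line.
    val custom = Config.default(extra = ()).copy(eventsPageSize = 500)
    assert(custom.eventsPageSize == 500)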


@@ -22,11 +22,11 @@ object JdbcIndex {
       ledgerId: LedgerId,
       participantId: ParticipantId,
       jdbcUrl: String,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexService] =
     ReadOnlySqlLedger
-      .owner(serverRole, jdbcUrl, ledgerId, metrics, eventsPageSize)
+      .owner(serverRole, jdbcUrl, ledgerId, eventsPageSize, metrics)
       .map { ledger =>
         new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) {
           override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =


@@ -32,11 +32,11 @@ object ReadOnlySqlLedger {
       serverRole: ServerRole,
       jdbcUrl: String,
       ledgerId: LedgerId,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[ReadOnlyLedger] =
     for {
-      ledgerReadDao <- JdbcLedgerDao.readOwner(serverRole, jdbcUrl, metrics, eventsPageSize)
+      ledgerReadDao <- JdbcLedgerDao.readOwner(serverRole, jdbcUrl, eventsPageSize, metrics)
       factory = new Factory(ledgerReadDao)
       ledger <- ResourceOwner.forFutureCloseable(() => factory.createReadOnlySqlLedger(ledgerId))
     } yield ledger


@@ -4,6 +4,7 @@
 package com.digitalasset.platform.indexer

 import com.daml.ledger.participant.state.v1.ParticipantId
+import com.digitalasset.platform.configuration.IndexConfiguration
 import com.digitalasset.platform.indexer.IndexerConfig._

 import scala.concurrent.duration.{DurationInt, FiniteDuration}
@@ -13,9 +14,12 @@ case class IndexerConfig(
     jdbcUrl: String,
     startupMode: IndexerStartupMode,
     restartDelay: FiniteDuration = DefaultRestartDelay,
+    eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
     allowExistingSchema: Boolean = false,
 )

 object IndexerConfig {
   val DefaultRestartDelay: FiniteDuration = 10.seconds
 }


@@ -34,11 +34,9 @@ import scala.util.control.NonFatal
 final class JdbcIndexerFactory(
     serverRole: ServerRole,
-    participantId: ParticipantId,
-    jdbcUrl: String,
+    config: IndexerConfig,
     readService: ReadService,
     metrics: MetricRegistry,
-    eventsPageSize: Int,
 )(implicit materializer: Materializer, logCtx: LoggingContext) {
   private val logger = ContextualizedLogger.get(this.getClass)
@@ -46,14 +44,14 @@ final class JdbcIndexerFactory(
   def validateSchema()(
       implicit executionContext: ExecutionContext
   ): Future[ResourceOwner[JdbcIndexer]] =
-    new FlywayMigrations(jdbcUrl)
+    new FlywayMigrations(config.jdbcUrl)
       .validate()
       .map(_ => initialized())

   def migrateSchema(allowExistingSchema: Boolean)(
       implicit executionContext: ExecutionContext
   ): Future[ResourceOwner[JdbcIndexer]] =
-    new FlywayMigrations(jdbcUrl)
+    new FlywayMigrations(config.jdbcUrl)
       .migrate(allowExistingSchema)
       .map(_ => initialized())
@@ -61,9 +59,14 @@ final class JdbcIndexerFactory(
       implicit executionContext: ExecutionContext
   ): ResourceOwner[JdbcIndexer] =
     for {
-      ledgerDao <- JdbcLedgerDao.writeOwner(serverRole, jdbcUrl, metrics, eventsPageSize)
+      ledgerDao <- JdbcLedgerDao.writeOwner(
+        serverRole,
+        config.jdbcUrl,
+        config.eventsPageSize,
+        metrics,
+      )
       initialLedgerEnd <- ResourceOwner.forFuture(() => initializeLedger(ledgerDao))
-    } yield new JdbcIndexer(initialLedgerEnd, participantId, ledgerDao, metrics)
+    } yield new JdbcIndexer(initialLedgerEnd, config.participantId, ledgerDao, metrics)

   private def initializeLedger(dao: LedgerDao)(
       implicit executionContext: ExecutionContext,


@@ -16,7 +16,6 @@ final class StandaloneIndexerServer(
     readService: ReadService,
     config: IndexerConfig,
     metrics: MetricRegistry,
-    eventsPageSize: Int,
 )(implicit materializer: Materializer, logCtx: LoggingContext)
     extends ResourceOwner[Unit] {
@@ -25,11 +24,9 @@ final class StandaloneIndexerServer(
   override def acquire()(implicit executionContext: ExecutionContext): Resource[Unit] = {
     val indexerFactory = new JdbcIndexerFactory(
       ServerRole.Indexer,
-      config.participantId,
-      config.jdbcUrl,
+      config,
       readService,
       metrics,
-      eventsPageSize,
     )
     val indexer = new RecoveringIndexer(materializer.system.scheduler, config.restartDelay)
     config.startupMode match {


@@ -239,8 +239,8 @@ final class SandboxServer(
           startMode,
           config.commandConfig.maxCommandsInFlight * 2, // we can get commands directly as well on the submission service
           packageStore,
-          metrics,
           config.eventsPageSize,
+          metrics,
         )
       case None =>


@@ -61,8 +61,8 @@ object SandboxIndexAndWriteService {
       startMode: SqlStartMode,
       queueDepth: Int,
       templateStore: InMemoryPackageStore,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[IndexAndWriteService] =
     SqlLedger
       .owner(
@@ -77,8 +77,8 @@ object SandboxIndexAndWriteService {
         initialConfig = initialConfig,
         queueDepth = queueDepth,
         startMode = startMode,
-        metrics = metrics,
         eventsPageSize = eventsPageSize,
+        metrics = metrics,
       )
       .flatMap(ledger =>
         owner(MeteredLedger(ledger, metrics), participantId, initialConfig, timeProvider))


@@ -57,12 +57,12 @@ object SqlLedger {
       initialConfig: Configuration,
       queueDepth: Int,
       startMode: SqlStartMode = SqlStartMode.ContinueIfExists,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit mat: Materializer, logCtx: LoggingContext): ResourceOwner[Ledger] =
     for {
       _ <- ResourceOwner.forFuture(() => new FlywayMigrations(jdbcUrl).migrate()(DEC))
-      ledgerDao <- JdbcLedgerDao.writeOwner(serverRole, jdbcUrl, metrics, eventsPageSize)
+      ledgerDao <- JdbcLedgerDao.writeOwner(serverRole, jdbcUrl, eventsPageSize, metrics)
       ledger <- ResourceOwner.forFutureCloseable(
         () =>
           new SqlLedgerFactory(ledgerDao).createSqlLedger(


@@ -157,10 +157,10 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
           ParticipantId,
           jdbcUrl = indexJdbcUrl,
           startupMode = IndexerStartupMode.MigrateAndStart,
+          eventsPageSize = config.eventsPageSize,
           allowExistingSchema = true,
         ),
         metrics = metrics,
-        eventsPageSize = config.eventsPageSize,
       )
       authService = config.authService.getOrElse(AuthServiceWildcard)
       promise = Promise[Unit]
@@ -189,7 +189,9 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
           jdbcUrl = indexJdbcUrl,
           tlsConfig = config.tlsConfig,
           maxInboundMessageSize = config.maxInboundMessageSize,
+          eventsPageSize = config.eventsPageSize,
           portFile = config.portFile,
+          seeding = seeding,
         ),
         commandConfig = config.commandConfig,
         partyConfig = config.partyConfig,
@@ -197,11 +199,9 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
         readService = readService,
         writeService = writeService,
         authService = authService,
-        eventsPageSize = config.eventsPageSize,
         transformIndexService = new TimedIndexService(_, metrics, IndexServicePrefix),
         metrics = metrics,
         timeServiceBackend = timeServiceBackend,
-        seeding = Some(seeding),
         otherServices = List(resetService),
         otherInterceptors = List(resetService),
       )


@@ -1626,24 +1626,24 @@ object JdbcLedgerDao {
   def readOwner(
       serverRole: ServerRole,
       jdbcUrl: String,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit logCtx: LoggingContext): ResourceOwner[LedgerReadDao] = {
     val maxConnections = DefaultNumberOfShortLivedConnections
-    owner(serverRole, jdbcUrl, maxConnections, metrics, eventsPageSize)
+    owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, metrics)
       .map(new MeteredLedgerReadDao(_, metrics))
   }

   def writeOwner(
       serverRole: ServerRole,
       jdbcUrl: String,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] = {
     val dbType = DbType.jdbcType(jdbcUrl)
     val maxConnections =
       if (dbType.supportsParallelWrites) DefaultNumberOfShortLivedConnections else 1
-    owner(serverRole, jdbcUrl, maxConnections, metrics, eventsPageSize)
+    owner(serverRole, jdbcUrl, maxConnections, eventsPageSize, metrics)
       .map(new MeteredLedgerDao(_, metrics))
   }

@@ -1651,8 +1651,8 @@ object JdbcLedgerDao {
       serverRole: ServerRole,
       jdbcUrl: String,
       maxConnections: Int,
-      metrics: MetricRegistry,
       eventsPageSize: Int,
+      metrics: MetricRegistry,
   )(implicit logCtx: LoggingContext): ResourceOwner[LedgerDao] =
     for {
       dbDispatcher <- DbDispatcher.owner(serverRole, jdbcUrl, maxConnections, metrics)


@@ -73,8 +73,8 @@ object LedgerResource {
         initialConfig = initialConfig,
         queueDepth = 128,
         startMode = SqlStartMode.AlwaysReset,
-        metrics = metrics,
         eventsPageSize = 100,
+        metrics = metrics,
       )
     } yield ledger
   )


@@ -33,7 +33,12 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { this: Su
     for {
       _ <- Resource.fromFuture(new FlywayMigrations(jdbcUrl).migrate())
       dao <- JdbcLedgerDao
-        .writeOwner(ServerRole.Testing(getClass), jdbcUrl, new MetricRegistry, 100)
+        .writeOwner(
+          serverRole = ServerRole.Testing(getClass),
+          jdbcUrl = jdbcUrl,
+          eventsPageSize = 100,
+          metrics = new MetricRegistry,
+        )
         .acquire()
       _ <- Resource.fromFuture(dao.initializeLedger(LedgerId("test-ledger"), Offset.begin))
     } yield dao


@@ -213,8 +213,8 @@ class SqlLedgerSpec
         initialConfig = Configuration(0, TimeModel.reasonableDefault, Duration.ofDays(1)),
         queueDepth = queueDepth,
         startMode = SqlStartMode.ContinueIfExists,
-        metrics = metrics,
         eventsPageSize = 100,
+        metrics = metrics,
       )
       .acquire()(system.dispatcher)
   }