Allow running StandaloneApiServer in read-only mode. (#6721)

Also fixes #5635, removing usage of `ReadService` from the `StandaloneApiServer`.

The configuration stream of the Ledger API's ``LedgerConfigurationService`` is now properly backed by configuration entries instead of only serving the initial configuration.
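
Conceptually, the service now emits the latest known configuration first and then follows the stream of accepted configuration entries. A minimal, self-contained Akka Streams sketch of that shape, using stand-in lookup and entry-stream functions rather than the real index API::

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    import scala.concurrent.Future

    object ConfigStreamSketch extends App {
      implicit val system: ActorSystem = ActorSystem("config-stream-sketch")
      import system.dispatcher

      // Stand-ins for the index service: an optional latest configuration at some
      // offset, and the configuration entries accepted after that offset.
      def lookupConfiguration(): Future[Option[(Long, String)]] =
        Future.successful(Some(1L -> "maxDeduplicationTime=1m"))
      def configurationEntries(startExclusive: Option[Long]): Source[(Long, String), _] =
        Source(List(2L -> "maxDeduplicationTime=2m", 3L -> "maxDeduplicationTime=5m"))

      // Emit the current configuration (if any) first, then every later change.
      val configurations: Source[String, _] =
        Source
          .future(lookupConfiguration())
          .flatMapConcat { latest =>
            Source(latest.map(_._2).toList)
              .concat(configurationEntries(latest.map(_._1)).map(_._2))
          }

      configurations
        .runWith(Sink.foreach(println))
        .onComplete(_ => system.terminate())
    }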

CHANGELOG_BEGIN
[DAML Integration Kit]: ``StandaloneApiServer`` can now be run in read-only mode.
  - The type of the constructor parameter ``writeService`` of ``StandaloneApiServer`` has changed to ``Option[WriteService]``. Passing ``None`` disables the admin services, the command service, and the command submission service.
  - The constructor parameter ``readService`` of ``StandaloneApiServer`` has been removed.
  - A new constructor parameter ``ledgerId`` has been added to ``StandaloneApiServer``. It is used to verify that ``StandaloneApiServer`` runs against index storage for the same ledger ID; initialization is aborted if it does not.
[DAML Integration Kit]: The ``LedgerConfigurationService`` now properly streams configuration changes.
CHANGELOG_END
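
The read-only behaviour follows from the new ``Option[WriteService]`` parameter: services that need a write path are only constructed when a write service is supplied. A minimal, self-contained sketch of that gating idea, with illustrative trait and service names rather than the real daml interfaces::

    // Illustrative stand-ins, not the real daml interfaces.
    trait WriteService
    trait BindableService { def serviceName: String }

    object ReadOnlyGatingSketch extends App {
      def named(n: String): BindableService = new BindableService { val serviceName = n }

      // Read-only services are always exposed; write-backed services are built
      // only when a write service is present, mirroring the split in ApiServices.
      def createServices(optWriteService: Option[WriteService]): List[BindableService] = {
        val readOnly =
          List(named("transactions"), named("active-contracts"), named("ledger-configuration"))
        val writeBacked = optWriteService.toList.flatMap { _ =>
          List(named("command-submission"), named("command"), named("party-management"))
        }
        readOnly ::: writeBacked
      }

      println(createServices(None).map(_.serviceName))                      // read-only participant
      println(createServices(Some(new WriteService {})).map(_.serviceName)) // read-write participant
    }
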
Gerolf Seitz 2020-07-17 09:57:00 +02:00 committed by GitHub
parent c32355b9fd
commit 35b50992f0
19 changed files with 230 additions and 182 deletions

View File

@ -0,0 +1,8 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.common
class LedgerIdNotFoundException(attempts: Int)
extends RuntimeException(
s"""No ledger ID found in the index database after $attempts attempts.""")

View File

@ -64,7 +64,7 @@ object Main {
): ResourceOwner[KeyValueLedger] = {
val metrics = createMetrics(participantConfig, config)
new InMemoryLedgerReaderWriter.Owner(
initialLedgerId = config.ledgerId,
ledgerId = config.ledgerId,
config.extra.batchingLedgerWriterConfig,
participantId = participantConfig.participantId,
metrics = metrics,

View File

@ -3,8 +3,6 @@
package com.daml.ledger.on.memory
import java.util.UUID
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
@ -21,7 +19,6 @@ import com.daml.ledger.validator.batch.{
BatchedSubmissionValidatorFactory,
ConflictDetection
}
import com.daml.lf.data.Ref
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
@ -69,7 +66,7 @@ object InMemoryLedgerReaderWriter {
val DefaultTimeProvider: TimeProvider = TimeProvider.UTC
final class SingleParticipantOwner(
initialLedgerId: Option[LedgerId],
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
timeProvider: TimeProvider = DefaultTimeProvider,
@ -85,7 +82,7 @@ object InMemoryLedgerReaderWriter {
for {
dispatcher <- dispatcherOwner.acquire()
readerWriter <- new Owner(
initialLedgerId,
ledgerId,
batchingLedgerWriterConfig,
participantId,
metrics,
@ -100,7 +97,7 @@ object InMemoryLedgerReaderWriter {
}
final class Owner(
initialLedgerId: Option[LedgerId],
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
@ -114,8 +111,6 @@ object InMemoryLedgerReaderWriter {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[KeyValueLedger] = {
val ledgerId =
initialLedgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
val keyValueCommitting =
new KeyValueCommitting(
engine,

View File

@ -39,7 +39,7 @@ abstract class InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching: Boo
override val isPersistent: Boolean = false
override def participantStateFactory(
ledgerId: Option[LedgerId],
ledgerId: LedgerId,
participantId: ParticipantId,
testId: String,
metrics: Metrics,

View File

@ -128,7 +128,7 @@ object SqlLedgerReaderWriter {
val DefaultTimeProvider: TimeProvider = TimeProvider.UTC
final class Owner(
initialLedgerId: Option[LedgerId],
ledgerId: LedgerId,
participantId: ParticipantId,
metrics: Metrics,
engine: Engine,
@ -147,7 +147,7 @@ object SqlLedgerReaderWriter {
database <- Resource.fromFuture(
if (resetOnStartup) uninitializedDatabase.migrateAndReset()
else Future.successful(uninitializedDatabase.migrate()))
ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(initialLedgerId, database))
ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(ledgerId, database))
dispatcher <- new DispatcherOwner(database).acquire()
} yield
new SqlLedgerReaderWriter(
@ -163,22 +163,20 @@ object SqlLedgerReaderWriter {
)
}
private def updateOrRetrieveLedgerId(initialLedgerId: Option[LedgerId], database: Database)(
private def updateOrRetrieveLedgerId(providedLedgerId: LedgerId, database: Database)(
implicit executionContext: ExecutionContext,
logCtx: LoggingContext,
): Future[LedgerId] =
database.inWriteTransaction("retrieve_ledger_id") { queries =>
val providedLedgerId =
initialLedgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
Future.fromTry(
queries
.updateOrRetrieveLedgerId(providedLedgerId)
.flatMap { ledgerId =>
if (initialLedgerId.exists(_ != ledgerId)) {
if (providedLedgerId != ledgerId) {
Failure(
new LedgerIdMismatchException(
domain.LedgerId(ledgerId),
domain.LedgerId(initialLedgerId.get),
domain.LedgerId(providedLedgerId),
))
} else {
Success(ledgerId)

View File

@ -20,7 +20,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
override protected final val startIndex: Long = StartIndex
override protected final def participantStateFactory(
ledgerId: Option[LedgerId],
ledgerId: LedgerId,
participantId: ParticipantId,
testId: String,
metrics: Metrics,

View File

@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.kvutils.app
import java.io.File
import java.nio.file.Path
import java.time.Duration
import java.util.UUID
import com.daml.caching
import com.daml.ledger.api.tls.TlsConfiguration
@ -19,7 +20,7 @@ import com.daml.resources.ResourceOwner
import scopt.OptionParser
final case class Config[Extra](
ledgerId: Option[String],
ledgerId: String,
archiveFiles: Seq[Path],
tlsConfig: Option[TlsConfiguration],
participants: Seq[ParticipantConfig],
@ -59,7 +60,7 @@ object Config {
def createDefault[Extra](extra: Extra): Config[Extra] =
Config(
ledgerId = None,
ledgerId = UUID.randomUUID().toString,
archiveFiles = Vector.empty,
tlsConfig = None,
participants = Vector.empty,
@ -123,9 +124,9 @@ object Config {
config.copy(participants = config.participants :+ partConfig)
})
opt[String]("ledger-id")
.text(
"The ID of the ledger. This must be the same each time the ledger is started. Defaults to a random UUID.")
.action((ledgerId, config) => config.copy(ledgerId = Some(ledgerId)))
.optional()
.text("The ID of the ledger. This must be the same each time the ledger is started. Defaults to a random UUID.")
.action((ledgerId, config) => config.copy(ledgerId = ledgerId))
opt[String]("pem")
.optional()

View File

@ -89,12 +89,12 @@ final class Runner[T <: ReadWriteService, Extra](
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- new StandaloneApiServer(
ledgerId = config.ledgerId,
config = factory.apiServerConfig(participantConfig, config),
commandConfig = factory.commandConfig(participantConfig, config),
partyConfig = factory.partyConfig(config),
ledgerConfig = factory.ledgerConfig(config),
readService = readService,
writeService = writeService,
optWriteService = Some(writeService),
authService = factory.authService(config),
transformIndexService = service => new TimedIndexService(service, metrics),
metrics = metrics,

View File

@ -55,17 +55,17 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
protected val isPersistent: Boolean = true
protected def participantStateFactory(
ledgerId: Option[LedgerId],
ledgerId: LedgerId,
participantId: ParticipantId,
testId: String,
metrics: Metrics,
)(implicit logCtx: LoggingContext): ResourceOwner[ParticipantState]
private def participantState: ResourceOwner[ParticipantState] =
newParticipantState()
newParticipantState(Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
private def newParticipantState(
ledgerId: Option[LedgerId] = None,
ledgerId: LedgerId,
): ResourceOwner[ParticipantState] =
newLoggingContext { implicit logCtx =>
participantStateFactory(ledgerId, participantId, testId, new Metrics(new MetricRegistry))
@ -83,7 +83,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
implementationName should {
"return initial conditions" in {
val ledgerId = newLedgerId()
newParticipantState(ledgerId = Some(ledgerId)).use { ps =>
newParticipantState(ledgerId = ledgerId).use { ps =>
for {
conditions <- ps
.getLedgerInitialConditions()
@ -596,10 +596,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
"store the ledger ID and re-use it" in {
val ledgerId = newLedgerId()
for {
retrievedLedgerId1 <- newParticipantState(ledgerId = Some(ledgerId)).use { ps =>
retrievedLedgerId1 <- newParticipantState(ledgerId = ledgerId).use { ps =>
ps.getLedgerInitialConditions().map(_.ledgerId).runWith(Sink.head)
}
retrievedLedgerId2 <- newParticipantState().use { ps =>
retrievedLedgerId2 <- newParticipantState(ledgerId = ledgerId).use { ps =>
ps.getLedgerInitialConditions().map(_.ledgerId).runWith(Sink.head)
}
} yield {
@ -612,10 +612,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
val ledgerId = newLedgerId()
val attemptedLedgerId = newLedgerId()
for {
_ <- newParticipantState(ledgerId = Some(ledgerId)).use { _ =>
_ <- newParticipantState(ledgerId = ledgerId).use { _ =>
Future.unit
}
exception <- newParticipantState(ledgerId = Some(attemptedLedgerId)).use { _ =>
exception <- newParticipantState(ledgerId = attemptedLedgerId).use { _ =>
Future.unit
}.failed
} yield {
@ -629,14 +629,14 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(
"resume where it left off on restart" in {
val ledgerId = newLedgerId()
for {
_ <- newParticipantState(ledgerId = Some(ledgerId)).use { ps =>
_ <- newParticipantState(ledgerId = ledgerId).use { ps =>
for {
_ <- ps
.allocateParty(None, Some("party-1"), newSubmissionId())
.toScala
} yield ()
}
updates <- newParticipantState().use { ps =>
updates <- newParticipantState(ledgerId = ledgerId).use { ps =>
for {
_ <- ps
.allocateParty(None, Some("party-2"), newSubmissionId())

View File

@ -188,7 +188,7 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with
for {
actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem())
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem))
participantState <- newParticipantState(Some(ledgerId), participantId)(materializer, logCtx)
participantState <- newParticipantState(ledgerId, participantId)(materializer, logCtx)
_ <- new StandaloneIndexerServer(
readService = participantState,
config = IndexerConfig(
@ -226,14 +226,14 @@ object RecoveringIndexerIntegrationSpec {
SubmissionId.assertFromString(UUID.randomUUID().toString)
private trait ParticipantStateFactory {
def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)(
def apply(ledgerId: LedgerId, participantId: ParticipantId)(
implicit materializer: Materializer,
logCtx: LoggingContext,
): ResourceOwner[ParticipantState]
}
private object SimpleParticipantState extends ParticipantStateFactory {
override def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)(
override def apply(ledgerId: LedgerId, participantId: ParticipantId)(
implicit materializer: Materializer,
logCtx: LoggingContext
): ResourceOwner[ParticipantState] = {
@ -249,7 +249,7 @@ object RecoveringIndexerIntegrationSpec {
}
private object ParticipantStateThatFailsOften extends ParticipantStateFactory {
override def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)(
override def apply(ledgerId: LedgerId, participantId: ParticipantId)(
implicit materializer: Materializer,
logCtx: LoggingContext
): ResourceOwner[ParticipantState] =

View File

@ -35,7 +35,11 @@ import com.daml.platform.configuration.{
LedgerConfiguration,
PartyConfiguration
}
import com.daml.platform.server.api.services.grpc.GrpcHealthService
import com.daml.platform.server.api.services.grpc.{
GrpcCommandCompletionService,
GrpcHealthService,
GrpcTransactionService
}
import com.daml.platform.services.time.TimeProviderType
import com.daml.resources.{Resource, ResourceOwner}
import io.grpc.BindableService
@ -64,7 +68,7 @@ object ApiServices {
class Owner(
participantId: Ref.ParticipantId,
writeService: WriteService,
optWriteService: Option[WriteService],
indexService: IndexService,
authorizer: Authorizer,
engine: Engine,
@ -99,7 +103,7 @@ object ApiServices {
ledgerId <- identityService.getLedgerId()
ledgerConfigProvider = LedgerConfigProvider.create(
configManagementService,
writeService,
optWriteService,
timeProvider,
ledgerConfiguration)
services = createServices(ledgerId, ledgerConfigProvider)(mat.system.dispatcher)
@ -118,37 +122,6 @@ object ApiServices {
private def createServices(ledgerId: LedgerId, ledgerConfigProvider: LedgerConfigProvider)(
implicit executionContext: ExecutionContext): List[BindableService] = {
val commandExecutor = new TimedCommandExecutor(
new LedgerTimeAwareCommandExecutor(
new StoreBackedCommandExecutor(
engine,
participantId,
packagesService,
contractStore,
metrics,
),
contractStore,
maxRetries = 3,
metrics,
),
metrics,
)
val apiSubmissionService = ApiSubmissionService.create(
ledgerId,
contractStore,
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
ledgerConfigProvider,
seedService,
commandExecutor,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
),
metrics,
)
logger.info(engine.info.show)
@ -166,29 +139,6 @@ object ApiServices {
val apiCompletionService =
ApiCommandCompletionService.create(ledgerId, completionsService)
// Note: the command service uses the command submission, command completion, and transaction
// services internally. These connections do not use authorization, authorization wrappers are
// only added here to all exposed services.
val apiCommandService = ApiCommandService.create(
ApiCommandService.Configuration(
ledgerId,
commandConfig.inputBufferSize,
commandConfig.maxCommandsInFlight,
commandConfig.limitMaxCommandsInFlight,
commandConfig.retentionPeriod,
),
// Using local services skips the gRPC layer, improving performance.
ApiCommandService.LocalServices(
CommandSubmissionFlow(apiSubmissionService.submit, commandConfig.maxCommandsInFlight),
r => apiCompletionService.completionStreamSource(r),
() => apiCompletionService.completionEnd(CompletionEndRequest(ledgerId.unwrap)),
apiTransactionService.getTransactionById,
apiTransactionService.getFlatTransactionById
),
timeProvider,
ledgerConfigProvider,
)
val apiActiveContractsService =
ApiActiveContractsService.create(ledgerId, activeContractsService)
@ -203,42 +153,119 @@ object ApiServices {
)
}
val apiPartyManagementService =
ApiPartyManagementService
.createApiService(partyManagementService, transactionsService, writeService)
val apiPackageManagementService =
ApiPackageManagementService
.createApiService(indexService, transactionsService, writeService, timeProvider)
val apiConfigManagementService =
ApiConfigManagementService
.createApiService(
configManagementService,
writeService,
timeProvider,
ledgerConfiguration)
val writeServiceBackedApiServices =
initializeWriteServiceBackedApiServices(
ledgerId,
ledgerConfigProvider,
apiCompletionService,
apiTransactionService)
val apiReflectionService = ProtoReflectionService.newInstance()
val apiHealthService = new GrpcHealthService(healthChecks)
apiTimeServiceOpt.toList :::
writeServiceBackedApiServices :::
List(
new LedgerIdentityServiceAuthorization(apiLedgerIdentityService, authorizer),
new PackageServiceAuthorization(apiPackageService, authorizer),
new LedgerConfigurationServiceAuthorization(apiConfigurationService, authorizer),
new CommandSubmissionServiceAuthorization(apiSubmissionService, authorizer),
new TransactionServiceAuthorization(apiTransactionService, authorizer),
new CommandCompletionServiceAuthorization(apiCompletionService, authorizer),
new CommandServiceAuthorization(apiCommandService, authorizer),
new ActiveContractsServiceAuthorization(apiActiveContractsService, authorizer),
new PartyManagementServiceAuthorization(apiPartyManagementService, authorizer),
new PackageManagementServiceAuthorization(apiPackageManagementService, authorizer),
new ConfigManagementServiceAuthorization(apiConfigManagementService, authorizer),
apiReflectionService,
apiHealthService,
)
}
private def initializeWriteServiceBackedApiServices(
ledgerId: LedgerId,
ledgerConfigProvider: LedgerConfigProvider,
apiCompletionService: GrpcCommandCompletionService,
apiTransactionService: GrpcTransactionService)(
implicit mat: Materializer,
ec: ExecutionContext,
logCtx: LoggingContext): List[BindableService] = {
optWriteService.toList.flatMap { writeService =>
val commandExecutor = new TimedCommandExecutor(
new LedgerTimeAwareCommandExecutor(
new StoreBackedCommandExecutor(
engine,
participantId,
packagesService,
contractStore,
metrics,
),
contractStore,
maxRetries = 3,
metrics,
),
metrics,
)
val apiSubmissionService = ApiSubmissionService.create(
ledgerId,
contractStore,
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
ledgerConfigProvider,
seedService,
commandExecutor,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
),
metrics,
)
// Note: the command service uses the command submission, command completion, and transaction
// services internally. These connections do not use authorization, authorization wrappers are
// only added here to all exposed services.
val apiCommandService = ApiCommandService.create(
ApiCommandService.Configuration(
ledgerId,
commandConfig.inputBufferSize,
commandConfig.maxCommandsInFlight,
commandConfig.limitMaxCommandsInFlight,
commandConfig.retentionPeriod,
),
// Using local services skips the gRPC layer, improving performance.
ApiCommandService.LocalServices(
CommandSubmissionFlow(apiSubmissionService.submit, commandConfig.maxCommandsInFlight),
r => apiCompletionService.completionStreamSource(r),
() => apiCompletionService.completionEnd(CompletionEndRequest(ledgerId.unwrap)),
apiTransactionService.getTransactionById,
apiTransactionService.getFlatTransactionById
),
timeProvider,
ledgerConfigProvider,
)
val apiPartyManagementService =
ApiPartyManagementService
.createApiService(partyManagementService, transactionsService, writeService)
val apiPackageManagementService =
ApiPackageManagementService
.createApiService(indexService, transactionsService, writeService, timeProvider)
val apiConfigManagementService =
ApiConfigManagementService
.createApiService(
configManagementService,
writeService,
timeProvider,
ledgerConfiguration)
List(
new CommandSubmissionServiceAuthorization(apiSubmissionService, authorizer),
new CommandServiceAuthorization(apiCommandService, authorizer),
new PartyManagementServiceAuthorization(apiPartyManagementService, authorizer),
new PackageManagementServiceAuthorization(apiPackageManagementService, authorizer),
new ConfigManagementServiceAuthorization(apiConfigManagementService, authorizer),
)
}
}
}
}

View File

@ -9,7 +9,6 @@ import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.daml.api.util.TimeProvider
import com.daml.buildinfo.BuildInfo
import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor
@ -17,7 +16,7 @@ import com.daml.ledger.api.auth.{AuthService, Authorizer}
import com.daml.ledger.api.domain
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SeedService, WriteService}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId, SeedService, WriteService}
import com.daml.lf.engine.Engine
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
@ -25,7 +24,7 @@ import com.daml.platform.configuration.{
CommandConfiguration,
LedgerConfiguration,
PartyConfiguration,
ServerRole,
ServerRole
}
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
@ -42,12 +41,12 @@ import scala.concurrent.ExecutionContext
// Main entry point to start an index server that also hosts the ledger API.
// See v2.ReferenceServer on how it is used.
final class StandaloneApiServer(
ledgerId: LedgerId,
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
ledgerConfig: LedgerConfiguration,
readService: ReadService,
writeService: WriteService,
optWriteService: Option[WriteService],
authService: AuthService,
transformIndexService: IndexService => IndexService = identity,
metrics: Metrics,
@ -69,17 +68,10 @@ final class StandaloneApiServer(
preloadPackages(packageStore)
val owner = for {
initialConditions <- ResourceOwner.forFuture(() =>
readService.getLedgerInitialConditions().runWith(Sink.head))
authorizer = new Authorizer(
() => java.time.Clock.systemUTC.instant(),
initialConditions.ledgerId,
participantId)
indexService <- JdbcIndex
.owner(
ServerRole.ApiServer,
initialConditions.config,
domain.LedgerId(initialConditions.ledgerId),
domain.LedgerId(ledgerId),
participantId,
config.jdbcUrl,
config.eventsPageSize,
@ -87,15 +79,16 @@ final class StandaloneApiServer(
lfValueTranslationCache,
)
.map(transformIndexService)
authorizer = new Authorizer(
() => java.time.Clock.systemUTC.instant(),
ledgerId,
participantId)
healthChecks = new HealthChecks(
"index" -> indexService,
"read" -> readService,
"write" -> writeService,
)
Seq("index" -> indexService) ++ optWriteService.toList.map("write" -> _): _*)
executionSequencerFactory <- new ExecutionSequencerFactoryOwner()
apiServicesOwner = new ApiServices.Owner(
participantId = participantId,
writeService = writeService,
optWriteService = optWriteService,
indexService = indexService,
authorizer = authorizer,
engine = engine,
@ -124,7 +117,7 @@ final class StandaloneApiServer(
} yield {
writePortFile(apiServer.port)
logger.info(
s"Initialized API server version ${BuildInfo.Version} with ledger-id = ${initialConditions.ledgerId}, port = ${apiServer.port}, dar file = ${config.archiveFiles}")
s"Initialized API server version ${BuildInfo.Version} with ledger-id = $ledgerId, port = ${apiServer.port}, dar file = ${config.archiveFiles}")
apiServer
}

View File

@ -37,7 +37,7 @@ import scala.concurrent.duration.{DurationInt, DurationLong}
*/
final class LedgerConfigProvider private (
index: IndexConfigManagementService,
writeService: WriteService,
optWriteService: Option[WriteService],
timeProvider: TimeProvider,
config: LedgerConfiguration,
materializer: Materializer,
@ -62,10 +62,12 @@ final class LedgerConfigProvider private (
readyPromise.trySuccess(())
()
})
materializer.scheduleOnce(config.initialConfigurationSubmitDelay.toNanos.nanos, () => {
if (latestConfiguration.isEmpty) submitInitialConfig()
()
})
optWriteService.foreach { writeService =>
materializer.scheduleOnce(config.initialConfigurationSubmitDelay.toNanos.nanos, () => {
if (latestConfiguration.isEmpty) submitInitialConfig(writeService)
()
})
}
// Looks up the latest ledger configuration, then subscribes to a
// stream of configuration changes.
@ -118,7 +120,7 @@ final class LedgerConfigProvider private (
()
}
private[this] def submitInitialConfig(): Future[Unit] = {
private[this] def submitInitialConfig(writeService: WriteService): Future[Unit] = {
implicit val executionContext: ExecutionContext = DE
// There are several reasons why the change could be rejected:
// - The participant is not authorized to set the configuration
@ -168,10 +170,10 @@ object LedgerConfigProvider {
def create(
index: IndexConfigManagementService,
writeService: WriteService,
optWriteService: Option[WriteService],
timeProvider: TimeProvider,
config: LedgerConfiguration)(
implicit materializer: Materializer,
logCtx: LoggingContext): LedgerConfigProvider =
new LedgerConfigProvider(index, writeService, timeProvider, config, materializer)
new LedgerConfigProvider(index, optWriteService, timeProvider, config, materializer)
}

View File

@ -3,13 +3,10 @@
package com.daml.platform.index
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
@ -19,7 +16,6 @@ import com.daml.resources.ResourceOwner
object JdbcIndex {
def owner(
serverRole: ServerRole,
initialConfig: Configuration,
ledgerId: LedgerId,
participantId: ParticipantId,
jdbcUrl: String,
@ -35,10 +31,6 @@ object JdbcIndex {
metrics,
lfValueTranslationCache,
).map { ledger =>
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) {
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =
// FIXME(JM): The indexer should on start set the default configuration.
Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime))
}
new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId)
}
}

View File

@ -19,6 +19,7 @@ import com.daml.lf.value.Value.{ContractId, ContractInst}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.dec.{DirectExecutionContext => DEC}
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.ConfigurationEntry.Accepted
import com.daml.ledger.api.domain.{
ApplicationId,
CommandId,
@ -48,7 +49,7 @@ import scalaz.syntax.tag.ToTagOps
import scala.concurrent.Future
abstract class LedgerBackedIndexService(
final class LedgerBackedIndexService(
ledger: ReadOnlyLedger,
participantId: ParticipantId,
)(implicit mat: Materializer)
@ -237,6 +238,26 @@ abstract class LedgerBackedIndexService(
.lookupLedgerConfiguration()
.map(_.map { case (offset, config) => (toAbsolute(offset), config) })(DEC)
/** Looks up the current configuration, if set, and continues to stream configuration changes.
*
*/
override def getLedgerConfiguration(): Source[LedgerConfiguration, NotUsed] = {
Source
.future(lookupConfiguration())
.flatMapConcat { optResult =>
val offset = optResult.map(_._1)
val foundConfig = optResult.map(_._2)
val initialConfig = Source(foundConfig.toList)
val configStream = configurationEntries(offset).collect {
case (_, Accepted(_, _, configuration)) => configuration
}
initialConfig
.concat(configStream)
.map(cfg => LedgerConfiguration(cfg.maxDeduplicationTime))
}
}
/** Retrieve configuration entries. */
override def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])
: Source[(domain.LedgerOffset.Absolute, domain.ConfigurationEntry), NotUsed] =

View File

@ -15,13 +15,14 @@ import com.daml.ledger.participant.state.v1.Offset
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.LedgerIdMismatchException
import com.daml.platform.common.{LedgerIdMismatchException, LedgerIdNotFoundException}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.{BaseLedger, ReadOnlyLedger}
import com.daml.resources.ProgramResource.StartupException
import com.daml.resources.{Resource, ResourceOwner}
import com.daml.timer.RetryStrategy
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
@ -45,7 +46,7 @@ object ReadOnlySqlLedger {
): Resource[ReadOnlyLedger] =
for {
ledgerDao <- ledgerDaoOwner().acquire()
ledgerId <- Resource.fromFuture(verifyOrSetLedgerId(ledgerDao, initialLedgerId))
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
dispatcher <- dispatcherOwner(ledgerEnd).acquire()
ledger <- ResourceOwner
@ -53,22 +54,37 @@ object ReadOnlySqlLedger {
.acquire()
} yield ledger
private def verifyOrSetLedgerId(
private def verifyLedgerId(
ledgerDao: LedgerReadDao,
initialLedgerId: LedgerId,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] =
ledgerDao
.lookupLedgerId()
.flatMap {
case Some(`initialLedgerId`) =>
logger.info(s"Found existing ledger with ID: $initialLedgerId")
Future.successful(initialLedgerId)
case Some(foundLedgerId) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException)
case None =>
Future.successful(initialLedgerId)
}
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] = {
val predicate: PartialFunction[Throwable, Boolean] = {
case _: LedgerIdNotFoundException => true
case _: LedgerIdMismatchException => false
case _ => false
}
val retryDelay = 5.seconds
val maxAttempts = 100
RetryStrategy.constant(attempts = Some(maxAttempts), waitTime = retryDelay)(predicate) {
(attempt, _wait) =>
ledgerDao
.lookupLedgerId()
.flatMap {
case Some(`initialLedgerId`) =>
logger.info(s"Found existing ledger with ID: $initialLedgerId")
Future.successful(initialLedgerId)
case Some(foundLedgerId) =>
Future.failed(
new LedgerIdMismatchException(foundLedgerId, initialLedgerId)
with StartupException)
case None =>
logger.info(
s"Ledger ID not found in the index database on attempt $attempt/$maxAttempts. Retrying again in $retryDelay.")
Future.failed(new LedgerIdNotFoundException(attempt))
}
}
}
private def ledgerDaoOwner(): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(

View File

@ -303,7 +303,7 @@ final class SandboxServer(
executionSequencerFactory <- new ExecutionSequencerFactoryOwner().acquire()
apiServicesOwner = new ApiServices.Owner(
participantId = participantId,
writeService = new TimedWriteService(indexAndWriteService.writeService, metrics),
optWriteService = Some(new TimedWriteService(indexAndWriteService.writeService, metrics)),
indexService = new TimedIndexService(indexAndWriteService.indexService, metrics),
authorizer = authorizer,
engine = SandboxServer.engine,

View File

@ -6,7 +6,6 @@ package com.daml.platform.sandbox.stores
import java.time.Instant
import java.util.concurrent.CompletionStage
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.api.util.TimeProvider
@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory
import scala.compat.java8.FutureConverters
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{ExecutionContext, Future}
trait IndexAndWriteService {
def indexService: IndexService
@ -116,12 +115,7 @@ object SandboxIndexAndWriteService {
initialConfig: Configuration,
timeProvider: TimeProvider,
)(implicit mat: Materializer): ResourceOwner[IndexAndWriteService] = {
val indexSvc = new LedgerBackedIndexService(ledger, participantId) {
override def getLedgerConfiguration(): Source[LedgerConfiguration, NotUsed] =
Source
.single(LedgerConfiguration(initialConfig.maxDeduplicationTime))
.concat(Source.future(Promise[LedgerConfiguration]().future)) // we should keep the stream open!
}
val indexSvc = new LedgerBackedIndexService(ledger, participantId)
val writeSvc = new LedgerBackedWriteService(ledger, timeProvider)
for {

View File

@ -140,9 +140,10 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
case TimeProviderType.WallClock =>
None
}
ledgerId = specifiedLedgerId.getOrElse(UUID.randomUUID().toString)
isReset = startupMode == StartupMode.ResetAndStart
readerWriter <- new SqlLedgerReaderWriter.Owner(
initialLedgerId = specifiedLedgerId,
ledgerId = ledgerId,
participantId = ParticipantId,
metrics = metrics,
jdbcUrl = ledgerJdbcUrl,
@ -200,6 +201,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
)
}
apiServer <- new StandaloneApiServer(
ledgerId = ledgerId,
config = ApiServerConfig(
participantId = ParticipantId,
archiveFiles = if (isReset) List.empty else config.damlPackages,
@ -219,8 +221,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
implicitPartyAllocation = config.implicitPartyAllocation,
),
ledgerConfig = config.ledgerConfig,
readService = readService,
writeService = writeService,
optWriteService = Some(writeService),
authService = authService,
transformIndexService = new TimedIndexService(_, metrics),
metrics = metrics,