From 35b50992f02567a347f85dce319f13c93fd6987d Mon Sep 17 00:00:00 2001 From: Gerolf Seitz Date: Fri, 17 Jul 2020 09:57:00 +0200 Subject: [PATCH] Allow running StandaloneApiServer in read-only mode. (#6721) Also fixes #5635, removing usage of `ReadService` from the `StandaloneApiServer`. The configuration stream of the Ledger API LedgerConfigurationService is now properly backed by the configuration entries instead of just serving the initial configuration. CHANGELOG_BEGIN [DAML Integration Kit]: ``StandaloneApiServer`` can now be run in a read-only mode. - The type of the constructor parameter ``writeService`` of ``StandaloneApiServer`` changed to ``Option[WriteService]``. Passing ``None`` will not start any of the admin services, the command service, and the command submission service. - The constructor parameter ``readService`` of ``StandaloneApiServer`` has been removed. - A new constructor parameter ``ledgerId`` has been added to ``StandaloneApiServer``. It is used to verify that that ``StandaloneApiServer`` is run against an index storage for the same ledgerId. Initialization is aborted if this is not the case. [DAML Integration Kit]: The ``LedgerConfigurationService`` now properly streams configuration changes. CHANGELOG_END --- .../common/LedgerIdNotFoundException.scala | 8 + .../com/daml/ledger/on/memory/Main.scala | 2 +- .../memory/InMemoryLedgerReaderWriter.scala | 11 +- ...oryLedgerReaderWriterIntegrationSpec.scala | 2 +- .../ledger/on/sql/SqlLedgerReaderWriter.scala | 12 +- ...edgerReaderWriterIntegrationSpecBase.scala | 2 +- .../state/kvutils/app/Config.scala | 11 +- .../state/kvutils/app/Runner.scala | 4 +- .../ParticipantStateIntegrationSpecBase.scala | 20 +- .../RecoveringIndexerIntegrationSpec.scala | 8 +- .../platform/apiserver/ApiServices.scala | 181 ++++++++++-------- .../apiserver/StandaloneApiServer.scala | 31 ++- .../services/LedgerConfigProvider.scala | 18 +- .../platform/index/JdbcIndex.scala | 12 +- .../index/LedgerBackedIndexService.scala | 23 ++- .../platform/index/ReadOnlySqlLedger.scala | 48 +++-- .../platform/sandbox/SandboxServer.scala | 2 +- .../stores/SandboxIndexAndWriteService.scala | 10 +- .../platform/sandboxnext/Runner.scala | 7 +- 19 files changed, 230 insertions(+), 182 deletions(-) create mode 100644 ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/common/LedgerIdNotFoundException.scala diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/common/LedgerIdNotFoundException.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/common/LedgerIdNotFoundException.scala new file mode 100644 index 0000000000..baa8ce0b3d --- /dev/null +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/common/LedgerIdNotFoundException.scala @@ -0,0 +1,8 @@ +// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.common + +class LedgerIdNotFoundException(attempts: Int) + extends RuntimeException( + s"""No ledger ID found in the index database after $attempts attempts.""") diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala index 4e2a9a1179..2d5f0202b9 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala @@ -64,7 +64,7 @@ object Main { ): ResourceOwner[KeyValueLedger] = { val metrics = createMetrics(participantConfig, config) new InMemoryLedgerReaderWriter.Owner( - initialLedgerId = config.ledgerId, + ledgerId = config.ledgerId, config.extra.batchingLedgerWriterConfig, participantId = participantConfig.participantId, metrics = metrics, diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala index 4d5a7ca005..d20f2224ad 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala @@ -3,8 +3,6 @@ package com.daml.ledger.on.memory -import java.util.UUID - import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.Source @@ -21,7 +19,6 @@ import com.daml.ledger.validator.batch.{ BatchedSubmissionValidatorFactory, ConflictDetection } -import com.daml.lf.data.Ref import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics @@ -69,7 +66,7 @@ object InMemoryLedgerReaderWriter { val DefaultTimeProvider: TimeProvider = TimeProvider.UTC final class SingleParticipantOwner( - initialLedgerId: Option[LedgerId], + ledgerId: LedgerId, batchingLedgerWriterConfig: BatchingLedgerWriterConfig, participantId: ParticipantId, timeProvider: TimeProvider = DefaultTimeProvider, @@ -85,7 +82,7 @@ object InMemoryLedgerReaderWriter { for { dispatcher <- dispatcherOwner.acquire() readerWriter <- new Owner( - initialLedgerId, + ledgerId, batchingLedgerWriterConfig, participantId, metrics, @@ -100,7 +97,7 @@ object InMemoryLedgerReaderWriter { } final class Owner( - initialLedgerId: Option[LedgerId], + ledgerId: LedgerId, batchingLedgerWriterConfig: BatchingLedgerWriterConfig, participantId: ParticipantId, metrics: Metrics, @@ -114,8 +111,6 @@ object InMemoryLedgerReaderWriter { override def acquire()( implicit executionContext: ExecutionContext ): Resource[KeyValueLedger] = { - val ledgerId = - initialLedgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) val keyValueCommitting = new KeyValueCommitting( engine, diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala index 6a90b7a617..fa07109318 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala @@ -39,7 +39,7 @@ abstract class InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching: Boo override val isPersistent: Boolean = false override def participantStateFactory( - ledgerId: Option[LedgerId], + ledgerId: LedgerId, participantId: ParticipantId, testId: String, metrics: Metrics, diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala index 96c1e24b84..3531b955bf 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala @@ -128,7 +128,7 @@ object SqlLedgerReaderWriter { val DefaultTimeProvider: TimeProvider = TimeProvider.UTC final class Owner( - initialLedgerId: Option[LedgerId], + ledgerId: LedgerId, participantId: ParticipantId, metrics: Metrics, engine: Engine, @@ -147,7 +147,7 @@ object SqlLedgerReaderWriter { database <- Resource.fromFuture( if (resetOnStartup) uninitializedDatabase.migrateAndReset() else Future.successful(uninitializedDatabase.migrate())) - ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(initialLedgerId, database)) + ledgerId <- Resource.fromFuture(updateOrRetrieveLedgerId(ledgerId, database)) dispatcher <- new DispatcherOwner(database).acquire() } yield new SqlLedgerReaderWriter( @@ -163,22 +163,20 @@ object SqlLedgerReaderWriter { ) } - private def updateOrRetrieveLedgerId(initialLedgerId: Option[LedgerId], database: Database)( + private def updateOrRetrieveLedgerId(providedLedgerId: LedgerId, database: Database)( implicit executionContext: ExecutionContext, logCtx: LoggingContext, ): Future[LedgerId] = database.inWriteTransaction("retrieve_ledger_id") { queries => - val providedLedgerId = - initialLedgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) Future.fromTry( queries .updateOrRetrieveLedgerId(providedLedgerId) .flatMap { ledgerId => - if (initialLedgerId.exists(_ != ledgerId)) { + if (providedLedgerId != ledgerId) { Failure( new LedgerIdMismatchException( domain.LedgerId(ledgerId), - domain.LedgerId(initialLedgerId.get), + domain.LedgerId(providedLedgerId), )) } else { Success(ledgerId) diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala index ab69718a78..cbf69083af 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala @@ -20,7 +20,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri override protected final val startIndex: Long = StartIndex override protected final def participantStateFactory( - ledgerId: Option[LedgerId], + ledgerId: LedgerId, participantId: ParticipantId, testId: String, metrics: Metrics, diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala index 347daecdd5..25a0b00fa7 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala @@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.kvutils.app import java.io.File import java.nio.file.Path import java.time.Duration +import java.util.UUID import com.daml.caching import com.daml.ledger.api.tls.TlsConfiguration @@ -19,7 +20,7 @@ import com.daml.resources.ResourceOwner import scopt.OptionParser final case class Config[Extra]( - ledgerId: Option[String], + ledgerId: String, archiveFiles: Seq[Path], tlsConfig: Option[TlsConfiguration], participants: Seq[ParticipantConfig], @@ -59,7 +60,7 @@ object Config { def createDefault[Extra](extra: Extra): Config[Extra] = Config( - ledgerId = None, + ledgerId = UUID.randomUUID().toString, archiveFiles = Vector.empty, tlsConfig = None, participants = Vector.empty, @@ -123,9 +124,9 @@ object Config { config.copy(participants = config.participants :+ partConfig) }) opt[String]("ledger-id") - .text( - "The ID of the ledger. This must be the same each time the ledger is started. Defaults to a random UUID.") - .action((ledgerId, config) => config.copy(ledgerId = Some(ledgerId))) + .optional() + .text("The ID of the ledger. This must be the same each time the ledger is started. Defaults to a random UUID.") + .action((ledgerId, config) => config.copy(ledgerId = ledgerId)) opt[String]("pem") .optional() diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index 4f6ccfcf7a..76892cca86 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -89,12 +89,12 @@ final class Runner[T <: ReadWriteService, Extra]( lfValueTranslationCache = lfValueTranslationCache, ).acquire() _ <- new StandaloneApiServer( + ledgerId = config.ledgerId, config = factory.apiServerConfig(participantConfig, config), commandConfig = factory.commandConfig(participantConfig, config), partyConfig = factory.partyConfig(config), ledgerConfig = factory.ledgerConfig(config), - readService = readService, - writeService = writeService, + optWriteService = Some(writeService), authService = factory.authService(config), transformIndexService = service => new TimedIndexService(service, metrics), metrics = metrics, diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index 93e496b7cc..167ae3ef2c 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -55,17 +55,17 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)( protected val isPersistent: Boolean = true protected def participantStateFactory( - ledgerId: Option[LedgerId], + ledgerId: LedgerId, participantId: ParticipantId, testId: String, metrics: Metrics, )(implicit logCtx: LoggingContext): ResourceOwner[ParticipantState] private def participantState: ResourceOwner[ParticipantState] = - newParticipantState() + newParticipantState(Ref.LedgerString.assertFromString(UUID.randomUUID.toString)) private def newParticipantState( - ledgerId: Option[LedgerId] = None, + ledgerId: LedgerId, ): ResourceOwner[ParticipantState] = newLoggingContext { implicit logCtx => participantStateFactory(ledgerId, participantId, testId, new Metrics(new MetricRegistry)) @@ -83,7 +83,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)( implementationName should { "return initial conditions" in { val ledgerId = newLedgerId() - newParticipantState(ledgerId = Some(ledgerId)).use { ps => + newParticipantState(ledgerId = ledgerId).use { ps => for { conditions <- ps .getLedgerInitialConditions() @@ -596,10 +596,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)( "store the ledger ID and re-use it" in { val ledgerId = newLedgerId() for { - retrievedLedgerId1 <- newParticipantState(ledgerId = Some(ledgerId)).use { ps => + retrievedLedgerId1 <- newParticipantState(ledgerId = ledgerId).use { ps => ps.getLedgerInitialConditions().map(_.ledgerId).runWith(Sink.head) } - retrievedLedgerId2 <- newParticipantState().use { ps => + retrievedLedgerId2 <- newParticipantState(ledgerId = ledgerId).use { ps => ps.getLedgerInitialConditions().map(_.ledgerId).runWith(Sink.head) } } yield { @@ -612,10 +612,10 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)( val ledgerId = newLedgerId() val attemptedLedgerId = newLedgerId() for { - _ <- newParticipantState(ledgerId = Some(ledgerId)).use { _ => + _ <- newParticipantState(ledgerId = ledgerId).use { _ => Future.unit } - exception <- newParticipantState(ledgerId = Some(attemptedLedgerId)).use { _ => + exception <- newParticipantState(ledgerId = attemptedLedgerId).use { _ => Future.unit }.failed } yield { @@ -629,14 +629,14 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)( "resume where it left off on restart" in { val ledgerId = newLedgerId() for { - _ <- newParticipantState(ledgerId = Some(ledgerId)).use { ps => + _ <- newParticipantState(ledgerId = ledgerId).use { ps => for { _ <- ps .allocateParty(None, Some("party-1"), newSubmissionId()) .toScala } yield () } - updates <- newParticipantState().use { ps => + updates <- newParticipantState(ledgerId = ledgerId).use { ps => for { _ <- ps .allocateParty(None, Some("party-2"), newSubmissionId()) diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index 5060f3c439..baa4118477 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -188,7 +188,7 @@ class RecoveringIndexerIntegrationSpec extends AsyncWordSpec with Matchers with for { actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem()) materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem)) - participantState <- newParticipantState(Some(ledgerId), participantId)(materializer, logCtx) + participantState <- newParticipantState(ledgerId, participantId)(materializer, logCtx) _ <- new StandaloneIndexerServer( readService = participantState, config = IndexerConfig( @@ -226,14 +226,14 @@ object RecoveringIndexerIntegrationSpec { SubmissionId.assertFromString(UUID.randomUUID().toString) private trait ParticipantStateFactory { - def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)( + def apply(ledgerId: LedgerId, participantId: ParticipantId)( implicit materializer: Materializer, logCtx: LoggingContext, ): ResourceOwner[ParticipantState] } private object SimpleParticipantState extends ParticipantStateFactory { - override def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)( + override def apply(ledgerId: LedgerId, participantId: ParticipantId)( implicit materializer: Materializer, logCtx: LoggingContext ): ResourceOwner[ParticipantState] = { @@ -249,7 +249,7 @@ object RecoveringIndexerIntegrationSpec { } private object ParticipantStateThatFailsOften extends ParticipantStateFactory { - override def apply(ledgerId: Option[LedgerId], participantId: ParticipantId)( + override def apply(ledgerId: LedgerId, participantId: ParticipantId)( implicit materializer: Materializer, logCtx: LoggingContext ): ResourceOwner[ParticipantState] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/ApiServices.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/ApiServices.scala index 10866200bc..0f042d9660 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/ApiServices.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/ApiServices.scala @@ -35,7 +35,11 @@ import com.daml.platform.configuration.{ LedgerConfiguration, PartyConfiguration } -import com.daml.platform.server.api.services.grpc.GrpcHealthService +import com.daml.platform.server.api.services.grpc.{ + GrpcCommandCompletionService, + GrpcHealthService, + GrpcTransactionService +} import com.daml.platform.services.time.TimeProviderType import com.daml.resources.{Resource, ResourceOwner} import io.grpc.BindableService @@ -64,7 +68,7 @@ object ApiServices { class Owner( participantId: Ref.ParticipantId, - writeService: WriteService, + optWriteService: Option[WriteService], indexService: IndexService, authorizer: Authorizer, engine: Engine, @@ -99,7 +103,7 @@ object ApiServices { ledgerId <- identityService.getLedgerId() ledgerConfigProvider = LedgerConfigProvider.create( configManagementService, - writeService, + optWriteService, timeProvider, ledgerConfiguration) services = createServices(ledgerId, ledgerConfigProvider)(mat.system.dispatcher) @@ -118,37 +122,6 @@ object ApiServices { private def createServices(ledgerId: LedgerId, ledgerConfigProvider: LedgerConfigProvider)( implicit executionContext: ExecutionContext): List[BindableService] = { - val commandExecutor = new TimedCommandExecutor( - new LedgerTimeAwareCommandExecutor( - new StoreBackedCommandExecutor( - engine, - participantId, - packagesService, - contractStore, - metrics, - ), - contractStore, - maxRetries = 3, - metrics, - ), - metrics, - ) - val apiSubmissionService = ApiSubmissionService.create( - ledgerId, - contractStore, - writeService, - submissionService, - partyManagementService, - timeProvider, - timeProviderType, - ledgerConfigProvider, - seedService, - commandExecutor, - ApiSubmissionService.Configuration( - partyConfig.implicitPartyAllocation, - ), - metrics, - ) logger.info(engine.info.show) @@ -166,29 +139,6 @@ object ApiServices { val apiCompletionService = ApiCommandCompletionService.create(ledgerId, completionsService) - // Note: the command service uses the command submission, command completion, and transaction - // services internally. These connections do not use authorization, authorization wrappers are - // only added here to all exposed services. - val apiCommandService = ApiCommandService.create( - ApiCommandService.Configuration( - ledgerId, - commandConfig.inputBufferSize, - commandConfig.maxCommandsInFlight, - commandConfig.limitMaxCommandsInFlight, - commandConfig.retentionPeriod, - ), - // Using local services skips the gRPC layer, improving performance. - ApiCommandService.LocalServices( - CommandSubmissionFlow(apiSubmissionService.submit, commandConfig.maxCommandsInFlight), - r => apiCompletionService.completionStreamSource(r), - () => apiCompletionService.completionEnd(CompletionEndRequest(ledgerId.unwrap)), - apiTransactionService.getTransactionById, - apiTransactionService.getFlatTransactionById - ), - timeProvider, - ledgerConfigProvider, - ) - val apiActiveContractsService = ApiActiveContractsService.create(ledgerId, activeContractsService) @@ -203,42 +153,119 @@ object ApiServices { ) } - val apiPartyManagementService = - ApiPartyManagementService - .createApiService(partyManagementService, transactionsService, writeService) - - val apiPackageManagementService = - ApiPackageManagementService - .createApiService(indexService, transactionsService, writeService, timeProvider) - - val apiConfigManagementService = - ApiConfigManagementService - .createApiService( - configManagementService, - writeService, - timeProvider, - ledgerConfiguration) + val writeServiceBackedApiServices = + intitializeWriteServiceBackedApiServices( + ledgerId, + ledgerConfigProvider, + apiCompletionService, + apiTransactionService) val apiReflectionService = ProtoReflectionService.newInstance() val apiHealthService = new GrpcHealthService(healthChecks) apiTimeServiceOpt.toList ::: + writeServiceBackedApiServices ::: List( new LedgerIdentityServiceAuthorization(apiLedgerIdentityService, authorizer), new PackageServiceAuthorization(apiPackageService, authorizer), new LedgerConfigurationServiceAuthorization(apiConfigurationService, authorizer), - new CommandSubmissionServiceAuthorization(apiSubmissionService, authorizer), new TransactionServiceAuthorization(apiTransactionService, authorizer), new CommandCompletionServiceAuthorization(apiCompletionService, authorizer), - new CommandServiceAuthorization(apiCommandService, authorizer), new ActiveContractsServiceAuthorization(apiActiveContractsService, authorizer), - new PartyManagementServiceAuthorization(apiPartyManagementService, authorizer), - new PackageManagementServiceAuthorization(apiPackageManagementService, authorizer), - new ConfigManagementServiceAuthorization(apiConfigManagementService, authorizer), apiReflectionService, apiHealthService, ) } + + private def intitializeWriteServiceBackedApiServices( + ledgerId: LedgerId, + ledgerConfigProvider: LedgerConfigProvider, + apiCompletionService: GrpcCommandCompletionService, + apiTransactionService: GrpcTransactionService)( + implicit mat: Materializer, + ec: ExecutionContext, + logCtx: LoggingContext): List[BindableService] = { + optWriteService.toList.flatMap { writeService => + val commandExecutor = new TimedCommandExecutor( + new LedgerTimeAwareCommandExecutor( + new StoreBackedCommandExecutor( + engine, + participantId, + packagesService, + contractStore, + metrics, + ), + contractStore, + maxRetries = 3, + metrics, + ), + metrics, + ) + + val apiSubmissionService = ApiSubmissionService.create( + ledgerId, + contractStore, + writeService, + submissionService, + partyManagementService, + timeProvider, + timeProviderType, + ledgerConfigProvider, + seedService, + commandExecutor, + ApiSubmissionService.Configuration( + partyConfig.implicitPartyAllocation, + ), + metrics, + ) + + // Note: the command service uses the command submission, command completion, and transaction + // services internally. These connections do not use authorization, authorization wrappers are + // only added here to all exposed services. + val apiCommandService = ApiCommandService.create( + ApiCommandService.Configuration( + ledgerId, + commandConfig.inputBufferSize, + commandConfig.maxCommandsInFlight, + commandConfig.limitMaxCommandsInFlight, + commandConfig.retentionPeriod, + ), + // Using local services skips the gRPC layer, improving performance. + ApiCommandService.LocalServices( + CommandSubmissionFlow(apiSubmissionService.submit, commandConfig.maxCommandsInFlight), + r => apiCompletionService.completionStreamSource(r), + () => apiCompletionService.completionEnd(CompletionEndRequest(ledgerId.unwrap)), + apiTransactionService.getTransactionById, + apiTransactionService.getFlatTransactionById + ), + timeProvider, + ledgerConfigProvider, + ) + val apiPartyManagementService = + ApiPartyManagementService + .createApiService(partyManagementService, transactionsService, writeService) + + val apiPackageManagementService = + ApiPackageManagementService + .createApiService(indexService, transactionsService, writeService, timeProvider) + + val apiConfigManagementService = + ApiConfigManagementService + .createApiService( + configManagementService, + writeService, + timeProvider, + ledgerConfiguration) + + List( + new CommandSubmissionServiceAuthorization(apiSubmissionService, authorizer), + new CommandServiceAuthorization(apiCommandService, authorizer), + new PartyManagementServiceAuthorization(apiPartyManagementService, authorizer), + new PackageManagementServiceAuthorization(apiPackageManagementService, authorizer), + new ConfigManagementServiceAuthorization(apiConfigManagementService, authorizer), + ) + } + } } } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/StandaloneApiServer.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/StandaloneApiServer.scala index b17e71ad74..60ef5599de 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/StandaloneApiServer.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/StandaloneApiServer.scala @@ -9,7 +9,6 @@ import java.time.Instant import akka.actor.ActorSystem import akka.stream.Materializer -import akka.stream.scaladsl.Sink import com.daml.api.util.TimeProvider import com.daml.buildinfo.BuildInfo import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor @@ -17,7 +16,7 @@ import com.daml.ledger.api.auth.{AuthService, Authorizer} import com.daml.ledger.api.domain import com.daml.ledger.api.health.HealthChecks import com.daml.ledger.participant.state.index.v2.IndexService -import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SeedService, WriteService} +import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId, SeedService, WriteService} import com.daml.lf.engine.Engine import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics @@ -25,7 +24,7 @@ import com.daml.platform.configuration.{ CommandConfiguration, LedgerConfiguration, PartyConfiguration, - ServerRole, + ServerRole } import com.daml.platform.index.JdbcIndex import com.daml.platform.packages.InMemoryPackageStore @@ -42,12 +41,12 @@ import scala.concurrent.ExecutionContext // Main entry point to start an index server that also hosts the ledger API. // See v2.ReferenceServer on how it is used. final class StandaloneApiServer( + ledgerId: LedgerId, config: ApiServerConfig, commandConfig: CommandConfiguration, partyConfig: PartyConfiguration, ledgerConfig: LedgerConfiguration, - readService: ReadService, - writeService: WriteService, + optWriteService: Option[WriteService], authService: AuthService, transformIndexService: IndexService => IndexService = identity, metrics: Metrics, @@ -69,17 +68,10 @@ final class StandaloneApiServer( preloadPackages(packageStore) val owner = for { - initialConditions <- ResourceOwner.forFuture(() => - readService.getLedgerInitialConditions().runWith(Sink.head)) - authorizer = new Authorizer( - () => java.time.Clock.systemUTC.instant(), - initialConditions.ledgerId, - participantId) indexService <- JdbcIndex .owner( ServerRole.ApiServer, - initialConditions.config, - domain.LedgerId(initialConditions.ledgerId), + domain.LedgerId(ledgerId), participantId, config.jdbcUrl, config.eventsPageSize, @@ -87,15 +79,16 @@ final class StandaloneApiServer( lfValueTranslationCache, ) .map(transformIndexService) + authorizer = new Authorizer( + () => java.time.Clock.systemUTC.instant(), + ledgerId, + participantId) healthChecks = new HealthChecks( - "index" -> indexService, - "read" -> readService, - "write" -> writeService, - ) + Seq("index" -> indexService) ++ optWriteService.toList.map("write" -> _): _*) executionSequencerFactory <- new ExecutionSequencerFactoryOwner() apiServicesOwner = new ApiServices.Owner( participantId = participantId, - writeService = writeService, + optWriteService = optWriteService, indexService = indexService, authorizer = authorizer, engine = engine, @@ -124,7 +117,7 @@ final class StandaloneApiServer( } yield { writePortFile(apiServer.port) logger.info( - s"Initialized API server version ${BuildInfo.Version} with ledger-id = ${initialConditions.ledgerId}, port = ${apiServer.port}, dar file = ${config.archiveFiles}") + s"Initialized API server version ${BuildInfo.Version} with ledger-id = $ledgerId, port = ${apiServer.port}, dar file = ${config.archiveFiles}") apiServer } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/services/LedgerConfigProvider.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/services/LedgerConfigProvider.scala index 397b895956..08847a4b27 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/services/LedgerConfigProvider.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/services/LedgerConfigProvider.scala @@ -37,7 +37,7 @@ import scala.concurrent.duration.{DurationInt, DurationLong} */ final class LedgerConfigProvider private ( index: IndexConfigManagementService, - writeService: WriteService, + optWriteService: Option[WriteService], timeProvider: TimeProvider, config: LedgerConfiguration, materializer: Materializer, @@ -62,10 +62,12 @@ final class LedgerConfigProvider private ( readyPromise.trySuccess(()) () }) - materializer.scheduleOnce(config.initialConfigurationSubmitDelay.toNanos.nanos, () => { - if (latestConfiguration.isEmpty) submitInitialConfig() - () - }) + optWriteService.foreach { writeService => + materializer.scheduleOnce(config.initialConfigurationSubmitDelay.toNanos.nanos, () => { + if (latestConfiguration.isEmpty) submitInitialConfig(writeService) + () + }) + } // Looks up the latest ledger configuration, then subscribes to a // stream of configuration changes. @@ -118,7 +120,7 @@ final class LedgerConfigProvider private ( () } - private[this] def submitInitialConfig(): Future[Unit] = { + private[this] def submitInitialConfig(writeService: WriteService): Future[Unit] = { implicit val executionContext: ExecutionContext = DE // There are several reasons why the change could be rejected: // - The participant is not authorized to set the configuration @@ -168,10 +170,10 @@ object LedgerConfigProvider { def create( index: IndexConfigManagementService, - writeService: WriteService, + optWriteService: Option[WriteService], timeProvider: TimeProvider, config: LedgerConfiguration)( implicit materializer: Materializer, logCtx: LoggingContext): LedgerConfigProvider = - new LedgerConfigProvider(index, writeService, timeProvider, config, materializer) + new LedgerConfigProvider(index, optWriteService, timeProvider, config, materializer) } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala index e5be373092..86e1731ca9 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/JdbcIndex.scala @@ -3,13 +3,10 @@ package com.daml.platform.index -import akka.NotUsed import akka.stream.Materializer -import akka.stream.scaladsl.Source import com.daml.ledger.api.domain.LedgerId -import com.daml.ledger.participant.state.index.v2 import com.daml.ledger.participant.state.index.v2.IndexService -import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId} +import com.daml.ledger.participant.state.v1.ParticipantId import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.platform.configuration.ServerRole @@ -19,7 +16,6 @@ import com.daml.resources.ResourceOwner object JdbcIndex { def owner( serverRole: ServerRole, - initialConfig: Configuration, ledgerId: LedgerId, participantId: ParticipantId, jdbcUrl: String, @@ -35,10 +31,6 @@ object JdbcIndex { metrics, lfValueTranslationCache, ).map { ledger => - new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) { - override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] = - // FIXME(JM): The indexer should on start set the default configuration. - Source.single(v2.LedgerConfiguration(initialConfig.maxDeduplicationTime)) - } + new LedgerBackedIndexService(MeteredReadOnlyLedger(ledger, metrics), participantId) } } diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala index 6d6cfa2277..8d3e60b7b7 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/LedgerBackedIndexService.scala @@ -19,6 +19,7 @@ import com.daml.lf.value.Value.{ContractId, ContractInst} import com.daml.daml_lf_dev.DamlLf.Archive import com.daml.dec.{DirectExecutionContext => DEC} import com.daml.ledger.api.domain +import com.daml.ledger.api.domain.ConfigurationEntry.Accepted import com.daml.ledger.api.domain.{ ApplicationId, CommandId, @@ -48,7 +49,7 @@ import scalaz.syntax.tag.ToTagOps import scala.concurrent.Future -abstract class LedgerBackedIndexService( +final class LedgerBackedIndexService( ledger: ReadOnlyLedger, participantId: ParticipantId, )(implicit mat: Materializer) @@ -237,6 +238,26 @@ abstract class LedgerBackedIndexService( .lookupLedgerConfiguration() .map(_.map { case (offset, config) => (toAbsolute(offset), config) })(DEC) + /** Looks up the current configuration, if set, and continues to stream configuration changes. + * + */ + override def getLedgerConfiguration(): Source[LedgerConfiguration, NotUsed] = { + Source + .future(lookupConfiguration()) + .flatMapConcat { optResult => + val offset = optResult.map(_._1) + val foundConfig = optResult.map(_._2) + + val initialConfig = Source(foundConfig.toList) + val configStream = configurationEntries(offset).collect { + case (_, Accepted(_, _, configuration)) => configuration + } + initialConfig + .concat(configStream) + .map(cfg => LedgerConfiguration(cfg.maxDeduplicationTime)) + } + } + /** Retrieve configuration entries. */ override def configurationEntries(startExclusive: Option[LedgerOffset.Absolute]) : Source[(domain.LedgerOffset.Absolute, domain.ConfigurationEntry), NotUsed] = diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala index 47a5796bd8..e1b9ef3d9e 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/index/ReadOnlySqlLedger.scala @@ -15,13 +15,14 @@ import com.daml.ledger.participant.state.v1.Offset import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher -import com.daml.platform.common.LedgerIdMismatchException +import com.daml.platform.common.{LedgerIdMismatchException, LedgerIdNotFoundException} import com.daml.platform.configuration.ServerRole import com.daml.platform.store.dao.events.LfValueTranslation import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao} import com.daml.platform.store.{BaseLedger, ReadOnlyLedger} import com.daml.resources.ProgramResource.StartupException import com.daml.resources.{Resource, ResourceOwner} +import com.daml.timer.RetryStrategy import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -45,7 +46,7 @@ object ReadOnlySqlLedger { ): Resource[ReadOnlyLedger] = for { ledgerDao <- ledgerDaoOwner().acquire() - ledgerId <- Resource.fromFuture(verifyOrSetLedgerId(ledgerDao, initialLedgerId)) + ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId)) ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd()) dispatcher <- dispatcherOwner(ledgerEnd).acquire() ledger <- ResourceOwner @@ -53,22 +54,37 @@ object ReadOnlySqlLedger { .acquire() } yield ledger - private def verifyOrSetLedgerId( + private def verifyLedgerId( ledgerDao: LedgerReadDao, initialLedgerId: LedgerId, - )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] = - ledgerDao - .lookupLedgerId() - .flatMap { - case Some(`initialLedgerId`) => - logger.info(s"Found existing ledger with ID: $initialLedgerId") - Future.successful(initialLedgerId) - case Some(foundLedgerId) => - Future.failed( - new LedgerIdMismatchException(foundLedgerId, initialLedgerId) with StartupException) - case None => - Future.successful(initialLedgerId) - } + )(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Future[LedgerId] = { + val predicate: PartialFunction[Throwable, Boolean] = { + case _: LedgerIdNotFoundException => true + case _: LedgerIdMismatchException => false + case _ => false + } + val retryDelay = 5.seconds + val maxAttempts = 100 + RetryStrategy.constant(attempts = Some(maxAttempts), waitTime = retryDelay)(predicate) { + (attempt, _wait) => + ledgerDao + .lookupLedgerId() + .flatMap { + case Some(`initialLedgerId`) => + logger.info(s"Found existing ledger with ID: $initialLedgerId") + Future.successful(initialLedgerId) + case Some(foundLedgerId) => + Future.failed( + new LedgerIdMismatchException(foundLedgerId, initialLedgerId) + with StartupException) + case None => + logger.info( + s"Ledger ID not found in the index database on attempt $attempt/$maxAttempts. Retrying again in $retryDelay.") + Future.failed(new LedgerIdNotFoundException(attempt)) + } + } + + } private def ledgerDaoOwner(): ResourceOwner[LedgerReadDao] = JdbcLedgerDao.readOwner( diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala index 85b39be005..a2fdbc3299 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala @@ -303,7 +303,7 @@ final class SandboxServer( executionSequencerFactory <- new ExecutionSequencerFactoryOwner().acquire() apiServicesOwner = new ApiServices.Owner( participantId = participantId, - writeService = new TimedWriteService(indexAndWriteService.writeService, metrics), + optWriteService = Some(new TimedWriteService(indexAndWriteService.writeService, metrics)), indexService = new TimedIndexService(indexAndWriteService.indexService, metrics), authorizer = authorizer, engine = SandboxServer.engine, diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala index 61ab908b3e..680f1c8c6c 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala @@ -6,7 +6,6 @@ package com.daml.platform.sandbox.stores import java.time.Instant import java.util.concurrent.CompletionStage -import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import com.daml.api.util.TimeProvider @@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory import scala.compat.java8.FutureConverters import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} trait IndexAndWriteService { def indexService: IndexService @@ -116,12 +115,7 @@ object SandboxIndexAndWriteService { initialConfig: Configuration, timeProvider: TimeProvider, )(implicit mat: Materializer): ResourceOwner[IndexAndWriteService] = { - val indexSvc = new LedgerBackedIndexService(ledger, participantId) { - override def getLedgerConfiguration(): Source[LedgerConfiguration, NotUsed] = - Source - .single(LedgerConfiguration(initialConfig.maxDeduplicationTime)) - .concat(Source.future(Promise[LedgerConfiguration]().future)) // we should keep the stream open! - } + val indexSvc = new LedgerBackedIndexService(ledger, participantId) val writeSvc = new LedgerBackedWriteService(ledger, timeProvider) for { diff --git a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala index e4ef005759..a6f32d8070 100644 --- a/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala @@ -140,9 +140,10 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { case TimeProviderType.WallClock => None } + ledgerId = specifiedLedgerId.getOrElse(UUID.randomUUID().toString) isReset = startupMode == StartupMode.ResetAndStart readerWriter <- new SqlLedgerReaderWriter.Owner( - initialLedgerId = specifiedLedgerId, + ledgerId = ledgerId, participantId = ParticipantId, metrics = metrics, jdbcUrl = ledgerJdbcUrl, @@ -200,6 +201,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { ) } apiServer <- new StandaloneApiServer( + ledgerId = ledgerId, config = ApiServerConfig( participantId = ParticipantId, archiveFiles = if (isReset) List.empty else config.damlPackages, @@ -219,8 +221,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { implicitPartyAllocation = config.implicitPartyAllocation, ), ledgerConfig = config.ledgerConfig, - readService = readService, - writeService = writeService, + optWriteService = Some(writeService), authService = authService, transformIndexService = new TimedIndexService(_, metrics), metrics = metrics,