From 9c477ffdf554eed34302bf5e6299042eb23c6b93 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Fri, 5 Feb 2021 09:56:55 +0100 Subject: [PATCH] ledger-on-memory: Only support pre-execution. [KVL-821] (#8757) * ledger-on-memory: Rename `PreExecutingOwner` to just `Owner`. * ledger-on-memory: Push the committer execution context up a little. * ledger-on-memory: Use pre-execution in tests, not batching. * ledger-on-memory: Remove support for batching submissions. Pre-execution is pretty much always faster; let's use it. CHANGELOG_BEGIN CHANGELOG_END * ledger-on-memory: Simplify construction. Now we only support pre-execution, we can be less general. --- .../script/test/MultiParticipantFixture.scala | 13 +- ledger/ledger-on-memory/BUILD.bazel | 74 ++------ .../daml/ledger/on/memory/ExtraConfig.scala | 20 -- .../on/memory/InMemoryLedgerFactory.scala | 92 ++------- .../com/daml/ledger/on/memory/Main.scala | 7 +- .../com/daml/ledger/on/memory/Owner.scala | 2 +- .../memory/InMemoryLedgerReaderWriter.scala | 70 +------ .../on/memory/InMemoryLedgerWriter.scala | 179 +++++------------- .../com/daml/ledger/on/memory/package.scala | 2 +- ...edgerReaderWriterIntegrationSpecBase.scala | 56 ------ .../src/test/resources/logback-test.xml | 2 +- ...oryLedgerReaderWriterIntegrationSpec.scala | 43 ++++- .../on/memory/InMemoryLedgerWriterSpec.scala | 31 ++- ...oryLedgerReaderWriterIntegrationSpec.scala | 7 - ledger/participant-state/kvutils/BUILD.bazel | 1 - .../state/kvutils/app/Config.scala | 3 + .../com/daml/ledger/validator/package.scala | 12 +- .../RecoveringIndexerIntegrationSpec.scala | 29 +-- 18 files changed, 175 insertions(+), 468 deletions(-) delete mode 100644 ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/ExtraConfig.scala delete mode 100644 ledger/ledger-on-memory/src/test/lib/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpecBase.scala delete mode 100644 ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/NonBatchedInMemoryLedgerReaderWriterIntegrationSpec.scala diff --git a/daml-script/test/src/test-utils/com/daml/lf/engine/script/test/MultiParticipantFixture.scala b/daml-script/test/src/test-utils/com/daml/lf/engine/script/test/MultiParticipantFixture.scala index 7679e088f6..ac64cab118 100644 --- a/daml-script/test/src/test-utils/com/daml/lf/engine/script/test/MultiParticipantFixture.scala +++ b/daml-script/test/src/test-utils/com/daml/lf/engine/script/test/MultiParticipantFixture.scala @@ -9,7 +9,7 @@ import java.util.stream.Collectors import com.daml.bazeltools.BazelRunfiles._ import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, OwnedResource, SuiteResource} import com.daml.ledger.api.tls.TlsConfiguration -import com.daml.ledger.on.memory.{ExtraConfig, Owner} +import com.daml.ledger.on.memory.Owner import com.daml.ledger.participant.state.kvutils.app.{ParticipantConfig, ParticipantRunMode} import com.daml.ledger.participant.state.kvutils.{app => kvutils} import com.daml.ledger.participant.state.v1 @@ -73,15 +73,10 @@ trait MultiParticipantFixture for { _ <- Owner( kvutils.Config - .createDefault(ExtraConfig.reasonableDefault) + .createDefault(()) .copy( - participants = Seq( - participant1, - participant2, - ), - archiveFiles = Seq( - darFile - ), + participants = Seq(participant1, participant2), + archiveFiles = Seq(darFile), ) ) } yield (readPortfile(participant1Portfile), readPortfile(participant2Portfile)) diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index c5f25cc557..cd04c2afce 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -33,46 +33,13 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-state", "//ledger/participant-state/kvutils", - "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/resources", - "//libs-scala/resources-akka", - "//libs-scala/resources-grpc", "@maven//:com_google_protobuf_protobuf_java", "@maven//:io_dropwizard_metrics_metrics_core", ], ) -da_scala_library( - name = "ledger-on-memory-test-lib", - srcs = glob(["src/test/lib/scala/**/*.scala"]), - scala_deps = [ - "@maven//:com_typesafe_akka_akka_actor", - "@maven//:com_typesafe_akka_akka_stream", - "@maven//:org_scalactic_scalactic", - "@maven//:org_scalatest_scalatest", - ], - deps = [ - ":ledger-on-memory", - "//daml-lf/data", - "//daml-lf/engine", - "//language-support/scala/bindings", - "//ledger-api/rs-grpc-bridge", - "//ledger-api/testing-utils", - "//ledger/caching", - "//ledger/ledger-api-common", - "//ledger/ledger-api-health", - "//ledger/ledger-resources", - "//ledger/ledger-resources:ledger-resources-test-lib", - "//ledger/metrics", - "//ledger/participant-state", - "//ledger/participant-state/kvutils", - "//ledger/participant-state/kvutils:kvutils-tests-lib", - "//libs-scala/contextualized-logging", - "//libs-scala/resources", - ], -) - da_scala_test_suite( name = "ledger-on-memory-tests", size = "small", @@ -94,11 +61,12 @@ da_scala_test_suite( ], deps = [ ":ledger-on-memory", - ":ledger-on-memory-test-lib", "//daml-lf/data", + "//daml-lf/engine", "//language-support/scala/bindings", "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", + "//ledger/caching", "//ledger/ledger-api-common", "//ledger/ledger-api-health", "//ledger/ledger-resources", @@ -109,6 +77,8 @@ da_scala_test_suite( "//ledger/participant-state/kvutils:kvutils-tests-lib", "//libs-scala/contextualized-logging", "//libs-scala/resources", + "//libs-scala/resources-akka", + "//libs-scala/resources-grpc", "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:org_mockito_mockito_core", ], @@ -164,23 +134,7 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=example,port=6865", - "--batching enable=true,max-batch-size-bytes=262144", - ], - test_tool_args = [ - "--verbose", - ], -) - -conformance_test( - name = "conformance-test-pre-execution", - ports = [6865], - server = ":app", - server_args = [ - "--contract-id-seeding=testing-weak", - "--participant participant-id=example,port=6865", - "--batching enable=false", - "--force-pre-execution=true", + "--participant=participant-id=example,port=6865", ], test_tool_args = [ "--verbose", @@ -196,9 +150,8 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=example1,port=6865", - "--participant participant-id=example2,port=6866", - "--batching enable=true,max-batch-size-bytes=262144", + "--participant=participant-id=example1,port=6865", + "--participant=participant-id=example2,port=6866", ], test_tool_args = [ "--verbose", @@ -214,9 +167,8 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=server1,run-mode=ledger-api-server", - "--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=indexer,run-mode=indexer", - "--batching enable=true,max-batch-size-bytes=262144", + "--participant=participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=server1,run-mode=ledger-api-server", + "--participant=participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=indexer,run-mode=indexer", ], test_tool_args = [ "--verbose", @@ -230,8 +182,7 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=example,port=6865", - "--batching enable=true,max-batch-size-bytes=262144", + "--participant=participant-id=example,port=6865", ], test_tool_args = [ "--verbose", @@ -245,8 +196,7 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=example,port=6865", - "--batching enable=true,max-batch-size-bytes=262144", + "--participant=participant-id=example,port=6865", ], test_tool_args = [ "--verbose", @@ -260,7 +210,7 @@ conformance_test( server = ":app", server_args = [ "--contract-id-seeding=testing-weak", - "--participant participant-id=example,port=6865", + "--participant=participant-id=example,port=6865", ], test_tool_args = [ "--verbose", diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/ExtraConfig.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/ExtraConfig.scala deleted file mode 100644 index 4a84d28055..0000000000 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/ExtraConfig.scala +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.on.memory - -import com.daml.ledger.participant.state.kvutils.api.BatchingLedgerWriterConfig - -final case class ExtraConfig( - batchingLedgerWriterConfig: BatchingLedgerWriterConfig, - alwaysPreExecute: Boolean, -) - -object ExtraConfig { - val reasonableDefault: ExtraConfig = - ExtraConfig( - batchingLedgerWriterConfig = - BatchingLedgerWriterConfig.reasonableDefault.copy(maxBatchConcurrentCommits = 2), - alwaysPreExecute = false, - ) -} diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala index 4013e2b88b..48bb17d42e 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala @@ -5,13 +5,7 @@ package com.daml.ledger.on.memory import akka.stream.Materializer import com.daml.caching -import com.daml.ledger.participant.state.kvutils.api.{ - BatchingLedgerWriterConfig, - KeyValueLedger, - KeyValueParticipantState, -} -import com.daml.ledger.participant.state.kvutils.app.batch.BatchingLedgerWriterConfigReader -import com.daml.ledger.participant.state.kvutils.app.batch.BatchingLedgerWriterConfigReader.optionsReader +import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, ParticipantConfig} import com.daml.ledger.participant.state.kvutils.caching.`Message Weight` import com.daml.ledger.resources.ResourceOwner @@ -23,30 +17,19 @@ import com.daml.platform.configuration.LedgerConfiguration import scopt.OptionParser private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state: InMemoryState) - extends LedgerFactory[KeyValueParticipantState, ExtraConfig] { + extends LedgerFactory[KeyValueParticipantState, Unit] { override final def readWriteServiceOwner( - config: Config[ExtraConfig], + config: Config[Unit], participantConfig: ParticipantConfig, engine: Engine, )(implicit materializer: Materializer, loggingContext: LoggingContext, - ): ResourceOwner[KeyValueParticipantState] = - for { - readerWriter <- owner(config, participantConfig, engine) - } yield new KeyValueParticipantState( - readerWriter, - readerWriter, - createMetrics(participantConfig, config), - ) - - def owner(config: Config[ExtraConfig], participantConfig: ParticipantConfig, engine: Engine)( - implicit materializer: Materializer - ): ResourceOwner[KeyValueLedger] = { + ): ResourceOwner[KeyValueParticipantState] = { val metrics = createMetrics(participantConfig, config) - if (config.extra.alwaysPreExecute) - new InMemoryLedgerReaderWriter.PreExecutingOwner( + for { + readerWriter <- new InMemoryLedgerReaderWriter.Owner( ledgerId = config.ledgerId, participantId = participantConfig.participantId, keySerializationStrategy = DefaultStateKeySerializationStrategy, @@ -58,64 +41,19 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state dispatcher = dispatcher, state = state, engine = engine, + committerExecutionContext = materializer.executionContext, ) - else - new InMemoryLedgerReaderWriter.BatchingOwner( - ledgerId = config.ledgerId, - batchingLedgerWriterConfig = config.extra.batchingLedgerWriterConfig, - participantId = participantConfig.participantId, - metrics = metrics, - stateValueCache = caching.WeightedCache.from( - configuration = config.stateValueCache, - metrics = metrics.daml.kvutils.submission.validator.stateValueCache, - ), - dispatcher = dispatcher, - state = state, - engine = engine, - ) + } yield new KeyValueParticipantState( + readerWriter, + readerWriter, + createMetrics(participantConfig, config), + ) } - override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration = + override def ledgerConfig(config: Config[Unit]): LedgerConfiguration = LedgerConfiguration.defaultLocalLedger - override def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = - InMemoryLedgerFactory.extraConfigParser(parser) + override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = () - override val defaultExtraConfig: ExtraConfig = InMemoryLedgerFactory.defaultExtraConfig -} - -private[memory] object InMemoryLedgerFactory { - val defaultExtraConfig: ExtraConfig = ExtraConfig.reasonableDefault - - final def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = { - parser - .opt[BatchingLedgerWriterConfig]("batching") - .optional() - .text(BatchingLedgerWriterConfigReader.UsageText) - .action { case (parsedBatchingConfig, config) => - config.copy( - extra = config.extra.copy( - batchingLedgerWriterConfig = parsedBatchingConfig - ) - ) - } - parser - .opt[Boolean]("force-pre-execution") - .optional() - .text("Force pre-execution (mutually exclusive with batching)") - .action { case (preExecute, config) => - config.copy( - extra = config.extra.copy( - alwaysPreExecute = preExecute - ) - ) - } - parser.checkConfig { config => - if (config.extra.alwaysPreExecute && config.extra.batchingLedgerWriterConfig.enableBatching) - Left("Either pre-executing can be forced or batching can be enabled, but not both.") - else - Right(()) - } - () - } + override val defaultExtraConfig: Unit = () } diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala index 9226688a4c..98a70dcaba 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Main.scala @@ -10,12 +10,7 @@ import com.daml.resources.ProgramResource object Main { def main(args: Array[String]): Unit = { val resource = for { - config <- Config.owner( - RunnerName, - InMemoryLedgerFactory.extraConfigParser, - InMemoryLedgerFactory.defaultExtraConfig, - args, - ) + config <- Config.ownerWithoutExtras(RunnerName, args) owner <- Owner(config) } yield owner new ProgramResource(resource).run(ResourceContext.apply) diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala index 980996c7b5..742e77b55f 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala @@ -8,7 +8,7 @@ import com.daml.ledger.resources.ResourceOwner object Owner { // Utily if you want to spin this up as a library. - def apply(config: Config[ExtraConfig]): ResourceOwner[Unit] = + def apply(config: Config[Unit]): ResourceOwner[Unit] = for { dispatcher <- dispatcherOwner sharedState = InMemoryState.empty diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala index 45e121d7f5..d2f7cc3d5a 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala @@ -3,7 +3,6 @@ package com.daml.ledger.on.memory -import akka.stream.Materializer import com.daml.api.util.TimeProvider import com.daml.caching.Cache import com.daml.ledger.participant.state.kvutils.api._ @@ -14,67 +13,11 @@ import com.daml.lf.engine.Engine import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher +import scala.concurrent.ExecutionContext + object InMemoryLedgerReaderWriter { - final class BatchingOwner( - ledgerId: LedgerId, - batchingLedgerWriterConfig: BatchingLedgerWriterConfig, - participantId: ParticipantId, - metrics: Metrics, - timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider, - stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none, - dispatcher: Dispatcher[Index], - state: InMemoryState, - engine: Engine, - )(implicit materializer: Materializer) - extends ResourceOwner[KeyValueLedger] { - override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { - val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) - for { - writer <- new InMemoryLedgerWriter.BatchingOwner( - batchingLedgerWriterConfig, - participantId, - metrics, - timeProvider, - stateValueCache, - dispatcher, - state, - engine, - ).acquire() - } yield createKeyValueLedger(reader, writer) - } - } - - final class SingleParticipantBatchingOwner( - ledgerId: LedgerId, - batchingLedgerWriterConfig: BatchingLedgerWriterConfig, - participantId: ParticipantId, - timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider, - stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none, - metrics: Metrics, - engine: Engine, - )(implicit materializer: Materializer) - extends ResourceOwner[KeyValueLedger] { - override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { - val state = InMemoryState.empty - for { - dispatcher <- dispatcherOwner.acquire() - readerWriter <- new BatchingOwner( - ledgerId, - batchingLedgerWriterConfig, - participantId, - metrics, - timeProvider, - stateValueCache, - dispatcher, - state, - engine, - ).acquire() - } yield readerWriter - } - } - - final class PreExecutingOwner( + final class Owner( ledgerId: LedgerId, participantId: ParticipantId, keySerializationStrategy: StateKeySerializationStrategy, @@ -84,12 +27,12 @@ object InMemoryLedgerReaderWriter { dispatcher: Dispatcher[Index], state: InMemoryState, engine: Engine, - )(implicit materializer: Materializer) - extends ResourceOwner[KeyValueLedger] { + committerExecutionContext: ExecutionContext, + ) extends ResourceOwner[KeyValueLedger] { override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) for { - writer <- new InMemoryLedgerWriter.PreExecutingOwner( + writer <- new InMemoryLedgerWriter.Owner( participantId, keySerializationStrategy, metrics, @@ -98,6 +41,7 @@ object InMemoryLedgerReaderWriter { dispatcher, state, engine, + committerExecutionContext, ).acquire() } yield createKeyValueLedger(reader, writer) } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala index 1b56251a46..baa15632b2 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala @@ -3,28 +3,18 @@ package com.daml.ledger.on.memory -import akka.stream.Materializer +import java.time.Instant + import com.daml.api.util.TimeProvider import com.daml.caching.Cache -import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.health.{HealthStatus, Healthy} +import com.daml.ledger.on.memory.InMemoryLedgerWriter._ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue} -import com.daml.ledger.participant.state.kvutils.api.{ - BatchingLedgerWriter, - BatchingLedgerWriterConfig, - CommitMetadata, - LedgerWriter, -} +import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter} import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw} import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} -import com.daml.ledger.validator.batch.{ - BatchedSubmissionValidator, - BatchedSubmissionValidatorFactory, - BatchedValidatingCommitter, - ConflictDetection, -} import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy} import com.daml.ledger.validator.preexecution.{ EqualityBasedPostExecutionConflictDetector, @@ -36,13 +26,8 @@ import com.daml.ledger.validator.preexecution.{ TimeBasedWriteSetSelector, } import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader} -import com.daml.ledger.validator.{ - SerializingStateReader, - StateKeySerializationStrategy, - ValidateAndCommit, -} +import com.daml.ledger.validator.{SerializingStateReader, StateKeySerializationStrategy} import com.daml.lf.engine.Engine -import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher @@ -52,18 +37,28 @@ import scala.util.Success final class InMemoryLedgerWriter private[memory] ( override val participantId: ParticipantId, dispatcher: Dispatcher[Index], + now: () => Instant, state: InMemoryState, - validateAndCommit: ValidateAndCommit, + committer: Committer, + committerExecutionContext: ExecutionContext, + metrics: Metrics, ) extends LedgerWriter { override def commit( correlationId: String, envelope: Raw.Value, metadata: CommitMetadata, ): Future[SubmissionResult] = - validateAndCommit(correlationId, envelope, participantId) + committer + .commit( + participantId, + correlationId, + envelope, + exportRecordTime = now(), + ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics), + )(committerExecutionContext) .andThen { case Success(SubmissionResult.Acknowledged) => dispatcher.signalNewHead(state.newHeadSinceLastWrite()) - }(DirectExecutionContext) + }(committerExecutionContext) override def currentHealth(): HealthStatus = Healthy } @@ -74,74 +69,13 @@ object InMemoryLedgerWriter { private[memory] type StateValueCache = Cache[DamlStateKey, DamlStateValue] - final class BatchingOwner( - batchingLedgerWriterConfig: BatchingLedgerWriterConfig, - participantId: ParticipantId, - metrics: Metrics, - timeProvider: TimeProvider = DefaultTimeProvider, - stateValueCache: StateValueCache = Cache.none, - dispatcher: Dispatcher[Index], - state: InMemoryState, - engine: Engine, - )(implicit materializer: Materializer) - extends ResourceOwner[LedgerWriter] { - override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = - for { - ledgerDataExporter <- LedgerDataExporter.Owner.acquire() - keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) - committer = createBatchedCommitter(keyValueCommitting, ledgerDataExporter) - writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer) - // We need to generate batched submissions for the validator in order to improve throughput. - // Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the - // in-memory committer. - batchingWriter <- newLoggingContext { implicit loggingContext => - ResourceOwner - .forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer)) - .acquire() - } - } yield batchingWriter + private[memory] type Committer = PreExecutingValidatingCommitter[ + Option[DamlStateValue], + RawPreExecutingCommitStrategy.ReadSet, + RawKeyValuePairsWithLogEntry, + ] - private def createBatchedCommitter( - keyValueCommitting: KeyValueCommitting, - ledgerDataExporter: LedgerDataExporter, - )(implicit materializer: Materializer): ValidateAndCommit = { - val validator = BatchedSubmissionValidator[Index]( - BatchedSubmissionValidatorFactory.defaultParametersFor( - batchingLedgerWriterConfig.enableBatching - ), - keyValueCommitting, - new ConflictDetection(metrics), - metrics, - ledgerDataExporter, - ) - val committer = BatchedValidatingCommitter[Index]( - () => timeProvider.getCurrentTime, - validator, - stateValueCache, - ) - locally { - implicit val executionContext: ExecutionContext = materializer.executionContext - - def validateAndCommit( - correlationId: String, - submissionEnvelope: Raw.Value, - submittingParticipantId: ParticipantId, - ) = - new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations => - committer.commit( - correlationId, - submissionEnvelope, - submittingParticipantId, - ledgerStateOperations, - ) - } - - validateAndCommit - } - } - } - - final class PreExecutingOwner( + final class Owner( participantId: ParticipantId, keySerializationStrategy: StateKeySerializationStrategy, metrics: Metrics, @@ -150,28 +84,30 @@ object InMemoryLedgerWriter { dispatcher: Dispatcher[Index], state: InMemoryState, engine: Engine, - )(implicit materializer: Materializer) - extends ResourceOwner[LedgerWriter] { - override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = { - val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine) + committerExecutionContext: ExecutionContext, + ) extends ResourceOwner[LedgerWriter] { + private val now = () => timeProvider.getCurrentTime + + override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = for { ledgerDataExporter <- LedgerDataExporter.Owner.acquire() - } yield { - val committer = createPreExecutingCommitter(keyValueCommitting, ledgerDataExporter) - new InMemoryLedgerWriter(participantId, dispatcher, state, committer) - } - } + } yield new InMemoryLedgerWriter( + participantId, + dispatcher, + now, + state, + newCommitter(ledgerDataExporter), + committerExecutionContext, + metrics, + ) - private def createPreExecutingCommitter( - keyValueCommitting: KeyValueCommitting, - ledgerDataExporter: LedgerDataExporter, - )(implicit materializer: Materializer): ValidateAndCommit = { - val now = () => timeProvider.getCurrentTime - val committer = new PreExecutingValidatingCommitter[ - Option[DamlStateValue], - RawPreExecutingCommitStrategy.ReadSet, - RawKeyValuePairsWithLogEntry, - ]( + private def newCommitter(ledgerDataExporter: LedgerDataExporter): Committer = { + val keyValueCommitting = new KeyValueCommitting( + engine, + metrics, + inStaticTimeMode = needStaticTimeModeFor(timeProvider), + ) + new Committer( transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache), validator = new PreExecutingSubmissionValidator( keyValueCommitting, @@ -183,24 +119,6 @@ object InMemoryLedgerWriter { postExecutionWriter = new RawPostExecutionWriter, ledgerDataExporter = ledgerDataExporter, ) - locally { - implicit val executionContext: ExecutionContext = materializer.executionContext - - def validateAndCommit( - correlationId: String, - submissionEnvelope: Raw.Value, - submittingParticipantId: ParticipantId, - ) = - committer.commit( - submittingParticipantId, - correlationId, - submissionEnvelope, - now(), - new InMemoryLedgerStateAccess(state, metrics), - ) - - validateAndCommit - } } private def transformStateReader( @@ -215,13 +133,6 @@ object InMemoryLedgerWriter { } } - private def createKeyValueCommitting( - metrics: Metrics, - timeProvider: TimeProvider, - engine: Engine, - ): KeyValueCommitting = - new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider)) - private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean = timeProvider != TimeProvider.UTC diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala index 1a1859beba..646071a42c 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/package.scala @@ -11,7 +11,7 @@ package object memory { private[memory] val StartIndex: Index = 0 - private[memory] def dispatcherOwner: ResourceOwner[Dispatcher[Index]] = + def dispatcherOwner: ResourceOwner[Dispatcher[Index]] = Dispatcher.owner( name = "in-memory-key-value-participant-state", zeroIndex = StartIndex, diff --git a/ledger/ledger-on-memory/src/test/lib/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-memory/src/test/lib/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpecBase.scala deleted file mode 100644 index 8ee0968d2f..0000000000 --- a/ledger/ledger-on-memory/src/test/lib/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpecBase.scala +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.on.memory - -import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase -import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState -import com.daml.ledger.participant.state.kvutils.api.{ - BatchingLedgerWriterConfig, - KeyValueParticipantState, -} -import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} -import com.daml.ledger.resources.ResourceOwner -import com.daml.lf.engine.Engine -import com.daml.logging.LoggingContext -import com.daml.metrics.Metrics - -import scala.concurrent.duration.DurationInt - -abstract class InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching: Boolean) - extends ParticipantStateIntegrationSpecBase( - s"In-memory ledger/participant with parallel validation ${if (enableBatching) "enabled" - else "disabled"}" - ) { - - private val batchingLedgerWriterConfig = - BatchingLedgerWriterConfig( - enableBatching = enableBatching, - // In case of serial validation, we need a queue length of 1000 because the - // "process many party allocations" test case will be sending in that many requests at once - // (otherwise some of those would be rejected). - // See [[ParticipantStateIntegrationSpecBase]]. - maxBatchQueueSize = if (enableBatching) 100 else 1000, - maxBatchSizeBytes = 4L * 1024L * 1024L /* 4MB */, - maxBatchWaitDuration = 100.millis, - // In-memory ledger doesn't support concurrent commits. - maxBatchConcurrentCommits = 1, - ) - - override val isPersistent: Boolean = false - - override def participantStateFactory( - ledgerId: LedgerId, - participantId: ParticipantId, - testId: String, - metrics: Metrics, - )(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] = - new InMemoryLedgerReaderWriter.SingleParticipantBatchingOwner( - ledgerId, - batchingLedgerWriterConfig, - participantId, - metrics = metrics, - engine = Engine.DevEngine(), - ).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) - -} diff --git a/ledger/ledger-on-memory/src/test/resources/logback-test.xml b/ledger/ledger-on-memory/src/test/resources/logback-test.xml index 6db2204661..49b981fbd4 100644 --- a/ledger/ledger-on-memory/src/test/resources/logback-test.xml +++ b/ledger/ledger-on-memory/src/test/resources/logback-test.xml @@ -6,7 +6,7 @@ - + diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala index 2924e80302..d1636769ff 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala @@ -3,5 +3,46 @@ package com.daml.ledger.on.memory +import java.util.concurrent.Executors + +import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase +import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState +import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState +import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId} +import com.daml.ledger.resources.ResourceOwner +import com.daml.ledger.validator.StateKeySerializationStrategy +import com.daml.lf.engine.Engine +import com.daml.logging.LoggingContext +import com.daml.metrics.Metrics + +import scala.concurrent.ExecutionContext + class InMemoryLedgerReaderWriterIntegrationSpec - extends InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching = true) + extends ParticipantStateIntegrationSpecBase(s"In-memory ledger/participant") { + + override val isPersistent: Boolean = false + + override def participantStateFactory( + ledgerId: LedgerId, + participantId: ParticipantId, + testId: String, + metrics: Metrics, + )(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] = + for { + dispatcher <- dispatcherOwner + committerExecutionContext <- ResourceOwner + .forExecutorService(() => Executors.newCachedThreadPool()) + .map(ExecutionContext.fromExecutorService) + readerWriter <- new InMemoryLedgerReaderWriter.Owner( + ledgerId = ledgerId, + participantId = participantId, + keySerializationStrategy = StateKeySerializationStrategy.createDefault(), + metrics = metrics, + dispatcher = dispatcher, + state = InMemoryState.empty, + engine = Engine.DevEngine(), + committerExecutionContext = committerExecutionContext, + ) + } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) + +} diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala index f3daa6e6ff..5af229ae5c 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala @@ -3,19 +3,23 @@ package com.daml.ledger.on.memory +import java.time.Instant + +import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.participant.state.kvutils.Raw import com.daml.ledger.participant.state.kvutils.api.CommitMetadata import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} -import com.daml.ledger.validator.ValidateAndCommit +import com.daml.ledger.validator.LedgerStateAccess import com.daml.lf.data.Ref +import com.daml.metrics.Metrics import com.daml.platform.akkastreams.dispatcher.Dispatcher import com.google.protobuf.ByteString import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} class InMemoryLedgerWriterSpec extends AsyncWordSpec @@ -26,16 +30,27 @@ class InMemoryLedgerWriterSpec "commit" should { "not signal new head in case of failure" in { val mockDispatcher = mock[Dispatcher[Index]] - val mockCommitter = mock[ValidateAndCommit] - when(mockCommitter(any[String], any[Raw.Value], any[ParticipantId])) + val mockCommitter = mock[InMemoryLedgerWriter.Committer] + when( + mockCommitter.commit( + any[ParticipantId], + any[String], + any[Raw.Value], + any[Instant], + any[LedgerStateAccess[Any]], + )(any[ExecutionContext]) + ) .thenReturn( Future.successful(SubmissionResult.InternalError("Validation failed with an exception")) ) val instance = new InMemoryLedgerWriter( - Ref.ParticipantId.assertFromString("participant ID"), - mockDispatcher, - InMemoryState.empty, - mockCommitter, + participantId = Ref.ParticipantId.assertFromString("participant ID"), + dispatcher = mockDispatcher, + now = () => Instant.EPOCH, + state = InMemoryState.empty, + committer = mockCommitter, + committerExecutionContext = executionContext, + metrics = new Metrics(new MetricRegistry), ) instance diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/NonBatchedInMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/NonBatchedInMemoryLedgerReaderWriterIntegrationSpec.scala deleted file mode 100644 index 7d95e16731..0000000000 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/NonBatchedInMemoryLedgerReaderWriterIntegrationSpec.scala +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.on.memory - -class NonBatchedInMemoryLedgerReaderWriterIntegrationSpec - extends InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching = false) {} diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index cbd3d3f9b3..5a7b75a242 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -226,7 +226,6 @@ client_server_build( server_args = [ "--contract-id-seeding=testing-weak", "--participant=participant-id=%s,port=%d" % (REFERENCE_LEDGER_EXPORT_NAME, REFERENCE_LEDGER_EXPORT_PORT), - "--force-pre-execution=true", ], visibility = [":__subpackages__"], ) if not is_windows and scala_major_version == "2.12" else None diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala index f26f0966bf..18da01cbcd 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala @@ -71,6 +71,9 @@ object Config { extra = extra, ) + def ownerWithoutExtras(name: String, args: collection.Seq[String]): ResourceOwner[Config[Unit]] = + owner(name, _ => (), (), args) + def owner[Extra]( name: String, extraOptions: OptionParser[Config[Extra]] => Unit, diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala index e4222c079e..91dad9030d 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/validator/package.scala @@ -5,22 +5,14 @@ package com.daml.ledger import com.daml.caching.ConcurrentCache import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateValue -import com.daml.ledger.participant.state.kvutils.{CorrelationId, Raw} -import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult} +import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.v1.ParticipantId import scala.concurrent.{ExecutionContext, Future} package object validator { type SubmittingParticipantId = ParticipantId - /** Orchestrates committing to a ledger after validating submissions. - */ - type ValidateAndCommit = ( - CorrelationId, - Raw.Value, - SubmittingParticipantId, - ) => Future[SubmissionResult] - type StateValueCache = ConcurrentCache[Raw.Value, DamlStateValue] // At some point, someone much smarter than the author of this code will reimplement the usages of diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index 6b1cdccaad..ea01720dd2 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -14,12 +14,10 @@ import akka.stream.scaladsl.Source import ch.qos.logback.classic.Level import com.codahale.metrics.MetricRegistry import com.daml.ledger.on.memory -import com.daml.ledger.participant.state.kvutils.api.{ - BatchingLedgerWriterConfig, - KeyValueParticipantState, -} +import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState import com.daml.ledger.participant.state.v1._ import com.daml.ledger.resources.{ResourceOwner, TestResourceContext} +import com.daml.ledger.validator.StateKeySerializationStrategy import com.daml.lf.data.Ref import com.daml.lf.data.Ref.LedgerString import com.daml.lf.engine.Engine @@ -256,13 +254,22 @@ object RecoveringIndexerIntegrationSpec { loggingContext: LoggingContext, ): ResourceOwner[ParticipantState] = { val metrics = new Metrics(new MetricRegistry) - new memory.InMemoryLedgerReaderWriter.SingleParticipantBatchingOwner( - ledgerId, - BatchingLedgerWriterConfig.reasonableDefault, - participantId, - metrics = metrics, - engine = Engine.DevEngine(), - ).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) + for { + dispatcher <- memory.dispatcherOwner + committerExecutionContext <- ResourceOwner + .forExecutorService(() => Executors.newCachedThreadPool()) + .map(ExecutionContext.fromExecutorService) + readerWriter <- new memory.InMemoryLedgerReaderWriter.Owner( + ledgerId = ledgerId, + participantId = participantId, + keySerializationStrategy = StateKeySerializationStrategy.createDefault(), + metrics = metrics, + dispatcher = dispatcher, + state = memory.InMemoryState.empty, + engine = Engine.DevEngine(), + committerExecutionContext = committerExecutionContext, + ) + } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) } }