ledger-on-memory: Only support pre-execution. [KVL-821] (#8757)

* ledger-on-memory: Rename `PreExecutingOwner` to just `Owner`.

* ledger-on-memory: Push the committer execution context up a little.

* ledger-on-memory: Use pre-execution in tests, not batching.

* ledger-on-memory: Remove support for batching submissions.

Pre-execution is pretty much always faster; let's use it.

CHANGELOG_BEGIN
CHANGELOG_END

* ledger-on-memory: Simplify construction.

Now that we only support pre-execution, we can be less general.
This commit is contained in:
Samir Talwar 2021-02-05 09:56:55 +01:00 committed by GitHub
parent e79966b890
commit 9c477ffdf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 175 additions and 468 deletions

View File

@ -9,7 +9,7 @@ import java.util.stream.Collectors
import com.daml.bazeltools.BazelRunfiles._
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, OwnedResource, SuiteResource}
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.on.memory.{ExtraConfig, Owner}
import com.daml.ledger.on.memory.Owner
import com.daml.ledger.participant.state.kvutils.app.{ParticipantConfig, ParticipantRunMode}
import com.daml.ledger.participant.state.kvutils.{app => kvutils}
import com.daml.ledger.participant.state.v1
@ -73,15 +73,10 @@ trait MultiParticipantFixture
for {
_ <- Owner(
kvutils.Config
.createDefault(ExtraConfig.reasonableDefault)
.createDefault(())
.copy(
participants = Seq(
participant1,
participant2,
),
archiveFiles = Seq(
darFile
),
participants = Seq(participant1, participant2),
archiveFiles = Seq(darFile),
)
)
} yield (readPortfile(participant1Portfile), readPortfile(participant2Portfile))

View File

@ -33,46 +33,13 @@ da_scala_library(
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//libs-scala/concurrent",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
da_scala_library(
name = "ledger-on-memory-test-lib",
srcs = glob(["src/test/lib/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_scalactic_scalactic",
"@maven//:org_scalatest_scalatest",
],
deps = [
":ledger-on-memory",
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/ledger-resources",
"//ledger/ledger-resources:ledger-resources-test-lib",
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
],
)
da_scala_test_suite(
name = "ledger-on-memory-tests",
size = "small",
@ -94,11 +61,12 @@ da_scala_test_suite(
],
deps = [
":ledger-on-memory",
":ledger-on-memory-test-lib",
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/ledger-resources",
@ -109,6 +77,8 @@ da_scala_test_suite(
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"//libs-scala/contextualized-logging",
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_mockito_mockito_core",
],
@ -164,23 +134,7 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865",
"--batching enable=true,max-batch-size-bytes=262144",
],
test_tool_args = [
"--verbose",
],
)
conformance_test(
name = "conformance-test-pre-execution",
ports = [6865],
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865",
"--batching enable=false",
"--force-pre-execution=true",
"--participant=participant-id=example,port=6865",
],
test_tool_args = [
"--verbose",
@ -196,9 +150,8 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example1,port=6865",
"--participant participant-id=example2,port=6866",
"--batching enable=true,max-batch-size-bytes=262144",
"--participant=participant-id=example1,port=6865",
"--participant=participant-id=example2,port=6866",
],
test_tool_args = [
"--verbose",
@ -214,9 +167,8 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=server1,run-mode=ledger-api-server",
"--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=indexer,run-mode=indexer",
"--batching enable=true,max-batch-size-bytes=262144",
"--participant=participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=server1,run-mode=ledger-api-server",
"--participant=participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=indexer,run-mode=indexer",
],
test_tool_args = [
"--verbose",
@ -230,8 +182,7 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865",
"--batching enable=true,max-batch-size-bytes=262144",
"--participant=participant-id=example,port=6865",
],
test_tool_args = [
"--verbose",
@ -245,8 +196,7 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865",
"--batching enable=true,max-batch-size-bytes=262144",
"--participant=participant-id=example,port=6865",
],
test_tool_args = [
"--verbose",
@ -260,7 +210,7 @@ conformance_test(
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=example,port=6865",
"--participant=participant-id=example,port=6865",
],
test_tool_args = [
"--verbose",

View File

@ -1,20 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.memory
import com.daml.ledger.participant.state.kvutils.api.BatchingLedgerWriterConfig
final case class ExtraConfig(
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
alwaysPreExecute: Boolean,
)
object ExtraConfig {
val reasonableDefault: ExtraConfig =
ExtraConfig(
batchingLedgerWriterConfig =
BatchingLedgerWriterConfig.reasonableDefault.copy(maxBatchConcurrentCommits = 2),
alwaysPreExecute = false,
)
}

View File

@ -5,13 +5,7 @@ package com.daml.ledger.on.memory
import akka.stream.Materializer
import com.daml.caching
import com.daml.ledger.participant.state.kvutils.api.{
BatchingLedgerWriterConfig,
KeyValueLedger,
KeyValueParticipantState,
}
import com.daml.ledger.participant.state.kvutils.app.batch.BatchingLedgerWriterConfigReader
import com.daml.ledger.participant.state.kvutils.app.batch.BatchingLedgerWriterConfigReader.optionsReader
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, ParticipantConfig}
import com.daml.ledger.participant.state.kvutils.caching.`Message Weight`
import com.daml.ledger.resources.ResourceOwner
@ -23,30 +17,19 @@ import com.daml.platform.configuration.LedgerConfiguration
import scopt.OptionParser
private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state: InMemoryState)
extends LedgerFactory[KeyValueParticipantState, ExtraConfig] {
extends LedgerFactory[KeyValueParticipantState, Unit] {
override final def readWriteServiceOwner(
config: Config[ExtraConfig],
config: Config[Unit],
participantConfig: ParticipantConfig,
engine: Engine,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[KeyValueParticipantState] =
for {
readerWriter <- owner(config, participantConfig, engine)
} yield new KeyValueParticipantState(
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
)
def owner(config: Config[ExtraConfig], participantConfig: ParticipantConfig, engine: Engine)(
implicit materializer: Materializer
): ResourceOwner[KeyValueLedger] = {
): ResourceOwner[KeyValueParticipantState] = {
val metrics = createMetrics(participantConfig, config)
if (config.extra.alwaysPreExecute)
new InMemoryLedgerReaderWriter.PreExecutingOwner(
for {
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = config.ledgerId,
participantId = participantConfig.participantId,
keySerializationStrategy = DefaultStateKeySerializationStrategy,
@ -58,64 +41,19 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
dispatcher = dispatcher,
state = state,
engine = engine,
committerExecutionContext = materializer.executionContext,
)
else
new InMemoryLedgerReaderWriter.BatchingOwner(
ledgerId = config.ledgerId,
batchingLedgerWriterConfig = config.extra.batchingLedgerWriterConfig,
participantId = participantConfig.participantId,
metrics = metrics,
stateValueCache = caching.WeightedCache.from(
configuration = config.stateValueCache,
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
dispatcher = dispatcher,
state = state,
engine = engine,
} yield new KeyValueParticipantState(
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
)
}
override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
override def ledgerConfig(config: Config[Unit]): LedgerConfiguration =
LedgerConfiguration.defaultLocalLedger
override def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit =
InMemoryLedgerFactory.extraConfigParser(parser)
override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = ()
override val defaultExtraConfig: ExtraConfig = InMemoryLedgerFactory.defaultExtraConfig
}
private[memory] object InMemoryLedgerFactory {
val defaultExtraConfig: ExtraConfig = ExtraConfig.reasonableDefault
final def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {
parser
.opt[BatchingLedgerWriterConfig]("batching")
.optional()
.text(BatchingLedgerWriterConfigReader.UsageText)
.action { case (parsedBatchingConfig, config) =>
config.copy(
extra = config.extra.copy(
batchingLedgerWriterConfig = parsedBatchingConfig
)
)
}
parser
.opt[Boolean]("force-pre-execution")
.optional()
.text("Force pre-execution (mutually exclusive with batching)")
.action { case (preExecute, config) =>
config.copy(
extra = config.extra.copy(
alwaysPreExecute = preExecute
)
)
}
parser.checkConfig { config =>
if (config.extra.alwaysPreExecute && config.extra.batchingLedgerWriterConfig.enableBatching)
Left("Either pre-executing can be forced or batching can be enabled, but not both.")
else
Right(())
}
()
}
override val defaultExtraConfig: Unit = ()
}

View File

@ -10,12 +10,7 @@ import com.daml.resources.ProgramResource
object Main {
def main(args: Array[String]): Unit = {
val resource = for {
config <- Config.owner(
RunnerName,
InMemoryLedgerFactory.extraConfigParser,
InMemoryLedgerFactory.defaultExtraConfig,
args,
)
config <- Config.ownerWithoutExtras(RunnerName, args)
owner <- Owner(config)
} yield owner
new ProgramResource(resource).run(ResourceContext.apply)

View File

@ -8,7 +8,7 @@ import com.daml.ledger.resources.ResourceOwner
object Owner {
// Useful if you want to spin this up as a library.
def apply(config: Config[ExtraConfig]): ResourceOwner[Unit] =
def apply(config: Config[Unit]): ResourceOwner[Unit] =
for {
dispatcher <- dispatcherOwner
sharedState = InMemoryState.empty

View File

@ -3,7 +3,6 @@
package com.daml.ledger.on.memory
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.ledger.participant.state.kvutils.api._
@ -14,67 +13,11 @@ import com.daml.lf.engine.Engine
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import scala.concurrent.ExecutionContext
object InMemoryLedgerReaderWriter {
final class BatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
for {
writer <- new InMemoryLedgerWriter.BatchingOwner(
batchingLedgerWriterConfig,
participantId,
metrics,
timeProvider,
stateValueCache,
dispatcher,
state,
engine,
).acquire()
} yield createKeyValueLedger(reader, writer)
}
}
final class SingleParticipantBatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache = Cache.none,
metrics: Metrics,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val state = InMemoryState.empty
for {
dispatcher <- dispatcherOwner.acquire()
readerWriter <- new BatchingOwner(
ledgerId,
batchingLedgerWriterConfig,
participantId,
metrics,
timeProvider,
stateValueCache,
dispatcher,
state,
engine,
).acquire()
} yield readerWriter
}
}
final class PreExecutingOwner(
final class Owner(
ledgerId: LedgerId,
participantId: ParticipantId,
keySerializationStrategy: StateKeySerializationStrategy,
@ -84,12 +27,12 @@ object InMemoryLedgerReaderWriter {
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
committerExecutionContext: ExecutionContext,
) extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
for {
writer <- new InMemoryLedgerWriter.PreExecutingOwner(
writer <- new InMemoryLedgerWriter.Owner(
participantId,
keySerializationStrategy,
metrics,
@ -98,6 +41,7 @@ object InMemoryLedgerReaderWriter {
dispatcher,
state,
engine,
committerExecutionContext,
).acquire()
} yield createKeyValueLedger(reader, writer)
}

View File

@ -3,28 +3,18 @@
package com.daml.ledger.on.memory
import akka.stream.Materializer
import java.time.Instant
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.on.memory.InMemoryLedgerWriter._
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.api.{
BatchingLedgerWriter,
BatchingLedgerWriterConfig,
CommitMetadata,
LedgerWriter,
}
import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorFactory,
BatchedValidatingCommitter,
ConflictDetection,
}
import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy}
import com.daml.ledger.validator.preexecution.{
EqualityBasedPostExecutionConflictDetector,
@ -36,13 +26,8 @@ import com.daml.ledger.validator.preexecution.{
TimeBasedWriteSetSelector,
}
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
import com.daml.ledger.validator.{
SerializingStateReader,
StateKeySerializationStrategy,
ValidateAndCommit,
}
import com.daml.ledger.validator.{SerializingStateReader, StateKeySerializationStrategy}
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
@ -52,18 +37,28 @@ import scala.util.Success
final class InMemoryLedgerWriter private[memory] (
override val participantId: ParticipantId,
dispatcher: Dispatcher[Index],
now: () => Instant,
state: InMemoryState,
validateAndCommit: ValidateAndCommit,
committer: Committer,
committerExecutionContext: ExecutionContext,
metrics: Metrics,
) extends LedgerWriter {
override def commit(
correlationId: String,
envelope: Raw.Value,
metadata: CommitMetadata,
): Future[SubmissionResult] =
validateAndCommit(correlationId, envelope, participantId)
committer
.commit(
participantId,
correlationId,
envelope,
exportRecordTime = now(),
ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics),
)(committerExecutionContext)
.andThen { case Success(SubmissionResult.Acknowledged) =>
dispatcher.signalNewHead(state.newHeadSinceLastWrite())
}(DirectExecutionContext)
}(committerExecutionContext)
override def currentHealth(): HealthStatus = Healthy
}
@ -74,74 +69,13 @@ object InMemoryLedgerWriter {
private[memory] type StateValueCache = Cache[DamlStateKey, DamlStateValue]
final class BatchingOwner(
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[LedgerWriter] {
override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] =
for {
ledgerDataExporter <- LedgerDataExporter.Owner.acquire()
keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
committer = createBatchedCommitter(keyValueCommitting, ledgerDataExporter)
writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
batchingWriter <- newLoggingContext { implicit loggingContext =>
ResourceOwner
.forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer))
.acquire()
}
} yield batchingWriter
private[memory] type Committer = PreExecutingValidatingCommitter[
Option[DamlStateValue],
RawPreExecutingCommitStrategy.ReadSet,
RawKeyValuePairsWithLogEntry,
]
private def createBatchedCommitter(
keyValueCommitting: KeyValueCommitting,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
BatchedSubmissionValidatorFactory.defaultParametersFor(
batchingLedgerWriterConfig.enableBatching
),
keyValueCommitting,
new ConflictDetection(metrics),
metrics,
ledgerDataExporter,
)
val committer = BatchedValidatingCommitter[Index](
() => timeProvider.getCurrentTime,
validator,
stateValueCache,
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations =>
committer.commit(
correlationId,
submissionEnvelope,
submittingParticipantId,
ledgerStateOperations,
)
}
validateAndCommit
}
}
}
final class PreExecutingOwner(
final class Owner(
participantId: ParticipantId,
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
@ -150,28 +84,30 @@ object InMemoryLedgerWriter {
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[LedgerWriter] {
override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = {
val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
committerExecutionContext: ExecutionContext,
) extends ResourceOwner[LedgerWriter] {
private val now = () => timeProvider.getCurrentTime
override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] =
for {
ledgerDataExporter <- LedgerDataExporter.Owner.acquire()
} yield {
val committer = createPreExecutingCommitter(keyValueCommitting, ledgerDataExporter)
new InMemoryLedgerWriter(participantId, dispatcher, state, committer)
}
}
} yield new InMemoryLedgerWriter(
participantId,
dispatcher,
now,
state,
newCommitter(ledgerDataExporter),
committerExecutionContext,
metrics,
)
private def createPreExecutingCommitter(
keyValueCommitting: KeyValueCommitting,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val now = () => timeProvider.getCurrentTime
val committer = new PreExecutingValidatingCommitter[
Option[DamlStateValue],
RawPreExecutingCommitStrategy.ReadSet,
RawKeyValuePairsWithLogEntry,
](
private def newCommitter(ledgerDataExporter: LedgerDataExporter): Committer = {
val keyValueCommitting = new KeyValueCommitting(
engine,
metrics,
inStaticTimeMode = needStaticTimeModeFor(timeProvider),
)
new Committer(
transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache),
validator = new PreExecutingSubmissionValidator(
keyValueCommitting,
@ -183,24 +119,6 @@ object InMemoryLedgerWriter {
postExecutionWriter = new RawPostExecutionWriter,
ledgerDataExporter = ledgerDataExporter,
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
committer.commit(
submittingParticipantId,
correlationId,
submissionEnvelope,
now(),
new InMemoryLedgerStateAccess(state, metrics),
)
validateAndCommit
}
}
private def transformStateReader(
@ -215,13 +133,6 @@ object InMemoryLedgerWriter {
}
}
private def createKeyValueCommitting(
metrics: Metrics,
timeProvider: TimeProvider,
engine: Engine,
): KeyValueCommitting =
new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider))
private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean =
timeProvider != TimeProvider.UTC

View File

@ -11,7 +11,7 @@ package object memory {
private[memory] val StartIndex: Index = 0
private[memory] def dispatcherOwner: ResourceOwner[Dispatcher[Index]] =
def dispatcherOwner: ResourceOwner[Dispatcher[Index]] =
Dispatcher.owner(
name = "in-memory-key-value-participant-state",
zeroIndex = StartIndex,

View File

@ -1,56 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.memory
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.{
BatchingLedgerWriterConfig,
KeyValueParticipantState,
}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import scala.concurrent.duration.DurationInt
abstract class InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching: Boolean)
extends ParticipantStateIntegrationSpecBase(
s"In-memory ledger/participant with parallel validation ${if (enableBatching) "enabled"
else "disabled"}"
) {
private val batchingLedgerWriterConfig =
BatchingLedgerWriterConfig(
enableBatching = enableBatching,
// In case of serial validation, we need a queue length of 1000 because the
// "process many party allocations" test case will be sending in that many requests at once
// (otherwise some of those would be rejected).
// See [[ParticipantStateIntegrationSpecBase]].
maxBatchQueueSize = if (enableBatching) 100 else 1000,
maxBatchSizeBytes = 4L * 1024L * 1024L /* 4MB */,
maxBatchWaitDuration = 100.millis,
// In-memory ledger doesn't support concurrent commits.
maxBatchConcurrentCommits = 1,
)
override val isPersistent: Boolean = false
override def participantStateFactory(
ledgerId: LedgerId,
participantId: ParticipantId,
testId: String,
metrics: Metrics,
)(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] =
new InMemoryLedgerReaderWriter.SingleParticipantBatchingOwner(
ledgerId,
batchingLedgerWriterConfig,
participantId,
metrics = metrics,
engine = Engine.DevEngine(),
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
}

View File

@ -6,7 +6,7 @@
</encoder>
</appender>
<root level="TRACE">
<root level="DEBUG">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -3,5 +3,46 @@
package com.daml.ledger.on.memory
import java.util.concurrent.Executors
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase.ParticipantState
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.daml.ledger.resources.ResourceOwner
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import scala.concurrent.ExecutionContext
class InMemoryLedgerReaderWriterIntegrationSpec
extends InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching = true)
extends ParticipantStateIntegrationSpecBase(s"In-memory ledger/participant") {
override val isPersistent: Boolean = false
override def participantStateFactory(
ledgerId: LedgerId,
participantId: ParticipantId,
testId: String,
metrics: Metrics,
)(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] =
for {
dispatcher <- dispatcherOwner
committerExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newCachedThreadPool())
.map(ExecutionContext.fromExecutorService)
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = ledgerId,
participantId = participantId,
keySerializationStrategy = StateKeySerializationStrategy.createDefault(),
metrics = metrics,
dispatcher = dispatcher,
state = InMemoryState.empty,
engine = Engine.DevEngine(),
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)
}

View File

@ -3,19 +3,23 @@
package com.daml.ledger.on.memory
import java.time.Instant
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.validator.ValidateAndCommit
import com.daml.ledger.validator.LedgerStateAccess
import com.daml.lf.data.Ref
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.google.protobuf.ByteString
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
class InMemoryLedgerWriterSpec
extends AsyncWordSpec
@ -26,16 +30,27 @@ class InMemoryLedgerWriterSpec
"commit" should {
"not signal new head in case of failure" in {
val mockDispatcher = mock[Dispatcher[Index]]
val mockCommitter = mock[ValidateAndCommit]
when(mockCommitter(any[String], any[Raw.Value], any[ParticipantId]))
val mockCommitter = mock[InMemoryLedgerWriter.Committer]
when(
mockCommitter.commit(
any[ParticipantId],
any[String],
any[Raw.Value],
any[Instant],
any[LedgerStateAccess[Any]],
)(any[ExecutionContext])
)
.thenReturn(
Future.successful(SubmissionResult.InternalError("Validation failed with an exception"))
)
val instance = new InMemoryLedgerWriter(
Ref.ParticipantId.assertFromString("participant ID"),
mockDispatcher,
InMemoryState.empty,
mockCommitter,
participantId = Ref.ParticipantId.assertFromString("participant ID"),
dispatcher = mockDispatcher,
now = () => Instant.EPOCH,
state = InMemoryState.empty,
committer = mockCommitter,
committerExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
)
instance

View File

@ -1,7 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.memory
class NonBatchedInMemoryLedgerReaderWriterIntegrationSpec
extends InMemoryLedgerReaderWriterIntegrationSpecBase(enableBatching = false) {}

View File

@ -226,7 +226,6 @@ client_server_build(
server_args = [
"--contract-id-seeding=testing-weak",
"--participant=participant-id=%s,port=%d" % (REFERENCE_LEDGER_EXPORT_NAME, REFERENCE_LEDGER_EXPORT_PORT),
"--force-pre-execution=true",
],
visibility = [":__subpackages__"],
) if not is_windows and scala_major_version == "2.12" else None

View File

@ -71,6 +71,9 @@ object Config {
extra = extra,
)
def ownerWithoutExtras(name: String, args: collection.Seq[String]): ResourceOwner[Config[Unit]] =
owner(name, _ => (), (), args)
def owner[Extra](
name: String,
extraOptions: OptionParser[Config[Extra]] => Unit,

View File

@ -5,22 +5,14 @@ package com.daml.ledger
import com.daml.caching.ConcurrentCache
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateValue
import com.daml.ledger.participant.state.kvutils.{CorrelationId, Raw}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.v1.ParticipantId
import scala.concurrent.{ExecutionContext, Future}
package object validator {
type SubmittingParticipantId = ParticipantId
/** Orchestrates committing to a ledger after validating submissions.
*/
type ValidateAndCommit = (
CorrelationId,
Raw.Value,
SubmittingParticipantId,
) => Future[SubmissionResult]
type StateValueCache = ConcurrentCache[Raw.Value, DamlStateValue]
// At some point, someone much smarter than the author of this code will reimplement the usages of

View File

@ -14,12 +14,10 @@ import akka.stream.scaladsl.Source
import ch.qos.logback.classic.Level
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.on.memory
import com.daml.ledger.participant.state.kvutils.api.{
BatchingLedgerWriterConfig,
KeyValueParticipantState,
}
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.daml.ledger.resources.{ResourceOwner, TestResourceContext}
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.LedgerString
import com.daml.lf.engine.Engine
@ -256,13 +254,22 @@ object RecoveringIndexerIntegrationSpec {
loggingContext: LoggingContext,
): ResourceOwner[ParticipantState] = {
val metrics = new Metrics(new MetricRegistry)
new memory.InMemoryLedgerReaderWriter.SingleParticipantBatchingOwner(
ledgerId,
BatchingLedgerWriterConfig.reasonableDefault,
participantId,
for {
dispatcher <- memory.dispatcherOwner
committerExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newCachedThreadPool())
.map(ExecutionContext.fromExecutorService)
readerWriter <- new memory.InMemoryLedgerReaderWriter.Owner(
ledgerId = ledgerId,
participantId = participantId,
keySerializationStrategy = StateKeySerializationStrategy.createDefault(),
metrics = metrics,
dispatcher = dispatcher,
state = memory.InMemoryState.empty,
engine = Engine.DevEngine(),
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)
}
}