ledger-on-memory: Move committer construction into the writer. (#8735)

Closer to where it's used.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2021-02-03 14:37:45 +01:00 committed by GitHub
parent 45336d6fb5
commit 9f06a28838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 206 additions and 174 deletions

View File

@@ -6,30 +6,16 @@ package com.daml.ledger.on.memory
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorFactory,
BatchedValidatingCommitter,
ConflictDetection,
}
import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy}
import com.daml.ledger.validator.preexecution._
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
import com.daml.ledger.validator.{StateKeySerializationStrategy, ValidateAndCommit}
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import scala.concurrent.ExecutionContext
object InMemoryLedgerReaderWriter {
final class BatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
@@ -42,35 +28,21 @@ object InMemoryLedgerReaderWriter {
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] =
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
for {
ledgerDataExporter <- LedgerDataExporter.Owner.acquire()
keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
committer = createBatchedCommitter(
keyValueCommitting,
writer <- new InMemoryLedgerWriter.BatchingOwner(
batchingLedgerWriterConfig,
state,
participantId,
metrics,
timeProvider,
stateValueCache,
ledgerDataExporter,
)
reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
writer = new InMemoryLedgerWriter(
participantId,
dispatcher,
state,
committer,
)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
batchingWriter <- newLoggingContext { implicit loggingContext =>
ResourceOwner
.forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer))
.acquire()
}
} yield createKeyValueLedger(reader, batchingWriter)
engine,
).acquire()
} yield createKeyValueLedger(reader, writer)
}
}
final class SingleParticipantBatchingOwner(
@@ -83,7 +55,6 @@ object InMemoryLedgerReaderWriter {
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val state = InMemoryState.empty
for {
@@ -116,139 +87,20 @@ object InMemoryLedgerReaderWriter {
)(implicit materializer: Materializer)
extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
val committer = createPreExecutingCommitter(
keyValueCommitting,
keySerializationStrategy,
state,
metrics,
timeProvider,
stateValueCache,
)
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
val writer = new InMemoryLedgerWriter(
participantId,
dispatcher,
state,
committer,
)
Resource.successful(createKeyValueLedger(reader, writer))
for {
writer <- new InMemoryLedgerWriter.PreExecutingOwner(
participantId,
keySerializationStrategy,
metrics,
timeProvider,
stateValueCache,
dispatcher,
state,
engine,
).acquire()
} yield createKeyValueLedger(reader, writer)
}
}
private def createBatchedCommitter(
keyValueCommitting: KeyValueCommitting,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
BatchedSubmissionValidatorFactory.defaultParametersFor(
batchingLedgerWriterConfig.enableBatching
),
keyValueCommitting,
new ConflictDetection(metrics),
metrics,
ledgerDataExporter,
)
val committer = BatchedValidatingCommitter[Index](
() => timeProvider.getCurrentTime,
validator,
stateValueCache,
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations =>
committer.commit(
correlationId,
submissionEnvelope,
submittingParticipantId,
ledgerStateOperations,
)
}
validateAndCommit
}
}
private def createPreExecutingCommitter(
keyValueCommitting: KeyValueCommitting,
keySerializationStrategy: StateKeySerializationStrategy,
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: InMemoryLedgerWriter.StateValueCache,
)(implicit materializer: Materializer): ValidateAndCommit = {
val committer = new PreExecutingValidatingCommitter[
Option[DamlStateValue],
RawPreExecutingCommitStrategy.ReadSet,
RawKeyValuePairsWithLogEntry,
](
transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache),
validator = new PreExecutingSubmissionValidator(
keyValueCommitting,
new RawPreExecutingCommitStrategy(keySerializationStrategy),
metrics,
),
postExecutionConflictDetector = new EqualityBasedPostExecutionConflictDetector(),
postExecutionFinalizer = new RawPostExecutionFinalizer(
now = () => timeProvider.getCurrentTime
),
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
committer.commit(
correlationId,
submissionEnvelope,
submittingParticipantId,
new InMemoryLedgerStateAccess(state, metrics),
)
validateAndCommit
}
}
private def transformStateReader(
keySerializationStrategy: StateKeySerializationStrategy,
cache: Cache[DamlStateKey, DamlStateValue],
)(stateReader: LedgerStateReader): DamlLedgerStateReader = {
CachingStateReader(
cache,
ImmutablesOnlyCacheUpdatePolicy,
stateReader
.contramapKeys(keySerializationStrategy.serializeStateKey)
.mapValues(value =>
value.map(
Envelope
.openStateValue(_)
.getOrElse(sys.error("Opening enveloped DamlStateValue failed"))
)
),
)
}
private def createKeyValueCommitting(
metrics: Metrics,
timeProvider: TimeProvider,
engine: Engine,
): KeyValueCommitting =
new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider))
private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean =
timeProvider != TimeProvider.UTC
}

View File

@@ -3,18 +3,45 @@
package com.daml.ledger.on.memory
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.api.{
BatchingLedgerWriter,
BatchingLedgerWriterConfig,
CommitMetadata,
LedgerWriter,
}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.validator.ValidateAndCommit
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorFactory,
BatchedValidatingCommitter,
ConflictDetection,
}
import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy}
import com.daml.ledger.validator.preexecution.{
EqualityBasedPostExecutionConflictDetector,
PreExecutingSubmissionValidator,
PreExecutingValidatingCommitter,
RawKeyValuePairsWithLogEntry,
RawPostExecutionFinalizer,
RawPreExecutingCommitStrategy,
}
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
import com.daml.ledger.validator.{StateKeySerializationStrategy, ValidateAndCommit}
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
final class InMemoryLedgerWriter private[memory] (
@@ -42,4 +69,157 @@ object InMemoryLedgerWriter {
private[memory] type StateValueCache = Cache[DamlStateKey, DamlStateValue]
final class BatchingOwner(
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[LedgerWriter] {
override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] =
for {
ledgerDataExporter <- LedgerDataExporter.Owner.acquire()
keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
committer = createBatchedCommitter(keyValueCommitting, ledgerDataExporter)
writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer)
// We need to generate batched submissions for the validator in order to improve throughput.
// Hence, we have a BatchingLedgerWriter collect and forward batched submissions to the
// in-memory committer.
batchingWriter <- newLoggingContext { implicit loggingContext =>
ResourceOwner
.forCloseable(() => BatchingLedgerWriter(batchingLedgerWriterConfig, writer))
.acquire()
}
} yield batchingWriter
private def createBatchedCommitter(
keyValueCommitting: KeyValueCommitting,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
BatchedSubmissionValidatorFactory.defaultParametersFor(
batchingLedgerWriterConfig.enableBatching
),
keyValueCommitting,
new ConflictDetection(metrics),
metrics,
ledgerDataExporter,
)
val committer = BatchedValidatingCommitter[Index](
() => timeProvider.getCurrentTime,
validator,
stateValueCache,
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
new InMemoryLedgerStateAccess(state, metrics).inTransaction { ledgerStateOperations =>
committer.commit(
correlationId,
submissionEnvelope,
submittingParticipantId,
ledgerStateOperations,
)
}
validateAndCommit
}
}
}
final class PreExecutingOwner(
participantId: ParticipantId,
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
)(implicit materializer: Materializer)
extends ResourceOwner[LedgerWriter] {
override def acquire()(implicit context: ResourceContext): Resource[LedgerWriter] = {
val keyValueCommitting = createKeyValueCommitting(metrics, timeProvider, engine)
val committer = createPreExecutingCommitter(keyValueCommitting)
val writer = new InMemoryLedgerWriter(participantId, dispatcher, state, committer)
Resource.successful(writer)
}
private def createPreExecutingCommitter(
keyValueCommitting: KeyValueCommitting
)(implicit materializer: Materializer): ValidateAndCommit = {
val committer = new PreExecutingValidatingCommitter[
Option[DamlStateValue],
RawPreExecutingCommitStrategy.ReadSet,
RawKeyValuePairsWithLogEntry,
](
transformStateReader = transformStateReader(keySerializationStrategy, stateValueCache),
validator = new PreExecutingSubmissionValidator(
keyValueCommitting,
new RawPreExecutingCommitStrategy(keySerializationStrategy),
metrics,
),
postExecutionConflictDetector = new EqualityBasedPostExecutionConflictDetector(),
postExecutionFinalizer = new RawPostExecutionFinalizer(
now = () => timeProvider.getCurrentTime
),
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
def validateAndCommit(
correlationId: String,
submissionEnvelope: Raw.Value,
submittingParticipantId: ParticipantId,
) =
committer.commit(
correlationId,
submissionEnvelope,
submittingParticipantId,
new InMemoryLedgerStateAccess(state, metrics),
)
validateAndCommit
}
}
}
private def transformStateReader(
keySerializationStrategy: StateKeySerializationStrategy,
cache: Cache[DamlStateKey, DamlStateValue],
)(stateReader: LedgerStateReader): DamlLedgerStateReader = {
CachingStateReader(
cache,
ImmutablesOnlyCacheUpdatePolicy,
stateReader
.contramapKeys(keySerializationStrategy.serializeStateKey)
.mapValues(value =>
value.map(
Envelope
.openStateValue(_)
.getOrElse(sys.error("Opening enveloped DamlStateValue failed"))
)
),
)
}
private def createKeyValueCommitting(
metrics: Metrics,
timeProvider: TimeProvider,
engine: Engine,
): KeyValueCommitting =
new KeyValueCommitting(engine, metrics, inStaticTimeMode = needStaticTimeModeFor(timeProvider))
private def needStaticTimeModeFor(timeProvider: TimeProvider): Boolean =
timeProvider != TimeProvider.UTC
}