kvutils: Remove fingerprints from the PreExecutingValidatingCommitter. [KVL-747] (#8337)

* ledger-on-memory: Create type aliases for caches.

* kvutils: Move caching out of the committer and into the constructor.

* kvutils: Move `reader.map` into the conflict detector.

* kvutils: Remove `LogResult` from the `PreExecutingValidatingCommitter`.

By making it covariant, we can just use `Any`, as we don't care about
the actual type.

* kvutils: Pull the state type out of `PreExecutingValidatingCommitter`.

It is now generic over state types.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2020-12-18 09:13:14 +01:00 committed by GitHub
parent 72b00a05f2
commit cb0f681932
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 62 deletions

View File

@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.{
Bytes,
Envelope,
Fingerprint,
FingerprintPlaceholder,
KeyValueCommitting,
@ -22,16 +23,20 @@ import com.daml.ledger.participant.state.kvutils.{
}
import com.daml.ledger.participant.state.v1.{LedgerId, Offset, ParticipantId, SubmissionResult}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.LedgerStateOperations.Value
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorFactory,
BatchedValidatingCommitter,
ConflictDetection
}
import com.daml.ledger.validator.caching.ImmutablesOnlyCacheUpdatePolicy
import com.daml.ledger.validator.caching.{
CachingDamlLedgerStateReaderWithFingerprints,
ImmutablesOnlyCacheUpdatePolicy
}
import com.daml.ledger.validator.preexecution.LogAppenderPreExecutingCommitStrategy.FingerprintedReadSet
import com.daml.ledger.validator.preexecution._
import com.daml.ledger.validator.reading.StateReader
import com.daml.ledger.validator.{StateKeySerializationStrategy, ValidateAndCommit}
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext.newLoggingContext
@ -72,13 +77,17 @@ final class InMemoryLedgerReaderWriter private[memory] (
object InMemoryLedgerReaderWriter {
val DefaultTimeProvider: TimeProvider = TimeProvider.UTC
type BatchingStateValueCache = Cache[DamlStateKey, DamlStateValue]
type PreExecutionStateValueCache = Cache[DamlStateKey, (DamlStateValue, Fingerprint)]
final class BatchingOwner(
ledgerId: LedgerId,
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: Cache[DamlStateKey, DamlStateValue] = Cache.none,
stateValueCache: BatchingStateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
@ -121,7 +130,7 @@ object InMemoryLedgerReaderWriter {
batchingLedgerWriterConfig: BatchingLedgerWriterConfig,
participantId: ParticipantId,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: Cache[DamlStateKey, DamlStateValue] = Cache.none,
stateValueCache: BatchingStateValueCache = Cache.none,
metrics: Metrics,
engine: Engine,
)(implicit materializer: Materializer)
@ -152,8 +161,7 @@ object InMemoryLedgerReaderWriter {
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCacheForPreExecution: Cache[DamlStateKey, (DamlStateValue, Fingerprint)] =
Cache.none,
stateValueCacheForPreExecution: PreExecutionStateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
state: InMemoryState,
engine: Engine,
@ -191,7 +199,7 @@ object InMemoryLedgerReaderWriter {
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCache: Cache[DamlStateKey, DamlStateValue],
stateValueCache: BatchingStateValueCache,
ledgerDataExporter: LedgerDataExporter,
)(implicit materializer: Materializer): ValidateAndCommit = {
val validator = BatchedSubmissionValidator[Index](
@ -234,22 +242,22 @@ object InMemoryLedgerReaderWriter {
state: InMemoryState,
metrics: Metrics,
timeProvider: TimeProvider,
stateValueCacheForPreExecution: Cache[DamlStateKey, (DamlStateValue, Fingerprint)],
stateValueCacheForPreExecution: PreExecutionStateValueCache,
)(implicit materializer: Materializer): ValidateAndCommit = {
val commitStrategy = new LogAppenderPreExecutingCommitStrategy(keySerializationStrategy)
val valueToFingerprint: Option[Value] => Fingerprint =
_.getOrElse(FingerprintPlaceholder)
val validator = new PreExecutingSubmissionValidator(keyValueCommitting, commitStrategy, metrics)
val committer =
new PreExecutingValidatingCommitter[FingerprintedReadSet, RawKeyValuePairsWithLogEntry](
keySerializationStrategy,
validator,
valueToFingerprint,
new EqualityBasedPostExecutionConflictDetector,
new RawPostExecutionFinalizer(now = timeProvider.getCurrentTime _),
stateValueCache = stateValueCacheForPreExecution,
ImmutablesOnlyCacheUpdatePolicy,
)
val committer = new PreExecutingValidatingCommitter[
(Option[DamlStateValue], Fingerprint),
FingerprintedReadSet,
RawKeyValuePairsWithLogEntry,
](
transformStateReader =
transformStateReader(keySerializationStrategy, stateValueCacheForPreExecution),
validator = validator,
postExecutionConflictDetector =
new EqualityBasedPostExecutionConflictDetector().contramapValues(_._2),
postExecutionFinalizer = new RawPostExecutionFinalizer(now = timeProvider.getCurrentTime _),
)
locally {
implicit val executionContext: ExecutionContext = materializer.executionContext
@ -269,6 +277,27 @@ object InMemoryLedgerReaderWriter {
}
}
private def transformStateReader(
keySerializationStrategy: StateKeySerializationStrategy,
cache: Cache[DamlStateKey, (DamlStateValue, Fingerprint)]
)(stateReader: StateReader[Key, Option[Value]])
: StateReader[DamlStateKey, (Option[DamlStateValue], Fingerprint)] = {
CachingDamlLedgerStateReaderWithFingerprints(
cache,
ImmutablesOnlyCacheUpdatePolicy,
stateReader
.contramapKeys(keySerializationStrategy.serializeStateKey)
.mapValues(value => {
val damlStateValue = value.map(
Envelope
.openStateValue(_)
.getOrElse(sys.error("Opening enveloped DamlStateValue failed")))
val fingerprint = value.getOrElse(FingerprintPlaceholder)
damlStateValue -> fingerprint
}),
)
}
private def createKeyValueCommitting(
metrics: Metrics,
timeProvider: TimeProvider,

View File

@ -16,7 +16,7 @@ import scala.concurrent.{ExecutionContext, Future}
*
* @tparam LogResult type of the offset used for a log entry
*/
trait LedgerStateAccess[LogResult] {
trait LedgerStateAccess[+LogResult] {
/**
* Performs read and write operations on the backing store in a single atomic transaction.

View File

@ -12,8 +12,14 @@ import scala.concurrent.{ExecutionContext, Future}
* pre-execution pipeline.
*/
trait PostExecutionConflictDetector[StateKey, StateValue, -ReadSet, -WriteSet] {
self =>
/**
* Re-reads the current state of the ledger and ensures that the state has not changed compared
* to the pre-execution read set.
*
* If there is a conflict, throws a [[ConflictDetectedException]].
*
* @param preExecutionOutput The output from the pre-execution stage.
* @param reader The operations that can access actual ledger storage as part of a transaction.
* @param executionContext The execution context for ledger state operations.
@ -23,4 +29,24 @@ trait PostExecutionConflictDetector[StateKey, StateValue, -ReadSet, -WriteSet] {
preExecutionOutput: PreExecutionOutput[ReadSet, WriteSet],
reader: StateReader[StateKey, StateValue],
)(implicit executionContext: ExecutionContext): Future[Unit]
/**
* Transforms the reader to widen the state value, allowing it to handle any value that can be
* converted to `StateValue`.
*
* This is an instance of a "contravariant functor", in which the mapping is backwards, because
* the values are used as input to the conflict detection.
*
* @tparam NewStateValue The new state value type.
* @return A new conflict detector that transforms after reading, before detecting conflicts.
*/
def contramapValues[NewStateValue](transformValue: NewStateValue => StateValue)
: PostExecutionConflictDetector[StateKey, NewStateValue, ReadSet, WriteSet] =
new PostExecutionConflictDetector[StateKey, NewStateValue, ReadSet, WriteSet] {
override def detectConflicts(
preExecutionOutput: PreExecutionOutput[ReadSet, WriteSet],
reader: StateReader[StateKey, NewStateValue],
)(implicit executionContext: ExecutionContext): Future[Unit] =
self.detectConflicts(preExecutionOutput, reader.mapValues(transformValue))
}
}

View File

@ -3,21 +3,12 @@
package com.daml.ledger.validator.preexecution
import com.daml.caching.Cache
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Bytes, Fingerprint}
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateKey
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import com.daml.ledger.validator.LedgerStateOperations.Value
import com.daml.ledger.validator.RawToDamlLedgerStateReaderAdapter.deserializeDamlStateValue
import com.daml.ledger.validator.caching.{
CacheUpdatePolicy,
CachingDamlLedgerStateReaderWithFingerprints
}
import com.daml.ledger.validator.{
LedgerStateAccess,
LedgerStateOperationsReaderAdapter,
StateKeySerializationStrategy
}
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.reading.StateReader
import com.daml.ledger.validator.{LedgerStateAccess, LedgerStateOperationsReaderAdapter}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.timer.RetryStrategy
@ -30,31 +21,28 @@ import scala.util.{Failure, Success}
* fingerprints alongside values), parametric in the logic that produces a fingerprint given a
* value.
*
* @param keySerializationStrategy The key serializer used for state keys.
* @param transformStateReader Transforms the state reader into the format used by the underlying store.
* @param validator The pre-execution validator.
* @param valueToFingerprint The logic that produces a fingerprint given a value.
* @param postExecutionConflictDetector The post-execution conflict detector.
* @param postExecutionFinalizer The post-execution finalizer.
* @param stateValueCache The cache instance for state values.
* @param cacheUpdatePolicy The caching policy for values.
*/
class PreExecutingValidatingCommitter[ReadSet, WriteSet](
keySerializationStrategy: StateKeySerializationStrategy,
class PreExecutingValidatingCommitter[StateValue, ReadSet, WriteSet](
transformStateReader: StateReader[Key, Option[Value]] => StateReader[
DamlStateKey,
StateValue,
],
validator: PreExecutingSubmissionValidator[
(Option[DamlStateValue], Fingerprint),
StateValue,
ReadSet,
WriteSet,
],
valueToFingerprint: Option[Value] => Fingerprint,
postExecutionConflictDetector: PostExecutionConflictDetector[
DamlStateKey,
Fingerprint,
StateValue,
ReadSet,
WriteSet,
],
postExecutionFinalizer: PostExecutionFinalizer[ReadSet, WriteSet],
stateValueCache: Cache[DamlStateKey, (DamlStateValue, Fingerprint)],
cacheUpdatePolicy: CacheUpdatePolicy[DamlStateKey],
) {
private val logger = ContextualizedLogger.get(getClass)
@ -62,39 +50,29 @@ class PreExecutingValidatingCommitter[ReadSet, WriteSet](
/**
* Pre-executes and then commits a submission.
*/
def commit[LogResult](
def commit(
correlationId: String,
submissionEnvelope: Bytes,
submittingParticipantId: ParticipantId,
ledgerStateAccess: LedgerStateAccess[LogResult],
ledgerStateAccess: LedgerStateAccess[Any],
)(implicit executionContext: ExecutionContext): Future[SubmissionResult] =
LoggingContext.newLoggingContext("correlationId" -> correlationId) { implicit loggingContext =>
// Sequential pre-execution, implemented by enclosing the whole pre-post-exec pipeline is a single transaction.
ledgerStateAccess.inTransaction { ledgerStateOperations =>
val stateReader = new LedgerStateOperationsReaderAdapter(ledgerStateOperations)
.contramapKeys(keySerializationStrategy.serializeStateKey)
.mapValues(value => value.map(deserializeDamlStateValue) -> valueToFingerprint(value))
val cachingStateReader = CachingDamlLedgerStateReaderWithFingerprints(
stateValueCache,
cacheUpdatePolicy,
stateReader,
)
val stateReader =
transformStateReader(new LedgerStateOperationsReaderAdapter(ledgerStateOperations))
for {
preExecutionOutput <- validator.validate(
submissionEnvelope,
submittingParticipantId,
cachingStateReader,
stateReader,
)
_ <- retry {
case _: ConflictDetectedException =>
logger.error("Conflict detected during post-execution. Retrying...")
true
} { (_, _) =>
val fingerprintStateReader = cachingStateReader.mapValues(_._2)
postExecutionConflictDetector.detectConflicts(
preExecutionOutput,
fingerprintStateReader,
)
postExecutionConflictDetector.detectConflicts(preExecutionOutput, stateReader)
}.transform {
case Failure(_: ConflictDetectedException) =>
logger.error("Too many conflicts detected during post-execution. Giving up.")