kvutils: Prepare the integrity checker for pre-execution. [KVL-822] (#8740)

* integrity-checker: Actually log the error, and fail.

Previously, we were just logging "FAIL" and exiting with a status code
of 0, which is not great.

* integrity-checker: Move the commit logic into the support class.

CHANGELOG_BEGIN
CHANGELOG_END

* integrity-checker: Run with `exec` to get rid of a bash instance.

* integrity-checker: Throw away the write set instead of clearing it.
This commit is contained in:
Samir Talwar 2021-02-04 09:49:10 +01:00 committed by GitHub
parent 5a08c5276f
commit 5944179828
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 146 additions and 150 deletions

View File

@ -12,7 +12,7 @@ import com.daml.metrics.Metrics
import scala.concurrent.{ExecutionContext, Future}
private[memory] final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics)
final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics)
extends LedgerStateAccess[Index] {
override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T])(implicit
executionContext: ExecutionContext

View File

@ -3,22 +3,20 @@
package com.daml.ledger.on.memory
import com.daml.ledger.on.memory.InMemoryLedgerStateOperations._
import com.daml.ledger.on.memory.InMemoryState.MutableLog
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.validator.BatchingLedgerStateOperations
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
private[memory] final class InMemoryLedgerStateOperations(
final class InMemoryLedgerStateOperations(
log: InMemoryState.MutableLog,
state: InMemoryState.MutableState,
) extends BatchingLedgerStateOperations[Index] {
import InMemoryLedgerStateOperations.appendEntry
override def readState(
keys: Iterable[Raw.Key]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Value]]] =
@ -35,14 +33,10 @@ private[memory] final class InMemoryLedgerStateOperations(
executionContext: ExecutionContext
): Future[Index] =
Future.successful(appendEntry(log, LedgerRecord(_, key, value)))
}
object InMemoryLedgerStateOperations {
def apply(): InMemoryLedgerStateOperations = {
val inMemoryState = mutable.Map.empty[Raw.Key, Raw.Value]
val inMemoryLog = mutable.ArrayBuffer[LedgerRecord]()
new InMemoryLedgerStateOperations(inMemoryLog, inMemoryState)
}
private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
val entryAtIndex = log.size
@ -51,4 +45,5 @@ object InMemoryLedgerStateOperations {
log += entry
entryAtIndex
}
}

View File

@ -29,7 +29,6 @@ da_scala_library(
],
deps = [
"//daml-lf/data",
"//daml-lf/engine",
"//ledger/ledger-api-health",
"//ledger/ledger-resources",
"//ledger/metrics",
@ -83,6 +82,8 @@ da_scala_binary(
"@maven//:ch_qos_logback_logback_classic",
],
deps = [
"//daml-lf/data",
"//daml-lf/engine",
"//ledger/ledger-api-health",
"//ledger/ledger-on-memory",
"//ledger/metrics",

View File

@ -3,7 +3,8 @@
package com.daml.ledger.participant.state.kvutils.tools.integritycheck
import com.daml.ledger.on.memory.{InMemoryLedgerStateOperations, Index}
import akka.stream.Materializer
import com.daml.ledger.on.memory.{InMemoryLedgerStateAccess, InMemoryState, Index}
import com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntry,
@ -11,41 +12,68 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.export.{
NoOpLedgerDataExporter,
SubmissionInfo,
WriteSet,
}
import com.daml.ledger.participant.state.kvutils.tools.integritycheck.IntegrityChecker.rawHexString
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
import com.daml.ledger.validator.batch.BatchedSubmissionValidatorFactory
import com.daml.ledger.validator.reading.DamlLedgerStateReader
import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting, Raw}
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorFactory,
BatchedSubmissionValidatorParameters,
ConflictDetection,
}
import com.daml.lf.engine.Engine
import com.daml.metrics.Metrics
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
final class LogAppendingCommitStrategySupport(metrics: Metrics)(implicit
executionContext: ExecutionContext
) extends CommitStrategySupport[Index] {
private val ledgerStateOperations =
InMemoryLedgerStateOperations()
private val writeRecordingLedgerStateOperations =
new WriteRecordingLedgerStateOperations[Index](ledgerStateOperations)
final class LogAppendingCommitStrategySupport(
metrics: Metrics
)(implicit executionContext: ExecutionContext)
extends CommitStrategySupport[Index] {
private val state = InMemoryState.empty
private val serializationStrategy = StateKeySerializationStrategy.createDefault()
private val readerAndCommitStrategy =
BatchedSubmissionValidatorFactory.readerAndCommitStrategyFrom(
writeRecordingLedgerStateOperations,
serializationStrategy,
)
private val engine = new Engine()
private val submissionValidator = BatchedSubmissionValidator[Index](
params = BatchedSubmissionValidatorParameters(cpuParallelism = 1, readParallelism = 1),
committer = new KeyValueCommitting(engine, metrics),
conflictDetection = new ConflictDetection(metrics),
metrics = metrics,
ledgerDataExporter = NoOpLedgerDataExporter,
)
override val stateKeySerializationStrategy: StateKeySerializationStrategy =
serializationStrategy
override val writeSet: QueryableWriteSet = writeRecordingLedgerStateOperations
override val ledgerStateReader: DamlLedgerStateReader = readerAndCommitStrategy._1
override val commitStrategy: CommitStrategy[Index] =
readerAndCommitStrategy._2
override def commit(
submissionInfo: SubmissionInfo
)(implicit materializer: Materializer): Future[WriteSet] = {
val access = new WriteRecordingLedgerStateAccess(new InMemoryLedgerStateAccess(state, metrics))
access.inTransaction { operations =>
val (ledgerStateReader, commitStrategy) =
BatchedSubmissionValidatorFactory.readerAndCommitStrategyFrom(
operations,
serializationStrategy,
)
submissionValidator
.validateAndCommit(
submissionInfo.submissionEnvelope,
submissionInfo.correlationId,
submissionInfo.recordTimeInstant,
submissionInfo.participantId,
ledgerStateReader,
commitStrategy,
)
.map(_ => access.getWriteSet)
}
}
override def newReadServiceFactory(): ReplayingReadServiceFactory =
new LogAppendingReadServiceFactory(metrics)

View File

@ -0,0 +1,67 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.tools.integritycheck
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.export.{WriteItem, WriteSet}
import com.daml.ledger.validator.{LedgerStateAccess, LedgerStateOperations}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
class WriteRecordingLedgerStateAccess[LogResult](delegate: LedgerStateAccess[LogResult])
extends LedgerStateAccess[LogResult] {
private val recordedWriteSet = mutable.Buffer.empty[WriteItem]
override def inTransaction[T](
body: LedgerStateOperations[LogResult] => Future[T]
)(implicit executionContext: ExecutionContext): Future[T] =
delegate.inTransaction { operations =>
body(new WriteRecordingLedgerStateAccess.Operations(recordedWriteSet, operations))
}
def getWriteSet: WriteSet = recordedWriteSet
}
object WriteRecordingLedgerStateAccess {
class Operations[LogResult](
recordedWriteSet: mutable.Buffer[WriteItem],
delegate: LedgerStateOperations[LogResult],
) extends LedgerStateOperations[LogResult] {
override def readState(
key: Raw.Key
)(implicit executionContext: ExecutionContext): Future[Option[Raw.Value]] =
delegate.readState(key)
override def readState(
keys: Iterable[Raw.Key]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Value]]] =
delegate.readState(keys)
override def writeState(
key: Raw.Key,
value: Raw.Value,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
this.synchronized(recordedWriteSet.append((key, value)))
delegate.writeState(key, value)
}
override def writeState(
keyValuePairs: Iterable[Raw.KeyValuePair]
)(implicit executionContext: ExecutionContext): Future[Unit] = {
this.synchronized(recordedWriteSet.appendAll(keyValuePairs))
delegate.writeState(keyValuePairs)
}
override def appendToLog(
key: Raw.Key,
value: Raw.Value,
)(implicit executionContext: ExecutionContext): Future[LogResult] = {
this.synchronized(recordedWriteSet.append((key, value)))
delegate.appendToLog(key, value)
}
}
}

View File

@ -1,58 +0,0 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.tools.integritycheck
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.export.{WriteItem, WriteSet}
import com.daml.ledger.validator.LedgerStateOperations
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
class WriteRecordingLedgerStateOperations[LogResult](delegate: LedgerStateOperations[LogResult])
extends LedgerStateOperations[LogResult]
with QueryableWriteSet {
private val recordedWriteSet = ListBuffer.empty[WriteItem]
override def readState(
key: Raw.Key
)(implicit executionContext: ExecutionContext): Future[Option[Raw.Value]] =
delegate.readState(key)
override def readState(
keys: Iterable[Raw.Key]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Value]]] =
delegate.readState(keys)
override def writeState(
key: Raw.Key,
value: Raw.Value,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
this.synchronized(recordedWriteSet.append((key, value)))
delegate.writeState(key, value)
}
override def writeState(
keyValuePairs: Iterable[Raw.KeyValuePair]
)(implicit executionContext: ExecutionContext): Future[Unit] = {
this.synchronized(recordedWriteSet.appendAll(keyValuePairs))
delegate.writeState(keyValuePairs)
}
override def appendToLog(
key: Raw.Key,
value: Raw.Value,
)(implicit executionContext: ExecutionContext): Future[LogResult] = {
this.synchronized(recordedWriteSet.append((key, value)))
delegate.appendToLog(key, value)
}
override def getAndClearRecordedWriteSet(): WriteSet = {
this.synchronized {
val result = recordedWriteSet.result()
recordedWriteSet.clear()
result
}
}
}

View File

@ -7,7 +7,7 @@ def _integrity_test_impl(ctx):
output = wrapper,
content = """#!/usr/bin/env bash
set -eux
{checker} $(rlocation "$TEST_WORKSPACE/{dump}")
exec {checker} $(rlocation "$TEST_WORKSPACE/{dump}")
""".format(
checker = ctx.executable.checker.short_path,
dump = ctx.file.dump.short_path,

View File

@ -5,14 +5,11 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck
import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.export.WriteSet
import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet}
import com.daml.ledger.participant.state.v1.ReadService
import com.daml.ledger.validator.reading.DamlLedgerStateReader
import com.daml.ledger.validator.{CommitStrategy, StateKeySerializationStrategy}
import com.daml.ledger.validator.StateKeySerializationStrategy
trait QueryableWriteSet {
def getAndClearRecordedWriteSet(): WriteSet
}
import scala.concurrent.Future
/** A ReadService that streams back previously recorded state updates */
trait ReplayingReadService extends ReadService {
@ -29,11 +26,9 @@ trait ReplayingReadServiceFactory {
trait CommitStrategySupport[LogResult] {
def stateKeySerializationStrategy: StateKeySerializationStrategy
def ledgerStateReader: DamlLedgerStateReader
def commitStrategy: CommitStrategy[LogResult]
def writeSet: QueryableWriteSet
def commit(
submissionInfo: SubmissionInfo
)(implicit materializer: Materializer): Future[WriteSet]
def newReadServiceFactory(): ReplayingReadServiceFactory

View File

@ -11,21 +11,14 @@ import akka.stream.scaladsl.{Sink, Source}
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.export.{
LedgerDataImporter,
NoOpLedgerDataExporter,
ProtobufBasedLedgerDataImporter,
WriteSet,
}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService}
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import com.daml.ledger.validator.batch.{
BatchedSubmissionValidator,
BatchedSubmissionValidatorParameters,
ConflictDetection,
}
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
@ -62,16 +55,6 @@ class IntegrityChecker[LogResult](
println()
}
val engine = new Engine()
val metricRegistry = new MetricRegistry
val metrics = new Metrics(metricRegistry)
val submissionValidator = BatchedSubmissionValidator[LogResult](
params = BatchedSubmissionValidatorParameters(cpuParallelism = 1, readParallelism = 1),
committer = new KeyValueCommitting(engine, metrics),
conflictDetection = new ConflictDetection(metrics),
metrics = metrics,
ledgerDataExporter = NoOpLedgerDataExporter,
)
val expectedReadServiceFactory = commitStrategySupport.newReadServiceFactory()
val actualReadServiceFactory = commitStrategySupport.newReadServiceFactory()
val stateUpdates = new ReadServiceStateUpdateComparison(
@ -82,7 +65,6 @@ class IntegrityChecker[LogResult](
checkIntegrity(
config,
importer,
submissionValidator,
expectedReadServiceFactory,
actualReadServiceFactory,
stateUpdates,
@ -96,7 +78,6 @@ class IntegrityChecker[LogResult](
private def checkIntegrity(
config: Config,
importer: LedgerDataImporter,
submissionValidator: BatchedSubmissionValidator[LogResult],
expectedReadServiceFactory: ReplayingReadServiceFactory,
actualReadServiceFactory: ReplayingReadServiceFactory,
stateUpdates: StateUpdateComparison,
@ -108,7 +89,6 @@ class IntegrityChecker[LogResult](
for {
_ <- processSubmissions(
importer,
submissionValidator,
expectedReadServiceFactory,
actualReadServiceFactory,
config,
@ -189,7 +169,6 @@ class IntegrityChecker[LogResult](
private def processSubmissions(
importer: LedgerDataImporter,
submissionValidator: BatchedSubmissionValidator[LogResult],
expectedReadServiceFactory: ReplayingReadServiceFactory,
actualReadServiceFactory: ReplayingReadServiceFactory,
config: Config,
@ -212,15 +191,7 @@ class IntegrityChecker[LogResult](
}
expectedReadServiceFactory.appendBlock(expectedWriteSet)
if (!config.indexOnly) {
submissionValidator.validateAndCommit(
submissionInfo.submissionEnvelope,
submissionInfo.correlationId,
submissionInfo.recordTimeInstant,
submissionInfo.participantId,
commitStrategySupport.ledgerStateReader,
commitStrategySupport.commitStrategy,
) map { _ =>
val actualWriteSet = commitStrategySupport.writeSet.getAndClearRecordedWriteSet()
commitStrategySupport.commit(submissionInfo) map { actualWriteSet =>
val orderedActualWriteSet =
if (config.sortWriteSet)
actualWriteSet.sortBy(_._1)
@ -254,7 +225,6 @@ class IntegrityChecker[LogResult](
Some(s"Expected write-set of size ${expectedWriteSet.size} vs. ${actualWriteSet.size}")
}
messageMaybe.foreach { message =>
println("FAIL".red)
throw new ComparisonFailureException(message)
}
}
@ -381,15 +351,16 @@ object IntegrityChecker {
config: Config,
commitStrategySupportFactory: CommitStrategySupportFactory[LogResult],
): Unit = {
runAsync(config, commitStrategySupportFactory).failed
.foreach {
case exception: CheckFailedException =>
println(exception.getMessage.red)
sys.exit(1)
case exception =>
exception.printStackTrace()
sys.exit(1)
}(DirectExecutionContext)
runAsync(config, commitStrategySupportFactory).onComplete {
case Success(_) =>
sys.exit(0)
case Failure(exception: CheckFailedException) =>
println(exception.getMessage.red)
sys.exit(1)
case Failure(exception) =>
exception.printStackTrace()
sys.exit(1)
}(DirectExecutionContext)
}
private[integritycheck] def createIndexerConfig(config: Config): IndexerConfig =
@ -419,8 +390,5 @@ object IntegrityChecker {
val importer = ProtobufBasedLedgerDataImporter(config.exportFilePath)
new IntegrityChecker(commitStrategySupportFactory(_, executionContext))
.run(importer, config)
.andThen { case _ =>
sys.exit(0)
}(DirectExecutionContext)
}
}