kvutils: Run the commit strategies in as parallel a fashion as possible. [KVL-558] (#7520)

* kvutils: Document the deterministic nature of commit strategies.

* kvutils: Run the commit strategies in as parallel a fashion as possible.

Trees are faster than lists: a tree of independent futures has a shorter critical path than a sequential chain of them.

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils: Move `serializeStateKey` into its own class.
Samir Talwar 2020-09-30 11:43:03 +02:00 committed by GitHub
parent 9d0206fc44
commit 7adaa78696
7 changed files with 103 additions and 62 deletions

@@ -16,6 +16,10 @@ import scala.concurrent.Future
/**
* Determines how we commit the results of processing a DAML submission.
*
* This must write deterministically. Given the same input, the output and the order of writes
* must not vary, even across process runs. This also means that the implementing type must not
* depend on the iteration order of the `inputState` and `outputState` maps.
*/
trait CommitStrategy[Result] {
def commit(

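To illustrate the contract above with plain Scala (a minimal sketch, not part of this commit): the write order must be derived from the data itself, never from `Map` iteration order.

object DeterministicCommitProperty extends App {
  // The same logical state, built in two different insertion orders.
  val stateA = Map("b" -> 2, "a" -> 1, "c" -> 3)
  val stateB = Map("c" -> 3, "a" -> 1, "b" -> 2)

  // Derive the write order from the keys themselves rather than from Map
  // iteration, so the resulting sequence is identical for both maps.
  def writes(state: Map[String, Int]): Seq[(String, Int)] =
    state.toSeq.sortBy { case (key, _) => key }

  assert(writes(stateA) == writes(stateB))
}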

@@ -9,12 +9,10 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlStateKey,
DamlStateValue
}
import com.daml.ledger.participant.state.kvutils.Envelope
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.kvutils.{Envelope, `Bytes Ordering`}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import scala.collection.{SortedMap, breakOut}
import scala.concurrent.{ExecutionContext, Future}
class LogAppendingCommitStrategy[Index](
@@ -22,6 +20,8 @@ class LogAppendingCommitStrategy[Index](
keySerializationStrategy: StateKeySerializationStrategy,
)(implicit executionContext: ExecutionContext)
extends CommitStrategy[Index] {
private val stateSerializationStrategy = new StateSerializationStrategy(keySerializationStrategy)
override def commit(
participantId: ParticipantId,
correlationId: String,
@@ -29,28 +29,26 @@ class LogAppendingCommitStrategy[Index](
entry: DamlLogEntry,
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue],
exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Index] = {
val serializedKeyValuePairs: SortedMap[Key, Value] =
outputState
.map {
case (key, value) =>
(keySerializationStrategy.serializeStateKey(key), Envelope.enclose(value))
}(breakOut)
exporterWriteSet.foreach {
_ ++= serializedKeyValuePairs
}
writeSetBuilder: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Index] =
for {
_ <- if (serializedKeyValuePairs.nonEmpty) {
ledgerStateOperations.writeState(serializedKeyValuePairs)
} else {
Future.unit
}
envelopedLogEntry = Envelope.enclose(entry)
_ = exporterWriteSet.foreach {
_ += entryId.toByteString -> envelopedLogEntry
}
index <- ledgerStateOperations.appendToLog(entryId.toByteString, envelopedLogEntry)
(serializedKeyValuePairs, envelopedLogEntry) <- inParallel(
Future(stateSerializationStrategy.serializeState(outputState)),
Future(Envelope.enclose(entry)),
)
(_, _, index) <- inParallel(
if (serializedKeyValuePairs.nonEmpty) {
ledgerStateOperations.writeState(serializedKeyValuePairs)
} else {
Future.unit
},
Future {
writeSetBuilder.foreach { builder =>
builder ++= serializedKeyValuePairs
builder += entryId.toByteString -> envelopedLogEntry
}
},
ledgerStateOperations.appendToLog(entryId.toByteString, envelopedLogEntry),
)
} yield index
}
}

@@ -3,17 +3,16 @@
package com.daml.ledger.validator
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateKey
import com.google.protobuf.ByteString
import com.daml.ledger.validator.LedgerStateOperations.Key
/**
* Determines how we namespace and serialize state keys.
*/
trait StateKeySerializationStrategy {
def serializeStateKey(key: DamlStateKey): ByteString
def serializeStateKey(key: DamlStateKey): Key
def deserializeStateKey(input: Bytes): DamlStateKey
def deserializeStateKey(input: Key): DamlStateKey
}
object StateKeySerializationStrategy {

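For illustration, a hypothetical implementation of this trait might namespace keys with a fixed prefix. The class, the prefix, and the assumption that `Key` is kvutils' `Bytes` (a `ByteString`), as the signatures in this diff suggest, are made up for this sketch; it is not one of the strategies shipped with kvutils.

import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlStateKey
import com.daml.ledger.validator.LedgerStateOperations.Key
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.google.protobuf.ByteString

// Hypothetical strategy: prepend a fixed namespace so serialized state keys
// cannot collide with other kinds of keys stored in the same key-value store.
final class PrefixedStateKeySerializationStrategy(namespace: ByteString)
    extends StateKeySerializationStrategy {
  override def serializeStateKey(key: DamlStateKey): Key =
    namespace.concat(key.toByteString)

  override def deserializeStateKey(input: Key): DamlStateKey =
    DamlStateKey.parseFrom(input.substring(namespace.size))
}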

@@ -0,0 +1,18 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.validator
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Envelope, `Bytes Ordering`}
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import scala.collection.{SortedMap, breakOut}
final class StateSerializationStrategy(keyStrategy: StateKeySerializationStrategy) {
def serializeState(state: Map[DamlStateKey, DamlStateValue]): SortedMap[Key, Value] =
state.map {
case (key, value) =>
(keyStrategy.serializeStateKey(key), Envelope.enclose(value))
}(breakOut)
}

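The `SortedMap` above needs an `Ordering[Key]` in scope, which is what the imported `Bytes Ordering` provides. Its definition is not part of this diff; the sketch below shows one plausible shape (an assumption, not the actual kvutils definition). Any fixed total order works; determinism only requires that the order never changes for the same keys.

import com.google.protobuf.ByteString

object BytesOrderingSketch {
  // Assumed shape of `Bytes Ordering`: compare serialized keys via their
  // read-only ByteBuffer views, which ByteBuffer compares lexicographically.
  implicit val `Bytes Ordering`: Ordering[ByteString] = new Ordering[ByteString] {
    override def compare(x: ByteString, y: ByteString): Int =
      x.asReadOnlyByteBuffer().compareTo(y.asReadOnlyByteBuffer())
  }
}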

@@ -6,7 +6,7 @@ package com.daml.ledger
import com.daml.ledger.participant.state.kvutils.{Bytes, CorrelationId}
import com.daml.ledger.participant.state.v1.{ParticipantId, SubmissionResult}
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
package object validator {
type SubmissionEnvelope = Bytes
@@ -20,4 +20,29 @@ package object validator {
SubmissionEnvelope,
SubmittingParticipantId,
) => Future[SubmissionResult]
// At some point, someone much smarter than the author of this code will reimplement the usages of
// `inParallel` using Cats or Scalaz.
//
// Until then, these will do. They're not public because they're not part of the API.
private[validator] def inParallel[A, B](
aFuture: Future[A],
bFuture: Future[B],
)(implicit executionContext: ExecutionContext): Future[(A, B)] =
for {
a <- aFuture
b <- bFuture
} yield (a, b)
private[validator] def inParallel[A, B, C](
aFuture: Future[A],
bFuture: Future[B],
cFuture: Future[C],
)(implicit executionContext: ExecutionContext): Future[(A, B, C)] =
for {
a <- aFuture
b <- bFuture
c <- cFuture
} yield (a, b, c)
}

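These helpers only run things in parallel because both futures are constructed, and therefore already scheduled, before the for-comprehension joins them; writing `Future(a)` and `Future(b)` inline in a single for-comprehension would sequence them instead. A minimal sketch of the difference (illustrative, not part of this commit):

import scala.concurrent.{ExecutionContext, Future}

object InParallelSketch {
  // Sequential: the second Future is not constructed until the first completes.
  def sequential(a: => Int, b: => Int)(implicit ec: ExecutionContext): Future[(Int, Int)] =
    for {
      x <- Future(a)
      y <- Future(b)
    } yield (x, y)

  // Parallel: both Futures are created (and scheduled) up front and only then
  // joined, which is the same trick `inParallel` relies on, since its arguments
  // are evaluated before the call.
  def parallel(a: => Int, b: => Int)(implicit ec: ExecutionContext): Future[(Int, Int)] = {
    val aFuture = Future(a)
    val bFuture = Future(b)
    for {
      x <- aFuture
      y <- bFuture
    } yield (x, y)
  }
}

The standard library's `Future#zip` would cover the two-argument case just as well; the hand-rolled helpers mainly keep the three-way combination in `commit` readable.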

@@ -4,45 +4,42 @@
package com.daml.ledger.validator.preexecution
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlLogEntry, DamlLogEntryId}
import com.daml.ledger.participant.state.kvutils.{
Bytes,
DamlKvutils,
Envelope,
KeyValueCommitting,
`Bytes Ordering`
}
import com.daml.ledger.participant.state.kvutils.{Bytes, DamlKvutils, Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import com.daml.ledger.validator.StateKeySerializationStrategy
import com.daml.ledger.validator.{
StateKeySerializationStrategy,
StateSerializationStrategy,
inParallel
}
import com.google.protobuf.ByteString
import scala.collection.{SortedMap, breakOut}
import scala.concurrent.{ExecutionContext, Future}
class LogAppenderPreExecutingCommitStrategy(keySerializationStrategy: StateKeySerializationStrategy)
extends PreExecutingCommitStrategy[RawKeyValuePairsWithLogEntry] {
final class LogAppenderPreExecutingCommitStrategy(
keySerializationStrategy: StateKeySerializationStrategy,
) extends PreExecutingCommitStrategy[RawKeyValuePairsWithLogEntry] {
private val stateSerializationStrategy = new StateSerializationStrategy(keySerializationStrategy)
override def generateWriteSets(
participantId: ParticipantId,
entryId: DamlLogEntryId,
logEntryId: DamlLogEntryId,
inputState: Map[DamlKvutils.DamlStateKey, Option[DamlKvutils.DamlStateValue]],
preExecutionResult: KeyValueCommitting.PreExecutionResult,
)(implicit executionContext: ExecutionContext)
: Future[PreExecutionCommitResult[RawKeyValuePairsWithLogEntry]] = {
val serializedSuccessKeyValuePairs: SortedMap[Key, Value] =
preExecutionResult.stateUpdates
.map {
case (key, value) =>
(keySerializationStrategy.serializeStateKey(key), Envelope.enclose(value))
}(breakOut)
val serializedLogEntryId = entryId.toByteString
for {
serializedSuccessLogEntryPair <- logEntryToKeyValuePairs(
serializedLogEntryId,
preExecutionResult.successfulLogEntry)
serializedOutOfTimeBoundsLogEntryPair <- logEntryToKeyValuePairs(
serializedLogEntryId,
preExecutionResult.outOfTimeBoundsLogEntry)
(
serializedSuccessKeyValuePairs,
(serializedSuccessLogEntryPair, serializedOutOfTimeBoundsLogEntryPair),
) <- inParallel(
Future(stateSerializationStrategy.serializeState(preExecutionResult.stateUpdates)),
Future(logEntryId.toByteString).flatMap(
serializedId =>
inParallel(
logEntryToKeyValuePairs(serializedId, preExecutionResult.successfulLogEntry),
logEntryToKeyValuePairs(serializedId, preExecutionResult.outOfTimeBoundsLogEntry),
)),
)
} yield
PreExecutionCommitResult(
successWriteSet = RawKeyValuePairsWithLogEntry(
@@ -53,17 +50,17 @@ class LogAppenderPreExecutingCommitStrategy(keySerializationStrategy: StateKeySe
outOfTimeBoundsWriteSet = RawKeyValuePairsWithLogEntry(
Seq.empty,
serializedOutOfTimeBoundsLogEntryPair._1,
serializedOutOfTimeBoundsLogEntryPair._2),
serializedOutOfTimeBoundsLogEntryPair._2,
),
// We assume updates for a successful transaction must be visible to every participant for
// public ledgers.
involvedParticipants = Set.empty
involvedParticipants = Set.empty,
)
}
private def logEntryToKeyValuePairs(
logEntryId: ByteString,
logEntry: DamlLogEntry,
)(implicit executionContext: ExecutionContext): Future[(Bytes, Bytes)] = Future {
logEntryId -> Envelope.enclose(logEntry)
}
)(implicit executionContext: ExecutionContext): Future[(Bytes, Bytes)] =
Future(logEntryId -> Envelope.enclose(logEntry))
}

@@ -17,7 +17,7 @@ import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.{ExecutionContext, Future}
class LogAppendingCommitStrategySpec extends AsyncWordSpec with Matchers with MockitoSugar {
final class LogAppendingCommitStrategySpec extends AsyncWordSpec with Matchers with MockitoSugar {
"commit" should {
"return index from appendToLog" in {
val mockLedgerStateOperations = mock[LedgerStateOperations[Long]]