mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
kvutils & ledger-on-sql: Avoid a race condition in dispatching. (#4546)
* kvutils/ledger-on-sql: Avoid a race condition in dispatching. This changes the API of kvutils to allow for passing data out of the transaction, which makes it much easier to ensure the new head makes it to the dispatcher. Previously, we would use an `AtomicLong` to communicate the data, but this was problematic because values could arrive out in the wrong order. For example: - log index 5 is committed - `head` is updated to 6 - the dispatcher is signalled with a head of 6 - log index 6 is committed - log index 7 is committed - `head` is updated to 8 - `head` is updated to 7 - the dispatcher is signalled with a head of 7 In this scenario, we would have to wait until a new commit comes in before the indexer finds out about log index 7. * kvutils: Just return an `Either`from `SubmissionValidator`. It was either that or introduce yet another type to split `SubmissionValidated` into two. CHANGELOG_BEGIN CHANGELOG_END * kvutils: Make ValidationFailed extend NoStackTrace.
This commit is contained in:
parent
ab74291f0a
commit
b203ccaade
@ -14,11 +14,7 @@ import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord
|
||||
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, SequentialLogEntryId}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.ValidationResult.{
|
||||
MissingInputState,
|
||||
SubmissionValidated,
|
||||
ValidationError
|
||||
}
|
||||
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
|
||||
import com.daml.ledger.validator._
|
||||
import com.digitalasset.daml.lf.data.Time.Timestamp
|
||||
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
|
||||
@ -58,8 +54,8 @@ final class InMemoryLedgerReaderWriter(
|
||||
private val validator =
|
||||
SubmissionValidator.create(InMemoryLedgerStateAccess, () => sequentialLogEntryId.next())
|
||||
|
||||
private object InMemoryLedgerStateAccess extends LedgerStateAccess {
|
||||
override def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T] =
|
||||
private object InMemoryLedgerStateAccess extends LedgerStateAccess[Index] {
|
||||
override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T]): Future[T] =
|
||||
Future
|
||||
.successful(lockCurrentState.acquire())
|
||||
.flatMap(_ => body(InMemoryLedgerStateOperations))
|
||||
@ -69,7 +65,7 @@ final class InMemoryLedgerReaderWriter(
|
||||
}
|
||||
}
|
||||
|
||||
private object InMemoryLedgerStateOperations extends BatchingLedgerStateOperations {
|
||||
private object InMemoryLedgerStateOperations extends BatchingLedgerStateOperations[Index] {
|
||||
override def readState(keys: Seq[Key]): Future[Seq[Option[Value]]] =
|
||||
Future.successful {
|
||||
keys.map(keyBytes => currentState.state.get(ByteString.copyFrom(keyBytes)))
|
||||
@ -82,15 +78,14 @@ final class InMemoryLedgerReaderWriter(
|
||||
}
|
||||
}
|
||||
|
||||
override def appendToLog(key: Key, value: Value): Future[Unit] =
|
||||
override def appendToLog(key: Key, value: Value): Future[Index] =
|
||||
Future.successful {
|
||||
val damlLogEntryId = KeyValueCommitting.unpackDamlLogEntryId(key)
|
||||
val logEntry = LogEntry(damlLogEntryId, value)
|
||||
val newHead = currentState.log.synchronized {
|
||||
currentState.log.synchronized {
|
||||
currentState.log += logEntry
|
||||
currentState.log.size
|
||||
}
|
||||
dispatcher.signalNewHead(newHead)
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,9 +93,13 @@ final class InMemoryLedgerReaderWriter(
|
||||
validator
|
||||
.validateAndCommit(envelope, correlationId, currentRecordTime(), participantId)
|
||||
.map {
|
||||
case SubmissionValidated => SubmissionResult.Acknowledged
|
||||
case MissingInputState(_) => SubmissionResult.InternalError("Missing input state")
|
||||
case ValidationError(reason) => SubmissionResult.InternalError(reason)
|
||||
case Right(newHead) =>
|
||||
dispatcher.signalNewHead(newHead)
|
||||
SubmissionResult.Acknowledged
|
||||
case Left(MissingInputState(_)) =>
|
||||
SubmissionResult.InternalError("Missing input state")
|
||||
case Left(ValidationError(reason)) =>
|
||||
SubmissionResult.InternalError(reason)
|
||||
}
|
||||
|
||||
override def events(offset: Option[Offset]): Source[LedgerRecord, NotUsed] =
|
||||
|
@ -6,7 +6,6 @@ package com.daml.ledger.on.sql
|
||||
import java.sql.Connection
|
||||
import java.time.Clock
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
@ -15,11 +14,7 @@ import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
|
||||
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator.ValidationResult.{
|
||||
MissingInputState,
|
||||
SubmissionValidated,
|
||||
ValidationError
|
||||
}
|
||||
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
|
||||
import com.daml.ledger.validator.{
|
||||
BatchingLedgerStateOperations,
|
||||
LedgerStateAccess,
|
||||
@ -37,7 +32,6 @@ import com.digitalasset.resources.ResourceOwner
|
||||
|
||||
import scala.collection.immutable.TreeSet
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.Success
|
||||
|
||||
class SqlLedgerReaderWriter(
|
||||
override val ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
|
||||
@ -55,8 +49,6 @@ class SqlLedgerReaderWriter(
|
||||
|
||||
private val validator = SubmissionValidator.create(SqlLedgerStateAccess)
|
||||
|
||||
private val head = new AtomicLong(dispatcher.getHead())
|
||||
|
||||
// TODO: implement
|
||||
override def currentHealth(): HealthStatus = Healthy
|
||||
|
||||
@ -90,12 +82,13 @@ class SqlLedgerReaderWriter(
|
||||
validator
|
||||
.validateAndCommit(envelope, correlationId, currentRecordTime(), participantId)
|
||||
.map {
|
||||
case SubmissionValidated =>
|
||||
case Right(latestSequenceNo) =>
|
||||
dispatcher.signalNewHead(latestSequenceNo + 1)
|
||||
SubmissionResult.Acknowledged
|
||||
case MissingInputState(keys) =>
|
||||
case Left(MissingInputState(keys)) =>
|
||||
SubmissionResult.InternalError(
|
||||
s"Missing input state: ${keys.map(_.map("%02x".format(_)).mkString).mkString(", ")}")
|
||||
case ValidationError(reason) =>
|
||||
case Left(ValidationError(reason)) =>
|
||||
SubmissionResult.InternalError(reason)
|
||||
}
|
||||
}
|
||||
@ -103,31 +96,23 @@ class SqlLedgerReaderWriter(
|
||||
private def currentRecordTime(): Timestamp =
|
||||
Timestamp.assertFromInstant(Clock.systemUTC().instant())
|
||||
|
||||
object SqlLedgerStateAccess extends LedgerStateAccess {
|
||||
override def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T] =
|
||||
database
|
||||
.inWriteTransaction("Committing a submission") { implicit connection =>
|
||||
body(new SqlLedgerStateOperations)
|
||||
}
|
||||
.andThen {
|
||||
case Success(_) =>
|
||||
dispatcher.signalNewHead(head.get())
|
||||
}
|
||||
object SqlLedgerStateAccess extends LedgerStateAccess[Index] {
|
||||
override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T]): Future[T] =
|
||||
database.inWriteTransaction("Committing a submission") { implicit connection =>
|
||||
body(new SqlLedgerStateOperations)
|
||||
}
|
||||
}
|
||||
|
||||
class SqlLedgerStateOperations(implicit connection: Connection)
|
||||
extends BatchingLedgerStateOperations {
|
||||
extends BatchingLedgerStateOperations[Index] {
|
||||
override def readState(keys: Seq[Key]): Future[Seq[Option[Value]]] =
|
||||
Future.successful(queries.selectStateValuesByKeys(keys))
|
||||
|
||||
override def writeState(keyValuePairs: Seq[(Key, Value)]): Future[Unit] =
|
||||
Future.successful(queries.updateState(keyValuePairs))
|
||||
|
||||
override def appendToLog(key: Key, value: Value): Future[Unit] = {
|
||||
val latestSequenceNo = queries.insertIntoLog(key, value)
|
||||
head.set(latestSequenceNo + 1)
|
||||
Future.unit
|
||||
}
|
||||
override def appendToLog(key: Key, value: Value): Future[Index] =
|
||||
Future.successful(queries.insertIntoLog(key, value))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,17 +7,18 @@ import com.daml.ledger.validator.LedgerStateOperations._
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait LedgerStateAccess {
|
||||
trait LedgerStateAccess[LogResult] {
|
||||
|
||||
/**
|
||||
* Performs read and write operations on the backing store in a single atomic transaction.
|
||||
*
|
||||
* @param body operations to perform
|
||||
* @tparam T type of result returned after execution
|
||||
* @tparam T return type of the body
|
||||
*/
|
||||
def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T]
|
||||
def inTransaction[T](body: LedgerStateOperations[LogResult] => Future[T]): Future[T]
|
||||
}
|
||||
|
||||
trait LedgerStateOperations {
|
||||
trait LedgerStateOperations[LogResult] {
|
||||
|
||||
/**
|
||||
* Reads value of a single key from the backing store.
|
||||
@ -47,14 +48,14 @@ trait LedgerStateOperations {
|
||||
* Writes a single log entry to the backing store. The implementation may return Future.failed in case the key
|
||||
* (i.e., the log entry ID) already exists.
|
||||
*/
|
||||
def appendToLog(key: Key, value: Value): Future[Unit]
|
||||
def appendToLog(key: Key, value: Value): Future[LogResult]
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements non-batching read and write operations on the backing store based on batched implementations.
|
||||
*/
|
||||
abstract class BatchingLedgerStateOperations(implicit executionContext: ExecutionContext)
|
||||
extends LedgerStateOperations {
|
||||
abstract class BatchingLedgerStateOperations[LogResult](implicit executionContext: ExecutionContext)
|
||||
extends LedgerStateOperations[LogResult] {
|
||||
override final def readState(key: Key): Future[Option[Value]] =
|
||||
readState(Seq(key)).map(_.head)
|
||||
|
||||
@ -65,8 +66,9 @@ abstract class BatchingLedgerStateOperations(implicit executionContext: Executio
|
||||
/**
|
||||
* Implements batching read and write operations on the backing store based on non-batched implementations.
|
||||
*/
|
||||
abstract class NonBatchingLedgerStateOperations(implicit executionContext: ExecutionContext)
|
||||
extends LedgerStateOperations {
|
||||
abstract class NonBatchingLedgerStateOperations[LogResult](
|
||||
implicit executionContext: ExecutionContext
|
||||
) extends LedgerStateOperations[LogResult] {
|
||||
override final def readState(keys: Seq[Key]): Future[Seq[Option[Value]]] =
|
||||
Future.sequence(keys.map(readState))
|
||||
|
||||
|
@ -9,14 +9,8 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
|
||||
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.SubmissionValidator.LogEntryAndState
|
||||
import com.daml.ledger.validator.ValidationResult.{
|
||||
MissingInputState,
|
||||
SubmissionValidated,
|
||||
TransformedSubmission,
|
||||
ValidationError,
|
||||
ValidationFailed
|
||||
}
|
||||
import com.daml.ledger.validator.SubmissionValidator._
|
||||
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
|
||||
import com.digitalasset.daml.lf.data.Time.Timestamp
|
||||
import com.digitalasset.daml.lf.engine.Engine
|
||||
import com.google.protobuf.ByteString
|
||||
@ -33,52 +27,48 @@ import scala.util.{Failure, Success, Try}
|
||||
* @param allocateLogEntryId defines how new log entry IDs are being generated
|
||||
* @param executionContext ExecutionContext to use when performing ledger state reads/writes
|
||||
*/
|
||||
class SubmissionValidator(
|
||||
ledgerStateAccess: LedgerStateAccess,
|
||||
class SubmissionValidator[LogResult](
|
||||
ledgerStateAccess: LedgerStateAccess[LogResult],
|
||||
processSubmission: (
|
||||
DamlLogEntryId,
|
||||
Timestamp,
|
||||
DamlSubmission,
|
||||
ParticipantId,
|
||||
Map[DamlStateKey, Option[DamlStateValue]]) => LogEntryAndState,
|
||||
Map[DamlStateKey, Option[DamlStateValue]],
|
||||
) => LogEntryAndState,
|
||||
allocateLogEntryId: () => DamlLogEntryId,
|
||||
checkForMissingInputs: Boolean = false)(implicit executionContext: ExecutionContext) {
|
||||
|
||||
import SubmissionValidator._
|
||||
checkForMissingInputs: Boolean = false,
|
||||
)(implicit executionContext: ExecutionContext) {
|
||||
|
||||
def validate(
|
||||
envelope: RawBytes,
|
||||
correlationId: String,
|
||||
recordTime: Timestamp,
|
||||
participantId: ParticipantId): Future[ValidationResult] =
|
||||
participantId: ParticipantId,
|
||||
): Future[Either[ValidationFailed, Unit]] =
|
||||
runValidation(envelope, correlationId, recordTime, participantId, (_, _, _, _) => Future.unit)
|
||||
.map {
|
||||
case Left(failure) => failure
|
||||
case Right(_) => SubmissionValidated
|
||||
}
|
||||
|
||||
def validateAndCommit(
|
||||
envelope: RawBytes,
|
||||
correlationId: String,
|
||||
recordTime: Timestamp,
|
||||
participantId: ParticipantId): Future[ValidationResult] =
|
||||
runValidation(envelope, correlationId, recordTime, participantId, commit).map {
|
||||
case Left(failure) => failure
|
||||
case Right(_) => SubmissionValidated
|
||||
}
|
||||
participantId: ParticipantId,
|
||||
): Future[Either[ValidationFailed, LogResult]] =
|
||||
runValidation(envelope, correlationId, recordTime, participantId, commit)
|
||||
|
||||
def validateAndTransform[T](
|
||||
def validateAndTransform[U](
|
||||
envelope: RawBytes,
|
||||
correlationId: String,
|
||||
recordTime: Timestamp,
|
||||
participantId: ParticipantId,
|
||||
transform: (DamlLogEntryId, StateMap, LogEntryAndState) => T)
|
||||
: Future[Either[ValidationFailed, TransformedSubmission[T]]] = {
|
||||
transform: (DamlLogEntryId, StateMap, LogEntryAndState) => U
|
||||
): Future[Either[ValidationFailed, U]] = {
|
||||
def applyTransformation(
|
||||
logEntryId: DamlLogEntryId,
|
||||
inputStates: StateMap,
|
||||
logEntryAndState: LogEntryAndState,
|
||||
stateOperations: LedgerStateOperations): Future[T] =
|
||||
stateOperations: LedgerStateOperations[LogResult],
|
||||
): Future[U] =
|
||||
Future.successful(transform(logEntryId, inputStates, logEntryAndState))
|
||||
|
||||
runValidation(envelope, correlationId, recordTime, participantId, applyTransformation)
|
||||
@ -88,20 +78,19 @@ class SubmissionValidator(
|
||||
logEntryId: DamlLogEntryId,
|
||||
ignored: StateMap,
|
||||
logEntryAndState: LogEntryAndState,
|
||||
stateOperations: LedgerStateOperations): Future[Unit] = {
|
||||
stateOperations: LedgerStateOperations[LogResult],
|
||||
): Future[LogResult] = {
|
||||
val (rawLogEntry, rawStateUpdates) = serializeProcessedSubmission(logEntryAndState)
|
||||
Future
|
||||
.sequence(
|
||||
Seq(
|
||||
stateOperations.appendToLog(logEntryId.toByteArray, rawLogEntry),
|
||||
if (rawStateUpdates.nonEmpty) {
|
||||
stateOperations.writeState(rawStateUpdates)
|
||||
} else {
|
||||
Future.unit
|
||||
}
|
||||
)
|
||||
)
|
||||
.map(_ => ())
|
||||
val eventualLogResult = stateOperations.appendToLog(logEntryId.toByteArray, rawLogEntry)
|
||||
val eventualStateResult =
|
||||
if (rawStateUpdates.nonEmpty)
|
||||
stateOperations.writeState(rawStateUpdates)
|
||||
else
|
||||
Future.unit
|
||||
for {
|
||||
logResult <- eventualLogResult
|
||||
_ <- eventualStateResult
|
||||
} yield logResult
|
||||
}
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Product", "org.wartremover.warts.Serializable"))
|
||||
@ -114,8 +103,9 @@ class SubmissionValidator(
|
||||
DamlLogEntryId,
|
||||
StateMap,
|
||||
LogEntryAndState,
|
||||
LedgerStateOperations) => Future[T])
|
||||
: Future[Either[ValidationFailed, TransformedSubmission[T]]] =
|
||||
LedgerStateOperations[LogResult],
|
||||
) => Future[T],
|
||||
): Future[Either[ValidationFailed, T]] =
|
||||
Envelope.open(envelope) match {
|
||||
case Right(Envelope.SubmissionMessage(submission)) =>
|
||||
val declaredInputs = submission.getInputDamlStateList.asScala
|
||||
@ -144,7 +134,7 @@ class SubmissionValidator(
|
||||
} yield result
|
||||
result.transform {
|
||||
case Success(result) =>
|
||||
Success(Right(TransformedSubmission(result)))
|
||||
Success(Right(result))
|
||||
case Failure(exception: ValidationFailed) =>
|
||||
Success(Left(exception))
|
||||
case Failure(exception) =>
|
||||
@ -170,25 +160,26 @@ object SubmissionValidator {
|
||||
type StateMap = Map[DamlStateKey, DamlStateValue]
|
||||
type LogEntryAndState = (DamlLogEntry, StateMap)
|
||||
|
||||
def create(
|
||||
ledgerStateAccess: LedgerStateAccess,
|
||||
private lazy val engine = Engine()
|
||||
|
||||
def create[LogResult](
|
||||
ledgerStateAccess: LedgerStateAccess[LogResult],
|
||||
allocateNextLogEntryId: () => DamlLogEntryId = () => allocateRandomLogEntryId(),
|
||||
checkForMissingInputs: Boolean = false)(
|
||||
implicit executionContext: ExecutionContext): SubmissionValidator = {
|
||||
checkForMissingInputs: Boolean = false,
|
||||
)(implicit executionContext: ExecutionContext): SubmissionValidator[LogResult] = {
|
||||
new SubmissionValidator(
|
||||
ledgerStateAccess,
|
||||
processSubmission,
|
||||
allocateNextLogEntryId,
|
||||
checkForMissingInputs)
|
||||
checkForMissingInputs,
|
||||
)
|
||||
}
|
||||
|
||||
def allocateRandomLogEntryId(): DamlLogEntryId =
|
||||
private[validator] def allocateRandomLogEntryId(): DamlLogEntryId =
|
||||
DamlLogEntryId.newBuilder
|
||||
.setEntryId(ByteString.copyFromUtf8(UUID.randomUUID().toString))
|
||||
.build()
|
||||
|
||||
private lazy val engine = Engine()
|
||||
|
||||
private[validator] def processSubmission(
|
||||
damlLogEntryId: DamlLogEntryId,
|
||||
recordTime: Timestamp,
|
||||
|
@ -3,18 +3,14 @@
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
sealed trait ValidationResult
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
object ValidationResult {
|
||||
sealed trait ValidationFailed extends RuntimeException with NoStackTrace
|
||||
|
||||
case object SubmissionValidated extends ValidationResult
|
||||
|
||||
sealed trait ValidationFailed extends RuntimeException with ValidationResult
|
||||
object ValidationFailed {
|
||||
|
||||
final case class MissingInputState(keys: Seq[Array[Byte]]) extends ValidationFailed
|
||||
|
||||
final case class ValidationError(reason: String) extends ValidationFailed
|
||||
|
||||
final case class TransformedSubmission[T](value: T)
|
||||
|
||||
}
|
@ -9,61 +9,60 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.Envelope
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.SubmissionValidator.{LogEntryAndState, RawBytes, RawKeyValuePairs}
|
||||
import com.daml.ledger.validator.ValidationResult.{
|
||||
MissingInputState,
|
||||
SubmissionValidated,
|
||||
ValidationError
|
||||
}
|
||||
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
|
||||
import com.digitalasset.daml.lf.data.Time.Timestamp
|
||||
import com.google.protobuf.{ByteString, Empty}
|
||||
import org.mockito.ArgumentCaptor
|
||||
import org.mockito.ArgumentMatchers._
|
||||
import org.mockito.Mockito.{times, verify, when}
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{AsyncWordSpec, Matchers}
|
||||
import org.scalatest.{AsyncWordSpec, Inside, Matchers}
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
|
||||
class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Matchers {
|
||||
class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with MockitoSugar with Inside {
|
||||
"validate" should {
|
||||
"return success in case of no errors during processing of submission" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Unit]]
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
val instance = SubmissionValidator.create(new FakeStateAccess(mockStateOperations))
|
||||
instance.validate(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId()).map {
|
||||
case SubmissionValidated => succeed
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Right(_) => succeed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"signal missing input in case state cannot be retrieved" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Unit]]
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(None)))
|
||||
val instance = SubmissionValidator.create(
|
||||
ledgerStateAccess = new FakeStateAccess(mockStateOperations),
|
||||
checkForMissingInputs = true)
|
||||
instance.validate(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId()).map {
|
||||
case MissingInputState(keys) => keys should have size 1
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Left(MissingInputState(keys)) => keys should have size 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"return invalid submission for invalid envelope" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Unit]]
|
||||
val instance = SubmissionValidator.create(new FakeStateAccess(mockStateOperations))
|
||||
instance
|
||||
.validate(Array[Byte](1, 2, 3), "aCorrelationId", newRecordTime(), aParticipantId())
|
||||
.map {
|
||||
case ValidationError(reason) => reason should include("Failed to parse")
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Left(ValidationError(reason)) => reason should include("Failed to parse")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"return invalid submission in case exception is thrown during processing of submission" in {
|
||||
val mockStateOperations = mock[BatchingLedgerStateOperations]
|
||||
val mockStateOperations = mock[BatchingLedgerStateOperations[Unit]]
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
|
||||
@ -81,22 +80,24 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
failingProcessSubmission,
|
||||
() => aLogEntryId())
|
||||
instance.validate(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId()).map {
|
||||
case ValidationError(reason) => reason should include("Validation failed")
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Left(ValidationError(reason)) => reason should include("Validation failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"validateAndCommit" should {
|
||||
"write marshalled log entry to ledger" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Int]]
|
||||
val expectedLogResult: Int = 3
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
val logEntryValueCaptor = ArgumentCaptor.forClass(classOf[RawBytes])
|
||||
val logEntryIdCaptor = ArgumentCaptor.forClass(classOf[RawBytes])
|
||||
when(
|
||||
mockStateOperations.appendToLog(logEntryIdCaptor.capture(), logEntryValueCaptor.capture()))
|
||||
.thenReturn(Future.successful(()))
|
||||
.thenReturn(Future.successful(expectedLogResult))
|
||||
val expectedLogEntryId = aLogEntryId()
|
||||
val mockLogEntryIdGenerator = mockFunctionReturning(expectedLogEntryId)
|
||||
val instance = new SubmissionValidator(
|
||||
@ -106,24 +107,27 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
instance
|
||||
.validateAndCommit(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId())
|
||||
.map {
|
||||
case SubmissionValidated =>
|
||||
verify(mockLogEntryIdGenerator, times(1)).apply()
|
||||
verify(mockStateOperations, times(0)).writeState(any[RawKeyValuePairs]())
|
||||
logEntryValueCaptor.getAllValues should have size 1
|
||||
logEntryIdCaptor.getAllValues should have size 1
|
||||
val actualLogEntryIdBytes = ByteString
|
||||
.copyFrom(logEntryIdCaptor.getValue.asInstanceOf[RawBytes])
|
||||
val expectedLogEntryIdBytes = ByteString.copyFrom(expectedLogEntryId.toByteArray)
|
||||
actualLogEntryIdBytes should be(expectedLogEntryIdBytes)
|
||||
ByteString
|
||||
.copyFrom(logEntryValueCaptor.getValue.asInstanceOf[RawBytes]) should not equal ByteString
|
||||
.copyFrom(logEntryIdCaptor.getValue.asInstanceOf[RawBytes])
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Right(actualLogResult) =>
|
||||
actualLogResult should be(expectedLogResult)
|
||||
verify(mockLogEntryIdGenerator, times(1)).apply()
|
||||
verify(mockStateOperations, times(0)).writeState(any[RawKeyValuePairs]())
|
||||
logEntryValueCaptor.getAllValues should have size 1
|
||||
logEntryIdCaptor.getAllValues should have size 1
|
||||
val actualLogEntryIdBytes = ByteString
|
||||
.copyFrom(logEntryIdCaptor.getValue.asInstanceOf[RawBytes])
|
||||
val expectedLogEntryIdBytes = ByteString.copyFrom(expectedLogEntryId.toByteArray)
|
||||
actualLogEntryIdBytes should be(expectedLogEntryIdBytes)
|
||||
ByteString
|
||||
.copyFrom(logEntryValueCaptor.getValue.asInstanceOf[RawBytes]) should not equal ByteString
|
||||
.copyFrom(logEntryIdCaptor.getValue.asInstanceOf[RawBytes])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"write marshalled key-value pairs to ledger" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Int]]
|
||||
val expectedLogResult: Int = 7
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
val writtenKeyValuesCaptor = ArgumentCaptor.forClass(classOf[RawKeyValuePairs])
|
||||
@ -131,7 +135,7 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
.thenReturn(Future.successful(()))
|
||||
val logEntryCaptor = ArgumentCaptor.forClass(classOf[RawBytes])
|
||||
when(mockStateOperations.appendToLog(any[RawBytes](), logEntryCaptor.capture()))
|
||||
.thenReturn(Future.successful(()))
|
||||
.thenReturn(Future.successful(expectedLogResult))
|
||||
val logEntryAndStateResult = (aLogEntry(), someStateUpdates(1))
|
||||
val instance = new SubmissionValidator(
|
||||
new FakeStateAccess(mockStateOperations),
|
||||
@ -140,24 +144,26 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
instance
|
||||
.validateAndCommit(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId())
|
||||
.map {
|
||||
case SubmissionValidated =>
|
||||
writtenKeyValuesCaptor.getAllValues should have size 1
|
||||
val writtenKeyValues = writtenKeyValuesCaptor.getValue.asInstanceOf[RawKeyValuePairs]
|
||||
writtenKeyValues should have size 1
|
||||
Try(SubmissionValidator.bytesToStateValue(writtenKeyValues.head._2)).isSuccess shouldBe true
|
||||
logEntryCaptor.getAllValues should have size 1
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Right(actualLogResult) =>
|
||||
actualLogResult should be(expectedLogResult)
|
||||
writtenKeyValuesCaptor.getAllValues should have size 1
|
||||
val writtenKeyValues = writtenKeyValuesCaptor.getValue.asInstanceOf[RawKeyValuePairs]
|
||||
writtenKeyValues should have size 1
|
||||
Try(SubmissionValidator.bytesToStateValue(writtenKeyValues.head._2)).isSuccess shouldBe true
|
||||
logEntryCaptor.getAllValues should have size 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"return invalid submission if state cannot be written" in {
|
||||
val mockStateOperations = mock[LedgerStateOperations]
|
||||
val mockStateOperations = mock[LedgerStateOperations[Int]]
|
||||
when(mockStateOperations.writeState(any[RawKeyValuePairs]()))
|
||||
.thenThrow(new IllegalArgumentException("Write error"))
|
||||
when(mockStateOperations.readState(any[Seq[RawBytes]]()))
|
||||
.thenReturn(Future.successful(Seq(Some(aStateValue()))))
|
||||
when(mockStateOperations.appendToLog(any[RawBytes](), any[RawBytes]()))
|
||||
.thenReturn(Future.successful(()))
|
||||
.thenReturn(Future.successful(99))
|
||||
val logEntryAndStateResult = (aLogEntry(), someStateUpdates(1))
|
||||
val instance = new SubmissionValidator(
|
||||
new FakeStateAccess(mockStateOperations),
|
||||
@ -166,8 +172,9 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
instance
|
||||
.validateAndCommit(anEnvelope(), "aCorrelationId", newRecordTime(), aParticipantId())
|
||||
.map {
|
||||
case ValidationError(reason) => reason should include("Write error")
|
||||
case _ => fail
|
||||
inside(_) {
|
||||
case Left(ValidationError(reason)) => reason should include("Write error")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -217,9 +224,9 @@ class SubmissionValidatorSpec extends AsyncWordSpec with MockitoSugar with Match
|
||||
mockFunction
|
||||
}
|
||||
|
||||
private class FakeStateAccess(mockStateOperations: LedgerStateOperations)
|
||||
extends LedgerStateAccess {
|
||||
override def inTransaction[T](body: LedgerStateOperations => Future[T]): Future[T] =
|
||||
private class FakeStateAccess[LogResult](mockStateOperations: LedgerStateOperations[LogResult])
|
||||
extends LedgerStateAccess[LogResult] {
|
||||
override def inTransaction[T](body: LedgerStateOperations[LogResult] => Future[T]): Future[T] =
|
||||
body(mockStateOperations)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user