mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
[kvutils] Handle offsets in case of multiple updates from the same log entry (#6677)
This commit is contained in:
parent
2e836dc6b3
commit
f60ec5c2ed
@ -6,7 +6,7 @@ package com.daml.ledger.on.memory
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.ledger.api.health.{HealthStatus, Healthy}
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset
|
||||
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
|
||||
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord}
|
||||
import com.daml.ledger.participant.state.v1.{LedgerId, Offset}
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
@ -23,7 +23,7 @@ class InMemoryLedgerReader(
|
||||
dispatcher
|
||||
.startingAt(
|
||||
startExclusive
|
||||
.map(KVOffset.highestIndex(_).toInt)
|
||||
.map(OffsetBuilder.highestIndex(_).toInt)
|
||||
.getOrElse(StartIndex),
|
||||
RangeSource((startExclusive, endInclusive) =>
|
||||
Source.fromIterator(() => {
|
||||
|
@ -4,7 +4,7 @@
|
||||
package com.daml.ledger.on.memory
|
||||
|
||||
import com.daml.ledger.on.memory.InMemoryState.MutableLog
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset
|
||||
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
|
||||
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.ledger.validator.BatchingLedgerStateOperations
|
||||
@ -41,7 +41,7 @@ object InMemoryLedgerStateOperations {
|
||||
|
||||
private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
|
||||
val entryAtIndex = log.size
|
||||
val offset = KVOffset.fromLong(entryAtIndex.toLong)
|
||||
val offset = OffsetBuilder.fromLong(entryAtIndex.toLong)
|
||||
val entry = createEntry(offset)
|
||||
log += entry
|
||||
entryAtIndex
|
||||
|
@ -21,7 +21,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
|
||||
LedgerRecord,
|
||||
LedgerWriter
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, OffsetBuilder}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
import com.daml.ledger.validator._
|
||||
@ -81,7 +81,7 @@ final class SqlLedgerReaderWriter(
|
||||
override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
|
||||
dispatcher
|
||||
.startingAt(
|
||||
KVOffset.highestIndex(startExclusive.getOrElse(StartOffset)),
|
||||
OffsetBuilder.highestIndex(startExclusive.getOrElse(StartOffset)),
|
||||
RangeSource(
|
||||
(startExclusive, endInclusive) =>
|
||||
Source
|
||||
@ -123,7 +123,7 @@ final class SqlLedgerReaderWriter(
|
||||
}
|
||||
|
||||
object SqlLedgerReaderWriter {
|
||||
private val StartOffset: Offset = KVOffset.fromLong(StartIndex)
|
||||
private val StartOffset: Offset = OffsetBuilder.fromLong(StartIndex)
|
||||
|
||||
val DefaultTimeProvider: TimeProvider = TimeProvider.UTC
|
||||
|
||||
|
@ -9,7 +9,7 @@ import anorm.SqlParser._
|
||||
import anorm._
|
||||
import com.daml.ledger.on.sql.Index
|
||||
import com.daml.ledger.on.sql.queries.Queries._
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset
|
||||
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
|
||||
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
|
||||
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
|
||||
|
||||
@ -32,7 +32,7 @@ trait CommonQueries extends Queries {
|
||||
SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $startExclusive AND sequence_no <= $endInclusive ORDER BY sequence_no"
|
||||
.as((long("sequence_no") ~ getBytes("entry_id") ~ getBytes("envelope")).map {
|
||||
case index ~ entryId ~ envelope =>
|
||||
index -> LedgerRecord(KVOffset.fromLong(index), entryId, envelope)
|
||||
index -> LedgerRecord(OffsetBuilder.fromLong(index), entryId, envelope)
|
||||
}.*)
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,21 @@ package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
|
||||
object KVOffset {
|
||||
/**
|
||||
* Helper functions for generating 16 byte [[com.daml.ledger.participant.state.v1.Offset]]s from integers.
|
||||
* The created offset will look as follows:
|
||||
* | highest index (64 bits) | middle index (32 bits) | lowest index (32 bits) |
|
||||
* Leading zeros will be retained when generating the resulting offset bytes.
|
||||
*
|
||||
* Example usage:
|
||||
* * If you have one record per block then just use [[OffsetBuilder.fromLong(<block-ID>)]]
|
||||
* * If you may have multiple records per block then use [[OffsetBuilder.fromLong(<block-ID>, <index>)]],
|
||||
* where <index> denotes the position or index of a given log entry in the block.
|
||||
*
|
||||
* @see com.daml.ledger.participant.state.v1.Offset
|
||||
* @see com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader
|
||||
*/
|
||||
object OffsetBuilder {
|
||||
private[kvutils] val highestStart = 0
|
||||
private[kvutils] val middleStart = 8
|
||||
private[kvutils] val lowestStart = 12
|
||||
@ -18,6 +32,12 @@ object KVOffset {
|
||||
fromLong(highest)
|
||||
}
|
||||
|
||||
def dropLowestIndex(offset: Offset): Offset = {
|
||||
val highest = highestIndex(offset)
|
||||
val middle = middleIndex(offset)
|
||||
fromLong(highest, middle.toInt)
|
||||
}
|
||||
|
||||
def setMiddleIndex(offset: Offset, middle: Int): Offset = {
|
||||
val highest = highestIndex(offset)
|
||||
val lowest = lowestIndex(offset)
|
@ -34,7 +34,7 @@ class KeyValueParticipantState(
|
||||
extends ReadService
|
||||
with WriteService {
|
||||
private val readerAdapter =
|
||||
new KeyValueParticipantStateReader(reader, metrics)
|
||||
KeyValueParticipantStateReader(reader, metrics)
|
||||
private val writerAdapter =
|
||||
new KeyValueParticipantStateWriter(new TimedLedgerWriter(writer, metrics), metrics)
|
||||
|
||||
|
@ -7,21 +7,38 @@ import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.ledger.api.health.HealthStatus
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
|
||||
import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffset, KeyValueConsumption}
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlLogEntry, DamlLogEntryId}
|
||||
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueConsumption, OffsetBuilder}
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
import com.daml.lf.data.Time
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
|
||||
class KeyValueParticipantStateReader(reader: LedgerReader, metrics: Metrics)(
|
||||
/**
|
||||
* Adapts a [[LedgerReader]] instance to [[ReadService]].
|
||||
* Performs translation between the offsets required by the underlying reader and [[ReadService]]:
|
||||
* * a 3 component integer offset is exposed to [[ReadService]] (see [[OffsetBuilder.fromLong]]),
|
||||
* * a max. 2 component integer offset is expected from the underlying [[LedgerReader]], and
|
||||
* * the third (lowest index) component is generated as the index of the update in case more than
|
||||
* 1 has been generated by [[KeyValueConsumption.logEntryToUpdate]],
|
||||
* * otherwise the offset is passed on to [[ReadService]] as-is.
|
||||
*
|
||||
* @see com.daml.ledger.participant.state.kvutils.OffsetBuilder
|
||||
*/
|
||||
class KeyValueParticipantStateReader private[api] (
|
||||
reader: LedgerReader,
|
||||
metrics: Metrics,
|
||||
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update])(
|
||||
implicit materializer: Materializer)
|
||||
extends ReadService {
|
||||
import KeyValueParticipantStateReader._
|
||||
|
||||
override def getLedgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
|
||||
Source.single(createLedgerInitialConditions())
|
||||
|
||||
override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = {
|
||||
Source
|
||||
.single(beginAfter.map(KVOffset.onlyKeepHighestIndex))
|
||||
.single(beginAfter.map(OffsetBuilder.dropLowestIndex))
|
||||
.flatMapConcat(reader.events)
|
||||
.flatMapConcat {
|
||||
case LedgerRecord(offset, entryId, envelope) =>
|
||||
@ -32,12 +49,10 @@ class KeyValueParticipantStateReader(reader: LedgerReader, metrics: Metrics)(
|
||||
Timed.value(
|
||||
metrics.daml.kvutils.reader.parseUpdates, {
|
||||
val logEntryId = DamlLogEntryId.parseFrom(entryId)
|
||||
val updates = KeyValueConsumption.logEntryToUpdate(logEntryId, logEntry)
|
||||
val updateOffset: (Offset, Int) => Offset =
|
||||
if (updates.size > 1) KVOffset.setMiddleIndex else (offset, _) => offset
|
||||
val updates = logEntryToUpdate(logEntryId, logEntry, None)
|
||||
val updatesWithOffsets = Source(updates).zipWithIndex.map {
|
||||
case (update, index) =>
|
||||
updateOffset(offset, index.toInt) -> update
|
||||
offsetForUpdate(offset, index.toInt, updates.size) -> update
|
||||
}
|
||||
Right(updatesWithOffsets)
|
||||
}
|
||||
@ -59,3 +74,19 @@ class KeyValueParticipantStateReader(reader: LedgerReader, metrics: Metrics)(
|
||||
LedgerReader.DefaultConfiguration,
|
||||
Time.Timestamp.Epoch)
|
||||
}
|
||||
|
||||
object KeyValueParticipantStateReader {
|
||||
def apply(reader: LedgerReader, metrics: Metrics)(
|
||||
implicit materializer: Materializer): KeyValueParticipantStateReader =
|
||||
new KeyValueParticipantStateReader(reader, metrics, KeyValueConsumption.logEntryToUpdate)
|
||||
|
||||
private[api] def offsetForUpdate(
|
||||
offsetFromRecord: Offset,
|
||||
index: Int,
|
||||
totalUpdates: Int): Offset =
|
||||
if (totalUpdates > 1) {
|
||||
OffsetBuilder.setLowestIndex(offsetFromRecord, index)
|
||||
} else {
|
||||
offsetFromRecord
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,8 @@ import com.daml.ledger.api.health.ReportsHealth
|
||||
*
|
||||
* For a detailed description of the required semantics of state updates see
|
||||
* [[com.daml.ledger.participant.state.v1.ReadService]].
|
||||
* For a detailed description of the requirements on how offsets should be generated see
|
||||
* [[com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader]].
|
||||
*/
|
||||
trait LedgerReader extends ReportsHealth {
|
||||
|
||||
@ -22,7 +24,7 @@ trait LedgerReader extends ReportsHealth {
|
||||
* Streams raw updates from the given offset for the participant.
|
||||
*
|
||||
* In case an offset is not specified, all updates must be streamed from the oldest known state.
|
||||
* Each update is defined either as an opaque log entry ID and an
|
||||
* Each update is defined as an opaque log entry ID and an
|
||||
* envelope ([[com.daml.ledger.participant.state.kvutils.api.LedgerRecord]]).
|
||||
*
|
||||
* @param startExclusive offset right after which updates must be streamed; in case not specified updates
|
||||
|
@ -12,7 +12,7 @@ import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.bazeltools.BazelRunfiles.rlocation
|
||||
import com.daml.daml_lf_dev.DamlLf
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffset.{fromLong => toOffset}
|
||||
import com.daml.ledger.participant.state.kvutils.OffsetBuilder.{fromLong => toOffset}
|
||||
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase._
|
||||
import com.daml.ledger.participant.state.v1.Update._
|
||||
import com.daml.ledger.participant.state.v1._
|
||||
|
@ -1,58 +0,0 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.lf.data
|
||||
import org.scalatest.{Matchers, WordSpec}
|
||||
|
||||
class KVOffsetSpec extends WordSpec with Matchers {
|
||||
|
||||
"KVOffset" should {
|
||||
val zeroBytes = data.Bytes.fromByteArray(Array.fill(16)(0: Byte))
|
||||
|
||||
def triple(offset: Offset) =
|
||||
(KVOffset.highestIndex(offset), KVOffset.middleIndex(offset), KVOffset.lowestIndex(offset))
|
||||
|
||||
"set 0 bytes" in {
|
||||
KVOffset.fromLong(0).bytes shouldEqual zeroBytes
|
||||
}
|
||||
|
||||
"extract the correct indexes" in {
|
||||
val offset = KVOffset.fromLong(1, 2, 3)
|
||||
triple(offset) shouldBe ((1, 2, 3))
|
||||
}
|
||||
|
||||
"only change individual indexes" in {
|
||||
val offset = KVOffset.fromLong(1, 2, 3)
|
||||
|
||||
triple(KVOffset.setLowestIndex(offset, 17)) shouldBe ((1, 2, 17))
|
||||
triple(KVOffset.setMiddleIndex(offset, 17)) shouldBe ((1, 17, 3))
|
||||
}
|
||||
|
||||
"zero out the middle and lowest index" in {
|
||||
val offset = KVOffset.fromLong(1, 2, 3)
|
||||
triple(KVOffset.onlyKeepHighestIndex(offset)) shouldBe ((1, 0, 0))
|
||||
}
|
||||
|
||||
"retain leading zeros" in {
|
||||
val offset = KVOffset.fromLong(1, 2, 3)
|
||||
val highest = offset.toByteArray.slice(KVOffset.highestStart, KVOffset.middleStart)
|
||||
val middle = offset.toByteArray.slice(KVOffset.middleStart, KVOffset.lowestStart)
|
||||
val lowest = offset.toByteArray.slice(KVOffset.lowestStart, KVOffset.end)
|
||||
|
||||
val highestZeros = highest.dropRight(1)
|
||||
highestZeros.forall(_ == 0) shouldBe true
|
||||
highest.takeRight(1)(0) shouldBe 1
|
||||
|
||||
val middleZeros = middle.dropRight(1)
|
||||
middleZeros.forall(_ == 0) shouldBe true
|
||||
middle.takeRight(1)(0) shouldBe 2
|
||||
|
||||
val lowestZeros = lowest.dropRight(1)
|
||||
lowestZeros.forall(_ == 0) shouldBe true
|
||||
lowest.takeRight(1)(0) shouldBe 3
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils
|
||||
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.lf.data
|
||||
import org.scalatest.{Matchers, WordSpec}
|
||||
|
||||
class OffsetBuilderSpec extends WordSpec with Matchers {
|
||||
|
||||
"OffsetBuilder" should {
|
||||
val zeroBytes = data.Bytes.fromByteArray(Array.fill(16)(0: Byte))
|
||||
|
||||
def triple(offset: Offset): (Long, Long, Long) =
|
||||
(
|
||||
OffsetBuilder.highestIndex(offset),
|
||||
OffsetBuilder.middleIndex(offset),
|
||||
OffsetBuilder.lowestIndex(offset))
|
||||
|
||||
"set 0 bytes" in {
|
||||
OffsetBuilder.fromLong(0).bytes shouldEqual zeroBytes
|
||||
}
|
||||
|
||||
"extract the correct indexes" in {
|
||||
val offset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
triple(offset) shouldBe ((1, 2, 3))
|
||||
}
|
||||
|
||||
"only change individual indexes" in {
|
||||
val offset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
|
||||
triple(OffsetBuilder.setLowestIndex(offset, 17)) shouldBe ((1, 2, 17))
|
||||
triple(OffsetBuilder.setMiddleIndex(offset, 17)) shouldBe ((1, 17, 3))
|
||||
}
|
||||
|
||||
"zero out the middle and lowest index" in {
|
||||
val offset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
triple(OffsetBuilder.onlyKeepHighestIndex(offset)) shouldBe ((1, 0, 0))
|
||||
}
|
||||
|
||||
"zero out the lowest index" in {
|
||||
val offset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
triple(OffsetBuilder.dropLowestIndex(offset)) shouldBe ((1, 2, 0))
|
||||
}
|
||||
|
||||
"retain leading zeros" in {
|
||||
val offset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
val highest = offset.toByteArray.slice(OffsetBuilder.highestStart, OffsetBuilder.middleStart)
|
||||
val middle = offset.toByteArray.slice(OffsetBuilder.middleStart, OffsetBuilder.lowestStart)
|
||||
val lowest = offset.toByteArray.slice(OffsetBuilder.lowestStart, OffsetBuilder.end)
|
||||
|
||||
val highestZeros = highest.dropRight(1)
|
||||
highestZeros.forall(_ == 0) shouldBe true
|
||||
highest.takeRight(1)(0) shouldBe 1
|
||||
|
||||
val middleZeros = middle.dropRight(1)
|
||||
middleZeros.forall(_ == 0) shouldBe true
|
||||
middle.takeRight(1)(0) shouldBe 2
|
||||
|
||||
val lowestZeros = lowest.dropRight(1)
|
||||
lowestZeros.forall(_ == 0) shouldBe true
|
||||
lowest.takeRight(1)(0) shouldBe 3
|
||||
}
|
||||
}
|
||||
}
|
@ -4,13 +4,17 @@
|
||||
package com.daml.ledger.participant.state.kvutils.api
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader.offsetForUpdate
|
||||
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReaderSpec._
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KVOffset}
|
||||
import com.daml.ledger.participant.state.v1.{Offset, Update}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, OffsetBuilder}
|
||||
import com.daml.ledger.participant.state.v1.{Offset, ParticipantId, Update}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.metrics.Metrics
|
||||
import com.google.protobuf.ByteString
|
||||
import org.mockito.Mockito.when
|
||||
@ -24,9 +28,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
with Matchers
|
||||
with AkkaBeforeAndAfterAll {
|
||||
|
||||
import KVOffset.{fromLong => toOffset}
|
||||
|
||||
private def newMetrics = new Metrics(new MetricRegistry)
|
||||
import OffsetBuilder.{fromLong => toOffset}
|
||||
|
||||
"participant state reader" should {
|
||||
"stream offsets from the start" in {
|
||||
@ -36,7 +38,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry),
|
||||
LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry),
|
||||
)
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
val instance = createInstance(reader)
|
||||
val stream = instance.stateUpdates(None)
|
||||
|
||||
offsetsFrom(stream).map { actual =>
|
||||
@ -49,7 +51,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
}
|
||||
}
|
||||
|
||||
"stream offsets from a given offset" in {
|
||||
"stream offsets from a given 1 component offset" in {
|
||||
val reader = readerStreamingFrom(
|
||||
offset = Some(toOffset(4)),
|
||||
LedgerRecord(toOffset(5), aLogEntryId(5), aWrappedLogEntry),
|
||||
@ -57,7 +59,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
LedgerRecord(toOffset(7), aLogEntryId(7), aWrappedLogEntry),
|
||||
LedgerRecord(toOffset(8), aLogEntryId(8), aWrappedLogEntry),
|
||||
)
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
val instance = createInstance(reader)
|
||||
val stream = instance.stateUpdates(Some(toOffset(4)))
|
||||
|
||||
offsetsFrom(stream).map { actual =>
|
||||
@ -71,12 +73,12 @@ class KeyValueParticipantStateReaderSpec
|
||||
}
|
||||
}
|
||||
|
||||
"remove index suffix when streaming from underlying reader" in {
|
||||
"remove third component of input offset when streaming from underlying reader" in {
|
||||
val reader = readerStreamingFrom(
|
||||
offset = Some(toOffset(1)),
|
||||
offset = Some(toOffset(1, 2)),
|
||||
LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry))
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
val stream = instance.stateUpdates(Some(toOffset(1)))
|
||||
val instance = createInstance(reader)
|
||||
val stream = instance.stateUpdates(Some(toOffset(1, 2, 3)))
|
||||
|
||||
offsetsFrom(stream).map { actual =>
|
||||
actual should have size 1
|
||||
@ -84,18 +86,40 @@ class KeyValueParticipantStateReaderSpec
|
||||
}
|
||||
}
|
||||
|
||||
"append index to internal offset" in {
|
||||
"do not append index to underlying reader's offset in case of no more than 1 update" in {
|
||||
val reader = readerStreamingFrom(
|
||||
offset = None,
|
||||
LedgerRecord(toOffset(1), aLogEntryId(1), aWrappedLogEntry),
|
||||
LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry)
|
||||
)
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
for (updateGenerator <- Seq(zeroUpdateGenerator, singleUpdateGenerator)) {
|
||||
val instance = createInstance(reader, updateGenerator)
|
||||
val stream = instance.stateUpdates(None)
|
||||
|
||||
offsetsFrom(stream).map { actual =>
|
||||
actual should have size 2
|
||||
actual shouldBe Seq(toOffset(1), toOffset(2))
|
||||
}
|
||||
}
|
||||
succeed
|
||||
}
|
||||
|
||||
"append index to underlying reader's offset in case of more than 1 update" in {
|
||||
val reader = readerStreamingFrom(
|
||||
offset = None,
|
||||
LedgerRecord(toOffset(1, 11), aLogEntryId(1), aWrappedLogEntry),
|
||||
LedgerRecord(toOffset(2, 22), aLogEntryId(2), aWrappedLogEntry)
|
||||
)
|
||||
val instance = createInstance(reader, twoUpdatesGenerator)
|
||||
val stream = instance.stateUpdates(None)
|
||||
|
||||
offsetsFrom(stream).map { actual =>
|
||||
actual should have size 2
|
||||
actual shouldBe Seq(toOffset(1), toOffset(2))
|
||||
actual should have size 4
|
||||
actual shouldBe Seq(
|
||||
toOffset(1, 11, 0),
|
||||
toOffset(1, 11, 1),
|
||||
toOffset(2, 22, 0),
|
||||
toOffset(2, 22, 1))
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,10 +130,10 @@ class KeyValueParticipantStateReaderSpec
|
||||
LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry)
|
||||
)
|
||||
|
||||
def getInstance(offset: Option[Offset], items: LedgerRecord*) =
|
||||
new KeyValueParticipantStateReader(
|
||||
readerStreamingFrom(offset = offset, items: _*),
|
||||
newMetrics)
|
||||
def getInstance(
|
||||
offset: Option[Offset],
|
||||
items: LedgerRecord*): KeyValueParticipantStateReader =
|
||||
createInstance(readerStreamingFrom(offset = offset, items: _*))
|
||||
|
||||
val instances = records.tails.flatMap {
|
||||
case first :: rest =>
|
||||
@ -136,7 +160,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
val reader = readerStreamingFrom(
|
||||
offset = None,
|
||||
LedgerRecord(toOffset(0), aLogEntryId(0), anInvalidEnvelope))
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
val instance = createInstance(reader)
|
||||
|
||||
offsetsFrom(instance.stateUpdates(None)).failed.map { _ =>
|
||||
succeed
|
||||
@ -154,7 +178,7 @@ class KeyValueParticipantStateReaderSpec
|
||||
val reader = readerStreamingFrom(
|
||||
offset = None,
|
||||
LedgerRecord(toOffset(0), aLogEntryId(0), anInvalidEnvelopeMessage))
|
||||
val instance = new KeyValueParticipantStateReader(reader, newMetrics)
|
||||
val instance = createInstance(reader)
|
||||
|
||||
offsetsFrom(instance.stateUpdates(None)).failed.map { _ =>
|
||||
succeed
|
||||
@ -162,6 +186,29 @@ class KeyValueParticipantStateReaderSpec
|
||||
}
|
||||
}
|
||||
|
||||
"offsetForUpdate" should {
|
||||
"not overwrite middle offset from record in case of 2 updates" in {
|
||||
val offsetFromRecord = OffsetBuilder.fromLong(1, 2)
|
||||
for (subOffset <- Seq(0, 1)) {
|
||||
offsetForUpdate(offsetFromRecord, subOffset, 2) shouldBe OffsetBuilder.fromLong(
|
||||
1,
|
||||
2,
|
||||
subOffset)
|
||||
}
|
||||
succeed
|
||||
}
|
||||
|
||||
"use original offset in case less than 2 updates" in {
|
||||
val expectedOffset = OffsetBuilder.fromLong(1, 2, 3)
|
||||
for (totalUpdates <- Seq(0, 1)) {
|
||||
for (i <- 0 until totalUpdates) {
|
||||
offsetForUpdate(expectedOffset, i, totalUpdates) shouldBe expectedOffset
|
||||
}
|
||||
}
|
||||
succeed
|
||||
}
|
||||
}
|
||||
|
||||
private def offsetsFrom(stream: Source[(Offset, Update), NotUsed]): Future[Seq[Offset]] =
|
||||
stream.runWith(Sink.seq).map(_.map(_._1))
|
||||
}
|
||||
@ -177,6 +224,27 @@ object KeyValueParticipantStateReaderSpec {
|
||||
|
||||
private val aWrappedLogEntry = Envelope.enclose(aLogEntry)
|
||||
|
||||
private val zeroUpdateGenerator
|
||||
: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) => List.empty
|
||||
|
||||
private val singleUpdateGenerator
|
||||
: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) =>
|
||||
List(
|
||||
Update.PartyAddedToParticipant(
|
||||
Ref.Party.assertFromString("aParty"),
|
||||
"a party",
|
||||
ParticipantId.assertFromString("aParticipant"),
|
||||
Timestamp.now(),
|
||||
submissionId = None))
|
||||
|
||||
private val twoUpdatesGenerator
|
||||
: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] =
|
||||
(entryId, entry, recordTime) =>
|
||||
singleUpdateGenerator(entryId, entry, recordTime) ::: singleUpdateGenerator(
|
||||
entryId,
|
||||
entry,
|
||||
recordTime)
|
||||
|
||||
private def aLogEntryId(index: Int): Bytes =
|
||||
DamlLogEntryId.newBuilder
|
||||
.setEntryId(ByteString.copyFrom(s"id-$index".getBytes))
|
||||
@ -190,4 +258,10 @@ object KeyValueParticipantStateReaderSpec {
|
||||
reader
|
||||
}
|
||||
|
||||
private def createInstance(
|
||||
reader: LedgerReader,
|
||||
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] =
|
||||
singleUpdateGenerator)(
|
||||
implicit materializer: Materializer): KeyValueParticipantStateReader =
|
||||
new KeyValueParticipantStateReader(reader, new Metrics(new MetricRegistry), logEntryToUpdate)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user