kvutils: Remove the LedgerEntry trait; it's no longer necessary. (#5383)

* kvutils: Remove the LedgerEntry trait; it's no longer necessary.

This was introduced to allow for heartbeats, which no longer exist.

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils: Make LedgerRecord a case class again.

We used to store the envelope as an array of bytes, which doesn't have a
value-based `equals` method and therefore should not be used in a case
class. We now use a `ByteString`, so this is no longer an issue.
This commit is contained in:
Samir Talwar 2020-04-03 09:19:45 +02:00 committed by GitHub
parent 33b55465cb
commit 1d9c7d2574
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 37 additions and 61 deletions

View File

@ -11,7 +11,7 @@ import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter._ import com.daml.ledger.on.memory.InMemoryLedgerReaderWriter._
import com.daml.ledger.on.memory.InMemoryState.MutableLog import com.daml.ledger.on.memory.InMemoryState.MutableLog
import com.daml.ledger.participant.state.kvutils.api.{LedgerEntry, LedgerReader, LedgerWriter} import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset, SequentialLogEntryId} import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset, SequentialLogEntryId}
import com.daml.ledger.participant.state.v1._ import com.daml.ledger.participant.state.v1._
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
@ -54,7 +54,7 @@ final class InMemoryLedgerReaderWriter(
override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] = override def commit(correlationId: String, envelope: Bytes): Future[SubmissionResult] =
committer.commit(correlationId, envelope, participantId) committer.commit(correlationId, envelope, participantId)
override def events(startExclusive: Option[Offset]): Source[LedgerEntry, NotUsed] = override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher dispatcher
.startingAt( .startingAt(
startExclusive startExclusive
@ -93,7 +93,7 @@ private class InMemoryLedgerStateOperations(
} }
override def appendToLog(key: Key, value: Value): Future[Index] = override def appendToLog(key: Key, value: Value): Future[Index] =
Future.successful(appendEntry(log, LedgerEntry.LedgerRecord(_, key, value))) Future.successful(appendEntry(log, LedgerRecord(_, key, value)))
} }
object InMemoryLedgerReaderWriter { object InMemoryLedgerReaderWriter {
@ -168,7 +168,7 @@ object InMemoryLedgerReaderWriter {
headAtInitialization = StartIndex, headAtInitialization = StartIndex,
)) ))
private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerEntry): Int = { private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Int = {
val entryAtIndex = log.size val entryAtIndex = log.size
val offset = KVOffset.fromLong(entryAtIndex.toLong) val offset = KVOffset.fromLong(entryAtIndex.toLong)
val entry = createEntry(offset) val entry = createEntry(offset)

View File

@ -7,8 +7,7 @@ import java.util.concurrent.Semaphore
import com.daml.ledger.on.memory.InMemoryState._ import com.daml.ledger.on.memory.InMemoryState._
import com.daml.ledger.participant.state.kvutils.Bytes import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.kvutils.api.LedgerEntry import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.kvutils.api.LedgerEntry.LedgerRecord
import com.daml.ledger.participant.state.v1.Offset import com.daml.ledger.participant.state.v1.Offset
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
@ -33,10 +32,10 @@ private[memory] class InMemoryState private (log: MutableLog, state: MutableStat
} }
object InMemoryState { object InMemoryState {
type ImmutableLog = IndexedSeq[LedgerEntry] type ImmutableLog = IndexedSeq[LedgerRecord]
type ImmutableState = collection.Map[StateKey, StateValue] type ImmutableState = collection.Map[StateKey, StateValue]
type MutableLog = mutable.Buffer[LedgerEntry] with ImmutableLog type MutableLog = mutable.Buffer[LedgerRecord] with ImmutableLog
type MutableState = mutable.Map[StateKey, StateValue] with ImmutableState type MutableState = mutable.Map[StateKey, StateValue] with ImmutableState
type StateKey = Bytes type StateKey = Bytes

View File

@ -12,7 +12,7 @@ import com.codahale.metrics.MetricRegistry
import com.daml.ledger.on.sql.SqlLedgerReaderWriter._ import com.daml.ledger.on.sql.SqlLedgerReaderWriter._
import com.daml.ledger.on.sql.queries.Queries import com.daml.ledger.on.sql.queries.Queries
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
import com.daml.ledger.participant.state.kvutils.api.{LedgerEntry, LedgerReader, LedgerWriter} import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset} import com.daml.ledger.participant.state.kvutils.{Bytes, KVOffset}
import com.daml.ledger.participant.state.v1._ import com.daml.ledger.participant.state.v1._
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
@ -66,7 +66,7 @@ final class SqlLedgerReaderWriter(
override def currentHealth(): HealthStatus = Healthy override def currentHealth(): HealthStatus = Healthy
override def events(startExclusive: Option[Offset]): Source[LedgerEntry, NotUsed] = override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher dispatcher
.startingAt( .startingAt(
KVOffset.highestIndex(startExclusive.getOrElse(StartOffset)), KVOffset.highestIndex(startExclusive.getOrElse(StartOffset)),

View File

@ -10,7 +10,7 @@ import anorm._
import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.Index
import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.on.sql.queries.Queries._
import com.daml.ledger.participant.state.kvutils.KVOffset import com.daml.ledger.participant.state.kvutils.KVOffset
import com.daml.ledger.participant.state.kvutils.api.LedgerEntry import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import scala.collection.{breakOut, immutable} import scala.collection.{breakOut, immutable}
@ -28,14 +28,14 @@ trait CommonQueries extends Queries {
override final def selectFromLog( override final def selectFromLog(
start: Index, start: Index,
end: Index, end: Index,
): Try[immutable.Seq[(Index, LedgerEntry)]] = Try { ): Try[immutable.Seq[(Index, LedgerRecord)]] = Try {
SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $start AND sequence_no <= $end ORDER BY sequence_no" SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $start AND sequence_no <= $end ORDER BY sequence_no"
.as( .as(
(long("sequence_no") (long("sequence_no")
~ getBytes("entry_id") ~ getBytes("entry_id")
~ getBytes("envelope")).map { ~ getBytes("envelope")).map {
case index ~ entryId ~ envelope => case index ~ entryId ~ envelope =>
index -> LedgerEntry.LedgerRecord( index -> LedgerRecord(
KVOffset.fromLong(index), KVOffset.fromLong(index),
entryId, entryId,
envelope, envelope,

View File

@ -4,7 +4,7 @@
package com.daml.ledger.on.sql.queries package com.daml.ledger.on.sql.queries
import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.Index
import com.daml.ledger.participant.state.kvutils.api.LedgerEntry import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.validator.LedgerStateOperations.{Key, Value} import com.daml.ledger.validator.LedgerStateOperations.{Key, Value}
import scala.collection.immutable import scala.collection.immutable
@ -13,7 +13,7 @@ import scala.util.Try
trait ReadQueries { trait ReadQueries {
def selectLatestLogEntryId(): Try[Option[Index]] def selectLatestLogEntryId(): Try[Option[Index]]
def selectFromLog(start: Index, end: Index): Try[immutable.Seq[(Index, LedgerEntry)]] def selectFromLog(start: Index, end: Index): Try[immutable.Seq[(Index, LedgerRecord)]]
def selectStateValuesByKeys(keys: Seq[Key]): Try[immutable.Seq[Option[Value]]] def selectStateValuesByKeys(keys: Seq[Key]): Try[immutable.Seq[Option[Value]]]
} }

View File

@ -22,7 +22,7 @@ class KeyValueParticipantStateReader(reader: LedgerReader)(implicit materializer
.single(beginAfter.map(KVOffset.onlyKeepHighestIndex)) .single(beginAfter.map(KVOffset.onlyKeepHighestIndex))
.flatMapConcat(reader.events) .flatMapConcat(reader.events)
.flatMapConcat { .flatMapConcat {
case LedgerEntry.LedgerRecord(offset, entryId, envelope) => case LedgerRecord(offset, entryId, envelope) =>
Envelope Envelope
.open(envelope) .open(envelope)
.flatMap { .flatMap {

View File

@ -1,38 +0,0 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.api
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.v1.Offset
/**
* Defines a log entry or update that may be read from the ledger.
*/
sealed trait LedgerEntry {
def offset: Offset
}
object LedgerEntry {
/**
* A log entry read from the ledger.
* @param offset offset of log entry
* @param entryId opaque ID of log entry
* @param envelope opaque contents of log entry
*/
final class LedgerRecord(
val offset: Offset,
val entryId: Bytes,
val envelope: Bytes
) extends LedgerEntry
object LedgerRecord {
def apply(offset: Offset, entryId: Bytes, envelope: Bytes): LedgerRecord =
new LedgerRecord(offset, entryId, envelope)
def unapply(record: LedgerRecord): Option[(Offset, Bytes, Bytes)] =
Some((record.offset, record.entryId, record.envelope))
}
}

View File

@ -23,13 +23,13 @@ trait LedgerReader extends ReportsHealth {
* *
* In case an offset is not specified, all updates must be streamed from the oldest known state. * In case an offset is not specified, all updates must be streamed from the oldest known state.
* Each update is defined either as an opaque log entry ID and an * Each update is defined either as an opaque log entry ID and an
* envelope ([[com.daml.ledger.participant.state.kvutils.api.LedgerEntry.LedgerRecord]]). * envelope ([[com.daml.ledger.participant.state.kvutils.api.LedgerRecord]]).
* *
* @param startExclusive offset right after which updates must be streamed; in case not specified updates * @param startExclusive offset right after which updates must be streamed; in case not specified updates
* must be returned from the beginning * must be returned from the beginning
* @return stream of updates * @return stream of updates
*/ */
def events(startExclusive: Option[Offset]): Source[LedgerEntry, NotUsed] def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed]
/** /**
* Get the ledger's ID from which this reader instance streams events. * Get the ledger's ID from which this reader instance streams events.
@ -45,7 +45,7 @@ object LedgerReader {
/** /**
* Default initial ledger configuration used by [[KeyValueParticipantStateReader]]. * Default initial ledger configuration used by [[KeyValueParticipantStateReader]].
*/ */
val DefaultConfiguration = Configuration( val DefaultConfiguration: Configuration = Configuration(
generation = 0, generation = 0,
timeModel = TimeModel.reasonableDefault, timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1), maxDeduplicationTime = Duration.ofDays(1),

View File

@ -0,0 +1,16 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.api
import com.daml.ledger.participant.state.kvutils.Bytes
import com.daml.ledger.participant.state.v1.Offset
/**
* A log entry read from the ledger.
*
* @param offset offset of log entry
* @param entryId opaque ID of log entry
* @param envelope opaque contents of log entry
*/
final case class LedgerRecord(offset: Offset, entryId: Bytes, envelope: Bytes)

View File

@ -10,7 +10,6 @@ import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source} import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.kvutils.DamlKvutils._ import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReaderSpec._ import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReaderSpec._
import com.daml.ledger.participant.state.kvutils.api.LedgerEntry.LedgerRecord
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KVOffset} import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KVOffset}
import com.daml.ledger.participant.state.v1.{Offset, Update} import com.daml.ledger.participant.state.v1.{Offset, Update}
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
@ -108,7 +107,7 @@ class KeyValueParticipantStateReaderSpec
LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry) LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry)
) )
def getInstance(offset: Option[Offset], items: LedgerEntry*) = def getInstance(offset: Option[Offset], items: LedgerRecord*) =
new KeyValueParticipantStateReader(readerStreamingFrom(offset = offset, items: _*)) new KeyValueParticipantStateReader(readerStreamingFrom(offset = offset, items: _*))
val instances = records.tails.flatMap { val instances = records.tails.flatMap {
@ -182,14 +181,14 @@ object KeyValueParticipantStateReaderSpec {
.build .build
.toByteString .toByteString
private def readerStreamingFrom(offset: Option[Offset], items: LedgerEntry*): LedgerReader = { private def readerStreamingFrom(offset: Option[Offset], items: LedgerRecord*): LedgerReader = {
val reader = mock[LedgerReader] val reader = mock[LedgerReader]
val stream = Source.fromIterator(() => items.iterator) val stream = Source.fromIterator(() => items.iterator)
when(reader.events(offset)).thenReturn(stream) when(reader.events(offset)).thenReturn(stream)
reader reader
} }
private def readerStreamingFromAnyOffset(items: LedgerEntry*): LedgerReader = { private def readerStreamingFromAnyOffset(items: LedgerRecord*): LedgerReader = {
val reader = mock[LedgerReader] val reader = mock[LedgerReader]
val stream = Source.fromIterator(() => items.iterator) val stream = Source.fromIterator(() => items.iterator)
when(reader.events(any[Option[Offset]]())).thenReturn(stream) when(reader.events(any[Option[Offset]]())).thenReturn(stream)