ledger-on-sql: Name the RangeSource parameters more explicitly. (#5679)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2020-04-23 11:10:16 +02:00 committed by GitHub
parent 644d4c7512
commit 77af35f44e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 25 deletions

View File

@ -75,14 +75,15 @@ final class SqlLedgerReaderWriter(
dispatcher
.startingAt(
KVOffset.highestIndex(startExclusive.getOrElse(StartOffset)),
RangeSource((start, end) => {
Source
.future(Timed.value(Metrics.readLog, database.inReadTransaction("read_log") { queries =>
Future.fromTry(queries.selectFromLog(start, end))
}))
.mapConcat(identity)
.mapMaterializedValue(_ => NotUsed)
}),
RangeSource(
(startExclusive, endInclusive) =>
Source
.future(Timed.value(Metrics.readLog, database.inReadTransaction("read_log") {
queries =>
Future.fromTry(queries.selectFromLog(startExclusive, endInclusive))
}))
.mapConcat(identity)
.mapMaterializedValue(_ => NotUsed)),
)
.map { case (_, entry) => entry }

View File

@ -26,24 +26,14 @@ trait CommonQueries extends Queries {
}
override final def selectFromLog(
start: Index,
end: Index,
startExclusive: Index,
endInclusive: Index,
): Try[immutable.Seq[(Index, LedgerRecord)]] = Try {
SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $start AND sequence_no <= $end ORDER BY sequence_no"
.as(
(long("sequence_no")
~ getBytes("entry_id")
~ getBytes("envelope")).map {
case index ~ entryId ~ envelope =>
index -> LedgerRecord(
KVOffset.fromLong(index),
entryId,
envelope,
)
case _ =>
throw new IllegalStateException(s"Invalid data in the $LogTable table.")
}.*,
)
SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $startExclusive AND sequence_no <= $endInclusive ORDER BY sequence_no"
.as((long("sequence_no") ~ getBytes("entry_id") ~ getBytes("envelope")).map {
case index ~ entryId ~ envelope =>
index -> LedgerRecord(KVOffset.fromLong(index), entryId, envelope)
}.*)
}
override final def selectStateValuesByKeys(keys: Seq[Key]): Try[immutable.Seq[Option[Value]]] =