mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
use domain.Offset instead of String in ContractsFetch functions (#12160)
* Initial commit CHANGELOG_BEGIN CHANGELOG_END * formatting fix * fix up graph DSL whitespace Co-authored-by: Stephen Compall <stephen.compall@daml.com>
This commit is contained in:
parent
42adfdc857
commit
4c8be783b2
@ -12,7 +12,6 @@ import util.{AbsoluteBookmark, BeginBookmark, ContractStreamStep, InsertDeleteSt
|
||||
import util.IdentifierConverters.apiIdentifier
|
||||
import com.daml.ledger.api.v1.transaction.Transaction
|
||||
import com.daml.ledger.api.{v1 => lav1}
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
private[daml] object AcsTxStreams {
|
||||
import util.AkkaStreamsDoobie.{last, max, project2}
|
||||
@ -43,16 +42,16 @@ private[daml] object AcsTxStreams {
|
||||
): Graph[FanOutShape2[
|
||||
lav1.active_contracts_service.GetActiveContractsResponse,
|
||||
ContractStreamStep.LAV1,
|
||||
BeginBookmark[String],
|
||||
BeginBookmark[domain.Offset],
|
||||
], NotUsed] =
|
||||
GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
import ContractStreamStep.{LiveBegin, Acs}
|
||||
type Off = BeginBookmark[String]
|
||||
type Off = BeginBookmark[domain.Offset]
|
||||
val acs = b add acsAndBoundary
|
||||
val dupOff = b add Broadcast[Off](2)
|
||||
val liveStart = Flow fromFunction { off: Off =>
|
||||
LiveBegin(domain.Offset.tag.subst(off))
|
||||
LiveBegin(off)
|
||||
}
|
||||
val txns = b add transactionsFollowingBoundary(transactionsSince)
|
||||
val allSteps = b add Concat[ContractStreamStep.LAV1](3)
|
||||
@ -72,15 +71,17 @@ private[daml] object AcsTxStreams {
|
||||
private[this] def acsAndBoundary
|
||||
: Graph[FanOutShape2[lav1.active_contracts_service.GetActiveContractsResponse, Seq[
|
||||
lav1.event.CreatedEvent,
|
||||
], BeginBookmark[String]], NotUsed] =
|
||||
], BeginBookmark[domain.Offset]], NotUsed] =
|
||||
GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
import lav1.active_contracts_service.{GetActiveContractsResponse => GACR}
|
||||
val dup = b add Broadcast[GACR](2)
|
||||
val acs = b add (Flow fromFunction ((_: GACR).activeContracts))
|
||||
val off = b add Flow[GACR]
|
||||
.collect { case gacr if gacr.offset.nonEmpty => AbsoluteBookmark(gacr.offset) }
|
||||
.via(last(LedgerBegin: BeginBookmark[String]))
|
||||
.collect {
|
||||
case gacr if gacr.offset.nonEmpty => AbsoluteBookmark(domain.Offset(gacr.offset))
|
||||
}
|
||||
.via(last(LedgerBegin: BeginBookmark[domain.Offset]))
|
||||
discard { dup ~> acs }
|
||||
discard { dup ~> off }
|
||||
new FanOutShape2(dup.in, acs.out, off.out)
|
||||
@ -93,29 +94,26 @@ private[daml] object AcsTxStreams {
|
||||
private[daml] def transactionsFollowingBoundary(
|
||||
transactionsSince: lav1.ledger_offset.LedgerOffset => Source[Transaction, NotUsed]
|
||||
): Graph[FanOutShape2[
|
||||
BeginBookmark[String],
|
||||
BeginBookmark[domain.Offset],
|
||||
ContractStreamStep.Txn.LAV1,
|
||||
BeginBookmark[String],
|
||||
BeginBookmark[domain.Offset],
|
||||
], NotUsed] =
|
||||
GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
type Off = BeginBookmark[String]
|
||||
type Off = BeginBookmark[domain.Offset]
|
||||
val dupOff = b add Broadcast[Off](2)
|
||||
val mergeOff = b add Concat[Off](2)
|
||||
val txns = Flow[Off]
|
||||
.flatMapConcat(off => transactionsSince(domain.Offset.tag.subst(off).toLedgerApi))
|
||||
.flatMapConcat(off => transactionsSince(off.toLedgerApi))
|
||||
.map(transactionToInsertsAndDeletes)
|
||||
val txnSplit = b add project2[ContractStreamStep.Txn.LAV1, domain.Offset]
|
||||
import domain.Offset.`Offset ordering`
|
||||
val lastTxOff = b add last(LedgerBegin: Off)
|
||||
type EndoBookmarkFlow[A] = Flow[BeginBookmark[A], BeginBookmark[A], NotUsed]
|
||||
val maxOff = b add domain.Offset.tag.unsubst[EndoBookmarkFlow, String](
|
||||
max(LedgerBegin: BeginBookmark[domain.Offset])
|
||||
)
|
||||
val maxOff = b add max(LedgerBegin: Off)
|
||||
// format: off
|
||||
discard { txnSplit.in <~ txns <~ dupOff }
|
||||
discard { dupOff ~> mergeOff ~> maxOff }
|
||||
discard { txnSplit.out1.map(off => AbsoluteBookmark(off.unwrap)) ~> lastTxOff ~> mergeOff }
|
||||
discard { dupOff ~> mergeOff ~> maxOff }
|
||||
discard { txnSplit.out1.map(off => AbsoluteBookmark(off)) ~> lastTxOff ~> mergeOff }
|
||||
// format: on
|
||||
new FanOutShape2(dupOff.in, txnSplit.out0, maxOff.out)
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ private class ContractsFetch(
|
||||
val graph = RunnableGraph.fromGraph(
|
||||
GraphDSL.create(
|
||||
Sink.queue[ConnectionIO[Unit]](),
|
||||
Sink.last[BeginBookmark[String]],
|
||||
Sink.last[BeginBookmark[domain.Offset]],
|
||||
)(Keep.both) { implicit builder => (acsSink, offsetSink) =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
@ -319,7 +319,7 @@ private class ContractsFetch(
|
||||
|
||||
case (AbsoluteBookmark(_), _) | (LedgerBegin, true) =>
|
||||
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
|
||||
stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(startOffset))
|
||||
stepsAndOffset.in <~ Source.single(startOffset)
|
||||
(
|
||||
(stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0,
|
||||
stepsAndOffset.out1,
|
||||
@ -343,9 +343,7 @@ private class ContractsFetch(
|
||||
for {
|
||||
_ <- sinkCioSequence_(acsQueue)
|
||||
offset0 <- connectionIOFuture(lastOffsetFuture)
|
||||
offsetOrError <- (domain.Offset.tag.subst(offset0) max AbsoluteBookmark(
|
||||
absEnd.toDomain
|
||||
)) match {
|
||||
offsetOrError <- offset0 max AbsoluteBookmark(absEnd.toDomain) match {
|
||||
case ab @ AbsoluteBookmark(newOffset) =>
|
||||
ContractDao
|
||||
.updateOffset(parties, templateId, newOffset, offsets)
|
||||
|
@ -31,7 +31,7 @@ import scalaz.std.option._
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.{-\/, OneAnd, OptionT, Show, Tag, \/, \/-}
|
||||
import scalaz.{-\/, OneAnd, OptionT, Show, \/, \/-}
|
||||
import spray.json.JsValue
|
||||
|
||||
import scala.collection.compat._
|
||||
@ -557,7 +557,7 @@ class ContractsService(
|
||||
val contractsAndBoundary = startOffset.cata(
|
||||
so =>
|
||||
Source
|
||||
.single(Tag unsubst AbsoluteBookmark(so.offset))
|
||||
.single(AbsoluteBookmark(so.offset))
|
||||
.viaMat(transactionsFollowingBoundary(transactionsSince).divertToHead)(Keep.right),
|
||||
source.viaMat(acsFollowingAndBoundary(transactionsSince).divertToHead)(Keep.right),
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user