mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
add offset to live events; remove liveness marker (#4593)
* move BeginBookmark to util * adding offsets to steps * offsetAfter belongs in Txn, not InsertDeleteStep * make transaction stream a ContractStreamStep.Txn stream * add several ContractStreamStep append cases * rewrite 'render' to emit offset in the right places * make ContractStreamStep#append total again * check for offset in a few tests * revert useless whitespace changes * missed argument * simpler mapPreservingIds * rewrite states for new "live" format * remove invalidated "events" block structure assertions * make shutdown in withHttpService deterministic, to try to catch race condition * exhaustiveness checking somehow disabled; fixed fetch flow and all is well * documentation and changelog CHANGELOG_BEGIN - [JSON API - Experimental] Remove ``{"live": true}`` marker from websocket streams; instead, live data is indicated by the presence of an "offset". See `issue #4593 <https://github.com/digital-asset/daml/pull/4593>`_. CHANGELOG_END * be more specific about what liveness marker may be in docs * fix daml2ts websocket tests * mention type rules for all cases in offset documentation
This commit is contained in:
parent
6685becccc
commit
79c6ee7339
@ -932,41 +932,40 @@ output a series of JSON documents, each ``payload`` formatted according
|
||||
to :doc:`lf-value-specification`::
|
||||
|
||||
{
|
||||
"events": [
|
||||
{
|
||||
"created": {
|
||||
"events": [{
|
||||
"created": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"issuer": "Alice",
|
||||
"amount": "999.99",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"signatories": [
|
||||
"Alice"
|
||||
],
|
||||
"contractId": "#1:0",
|
||||
"templateId": "b70bbfbc77a4790f66d4840cb19f657dd20848f5e2f64e39ad404a6cbd98cf75:Iou:Iou"
|
||||
"issuer": "Alice",
|
||||
"amount": "999.99",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"matchedQueries": [
|
||||
1,
|
||||
2
|
||||
]
|
||||
}
|
||||
]
|
||||
"signatories": ["Alice"],
|
||||
"contractId": "#1:0",
|
||||
"templateId": "eb3b150383a979d6765b8570a17dd24ae8d8b63418ee5fd20df20ad2a1c13976:Iou:Iou"
|
||||
},
|
||||
"matchedQueries": [1, 2]
|
||||
}]
|
||||
}
|
||||
|
||||
where ``matchedQueries`` indicates the 0-based indices into the request
|
||||
list of queries that matched this contract.
|
||||
|
||||
When the stream reaches the end of contracts that existed when the
|
||||
request started, you'll receive a special message indicating the start
|
||||
of "live" updates. For example, you might use it to turn off an initial
|
||||
"loading" indicator::
|
||||
Every ``events`` block following the end of contracts that existed when
|
||||
the request started includes an ``offset``. The stream is guaranteed to
|
||||
send an offset immediately at the beginning of this "live" data, which
|
||||
may or may not contain any ``events``; if it does not contain events and
|
||||
no events were emitted before, it may be ``null`` or a string;
|
||||
otherwise, it will be a string. For example, you might use it to turn
|
||||
off an initial "loading" indicator::
|
||||
|
||||
{"live": true}
|
||||
{
|
||||
"events": [],
|
||||
"offset": "2"
|
||||
}
|
||||
|
||||
To keep the stream alive, you'll occasionally see messages like this,
|
||||
which can be safely ignored::
|
||||
@ -977,58 +976,45 @@ After submitting an ``Iou_Split`` exercise, which creates two contracts
|
||||
and archives the one above, the same stream will eventually produce::
|
||||
|
||||
{
|
||||
"events": [
|
||||
{
|
||||
"archived": {
|
||||
"contractId": "#1:0",
|
||||
"templateId": "b70bbfbc77a4790f66d4840cb19f657dd20848f5e2f64e39ad404a6cbd98cf75:Iou:Iou"
|
||||
}
|
||||
},
|
||||
{
|
||||
"created": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"issuer": "Alice",
|
||||
"amount": "42.42",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"signatories": [
|
||||
"Alice"
|
||||
],
|
||||
"contractId": "#2:1",
|
||||
"templateId": "b70bbfbc77a4790f66d4840cb19f657dd20848f5e2f64e39ad404a6cbd98cf75:Iou:Iou"
|
||||
},
|
||||
"matchedQueries": [
|
||||
0,
|
||||
2
|
||||
]
|
||||
},
|
||||
{
|
||||
"created": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"issuer": "Alice",
|
||||
"amount": "957.57",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"signatories": [
|
||||
"Alice"
|
||||
],
|
||||
"contractId": "#2:2",
|
||||
"templateId": "b70bbfbc77a4790f66d4840cb19f657dd20848f5e2f64e39ad404a6cbd98cf75:Iou:Iou"
|
||||
},
|
||||
"matchedQueries": [
|
||||
1,
|
||||
2
|
||||
]
|
||||
"events": [{
|
||||
"archived": {
|
||||
"contractId": "#1:0",
|
||||
"templateId": "eb3b150383a979d6765b8570a17dd24ae8d8b63418ee5fd20df20ad2a1c13976:Iou:Iou"
|
||||
}
|
||||
]
|
||||
}, {
|
||||
"created": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"issuer": "Alice",
|
||||
"amount": "42.42",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"signatories": ["Alice"],
|
||||
"contractId": "#2:1",
|
||||
"templateId": "eb3b150383a979d6765b8570a17dd24ae8d8b63418ee5fd20df20ad2a1c13976:Iou:Iou"
|
||||
},
|
||||
"matchedQueries": [0, 2]
|
||||
}, {
|
||||
"created": {
|
||||
"observers": [],
|
||||
"agreementText": "",
|
||||
"payload": {
|
||||
"observers": [],
|
||||
"issuer": "Alice",
|
||||
"amount": "957.57",
|
||||
"currency": "USD",
|
||||
"owner": "Alice"
|
||||
},
|
||||
"signatories": ["Alice"],
|
||||
"contractId": "#2:2",
|
||||
"templateId": "eb3b150383a979d6765b8570a17dd24ae8d8b63418ee5fd20df20ad2a1c13976:Iou:Iou"
|
||||
},
|
||||
"matchedQueries": [1, 2]
|
||||
}],
|
||||
"offset": "3"
|
||||
}
|
||||
|
||||
If any template IDs are found not to resolve, the first non-heartbeat
|
||||
|
@ -161,6 +161,10 @@ test('create + fetch & exercise', async () => {
|
||||
const personStream = promisifyStream(aliceLedger.streamQuery(Main.Person));
|
||||
expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract}]]);
|
||||
|
||||
// end of non-live data, first offset
|
||||
expect(await personStream.next()).toEqual([[alice6Contract], []]);
|
||||
expect(await alice6KeyStream.next()).toEqual([alice6Contract, []]);
|
||||
|
||||
// Bob enters the scene.
|
||||
const bob4Contract = await bobLedger.create(Main.Person, bob4);
|
||||
expect(bob4Contract.payload).toEqual(bob4);
|
||||
|
@ -28,7 +28,7 @@ import com.digitalasset.http.json.JsonProtocol.LfValueDatabaseCodec.{
|
||||
apiValueToJsValue => lfValueToDbJsValue,
|
||||
}
|
||||
import com.digitalasset.http.util.IdentifierConverters.apiIdentifier
|
||||
import util.{ContractStreamStep, InsertDeleteStep}
|
||||
import util.{AbsoluteBookmark, BeginBookmark, ContractStreamStep, InsertDeleteStep, LedgerBegin}
|
||||
import com.digitalasset.util.ExceptionOps._
|
||||
import com.digitalasset.jwt.domain.Jwt
|
||||
import com.digitalasset.ledger.api.v1.transaction.Transaction
|
||||
@ -193,12 +193,14 @@ private class ContractsFetch(
|
||||
transactionFilter(party, List(templateId)),
|
||||
true,
|
||||
)
|
||||
(stepsAndOffset.out0.map(_.toInsertDelete).outlet, stepsAndOffset.out1)
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
|
||||
case AbsoluteBookmark(_) =>
|
||||
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
|
||||
stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(offset))
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
(
|
||||
(stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0,
|
||||
stepsAndOffset.out1)
|
||||
}
|
||||
|
||||
val transactInsertsDeletes = Flow
|
||||
@ -206,7 +208,7 @@ private class ContractsFetch(
|
||||
.conflate(_ append _)
|
||||
.map(insertAndDelete)
|
||||
|
||||
idses ~> transactInsertsDeletes ~> acsSink
|
||||
idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
|
||||
lastOff ~> offsetSink
|
||||
|
||||
ClosedShape
|
||||
@ -235,24 +237,6 @@ private[http] object ContractsFetch {
|
||||
|
||||
type PreInsertContract = DBContract[TemplateId.RequiredPkg, JsValue, JsValue, Seq[domain.Party]]
|
||||
|
||||
sealed abstract class BeginBookmark[+Off] extends Product with Serializable {
|
||||
import lav1.ledger_offset.LedgerOffset
|
||||
import LedgerOffset.{LedgerBoundary, Value}
|
||||
import Value.Boundary
|
||||
def toLedgerApi(implicit ev: Off <~< domain.Offset): LedgerOffset =
|
||||
this match {
|
||||
case AbsoluteBookmark(offset) => domain.Offset.toLedgerApi(ev(offset))
|
||||
case LedgerBegin => LedgerOffset(Boundary(LedgerBoundary.LEDGER_BEGIN))
|
||||
}
|
||||
|
||||
def toOption: Option[Off] = this match {
|
||||
case AbsoluteBookmark(offset) => Some(offset)
|
||||
case LedgerBegin => None
|
||||
}
|
||||
}
|
||||
final case class AbsoluteBookmark[+Off](offset: Off) extends BeginBookmark[Off]
|
||||
case object LedgerBegin extends BeginBookmark[Nothing]
|
||||
|
||||
def partition[A, B]: Graph[FanOutShape2[A \/ B, A, B], NotUsed] =
|
||||
GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
@ -331,15 +315,21 @@ private[http] object ContractsFetch {
|
||||
NotUsed] =
|
||||
GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
import ContractStreamStep.{LiveBegin, acs => Acs, Txn}
|
||||
import ContractStreamStep.{LiveBegin, Acs}
|
||||
type Off = BeginBookmark[String]
|
||||
val acs = b add acsAndBoundary
|
||||
val dupOff = b add Broadcast[Off](2)
|
||||
val liveStart = Flow fromFunction { off: Off =>
|
||||
LiveBegin(domain.Offset.tag.subst(off))
|
||||
}
|
||||
val txns = b add transactionsFollowingBoundary(transactionsSince)
|
||||
val allSteps = b add Concat[ContractStreamStep.LAV1](3)
|
||||
// format: off
|
||||
discard { acs.out0.map(ces => Acs(ces.toVector)) ~> allSteps }
|
||||
discard { Source.single(LiveBegin) ~> allSteps }
|
||||
discard { txns.out0.map(Txn(_)) ~> allSteps }
|
||||
discard { acs.out1 ~> txns.in }
|
||||
discard { dupOff <~ acs.out1 }
|
||||
discard { acs.out0.map(ces => Acs(ces.toVector)) ~> allSteps }
|
||||
discard { dupOff ~> liveStart ~> allSteps }
|
||||
discard { txns.out0 ~> allSteps }
|
||||
discard { dupOff ~> txns.in }
|
||||
// format: on
|
||||
new FanOutShape2(acs.in, allSteps.out, txns.out1)
|
||||
}
|
||||
@ -353,7 +343,7 @@ private[http] object ContractsFetch {
|
||||
): Graph[
|
||||
FanOutShape2[
|
||||
BeginBookmark[String],
|
||||
InsertDeleteStep.LAV1,
|
||||
ContractStreamStep.Txn.LAV1,
|
||||
BeginBookmark[String],
|
||||
],
|
||||
NotUsed] =
|
||||
@ -365,7 +355,7 @@ private[http] object ContractsFetch {
|
||||
val txns = Flow[Off]
|
||||
.flatMapConcat(off => transactionsSince(domain.Offset.tag.subst(off).toLedgerApi))
|
||||
.map(transactionToInsertsAndDeletes)
|
||||
val txnSplit = b add project2[InsertDeleteStep.LAV1, domain.Offset]
|
||||
val txnSplit = b add project2[ContractStreamStep.Txn.LAV1, domain.Offset]
|
||||
val lastOff = b add last(LedgerBegin: Off)
|
||||
// format: off
|
||||
discard { txnSplit.in <~ txns <~ dupOff }
|
||||
@ -375,9 +365,6 @@ private[http] object ContractsFetch {
|
||||
new FanOutShape2(dupOff.in, txnSplit.out0, lastOff.out)
|
||||
}
|
||||
|
||||
private[this] def createdEventsIDS[C](seq: Seq[C]): InsertDeleteStep[Nothing, C] =
|
||||
InsertDeleteStep(seq.toVector, Map.empty)
|
||||
|
||||
/** Split a series of ACS responses into two channels: one with contracts, the
|
||||
* other with a single result, the last offset.
|
||||
*/
|
||||
@ -404,9 +391,9 @@ private[http] object ContractsFetch {
|
||||
|
||||
private def transactionToInsertsAndDeletes(
|
||||
tx: lav1.transaction.Transaction,
|
||||
): (InsertDeleteStep.LAV1, domain.Offset) = {
|
||||
): (ContractStreamStep.Txn.LAV1, domain.Offset) = {
|
||||
val offset = domain.Offset.fromLedgerApi(tx)
|
||||
(partitionInsertsDeletes(tx.events), offset)
|
||||
(ContractStreamStep.Txn(partitionInsertsDeletes(tx.events), offset), offset)
|
||||
}
|
||||
|
||||
private def surrogateTemplateIds[K <: TemplateId.RequiredPkg](
|
||||
|
@ -22,7 +22,6 @@ import Liskov.<~<
|
||||
import com.digitalasset.http.query.ValuePredicate
|
||||
import scalaz.syntax.bifunctor._
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.syntax.tag._
|
||||
import scalaz.syntax.std.boolean._
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.syntax.traverse._
|
||||
@ -56,25 +55,29 @@ object WebSocketService {
|
||||
errors: Seq[ServerError],
|
||||
step: ContractStreamStep[domain.ArchivedContract, (domain.ActiveContract[LfV], Pos)]) {
|
||||
import json.JsonProtocol._, spray.json._
|
||||
def render(implicit lfv: LfV <~< JsValue, pos: Pos <~< Map[String, JsValue]): JsValue =
|
||||
step match {
|
||||
case ContractStreamStep.LiveBegin => liveMarker
|
||||
case _ =>
|
||||
def inj[V: JsonWriter](ctor: String, v: V) = JsObject(ctor -> v.toJson)
|
||||
val InsertDeleteStep(inserts, deletes) =
|
||||
Liskov
|
||||
.lift2[StepAndErrors, Pos, Map[String, JsValue], LfV, JsValue](pos, lfv)(this)
|
||||
.step
|
||||
.toInsertDelete
|
||||
val events = JsArray(
|
||||
deletes.valuesIterator.map(inj("archived", _)).toVector
|
||||
++ inserts.map {
|
||||
case (ac, pos) =>
|
||||
val acj = inj("created", ac)
|
||||
acj copy (fields = acj.fields ++ pos)
|
||||
} ++ errors.map(e => inj("error", e.message)))
|
||||
JsObject(("events", events))
|
||||
def render(implicit lfv: LfV <~< JsValue, pos: Pos <~< Map[String, JsValue]): JsValue = {
|
||||
import ContractStreamStep._
|
||||
def inj[V: JsonWriter](ctor: String, v: V) = JsObject(ctor -> v.toJson)
|
||||
val InsertDeleteStep(inserts, deletes) =
|
||||
Liskov
|
||||
.lift2[StepAndErrors, Pos, Map[String, JsValue], LfV, JsValue](pos, lfv)(this)
|
||||
.step
|
||||
.toInsertDelete
|
||||
|
||||
val events = (deletes.valuesIterator.map(inj("archived", _)).toVector
|
||||
++ inserts.map {
|
||||
case (ac, pos) =>
|
||||
val acj = inj("created", ac)
|
||||
acj copy (fields = acj.fields ++ pos)
|
||||
} ++ errors.map(e => inj("error", e.message)))
|
||||
val offsetAfter = step match {
|
||||
case Acs(_) => None
|
||||
case LiveBegin(off) =>
|
||||
Some(off.toOption.cata(o => JsString(domain.Offset.unwrap(o)), JsNull: JsValue))
|
||||
case Txn(_, off) => Some(JsString(domain.Offset.unwrap(off)))
|
||||
}
|
||||
JsObject(Map("events" -> JsArray(events)) ++ offsetAfter.map("offset" -> _).toList)
|
||||
}
|
||||
|
||||
def append[P >: Pos, A >: LfV](o: StepAndErrors[P, A]): StepAndErrors[P, A] =
|
||||
StepAndErrors(errors ++ o.errors, step append o.step)
|
||||
@ -94,7 +97,7 @@ object WebSocketService {
|
||||
.batchWeighted(
|
||||
max = maxCost,
|
||||
costFn = {
|
||||
case StepAndErrors(_, ContractStreamStep.LiveBegin) =>
|
||||
case StepAndErrors(_, ContractStreamStep.LiveBegin(_)) =>
|
||||
// this is how we avoid conflating LiveBegin
|
||||
maxCost
|
||||
case StepAndErrors(errors, step) =>
|
||||
@ -330,19 +333,24 @@ class WebSocketService(
|
||||
|
||||
private def removePhantomArchives_[A, B]
|
||||
: Flow[StepAndErrors[A, B], StepAndErrors[A, B], NotUsed] = {
|
||||
import ContractStreamStep.{LiveBegin, Txn}
|
||||
import ContractStreamStep.{LiveBegin, Txn, Acs}
|
||||
Flow[StepAndErrors[A, B]]
|
||||
.scan((Set.empty[String], Option.empty[StepAndErrors[A, B]])) {
|
||||
case ((s0, _), a0 @ StepAndErrors(_, Txn(idstep))) =>
|
||||
val newInserts: Vector[String] = idstep.inserts.map(_._1.contractId.unwrap)
|
||||
case ((s0, _), a0 @ StepAndErrors(_, Txn(idstep, _))) =>
|
||||
val newInserts: Vector[String] =
|
||||
domain.ContractId.unsubst(idstep.inserts.map(_._1.contractId))
|
||||
val (deletesToEmit, deletesToHold) = s0 partition idstep.deletes.keySet
|
||||
val s1: Set[String] = deletesToHold ++ newInserts
|
||||
val a1 = a0.copy(
|
||||
step = a0.step.mapStep(_ copy (deletes = idstep.deletes filterKeys deletesToEmit)))
|
||||
val a1 = a0.copy(step = a0.step.mapDeletes(_ filterKeys deletesToEmit))
|
||||
|
||||
(s1, if (a1.nonEmpty) Some(a1) else None)
|
||||
|
||||
case ((s0, _), a0 @ StepAndErrors(_, LiveBegin)) =>
|
||||
case ((deletesToHold, _), a0 @ StepAndErrors(_, Acs(inserts))) =>
|
||||
val newInserts: Vector[String] = domain.ContractId.unsubst(inserts.map(_._1.contractId))
|
||||
val s1: Set[String] = deletesToHold ++ newInserts
|
||||
(s1, Some(a0))
|
||||
|
||||
case ((s0, _), a0 @ StepAndErrors(_, LiveBegin(_))) =>
|
||||
(s0, Some(a0))
|
||||
}
|
||||
.collect { case (_, Some(x)) => x }
|
||||
@ -376,14 +384,12 @@ class WebSocketService(
|
||||
.liftErr(ServerError)
|
||||
.flatMap(_.traverse(apiValueToLfValue).liftErr(ServerError)),
|
||||
)
|
||||
StepAndErrors(
|
||||
errors ++ aerrors,
|
||||
dstep mapStep (insDel =>
|
||||
insDel copy (inserts = (insDel.inserts: Vector[domain.ActiveContract[LfV]]).flatMap {
|
||||
ac =>
|
||||
fn(ac).map((ac, _)).toList
|
||||
}))
|
||||
)
|
||||
StepAndErrors(errors ++ aerrors, dstep mapInserts {
|
||||
inserts: Vector[domain.ActiveContract[LfV]] =>
|
||||
inserts.flatMap { ac =>
|
||||
fn(ac).map((ac, _)).toList
|
||||
}
|
||||
})
|
||||
}
|
||||
.via(conflation)
|
||||
.map(_ mapLfv lfValueToJsValue)
|
||||
|
@ -0,0 +1,26 @@
|
||||
// Copyright (c) 2020 The DAML Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.http
|
||||
package util
|
||||
|
||||
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import LedgerOffset.{LedgerBoundary, Value}
|
||||
import Value.Boundary
|
||||
|
||||
import scalaz.Liskov.<~<
|
||||
|
||||
private[http] sealed abstract class BeginBookmark[+Off] extends Product with Serializable {
|
||||
def toLedgerApi(implicit ev: Off <~< domain.Offset): LedgerOffset =
|
||||
this match {
|
||||
case AbsoluteBookmark(offset) => domain.Offset.toLedgerApi(ev(offset))
|
||||
case LedgerBegin => LedgerOffset(Boundary(LedgerBoundary.LEDGER_BEGIN))
|
||||
}
|
||||
|
||||
def toOption: Option[Off] = this match {
|
||||
case AbsoluteBookmark(offset) => Some(offset)
|
||||
case LedgerBegin => None
|
||||
}
|
||||
}
|
||||
private[http] final case class AbsoluteBookmark[+Off](offset: Off) extends BeginBookmark[Off]
|
||||
private[http] case object LedgerBegin extends BeginBookmark[Nothing]
|
@ -4,6 +4,7 @@
|
||||
package com.digitalasset.http
|
||||
package util
|
||||
|
||||
import Collections._
|
||||
import InsertDeleteStep.{Cid, Inserts}
|
||||
|
||||
import scalaz.\/
|
||||
@ -16,48 +17,69 @@ private[http] sealed abstract class ContractStreamStep[+D, +C] extends Product w
|
||||
import ContractStreamStep._
|
||||
|
||||
def toInsertDelete: InsertDeleteStep[D, C] = this match {
|
||||
case LiveBegin => InsertDeleteStep(Vector.empty, Map.empty)
|
||||
case Txn(step) => step
|
||||
case Acs(inserts) => InsertDeleteStep(inserts, Map.empty)
|
||||
case LiveBegin(_) => InsertDeleteStep(Vector.empty, Map.empty)
|
||||
case Txn(step, _) => step
|
||||
}
|
||||
|
||||
/** Forms a monoid with 0 = LiveBegin */
|
||||
def append[DD >: D, CC >: C: Cid](o: ContractStreamStep[DD, CC]): ContractStreamStep[DD, CC] =
|
||||
(this, o) match {
|
||||
case (_, LiveBegin) => this
|
||||
case (LiveBegin, _) => o
|
||||
case _ => Txn(toInsertDelete append o.toInsertDelete)
|
||||
case (Acs(inserts), Acs(oinserts)) => Acs(inserts ++ oinserts)
|
||||
case (Acs(_), LiveBegin(AbsoluteBookmark(off))) =>
|
||||
Txn(toInsertDelete, off)
|
||||
case (Acs(_) | Txn(_, _), Txn(ostep, off)) =>
|
||||
Txn(toInsertDelete append ostep, off)
|
||||
case (LiveBegin(_), Txn(_, _) | LiveBegin(_)) => o
|
||||
// the following cases should never happen in a real stream; we attempt to
|
||||
// provide definitions that make `append` totally associative, anyway
|
||||
case (Acs(_), LiveBegin(LedgerBegin)) => this
|
||||
case (LiveBegin(LedgerBegin), Acs(_)) => o
|
||||
case (LiveBegin(AbsoluteBookmark(off)), Acs(_)) => Txn(o.toInsertDelete, off)
|
||||
case (Txn(step, off), Acs(_) | LiveBegin(LedgerBegin)) =>
|
||||
Txn(step append o.toInsertDelete, off)
|
||||
case (Txn(step, _), LiveBegin(AbsoluteBookmark(off))) => Txn(step, off)
|
||||
}
|
||||
|
||||
def mapPreservingIds[CC](f: C => CC): ContractStreamStep[D, CC] =
|
||||
mapStep(_ mapPreservingIds f)
|
||||
mapInserts(_ map f)
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
def partitionBimap[LD, DD, LC, CC, LDS, LCS](f: D => (LD \/ DD), g: C => (LC \/ CC))(
|
||||
def partitionBimap[LD, DD, LC, CC, LDS](f: D => (LD \/ DD), g: C => (LC \/ CC))(
|
||||
implicit LDS: CanBuildFrom[Map[String, D], LD, LDS],
|
||||
LCS: CanBuildFrom[Inserts[C], LC, LCS],
|
||||
): (LDS, LCS, ContractStreamStep[DD, CC]) =
|
||||
): (LDS, Inserts[LC], ContractStreamStep[DD, CC]) =
|
||||
this match {
|
||||
case LiveBegin => (LDS().result(), LCS().result(), LiveBegin)
|
||||
case Txn(step) => step partitionBimap (f, g) map (Txn(_))
|
||||
case Acs(inserts) =>
|
||||
val (lcs, ins) = inserts partitionMap g
|
||||
(LDS().result(), lcs, Acs(ins))
|
||||
case lb @ LiveBegin(_) => (LDS().result(), Inserts.empty, lb)
|
||||
case Txn(step, off) => step partitionBimap (f, g) map (Txn(_, off))
|
||||
}
|
||||
|
||||
def mapStep[DD, CC](
|
||||
f: InsertDeleteStep[D, C] => InsertDeleteStep[DD, CC]): ContractStreamStep[DD, CC] =
|
||||
def mapInserts[CC](f: Inserts[C] => Inserts[CC]): ContractStreamStep[D, CC] = this match {
|
||||
case Acs(inserts) => Acs(f(inserts))
|
||||
case lb @ LiveBegin(_) => lb
|
||||
case Txn(step, off) => Txn(step copy (inserts = f(step.inserts)), off)
|
||||
}
|
||||
|
||||
def mapDeletes[DD](f: Map[String, D] => Map[String, DD]): ContractStreamStep[DD, C] =
|
||||
this match {
|
||||
case LiveBegin => LiveBegin
|
||||
case Txn(step) => Txn(f(step))
|
||||
case acs @ Acs(_) => acs
|
||||
case lb @ LiveBegin(_) => lb
|
||||
case Txn(step, off) => Txn(step copy (deletes = f(step.deletes)), off)
|
||||
}
|
||||
|
||||
def nonEmpty: Boolean = this match {
|
||||
case LiveBegin => true // unnatural wrt `toInsertDelete`, but what nonEmpty is used for here
|
||||
case Txn(step) => step.nonEmpty
|
||||
case Acs(inserts) => inserts.nonEmpty
|
||||
case LiveBegin(_) => true // unnatural wrt `toInsertDelete`, but what nonEmpty is used for here
|
||||
case Txn(step, _) => step.nonEmpty
|
||||
}
|
||||
}
|
||||
|
||||
private[http] object ContractStreamStep extends WithLAV1[ContractStreamStep] {
|
||||
case object LiveBegin extends ContractStreamStep[Nothing, Nothing]
|
||||
final case class Txn[+D, +C](step: InsertDeleteStep[D, C]) extends ContractStreamStep[D, C]
|
||||
|
||||
def acs[C](inserts: Inserts[C]): ContractStreamStep[Nothing, C] =
|
||||
Txn(InsertDeleteStep(inserts, Map.empty))
|
||||
final case class Acs[+C](inserts: Inserts[C]) extends ContractStreamStep[Nothing, C]
|
||||
final case class LiveBegin(offset: BeginBookmark[domain.Offset])
|
||||
extends ContractStreamStep[Nothing, Nothing]
|
||||
final case class Txn[+D, +C](step: InsertDeleteStep[D, C], offsetAfter: domain.Offset)
|
||||
extends ContractStreamStep[D, C]
|
||||
object Txn extends WithLAV1[Txn]
|
||||
}
|
||||
|
@ -34,24 +34,22 @@ private[http] final case class InsertDeleteStep[+D, +C](
|
||||
def mapPreservingIds[CC](f: C => CC): InsertDeleteStep[D, CC] = copy(inserts = inserts map f)
|
||||
|
||||
/** Results undefined if cid(d) != cid(c) */
|
||||
def partitionMapPreservingIds[LC, CC, LCS](f: C => (LC \/ CC))(
|
||||
implicit LCS: CanBuildFrom[Inserts[C], LC, LCS],
|
||||
): (LCS, InsertDeleteStep[D, CC]) = {
|
||||
def partitionMapPreservingIds[LC, CC](
|
||||
f: C => (LC \/ CC)): (Inserts[LC], InsertDeleteStep[D, CC]) = {
|
||||
val (_, lcs, step) = partitionBimap(\/-(_), f)
|
||||
(lcs, step)
|
||||
}
|
||||
|
||||
/** Results undefined if cid(cc) != cid(c) */
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
def partitionBimap[LD, DD, LC, CC, LDS, LCS](f: D => (LD \/ DD), g: C => (LC \/ CC))(
|
||||
def partitionBimap[LD, DD, LC, CC, LDS](f: D => (LD \/ DD), g: C => (LC \/ CC))(
|
||||
implicit LDS: CanBuildFrom[Map[String, D], LD, LDS],
|
||||
LCS: CanBuildFrom[Inserts[C], LC, LCS],
|
||||
): (LDS, LCS, InsertDeleteStep[DD, CC]) = {
|
||||
): (LDS, Inserts[LC], InsertDeleteStep[DD, CC]) = {
|
||||
import Collections._
|
||||
import scalaz.std.tuple._, scalaz.syntax.traverse._
|
||||
val (lcs, ins) = inserts partitionMap g
|
||||
val (lds, del) = deletes partitionMap (_ traverse f)
|
||||
(lds, lcs, InsertDeleteStep(ins, del))
|
||||
(lds, lcs, copy(inserts = ins, deletes = del))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@ object HttpServiceTestFixture {
|
||||
|
||||
private val doNotReloadPackages = FiniteDuration(100, DAYS)
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
def withHttpService[A](
|
||||
testName: String,
|
||||
dars: List[File],
|
||||
@ -97,12 +98,15 @@ object HttpServiceTestFixture {
|
||||
a <- testFn(uri, encoder, decoder, client)
|
||||
} yield a
|
||||
|
||||
fa.onComplete { _ =>
|
||||
ledgerF.foreach(_._1.close())
|
||||
httpServiceF.foreach(_._1.unbind())
|
||||
fa.transformWith { ta =>
|
||||
Future
|
||||
.sequence(
|
||||
Seq(
|
||||
ledgerF.map(_._1.close()),
|
||||
httpServiceF.flatMap(_._1.unbind()),
|
||||
) map (_ fallbackTo Future.successful(())))
|
||||
.flatMap(_ => Future fromTry ta)
|
||||
}
|
||||
|
||||
fa
|
||||
}
|
||||
|
||||
def withLedger[A](
|
||||
|
@ -84,7 +84,7 @@ class WebsocketServiceIntegrationTest
|
||||
private val collectResultsAsRawString: Sink[Message, Future[Seq[String]]] =
|
||||
Flow[Message]
|
||||
.map(_.toString)
|
||||
.filterNot(v => Set("heartbeat", "live") exists (v contains _))
|
||||
.filterNot(_ contains "heartbeat")
|
||||
.toMat(Sink.seq)(Keep.right)
|
||||
|
||||
private def singleClientQueryStream(serviceUri: Uri, query: String): Source[Message, NotUsed] = {
|
||||
@ -133,9 +133,10 @@ class WebsocketServiceIntegrationTest
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
case Seq(result) =>
|
||||
case Seq(result, liveBegin) =>
|
||||
result should include(""""issuer":"Alice"""")
|
||||
result should include(""""amount":"999.99"""")
|
||||
liveBegin should include(""""offset":"""")
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,9 +149,10 @@ class WebsocketServiceIntegrationTest
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
case Seq(result) =>
|
||||
case Seq(result, liveBegin) =>
|
||||
result should include(""""owner":"Alice"""")
|
||||
result should include(""""number":"abc123"""")
|
||||
liveBegin should include(""""offset":"""")
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,9 +166,10 @@ class WebsocketServiceIntegrationTest
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
case Seq(warning, result) =>
|
||||
case Seq(warning, result, liveBegin) =>
|
||||
warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk")
|
||||
result should include("\"issuer\":\"Alice\"")
|
||||
liveBegin should include(""""offset":"""")
|
||||
}
|
||||
}
|
||||
|
||||
@ -180,10 +183,11 @@ class WebsocketServiceIntegrationTest
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
case Seq(warning, result) =>
|
||||
case Seq(warning, result, liveBegin) =>
|
||||
warning should include("""{"warnings":{"unknownTemplateIds":["Unk""")
|
||||
result should include(""""owner":"Alice"""")
|
||||
result should include(""""number":"abc123"""")
|
||||
liveBegin should include(""""offset":"""")
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,7 +260,10 @@ class WebsocketServiceIntegrationTest
|
||||
statusCode.isSuccess shouldBe true
|
||||
GotAcs(ctid)
|
||||
}
|
||||
case (GotAcs(ctid), Live(_, _)) => Future.successful(GotLive(ctid))
|
||||
|
||||
case (GotAcs(ctid), Offset(JsString(_), Events(JsArray(Vector()), _))) =>
|
||||
Future.successful(GotLive(ctid))
|
||||
|
||||
case (
|
||||
GotLive(consumedCtid),
|
||||
evtsWrapper @ ContractDelta(
|
||||
@ -267,20 +274,18 @@ class WebsocketServiceIntegrationTest
|
||||
Set(fstId, sndId, consumedCtid) should have size 3
|
||||
inside(evtsWrapper) {
|
||||
case JsObject(obj) =>
|
||||
inside(obj.toSeq) {
|
||||
case Seq(("events", array)) =>
|
||||
inside(array) {
|
||||
case JsArray(
|
||||
Vector(
|
||||
Archived(_, _),
|
||||
Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)),
|
||||
Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)))) =>
|
||||
Set((amt1, ixes1), (amt2, ixes2)) should ===(
|
||||
Set(
|
||||
(BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))),
|
||||
(BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))),
|
||||
))
|
||||
}
|
||||
inside(obj get "events") {
|
||||
case Some(
|
||||
JsArray(
|
||||
Vector(
|
||||
Archived(_, _),
|
||||
Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)),
|
||||
Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _))))) =>
|
||||
Set((amt1, ixes1), (amt2, ixes2)) should ===(
|
||||
Set(
|
||||
(BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))),
|
||||
(BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))),
|
||||
))
|
||||
}
|
||||
}
|
||||
ShouldHaveEnded(2)
|
||||
@ -297,6 +302,7 @@ class WebsocketServiceIntegrationTest
|
||||
|
||||
"fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService {
|
||||
(uri, encoder, _) =>
|
||||
import spray.json.{JsArray, JsString}
|
||||
val templateId = domain.TemplateId(None, "Account", "Account")
|
||||
val fetchRequest = """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]"""
|
||||
val f1 =
|
||||
@ -324,7 +330,8 @@ class WebsocketServiceIntegrationTest
|
||||
}
|
||||
}: Future[StreamState]
|
||||
|
||||
case (GotAcs(ctid), Live(_, _)) => Future.successful(GotLive(ctid))
|
||||
case (GotAcs(ctid), Offset(JsString(_), Events(JsArray(Vector()), _))) =>
|
||||
Future.successful(GotLive(ctid))
|
||||
|
||||
case (
|
||||
GotLive(archivedCid),
|
||||
@ -423,7 +430,7 @@ object WebsocketServiceIntegrationTest {
|
||||
): Option[(Vector[(String, JsValue)], Vector[domain.ArchivedContract])] =
|
||||
for {
|
||||
JsObject(eventsWrapper) <- Some(jsv)
|
||||
JsArray(sums) <- eventsWrapper.get("events") if eventsWrapper.size == 1
|
||||
JsArray(sums) <- eventsWrapper.get("events")
|
||||
pairs = sums collect { case JsObject(fields) => fields.filterKeys(tagKeys).head }
|
||||
if pairs.length == sums.length
|
||||
sets = pairs groupBy (_._1)
|
||||
@ -462,7 +469,8 @@ object WebsocketServiceIntegrationTest {
|
||||
jsv.fields get label map ((_, JsObject(jsv.fields - label)))
|
||||
}
|
||||
|
||||
private object Live extends JsoField("live")
|
||||
private object Events extends JsoField("events")
|
||||
private object Offset extends JsoField("offset")
|
||||
private object Created extends JsoField("created")
|
||||
private object Archived extends JsoField("archived")
|
||||
private object MatchedQueries extends JsoField("matchedQueries")
|
||||
|
Loading…
Reference in New Issue
Block a user