mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
stop filtering "phantom archives" when it won't work; reenable it on request (#5003)
* ContractKeyStreamRequest with different types for whether offset-preceded or not - should replace EnrichedContractKey as the WS StreamQuery type * add the ContractKeyStreamRequest layer everywhere * split StreamQuery parse from the other steps * make StreamQuery type depend on the input JsValue * only StreamQueryReader is a typeclass now * scalafmt * wrong type arg * letting the request data and phantom archive removal choice flow - from work with @leo-da * finish threading request-derived phantom state to removal flow - from work with @leo-da * make it clear that hints are contract IDs in StreamQuery - from work with @leo-da * treat StreamQueryReader's type parameter fully as a phantom - from work with @leo-da * remove unused type alias - from work with @leo-da * test fetch resume, with and without various offsetHints * document offsetHints * missing ` in rst - thanks @hurryabit * rename offsetHint to contractIdAtOffset CHANGELOG_BEGIN - [JSON API - Experimental] New field ``contractIdAtOffset`` for fetch-by-key streams to restore proper archive filtering. See `issue #4511 <https://github.com/digital-asset/daml/issues/4511>`_. CHANGELOG_END * we never unify the two ContractKeyStreamRequest types * doc update for contractIdAtOffset
This commit is contained in:
parent
b950692db1
commit
332d35a347
@ -1295,4 +1295,32 @@ Example:
|
||||
{"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "def345"}}
|
||||
]
|
||||
|
||||
The output stream has the same format as the output from the `Contracts Query Stream`_. We further guarantee that for every ``archived`` event appearing on the stream there has been a matching ``created`` event earlier in the stream.
|
||||
The output stream has the same format as the output from the `Contracts
|
||||
Query Stream`_. We further guarantee that for every ``archived`` event
|
||||
appearing on the stream there has been a matching ``created`` event
|
||||
earlier in the stream, except in the case of missing
|
||||
``contractIdAtOffset`` fields in the case described below.
|
||||
|
||||
You may supply an optional ``offset`` for the stream, exactly as with
|
||||
query streams. However, you should supply with each ``{templateId,
|
||||
key}`` pair a ``contractIdAtOffset``, which is the contract ID currently
|
||||
associated with that pair at the point of the given offset, or ``null``
|
||||
if no contract ID was associated with the pair at that offset. For
|
||||
example, with the above keys, if you had one ``"abc123"`` contract but
|
||||
no ``"def345"`` contract, you might specify:
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
[
|
||||
{"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "abc123"},
|
||||
"contractIdAtOffset": "#1:0"},
|
||||
{"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "def345"},
|
||||
"contractIdAtOffset": null}
|
||||
]
|
||||
|
||||
If every ``contractIdAtOffset`` is specified, as is so in the example
|
||||
above, you will not receive any ``archived`` events for contracts
|
||||
created before the offset *unless* those contracts are identified in a
|
||||
``contractIdAtOffset``. By contrast, if any ``contractIdAtOffset`` is
|
||||
missing, ``archived`` event filtering will be disabled, and you will
|
||||
receive "phantom archives" as with query streams.
|
||||
|
@ -17,8 +17,6 @@ import json.JsonProtocol.LfValueCodec.{apiValueToJsValue => lfValueToJsValue}
|
||||
import query.ValuePredicate.{LfV, TypeLookup}
|
||||
import com.digitalasset.jwt.domain.Jwt
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import scalaz.{Liskov, NonEmptyList}
|
||||
import Liskov.<~<
|
||||
import com.digitalasset.http.query.ValuePredicate
|
||||
import scalaz.syntax.bifunctor._
|
||||
import scalaz.syntax.std.boolean._
|
||||
@ -28,8 +26,9 @@ import scalaz.std.map._
|
||||
import scalaz.std.option._
|
||||
import scalaz.std.set._
|
||||
import scalaz.std.tuple._
|
||||
import scalaz.{-\/, \/, \/-}
|
||||
import spray.json.{JsArray, JsObject, JsValue}
|
||||
import scalaz.{-\/, Foldable, Liskov, NonEmptyList, Tag, \/, \/-}
|
||||
import Liskov.<~<
|
||||
import spray.json.{JsArray, JsObject, JsValue, JsonReader}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
@ -134,14 +133,17 @@ object WebSocketService {
|
||||
)(_ append _)
|
||||
}
|
||||
|
||||
trait StreamQuery[A] {
|
||||
sealed abstract class StreamQueryReader[A] {
|
||||
case class Query[Q](q: Q, alg: StreamQuery[Q])
|
||||
def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue): Error \/ Query[_]
|
||||
}
|
||||
|
||||
sealed trait StreamQuery[A] {
|
||||
|
||||
/** Extra data on success of a predicate. */
|
||||
type Positive
|
||||
|
||||
def parse(decoder: DomainJsonDecoder, jv: JsValue): Error \/ A
|
||||
|
||||
def allowPhantonArchives: Boolean
|
||||
def removePhantomArchives(request: A): Option[Set[domain.ContractId]]
|
||||
|
||||
def predicate(
|
||||
request: A,
|
||||
@ -151,19 +153,21 @@ object WebSocketService {
|
||||
def renderCreatedMetadata(p: Positive): Map[String, JsValue]
|
||||
}
|
||||
|
||||
implicit val SearchForeverRequestWithStreamQuery: StreamQuery[domain.SearchForeverRequest] =
|
||||
new StreamQuery[domain.SearchForeverRequest] {
|
||||
implicit val SearchForeverRequestWithStreamQuery: StreamQueryReader[domain.SearchForeverRequest] =
|
||||
new StreamQueryReader[domain.SearchForeverRequest]
|
||||
with StreamQuery[domain.SearchForeverRequest] {
|
||||
|
||||
type Positive = NonEmptyList[Int]
|
||||
|
||||
override def parse(decoder: DomainJsonDecoder, jv: JsValue): Error \/ SearchForeverRequest = {
|
||||
override def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue) = {
|
||||
import JsonProtocol._
|
||||
SprayJson
|
||||
.decode[SearchForeverRequest](jv)
|
||||
.liftErr(InvalidUserInput)
|
||||
.map(Query(_, this))
|
||||
}
|
||||
|
||||
override def allowPhantonArchives: Boolean = true
|
||||
override def removePhantomArchives(request: SearchForeverRequest) = None
|
||||
|
||||
override def predicate(
|
||||
request: SearchForeverRequest,
|
||||
@ -206,56 +210,81 @@ object WebSocketService {
|
||||
}
|
||||
|
||||
implicit val EnrichedContractKeyWithStreamQuery
|
||||
: StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] =
|
||||
new StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] {
|
||||
|
||||
type Positive = Unit
|
||||
: StreamQueryReader[domain.ContractKeyStreamRequest[_, _]] =
|
||||
new StreamQueryReader[domain.ContractKeyStreamRequest[_, _]] {
|
||||
|
||||
import JsonProtocol._
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
override def parse(
|
||||
decoder: DomainJsonDecoder,
|
||||
jv: JsValue): Error \/ NonEmptyList[domain.EnrichedContractKey[LfV]] =
|
||||
for {
|
||||
as <- SprayJson
|
||||
.decode[NonEmptyList[domain.EnrichedContractKey[JsValue]]](jv)
|
||||
.liftErr(InvalidUserInput)
|
||||
bs = as.map(a => decodeWithFallback(decoder, a))
|
||||
} yield bs
|
||||
override def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue) = {
|
||||
type NelCKRH[Hint, V] = NonEmptyList[domain.ContractKeyStreamRequest[Hint, V]]
|
||||
def go[Hint](alg: StreamQuery[NelCKRH[Hint, LfV]])(
|
||||
implicit ev: JsonReader[NelCKRH[Hint, JsValue]]) =
|
||||
for {
|
||||
as <- SprayJson
|
||||
.decode[NelCKRH[Hint, JsValue]](jv)
|
||||
.liftErr(InvalidUserInput)
|
||||
bs = as.map(a => decodeWithFallback(decoder, a))
|
||||
} yield Query(bs, alg)
|
||||
if (resumingAtOffset) go(ResumingEnrichedContractKeyWithStreamQuery)
|
||||
else go(InitialEnrichedContractKeyWithStreamQuery)
|
||||
}
|
||||
|
||||
private def decodeWithFallback(
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
private def decodeWithFallback[Hint](
|
||||
decoder: DomainJsonDecoder,
|
||||
a: domain.EnrichedContractKey[JsValue]): domain.EnrichedContractKey[LfV] =
|
||||
a: domain.ContractKeyStreamRequest[Hint, JsValue]) =
|
||||
decoder
|
||||
.decodeUnderlyingValuesToLf(a)
|
||||
.valueOr(_ => a.map(_ => com.digitalasset.daml.lf.value.Value.ValueUnit)) // unit will not match any key
|
||||
|
||||
override def allowPhantonArchives: Boolean = false
|
||||
|
||||
override def predicate(
|
||||
request: NonEmptyList[domain.EnrichedContractKey[LfV]],
|
||||
resolveTemplateId: PackageService.ResolveTemplateId,
|
||||
lookupType: TypeLookup): StreamPredicate[Positive] = {
|
||||
|
||||
import util.Collections._
|
||||
|
||||
val (resolvedWithKey, unresolved) =
|
||||
request.toSet.partitionMap { x: domain.EnrichedContractKey[LfV] =>
|
||||
resolveTemplateId(x.templateId).map((_, x.key)).toLeftDisjunction(x.templateId)
|
||||
}
|
||||
|
||||
val q: Map[domain.TemplateId.RequiredPkg, LfV] = resolvedWithKey.toMap
|
||||
val fn: domain.ActiveContract[LfV] => Option[Positive] = { a =>
|
||||
if (q.get(a.templateId).exists(k => domain.ActiveContract.matchesKey(k)(a)))
|
||||
Some(())
|
||||
else None
|
||||
}
|
||||
(q.keySet, unresolved, fn)
|
||||
}
|
||||
|
||||
override def renderCreatedMetadata(p: Unit) = Map.empty
|
||||
}
|
||||
|
||||
private[this] sealed abstract class EnrichedContractKeyWithStreamQuery[Cid]
|
||||
extends StreamQuery[NonEmptyList[domain.ContractKeyStreamRequest[Cid, LfV]]] {
|
||||
type Positive = Unit
|
||||
|
||||
protected type CKR[+V] = domain.ContractKeyStreamRequest[Cid, V]
|
||||
|
||||
override def predicate(
|
||||
request: NonEmptyList[CKR[LfV]],
|
||||
resolveTemplateId: PackageService.ResolveTemplateId,
|
||||
lookupType: TypeLookup): StreamPredicate[Positive] = {
|
||||
|
||||
import util.Collections._
|
||||
|
||||
val (resolvedWithKey, unresolved) =
|
||||
request.toSet.partitionMap { x: CKR[LfV] =>
|
||||
resolveTemplateId(x.ekey.templateId)
|
||||
.map((_, x.ekey.key))
|
||||
.toLeftDisjunction(x.ekey.templateId)
|
||||
}
|
||||
|
||||
val q: Map[domain.TemplateId.RequiredPkg, LfV] = resolvedWithKey.toMap
|
||||
val fn: domain.ActiveContract[LfV] => Option[Positive] = { a =>
|
||||
if (q.get(a.templateId).exists(k => domain.ActiveContract.matchesKey(k)(a)))
|
||||
Some(())
|
||||
else None
|
||||
}
|
||||
(q.keySet, unresolved, fn)
|
||||
}
|
||||
|
||||
override def renderCreatedMetadata(p: Unit) = Map.empty
|
||||
}
|
||||
|
||||
private[this] object InitialEnrichedContractKeyWithStreamQuery
|
||||
extends EnrichedContractKeyWithStreamQuery[Unit] {
|
||||
override def removePhantomArchives(request: NonEmptyList[CKR[LfV]]) = Some(Set.empty)
|
||||
}
|
||||
|
||||
private[this] object ResumingEnrichedContractKeyWithStreamQuery
|
||||
extends EnrichedContractKeyWithStreamQuery[Option[Option[domain.ContractId]]] {
|
||||
override def removePhantomArchives(request: NonEmptyList[CKR[LfV]]) = {
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
val NelO = Foldable[NonEmptyList].compose[Option]
|
||||
request traverse (_.contractIdAtOffset) map NelO.toSet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class WebSocketService(
|
||||
@ -276,7 +305,7 @@ class WebSocketService(
|
||||
|
||||
private val numConns = new java.util.concurrent.atomic.AtomicInteger(0)
|
||||
|
||||
private[http] def transactionMessageHandler[A: StreamQuery](
|
||||
private[http] def transactionMessageHandler[A: StreamQueryReader](
|
||||
jwt: Jwt,
|
||||
jwtPayload: JwtPayload,
|
||||
): Flow[Message, Message, _] =
|
||||
@ -309,11 +338,11 @@ class WebSocketService(
|
||||
}
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Any"))
|
||||
private def wsMessageHandler[A: StreamQuery](
|
||||
private def wsMessageHandler[A: StreamQueryReader](
|
||||
jwt: Jwt,
|
||||
jwtPayload: JwtPayload,
|
||||
): Flow[Message, Message, NotUsed] = {
|
||||
val Q = implicitly[StreamQuery[A]]
|
||||
val Q = implicitly[StreamQueryReader[A]]
|
||||
Flow[Message]
|
||||
.mapAsync(1) {
|
||||
case msg: TextMessage =>
|
||||
@ -332,12 +361,14 @@ class WebSocketService(
|
||||
for {
|
||||
offPrefix <- oeso.sequence
|
||||
jv <- ejv
|
||||
a <- Q.parse(decoder, jv)
|
||||
a <- Q.parse(resumingAtOffset = offPrefix.isDefined, decoder, jv)
|
||||
} yield (offPrefix, a)
|
||||
}
|
||||
.map {
|
||||
_.flatMap {
|
||||
case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a)
|
||||
case (offPrefix, qq: Q.Query[q]) =>
|
||||
implicit val SQ: StreamQuery[q] = qq.alg
|
||||
getTransactionSourceForParty[q](jwt, jwtPayload, offPrefix, qq.q: q)
|
||||
}
|
||||
}
|
||||
.takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error
|
||||
@ -361,7 +392,7 @@ class WebSocketService(
|
||||
.insertDeleteStepSource(jwt, jwtPayload.party, resolved.toList, offPrefix, Terminates.Never)
|
||||
.via(convertFilterContracts(fn))
|
||||
.filter(_.nonEmpty)
|
||||
.via(removePhantomArchives(remove = !Q.allowPhantonArchives))
|
||||
.via(removePhantomArchives(remove = Q.removePhantomArchives(request)))
|
||||
.map(_.mapPos(Q.renderCreatedMetadata).render)
|
||||
.via(renderEventsAndEmitHeartbeats) // wrong place, see https://github.com/digital-asset/daml/issues/4955
|
||||
.prepend(reportUnresolvedTemplateIds(unresolved))
|
||||
@ -390,15 +421,14 @@ class WebSocketService(
|
||||
case Some((_, a)) => renderEvents(a._1, a._2)
|
||||
}
|
||||
|
||||
private def removePhantomArchives[A, B](remove: Boolean) =
|
||||
if (remove) removePhantomArchives_[A, B]
|
||||
else Flow[StepAndErrors[A, B]]
|
||||
private def removePhantomArchives[A, B](remove: Option[Set[domain.ContractId]]) =
|
||||
remove cata (removePhantomArchives_[A, B], Flow[StepAndErrors[A, B]])
|
||||
|
||||
private def removePhantomArchives_[A, B]
|
||||
private def removePhantomArchives_[A, B](initialState: Set[domain.ContractId])
|
||||
: Flow[StepAndErrors[A, B], StepAndErrors[A, B], NotUsed] = {
|
||||
import ContractStreamStep.{LiveBegin, Txn, Acs}
|
||||
Flow[StepAndErrors[A, B]]
|
||||
.scan((Set.empty[String], Option.empty[StepAndErrors[A, B]])) {
|
||||
.scan((Tag unsubst initialState, Option.empty[StepAndErrors[A, B]])) {
|
||||
case ((s0, _), a0 @ StepAndErrors(_, Txn(idstep, _))) =>
|
||||
val newInserts: Vector[String] =
|
||||
domain.ContractId.unsubst(idstep.inserts.map(_._1.contractId))
|
||||
|
@ -13,7 +13,7 @@ import com.digitalasset.ledger.api.refinements.{ApiTypes => lar}
|
||||
import com.typesafe.scalalogging.StrictLogging
|
||||
import scalaz.syntax.std.boolean._
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.{NonEmptyList, \/}
|
||||
import scalaz.\/
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import EndpointsCompanion._
|
||||
@ -84,7 +84,7 @@ class WebsocketEndpoints(
|
||||
payload <- preconnect(decodeJwt, upgradeReq, wsProtocol)
|
||||
(jwt, jwtPayload) = payload
|
||||
} yield
|
||||
handleWebsocketRequest[NonEmptyList[domain.EnrichedContractKey[domain.LfValue]]](
|
||||
handleWebsocketRequest[domain.ContractKeyStreamRequest[_, _]](
|
||||
jwt,
|
||||
jwtPayload,
|
||||
upgradeReq,
|
||||
@ -93,7 +93,7 @@ class WebsocketEndpoints(
|
||||
)
|
||||
}
|
||||
|
||||
def handleWebsocketRequest[A: WebSocketService.StreamQuery](
|
||||
def handleWebsocketRequest[A: WebSocketService.StreamQueryReader](
|
||||
jwt: Jwt,
|
||||
jwtPayload: domain.JwtPayload,
|
||||
req: UpgradeToWebSocket,
|
||||
|
@ -86,6 +86,11 @@ object domain {
|
||||
contractId: domain.ContractId,
|
||||
) extends ContractLocator[Nothing]
|
||||
|
||||
final case class ContractKeyStreamRequest[+Cid, +LfV](
|
||||
contractIdAtOffset: Cid,
|
||||
ekey: EnrichedContractKey[LfV],
|
||||
)
|
||||
|
||||
case class GetActiveContractsRequest(
|
||||
templateIds: Set[TemplateId.OptionalPkg],
|
||||
query: Map[String, JsValue],
|
||||
@ -416,6 +421,19 @@ object domain {
|
||||
}
|
||||
}
|
||||
|
||||
object ContractKeyStreamRequest {
|
||||
implicit def covariantR[Off]: Traverse[ContractKeyStreamRequest[Off, ?]] = {
|
||||
type F[A] = ContractKeyStreamRequest[Off, A]
|
||||
new Traverse[F] {
|
||||
override def traverseImpl[G[_]: Applicative, A, B](fa: F[A])(f: A => G[B]): G[F[B]] =
|
||||
fa.ekey traverse f map (ekey => fa copy (ekey = ekey))
|
||||
}
|
||||
}
|
||||
|
||||
implicit def hasTemplateId[Off]: HasTemplateId[ContractKeyStreamRequest[Off, ?]] =
|
||||
HasTemplateId.by[ContractKeyStreamRequest[Off, ?]](_.ekey)
|
||||
}
|
||||
|
||||
private[this] implicit final class ErrorOps[A](private val o: Option[A]) extends AnyVal {
|
||||
def required(label: String): Error \/ A =
|
||||
o toRightDisjunction Error('ErrorOps_required, s"Missing required field $label")
|
||||
@ -435,6 +453,25 @@ object domain {
|
||||
): Error \/ LfType
|
||||
}
|
||||
|
||||
object HasTemplateId {
|
||||
def by[F[_]]: By[F] = new By[F](0)
|
||||
|
||||
final class By[F[_]](private val ign: Int) extends AnyVal {
|
||||
def apply[G[_]](nt: F[_] => G[_])(implicit basis: HasTemplateId[G]): HasTemplateId[F] =
|
||||
new HasTemplateId[F] {
|
||||
override def templateId(fa: F[_]) = basis templateId nt(fa)
|
||||
|
||||
override def lfType(
|
||||
fa: F[_],
|
||||
templateId: TemplateId.RequiredPkg,
|
||||
f: PackageService.ResolveTemplateRecordType,
|
||||
g: PackageService.ResolveChoiceArgType,
|
||||
h: PackageService.ResolveKeyType,
|
||||
) = basis lfType (nt(fa), templateId, f, g, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object CreateCommand {
|
||||
implicit val traverseInstance: Traverse[CreateCommand] = new Traverse[CreateCommand] {
|
||||
override def traverseImpl[G[_]: Applicative, A, B](
|
||||
|
@ -43,6 +43,21 @@ object JsonProtocol extends DefaultJsonProtocol {
|
||||
implicit def NonEmptyListWriter[A: JsonWriter]: JsonWriter[NonEmptyList[A]] =
|
||||
nela => JsArray(nela.map(_.toJson).list.toVector)
|
||||
|
||||
/** This intuitively pointless extra type is here to give it specificity
|
||||
* so this instance will beat [[CollectionFormats#listFormat]].
|
||||
* You would normally achieve the conflict resolution by putting this
|
||||
* instance in a parent of [[CollectionFormats]], but that kind of
|
||||
* extension isn't possible here.
|
||||
*/
|
||||
final class JsonReaderList[A: JsonReader] extends JsonReader[List[A]] {
|
||||
override def read(json: JsValue) = json match {
|
||||
case JsArray(elements) => elements.iterator.map(_.convertTo[A]).toList
|
||||
case _ => deserializationError(s"must be a list, but got $json")
|
||||
}
|
||||
}
|
||||
|
||||
implicit def `List reader only`[A: JsonReader]: JsonReaderList[A] = new JsonReaderList
|
||||
|
||||
implicit val PartyDetails: JsonFormat[domain.PartyDetails] =
|
||||
jsonFormat3(domain.PartyDetails.apply)
|
||||
|
||||
@ -155,6 +170,31 @@ object JsonProtocol extends DefaultJsonProtocol {
|
||||
implicit val EnrichedContractIdFormat: RootJsonFormat[domain.EnrichedContractId] =
|
||||
jsonFormat2(domain.EnrichedContractId)
|
||||
|
||||
private[this] val contractIdAtOffsetKey = "contractIdAtOffset"
|
||||
|
||||
implicit val InitialContractKeyStreamRequest
|
||||
: RootJsonReader[domain.ContractKeyStreamRequest[Unit, JsValue]] = { jsv =>
|
||||
val ekey = jsv.convertTo[domain.EnrichedContractKey[JsValue]]
|
||||
jsv match {
|
||||
case JsObject(fields) if fields contains contractIdAtOffsetKey =>
|
||||
deserializationError(
|
||||
s"$contractIdAtOffsetKey is not allowed for WebSocket streams starting at the beginning")
|
||||
case _ =>
|
||||
}
|
||||
domain.ContractKeyStreamRequest((), ekey)
|
||||
}
|
||||
|
||||
implicit val ResumingContractKeyStreamRequest: RootJsonReader[
|
||||
domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue]] = { jsv =>
|
||||
val off = jsv match {
|
||||
case JsObject(fields) => fields get contractIdAtOffsetKey map (_.convertTo[Option[String]])
|
||||
case _ => None
|
||||
}
|
||||
val ekey = jsv.convertTo[domain.EnrichedContractKey[JsValue]]
|
||||
type OO[+A] = Option[Option[A]]
|
||||
domain.ContractKeyStreamRequest(domain.ContractId.subst[OO, String](off), ekey)
|
||||
}
|
||||
|
||||
implicit val ContractLocatorFormat: RootJsonFormat[domain.ContractLocator[JsValue]] =
|
||||
new RootJsonFormat[domain.ContractLocator[JsValue]] {
|
||||
override def write(obj: domain.ContractLocator[JsValue]): JsValue = obj match {
|
||||
|
@ -94,14 +94,15 @@ class WebsocketServiceIntegrationTest
|
||||
.collect { case m: TextMessage => m.getStrictText }
|
||||
.toMat(Sink.seq)(Keep.right)
|
||||
|
||||
private def singleClientQueryStream(
|
||||
private def singleClientWSStream(
|
||||
path: String,
|
||||
serviceUri: Uri,
|
||||
query: String,
|
||||
offset: Option[domain.Offset] = None): Source[Message, NotUsed] = {
|
||||
offset: Option[domain.Offset]): Source[Message, NotUsed] = {
|
||||
import spray.json._, json.JsonProtocol._
|
||||
val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path("/v1/stream/query"))
|
||||
val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path(s"/v1/stream/$path"))
|
||||
logger.info(
|
||||
s"---- singleClientQueryStream uri: ${uri.toString}, query: $query, offset: ${offset.toString}")
|
||||
s"---- singleClientWSStream uri: ${uri.toString}, query: $query, offset: ${offset.toString}")
|
||||
val webSocketFlow =
|
||||
Http().webSocketClientFlow(WebSocketRequest(uri = uri, subprotocol = validSubprotocol))
|
||||
offset
|
||||
@ -114,17 +115,17 @@ class WebsocketServiceIntegrationTest
|
||||
.via(webSocketFlow)
|
||||
}
|
||||
|
||||
private def singleClientQueryStream(
|
||||
serviceUri: Uri,
|
||||
query: String,
|
||||
offset: Option[domain.Offset] = None): Source[Message, NotUsed] =
|
||||
singleClientWSStream("query", serviceUri, query, offset)
|
||||
|
||||
private def singleClientFetchStream(
|
||||
serviceUri: Uri,
|
||||
request: String): Source[Message, NotUsed] = {
|
||||
val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path("/v1/stream/fetch"))
|
||||
logger.info(s"---- singleClientFetchStream uri: ${uri.toString}, request: $request")
|
||||
val webSocketFlow =
|
||||
Http().webSocketClientFlow(WebSocketRequest(uri = uri, subprotocol = validSubprotocol))
|
||||
Source
|
||||
.single(TextMessage(request))
|
||||
.via(webSocketFlow)
|
||||
}
|
||||
request: String,
|
||||
offset: Option[domain.Offset] = None): Source[Message, NotUsed] =
|
||||
singleClientWSStream("fetch", serviceUri, request, offset)
|
||||
|
||||
private def initialIouCreate(serviceUri: Uri) = {
|
||||
val payload = TestUtil.readFile("it/iouCreateCommand.json")
|
||||
@ -353,7 +354,14 @@ class WebsocketServiceIntegrationTest
|
||||
"fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService {
|
||||
(uri, encoder, _) =>
|
||||
val templateId = domain.TemplateId(None, "Account", "Account")
|
||||
val fetchRequest = """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]"""
|
||||
def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = {
|
||||
import spray.json._, json.JsonProtocol._
|
||||
List(
|
||||
Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson)
|
||||
++ contractIdAtOffset
|
||||
.map(ocid => contractIdAtOffsetKey -> ocid.toJson)
|
||||
.toList).toJson.compactPrint
|
||||
}
|
||||
val f1 =
|
||||
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri)
|
||||
val f2 =
|
||||
@ -415,16 +423,29 @@ class WebsocketServiceIntegrationTest
|
||||
_ = r2._1 shouldBe 'success
|
||||
cid2 = getContractId(getResult(r2._2))
|
||||
|
||||
lastState <- singleClientFetchStream(uri, fetchRequest).via(parseResp) runWith resp(
|
||||
cid1,
|
||||
cid2)
|
||||
lastState <- singleClientFetchStream(uri, fetchRequest())
|
||||
.via(parseResp) runWith resp(cid1, cid2)
|
||||
|
||||
} yield
|
||||
inside(lastState) {
|
||||
liveOffset = inside(lastState) {
|
||||
case ShouldHaveEnded(liveStart, 0, lastSeen) =>
|
||||
import domain.Offset.ordering
|
||||
lastSeen should be > liveStart
|
||||
liveStart
|
||||
}
|
||||
|
||||
// check contractIdAtOffsets' effects on phantom filtering
|
||||
resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) {
|
||||
case (abcHint, expectArchives) =>
|
||||
(singleClientFetchStream(uri, fetchRequest(abcHint), Some(liveOffset))
|
||||
via parseResp runWith remainingDeltas)
|
||||
.map {
|
||||
case (creates, archives, _) =>
|
||||
creates shouldBe empty
|
||||
archives should have size expectArchives
|
||||
}
|
||||
}
|
||||
|
||||
} yield resumes.foldLeft(1 shouldBe 1)((_, a) => a)
|
||||
}
|
||||
|
||||
"fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService {
|
||||
@ -444,6 +465,45 @@ class WebsocketServiceIntegrationTest
|
||||
}: Future[Assertion]
|
||||
}
|
||||
|
||||
"ContractKeyStreamRequest" - {
|
||||
import spray.json._, json.JsonProtocol._
|
||||
val baseVal =
|
||||
domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue)
|
||||
val baseMap = baseVal.toJson.asJsObject.fields
|
||||
val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi")))
|
||||
val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull))
|
||||
|
||||
"initial JSON reader" - {
|
||||
type T = domain.ContractKeyStreamRequest[Unit, JsValue]
|
||||
|
||||
"shares EnrichedContractKey format" in {
|
||||
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal))
|
||||
}
|
||||
|
||||
"errors on contractIdAtOffset presence" in {
|
||||
a[DeserializationException] shouldBe thrownBy {
|
||||
withSome.convertTo[T]
|
||||
}
|
||||
a[DeserializationException] shouldBe thrownBy {
|
||||
withNone.convertTo[T]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"resuming JSON reader" - {
|
||||
type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue]
|
||||
|
||||
"shares EnrichedContractKey format" in {
|
||||
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal))
|
||||
}
|
||||
|
||||
"distinguishes null and string" in {
|
||||
withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal))
|
||||
withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def wsConnectRequest[M](
|
||||
uri: Uri,
|
||||
subprotocol: Option[String],
|
||||
@ -524,6 +584,8 @@ object WebsocketServiceIntegrationTest {
|
||||
.collect { case \/-(t) => t }
|
||||
.toMat(Sink.headOption)(Keep.right)
|
||||
|
||||
private val contractIdAtOffsetKey = "contractIdAtOffset"
|
||||
|
||||
private case class SimpleScenario(
|
||||
id: String,
|
||||
path: Uri.Path,
|
||||
|
Loading…
Reference in New Issue
Block a user