From 332d35a34768754ac9eb5b803e9002e5d0a2164c Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Thu, 26 Mar 2020 15:31:27 -0400 Subject: [PATCH] stop filtering "phantom archives" when it won't work; reenable it on request (#5003) * ContractKeyStreamRequest with different types for whether offset-preceded or not - should replace EnrichedContractKey as the WS StreamQuery type * add the ContractKeyStreamRequest layer everywhere * split StreamQuery parse from the other steps * make StreamQuery type depend on the input JsValue * only StreamQueryReader is a typeclass now * scalafmt * wrong type arg * letting the request data and phantom archive removal choice flow - from work with @leo-da * finish threading request-derived phantom state to removal flow - from work with @leo-da * make it clear that hints are contract IDs in StreamQuery - from work with @leo-da * treat StreamQueryReader's type parameter fully as a phantom - from work with @leo-da * remove unused type alias - from work with @leo-da * test fetch resume, with and without various offsetHints * document offsetHints * missing ` in rst - thanks @hurryabit * rename offsetHint to contractIdAtOffset CHANGELOG_BEGIN - [JSON API - Experimental] New field ``contractIdAtOffset`` for fetch-by-key streams to restore proper archive filtering. See `issue #4511 `_. CHANGELOG_END * we never unify the two ContractKeyStreamRequest types * doc update for contractIdAtOffset --- docs/source/json-api/index.rst | 30 +++- .../digitalasset/http/WebSocketService.scala | 154 +++++++++++------- .../http/WebsocketEndpoints.scala | 6 +- .../scala/com/digitalasset/http/domain.scala | 37 +++++ .../digitalasset/http/json/JsonProtocol.scala | 40 +++++ .../WebsocketServiceIntegrationTest.scala | 100 +++++++++--- 6 files changed, 282 insertions(+), 85 deletions(-) diff --git a/docs/source/json-api/index.rst b/docs/source/json-api/index.rst index 616ee9841c..d92374ee17 100644 --- a/docs/source/json-api/index.rst +++ b/docs/source/json-api/index.rst @@ -1295,4 +1295,32 @@ Example: {"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "def345"}} ] -The output stream has the same format as the output from the `Contracts Query Stream`_. We further guarantee that for every ``archived`` event appearing on the stream there has been a matching ``created`` event earlier in the stream. +The output stream has the same format as the output from the `Contracts +Query Stream`_. We further guarantee that for every ``archived`` event +appearing on the stream there has been a matching ``created`` event +earlier in the stream, except in the case of missing +``contractIdAtOffset`` fields in the case described below. + +You may supply an optional ``offset`` for the stream, exactly as with +query streams. However, you should supply with each ``{templateId, +key}`` pair a ``contractIdAtOffset``, which is the contract ID currently +associated with that pair at the point of the given offset, or ``null`` +if no contract ID was associated with the pair at that offset. For +example, with the above keys, if you had one ``"abc123"`` contract but +no ``"def345"`` contract, you might specify: + +.. code-block:: json + + [ + {"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "abc123"}, + "contractIdAtOffset": "#1:0"}, + {"templateId": "Account:Account", "key": {"_1": "Alice", "_2": "def345"}, + "contractIdAtOffset": null} + ] + +If every ``contractIdAtOffset`` is specified, as is so in the example +above, you will not receive any ``archived`` events for contracts +created before the offset *unless* those contracts are identified in a +``contractIdAtOffset``. By contrast, if any ``contractIdAtOffset`` is +missing, ``archived`` event filtering will be disabled, and you will +receive "phantom archives" as with query streams. diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala index cc73ae7fd1..a3568ddb19 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala @@ -17,8 +17,6 @@ import json.JsonProtocol.LfValueCodec.{apiValueToJsValue => lfValueToJsValue} import query.ValuePredicate.{LfV, TypeLookup} import com.digitalasset.jwt.domain.Jwt import com.typesafe.scalalogging.LazyLogging -import scalaz.{Liskov, NonEmptyList} -import Liskov.<~< import com.digitalasset.http.query.ValuePredicate import scalaz.syntax.bifunctor._ import scalaz.syntax.std.boolean._ @@ -28,8 +26,9 @@ import scalaz.std.map._ import scalaz.std.option._ import scalaz.std.set._ import scalaz.std.tuple._ -import scalaz.{-\/, \/, \/-} -import spray.json.{JsArray, JsObject, JsValue} +import scalaz.{-\/, Foldable, Liskov, NonEmptyList, Tag, \/, \/-} +import Liskov.<~< +import spray.json.{JsArray, JsObject, JsValue, JsonReader} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -134,14 +133,17 @@ object WebSocketService { )(_ append _) } - trait StreamQuery[A] { + sealed abstract class StreamQueryReader[A] { + case class Query[Q](q: Q, alg: StreamQuery[Q]) + def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue): Error \/ Query[_] + } + + sealed trait StreamQuery[A] { /** Extra data on success of a predicate. */ type Positive - def parse(decoder: DomainJsonDecoder, jv: JsValue): Error \/ A - - def allowPhantonArchives: Boolean + def removePhantomArchives(request: A): Option[Set[domain.ContractId]] def predicate( request: A, @@ -151,19 +153,21 @@ object WebSocketService { def renderCreatedMetadata(p: Positive): Map[String, JsValue] } - implicit val SearchForeverRequestWithStreamQuery: StreamQuery[domain.SearchForeverRequest] = - new StreamQuery[domain.SearchForeverRequest] { + implicit val SearchForeverRequestWithStreamQuery: StreamQueryReader[domain.SearchForeverRequest] = + new StreamQueryReader[domain.SearchForeverRequest] + with StreamQuery[domain.SearchForeverRequest] { type Positive = NonEmptyList[Int] - override def parse(decoder: DomainJsonDecoder, jv: JsValue): Error \/ SearchForeverRequest = { + override def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue) = { import JsonProtocol._ SprayJson .decode[SearchForeverRequest](jv) .liftErr(InvalidUserInput) + .map(Query(_, this)) } - override def allowPhantonArchives: Boolean = true + override def removePhantomArchives(request: SearchForeverRequest) = None override def predicate( request: SearchForeverRequest, @@ -206,56 +210,81 @@ object WebSocketService { } implicit val EnrichedContractKeyWithStreamQuery - : StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] = - new StreamQuery[NonEmptyList[domain.EnrichedContractKey[LfV]]] { - - type Positive = Unit + : StreamQueryReader[domain.ContractKeyStreamRequest[_, _]] = + new StreamQueryReader[domain.ContractKeyStreamRequest[_, _]] { import JsonProtocol._ @SuppressWarnings(Array("org.wartremover.warts.Any")) - override def parse( - decoder: DomainJsonDecoder, - jv: JsValue): Error \/ NonEmptyList[domain.EnrichedContractKey[LfV]] = - for { - as <- SprayJson - .decode[NonEmptyList[domain.EnrichedContractKey[JsValue]]](jv) - .liftErr(InvalidUserInput) - bs = as.map(a => decodeWithFallback(decoder, a)) - } yield bs + override def parse(resumingAtOffset: Boolean, decoder: DomainJsonDecoder, jv: JsValue) = { + type NelCKRH[Hint, V] = NonEmptyList[domain.ContractKeyStreamRequest[Hint, V]] + def go[Hint](alg: StreamQuery[NelCKRH[Hint, LfV]])( + implicit ev: JsonReader[NelCKRH[Hint, JsValue]]) = + for { + as <- SprayJson + .decode[NelCKRH[Hint, JsValue]](jv) + .liftErr(InvalidUserInput) + bs = as.map(a => decodeWithFallback(decoder, a)) + } yield Query(bs, alg) + if (resumingAtOffset) go(ResumingEnrichedContractKeyWithStreamQuery) + else go(InitialEnrichedContractKeyWithStreamQuery) + } - private def decodeWithFallback( + @SuppressWarnings(Array("org.wartremover.warts.Any")) + private def decodeWithFallback[Hint]( decoder: DomainJsonDecoder, - a: domain.EnrichedContractKey[JsValue]): domain.EnrichedContractKey[LfV] = + a: domain.ContractKeyStreamRequest[Hint, JsValue]) = decoder .decodeUnderlyingValuesToLf(a) .valueOr(_ => a.map(_ => com.digitalasset.daml.lf.value.Value.ValueUnit)) // unit will not match any key - override def allowPhantonArchives: Boolean = false - - override def predicate( - request: NonEmptyList[domain.EnrichedContractKey[LfV]], - resolveTemplateId: PackageService.ResolveTemplateId, - lookupType: TypeLookup): StreamPredicate[Positive] = { - - import util.Collections._ - - val (resolvedWithKey, unresolved) = - request.toSet.partitionMap { x: domain.EnrichedContractKey[LfV] => - resolveTemplateId(x.templateId).map((_, x.key)).toLeftDisjunction(x.templateId) - } - - val q: Map[domain.TemplateId.RequiredPkg, LfV] = resolvedWithKey.toMap - val fn: domain.ActiveContract[LfV] => Option[Positive] = { a => - if (q.get(a.templateId).exists(k => domain.ActiveContract.matchesKey(k)(a))) - Some(()) - else None - } - (q.keySet, unresolved, fn) - } - - override def renderCreatedMetadata(p: Unit) = Map.empty } + + private[this] sealed abstract class EnrichedContractKeyWithStreamQuery[Cid] + extends StreamQuery[NonEmptyList[domain.ContractKeyStreamRequest[Cid, LfV]]] { + type Positive = Unit + + protected type CKR[+V] = domain.ContractKeyStreamRequest[Cid, V] + + override def predicate( + request: NonEmptyList[CKR[LfV]], + resolveTemplateId: PackageService.ResolveTemplateId, + lookupType: TypeLookup): StreamPredicate[Positive] = { + + import util.Collections._ + + val (resolvedWithKey, unresolved) = + request.toSet.partitionMap { x: CKR[LfV] => + resolveTemplateId(x.ekey.templateId) + .map((_, x.ekey.key)) + .toLeftDisjunction(x.ekey.templateId) + } + + val q: Map[domain.TemplateId.RequiredPkg, LfV] = resolvedWithKey.toMap + val fn: domain.ActiveContract[LfV] => Option[Positive] = { a => + if (q.get(a.templateId).exists(k => domain.ActiveContract.matchesKey(k)(a))) + Some(()) + else None + } + (q.keySet, unresolved, fn) + } + + override def renderCreatedMetadata(p: Unit) = Map.empty + } + + private[this] object InitialEnrichedContractKeyWithStreamQuery + extends EnrichedContractKeyWithStreamQuery[Unit] { + override def removePhantomArchives(request: NonEmptyList[CKR[LfV]]) = Some(Set.empty) + } + + private[this] object ResumingEnrichedContractKeyWithStreamQuery + extends EnrichedContractKeyWithStreamQuery[Option[Option[domain.ContractId]]] { + override def removePhantomArchives(request: NonEmptyList[CKR[LfV]]) = { + @SuppressWarnings(Array("org.wartremover.warts.Any")) + val NelO = Foldable[NonEmptyList].compose[Option] + request traverse (_.contractIdAtOffset) map NelO.toSet + } + } } class WebSocketService( @@ -276,7 +305,7 @@ class WebSocketService( private val numConns = new java.util.concurrent.atomic.AtomicInteger(0) - private[http] def transactionMessageHandler[A: StreamQuery]( + private[http] def transactionMessageHandler[A: StreamQueryReader]( jwt: Jwt, jwtPayload: JwtPayload, ): Flow[Message, Message, _] = @@ -309,11 +338,11 @@ class WebSocketService( } @SuppressWarnings(Array("org.wartremover.warts.Any")) - private def wsMessageHandler[A: StreamQuery]( + private def wsMessageHandler[A: StreamQueryReader]( jwt: Jwt, jwtPayload: JwtPayload, ): Flow[Message, Message, NotUsed] = { - val Q = implicitly[StreamQuery[A]] + val Q = implicitly[StreamQueryReader[A]] Flow[Message] .mapAsync(1) { case msg: TextMessage => @@ -332,12 +361,14 @@ class WebSocketService( for { offPrefix <- oeso.sequence jv <- ejv - a <- Q.parse(decoder, jv) + a <- Q.parse(resumingAtOffset = offPrefix.isDefined, decoder, jv) } yield (offPrefix, a) } .map { _.flatMap { - case (offPrefix, a) => getTransactionSourceForParty[A](jwt, jwtPayload, offPrefix, a) + case (offPrefix, qq: Q.Query[q]) => + implicit val SQ: StreamQuery[q] = qq.alg + getTransactionSourceForParty[q](jwt, jwtPayload, offPrefix, qq.q: q) } } .takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error @@ -361,7 +392,7 @@ class WebSocketService( .insertDeleteStepSource(jwt, jwtPayload.party, resolved.toList, offPrefix, Terminates.Never) .via(convertFilterContracts(fn)) .filter(_.nonEmpty) - .via(removePhantomArchives(remove = !Q.allowPhantonArchives)) + .via(removePhantomArchives(remove = Q.removePhantomArchives(request))) .map(_.mapPos(Q.renderCreatedMetadata).render) .via(renderEventsAndEmitHeartbeats) // wrong place, see https://github.com/digital-asset/daml/issues/4955 .prepend(reportUnresolvedTemplateIds(unresolved)) @@ -390,15 +421,14 @@ class WebSocketService( case Some((_, a)) => renderEvents(a._1, a._2) } - private def removePhantomArchives[A, B](remove: Boolean) = - if (remove) removePhantomArchives_[A, B] - else Flow[StepAndErrors[A, B]] + private def removePhantomArchives[A, B](remove: Option[Set[domain.ContractId]]) = + remove cata (removePhantomArchives_[A, B], Flow[StepAndErrors[A, B]]) - private def removePhantomArchives_[A, B] + private def removePhantomArchives_[A, B](initialState: Set[domain.ContractId]) : Flow[StepAndErrors[A, B], StepAndErrors[A, B], NotUsed] = { import ContractStreamStep.{LiveBegin, Txn, Acs} Flow[StepAndErrors[A, B]] - .scan((Set.empty[String], Option.empty[StepAndErrors[A, B]])) { + .scan((Tag unsubst initialState, Option.empty[StepAndErrors[A, B]])) { case ((s0, _), a0 @ StepAndErrors(_, Txn(idstep, _))) => val newInserts: Vector[String] = domain.ContractId.unsubst(idstep.inserts.map(_._1.contractId)) diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebsocketEndpoints.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebsocketEndpoints.scala index e500dbc3d8..4817dfef48 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebsocketEndpoints.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebsocketEndpoints.scala @@ -13,7 +13,7 @@ import com.digitalasset.ledger.api.refinements.{ApiTypes => lar} import com.typesafe.scalalogging.StrictLogging import scalaz.syntax.std.boolean._ import scalaz.syntax.std.option._ -import scalaz.{NonEmptyList, \/} +import scalaz.\/ import scala.concurrent.{ExecutionContext, Future} import EndpointsCompanion._ @@ -84,7 +84,7 @@ class WebsocketEndpoints( payload <- preconnect(decodeJwt, upgradeReq, wsProtocol) (jwt, jwtPayload) = payload } yield - handleWebsocketRequest[NonEmptyList[domain.EnrichedContractKey[domain.LfValue]]]( + handleWebsocketRequest[domain.ContractKeyStreamRequest[_, _]]( jwt, jwtPayload, upgradeReq, @@ -93,7 +93,7 @@ class WebsocketEndpoints( ) } - def handleWebsocketRequest[A: WebSocketService.StreamQuery]( + def handleWebsocketRequest[A: WebSocketService.StreamQueryReader]( jwt: Jwt, jwtPayload: domain.JwtPayload, req: UpgradeToWebSocket, diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala index e81667a333..6e8db90690 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala @@ -86,6 +86,11 @@ object domain { contractId: domain.ContractId, ) extends ContractLocator[Nothing] + final case class ContractKeyStreamRequest[+Cid, +LfV]( + contractIdAtOffset: Cid, + ekey: EnrichedContractKey[LfV], + ) + case class GetActiveContractsRequest( templateIds: Set[TemplateId.OptionalPkg], query: Map[String, JsValue], @@ -416,6 +421,19 @@ object domain { } } + object ContractKeyStreamRequest { + implicit def covariantR[Off]: Traverse[ContractKeyStreamRequest[Off, ?]] = { + type F[A] = ContractKeyStreamRequest[Off, A] + new Traverse[F] { + override def traverseImpl[G[_]: Applicative, A, B](fa: F[A])(f: A => G[B]): G[F[B]] = + fa.ekey traverse f map (ekey => fa copy (ekey = ekey)) + } + } + + implicit def hasTemplateId[Off]: HasTemplateId[ContractKeyStreamRequest[Off, ?]] = + HasTemplateId.by[ContractKeyStreamRequest[Off, ?]](_.ekey) + } + private[this] implicit final class ErrorOps[A](private val o: Option[A]) extends AnyVal { def required(label: String): Error \/ A = o toRightDisjunction Error('ErrorOps_required, s"Missing required field $label") @@ -435,6 +453,25 @@ object domain { ): Error \/ LfType } + object HasTemplateId { + def by[F[_]]: By[F] = new By[F](0) + + final class By[F[_]](private val ign: Int) extends AnyVal { + def apply[G[_]](nt: F[_] => G[_])(implicit basis: HasTemplateId[G]): HasTemplateId[F] = + new HasTemplateId[F] { + override def templateId(fa: F[_]) = basis templateId nt(fa) + + override def lfType( + fa: F[_], + templateId: TemplateId.RequiredPkg, + f: PackageService.ResolveTemplateRecordType, + g: PackageService.ResolveChoiceArgType, + h: PackageService.ResolveKeyType, + ) = basis lfType (nt(fa), templateId, f, g, h) + } + } + } + object CreateCommand { implicit val traverseInstance: Traverse[CreateCommand] = new Traverse[CreateCommand] { override def traverseImpl[G[_]: Applicative, A, B]( diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala index 9897e0cada..fb182bd2eb 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala @@ -43,6 +43,21 @@ object JsonProtocol extends DefaultJsonProtocol { implicit def NonEmptyListWriter[A: JsonWriter]: JsonWriter[NonEmptyList[A]] = nela => JsArray(nela.map(_.toJson).list.toVector) + /** This intuitively pointless extra type is here to give it specificity + * so this instance will beat [[CollectionFormats#listFormat]]. + * You would normally achieve the conflict resolution by putting this + * instance in a parent of [[CollectionFormats]], but that kind of + * extension isn't possible here. + */ + final class JsonReaderList[A: JsonReader] extends JsonReader[List[A]] { + override def read(json: JsValue) = json match { + case JsArray(elements) => elements.iterator.map(_.convertTo[A]).toList + case _ => deserializationError(s"must be a list, but got $json") + } + } + + implicit def `List reader only`[A: JsonReader]: JsonReaderList[A] = new JsonReaderList + implicit val PartyDetails: JsonFormat[domain.PartyDetails] = jsonFormat3(domain.PartyDetails.apply) @@ -155,6 +170,31 @@ object JsonProtocol extends DefaultJsonProtocol { implicit val EnrichedContractIdFormat: RootJsonFormat[domain.EnrichedContractId] = jsonFormat2(domain.EnrichedContractId) + private[this] val contractIdAtOffsetKey = "contractIdAtOffset" + + implicit val InitialContractKeyStreamRequest + : RootJsonReader[domain.ContractKeyStreamRequest[Unit, JsValue]] = { jsv => + val ekey = jsv.convertTo[domain.EnrichedContractKey[JsValue]] + jsv match { + case JsObject(fields) if fields contains contractIdAtOffsetKey => + deserializationError( + s"$contractIdAtOffsetKey is not allowed for WebSocket streams starting at the beginning") + case _ => + } + domain.ContractKeyStreamRequest((), ekey) + } + + implicit val ResumingContractKeyStreamRequest: RootJsonReader[ + domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue]] = { jsv => + val off = jsv match { + case JsObject(fields) => fields get contractIdAtOffsetKey map (_.convertTo[Option[String]]) + case _ => None + } + val ekey = jsv.convertTo[domain.EnrichedContractKey[JsValue]] + type OO[+A] = Option[Option[A]] + domain.ContractKeyStreamRequest(domain.ContractId.subst[OO, String](off), ekey) + } + implicit val ContractLocatorFormat: RootJsonFormat[domain.ContractLocator[JsValue]] = new RootJsonFormat[domain.ContractLocator[JsValue]] { override def write(obj: domain.ContractLocator[JsValue]): JsValue = obj match { diff --git a/ledger-service/http-json/src/test/scala/com/digitalasset/http/WebsocketServiceIntegrationTest.scala b/ledger-service/http-json/src/test/scala/com/digitalasset/http/WebsocketServiceIntegrationTest.scala index 45ab2b3990..5cdc22f87b 100644 --- a/ledger-service/http-json/src/test/scala/com/digitalasset/http/WebsocketServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/test/scala/com/digitalasset/http/WebsocketServiceIntegrationTest.scala @@ -94,14 +94,15 @@ class WebsocketServiceIntegrationTest .collect { case m: TextMessage => m.getStrictText } .toMat(Sink.seq)(Keep.right) - private def singleClientQueryStream( + private def singleClientWSStream( + path: String, serviceUri: Uri, query: String, - offset: Option[domain.Offset] = None): Source[Message, NotUsed] = { + offset: Option[domain.Offset]): Source[Message, NotUsed] = { import spray.json._, json.JsonProtocol._ - val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path("/v1/stream/query")) + val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path(s"/v1/stream/$path")) logger.info( - s"---- singleClientQueryStream uri: ${uri.toString}, query: $query, offset: ${offset.toString}") + s"---- singleClientWSStream uri: ${uri.toString}, query: $query, offset: ${offset.toString}") val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri = uri, subprotocol = validSubprotocol)) offset @@ -114,17 +115,17 @@ class WebsocketServiceIntegrationTest .via(webSocketFlow) } + private def singleClientQueryStream( + serviceUri: Uri, + query: String, + offset: Option[domain.Offset] = None): Source[Message, NotUsed] = + singleClientWSStream("query", serviceUri, query, offset) + private def singleClientFetchStream( serviceUri: Uri, - request: String): Source[Message, NotUsed] = { - val uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path("/v1/stream/fetch")) - logger.info(s"---- singleClientFetchStream uri: ${uri.toString}, request: $request") - val webSocketFlow = - Http().webSocketClientFlow(WebSocketRequest(uri = uri, subprotocol = validSubprotocol)) - Source - .single(TextMessage(request)) - .via(webSocketFlow) - } + request: String, + offset: Option[domain.Offset] = None): Source[Message, NotUsed] = + singleClientWSStream("fetch", serviceUri, request, offset) private def initialIouCreate(serviceUri: Uri) = { val payload = TestUtil.readFile("it/iouCreateCommand.json") @@ -353,7 +354,14 @@ class WebsocketServiceIntegrationTest "fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService { (uri, encoder, _) => val templateId = domain.TemplateId(None, "Account", "Account") - val fetchRequest = """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]""" + def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = { + import spray.json._, json.JsonProtocol._ + List( + Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson) + ++ contractIdAtOffset + .map(ocid => contractIdAtOffsetKey -> ocid.toJson) + .toList).toJson.compactPrint + } val f1 = postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri) val f2 = @@ -415,16 +423,29 @@ class WebsocketServiceIntegrationTest _ = r2._1 shouldBe 'success cid2 = getContractId(getResult(r2._2)) - lastState <- singleClientFetchStream(uri, fetchRequest).via(parseResp) runWith resp( - cid1, - cid2) + lastState <- singleClientFetchStream(uri, fetchRequest()) + .via(parseResp) runWith resp(cid1, cid2) - } yield - inside(lastState) { + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 0, lastSeen) => import domain.Offset.ordering lastSeen should be > liveStart + liveStart } + + // check contractIdAtOffsets' effects on phantom filtering + resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) { + case (abcHint, expectArchives) => + (singleClientFetchStream(uri, fetchRequest(abcHint), Some(liveOffset)) + via parseResp runWith remainingDeltas) + .map { + case (creates, archives, _) => + creates shouldBe empty + archives should have size expectArchives + } + } + + } yield resumes.foldLeft(1 shouldBe 1)((_, a) => a) } "fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService { @@ -444,6 +465,45 @@ class WebsocketServiceIntegrationTest }: Future[Assertion] } + "ContractKeyStreamRequest" - { + import spray.json._, json.JsonProtocol._ + val baseVal = + domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue) + val baseMap = baseVal.toJson.asJsObject.fields + val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi"))) + val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull)) + + "initial JSON reader" - { + type T = domain.ContractKeyStreamRequest[Unit, JsValue] + + "shares EnrichedContractKey format" in { + JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal)) + } + + "errors on contractIdAtOffset presence" in { + a[DeserializationException] shouldBe thrownBy { + withSome.convertTo[T] + } + a[DeserializationException] shouldBe thrownBy { + withNone.convertTo[T] + } + } + } + + "resuming JSON reader" - { + type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue] + + "shares EnrichedContractKey format" in { + JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal)) + } + + "distinguishes null and string" in { + withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal)) + withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal)) + } + } + } + private def wsConnectRequest[M]( uri: Uri, subprotocol: Option[String], @@ -524,6 +584,8 @@ object WebsocketServiceIntegrationTest { .collect { case \/-(t) => t } .toMat(Sink.headOption)(Keep.right) + private val contractIdAtOffsetKey = "contractIdAtOffset" + private case class SimpleScenario( id: String, path: Uri.Path,