From 7c33a02d44464a96c779a3bde8015ee6c5dbd053 Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Wed, 20 Apr 2022 10:57:33 -0400 Subject: [PATCH] test and document error when querying with a pruned offset argument (#13633) * pruning test successful, shows the real error * document current pruning failure CHANGELOG_BEGIN CHANGELOG_END --- docs/source/json-api/index.rst | 3 + .../com/daml/http/WebsocketTestFixture.scala | 10 ++ ...tractWebsocketServiceIntegrationTest.scala | 113 +++++++++++++++++- security-evidence.md | 8 +- 4 files changed, 129 insertions(+), 5 deletions(-) diff --git a/docs/source/json-api/index.rst b/docs/source/json-api/index.rst index 14bd2b1d3e..3f9398bc49 100644 --- a/docs/source/json-api/index.rst +++ b/docs/source/json-api/index.rst @@ -1937,6 +1937,9 @@ For example, if this message preceded the above 3-query example, it would be as if ``"4307"`` had been specified for the first two queries, while ``"5609"`` would be used for the third query. +If any offset has been pruned, the websocket will immediately fail with +code 1011 and message ``internal error``. + The output is a series of JSON documents, each ``payload`` formatted according to :doc:`lf-value-specification`:: diff --git a/ledger-service/http-json-testing/src/main/scala/com/daml/http/WebsocketTestFixture.scala b/ledger-service/http-json-testing/src/main/scala/com/daml/http/WebsocketTestFixture.scala index d4829e9576..09abe9e350 100644 --- a/ledger-service/http-json-testing/src/main/scala/com/daml/http/WebsocketTestFixture.scala +++ b/ledger-service/http-json-testing/src/main/scala/com/daml/http/WebsocketTestFixture.scala @@ -308,6 +308,16 @@ private[http] object WebsocketTestFixture extends StrictLogging with Assertions } } + def readUntil[A]: ReadUntil[A] = new ReadUntil(Consume.syntax[A]) + + final class ReadUntil[A](private val syntax: Consume.Syntax[A]) extends AnyVal { + def apply[B](f: A => Option[B]): Consume.FCC[A, B] = { + def go: Consume.FCC[A, B] = + syntax.readOne flatMap { a => f(a).fold(go)(syntax.point) } + go + } + } + def parseResp(implicit ec: ExecutionContext, fm: Materializer, diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala index 0df7fa6ecf..e6034ec6b6 100644 --- a/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala @@ -5,7 +5,12 @@ package com.daml.http import akka.NotUsed import akka.http.scaladsl.Http -import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest} +import akka.http.scaladsl.model.ws.{ + Message, + PeerClosedConnectionException, + TextMessage, + WebSocketRequest, +} import akka.http.scaladsl.model.{HttpHeader, StatusCodes, Uri} import akka.stream.{KillSwitches, UniqueKillSwitch} import akka.stream.scaladsl.{Keep, Sink, Source} @@ -17,11 +22,13 @@ import com.daml.http.HttpServiceTestFixture.{ sharedAccountCreateCommand, } import com.daml.http.json.SprayJson +import com.daml.ledger.api.v1.admin.{participant_pruning_service => PruneGrpc} import com.typesafe.scalalogging.StrictLogging import org.scalatest._ import org.scalatest.freespec.AsyncFreeSpec import org.scalatest.matchers.should.Matchers import scalaz.std.option._ +import scalaz.std.tuple._ import scalaz.std.vector._ import scalaz.syntax.std.option._ import scalaz.syntax.tag._ @@ -950,6 +957,110 @@ abstract class AbstractWebsocketServiceIntegrationTest }: Future[Assertion] } + "fail reading from a pruned offset" in withHttpServiceAndClient { (uri, encoder, _, client, _) => + for { + aliceH <- getUniquePartyAndAuthHeaders(uri)("Alice") + (alice, aliceHeaders) = aliceH + offsets <- offsetBeforeAfterArchival(alice, uri, encoder, aliceHeaders) + (offsetBeforeArchive, offsetAfterArchive) = offsets + + pruned <- PruneGrpc.ParticipantPruningServiceGrpc + .stub(client.channel) + .prune( + PruneGrpc.PruneRequest( + pruneUpTo = domain.Offset unwrap offsetAfterArchive, + pruneAllDivulgedContracts = true, + ) + ) + _ = pruned should ===(PruneGrpc.PruneResponse()) + + // now query again with a pruned offset + jwt <- jwtForParties(uri)(List(alice.unwrap), List(), testId) + query = s"""[{"templateIds": ["Iou:Iou"]}]""" + streamError <- singleClientQueryStream(jwt, uri, query, Some(offsetBeforeArchive)) + .runWith(Sink.seq) + .failed + } yield inside(streamError) { case t: PeerClosedConnectionException => + // TODO #13506 descriptive/structured error. The logs when running this + // test include + // Websocket handler failed with FAILED_PRECONDITION: PARTICIPANT_PRUNED_DATA_ACCESSED(9,0): + // Transactions request from 0000000000000006 to 0000000000000008 + // precedes pruned offset 0000000000000007 + // but this doesn't propagate to the client + t.closeCode should ===(1011) // see RFC 6455 + t.closeReason should ===("internal error") + } + } + + private[this] def offsetBeforeAfterArchival( + party: domain.Party, + uri: Uri, + encoder: json.DomainJsonEncoder, + headers: List[HttpHeader], + ): Future[(domain.Offset, domain.Offset)] = { + import json.JsonProtocol._ + type In = JsValue // JsValue might not be the most convenient choice + val syntax = Consume.syntax[In] + import syntax._ + + def offsetAfterCreate(): Consume.FCC[In, (domain.ContractId, domain.Offset)] = for { + // make a contract + create <- liftF( + postCreateCommand( + iouCreateCommand(domain.Party unwrap party), + encoder, + uri, + headers, + ) + ) + cid = inside( + create map (_.convertTo[domain.SyncResponse[domain.ActiveContract[JsValue]]]) + ) { case (StatusCodes.OK, domain.OkResponse(contract, _, StatusCodes.OK)) => + contract.contractId + } + // wait for the creation's offset + offsetAfter <- readUntil[In] { + case ContractDelta(creates, _, off @ Some(_)) => + if (creates.exists(_._1 == cid.unwrap)) off else None + case _ => None + } + } yield (cid, offsetAfter) + + def readMidwayOffset(kill: UniqueKillSwitch) = for { + // wait for the ACS + _ <- readUntil[In] { + case ContractDelta(_, _, offset) => offset + case _ => None + } + // make a contract and fetch the offset after it + (cid, betweenOffset) <- offsetAfterCreate() + // archive it + archive <- liftF(postArchiveCommand(TpId.Iou.Iou, cid, encoder, uri, headers)) + _ = archive._1 should ===(StatusCodes.OK) + // wait for the archival offset + afterOffset <- readUntil[In] { + case ContractDelta(_, archived, offset) => + if (archived.exists(_.contractId == cid)) offset else None + case _ => None + } + // if you try to prune afterOffset, pruning fails with + // OFFSET_OUT_OF_RANGE(9,db14ee96): prune_up_to needs to be before ledger end 0000000000000007 + // create another dummy contract and ignore it + _ <- offsetAfterCreate() + _ = kill.shutdown() + } yield (betweenOffset, afterOffset) + + val query = """[{"templateIds": ["Iou:Iou"]}]""" + for { + jwt <- jwtForParties(uri)(List(party.unwrap), List(), testId) + (kill, source) = + singleClientQueryStream(jwt, uri, query) + .viaMat(KillSwitches.single)(Keep.right) + .preMaterialize() + offsets <- source.via(parseResp).runWith(Consume.interpret(readMidwayOffset(kill))) + } yield offsets + } + "query on a bunch of random splits should yield consistent results" in withHttpService { (uri, _, _, _) => for { diff --git a/security-evidence.md b/security-evidence.md index ff15791f37..d83640fd92 100644 --- a/security-evidence.md +++ b/security-evidence.md @@ -24,7 +24,7 @@ - forbid a non-authorized party to start a trigger: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L622) - forbid a non-authorized party to stop a trigger: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L659) - forbid a non-authorized user to upload a DAR: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L675) -- multiple websocket requests over the same WebSocket connection are NOT allowed: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L111) +- multiple websocket requests over the same WebSocket connection are NOT allowed: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L118) - refresh a token after expiry on the server side: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L700) - reject requests with missing auth header: [AbstractHttpServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala#L1280) - request a fresh token after expiry on user request: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L685) @@ -35,9 +35,9 @@ - return unauthorized without cookie: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L87) - the /login endpoint with an oauth server checking claims should not authorize disallowed admin claims: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L343) - the /login endpoint with an oauth server checking claims should not authorize unauthorized parties: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L336) -- websocket request with invalid protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L91) -- websocket request with valid protocol token should allow client subscribe to stream: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L79) -- websocket request without protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L101) +- websocket request with invalid protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L98) +- websocket request with valid protocol token should allow client subscribe to stream: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L86) +- websocket request without protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L108) - well-authorized create is accepted: [AuthorizationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthorizationSpec.scala#L43) - well-authorized exercise is accepted: [AuthorizationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthorizationSpec.scala#L141) - well-authorized exercise/create is accepted: [AuthPropagationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthPropagationSpec.scala#L220)