test and document error when querying with a pruned offset argument (#13633)

* pruning test successful, shows the real error
* document current pruning failure

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Stephen Compall 2022-04-20 10:57:33 -04:00 committed by GitHub
parent cb91febcd5
commit 7c33a02d44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 129 additions and 5 deletions

View File

@ -1937,6 +1937,9 @@ For example, if this message preceded the above 3-query example, it
would be as if ``"4307"`` had been specified for the first two queries,
while ``"5609"`` would be used for the third query.
If any offset has been pruned, the websocket will immediately fail with
code 1011 and message ``internal error``.
The output is a series of JSON documents, each ``payload`` formatted
according to :doc:`lf-value-specification`::

View File

@ -308,6 +308,16 @@ private[http] object WebsocketTestFixture extends StrictLogging with Assertions
}
}
def readUntil[A]: ReadUntil[A] = new ReadUntil(Consume.syntax[A])
final class ReadUntil[A](private val syntax: Consume.Syntax[A]) extends AnyVal {
def apply[B](f: A => Option[B]): Consume.FCC[A, B] = {
def go: Consume.FCC[A, B] =
syntax.readOne flatMap { a => f(a).fold(go)(syntax.point) }
go
}
}
def parseResp(implicit
ec: ExecutionContext,
fm: Materializer,

View File

@ -5,7 +5,12 @@ package com.daml.http
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.http.scaladsl.model.ws.{
Message,
PeerClosedConnectionException,
TextMessage,
WebSocketRequest,
}
import akka.http.scaladsl.model.{HttpHeader, StatusCodes, Uri}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
@ -17,11 +22,13 @@ import com.daml.http.HttpServiceTestFixture.{
sharedAccountCreateCommand,
}
import com.daml.http.json.SprayJson
import com.daml.ledger.api.v1.admin.{participant_pruning_service => PruneGrpc}
import com.typesafe.scalalogging.StrictLogging
import org.scalatest._
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers
import scalaz.std.option._
import scalaz.std.tuple._
import scalaz.std.vector._
import scalaz.syntax.std.option._
import scalaz.syntax.tag._
@ -950,6 +957,110 @@ abstract class AbstractWebsocketServiceIntegrationTest
}: Future[Assertion]
}
"fail reading from a pruned offset" in withHttpServiceAndClient { (uri, encoder, _, client, _) =>
for {
aliceH <- getUniquePartyAndAuthHeaders(uri)("Alice")
(alice, aliceHeaders) = aliceH
offsets <- offsetBeforeAfterArchival(alice, uri, encoder, aliceHeaders)
(offsetBeforeArchive, offsetAfterArchive) = offsets
pruned <- PruneGrpc.ParticipantPruningServiceGrpc
.stub(client.channel)
.prune(
PruneGrpc.PruneRequest(
pruneUpTo = domain.Offset unwrap offsetAfterArchive,
pruneAllDivulgedContracts = true,
)
)
_ = pruned should ===(PruneGrpc.PruneResponse())
// now query again with a pruned offset
jwt <- jwtForParties(uri)(List(alice.unwrap), List(), testId)
query = s"""[{"templateIds": ["Iou:Iou"]}]"""
streamError <- singleClientQueryStream(jwt, uri, query, Some(offsetBeforeArchive))
.runWith(Sink.seq)
.failed
} yield inside(streamError) { case t: PeerClosedConnectionException =>
// TODO #13506 descriptive/structured error. The logs when running this
// test include
// Websocket handler failed with FAILED_PRECONDITION: PARTICIPANT_PRUNED_DATA_ACCESSED(9,0):
// Transactions request from 0000000000000006 to 0000000000000008
// precedes pruned offset 0000000000000007
// but this doesn't propagate to the client
t.closeCode should ===(1011) // see RFC 6455
t.closeReason should ===("internal error")
}
}
private[this] def offsetBeforeAfterArchival(
party: domain.Party,
uri: Uri,
encoder: json.DomainJsonEncoder,
headers: List[HttpHeader],
): Future[(domain.Offset, domain.Offset)] = {
import json.JsonProtocol._
type In = JsValue // JsValue might not be the most convenient choice
val syntax = Consume.syntax[In]
import syntax._
def offsetAfterCreate(): Consume.FCC[In, (domain.ContractId, domain.Offset)] = for {
// make a contract
create <- liftF(
postCreateCommand(
iouCreateCommand(domain.Party unwrap party),
encoder,
uri,
headers,
)
)
cid = inside(
create map (_.convertTo[domain.SyncResponse[domain.ActiveContract[JsValue]]])
) { case (StatusCodes.OK, domain.OkResponse(contract, _, StatusCodes.OK)) =>
contract.contractId
}
// wait for the creation's offset
offsetAfter <- readUntil[In] {
case ContractDelta(creates, _, off @ Some(_)) =>
if (creates.exists(_._1 == cid.unwrap)) off else None
case _ => None
}
} yield (cid, offsetAfter)
def readMidwayOffset(kill: UniqueKillSwitch) = for {
// wait for the ACS
_ <- readUntil[In] {
case ContractDelta(_, _, offset) => offset
case _ => None
}
// make a contract and fetch the offset after it
(cid, betweenOffset) <- offsetAfterCreate()
// archive it
archive <- liftF(postArchiveCommand(TpId.Iou.Iou, cid, encoder, uri, headers))
_ = archive._1 should ===(StatusCodes.OK)
// wait for the archival offset
afterOffset <- readUntil[In] {
case ContractDelta(_, archived, offset) =>
if (archived.exists(_.contractId == cid)) offset else None
case _ => None
}
// if you try to prune afterOffset, pruning fails with
// OFFSET_OUT_OF_RANGE(9,db14ee96): prune_up_to needs to be before ledger end 0000000000000007
// create another dummy contract and ignore it
_ <- offsetAfterCreate()
_ = kill.shutdown()
} yield (betweenOffset, afterOffset)
val query = """[{"templateIds": ["Iou:Iou"]}]"""
for {
jwt <- jwtForParties(uri)(List(party.unwrap), List(), testId)
(kill, source) =
singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
offsets <- source.via(parseResp).runWith(Consume.interpret(readMidwayOffset(kill)))
} yield offsets
}
"query on a bunch of random splits should yield consistent results" in withHttpService {
(uri, _, _, _) =>
for {

View File

@ -24,7 +24,7 @@
- forbid a non-authorized party to start a trigger: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L622)
- forbid a non-authorized party to stop a trigger: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L659)
- forbid a non-authorized user to upload a DAR: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L675)
- multiple websocket requests over the same WebSocket connection are NOT allowed: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L111)
- multiple websocket requests over the same WebSocket connection are NOT allowed: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L118)
- refresh a token after expiry on the server side: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L700)
- reject requests with missing auth header: [AbstractHttpServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala#L1280)
- request a fresh token after expiry on user request: [TriggerServiceTest.scala](triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala#L685)
@ -35,9 +35,9 @@
- return unauthorized without cookie: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L87)
- the /login endpoint with an oauth server checking claims should not authorize disallowed admin claims: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L343)
- the /login endpoint with an oauth server checking claims should not authorize unauthorized parties: [TestMiddleware.scala](triggers/service/auth/src/test/scala/com/daml/auth/middleware/oauth2/TestMiddleware.scala#L336)
- websocket request with invalid protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L91)
- websocket request with valid protocol token should allow client subscribe to stream: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L79)
- websocket request without protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L101)
- websocket request with invalid protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L98)
- websocket request with valid protocol token should allow client subscribe to stream: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L86)
- websocket request without protocol token should be denied: [AbstractWebsocketServiceIntegrationTest.scala](ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala#L108)
- well-authorized create is accepted: [AuthorizationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthorizationSpec.scala#L43)
- well-authorized exercise is accepted: [AuthorizationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthorizationSpec.scala#L141)
- well-authorized exercise/create is accepted: [AuthPropagationSpec.scala](daml-lf/engine/src/test/scala/com/digitalasset/daml/lf/engine/AuthPropagationSpec.scala#L220)