mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
warn on unknown template IDs in searchForever (#4312)
* in searchForever, warn on unknown template IDs as long as at least one is known * remove unused resolveTemplateId * factor out WS request parts in WS integration test * factor out IOU create * test early template ID warning in searchForever stream * document warnings case for searchForever CHANGELOG_BEGIN - [JSON API - Experimental] Precede stream with warnings of unknown template IDs, if any, rather than failing outright. See `issue #4290 <https://github.com/digital-asset/daml/issues/4290>`_. CHANGELOG_END
This commit is contained in:
parent
9f57994f2d
commit
7882080207
@ -847,6 +847,14 @@ and archives the one above, the same stream will eventually produce::
|
||||
"archived": "#1:0"
|
||||
}]
|
||||
|
||||
If any template IDs are found not to resolve, the first non-heartbeat
|
||||
element of the stream will report them::
|
||||
|
||||
{"warnings": {"unknownTemplateIds": ["UnknownModule:UnknownEntity"]}}
|
||||
|
||||
and the stream will continue, provided that at least one template ID
|
||||
resolved properly.
|
||||
|
||||
Aside from ``"created"`` and ``"archived"`` elements, ``"error"``
|
||||
elements may appear, which contain a string describing the error. The
|
||||
stream will continue in these cases, rather than terminating.
|
||||
|
@ -322,9 +322,9 @@ class ContractsService(
|
||||
\/.fromTryCatchNonFatal(LfValueCodec.apiValueToJsValue(a)).leftMap(e =>
|
||||
Error('lfValueToJsValue, e.description))
|
||||
|
||||
private def resolveTemplateIds(
|
||||
xs: Set[domain.TemplateId.OptionalPkg],
|
||||
): (Set[domain.TemplateId.RequiredPkg], Set[domain.TemplateId.OptionalPkg]) = {
|
||||
private[http] def resolveTemplateIds[Tid <: domain.TemplateId.OptionalPkg](
|
||||
xs: Set[Tid],
|
||||
): (Set[domain.TemplateId.RequiredPkg], Set[Tid]) = {
|
||||
xs.partitionMap { x =>
|
||||
resolveTemplateId(x) toLeftDisjunction x
|
||||
}
|
||||
|
@ -139,7 +139,6 @@ object HttpService extends StrictLogging {
|
||||
|
||||
websocketService = new WebSocketService(
|
||||
contractsService,
|
||||
packageService.resolveTemplateId,
|
||||
encoder,
|
||||
decoder,
|
||||
wsConfig,
|
||||
|
@ -22,6 +22,8 @@ import com.digitalasset.ledger.api.{v1 => api}
|
||||
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import scalaz.Liskov, Liskov.<~<
|
||||
import scalaz.std.tuple._
|
||||
import scalaz.syntax.bifunctor._
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.syntax.tag._
|
||||
import scalaz.syntax.traverse._
|
||||
@ -69,7 +71,6 @@ object WebSocketService {
|
||||
|
||||
class WebSocketService(
|
||||
contractsService: ContractsService,
|
||||
resolveTemplateId: PackageService.ResolveTemplateId,
|
||||
encoder: DomainJsonEncoder,
|
||||
decoder: DomainJsonDecoder,
|
||||
wsConfig: Option[WebsocketConfig])(implicit mat: Materializer, ec: ExecutionContext)
|
||||
@ -159,16 +160,18 @@ class WebSocketService(
|
||||
jwt: Jwt,
|
||||
jwtPayload: JwtPayload,
|
||||
request: GetActiveContractsRequest): Source[Message, NotUsed] =
|
||||
resolveRequiredTemplateIds(request.templateIds) match {
|
||||
case Some(ids) =>
|
||||
contractsService.resolveTemplateIds(request.templateIds).leftMap(_.toList) match {
|
||||
case (ids @ (_ +: _), unresolved) =>
|
||||
contractsService
|
||||
.insertDeleteStepSource(jwt, jwtPayload.party, ids, Terminates.Never)
|
||||
.via(convertFilterContracts(prepareFilters(ids, request.query)))
|
||||
.filter(_.nonEmpty)
|
||||
.map(sae => TextMessage(sae.render.compactPrint))
|
||||
case None =>
|
||||
.map(_.render)
|
||||
.prepend(reportUnresolvedTemplateIds(unresolved))
|
||||
.map(jsv => TextMessage(jsv.compactPrint))
|
||||
case _ =>
|
||||
Source.single(
|
||||
wsErrorMessage("Cannot find one of templateIds " + request.templateIds.toString))
|
||||
wsErrorMessage("Cannot find any of templateIds " + request.templateIds.toString))
|
||||
}
|
||||
|
||||
private[http] def wsErrorMessage(errorMsg: String): TextMessage.Strict =
|
||||
@ -205,10 +208,12 @@ class WebSocketService(
|
||||
.via(conflation)
|
||||
.map(sae => sae copy (step = sae.step.mapPreservingIds(_ map lfValueToJsValue)))
|
||||
|
||||
private def resolveRequiredTemplateIds(
|
||||
xs: Set[domain.TemplateId.OptionalPkg]): Option[List[domain.TemplateId.RequiredPkg]] = {
|
||||
import scalaz.std.list._
|
||||
import scalaz.std.option._
|
||||
xs.toList.traverse(resolveTemplateId)
|
||||
}
|
||||
private def reportUnresolvedTemplateIds(
|
||||
unresolved: Set[domain.TemplateId.OptionalPkg]): Source[JsValue, NotUsed] =
|
||||
if (unresolved.isEmpty) Source.empty
|
||||
else
|
||||
Source.single {
|
||||
import spray.json._
|
||||
Map("warnings" -> domain.UnknownTemplateIds(unresolved.toList)).toJson
|
||||
}
|
||||
}
|
||||
|
@ -68,23 +68,30 @@ class WebsocketServiceIntegrationTest
|
||||
private val collectResultsAsRawString: Sink[Message, Future[Seq[String]]] =
|
||||
Flow[Message].map(_.toString).filter(v => !(v contains "heartbeat")).toMat(Sink.seq)(Keep.right)
|
||||
|
||||
private def singleClientStream(serviceUri: Uri, query: String) = {
|
||||
val webSocketFlow = Http().webSocketClientFlow(
|
||||
WebSocketRequest(
|
||||
uri = serviceUri.copy(scheme = "ws").withPath(Uri.Path("/contracts/searchForever")),
|
||||
subprotocol = validSubprotocol))
|
||||
Source
|
||||
.single(TextMessage(query))
|
||||
.via(webSocketFlow)
|
||||
}
|
||||
|
||||
private def initialIouCreate(serviceUri: Uri) = {
|
||||
val payload = TestUtil.readFile("it/iouCreateCommand.json")
|
||||
TestUtil.postJsonStringRequest(
|
||||
serviceUri.withPath(Uri.Path("/command/create")),
|
||||
payload,
|
||||
headersWithAuth)
|
||||
}
|
||||
|
||||
"websocket should publish transactions when command create is completed" in withHttpService {
|
||||
(uri, _, _) =>
|
||||
val payload = TestUtil.readFile("it/iouCreateCommand.json")
|
||||
for {
|
||||
_ <- TestUtil.postJsonStringRequest(
|
||||
uri.withPath(Uri.Path("/command/create")),
|
||||
payload,
|
||||
headersWithAuth)
|
||||
_ <- initialIouCreate(uri)
|
||||
|
||||
webSocketFlow = Http().webSocketClientFlow(
|
||||
WebSocketRequest(
|
||||
uri = uri.copy(scheme = "ws").withPath(Uri.Path("/contracts/searchForever")),
|
||||
subprotocol = validSubprotocol))
|
||||
|
||||
clientMsg <- Source
|
||||
.single(TextMessage("""{"templateIds": ["Iou:Iou"]}"""))
|
||||
.via(webSocketFlow)
|
||||
clientMsg <- singleClientStream(uri, """{"templateIds": ["Iou:Iou"]}""")
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
@ -93,16 +100,23 @@ class WebsocketServiceIntegrationTest
|
||||
}
|
||||
}
|
||||
|
||||
"websocket should warn on unknown template IDs" in withHttpService { (uri, _, _) =>
|
||||
for {
|
||||
_ <- initialIouCreate(uri)
|
||||
|
||||
clientMsg <- singleClientStream(uri, """{"templateIds": ["Iou:Iou", "Unknown:Template"]}""")
|
||||
.runWith(collectResultsAsRawString)
|
||||
} yield
|
||||
inside(clientMsg) {
|
||||
case Seq(warning, result) =>
|
||||
warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk")
|
||||
result should include("\"issuer\":\"Alice\"")
|
||||
}
|
||||
}
|
||||
|
||||
"websocket should send error msg when receiving malformed message" in withHttpService {
|
||||
(uri, _, _) =>
|
||||
val webSocketFlow = Http().webSocketClientFlow(
|
||||
WebSocketRequest(
|
||||
uri = uri.copy(scheme = "ws").withPath(Uri.Path("/contracts/searchForever")),
|
||||
subprotocol = validSubprotocol))
|
||||
|
||||
val clientMsg = Source
|
||||
.single(TextMessage("{}"))
|
||||
.via(webSocketFlow)
|
||||
val clientMsg = singleClientStream(uri, "{}")
|
||||
.runWith(collectResultsAsRawString)
|
||||
|
||||
val result = Await.result(clientMsg, 10.seconds)
|
||||
@ -133,22 +147,12 @@ class WebsocketServiceIntegrationTest
|
||||
(uri, _, _) =>
|
||||
import spray.json._
|
||||
|
||||
val payload = TestUtil.readFile("it/iouCreateCommand.json")
|
||||
val initialCreate = TestUtil.postJsonStringRequest(
|
||||
uri.withPath(Uri.Path("/command/create")),
|
||||
payload,
|
||||
headersWithAuth)
|
||||
val initialCreate = initialIouCreate(uri)
|
||||
def exercisePayload(cid: String) =
|
||||
baseExercisePayload.copy(
|
||||
fields = baseExercisePayload.fields updated ("contractId", JsString(cid)))
|
||||
|
||||
val webSocketFlow = Http().webSocketClientFlow(
|
||||
WebSocketRequest(
|
||||
uri = uri.copy(scheme = "ws").withPath(Uri.Path("/contracts/searchForever")),
|
||||
subprotocol = validSubprotocol))
|
||||
|
||||
val query =
|
||||
TextMessage.Strict("""{"templateIds": ["Iou:Iou"]}""")
|
||||
val query = """{"templateIds": ["Iou:Iou"]}"""
|
||||
|
||||
val parseResp: Flow[Message, JsValue, NotUsed] =
|
||||
Flow[Message]
|
||||
@ -190,7 +194,7 @@ class WebsocketServiceIntegrationTest
|
||||
for {
|
||||
creation <- initialCreate
|
||||
_ = creation._1 shouldBe 'success
|
||||
lastState <- Source single query via webSocketFlow via parseResp runWith resp
|
||||
lastState <- singleClientStream(uri, query) via parseResp runWith resp
|
||||
} yield lastState should ===(ShouldHaveEnded(2))
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user