experimental Oracle support in json-api (#8596)

* separate OracleQueries from PostgresQueries

- with some changes from 8161e63189 courtesy @cocreature

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>

* abstract BIGINT

* json, signatories, observers columns

* compatible lastOffset

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>

* oracle functions for select (single template ID), insert

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>

* add oracle branch to integration tests

* oracle CLI configuration for json-api

* run integration tests with ojdbc in classpath

* update maven_install for ojdbc

* drop table if exists for Oracle

* make create DDLs and drops more planned out; drop in reverse order for Oracle integrity

* repin maven

* port agreement_text

* port (by removal) array part of ledger offset update

* use CASE instead of JSON map lookup for multiparty offset update

* simplify self types

* fix contract archival

* repin

* remove selectContracts in favor of selectContractsMultiTemplate

* move Oracle test execution to separate build target

* move websocket test to itlib

* make a bad array instance for Oracle

* report actually-available JDBC drivers only

* configure Oracle test from CI

* attempt with platforms and constraints

* a mishmash of bazel to get it to conditionally enable oracle testing

* fix dep resolution in Scala 2.13

* make the Oracle test a stub (inits and does empty DB query)

* remove commented unused deps

* no changelog

CHANGELOG_BEGIN
CHANGELOG_END

* repin

* we never supply a value for the surrogate ID columns

- suggested by @cocreature; thanks

* add not null to json in DB-specific place

- suggested by @cocreature; thanks

* why DBContractKey

- suggested by @cocreature; thanks

* textType isn't finalized

- suggested by @cocreature; thanks

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
This commit is contained in:
Stephen Compall 2021-02-17 03:50:35 -05:00 committed by GitHub
parent adb81a3961
commit b94b5f92f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1601 additions and 1126 deletions

View File

@ -56,6 +56,7 @@ def install_java_deps():
"com.lihaoyi:pprint_{}:0.6.0".format(scala_major_version),
"com.lihaoyi:sjsonnet_{}:0.3.0".format(scala_major_version),
"commons-io:commons-io:2.5",
"com.oracle.database.jdbc:ojdbc8:19.8.0.0",
"com.sparkjava:spark-core:2.9.1",
"com.squareup:javapoet:1.11.1",
"com.storm-enroute:scalameter_{}:0.19".format(scala_major_version),

View File

@ -240,7 +240,10 @@ jobs:
echo "Could not connect to Oracle, trying again..."
sleep 1
done
# TODO Actually run some tests once we have them.
# Actually run some tests
bazel test --//ledger-service/http-json-oracle:oracle_testing=yes \
--test_env ORACLE_USERNAME=system --test_env ORACLE_PORT --test_env ORACLE_PWD \
//ledger-service/http-json-oracle/...
env:
ARTIFACTORY_USERNAME: $(ARTIFACTORY_USERNAME)
ARTIFACTORY_PASSWORD: $(ARTIFACTORY_PASSWORD)

View File

@ -27,7 +27,6 @@ da_scala_library(
"@maven//:org_tpolecat_doobie_postgres",
],
scalacopts = lf_scalacopts,
silent_annotations = True,
tags = ["maven_coordinates=com.daml:http-json-db-backend:__VERSION__"],
visibility = [
"//visibility:public",

View File

@ -3,8 +3,6 @@
package com.daml.http.dbbackend
import com.github.ghik.silencer.silent
import doobie._
import doobie.implicits._
import scalaz.{@@, Foldable, Functor, OneAnd, Tag}
@ -23,79 +21,87 @@ import cats.syntax.apply._
import cats.syntax.functor._
sealed abstract class Queries {
import Queries._
import Queries._, InitDdl._
import Implicits._
def dropTableIfExists(table: String): Fragment = Fragment.const(s"DROP TABLE IF EXISTS ${table}")
protected[this] def dropTableIfExists(table: String): Fragment
/** for use when generating predicates */
private[http] val contractColumnName: Fragment = sql"payload"
private[this] val dropContractsTable: Fragment = dropTableIfExists("contract")
private[this] val createContractsTable: Fragment = sql"""
private[this] val createContractsTable = CreateTable(
"contract",
sql"""
CREATE TABLE
contract
(contract_id TEXT NOT NULL PRIMARY KEY
,tpid BIGINT NOT NULL REFERENCES template_id (tpid)
,key JSONB NOT NULL
,payload JSONB NOT NULL
,signatories TEXT ARRAY NOT NULL
,observers TEXT ARRAY NOT NULL
,agreement_text TEXT NOT NULL
(contract_id """ ++ textType ++ sql""" NOT NULL PRIMARY KEY
,tpid """ ++ bigIntType ++ sql""" NOT NULL REFERENCES template_id (tpid)
,""" ++ jsonColumn(sql"key") ++ sql"""
,""" ++ jsonColumn(contractColumnName) ++
contractsTableSignatoriesObservers ++ sql"""
,agreement_text """ ++ agreementTextType ++ sql"""
)
"""
""",
)
val indexContractsTable: Fragment = sql"""
protected[this] def contractsTableSignatoriesObservers: Fragment
private[this] val indexContractsTable = CreateIndex(sql"""
CREATE INDEX contract_tpid_idx ON contract (tpid)
"""
""")
private[this] val indexContractsKeys: Fragment = sql"""
CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key)
"""
private[this] val dropOffsetTable: Fragment = dropTableIfExists("ledger_offset")
private[this] val createOffsetTable: Fragment = sql"""
private[this] val createOffsetTable = CreateTable(
"ledger_offset",
sql"""
CREATE TABLE
ledger_offset
(party TEXT NOT NULL
,tpid BIGINT NOT NULL REFERENCES template_id (tpid)
,last_offset TEXT NOT NULL
(party """ ++ textType ++ sql""" NOT NULL
,tpid """ ++ bigIntType ++ sql""" NOT NULL REFERENCES template_id (tpid)
,last_offset """ ++ textType ++ sql""" NOT NULL
,PRIMARY KEY (party, tpid)
)
"""
""",
)
private[this] val dropTemplateIdsTable: Fragment = dropTableIfExists("template_id")
protected[this] def bigIntType: Fragment // must match bigserial
protected[this] def bigSerialType: Fragment
protected[this] def textType: Fragment
protected[this] def agreementTextType: Fragment
private[this] val createTemplateIdsTable: Fragment = sql"""
protected[this] def jsonColumn(name: Fragment): Fragment
private[this] val createTemplateIdsTable = CreateTable(
"template_id",
sql"""
CREATE TABLE
template_id
(tpid BIGSERIAL NOT NULL PRIMARY KEY
,package_id TEXT NOT NULL
,template_module_name TEXT NOT NULL
,template_entity_name TEXT NOT NULL
(tpid """ ++ bigSerialType ++ sql""" NOT NULL PRIMARY KEY
,package_id """ ++ textType ++ sql""" NOT NULL
,template_module_name """ ++ textType ++ sql""" NOT NULL
,template_entity_name """ ++ textType ++ sql""" NOT NULL
,UNIQUE (package_id, template_module_name, template_entity_name)
)
"""
""",
)
private[http] def dropAllTablesIfExist(implicit log: LogHandler): ConnectionIO[Unit] =
(dropContractsTable.update.run
*> dropOffsetTable.update.run
*> dropTemplateIdsTable.update.run).void
private[http] def dropAllTablesIfExist(implicit log: LogHandler): ConnectionIO[Unit] = {
import cats.instances.vector._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps}
initDatabaseDdls.reverse
.collect { case CreateTable(name, _) => dropTableIfExists(name) }
.traverse_(_.update.run)
}
private[this] def initDatabaseDdls: Vector[Fragment] =
protected[this] def initDatabaseDdls: Vector[InitDdl] =
Vector(
createTemplateIdsTable,
createOffsetTable,
createContractsTable,
indexContractsTable,
indexContractsKeys,
)
private[http] def initDatabase(implicit log: LogHandler): ConnectionIO[Unit] = {
import cats.instances.vector._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps}
initDatabaseDdls.traverse_(_.update.run)
initDatabaseDdls.traverse_(_.create.update.run)
}
def surrogateTemplateId(packageId: String, moduleName: String, entityName: String)(implicit
@ -114,13 +120,15 @@ sealed abstract class Queries {
)
}
def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit
log: LogHandler,
pls: Put[Vector[String]],
final def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit
log: LogHandler
): ConnectionIO[Map[String, String]] = {
val partyVector = parties.toVector
sql"""SELECT party, last_offset FROM ledger_offset WHERE (party = ANY(${partyVector}) AND tpid = $tpid)"""
.query[(String, String)]
val partyVector =
cats.data.OneAnd(parties.head, parties.tail.toList)
val q = sql"""
SELECT party, last_offset FROM ledger_offset WHERE tpid = $tpid AND
""" ++ Fragments.in(fr"party", partyVector)
q.query[(String, String)]
.to[Vector]
.map(_.toMap)
}
@ -145,8 +153,7 @@ sealed abstract class Queries {
tpid: SurrogateTpId,
newOffset: String,
lastOffsets: Map[String, String],
)(implicit log: LogHandler, pls: Put[List[String]]): ConnectionIO[Int] = {
import spray.json.DefaultJsonProtocol._
)(implicit log: LogHandler): ConnectionIO[Int] = {
val (existingParties, newParties) = {
import cats.syntax.foldable._
parties.toList.partition(p => lastOffsets.contains(p))
@ -158,36 +165,52 @@ sealed abstract class Queries {
)
// If a concurrent transaction updated the offset for an existing party, we will get
// fewer rows and throw a StaleOffsetException in the caller.
val update =
sql"""UPDATE ledger_offset SET last_offset = $newOffset WHERE party = ANY($existingParties::text[]) AND tpid = $tpid AND last_offset = (${lastOffsets.toJson}::jsonb->>party)"""
val update = existingParties match {
case hdP +: tlP =>
Some(
sql"""UPDATE ledger_offset SET last_offset = $newOffset
WHERE """ ++ Fragments.in(fr"party", cats.data.OneAnd(hdP, tlP)) ++
sql""" AND tpid = $tpid
AND last_offset = """ ++ caseLookup(
lastOffsets.filter { case (k, _) => existingParties contains k },
fr"party",
)
)
case _ => None
}
for {
inserted <-
if (newParties.isEmpty) { Applicative[ConnectionIO].pure(0) }
else {
insert.updateMany(newParties.toList.map(p => (p, tpid, newOffset)))
}
updated <-
if (existingParties.isEmpty) { Applicative[ConnectionIO].pure(0) }
else {
update.update.run
}
updated <- update.cata(_.update.run, Applicative[ConnectionIO].pure(0))
} yield { inserted + updated }
}
@silent(" pas .* never used")
def insertContracts[F[_]: cats.Foldable: Functor, CK: JsonWriter, PL: JsonWriter](
private[this] def caseLookup(m: Map[String, String], selector: Fragment): Fragment =
fr"CASE" ++ {
assert(m.nonEmpty, "existing offsets must be non-empty")
val when +: whens = m.iterator.map { case (k, v) =>
fr"WHEN (" ++ selector ++ fr" = $k) THEN $v"
}.toVector
concatFragment(OneAnd(when, whens))
} ++ fr"ELSE NULL END"
// different databases encode contract keys in different formats
protected[this] type DBContractKey
protected[this] def toDBContractKey[CK: JsonWriter](ck: CK): DBContractKey
final def insertContracts[F[_]: cats.Foldable: Functor, CK: JsonWriter, PL: JsonWriter](
dbcs: F[DBContract[SurrogateTpId, CK, PL, Seq[String]]]
)(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] =
Update[DBContract[SurrogateTpId, JsValue, JsValue, Array[String]]](
"""
INSERT INTO contract
VALUES (?, ?, ?::jsonb, ?::jsonb, ?, ?, ?)
ON CONFLICT (contract_id) DO NOTHING
""",
logHandler0 = log,
).updateMany(dbcs.map(_.mapKeyPayloadParties(_.toJson, _.toJson, _.toArray)))
primInsertContracts(dbcs.map(_.mapKeyPayloadParties(toDBContractKey(_), _.toJson, _.toArray)))
def deleteContracts[F[_]: Foldable](
protected[this] def primInsertContracts[F[_]: cats.Foldable: Functor](
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int]
final def deleteContracts[F[_]: Foldable](
cids: F[String]
)(implicit log: LogHandler): ConnectionIO[Int] = {
cids.toVector match {
@ -199,8 +222,7 @@ sealed abstract class Queries {
}
}
@silent(" gvs .* never used")
private[http] def selectContracts(
private[http] final def selectContracts(
parties: OneAnd[Set, String],
tpid: SurrogateTpId,
predicate: Fragment,
@ -208,25 +230,9 @@ sealed abstract class Queries {
log: LogHandler,
gvs: Get[Vector[String]],
pvs: Put[Vector[String]],
): Query0[DBContract[Unit, JsValue, JsValue, Vector[String]]] = {
val partyVector = parties.toVector
val q = sql"""SELECT contract_id, key, payload, signatories, observers, agreement_text
FROM contract
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND tpid = $tpid AND (""" ++ predicate ++ sql")"
q.query[(String, JsValue, JsValue, Vector[String], Vector[String], String)].map {
case (cid, key, payload, signatories, observers, agreement) =>
DBContract(
contractId = cid,
templateId = (),
key = key,
payload = payload,
signatories = signatories,
observers = observers,
agreementText = agreement,
)
}
}
): Query0[DBContract[Unit, JsValue, JsValue, Vector[String]]] =
selectContractsMultiTemplate(parties, Seq((tpid, predicate)), MatchedQueryMarker.Unused)
.map(_ copy (templateId = ()))
/** Make the smallest number of queries from `queries` that still indicates
* which query or queries produced each contract.
@ -236,7 +242,6 @@ sealed abstract class Queries {
* `templateId` of the resulting [[DBContract]] is actually the 0-based index
* into the `queries` argument that produced the contract.
*/
@silent(" gvs .* never used")
private[http] def selectContractsMultiTemplate[T[_], Mark](
parties: OneAnd[Set, String],
queries: Seq[(SurrogateTpId, Fragment)],
@ -245,50 +250,9 @@ sealed abstract class Queries {
log: LogHandler,
gvs: Get[Vector[String]],
pvs: Put[Vector[String]],
): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = {
val partyVector = parties.toVector
def query(preds: OneAnd[Vector, (SurrogateTpId, Fragment)], findMark: SurrogateTpId => Mark) = {
val assocedPreds = preds.map { case (tpid, predicate) =>
sql"(tpid = $tpid AND (" ++ predicate ++ sql"))"
}
val unionPred = concatFragment(intersperse(assocedPreds, sql" OR "))
val q = sql"""SELECT contract_id, tpid, key, payload, signatories, observers, agreement_text
FROM contract AS c
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND (""" ++ unionPred ++ sql")"
q.query[(String, SurrogateTpId, JsValue, JsValue, Vector[String], Vector[String], String)]
.map { case (cid, tpid, key, payload, signatories, observers, agreement) =>
DBContract(
contractId = cid,
templateId = findMark(tpid),
key = key,
payload = payload,
signatories = signatories,
observers = observers,
agreementText = agreement,
)
}
}
): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]]
trackMatchIndices match {
case MatchedQueryMarker.ByInt =>
type Ix = Int
uniqueSets(queries.zipWithIndex map { case ((tpid, pred), ix) => (tpid, (pred, ix)) }).map {
preds: Map[SurrogateTpId, (Fragment, Ix)] =>
val predHd +: predTl = preds.toVector
val predsList = OneAnd(predHd, predTl).map { case (tpid, (predicate, _)) =>
(tpid, predicate)
}
query(predsList, tpid => preds(tpid)._2)
}
case MatchedQueryMarker.Unused =>
val predHd +: predTl = queries.toVector
query(OneAnd(predHd, predTl), identity)
}
}
private[http] def fetchById(
private[http] final def fetchById(
parties: OneAnd[Set, String],
tpid: SurrogateTpId,
contractId: String,
@ -349,6 +313,15 @@ object Queries {
)
}
private[dbbackend] sealed abstract class InitDdl extends Product with Serializable {
def create: Fragment
}
private[dbbackend] object InitDdl {
final case class CreateTable(name: String, create: Fragment) extends InitDdl
final case class CreateIndex(create: Fragment) extends InitDdl
}
/** Whether selectContractsMultiTemplate computes a matchedQueries marker,
* and whether it may compute >1 query to run.
*
@ -396,6 +369,226 @@ object Queries {
}
private[http] val Postgres: Queries = PostgresQueries
private[http] val Oracle: Queries = OracleQueries
}
private object PostgresQueries extends Queries
private object PostgresQueries extends Queries {
// PostgreSQL implementation of the Queries SPI.  This is the original,
// fully-featured backend: JSONB columns, TEXT ARRAY party columns, and
// multi-template contract selection in a single query.
import Queries._, Queries.InitDdl.CreateIndex
import Implicits._
protected[this] override def dropTableIfExists(table: String) =
Fragment.const(s"DROP TABLE IF EXISTS ${table}")
protected[this] override def bigIntType = sql"BIGINT"
protected[this] override def bigSerialType = sql"BIGSERIAL"
protected[this] override def textType = sql"TEXT"
protected[this] override def agreementTextType = sql"TEXT NOT NULL"
protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL"
// Postgres-only index on (tpid, key): the key column is JSONB here, so a
// BTREE over it supports fetch-by-key; appended after the shared DDLs.
private[this] val indexContractsKeys = CreateIndex(sql"""
CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key)
""")
protected[this] override def initDatabaseDdls = super.initDatabaseDdls :+ indexContractsKeys
// Parties are stored inline as array columns (cf. Oracle's side tables).
protected[this] override def contractsTableSignatoriesObservers = sql"""
,signatories TEXT ARRAY NOT NULL
,observers TEXT ARRAY NOT NULL
"""
// Contract keys are stored as raw JSONB; no wrapping is needed.
protected[this] type DBContractKey = JsValue
protected[this] override def toDBContractKey[CK: JsonWriter](x: CK) = x.toJson
// Bulk insert; duplicate contract_ids are silently skipped via ON CONFLICT,
// so re-ingesting the same contract is a no-op.
protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor](
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] =
Update[DBContract[SurrogateTpId, JsValue, JsValue, Array[String]]](
"""
INSERT INTO contract
VALUES (?, ?, ?::jsonb, ?::jsonb, ?, ?, ?)
ON CONFLICT (contract_id) DO NOTHING
""",
logHandler0 = log,
).updateMany(dbcs)
// Runs one query per unique set of template IDs (see uniqueSets); the Mark
// is either the 0-based query index (ByInt) or the SurrogateTpId (Unused).
private[http] override def selectContractsMultiTemplate[T[_], Mark](
parties: OneAnd[Set, String],
queries: Seq[(SurrogateTpId, Fragment)],
trackMatchIndices: MatchedQueryMarker[T, Mark],
)(implicit
log: LogHandler,
gvs: Get[Vector[String]],
pvs: Put[Vector[String]],
): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = {
val partyVector = parties.toVector
// Builds one SELECT whose WHERE is the OR of per-template predicates;
// visibility is via array overlap (&&) on signatories/observers.
def query(preds: OneAnd[Vector, (SurrogateTpId, Fragment)], findMark: SurrogateTpId => Mark) = {
val assocedPreds = preds.map { case (tpid, predicate) =>
sql"(tpid = $tpid AND (" ++ predicate ++ sql"))"
}
val unionPred = concatFragment(intersperse(assocedPreds, sql" OR "))
val q = sql"""SELECT contract_id, tpid, key, payload, signatories, observers, agreement_text
FROM contract AS c
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND (""" ++ unionPred ++ sql")"
q.query[(String, SurrogateTpId, JsValue, JsValue, Vector[String], Vector[String], String)]
.map { case (cid, tpid, key, payload, signatories, observers, agreement) =>
DBContract(
contractId = cid,
templateId = findMark(tpid),
key = key,
payload = payload,
signatories = signatories,
observers = observers,
agreementText = agreement,
)
}
}
trackMatchIndices match {
case MatchedQueryMarker.ByInt =>
type Ix = Int
uniqueSets(queries.zipWithIndex map { case ((tpid, pred), ix) => (tpid, (pred, ix)) }).map {
preds: Map[SurrogateTpId, (Fragment, Ix)] =>
val predHd +: predTl = preds.toVector
val predsList = OneAnd(predHd, predTl).map { case (tpid, (predicate, _)) =>
(tpid, predicate)
}
query(predsList, tpid => preds(tpid)._2)
}
case MatchedQueryMarker.Unused =>
val predHd +: predTl = queries.toVector
query(OneAnd(predHd, predTl), identity)
}
}
}
private object OracleQueries extends Queries {
// Experimental Oracle implementation of the Queries SPI.  Several features
// are not yet ported (see the TODO SC markers): predicates and party
// filtering in select, multi-template queries, and websocket (ByInt) marks.
import Queries.{DBContract, MatchedQueryMarker, SurrogateTpId}, Queries.InitDdl.CreateTable
import Implicits._
// Oracle has no DROP TABLE IF EXISTS; emulate it with a PL/SQL block that
// swallows ORA-00942 ("table or view does not exist") and re-raises
// every other error.
protected[this] override def dropTableIfExists(table: String) = sql"""BEGIN
EXECUTE IMMEDIATE 'DROP TABLE ' || $table;
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;"""
// NUMBER(19,0) covers the signed 64-bit range expected of the abstract BIGINT.
protected[this] override def bigIntType = sql"NUMBER(19,0)"
protected[this] override def bigSerialType =
bigIntType ++ sql" GENERATED ALWAYS AS IDENTITY"
// TODO SC refine the string formats chosen here and for jsonColumn
protected[this] override def textType = sql"NVARCHAR2(100)"
protected[this] override def agreementTextType = sql"NVARCHAR2(100)"
// JSON lives in a CLOB guarded by an IS JSON check constraint; the constraint
// name embeds the column name so the two JSON columns don't collide.
protected[this] override def jsonColumn(name: Fragment) =
name ++ sql" CLOB NOT NULL CONSTRAINT ensure_json_" ++ name ++ sql" CHECK (" ++ name ++ sql" IS JSON)"
// Oracle has no array columns; parties are normalized into the two side
// tables created below instead of inline contract columns.
protected[this] override def contractsTableSignatoriesObservers = sql""
private val createSignatoriesTable = CreateTable(
"signatories",
sql"""
CREATE TABLE
signatories
(contract_id NVARCHAR2(100) NOT NULL REFERENCES contract(contract_id) ON DELETE CASCADE
,party NVARCHAR2(100) NOT NULL
,UNIQUE (contract_id, party)
)
""",
)
private val createObserversTable = CreateTable(
"observers",
sql"""
CREATE TABLE
observers
(contract_id NVARCHAR2(100) NOT NULL REFERENCES contract(contract_id) ON DELETE CASCADE
,party NVARCHAR2(100) NOT NULL
,UNIQUE (contract_id, party)
)
""",
)
protected[this] override def initDatabaseDdls =
super.initDatabaseDdls ++ Seq(createSignatoriesTable, createObserversTable)
// Keys are wrapped in a {"key": ...} object.  NOTE(review): presumably
// because Oracle's IS JSON rejects top-level scalars -- confirm; also note
// selectContractsMultiTemplate below returns the key column unmodified
// (still wrapped), so verify callers unwrap it.
protected[this] type DBContractKey = JsValue
protected[this] override def toDBContractKey[CK: JsonWriter](x: CK) =
JsObject(Map("key" -> x.toJson))
// Inserts contract rows, then one row per signatory/observer into the side
// tables.  Unlike the Postgres backend there is no ON CONFLICT DO NOTHING,
// so re-inserting an existing contract_id raises a unique-key violation.
// Returns the number of contract rows inserted, matching Postgres semantics.
protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor](
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] = {
val r = Update[(String, SurrogateTpId, JsValue, JsValue, String)](
"""
INSERT INTO contract(contract_id, tpid, key, payload, agreement_text)
VALUES (?, ?, ?, ?, ?)
""",
logHandler0 = log,
).updateMany(
dbcs
.map { c =>
(c.contractId, c.templateId, c.key, c.payload, c.agreementText)
}
)
import cats.syntax.foldable._, cats.instances.vector._
val r2 = Update[(String, String)](
"""
INSERT INTO signatories(contract_id, party)
VALUES (?, ?)
""",
logHandler0 = log,
).updateMany(dbcs.foldMap(c => c.signatories.view.map(s => (c.contractId, s)).toVector))
val r3 = Update[(String, String)](
"""
INSERT INTO observers(contract_id, party)
VALUES (?, ?)
""",
logHandler0 = log,
).updateMany(dbcs.foldMap(c => c.observers.view.map(s => (c.contractId, s)).toVector))
// contract rows must go first (FK targets); yield the contract count, not
// the observer count that `r *> r2 *> r3` would have returned
for {
inserted <- r
_ <- r2
_ <- r3
} yield inserted
}
// TODO SC: stub -- supports exactly one (tpid, predicate) pair, ignores the
// predicate and the party filter, and leaves signatories/observers empty.
private[http] override def selectContractsMultiTemplate[T[_], Mark](
parties: OneAnd[Set, String],
queries: Seq[(SurrogateTpId, Fragment)],
trackMatchIndices: MatchedQueryMarker[T, Mark],
)(implicit
log: LogHandler,
gvs: Get[Vector[String]],
pvs: Put[Vector[String]],
): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = {
val Seq((tpid, predicate @ _)) = queries // TODO SC handle more than one
val _ = parties
val q = sql"""SELECT contract_id, key, payload, agreement_text
FROM contract
WHERE tpid = $tpid""" // TODO SC AND (""" ++ predicate ++ sql")"
trackMatchIndices match {
case MatchedQueryMarker.ByInt => sys.error("TODO websocket Oracle support")
case MatchedQueryMarker.Unused =>
q.query[(String, JsValue, JsValue, Option[String])].map {
case (cid, key, payload, agreement) =>
DBContract(
contractId = cid,
templateId = tpid,
key = key,
payload = payload,
signatories = Vector(), // TODO SC join against signatories/observers tables
observers = Vector(),
agreementText = agreement getOrElse "",
)
}
}
}
}

View File

@ -0,0 +1,90 @@
# Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_library",
"da_scala_test",
"lf_scalacopts",
)
load("@bazel_skylib//rules:common_settings.bzl", "string_flag")
load("@scala_version//:index.bzl", "scala_version_suffix")
hj_scalacopts = lf_scalacopts + [
"-P:wartremover:traverser:org.wartremover.warts.NonUnitStatements",
]
# Build flag: pass --//ledger-service/http-json-oracle:oracle_testing=yes to
# enable the Oracle integration tests (they require a reachable Oracle DB,
# provisioned by CI).
string_flag(
name = "oracle_testing",
build_setting_default = "no",
)
config_setting(
name = "oracle_available",
flag_values = {
":oracle_testing": "yes",
},
)
# NOTE(review): oracle_unavailable is not referenced in this file -- verify
# it is used elsewhere or drop it.
config_setting(
name = "oracle_unavailable",
flag_values = {
":oracle_testing": "no",
},
)
# ERROR: .../daml/ledger-service/http-json-oracle/BUILD.bazel:33:14: no such target
# '@platforms//:incompatible': target 'incompatible' not declared in package '' defined by
# /private/var/tmp/.../85ba8c006f4674ac1f1213a18c0df36a/external/platforms/BUILD and
# referenced by '//ledger-service/http-json-oracle:integration-tests'
# Local stand-in for @platforms//:incompatible (see error above): a constraint
# no platform satisfies, so targets requiring it are skipped, not built.
constraint_setting(name = "incompatible_setting")
constraint_value(
name = "incompatible",
constraint_setting = ":incompatible_setting",
)
da_scala_test(
name = "integration-tests",
size = "large",
srcs = glob(["src/it/scala/**/*.scala"]),
data = [
"//docs:quickstart-model.dar",
"//ledger-service/http-json:Account.dar",
"//ledger/test-common:model-tests.dar",
"//ledger/test-common/test-certificates",
],
plugins = [
"@maven//:org_typelevel_kind_projector_{}".format(scala_version_suffix),
],
scala_deps = [
"@maven//:com_chuusai_shapeless",
"@maven//:com_typesafe_akka_akka_http_core",
"@maven//:com_typesafe_scala_logging_scala_logging",
"@maven//:io_spray_spray_json",
"@maven//:org_scalatest_scalatest",
"@maven//:org_scalaz_scalaz_core",
],
scalacopts = hj_scalacopts,
# Skip (mark incompatible) unless oracle_testing=yes was passed.
target_compatible_with = select({
":oracle_available": [],
"//conditions:default": [":incompatible"],
}),
# ojdbc is runtime-only: code compiles against JDBC interfaces, the Oracle
# driver is loaded by name at runtime.
runtime_deps = [
"@maven//:com_oracle_database_jdbc_ojdbc8",
],
deps = [
"//daml-lf/data",
"//daml-lf/interface",
"//daml-lf/transaction",
"//language-support/scala/bindings-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-service/http-json",
"//ledger-service/http-json:integration-tests-lib",
"//ledger-service/http-json-testing",
"//ledger-service/jwt",
"//ledger-service/utils",
"//ledger/ledger-api-common",
],
)

View File

@ -0,0 +1,27 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import org.scalatest.Inside
import org.scalatest.AsyncTestSuite
import org.scalatest.matchers.should.Matchers
trait HttpServiceOracleInt extends AbstractHttpServiceIntegrationTestFuns {
// Mixin that points the http-json integration-test fixture at an Oracle
// database described by environment variables (exported by the CI job).
this: AsyncTestSuite with Matchers with Inside =>
override final def jdbcConfig: Option[JdbcConfig] = Some(jdbcConfig_)
import sys.env
// NOTE(review): env(...) is Map.apply -- it throws NoSuchElementException
// with no hint of which variable is missing if ORACLE_PORT / ORACLE_PWD /
// ORACLE_USERNAME are unset; consider a clearer failure message.
private[this] def oraclePort = env("ORACLE_PORT")
private[this] def oraclePwd = env("ORACLE_PWD")
private[this] def oracleUsername = env("ORACLE_USERNAME")
// lazy so the env lookups only happen once a test actually uses the config.
// XEPDB1 is presumably the default PDB of the Oracle XE image -- confirm.
protected[this] lazy val jdbcConfig_ = JdbcConfig(
driver = "oracle.jdbc.OracleDriver",
url = s"jdbc:oracle:thin:@//localhost:$oraclePort/XEPDB1",
user = oracleUsername,
password = oraclePwd,
createSchema = true,
)
}

View File

@ -0,0 +1,44 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import org.scalatest.Inside
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers
import spray.json.JsValue
// TODO SC remove `abstract` to reenable
// Full http-json integration suite against Oracle; kept abstract (i.e. not
// instantiated by the runner) until the Oracle backend supports everything
// the suite exercises.
abstract class HttpServiceWithOracleIntTest
extends AbstractHttpServiceIntegrationTest
with HttpServiceOracleInt {
override def staticContentConfig: Option[StaticContentConfig] = None
override def wsConfig: Option[WebsocketConfig] = None
}
// TODO SC this is a small subset of above test, remove when reenabling
// Smoke test: initializes the Oracle schema and runs one empty query, just
// proving the DDLs and a trivial SELECT work end to end.
class HttpServiceWithOracleIntTestStub
extends AsyncFreeSpec
with Matchers
with Inside
with AbstractHttpServiceIntegrationTestFuns
with HttpServiceOracleInt {
override def staticContentConfig: Option[StaticContentConfig] = None
override def wsConfig: Option[WebsocketConfig] = None
override def useTls = HttpServiceTestFixture.UseTls.NoTls
"query POST with empty query" in withHttpService { (uri, encoder, _) =>
// no contracts were created, so the query must come back empty
searchExpectOk(
List.empty,
jsObject("""{"templateIds": ["Iou:Iou"]}"""),
uri,
encoder,
).map { acl: List[domain.ActiveContract[JsValue]] =>
acl shouldBe empty
}
}
}

View File

@ -123,6 +123,7 @@ da_scala_binary(
daml_compile(
name = "Account",
srcs = ["src/it/daml/Account.daml"],
visibility = ["//ledger-service:__subpackages__"],
)
da_scala_test(
@ -169,6 +170,38 @@ da_scala_test(
],
)
da_scala_library(
name = "integration-tests-lib",
srcs = glob(["src/itlib/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_chuusai_shapeless",
"@maven//:com_typesafe_akka_akka_http_core",
"@maven//:com_typesafe_scala_logging_scala_logging",
"@maven//:io_spray_spray_json",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalactic_scalactic",
"@maven//:org_scalatest_scalatest",
"@maven//:org_scalaz_scalaz_core",
],
silent_annotations = True,
visibility = ["//ledger-service:__subpackages__"],
deps = [
":http-json",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/data",
"//daml-lf/interface",
"//daml-lf/transaction",
"//language-support/scala/bindings-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-service/http-json-testing",
"//ledger-service/jwt",
"//ledger-service/utils",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//libs-scala/ports",
],
)
da_scala_test(
name = "integration-tests",
size = "large",
@ -199,9 +232,9 @@ da_scala_test(
"@maven//:org_typelevel_cats_free",
],
scalacopts = hj_scalacopts,
silent_annotations = True,
deps = [
":http-json",
":integration-tests-lib",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/data",
"//daml-lf/interface",

View File

@ -4,13 +4,12 @@
package com.daml.http
import com.daml.testing.postgresql.PostgresAroundAll
import com.typesafe.scalalogging.StrictLogging
import org.scalatest.Inside
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.AsyncTestSuite
import org.scalatest.matchers.should.Matchers
trait HttpServicePostgresInt extends AbstractHttpServiceIntegrationTestFuns with PostgresAroundAll {
this: AsyncFreeSpec with Matchers with Inside with StrictLogging =>
this: AsyncTestSuite with Matchers with Inside =>
override final def jdbcConfig: Option[JdbcConfig] = Some(jdbcConfig_)

View File

@ -3,938 +3,6 @@
package com.daml.http
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.daml.http.json.SprayJson
import com.typesafe.scalalogging.StrictLogging
import org.scalatest._
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers
import scalaz.std.option._
import scalaz.std.vector._
import scalaz.syntax.std.option._
import scalaz.syntax.tag._
import scalaz.syntax.traverse._
import scalaz.{-\/, \/-}
import spray.json.{JsNull, JsObject, JsString, JsValue}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
sealed abstract class AbstractWebsocketServiceIntegrationTest
extends AsyncFreeSpec
with Matchers
with Inside
with StrictLogging
with AbstractHttpServiceIntegrationTestFuns
with BeforeAndAfterAll {
import HttpServiceTestFixture._
import WebsocketTestFixture._
override def staticContentConfig: Option[StaticContentConfig] = None
override def useTls = UseTls.NoTls
override def wsConfig: Option[WebsocketConfig] = Some(Config.DefaultWsConfig)
private val baseQueryInput: Source[Message, NotUsed] =
Source.single(TextMessage.Strict("""{"templateIds": ["Account:Account"]}"""))
private val fetchRequest =
"""[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]"""
private val baseFetchInput: Source[Message, NotUsed] =
Source.single(TextMessage.Strict(fetchRequest))
List(
SimpleScenario("query", Uri.Path("/v1/stream/query"), baseQueryInput),
SimpleScenario("fetch", Uri.Path("/v1/stream/fetch"), baseFetchInput),
).foreach { scenario =>
s"${scenario.id} request with valid protocol token should allow client subscribe to stream" in withHttpService {
(uri, _, _) =>
wsConnectRequest(
uri.copy(scheme = "ws").withPath(scenario.path),
validSubprotocol(jwt),
scenario.input,
)._1 flatMap (x => x.response.status shouldBe StatusCodes.SwitchingProtocols)
}
s"${scenario.id} request with invalid protocol token should be denied" in withHttpService {
(uri, _, _) =>
wsConnectRequest(
uri.copy(scheme = "ws").withPath(scenario.path),
Option("foo"),
scenario.input,
)._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized)
}
s"${scenario.id} request without protocol token should be denied" in withHttpService {
(uri, _, _) =>
wsConnectRequest(
uri.copy(scheme = "ws").withPath(scenario.path),
None,
scenario.input,
)._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized)
}
s"two ${scenario.id} requests over the same WebSocket connection are NOT allowed" in withHttpService {
(uri, _, _) =>
val input = scenario.input.mapConcat(x => List(x, x))
val webSocketFlow =
Http().webSocketClientFlow(
WebSocketRequest(
uri = uri.copy(scheme = "ws").withPath(scenario.path),
subprotocol = validSubprotocol(jwt),
)
)
input
.via(webSocketFlow)
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
.flatMap { msgs =>
inside(msgs) { case Seq(errorMsg) =>
val error = decodeErrorResponse(errorMsg)
error shouldBe domain.ErrorResponse(
List("Multiple requests over the same WebSocket connection are not allowed."),
None,
StatusCodes.BadRequest,
)
}
}
}
}
List(
SimpleScenario(
"query",
Uri.Path("/v1/stream/query"),
Source.single(TextMessage.Strict("""{"templateIds": ["AA:BB"]}""")),
),
SimpleScenario(
"fetch",
Uri.Path("/v1/stream/fetch"),
Source.single(TextMessage.Strict("""[{"templateId": "AA:BB", "key": ["k", "v"]}]""")),
),
).foreach { scenario =>
s"${scenario.id} report UnknownTemplateIds and error when cannot resolve any template ID" in withHttpService {
(uri, _, _) =>
val webSocketFlow =
Http().webSocketClientFlow(
WebSocketRequest(
uri = uri.copy(scheme = "ws").withPath(scenario.path),
subprotocol = validSubprotocol(jwt),
)
)
scenario.input
.via(webSocketFlow)
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
.flatMap { msgs =>
inside(msgs) { case Seq(warningMsg, errorMsg) =>
val warning = decodeServiceWarning(warningMsg)
inside(warning) { case domain.UnknownTemplateIds(ids) =>
ids shouldBe List(domain.TemplateId(None, "AA", "BB"))
}
val error = decodeErrorResponse(errorMsg)
error shouldBe domain.ErrorResponse(
List(ErrorMessages.cannotResolveAnyTemplateId),
None,
StatusCodes.BadRequest,
)
}
}
}
}
"query endpoint should publish transactions when command create is completed" in withHttpService {
(uri, _, _) =>
for {
_ <- initialIouCreate(uri)
clientMsg <- singleClientQueryStream(
jwt,
uri,
"""{"templateIds": ["Iou:Iou"]}""",
).take(2)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case result +: heartbeats =>
result should include(""""issuer":"Alice"""")
result should include(""""amount":"999.99"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"fetch endpoint should publish transactions when command create is completed" in withHttpService {
(uri, encoder, _) =>
for {
_ <- initialAccountCreate(uri, encoder)
clientMsg <- singleClientFetchStream(jwt, uri, fetchRequest)
.take(2)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case result +: heartbeats =>
result should include(""""owner":"Alice"""")
result should include(""""number":"abc123"""")
result should not include (""""offset":"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"query endpoint should warn on unknown template IDs" in withHttpService { (uri, _, _) =>
for {
_ <- initialIouCreate(uri)
clientMsg <- singleClientQueryStream(
jwt,
uri,
"""{"templateIds": ["Iou:Iou", "Unknown:Template"]}""",
).take(3)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case warning +: result +: heartbeats =>
warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk")
result should include("\"issuer\":\"Alice\"")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"fetch endpoint should warn on unknown template IDs" in withHttpService { (uri, encoder, _) =>
for {
_ <- initialAccountCreate(uri, encoder)
clientMsg <- singleClientFetchStream(
jwt,
uri,
"""[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, {"templateId": "Unknown:Template", "key": ["Alice", "abc123"]}]""",
).take(3)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case warning +: result +: heartbeats =>
warning should include("""{"warnings":{"unknownTemplateIds":["Unk""")
result should include(""""owner":"Alice"""")
result should include(""""number":"abc123"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"query endpoint should send error msg when receiving malformed message" in withHttpService {
(uri, _, _) =>
val clientMsg = singleClientQueryStream(jwt, uri, "{}")
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
val result = Await.result(clientMsg, 10.seconds)
result should have size 1
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}
"fetch endpoint should send error msg when receiving malformed message" in withHttpService {
(uri, _, _) =>
val clientMsg = singleClientFetchStream(jwt, uri, """[abcdefg!]""")
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
val result = Await.result(clientMsg, 10.seconds)
result should have size 1
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}
/** Builds the JSON request body for exercising `Iou_Split` on `cid` via
  * `/v1/exercise`.
  *
  * @param cid    contract to exercise the choice on
  * @param amount the `splitAmount` choice argument (defaults to 42.42)
  */
private def exercisePayload(cid: domain.ContractId, amount: BigDecimal = BigDecimal("42.42")) = {
  import json.JsonProtocol._
  import spray.json._
  val requestFields = Map(
    "templateId" -> "Iou:Iou".toJson,
    "contractId" -> cid.toJson,
    "choice" -> "Iou_Split".toJson,
    "argument" -> Map("splitAmount" -> amount).toJson,
  )
  requestFields.toJson
}
"query should receive deltas as contracts are archived/created" in withHttpService {
(uri, _, _) =>
import spray.json._
val initialCreate = initialIouCreate(uri)
val query =
"""[
{"templateIds": ["Iou:Iou"], "query": {"amount": {"%lte": 50}}},
{"templateIds": ["Iou:Iou"], "query": {"amount": {"%gt": 50}}},
{"templateIds": ["Iou:Iou"]}
]"""
@com.github.ghik.silencer.silent("evtsWrapper.*never used")
def resp(
iouCid: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume
.interpret(
for {
ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = (ctid: String) shouldBe (iouCid.unwrap: String)
_ <- liftF(
postJsonRequest(
uri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(domain.ContractId(ctid)),
headersWithAuth,
) map { case (statusCode, _) =>
statusCode.isSuccess shouldBe true
}
)
ContractDelta(Vector(), _, Some(offset)) <- readOne
(preOffset, consumedCtid) = (offset, ctid)
evtsWrapper @ ContractDelta(
Vector((fstId, fst), (sndId, snd)),
Vector(observeConsumed),
Some(lastSeenOffset),
) <- readOne
(liveStartOffset, msgCount) = {
observeConsumed.contractId should ===(consumedCtid)
Set(fstId, sndId, consumedCtid) should have size 3
inside(evtsWrapper) { case JsObject(obj) =>
inside(obj get "events") {
case Some(
JsArray(
Vector(
Archived(_, _),
Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)),
Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)),
)
)
) =>
Set((amt1, ixes1), (amt2, ixes2)) should ===(
Set(
(BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))),
(BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))),
)
)
}
}
(preOffset, 2)
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = msgCount + hbCount,
lastSeenOffset = lastSeenOffset,
)
)
}
for {
creation <- initialCreate
_ = creation._1 shouldBe a[StatusCodes.Success]
iouCid = getContractId(getResult(creation._2))
(kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastState <- source via parseResp runWith resp(iouCid, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset))
via parseResp).take(1) runWith remainingDeltas
} yield inside(rescan) {
case (Vector((fstId, fst @ _), (sndId, snd @ _)), Vector(observeConsumed), Some(_)) =>
Set(fstId, sndId, observeConsumed.contractId) should have size 3
}
}
"multi-party query should receive deltas as contracts are archived/created" in withHttpService {
(uri, encoder, _) =>
import spray.json._
val f1 =
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc123"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
val f2 =
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def456"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
val query =
"""[
{"templateIds": ["Account:Account"]}
]"""
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
Vector((account1, _), (account2, _)) <- readAcsN(2)
_ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2)
ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne
_ <- liftF(
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc234"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
)
ContractDelta(Vector((_, aliceAccount)), _, Some(_)) <- readOne
_ = inside(aliceAccount) { case JsObject(obj) =>
inside((obj get "owner", obj get "number")) {
case (Some(JsString(owner)), Some(JsString(number))) =>
owner shouldBe "Alice"
number shouldBe "abc234"
}
}
_ <- liftF(
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def567"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
)
ContractDelta(Vector((_, bobAccount)), _, Some(lastSeenOffset)) <- readOne
_ = inside(bobAccount) { case JsObject(obj) =>
inside((obj get "owner", obj get "number")) {
case (Some(JsString(owner)), Some(JsString(number))) =>
owner shouldBe "Bob"
number shouldBe "def567"
}
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield (
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = 5 + hbCount,
lastSeenOffset = lastSeenOffset,
),
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientQueryStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
).viaMat(KillSwitches.single)(Keep.right).preMaterialize()
lastState <- source via parseResp runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset))
via parseResp).take(1) runWith remainingDeltas
} yield inside(rescan) { case (Vector(_), _, Some(_)) =>
succeed
}
}
"fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService {
(uri, encoder, _) =>
val templateId = domain.TemplateId(None, "Account", "Account")
def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = {
import spray.json._, json.JsonProtocol._
List(
Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson)
++ contractIdAtOffset
.map(ocid => contractIdAtOffsetKey -> ocid.toJson)
.toList
).toJson.compactPrint
}
val f1 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri)
val f2 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "def456"), encoder, uri)
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
ContractDelta(Vector((cid, c)), Vector(), None) <- readOne
_ = (cid: String) shouldBe (cid1.unwrap: String)
ctid <- liftF(postArchiveCommand(templateId, cid2, encoder, uri).flatMap {
case (statusCode, _) =>
statusCode.isSuccess shouldBe true
postArchiveCommand(templateId, cid1, encoder, uri).map { case (statusCode, _) =>
statusCode.isSuccess shouldBe true
cid
}
})
ContractDelta(Vector(), _, Some(offset)) <- readOne
(off, archivedCid) = (offset, ctid)
ContractDelta(Vector(), Vector(observeArchivedCid), Some(lastSeenOffset)) <- readOne
(liveStartOffset, msgCount) = {
(observeArchivedCid.contractId.unwrap: String) shouldBe (archivedCid: String)
(observeArchivedCid.contractId: domain.ContractId) shouldBe (cid1: domain.ContractId)
(off, 0)
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = msgCount + hbCount,
lastSeenOffset = lastSeenOffset,
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientFetchStream(jwt, uri, fetchRequest())
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastState <- source
.via(parseResp) runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 0, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
// check contractIdAtOffsets' effects on phantom filtering
resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) {
case (abcHint, expectArchives) =>
(singleClientFetchStream(
jwt,
uri,
fetchRequest(abcHint),
Some(liveOffset),
)
via parseResp)
.take(2)
.runWith(remainingDeltas)
.map { case (creates, archives, _) =>
creates shouldBe empty
archives should have size expectArchives
}
}
} yield resumes.foldLeft(1 shouldBe 1)((_, a) => a)
}
"fetch multiple keys should work" in withHttpService { (uri, encoder, _) =>
def create(account: String): Future[domain.ContractId] =
for {
r <- postCreateCommand(accountCreateCommand(domain.Party("Alice"), account), encoder, uri)
} yield {
assert(r._1.isSuccess)
getContractId(getResult(r._2))
}
def archive(id: domain.ContractId): Future[Assertion] =
for {
r <- postArchiveCommand(domain.TemplateId(None, "Account", "Account"), id, encoder, uri)
} yield {
assert(r._1.isSuccess)
}
val req =
"""
|[{"templateId": "Account:Account", "key": ["Alice", "abc123"]},
| {"templateId": "Account:Account", "key": ["Alice", "def456"]}]
|""".stripMargin
val futureResults =
singleClientFetchStream(jwt, uri, req)
.via(parseResp)
.filterNot(isOffsetTick)
.take(4)
.runWith(Sink.seq[JsValue])
for {
cid1 <- create("abc123")
_ <- create("abc124")
_ <- create("abc125")
cid2 <- create("def456")
_ <- archive(cid2)
_ <- archive(cid1)
results <- futureResults
} yield {
val expected: Seq[JsValue] = {
import spray.json._
Seq(
"""
|{"events":[{"created":{"payload":{"number":"abc123"}}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"created":{"payload":{"number":"def456"}}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"archived":{}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"archived":{}}]}
|""".stripMargin.parseJson,
)
}
results should matchJsValues(expected)
}
}
"multi-party fetch-by-key query should receive deltas as contracts are archived/created" in withHttpService {
(uri, encoder, _) =>
import spray.json._
val templateId = domain.TemplateId(None, "Account", "Account")
val f1 =
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc123"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
val f2 =
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def456"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
val query =
"""[
{"templateId": "Account:Account", "key": ["Alice", "abc123"]},
{"templateId": "Account:Account", "key": ["Bob", "def456"]}
]"""
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
Vector((account1, _), (account2, _)) <- readAcsN(2)
_ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2)
ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne
_ <- liftF(postArchiveCommand(templateId, cid1, encoder, uri))
ContractDelta(Vector(), Vector(archivedCid1), Some(_)) <- readOne
_ = archivedCid1.contractId shouldBe cid1
_ <- liftF(
postArchiveCommand(
templateId,
cid2,
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
)
ContractDelta(Vector(), Vector(archivedCid2), Some(lastSeenOffset)) <- readOne
_ = archivedCid2.contractId shouldBe cid2
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield (
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = 5 + hbCount,
lastSeenOffset = lastSeenOffset,
),
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientFetchStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
).viaMat(KillSwitches.single)(Keep.right).preMaterialize()
lastState <- source via parseResp runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientFetchStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
Some(liveOffset),
)
via parseResp).take(2) runWith remainingDeltas
} yield inside(rescan) { case (Vector(), Vector(_, _), Some(_)) =>
succeed
}
}
/** Consume ACS blocks expecting `createCount` contracts. Fail if there
* are too many contracts.
*
* An ACS block is a ContractDelta carrying only creates (no archives, no
* offset); several such blocks may be needed to reach the expected total,
* hence the recursive accumulation.
*/
private[this] def readAcsN(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
// Read delta messages until exactly `createCount` creates are collected.
def go(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] =
if (createCount <= 0) point(Vector.empty)
else
for {
// match only ACS blocks: creates present, no archives, no offset
ContractDelta(creates, Vector(), None) <- readOne
found = creates.size
// guard fails when a block overshoots the expected count
if found <= createCount
tail <- if (found < createCount) go(createCount - found) else point(Vector.empty)
} yield creates ++ tail
go(createCount)
}
// Fix: test description contained a doubled word ("should should").
// An empty (templateId, key) list must be rejected up-front with a single
// 400 error message rather than opening an empty stream.
"fetch should return an error if empty list of (templateId, key) pairs is passed" in withHttpService {
  (uri, _, _) =>
    singleClientFetchStream(jwt, uri, "[]")
      .runWith(collectResultsAsTextMessageSkipOffsetTicks)
      .map { clientMsgs =>
        inside(clientMsgs) { case Seq(errorMsg) =>
          val errorResponse = decodeErrorResponse(errorMsg)
          errorResponse.status shouldBe StatusCodes.BadRequest
          inside(errorResponse.errors) { case List(error) =>
            error should include("must be a JSON array with at least 1 element")
          }
        }
      }: Future[Assertion]
}
"query on a bunch of random splits should yield consistent results" in withHttpService {
(uri, _, _) =>
val splitSample = SplitSeq.gen.map(_ map (BigDecimal(_))).sample.get
val query =
"""[
{"templateIds": ["Iou:Iou"]}
]"""
val (kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
source
.via(parseResp)
.map(iouSplitResult)
.filterNot(_ == \/-((Vector(), Vector()))) // liveness marker/heartbeat
.runWith(Consume.interpret(trialSplitSeq(uri, splitSample, kill)))
}
/** Drives the random-split scenario: creates a genesis Iou for the root
* amount, then recursively exercises Iou_Split following `ss`, checking
* at each step that the stream reports the expected archive and the two
* expected child amounts.
*
* @param serviceUri base URI of the running json-api
* @param ss the tree of amounts to split into
* @param kill shut down the stream once the whole tree is verified
*/
private def trialSplitSeq(
serviceUri: Uri,
ss: SplitSeq[BigDecimal],
kill: UniqueKillSwitch,
): Consume.FCC[IouSplitResult, Assertion] = {
val dslSyntax = Consume.syntax[IouSplitResult]
import SplitSeq._
import dslSyntax._
// Recursively split `createdCid` according to `ss`, verifying each
// resulting stream message.
def go(
createdCid: domain.ContractId,
ss: SplitSeq[BigDecimal],
): Consume.FCC[IouSplitResult, Assertion] = ss match {
case Leaf(_) =>
// leaves are not split further; yield a trivially-passing assertion
point(1 shouldBe 1)
case Node(_, l, r) =>
for {
(StatusCodes.OK, _) <- liftF(
postJsonRequest(
serviceUri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(createdCid, l.x),
headersWithAuth,
)
)
// one split: expect two creates and one archival in a single delta
\/-((Vector((cid1, amt1), (cid2, amt2)), Vector(archival))) <- readOne
(lCid, rCid) = {
archival should ===(createdCid)
Set(amt1, amt2) should ===(Set(l.x, r.x))
// creates may arrive in either order; match cids to amounts
if (amt1 == l.x) (cid1, cid2) else (cid2, cid1)
}
_ <- go(lCid, l)
last <- go(rCid, r)
} yield last
}
// create the genesis Iou carrying the root amount of the tree
val initialPayload = {
import spray.json._, json.JsonProtocol._
Map(
"templateId" -> "Iou:Iou".toJson,
"payload" -> Map(
"observers" -> List[String]().toJson,
"issuer" -> "Alice".toJson,
"amount" -> ss.x.toJson,
"currency" -> "USD".toJson,
"owner" -> "Alice".toJson,
).toJson,
).toJson
}
for {
(StatusCodes.OK, _) <- liftF(
postJsonRequest(
serviceUri.withPath(Uri.Path("/v1/create")),
initialPayload,
headersWithAuth,
)
)
// the first delta carries exactly the genesis create, no archives
\/-((Vector((genesisCid, amt)), Vector())) <- readOne
_ = amt should ===(ss.x)
last <- go(genesisCid, ss)
_ = kill.shutdown()
} yield last
}
/** Classifies a stream message for the split-sequence test: on a
* ContractDelta, creates become (contractId, amount) pairs and archives
* become contract IDs (right); a non-delta message, or any create whose
* "amount" field is missing or not a JSON string, yields the raw JsValue
* on the left. */
private def iouSplitResult(jsv: JsValue): IouSplitResult = jsv match {
case ContractDelta(creates, archives, _) =>
creates traverse {
case (cid, JsObject(fields)) =>
// the Iou payload encodes its amount as a JSON string
fields get "amount" collect { case JsString(amt) =>
(domain.ContractId(cid), BigDecimal(amt))
}
case _ => None
} map ((_, archives map (_.contractId))) toRightDisjunction jsv
case _ => -\/(jsv)
}
// JSON (de)serialization tests for ContractKeyStreamRequest, covering the
// two readers: the "initial" one (no resume offset allowed) and the
// "resuming" one (contractIdAtOffset may be a string or explicit null).
"ContractKeyStreamRequest" - {
import spray.json._, json.JsonProtocol._
// a representative EnrichedContractKey used as the common base payload
val baseVal =
domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue)
val baseMap = baseVal.toJson.asJsObject.fields
// base payload extended with contractIdAtOffset as a string / as null
val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi")))
val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull))
"initial JSON reader" - {
type T = domain.ContractKeyStreamRequest[Unit, JsValue]
"shares EnrichedContractKey format" in {
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal))
}
// the initial reader must reject any contractIdAtOffset, even null
"errors on contractIdAtOffset presence" in {
a[DeserializationException] shouldBe thrownBy {
withSome.convertTo[T]
}
a[DeserializationException] shouldBe thrownBy {
withNone.convertTo[T]
}
}
}
"resuming JSON reader" - {
type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue]
"shares EnrichedContractKey format" in {
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal))
}
// absent -> None, null -> Some(None), string -> Some(Some(cid))
"distinguishes null and string" in {
withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal))
withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal))
}
}
}
/** Opens a single WebSocket connection to `uri` with the given optional
  * subprotocol, feeding `input` through the dummy client flow. */
private def wsConnectRequest[M](
  uri: Uri,
  subprotocol: Option[String],
  input: Source[Message, NotUsed],
) = {
  val request = WebSocketRequest(uri = uri, subprotocol = subprotocol)
  Http().singleWebSocketRequest(request = request, clientFlow = dummyFlow(input))
}
/** Asserts that `str` decodes to a heartbeat: an EventsBlock with no
  * events whose offset is either a non-empty JSON string or JSON null. */
private def assertHeartbeat(str: String): Assertion = {
  val decoded = SprayJson.decode[EventsBlock](str)
  inside(decoded) { case \/-(eventsBlock) =>
    eventsBlock.events shouldBe Vector.empty[JsValue]
    inside(eventsBlock.offset) {
      case Some(JsString(offset)) => offset.length should be > 0
      case Some(JsNull) => Succeeded
    }
  }
}
/** Decodes `str` as a domain.ErrorResponse, failing the test on a decode error. */
private def decodeErrorResponse(str: String): domain.ErrorResponse = {
  import json.JsonProtocol._
  val decoded = SprayJson.decode[domain.ErrorResponse](str)
  inside(decoded) { case \/-(errorResponse) => errorResponse }
}
/** Decodes `str` as an AsyncWarningsWrapper and returns the wrapped
  * warning, failing the test on a decode error. */
private def decodeServiceWarning(str: String): domain.ServiceWarning = {
  import json.JsonProtocol._
  val decoded = SprayJson.decode[domain.AsyncWarningsWrapper](str)
  inside(decoded) { case \/-(wrapper) => wrapper.warnings }
}
}
/** Runs the websocket integration suite without a query store:
* `jdbcConfig = None` means no database is configured. */
final class WebsocketServiceIntegrationTest extends AbstractWebsocketServiceIntegrationTest {
override def jdbcConfig = None
}
@ -942,3 +10,9 @@ final class WebsocketServiceIntegrationTest extends AbstractWebsocketServiceInte
/** Runs the websocket integration suite against a PostgreSQL-backed query
* store; the JDBC configuration comes from HttpServicePostgresInt. */
final class WebsocketServiceWithPostgresIntTest
extends AbstractWebsocketServiceIntegrationTest
with HttpServicePostgresInt
/* TODO SC
final class WebsocketServiceWithOracleIntTest
extends AbstractWebsocketServiceIntegrationTest
with HttpServiceOracleInt
*/

View File

@ -72,7 +72,7 @@ object AbstractHttpServiceIntegrationTestFuns {
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
this: AsyncFreeSpec with Matchers with Inside with StrictLogging =>
this: AsyncTestSuite with Matchers with Inside =>
import AbstractHttpServiceIntegrationTestFuns._
import json.JsonProtocol._
import HttpServiceTestFixture._
@ -475,6 +475,52 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
val command = accountCreateCommand(domain.Party("Alice"), "abc123")
postCreateCommand(command, encoder, serviceUri)
}
/** Parses `s` as JSON and requires the result to be a JSON object,
  * failing the test otherwise. */
protected def jsObject(s: String): JsObject = {
  val parsed: JsonError \/ JsObject =
    SprayJson
      .parse(s)
      .leftMap(e => JsonError(e.shows))
      .flatMap(SprayJson.mustBeJsObject)
  parsed.valueOr(e => fail(e.shows))
}
/** Like [[search]], but asserts the sync response is OK and returns just
  * the matched active contracts. */
protected def searchExpectOk(
    commands: List[domain.CreateCommand[v.Record]],
    query: JsObject,
    uri: Uri,
    encoder: DomainJsonEncoder,
    headers: List[HttpHeader] = headersWithAuth,
): Future[List[domain.ActiveContract[JsValue]]] =
  search(commands, query, uri, encoder, headers).map(response => expectOk(response))
/** Creates `commands` via POST /v1/create — asserting each returned 200 —
* then runs `query` against POST /v1/query and decodes the sync response.
*
* @param commands contracts to create before querying
* @param query request body for /v1/query
* @param headers auth headers used for both the creates and the query
*/
protected def search(
commands: List[domain.CreateCommand[v.Record]],
query: JsObject,
uri: Uri,
encoder: DomainJsonEncoder,
headers: List[HttpHeader] = headersWithAuth,
): Future[
domain.SyncResponse[List[domain.ActiveContract[JsValue]]]
] = {
commands.traverse(c => postCreateCommand(c, encoder, uri, headers)).flatMap { rs =>
// every create must succeed before the query result is meaningful
rs.map(_._1) shouldBe List.fill(commands.size)(StatusCodes.OK)
postJsonRequest(uri.withPath(Uri.Path("/v1/query")), query, headers).flatMap {
case (_, output) =>
FutureUtil
.toFuture(decode1[domain.SyncResponse, List[domain.ActiveContract[JsValue]]](output))
}
}
}
/** Asserts that `resp` is a 200 OkResponse with no warnings and returns
  * its result; fails the test for an ErrorResponse. */
private[http] def expectOk[R](resp: domain.SyncResponse[R]): R = resp match {
  case okResponse: domain.OkResponse[_] =>
    okResponse.status shouldBe StatusCodes.OK
    okResponse.warnings shouldBe empty
    okResponse.result
  case errorResponse: domain.ErrorResponse =>
    fail(s"Expected OK response, got: $errorResponse")
}
}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
@ -717,52 +763,6 @@ abstract class AbstractHttpServiceIntegrationTest
}: Future[Assertion]
}
protected def jsObject(s: String): JsObject = {
val r: JsonError \/ JsObject = for {
jsVal <- SprayJson.parse(s).leftMap(e => JsonError(e.shows))
jsObj <- SprayJson.mustBeJsObject(jsVal)
} yield jsObj
r.valueOr(e => fail(e.shows))
}
private def expectOk[R](resp: domain.SyncResponse[R]): R = resp match {
case ok: domain.OkResponse[_] =>
ok.status shouldBe StatusCodes.OK
ok.warnings shouldBe empty
ok.result
case err: domain.ErrorResponse =>
fail(s"Expected OK response, got: $err")
}
protected def searchExpectOk(
commands: List[domain.CreateCommand[v.Record]],
query: JsObject,
uri: Uri,
encoder: DomainJsonEncoder,
headers: List[HttpHeader] = headersWithAuth,
): Future[List[domain.ActiveContract[JsValue]]] = {
search(commands, query, uri, encoder, headers).map(expectOk(_))
}
protected def search(
commands: List[domain.CreateCommand[v.Record]],
query: JsObject,
uri: Uri,
encoder: DomainJsonEncoder,
headers: List[HttpHeader] = headersWithAuth,
): Future[
domain.SyncResponse[List[domain.ActiveContract[JsValue]]]
] = {
commands.traverse(c => postCreateCommand(c, encoder, uri, headers)).flatMap { rs =>
rs.map(_._1) shouldBe List.fill(commands.size)(StatusCodes.OK)
postJsonRequest(uri.withPath(Uri.Path("/v1/query")), query, headers).flatMap {
case (_, output) =>
FutureUtil
.toFuture(decode1[domain.SyncResponse, List[domain.ActiveContract[JsValue]]](output))
}
}
}
protected def searchAllExpectOk(
uri: Uri,
headers: List[HttpHeader] = headersWithAuth,

View File

@ -0,0 +1,936 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.daml.http.json.SprayJson
import com.typesafe.scalalogging.StrictLogging
import org.scalatest._
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers
import scalaz.std.option._
import scalaz.std.vector._
import scalaz.syntax.std.option._
import scalaz.syntax.tag._
import scalaz.syntax.traverse._
import scalaz.{-\/, \/-}
import spray.json.{JsNull, JsObject, JsString, JsValue}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
abstract class AbstractWebsocketServiceIntegrationTest
extends AsyncFreeSpec
with Matchers
with Inside
with StrictLogging
with AbstractHttpServiceIntegrationTestFuns
with BeforeAndAfterAll {
import HttpServiceTestFixture._
import WebsocketTestFixture._
// Test-fixture configuration: no static content, plain TCP (no TLS), and the
// default websocket config (heartbeat interval etc.) for every test below.
override def staticContentConfig: Option[StaticContentConfig] = None
override def useTls = UseTls.NoTls
override def wsConfig: Option[WebsocketConfig] = Some(Config.DefaultWsConfig)

// Canonical single-message request bodies for the two streaming endpoints.
private val baseQueryInput: Source[Message, NotUsed] =
  Source.single(TextMessage.Strict("""{"templateIds": ["Account:Account"]}"""))
private val fetchRequest =
  """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]"""
private val baseFetchInput: Source[Message, NotUsed] =
  Source.single(TextMessage.Strict(fetchRequest))
// Both streaming endpoints must enforce the JWT-bearing websocket subprotocol
// identically, so the same four auth/handshake tests run for each scenario.
List(
  SimpleScenario("query", Uri.Path("/v1/stream/query"), baseQueryInput),
  SimpleScenario("fetch", Uri.Path("/v1/stream/fetch"), baseFetchInput),
).foreach { scenario =>
  s"${scenario.id} request with valid protocol token should allow client subscribe to stream" in withHttpService {
    (uri, _, _) =>
      wsConnectRequest(
        uri.copy(scheme = "ws").withPath(scenario.path),
        validSubprotocol(jwt),
        scenario.input,
      )._1 flatMap (x => x.response.status shouldBe StatusCodes.SwitchingProtocols)
  }

  s"${scenario.id} request with invalid protocol token should be denied" in withHttpService {
    (uri, _, _) =>
      wsConnectRequest(
        uri.copy(scheme = "ws").withPath(scenario.path),
        Option("foo"), // not a recognized subprotocol, so no credentials
        scenario.input,
      )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized)
  }

  s"${scenario.id} request without protocol token should be denied" in withHttpService {
    (uri, _, _) =>
      wsConnectRequest(
        uri.copy(scheme = "ws").withPath(scenario.path),
        None,
        scenario.input,
      )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized)
  }

  s"two ${scenario.id} requests over the same WebSocket connection are NOT allowed" in withHttpService {
    (uri, _, _) =>
      // Duplicate the single request message so two arrive on one connection.
      val input = scenario.input.mapConcat(x => List(x, x))
      val webSocketFlow =
        Http().webSocketClientFlow(
          WebSocketRequest(
            uri = uri.copy(scheme = "ws").withPath(scenario.path),
            subprotocol = validSubprotocol(jwt),
          )
        )
      input
        .via(webSocketFlow)
        .runWith(collectResultsAsTextMessageSkipOffsetTicks)
        .flatMap { msgs =>
          inside(msgs) { case Seq(errorMsg) =>
            val error = decodeErrorResponse(errorMsg)
            error shouldBe domain.ErrorResponse(
              List("Multiple requests over the same WebSocket connection are not allowed."),
              None,
              StatusCodes.BadRequest,
            )
          }
        }
  }
}
// When NO requested template ID resolves, both endpoints must first emit an
// UnknownTemplateIds warning and then a BadRequest error, closing the stream.
List(
  SimpleScenario(
    "query",
    Uri.Path("/v1/stream/query"),
    Source.single(TextMessage.Strict("""{"templateIds": ["AA:BB"]}""")),
  ),
  SimpleScenario(
    "fetch",
    Uri.Path("/v1/stream/fetch"),
    Source.single(TextMessage.Strict("""[{"templateId": "AA:BB", "key": ["k", "v"]}]""")),
  ),
).foreach { scenario =>
  s"${scenario.id} report UnknownTemplateIds and error when cannot resolve any template ID" in withHttpService {
    (uri, _, _) =>
      val webSocketFlow =
        Http().webSocketClientFlow(
          WebSocketRequest(
            uri = uri.copy(scheme = "ws").withPath(scenario.path),
            subprotocol = validSubprotocol(jwt),
          )
        )
      scenario.input
        .via(webSocketFlow)
        .runWith(collectResultsAsTextMessageSkipOffsetTicks)
        .flatMap { msgs =>
          // Expect exactly two messages: the warning, then the terminal error.
          inside(msgs) { case Seq(warningMsg, errorMsg) =>
            val warning = decodeServiceWarning(warningMsg)
            inside(warning) { case domain.UnknownTemplateIds(ids) =>
              ids shouldBe List(domain.TemplateId(None, "AA", "BB"))
            }
            val error = decodeErrorResponse(errorMsg)
            error shouldBe domain.ErrorResponse(
              List(ErrorMessages.cannotResolveAnyTemplateId),
              None,
              StatusCodes.BadRequest,
            )
          }
        }
  }
}
"query endpoint should publish transactions when command create is completed" in withHttpService {
(uri, _, _) =>
for {
_ <- initialIouCreate(uri)
clientMsg <- singleClientQueryStream(
jwt,
uri,
"""{"templateIds": ["Iou:Iou"]}""",
).take(2)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case result +: heartbeats =>
result should include(""""issuer":"Alice"""")
result should include(""""amount":"999.99"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"fetch endpoint should publish transactions when command create is completed" in withHttpService {
(uri, encoder, _) =>
for {
_ <- initialAccountCreate(uri, encoder)
clientMsg <- singleClientFetchStream(jwt, uri, fetchRequest)
.take(2)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case result +: heartbeats =>
result should include(""""owner":"Alice"""")
result should include(""""number":"abc123"""")
result should not include (""""offset":"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"query endpoint should warn on unknown template IDs" in withHttpService { (uri, _, _) =>
for {
_ <- initialIouCreate(uri)
clientMsg <- singleClientQueryStream(
jwt,
uri,
"""{"templateIds": ["Iou:Iou", "Unknown:Template"]}""",
).take(3)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case warning +: result +: heartbeats =>
warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk")
result should include("\"issuer\":\"Alice\"")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"fetch endpoint should warn on unknown template IDs" in withHttpService { (uri, encoder, _) =>
for {
_ <- initialAccountCreate(uri, encoder)
clientMsg <- singleClientFetchStream(
jwt,
uri,
"""[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, {"templateId": "Unknown:Template", "key": ["Alice", "abc123"]}]""",
).take(3)
.runWith(collectResultsAsTextMessage)
} yield inside(clientMsg) { case warning +: result +: heartbeats =>
warning should include("""{"warnings":{"unknownTemplateIds":["Unk""")
result should include(""""owner":"Alice"""")
result should include(""""number":"abc123"""")
Inspectors.forAll(heartbeats)(assertHeartbeat)
}
}
"query endpoint should send error msg when receiving malformed message" in withHttpService {
(uri, _, _) =>
val clientMsg = singleClientQueryStream(jwt, uri, "{}")
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
val result = Await.result(clientMsg, 10.seconds)
result should have size 1
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}
"fetch endpoint should send error msg when receiving malformed message" in withHttpService {
(uri, _, _) =>
val clientMsg = singleClientFetchStream(jwt, uri, """[abcdefg!]""")
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
val result = Await.result(clientMsg, 10.seconds)
result should have size 1
val errorResponse = decodeErrorResponse(result.head)
errorResponse.status shouldBe StatusCodes.BadRequest
errorResponse.errors should have size 1
}
/** JSON body for exercising `Iou_Split` on `cid`, splitting off `amount`
  * (defaults to 42.42) into a new Iou.
  */
private def exercisePayload(cid: domain.ContractId, amount: BigDecimal = BigDecimal("42.42")) = {
  import json.JsonProtocol._
  import spray.json._
  val fields: Map[String, JsValue] = Map(
    "templateId" -> "Iou:Iou".toJson,
    "contractId" -> cid.toJson,
    "choice" -> "Iou_Split".toJson,
    "argument" -> Map("splitAmount" -> amount).toJson,
  )
  fields.toJson
}
"query should receive deltas as contracts are archived/created" in withHttpService {
(uri, _, _) =>
import spray.json._
val initialCreate = initialIouCreate(uri)
val query =
"""[
{"templateIds": ["Iou:Iou"], "query": {"amount": {"%lte": 50}}},
{"templateIds": ["Iou:Iou"], "query": {"amount": {"%gt": 50}}},
{"templateIds": ["Iou:Iou"]}
]"""
@com.github.ghik.silencer.silent("evtsWrapper.*never used")
def resp(
iouCid: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume
.interpret(
for {
ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = (ctid: String) shouldBe (iouCid.unwrap: String)
_ <- liftF(
postJsonRequest(
uri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(domain.ContractId(ctid)),
headersWithAuth,
) map { case (statusCode, _) =>
statusCode.isSuccess shouldBe true
}
)
ContractDelta(Vector(), _, Some(offset)) <- readOne
(preOffset, consumedCtid) = (offset, ctid)
evtsWrapper @ ContractDelta(
Vector((fstId, fst), (sndId, snd)),
Vector(observeConsumed),
Some(lastSeenOffset),
) <- readOne
(liveStartOffset, msgCount) = {
observeConsumed.contractId should ===(consumedCtid)
Set(fstId, sndId, consumedCtid) should have size 3
inside(evtsWrapper) { case JsObject(obj) =>
inside(obj get "events") {
case Some(
JsArray(
Vector(
Archived(_, _),
Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)),
Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)),
)
)
) =>
Set((amt1, ixes1), (amt2, ixes2)) should ===(
Set(
(BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))),
(BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))),
)
)
}
}
(preOffset, 2)
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = msgCount + hbCount,
lastSeenOffset = lastSeenOffset,
)
)
}
for {
creation <- initialCreate
_ = creation._1 shouldBe a[StatusCodes.Success]
iouCid = getContractId(getResult(creation._2))
(kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastState <- source via parseResp runWith resp(iouCid, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset))
via parseResp).take(1) runWith remainingDeltas
} yield inside(rescan) {
case (Vector((fstId, fst @ _), (sndId, snd @ _)), Vector(observeConsumed), Some(_)) =>
Set(fstId, sndId, observeConsumed.contractId) should have size 3
}
}
"multi-party query should receive deltas as contracts are archived/created" in withHttpService {
(uri, encoder, _) =>
import spray.json._
val f1 =
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc123"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
val f2 =
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def456"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
val query =
"""[
{"templateIds": ["Account:Account"]}
]"""
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
Vector((account1, _), (account2, _)) <- readAcsN(2)
_ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2)
ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne
_ <- liftF(
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc234"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
)
ContractDelta(Vector((_, aliceAccount)), _, Some(_)) <- readOne
_ = inside(aliceAccount) { case JsObject(obj) =>
inside((obj get "owner", obj get "number")) {
case (Some(JsString(owner)), Some(JsString(number))) =>
owner shouldBe "Alice"
number shouldBe "abc234"
}
}
_ <- liftF(
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def567"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
)
ContractDelta(Vector((_, bobAccount)), _, Some(lastSeenOffset)) <- readOne
_ = inside(bobAccount) { case JsObject(obj) =>
inside((obj get "owner", obj get "number")) {
case (Some(JsString(owner)), Some(JsString(number))) =>
owner shouldBe "Bob"
number shouldBe "def567"
}
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield (
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = 5 + hbCount,
lastSeenOffset = lastSeenOffset,
),
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientQueryStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
).viaMat(KillSwitches.single)(Keep.right).preMaterialize()
lastState <- source via parseResp runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset))
via parseResp).take(1) runWith remainingDeltas
} yield inside(rescan) { case (Vector(_), _, Some(_)) =>
succeed
}
}
"fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService {
(uri, encoder, _) =>
val templateId = domain.TemplateId(None, "Account", "Account")
def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = {
import spray.json._, json.JsonProtocol._
List(
Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson)
++ contractIdAtOffset
.map(ocid => contractIdAtOffsetKey -> ocid.toJson)
.toList
).toJson.compactPrint
}
val f1 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri)
val f2 =
postCreateCommand(accountCreateCommand(domain.Party("Alice"), "def456"), encoder, uri)
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
ContractDelta(Vector((cid, c)), Vector(), None) <- readOne
_ = (cid: String) shouldBe (cid1.unwrap: String)
ctid <- liftF(postArchiveCommand(templateId, cid2, encoder, uri).flatMap {
case (statusCode, _) =>
statusCode.isSuccess shouldBe true
postArchiveCommand(templateId, cid1, encoder, uri).map { case (statusCode, _) =>
statusCode.isSuccess shouldBe true
cid
}
})
ContractDelta(Vector(), _, Some(offset)) <- readOne
(off, archivedCid) = (offset, ctid)
ContractDelta(Vector(), Vector(observeArchivedCid), Some(lastSeenOffset)) <- readOne
(liveStartOffset, msgCount) = {
(observeArchivedCid.contractId.unwrap: String) shouldBe (archivedCid: String)
(observeArchivedCid.contractId: domain.ContractId) shouldBe (cid1: domain.ContractId)
(off, 0)
}
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = msgCount + hbCount,
lastSeenOffset = lastSeenOffset,
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientFetchStream(jwt, uri, fetchRequest())
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastState <- source
.via(parseResp) runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 0, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
// check contractIdAtOffsets' effects on phantom filtering
resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) {
case (abcHint, expectArchives) =>
(singleClientFetchStream(
jwt,
uri,
fetchRequest(abcHint),
Some(liveOffset),
)
via parseResp)
.take(2)
.runWith(remainingDeltas)
.map { case (creates, archives, _) =>
creates shouldBe empty
archives should have size expectArchives
}
}
} yield resumes.foldLeft(1 shouldBe 1)((_, a) => a)
}
"fetch multiple keys should work" in withHttpService { (uri, encoder, _) =>
def create(account: String): Future[domain.ContractId] =
for {
r <- postCreateCommand(accountCreateCommand(domain.Party("Alice"), account), encoder, uri)
} yield {
assert(r._1.isSuccess)
getContractId(getResult(r._2))
}
def archive(id: domain.ContractId): Future[Assertion] =
for {
r <- postArchiveCommand(domain.TemplateId(None, "Account", "Account"), id, encoder, uri)
} yield {
assert(r._1.isSuccess)
}
val req =
"""
|[{"templateId": "Account:Account", "key": ["Alice", "abc123"]},
| {"templateId": "Account:Account", "key": ["Alice", "def456"]}]
|""".stripMargin
val futureResults =
singleClientFetchStream(jwt, uri, req)
.via(parseResp)
.filterNot(isOffsetTick)
.take(4)
.runWith(Sink.seq[JsValue])
for {
cid1 <- create("abc123")
_ <- create("abc124")
_ <- create("abc125")
cid2 <- create("def456")
_ <- archive(cid2)
_ <- archive(cid1)
results <- futureResults
} yield {
val expected: Seq[JsValue] = {
import spray.json._
Seq(
"""
|{"events":[{"created":{"payload":{"number":"abc123"}}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"created":{"payload":{"number":"def456"}}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"archived":{}}]}
|""".stripMargin.parseJson,
"""
|{"events":[{"archived":{}}]}
|""".stripMargin.parseJson,
)
}
results should matchJsValues(expected)
}
}
"multi-party fetch-by-key query should receive deltas as contracts are archived/created" in withHttpService {
(uri, encoder, _) =>
import spray.json._
val templateId = domain.TemplateId(None, "Account", "Account")
val f1 =
postCreateCommand(
accountCreateCommand(domain.Party("Alice"), "abc123"),
encoder,
uri,
headers = headersWithPartyAuth(List("Alice")),
)
val f2 =
postCreateCommand(
accountCreateCommand(domain.Party("Bob"), "def456"),
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
val query =
"""[
{"templateId": "Account:Account", "key": ["Alice", "abc123"]},
{"templateId": "Account:Account", "key": ["Bob", "def456"]}
]"""
def resp(
cid1: domain.ContractId,
cid2: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume.interpret(
for {
Vector((account1, _), (account2, _)) <- readAcsN(2)
_ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2)
ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne
_ <- liftF(postArchiveCommand(templateId, cid1, encoder, uri))
ContractDelta(Vector(), Vector(archivedCid1), Some(_)) <- readOne
_ = archivedCid1.contractId shouldBe cid1
_ <- liftF(
postArchiveCommand(
templateId,
cid2,
encoder,
uri,
headers = headersWithPartyAuth(List("Bob")),
)
)
ContractDelta(Vector(), Vector(archivedCid2), Some(lastSeenOffset)) <- readOne
_ = archivedCid2.contractId shouldBe cid2
_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map {
case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset
}.toSet + lastSeenOffset).size - 1
} yield (
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = 5 + hbCount,
lastSeenOffset = lastSeenOffset,
),
)
)
}
for {
r1 <- f1
_ = r1._1 shouldBe a[StatusCodes.Success]
cid1 = getContractId(getResult(r1._2))
r2 <- f2
_ = r2._1 shouldBe a[StatusCodes.Success]
cid2 = getContractId(getResult(r2._2))
(kill, source) = singleClientFetchStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
).viaMat(KillSwitches.single)(Keep.right).preMaterialize()
lastState <- source via parseResp runWith resp(cid1, cid2, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
rescan <- (singleClientFetchStream(
jwtForParties(List("Alice", "Bob"), List(), testId),
uri,
query,
Some(liveOffset),
)
via parseResp).take(2) runWith remainingDeltas
} yield inside(rescan) { case (Vector(), Vector(_, _), Some(_)) =>
succeed
}
}
/** Consume ACS blocks until exactly `createCount` created contracts have been
  * read, possibly across several blocks. The guard fails the consumer if a
  * block delivers more contracts than are still expected.
  *
  * @param createCount total number of creates to accumulate
  * @return the (contractId, payload) pairs in arrival order
  */
private[this] def readAcsN(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = {
  val dslSyntax = Consume.syntax[JsValue]
  import dslSyntax._
  def go(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] =
    if (createCount <= 0) point(Vector.empty)
    else
      for {
        // ACS blocks carry creates only and no offset.
        ContractDelta(creates, Vector(), None) <- readOne
        found = creates.size
        if found <= createCount
        tail <- if (found < createCount) go(createCount - found) else point(Vector.empty)
      } yield creates ++ tail
  go(createCount)
}
"fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService {
(uri, _, _) =>
singleClientFetchStream(jwt, uri, "[]")
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
.map { clientMsgs =>
inside(clientMsgs) { case Seq(errorMsg) =>
val errorResponse = decodeErrorResponse(errorMsg)
errorResponse.status shouldBe StatusCodes.BadRequest
inside(errorResponse.errors) { case List(error) =>
error should include("must be a JSON array with at least 1 element")
}
}
}: Future[Assertion]
}
"query on a bunch of random splits should yield consistent results" in withHttpService {
(uri, _, _) =>
val splitSample = SplitSeq.gen.map(_ map (BigDecimal(_))).sample.get
val query =
"""[
{"templateIds": ["Iou:Iou"]}
]"""
val (kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
source
.via(parseResp)
.map(iouSplitResult)
.filterNot(_ == \/-((Vector(), Vector()))) // liveness marker/heartbeat
.runWith(Consume.interpret(trialSplitSeq(uri, splitSample, kill)))
}
/** Drives the random-split scenario: creates the root Iou for `ss.x`, then
  * recursively exercises Iou_Split per the tree `ss`, checking after each
  * split that the stream reports the archive of the consumed contract and the
  * two expected amounts. Shuts down the stream via `kill` when done.
  */
private def trialSplitSeq(
    serviceUri: Uri,
    ss: SplitSeq[BigDecimal],
    kill: UniqueKillSwitch,
): Consume.FCC[IouSplitResult, Assertion] = {
  val dslSyntax = Consume.syntax[IouSplitResult]
  import SplitSeq._
  import dslSyntax._
  def go(
      createdCid: domain.ContractId,
      ss: SplitSeq[BigDecimal],
  ): Consume.FCC[IouSplitResult, Assertion] = ss match {
    case Leaf(_) =>
      point(1 shouldBe 1) // leaf: nothing to split, trivially succeed
    case Node(_, l, r) =>
      for {
        (StatusCodes.OK, _) <- liftF(
          postJsonRequest(
            serviceUri.withPath(Uri.Path("/v1/exercise")),
            exercisePayload(createdCid, l.x),
            headersWithAuth,
          )
        )
        // One delta: two creates (the split halves) and one archive (parent).
        \/-((Vector((cid1, amt1), (cid2, amt2)), Vector(archival))) <- readOne
        (lCid, rCid) = {
          archival should ===(createdCid)
          Set(amt1, amt2) should ===(Set(l.x, r.x))
          // Creates may arrive in either order; pair cids to subtrees by amount.
          if (amt1 == l.x) (cid1, cid2) else (cid2, cid1)
        }
        _ <- go(lCid, l)
        last <- go(rCid, r)
      } yield last
  }
  val initialPayload = {
    import spray.json._, json.JsonProtocol._
    Map(
      "templateId" -> "Iou:Iou".toJson,
      "payload" -> Map(
        "observers" -> List[String]().toJson,
        "issuer" -> "Alice".toJson,
        "amount" -> ss.x.toJson,
        "currency" -> "USD".toJson,
        "owner" -> "Alice".toJson,
      ).toJson,
    ).toJson
  }
  for {
    (StatusCodes.OK, _) <- liftF(
      postJsonRequest(
        serviceUri.withPath(Uri.Path("/v1/create")),
        initialPayload,
        headersWithAuth,
      )
    )
    \/-((Vector((genesisCid, amt)), Vector())) <- readOne
    _ = amt should ===(ss.x)
    last <- go(genesisCid, ss)
    _ = kill.shutdown()
  } yield last
}
/** Projects a stream message into (creates as (cid, amount), archived cids).
  * Returns the raw JsValue on the left if the message is not a contract delta
  * or any create lacks a string "amount" field.
  */
private def iouSplitResult(jsv: JsValue): IouSplitResult = jsv match {
  case ContractDelta(creates, archives, _) =>
    creates traverse {
      case (cid, JsObject(fields)) =>
        fields get "amount" collect { case JsString(amt) =>
          (domain.ContractId(cid), BigDecimal(amt))
        }
      case _ => None
    } map ((_, archives map (_.contractId))) toRightDisjunction jsv
  case _ => -\/(jsv)
}
"ContractKeyStreamRequest" - {
import spray.json._, json.JsonProtocol._
val baseVal =
domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue)
val baseMap = baseVal.toJson.asJsObject.fields
val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi")))
val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull))
"initial JSON reader" - {
type T = domain.ContractKeyStreamRequest[Unit, JsValue]
"shares EnrichedContractKey format" in {
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal))
}
"errors on contractIdAtOffset presence" in {
a[DeserializationException] shouldBe thrownBy {
withSome.convertTo[T]
}
a[DeserializationException] shouldBe thrownBy {
withNone.convertTo[T]
}
}
}
"resuming JSON reader" - {
type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue]
"shares EnrichedContractKey format" in {
JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal))
}
"distinguishes null and string" in {
withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal))
withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal))
}
}
}
/** Opens a single websocket connection to `uri` with the given subprotocol,
  * feeding `input` through the dummy client flow. Returns the pair produced
  * by singleWebSocketRequest (upgrade-response future, materialized value).
  */
// The former unused type parameter [M] was dropped: no caller supplied it and
// nothing in the body referenced it.
private def wsConnectRequest(
    uri: Uri,
    subprotocol: Option[String],
    input: Source[Message, NotUsed],
) =
  Http().singleWebSocketRequest(
    request = WebSocketRequest(uri = uri, subprotocol = subprotocol),
    clientFlow = dummyFlow(input),
  )
/** Asserts that `str` decodes to a heartbeat: an events block with no events
  * whose offset is either a non-empty string or JSON null.
  */
private def assertHeartbeat(str: String): Assertion =
  inside(SprayJson.decode[EventsBlock](str)) { case \/-(block) =>
    block.events shouldBe Vector.empty[JsValue]
    inside(block.offset) {
      case Some(JsString(off)) =>
        off should not be empty
      case Some(JsNull) =>
        Succeeded
    }
  }
/** Decodes `str` as a domain.ErrorResponse, failing the test on any decode
  * error via `inside`.
  */
private def decodeErrorResponse(str: String): domain.ErrorResponse = {
  import json.JsonProtocol._
  val decoded = SprayJson.decode[domain.ErrorResponse](str)
  inside(decoded) { case \/-(errorResponse) =>
    errorResponse
  }
}
/** Decodes `str` as an async-warnings wrapper and extracts the contained
  * ServiceWarning, failing the test on any decode error via `inside`.
  */
private def decodeServiceWarning(str: String): domain.ServiceWarning = {
  import json.JsonProtocol._
  val decoded = SprayJson.decode[domain.AsyncWarningsWrapper](str)
  inside(decoded) { case \/-(wrapper) =>
    wrapper.warnings
  }
}
}

View File

@ -105,6 +105,9 @@ private[http] final case class JdbcConfig(
)
private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig") {
import dbbackend.ContractDao.supportedJdbcDriverNames
private[this] def supportedDriversHelp = supportedJdbcDriverNames mkString ", "
implicit val showInstance: Show[JdbcConfig] = Show.shows(a =>
s"JdbcConfig(driver=${a.driver}, url=${a.url}, user=${a.user}, createSchema=${a.createSchema})"
@ -112,8 +115,8 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
lazy val help: String =
"Contains comma-separated key-value pairs. Where:\n" +
s"${indent}driver -- JDBC driver class name, only org.postgresql.Driver supported right now,\n" +
s"${indent}url -- JDBC connection URL, only jdbc:postgresql supported right now,\n" +
s"${indent}driver -- JDBC driver class name, $supportedDriversHelp supported right now,\n" +
s"${indent}url -- JDBC connection URL,\n" +
s"${indent}user -- database user name,\n" +
s"${indent}password -- database user password,\n" +
s"${indent}createSchema -- boolean flag, if set to true, the process will re-create database schema and terminate immediately.\n" +
@ -136,6 +139,11 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
override def create(x: Map[String, String]): Either[String, JdbcConfig] =
for {
driver <- requiredField(x)("driver")
_ <- Either.cond(
supportedJdbcDriverNames(driver),
(),
s"$driver unsupported. Supported drivers: $supportedDriversHelp",
)
url <- requiredField(x)("url")
user <- requiredField(x)("user")
password <- requiredField(x)("password")

View File

@ -19,12 +19,10 @@ import spray.json.{JsNull, JsValue}
import scala.concurrent.ExecutionContext
class ContractDao(xa: Connection.T) {
class ContractDao private (xa: Connection.T)(implicit val jdbcDriver: SupportedJdbcDriver) {
implicit val logHandler: log.LogHandler = Slf4jLogHandler(classOf[ContractDao])
implicit val jdbcDriver: SupportedJdbcDriver = SupportedJdbcDriver.Postgres
def transact[A](query: ConnectionIO[A]): IO[A] =
query.transact(xa)
@ -33,10 +31,25 @@ class ContractDao(xa: Connection.T) {
}
object ContractDao {
// Every JDBC driver the JSON API knows how to talk to, keyed by driver class name.
private[this] val supportedJdbcDrivers = Map(
  "org.postgresql.Driver" -> SupportedJdbcDriver.Postgres,
  "oracle.jdbc.OracleDriver" -> SupportedJdbcDriver.Oracle,
)

// Subset of supportedJdbcDrivers whose classes are actually loadable on this
// classpath; used for CLI help/validation so only available drivers are offered.
lazy val supportedJdbcDriverNames = supportedJdbcDrivers.keySet filter { d =>
  scala.util.Try(Class forName d).isSuccess
}

/** Builds a ContractDao for the given driver/URL/credentials.
  *
  * @throws IllegalArgumentException if `jdbcDriver` is not a supported driver
  *   class name.
  */
// NOTE(review): this checks membership in supportedJdbcDrivers, not in
// supportedJdbcDriverNames, so a known-but-unloadable driver passes here and
// only fails later in Connection.connect — confirm this is intended.
def apply(jdbcDriver: String, jdbcUrl: String, username: String, password: String)(implicit
    ec: ExecutionContext
): ContractDao = {
  val cs: ContextShift[IO] = IO.contextShift(ec)
  implicit val sjd: SupportedJdbcDriver = supportedJdbcDrivers.getOrElse(
    jdbcDriver,
    throw new IllegalArgumentException(
      s"JDBC driver $jdbcDriver is not one of ${supportedJdbcDrivers.keySet}"
    ),
  )
  new ContractDao(Connection.connect(jdbcDriver, jdbcUrl, username, password)(cs))
}

View File

@ -37,4 +37,15 @@ object SupportedJdbcDriver {
Set(postgres_class23.UNIQUE_VIOLATION.value, ContractDao.StaleOffsetException.SqlState),
)
}
// Experimental Oracle driver support.
val Oracle: SupportedJdbcDriver = {
  // Placeholder Meta[Array[String]]: it round-trips only the array LENGTH via
  // an Int column, reconstructing Array.fill(n)("x") on read, so element
  // values are discarded. TODO (s11): replace with a real Oracle array
  // mapping (cf. doobie's postgres unliftedStringArrayType as a model).
  implicit val qqq: doobie.Meta[Array[String]] =
    doobie.Meta[Int].timap(Array.fill(_)("x"))(_.length)
  new SupportedJdbcDriver(
    label = "Oracle",
    queries = Queries.Oracle,
    // TODO (s11): also add Oracle's unique-violation SQLSTATE here, mirroring
    // the Postgres driver's retrySqlStates.
    retrySqlStates = Set( /*s11 TODO, */ ContractDao.StaleOffsetException.SqlState),
  )
}
}

View File

@ -1,6 +1,6 @@
{
"dependency_tree": {
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 1301820626,
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 702334853,
"conflict_resolution": {},
"dependencies": [
{
@ -1530,6 +1530,28 @@
"sha256": "3ae59d060ba74fbfd511423e0ce6b85cb5e65699a6b7cd331b8ab80dde76198e",
"url": "https://repo1.maven.org/maven2/com/lihaoyi/upickle-core_2.12/1.2.0/upickle-core_2.12-1.2.0-sources.jar"
},
{
"coord": "com.oracle.database.jdbc:ojdbc8:19.8.0.0",
"dependencies": [],
"directDependencies": [],
"file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar",
"mirror_urls": [
"https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar"
],
"sha256": "3ddde85f9a33d774c23b930b4ee27acb4d412c7c8c3aab4655bc1ee72339ac8d",
"url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar"
},
{
"coord": "com.oracle.database.jdbc:ojdbc8:jar:sources:19.8.0.0",
"dependencies": [],
"directDependencies": [],
"file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar",
"mirror_urls": [
"https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar"
],
"sha256": "25a9cb395eee4096d117e8146a48dfa2c748eea7884b0fa01d8a1b99f399d01d",
"url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar"
},
{
"coord": "com.rabbitmq:amqp-client:5.5.3",
"dependencies": [],

View File

@ -1,6 +1,6 @@
{
"dependency_tree": {
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 1532588487,
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": -1217143180,
"conflict_resolution": {},
"dependencies": [
{
@ -1486,6 +1486,28 @@
"sha256": "047ffe7c529b7cbbd3efa7e35a7ce27d3c4ffae05fa77f1ff4cfd46a8b40a52a",
"url": "https://repo1.maven.org/maven2/com/lihaoyi/upickle-core_2.13/1.2.0/upickle-core_2.13-1.2.0-sources.jar"
},
{
"coord": "com.oracle.database.jdbc:ojdbc8:19.8.0.0",
"dependencies": [],
"directDependencies": [],
"file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar",
"mirror_urls": [
"https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar"
],
"sha256": "3ddde85f9a33d774c23b930b4ee27acb4d412c7c8c3aab4655bc1ee72339ac8d",
"url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar"
},
{
"coord": "com.oracle.database.jdbc:ojdbc8:jar:sources:19.8.0.0",
"dependencies": [],
"directDependencies": [],
"file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar",
"mirror_urls": [
"https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar"
],
"sha256": "25a9cb395eee4096d117e8146a48dfa2c748eea7884b0fa01d8a1b99f399d01d",
"url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar"
},
{
"coord": "com.rabbitmq:amqp-client:5.5.3",
"dependencies": [],