diff --git a/bazel-java-deps.bzl b/bazel-java-deps.bzl index 1b87d80c3b..46e560090a 100644 --- a/bazel-java-deps.bzl +++ b/bazel-java-deps.bzl @@ -56,6 +56,7 @@ def install_java_deps(): "com.lihaoyi:pprint_{}:0.6.0".format(scala_major_version), "com.lihaoyi:sjsonnet_{}:0.3.0".format(scala_major_version), "commons-io:commons-io:2.5", + "com.oracle.database.jdbc:ojdbc8:19.8.0.0", "com.sparkjava:spark-core:2.9.1", "com.squareup:javapoet:1.11.1", "com.storm-enroute:scalameter_{}:0.19".format(scala_major_version), diff --git a/ci/build.yml b/ci/build.yml index f9510b16bf..5d5a29d3ac 100644 --- a/ci/build.yml +++ b/ci/build.yml @@ -240,7 +240,10 @@ jobs: echo "Could not connect to Oracle, trying again..." sleep 1 done - # TODO Actually run some tests once we have them. + # Actually run some tests + bazel test --//ledger-service/http-json-oracle:oracle_testing=yes \ + --test_env ORACLE_USERNAME=system --test_env ORACLE_PORT --test_env ORACLE_PWD \ + //ledger-service/http-json-oracle/... env: ARTIFACTORY_USERNAME: $(ARTIFACTORY_USERNAME) ARTIFACTORY_PASSWORD: $(ARTIFACTORY_PASSWORD) diff --git a/ledger-service/db-backend/BUILD.bazel b/ledger-service/db-backend/BUILD.bazel index 91ee179430..25262cb70a 100644 --- a/ledger-service/db-backend/BUILD.bazel +++ b/ledger-service/db-backend/BUILD.bazel @@ -27,7 +27,6 @@ da_scala_library( "@maven//:org_tpolecat_doobie_postgres", ], scalacopts = lf_scalacopts, - silent_annotations = True, tags = ["maven_coordinates=com.daml:http-json-db-backend:__VERSION__"], visibility = [ "//visibility:public", diff --git a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala index 1150dcad2c..c755374a80 100644 --- a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala +++ b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala @@ -3,8 +3,6 @@ package com.daml.http.dbbackend -import com.github.ghik.silencer.silent - import doobie._ import doobie.implicits._ import scalaz.{@@, Foldable, Functor, OneAnd, Tag} @@ -23,79 +21,87 @@ import cats.syntax.apply._ import cats.syntax.functor._ sealed abstract class Queries { - import Queries._ + import Queries._, InitDdl._ import Implicits._ - def dropTableIfExists(table: String): Fragment = Fragment.const(s"DROP TABLE IF EXISTS ${table}") + protected[this] def dropTableIfExists(table: String): Fragment /** for use when generating predicates */ private[http] val contractColumnName: Fragment = sql"payload" - private[this] val dropContractsTable: Fragment = dropTableIfExists("contract") - - private[this] val createContractsTable: Fragment = sql""" + private[this] val createContractsTable = CreateTable( + "contract", + sql""" CREATE TABLE contract - (contract_id TEXT NOT NULL PRIMARY KEY - ,tpid BIGINT NOT NULL REFERENCES template_id (tpid) - ,key JSONB NOT NULL - ,payload JSONB NOT NULL - ,signatories TEXT ARRAY NOT NULL - ,observers TEXT ARRAY NOT NULL - ,agreement_text TEXT NOT NULL + (contract_id """ ++ textType ++ sql""" NOT NULL PRIMARY KEY + ,tpid """ ++ bigIntType ++ sql""" NOT NULL REFERENCES template_id (tpid) + ,""" ++ jsonColumn(sql"key") ++ sql""" + ,""" ++ jsonColumn(contractColumnName) ++ + contractsTableSignatoriesObservers ++ sql""" + ,agreement_text """ ++ agreementTextType ++ sql""" ) - """ + """, + ) - val indexContractsTable: Fragment = sql""" + protected[this] def contractsTableSignatoriesObservers: Fragment + + private[this] val indexContractsTable = CreateIndex(sql""" CREATE INDEX contract_tpid_idx ON contract (tpid) - """ + """) - private[this] val indexContractsKeys: Fragment = sql""" - CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key) - """ - - private[this] val dropOffsetTable: Fragment = dropTableIfExists("ledger_offset") - - private[this] val createOffsetTable: Fragment = sql""" + private[this] val createOffsetTable = CreateTable( + "ledger_offset", + sql""" CREATE TABLE ledger_offset - (party TEXT NOT NULL - ,tpid BIGINT NOT NULL REFERENCES template_id (tpid) - ,last_offset TEXT NOT NULL + (party """ ++ textType ++ sql""" NOT NULL + ,tpid """ ++ bigIntType ++ sql""" NOT NULL REFERENCES template_id (tpid) + ,last_offset """ ++ textType ++ sql""" NOT NULL ,PRIMARY KEY (party, tpid) ) - """ + """, + ) - private[this] val dropTemplateIdsTable: Fragment = dropTableIfExists("template_id") + protected[this] def bigIntType: Fragment // must match bigserial + protected[this] def bigSerialType: Fragment + protected[this] def textType: Fragment + protected[this] def agreementTextType: Fragment - private[this] val createTemplateIdsTable: Fragment = sql""" + protected[this] def jsonColumn(name: Fragment): Fragment + + private[this] val createTemplateIdsTable = CreateTable( + "template_id", + sql""" CREATE TABLE template_id - (tpid BIGSERIAL NOT NULL PRIMARY KEY - ,package_id TEXT NOT NULL - ,template_module_name TEXT NOT NULL - ,template_entity_name TEXT NOT NULL + (tpid """ ++ bigSerialType ++ sql""" NOT NULL PRIMARY KEY + ,package_id """ ++ textType ++ sql""" NOT NULL + ,template_module_name """ ++ textType ++ sql""" NOT NULL + ,template_entity_name """ ++ textType ++ sql""" NOT NULL ,UNIQUE (package_id, template_module_name, template_entity_name) ) - """ + """, + ) - private[http] def dropAllTablesIfExist(implicit log: LogHandler): ConnectionIO[Unit] = - (dropContractsTable.update.run - *> dropOffsetTable.update.run - *> dropTemplateIdsTable.update.run).void + private[http] def dropAllTablesIfExist(implicit log: LogHandler): ConnectionIO[Unit] = { + import cats.instances.vector._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps} + initDatabaseDdls.reverse + .collect { case CreateTable(name, _) => dropTableIfExists(name) } + .traverse_(_.update.run) + } - private[this] def initDatabaseDdls: Vector[Fragment] = + protected[this] def initDatabaseDdls: Vector[InitDdl] = Vector( createTemplateIdsTable, createOffsetTable, createContractsTable, indexContractsTable, - indexContractsKeys, ) private[http] def initDatabase(implicit log: LogHandler): ConnectionIO[Unit] = { import cats.instances.vector._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps} - initDatabaseDdls.traverse_(_.update.run) + initDatabaseDdls.traverse_(_.create.update.run) } def surrogateTemplateId(packageId: String, moduleName: String, entityName: String)(implicit @@ -114,13 +120,15 @@ sealed abstract class Queries { ) } - def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit - log: LogHandler, - pls: Put[Vector[String]], + final def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit + log: LogHandler ): ConnectionIO[Map[String, String]] = { - val partyVector = parties.toVector - sql"""SELECT party, last_offset FROM ledger_offset WHERE (party = ANY(${partyVector}) AND tpid = $tpid)""" - .query[(String, String)] + val partyVector = + cats.data.OneAnd(parties.head, parties.tail.toList) + val q = sql""" + SELECT party, last_offset FROM ledger_offset WHERE tpid = $tpid AND + """ ++ Fragments.in(fr"party", partyVector) + q.query[(String, String)] .to[Vector] .map(_.toMap) } @@ -145,8 +153,7 @@ sealed abstract class Queries { tpid: SurrogateTpId, newOffset: String, lastOffsets: Map[String, String], - )(implicit log: LogHandler, pls: Put[List[String]]): ConnectionIO[Int] = { - import spray.json.DefaultJsonProtocol._ + )(implicit log: LogHandler): ConnectionIO[Int] = { val (existingParties, newParties) = { import cats.syntax.foldable._ parties.toList.partition(p => lastOffsets.contains(p)) @@ -158,36 +165,52 @@ sealed abstract class Queries { ) // If a concurrent transaction updated the offset for an existing party, we will get // fewer rows and throw a StaleOffsetException in the caller. - val update = - sql"""UPDATE ledger_offset SET last_offset = $newOffset WHERE party = ANY($existingParties::text[]) AND tpid = $tpid AND last_offset = (${lastOffsets.toJson}::jsonb->>party)""" + val update = existingParties match { + case hdP +: tlP => + Some( + sql"""UPDATE ledger_offset SET last_offset = $newOffset + WHERE """ ++ Fragments.in(fr"party", cats.data.OneAnd(hdP, tlP)) ++ + sql""" AND tpid = $tpid + AND last_offset = """ ++ caseLookup( + lastOffsets.filter { case (k, _) => existingParties contains k }, + fr"party", + ) + ) + case _ => None + } for { inserted <- if (newParties.isEmpty) { Applicative[ConnectionIO].pure(0) } else { insert.updateMany(newParties.toList.map(p => (p, tpid, newOffset))) } - updated <- - if (existingParties.isEmpty) { Applicative[ConnectionIO].pure(0) } - else { - update.update.run - } + updated <- update.cata(_.update.run, Applicative[ConnectionIO].pure(0)) } yield { inserted + updated } } - @silent(" pas .* never used") - def insertContracts[F[_]: cats.Foldable: Functor, CK: JsonWriter, PL: JsonWriter]( + private[this] def caseLookup(m: Map[String, String], selector: Fragment): Fragment = + fr"CASE" ++ { + assert(m.nonEmpty, "existing offsets must be non-empty") + val when +: whens = m.iterator.map { case (k, v) => + fr"WHEN (" ++ selector ++ fr" = $k) THEN $v" + }.toVector + concatFragment(OneAnd(when, whens)) + } ++ fr"ELSE NULL END" + + // different databases encode contract keys in different formats + protected[this] type DBContractKey + protected[this] def toDBContractKey[CK: JsonWriter](ck: CK): DBContractKey + + final def insertContracts[F[_]: cats.Foldable: Functor, CK: JsonWriter, PL: JsonWriter]( dbcs: F[DBContract[SurrogateTpId, CK, PL, Seq[String]]] )(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] = - Update[DBContract[SurrogateTpId, JsValue, JsValue, Array[String]]]( - """ - INSERT INTO contract - VALUES (?, ?, ?::jsonb, ?::jsonb, ?, ?, ?) - ON CONFLICT (contract_id) DO NOTHING - """, - logHandler0 = log, - ).updateMany(dbcs.map(_.mapKeyPayloadParties(_.toJson, _.toJson, _.toArray))) + primInsertContracts(dbcs.map(_.mapKeyPayloadParties(toDBContractKey(_), _.toJson, _.toArray))) - def deleteContracts[F[_]: Foldable]( + protected[this] def primInsertContracts[F[_]: cats.Foldable: Functor]( + dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]] + )(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] + + final def deleteContracts[F[_]: Foldable]( cids: F[String] )(implicit log: LogHandler): ConnectionIO[Int] = { cids.toVector match { @@ -199,8 +222,7 @@ sealed abstract class Queries { } } - @silent(" gvs .* never used") - private[http] def selectContracts( + private[http] final def selectContracts( parties: OneAnd[Set, String], tpid: SurrogateTpId, predicate: Fragment, @@ -208,25 +230,9 @@ sealed abstract class Queries { log: LogHandler, gvs: Get[Vector[String]], pvs: Put[Vector[String]], - ): Query0[DBContract[Unit, JsValue, JsValue, Vector[String]]] = { - val partyVector = parties.toVector - val q = sql"""SELECT contract_id, key, payload, signatories, observers, agreement_text - FROM contract - WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[]) - AND tpid = $tpid AND (""" ++ predicate ++ sql")" - q.query[(String, JsValue, JsValue, Vector[String], Vector[String], String)].map { - case (cid, key, payload, signatories, observers, agreement) => - DBContract( - contractId = cid, - templateId = (), - key = key, - payload = payload, - signatories = signatories, - observers = observers, - agreementText = agreement, - ) - } - } + ): Query0[DBContract[Unit, JsValue, JsValue, Vector[String]]] = + selectContractsMultiTemplate(parties, Seq((tpid, predicate)), MatchedQueryMarker.Unused) + .map(_ copy (templateId = ())) /** Make the smallest number of queries from `queries` that still indicates * which query or queries produced each contract. @@ -236,7 +242,6 @@ sealed abstract class Queries { * `templateId` of the resulting [[DBContract]] is actually the 0-based index * into the `queries` argument that produced the contract. */ - @silent(" gvs .* never used") private[http] def selectContractsMultiTemplate[T[_], Mark]( parties: OneAnd[Set, String], queries: Seq[(SurrogateTpId, Fragment)], @@ -245,50 +250,9 @@ sealed abstract class Queries { log: LogHandler, gvs: Get[Vector[String]], pvs: Put[Vector[String]], - ): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = { - val partyVector = parties.toVector - def query(preds: OneAnd[Vector, (SurrogateTpId, Fragment)], findMark: SurrogateTpId => Mark) = { - val assocedPreds = preds.map { case (tpid, predicate) => - sql"(tpid = $tpid AND (" ++ predicate ++ sql"))" - } - val unionPred = concatFragment(intersperse(assocedPreds, sql" OR ")) - val q = sql"""SELECT contract_id, tpid, key, payload, signatories, observers, agreement_text - FROM contract AS c - WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[]) - AND (""" ++ unionPred ++ sql")" - q.query[(String, SurrogateTpId, JsValue, JsValue, Vector[String], Vector[String], String)] - .map { case (cid, tpid, key, payload, signatories, observers, agreement) => - DBContract( - contractId = cid, - templateId = findMark(tpid), - key = key, - payload = payload, - signatories = signatories, - observers = observers, - agreementText = agreement, - ) - } - } + ): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] - trackMatchIndices match { - case MatchedQueryMarker.ByInt => - type Ix = Int - uniqueSets(queries.zipWithIndex map { case ((tpid, pred), ix) => (tpid, (pred, ix)) }).map { - preds: Map[SurrogateTpId, (Fragment, Ix)] => - val predHd +: predTl = preds.toVector - val predsList = OneAnd(predHd, predTl).map { case (tpid, (predicate, _)) => - (tpid, predicate) - } - query(predsList, tpid => preds(tpid)._2) - } - - case MatchedQueryMarker.Unused => - val predHd +: predTl = queries.toVector - query(OneAnd(predHd, predTl), identity) - } - } - - private[http] def fetchById( + private[http] final def fetchById( parties: OneAnd[Set, String], tpid: SurrogateTpId, contractId: String, @@ -349,6 +313,15 @@ object Queries { ) } + private[dbbackend] sealed abstract class InitDdl extends Product with Serializable { + def create: Fragment + } + + private[dbbackend] object InitDdl { + final case class CreateTable(name: String, create: Fragment) extends InitDdl + final case class CreateIndex(create: Fragment) extends InitDdl + } + /** Whether selectContractsMultiTemplate computes a matchedQueries marker, * and whether it may compute >1 query to run. * @@ -396,6 +369,226 @@ object Queries { } private[http] val Postgres: Queries = PostgresQueries + private[http] val Oracle: Queries = OracleQueries } -private object PostgresQueries extends Queries +private object PostgresQueries extends Queries { + import Queries._, Queries.InitDdl.CreateIndex + import Implicits._ + + protected[this] override def dropTableIfExists(table: String) = + Fragment.const(s"DROP TABLE IF EXISTS ${table}") + + protected[this] override def bigIntType = sql"BIGINT" + protected[this] override def bigSerialType = sql"BIGSERIAL" + protected[this] override def textType = sql"TEXT" + protected[this] override def agreementTextType = sql"TEXT NOT NULL" + + protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL" + + private[this] val indexContractsKeys = CreateIndex(sql""" + CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key) + """) + + protected[this] override def initDatabaseDdls = super.initDatabaseDdls :+ indexContractsKeys + + protected[this] override def contractsTableSignatoriesObservers = sql""" + ,signatories TEXT ARRAY NOT NULL + ,observers TEXT ARRAY NOT NULL + """ + + protected[this] type DBContractKey = JsValue + + protected[this] override def toDBContractKey[CK: JsonWriter](x: CK) = x.toJson + + protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor]( + dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]] + )(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] = + Update[DBContract[SurrogateTpId, JsValue, JsValue, Array[String]]]( + """ + INSERT INTO contract + VALUES (?, ?, ?::jsonb, ?::jsonb, ?, ?, ?) + ON CONFLICT (contract_id) DO NOTHING + """, + logHandler0 = log, + ).updateMany(dbcs) + + private[http] override def selectContractsMultiTemplate[T[_], Mark]( + parties: OneAnd[Set, String], + queries: Seq[(SurrogateTpId, Fragment)], + trackMatchIndices: MatchedQueryMarker[T, Mark], + )(implicit + log: LogHandler, + gvs: Get[Vector[String]], + pvs: Put[Vector[String]], + ): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = { + val partyVector = parties.toVector + def query(preds: OneAnd[Vector, (SurrogateTpId, Fragment)], findMark: SurrogateTpId => Mark) = { + val assocedPreds = preds.map { case (tpid, predicate) => + sql"(tpid = $tpid AND (" ++ predicate ++ sql"))" + } + val unionPred = concatFragment(intersperse(assocedPreds, sql" OR ")) + val q = sql"""SELECT contract_id, tpid, key, payload, signatories, observers, agreement_text + FROM contract AS c + WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[]) + AND (""" ++ unionPred ++ sql")" + q.query[(String, SurrogateTpId, JsValue, JsValue, Vector[String], Vector[String], String)] + .map { case (cid, tpid, key, payload, signatories, observers, agreement) => + DBContract( + contractId = cid, + templateId = findMark(tpid), + key = key, + payload = payload, + signatories = signatories, + observers = observers, + agreementText = agreement, + ) + } + } + + trackMatchIndices match { + case MatchedQueryMarker.ByInt => + type Ix = Int + uniqueSets(queries.zipWithIndex map { case ((tpid, pred), ix) => (tpid, (pred, ix)) }).map { + preds: Map[SurrogateTpId, (Fragment, Ix)] => + val predHd +: predTl = preds.toVector + val predsList = OneAnd(predHd, predTl).map { case (tpid, (predicate, _)) => + (tpid, predicate) + } + query(predsList, tpid => preds(tpid)._2) + } + + case MatchedQueryMarker.Unused => + val predHd +: predTl = queries.toVector + query(OneAnd(predHd, predTl), identity) + } + } +} + +private object OracleQueries extends Queries { + import Queries.{DBContract, MatchedQueryMarker, SurrogateTpId}, Queries.InitDdl.CreateTable + import Implicits._ + + protected[this] override def dropTableIfExists(table: String) = sql"""BEGIN + EXECUTE IMMEDIATE 'DROP TABLE ' || $table; + EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -942 THEN + RAISE; + END IF; + END;""" + + protected[this] override def bigIntType = sql"NUMBER(19,0)" + protected[this] override def bigSerialType = + bigIntType ++ sql" GENERATED ALWAYS AS IDENTITY" + // TODO SC refine the string formats chosen here and for jsonColumn + protected[this] override def textType = sql"NVARCHAR2(100)" + protected[this] override def agreementTextType = sql"NVARCHAR2(100)" + + protected[this] override def jsonColumn(name: Fragment) = + name ++ sql" CLOB NOT NULL CONSTRAINT ensure_json_" ++ name ++ sql" CHECK (" ++ name ++ sql" IS JSON)" + + protected[this] override def contractsTableSignatoriesObservers = sql"" + + private val createSignatoriesTable = CreateTable( + "signatories", + sql""" + CREATE TABLE + signatories + (contract_id NVARCHAR2(100) NOT NULL REFERENCES contract(contract_id) ON DELETE CASCADE + ,party NVARCHAR2(100) NOT NULL + ,UNIQUE (contract_id, party) + ) + """, + ) + + private val createObserversTable = CreateTable( + "observers", + sql""" + CREATE TABLE + observers + (contract_id NVARCHAR2(100) NOT NULL REFERENCES contract(contract_id) ON DELETE CASCADE + ,party NVARCHAR2(100) NOT NULL + ,UNIQUE (contract_id, party) + ) + """, + ) + + protected[this] override def initDatabaseDdls = + super.initDatabaseDdls ++ Seq(createSignatoriesTable, createObserversTable) + + protected[this] type DBContractKey = JsValue + + protected[this] override def toDBContractKey[CK: JsonWriter](x: CK) = + JsObject(Map("key" -> x.toJson)) + + protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor]( + dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]] + )(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] = { + println("insert contracts") + println(dbcs) + val r = Update[(String, SurrogateTpId, JsValue, JsValue, String)]( + """ + INSERT INTO contract(contract_id, tpid, key, payload, agreement_text) + VALUES (?, ?, ?, ?, ?) + """, + logHandler0 = log, + ).updateMany( + dbcs + .map { c => +// println(c) + (c.contractId, c.templateId, c.key, c.payload, c.agreementText) + } + ) + println("inserted") + import cats.syntax.foldable._, cats.instances.vector._ + val r2 = Update[(String, String)]( + """ + INSERT INTO signatories(contract_id, party) + VALUES (?, ?) + """, + logHandler0 = log, + ).updateMany(dbcs.foldMap(c => c.signatories.view.map(s => (c.contractId, s)).toVector)) + val r3 = Update[(String, String)]( + """ + INSERT INTO observers(contract_id, party) + VALUES (?, ?) + """, + logHandler0 = log, + ).updateMany(dbcs.foldMap(c => c.observers.view.map(s => (c.contractId, s)).toVector)) + r *> r2 *> r3 + } + + private[http] override def selectContractsMultiTemplate[T[_], Mark]( + parties: OneAnd[Set, String], + queries: Seq[(SurrogateTpId, Fragment)], + trackMatchIndices: MatchedQueryMarker[T, Mark], + )(implicit + log: LogHandler, + gvs: Get[Vector[String]], + pvs: Put[Vector[String]], + ): T[Query0[DBContract[Mark, JsValue, JsValue, Vector[String]]]] = { + val Seq((tpid, predicate @ _)) = queries // TODO SC handle more than one + val _ = parties + println("selecting") + val q = sql"""SELECT contract_id, key, payload, agreement_text + FROM contract + WHERE tpid = $tpid""" // TODO SC AND (""" ++ predicate ++ sql")" + trackMatchIndices match { + case MatchedQueryMarker.ByInt => sys.error("TODO websocket Oracle support") + case MatchedQueryMarker.Unused => + q.query[(String, JsValue, JsValue, Option[String])].map { + case (cid, key, payload, agreement) => + DBContract( + contractId = cid, + templateId = tpid, + key = key, + payload = payload, + signatories = Vector(), + observers = Vector(), + agreementText = agreement getOrElse "", + ) + } + } + } +} diff --git a/ledger-service/http-json-oracle/BUILD.bazel b/ledger-service/http-json-oracle/BUILD.bazel new file mode 100644 index 0000000000..ff287e028b --- /dev/null +++ b/ledger-service/http-json-oracle/BUILD.bazel @@ -0,0 +1,90 @@ +# Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +load( + "//bazel_tools:scala.bzl", + "da_scala_binary", + "da_scala_library", + "da_scala_test", + "lf_scalacopts", +) +load("@bazel_skylib//rules:common_settings.bzl", "string_flag") +load("@scala_version//:index.bzl", "scala_version_suffix") + +hj_scalacopts = lf_scalacopts + [ + "-P:wartremover:traverser:org.wartremover.warts.NonUnitStatements", +] + +string_flag( + name = "oracle_testing", + build_setting_default = "no", +) + +config_setting( + name = "oracle_available", + flag_values = { + ":oracle_testing": "yes", + }, +) + +config_setting( + name = "oracle_unavailable", + flag_values = { + ":oracle_testing": "no", + }, +) + +# ERROR: .../daml/ledger-service/http-json-oracle/BUILD.bazel:33:14: no such target +# '@platforms//:incompatible': target 'incompatible' not declared in package '' defined by +# /private/var/tmp/.../85ba8c006f4674ac1f1213a18c0df36a/external/platforms/BUILD and +# referenced by '//ledger-service/http-json-oracle:integration-tests' +constraint_setting(name = "incompatible_setting") + +constraint_value( + name = "incompatible", + constraint_setting = ":incompatible_setting", +) + +da_scala_test( + name = "integration-tests", + size = "large", + srcs = glob(["src/it/scala/**/*.scala"]), + data = [ + "//docs:quickstart-model.dar", + "//ledger-service/http-json:Account.dar", + "//ledger/test-common:model-tests.dar", + "//ledger/test-common/test-certificates", + ], + plugins = [ + "@maven//:org_typelevel_kind_projector_{}".format(scala_version_suffix), + ], + scala_deps = [ + "@maven//:com_chuusai_shapeless", + "@maven//:com_typesafe_akka_akka_http_core", + "@maven//:com_typesafe_scala_logging_scala_logging", + "@maven//:io_spray_spray_json", + "@maven//:org_scalatest_scalatest", + "@maven//:org_scalaz_scalaz_core", + ], + scalacopts = hj_scalacopts, + target_compatible_with = select({ + ":oracle_available": [], + "//conditions:default": [":incompatible"], + }), + runtime_deps = [ + "@maven//:com_oracle_database_jdbc_ojdbc8", + ], + deps = [ + "//daml-lf/data", + "//daml-lf/interface", + "//daml-lf/transaction", + "//language-support/scala/bindings-akka", + "//ledger-api/rs-grpc-bridge", + "//ledger-service/http-json", + "//ledger-service/http-json:integration-tests-lib", + "//ledger-service/http-json-testing", + "//ledger-service/jwt", + "//ledger-service/utils", + "//ledger/ledger-api-common", + ], +) diff --git a/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceOracleInt.scala b/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceOracleInt.scala new file mode 100644 index 0000000000..2c05d8a2fc --- /dev/null +++ b/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceOracleInt.scala @@ -0,0 +1,27 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.http + +import org.scalatest.Inside +import org.scalatest.AsyncTestSuite +import org.scalatest.matchers.should.Matchers + +trait HttpServiceOracleInt extends AbstractHttpServiceIntegrationTestFuns { + this: AsyncTestSuite with Matchers with Inside => + + override final def jdbcConfig: Option[JdbcConfig] = Some(jdbcConfig_) + + import sys.env + private[this] def oraclePort = env("ORACLE_PORT") + private[this] def oraclePwd = env("ORACLE_PWD") + private[this] def oracleUsername = env("ORACLE_USERNAME") + + protected[this] lazy val jdbcConfig_ = JdbcConfig( + driver = "oracle.jdbc.OracleDriver", + url = s"jdbc:oracle:thin:@//localhost:$oraclePort/XEPDB1", + user = oracleUsername, + password = oraclePwd, + createSchema = true, + ) +} diff --git a/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceWithOracleIntTest.scala b/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceWithOracleIntTest.scala new file mode 100644 index 0000000000..61d63c2bdf --- /dev/null +++ b/ledger-service/http-json-oracle/src/it/scala/http/HttpServiceWithOracleIntTest.scala @@ -0,0 +1,44 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.http + +import org.scalatest.Inside +import org.scalatest.freespec.AsyncFreeSpec +import org.scalatest.matchers.should.Matchers +import spray.json.JsValue + +// TODO SC remove `abstract` to reenable +abstract class HttpServiceWithOracleIntTest + extends AbstractHttpServiceIntegrationTest + with HttpServiceOracleInt { + + override def staticContentConfig: Option[StaticContentConfig] = None + + override def wsConfig: Option[WebsocketConfig] = None +} + +// TODO SC this is a small subset of above test, remove when reenabling +class HttpServiceWithOracleIntTestStub + extends AsyncFreeSpec + with Matchers + with Inside + with AbstractHttpServiceIntegrationTestFuns + with HttpServiceOracleInt { + override def staticContentConfig: Option[StaticContentConfig] = None + + override def wsConfig: Option[WebsocketConfig] = None + + override def useTls = HttpServiceTestFixture.UseTls.NoTls + + "query POST with empty query" in withHttpService { (uri, encoder, _) => + searchExpectOk( + List.empty, + jsObject("""{"templateIds": ["Iou:Iou"]}"""), + uri, + encoder, + ).map { acl: List[domain.ActiveContract[JsValue]] => + acl shouldBe empty + } + } +} diff --git a/ledger-service/http-json/BUILD.bazel b/ledger-service/http-json/BUILD.bazel index 90d7b28bf2..7f8800fa39 100644 --- a/ledger-service/http-json/BUILD.bazel +++ b/ledger-service/http-json/BUILD.bazel @@ -123,6 +123,7 @@ da_scala_binary( daml_compile( name = "Account", srcs = ["src/it/daml/Account.daml"], + visibility = ["//ledger-service:__subpackages__"], ) da_scala_test( @@ -169,6 +170,38 @@ da_scala_test( ], ) +da_scala_library( + name = "integration-tests-lib", + srcs = glob(["src/itlib/scala/**/*.scala"]), + scala_deps = [ + "@maven//:com_chuusai_shapeless", + "@maven//:com_typesafe_akka_akka_http_core", + "@maven//:com_typesafe_scala_logging_scala_logging", + "@maven//:io_spray_spray_json", + "@maven//:org_scalacheck_scalacheck", + "@maven//:org_scalactic_scalactic", + "@maven//:org_scalatest_scalatest", + "@maven//:org_scalaz_scalaz_core", + ], + silent_annotations = True, + visibility = ["//ledger-service:__subpackages__"], + deps = [ + ":http-json", + "//bazel_tools/runfiles:scala_runfiles", + "//daml-lf/data", + "//daml-lf/interface", + "//daml-lf/transaction", + "//language-support/scala/bindings-akka", + "//ledger-api/rs-grpc-bridge", + "//ledger-service/http-json-testing", + "//ledger-service/jwt", + "//ledger-service/utils", + "//ledger/ledger-api-auth", + "//ledger/ledger-api-common", + "//libs-scala/ports", + ], +) + da_scala_test( name = "integration-tests", size = "large", @@ -199,9 +232,9 @@ da_scala_test( "@maven//:org_typelevel_cats_free", ], scalacopts = hj_scalacopts, - silent_annotations = True, deps = [ ":http-json", + ":integration-tests-lib", "//bazel_tools/runfiles:scala_runfiles", "//daml-lf/data", "//daml-lf/interface", diff --git a/ledger-service/http-json/src/it/scala/http/HttpServicePostgresInt.scala b/ledger-service/http-json/src/it/scala/http/HttpServicePostgresInt.scala index 985deb102a..4fc704beca 100644 --- a/ledger-service/http-json/src/it/scala/http/HttpServicePostgresInt.scala +++ b/ledger-service/http-json/src/it/scala/http/HttpServicePostgresInt.scala @@ -4,13 +4,12 @@ package com.daml.http import com.daml.testing.postgresql.PostgresAroundAll -import com.typesafe.scalalogging.StrictLogging import org.scalatest.Inside -import org.scalatest.freespec.AsyncFreeSpec +import org.scalatest.AsyncTestSuite import org.scalatest.matchers.should.Matchers trait HttpServicePostgresInt extends AbstractHttpServiceIntegrationTestFuns with PostgresAroundAll { - this: AsyncFreeSpec with Matchers with Inside with StrictLogging => + this: AsyncTestSuite with Matchers with Inside => override final def jdbcConfig: Option[JdbcConfig] = Some(jdbcConfig_) diff --git a/ledger-service/http-json/src/it/scala/http/WebsocketServiceIntegrationTest.scala b/ledger-service/http-json/src/it/scala/http/WebsocketServiceIntegrationTest.scala index 8995bc492b..645d8eab55 100644 --- a/ledger-service/http-json/src/it/scala/http/WebsocketServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/it/scala/http/WebsocketServiceIntegrationTest.scala @@ -3,938 +3,6 @@ package com.daml.http -import akka.NotUsed -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest} -import akka.http.scaladsl.model.{StatusCodes, Uri} -import akka.stream.{KillSwitches, UniqueKillSwitch} -import akka.stream.scaladsl.{Keep, Sink, Source} -import com.daml.http.json.SprayJson -import com.typesafe.scalalogging.StrictLogging -import org.scalatest._ -import org.scalatest.freespec.AsyncFreeSpec -import org.scalatest.matchers.should.Matchers -import scalaz.std.option._ -import scalaz.std.vector._ -import scalaz.syntax.std.option._ -import scalaz.syntax.tag._ -import scalaz.syntax.traverse._ -import scalaz.{-\/, \/-} -import spray.json.{JsNull, JsObject, JsString, JsValue} - -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} - -@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -sealed abstract class AbstractWebsocketServiceIntegrationTest - extends AsyncFreeSpec - with Matchers - with Inside - with StrictLogging - with AbstractHttpServiceIntegrationTestFuns - with BeforeAndAfterAll { - - import HttpServiceTestFixture._ - import WebsocketTestFixture._ - - override def staticContentConfig: Option[StaticContentConfig] = None - - override def useTls = UseTls.NoTls - - override def wsConfig: Option[WebsocketConfig] = Some(Config.DefaultWsConfig) - - private val baseQueryInput: Source[Message, NotUsed] = - Source.single(TextMessage.Strict("""{"templateIds": ["Account:Account"]}""")) - - private val fetchRequest = - """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]""" - - private val baseFetchInput: Source[Message, NotUsed] = - Source.single(TextMessage.Strict(fetchRequest)) - - List( - SimpleScenario("query", Uri.Path("/v1/stream/query"), baseQueryInput), - SimpleScenario("fetch", Uri.Path("/v1/stream/fetch"), baseFetchInput), - ).foreach { scenario => - s"${scenario.id} request with valid protocol token should allow client subscribe to stream" in withHttpService { - (uri, _, _) => - wsConnectRequest( - uri.copy(scheme = "ws").withPath(scenario.path), - validSubprotocol(jwt), - scenario.input, - )._1 flatMap (x => x.response.status shouldBe StatusCodes.SwitchingProtocols) - } - - s"${scenario.id} request with invalid protocol token should be denied" in withHttpService { - (uri, _, _) => - wsConnectRequest( - uri.copy(scheme = "ws").withPath(scenario.path), - Option("foo"), - scenario.input, - )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized) - } - - s"${scenario.id} request without protocol token should be denied" in withHttpService { - (uri, _, _) => - wsConnectRequest( - uri.copy(scheme = "ws").withPath(scenario.path), - None, - scenario.input, - )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized) - } - - s"two ${scenario.id} requests over the same WebSocket connection are NOT allowed" in withHttpService { - (uri, _, _) => - val input = scenario.input.mapConcat(x => List(x, x)) - val webSocketFlow = - Http().webSocketClientFlow( - WebSocketRequest( - uri = uri.copy(scheme = "ws").withPath(scenario.path), - subprotocol = validSubprotocol(jwt), - ) - ) - input - .via(webSocketFlow) - .runWith(collectResultsAsTextMessageSkipOffsetTicks) - .flatMap { msgs => - inside(msgs) { case Seq(errorMsg) => - val error = decodeErrorResponse(errorMsg) - error shouldBe domain.ErrorResponse( - List("Multiple requests over the same WebSocket connection are not allowed."), - None, - StatusCodes.BadRequest, - ) - } - } - } - } - - List( - SimpleScenario( - "query", - Uri.Path("/v1/stream/query"), - Source.single(TextMessage.Strict("""{"templateIds": ["AA:BB"]}""")), - ), - SimpleScenario( - "fetch", - Uri.Path("/v1/stream/fetch"), - Source.single(TextMessage.Strict("""[{"templateId": "AA:BB", "key": ["k", "v"]}]""")), - ), - ).foreach { scenario => - s"${scenario.id} report UnknownTemplateIds and error when cannot resolve any template ID" in withHttpService { - (uri, _, _) => - val webSocketFlow = - Http().webSocketClientFlow( - WebSocketRequest( - uri = uri.copy(scheme = "ws").withPath(scenario.path), - subprotocol = validSubprotocol(jwt), - ) - ) - scenario.input - .via(webSocketFlow) - .runWith(collectResultsAsTextMessageSkipOffsetTicks) - .flatMap { msgs => - inside(msgs) { case Seq(warningMsg, errorMsg) => - val warning = decodeServiceWarning(warningMsg) - inside(warning) { case domain.UnknownTemplateIds(ids) => - ids shouldBe List(domain.TemplateId(None, "AA", "BB")) - } - val error = decodeErrorResponse(errorMsg) - error shouldBe domain.ErrorResponse( - List(ErrorMessages.cannotResolveAnyTemplateId), - None, - StatusCodes.BadRequest, - ) - } - } - } - } - - "query endpoint should publish transactions when command create is completed" in withHttpService { - (uri, _, _) => - for { - _ <- initialIouCreate(uri) - - clientMsg <- singleClientQueryStream( - jwt, - uri, - """{"templateIds": ["Iou:Iou"]}""", - ).take(2) - .runWith(collectResultsAsTextMessage) - } yield inside(clientMsg) { case result +: heartbeats => - result should include(""""issuer":"Alice"""") - result should include(""""amount":"999.99"""") - Inspectors.forAll(heartbeats)(assertHeartbeat) - } - } - - "fetch endpoint should publish transactions when command create is completed" in withHttpService { - (uri, encoder, _) => - for { - _ <- initialAccountCreate(uri, encoder) - - clientMsg <- singleClientFetchStream(jwt, uri, fetchRequest) - .take(2) - .runWith(collectResultsAsTextMessage) - } yield inside(clientMsg) { case result +: heartbeats => - result should include(""""owner":"Alice"""") - result should include(""""number":"abc123"""") - result should not include (""""offset":"""") - Inspectors.forAll(heartbeats)(assertHeartbeat) - } - } - - "query endpoint should warn on unknown template IDs" in withHttpService { (uri, _, _) => - for { - _ <- initialIouCreate(uri) - - clientMsg <- singleClientQueryStream( - jwt, - uri, - """{"templateIds": ["Iou:Iou", "Unknown:Template"]}""", - ).take(3) - .runWith(collectResultsAsTextMessage) - } yield inside(clientMsg) { case warning +: result +: heartbeats => - warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk") - result should include("\"issuer\":\"Alice\"") - Inspectors.forAll(heartbeats)(assertHeartbeat) - } - } - - "fetch endpoint should warn on unknown template IDs" in withHttpService { (uri, encoder, _) => - for { - _ <- initialAccountCreate(uri, encoder) - - clientMsg <- singleClientFetchStream( - jwt, - uri, - """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, {"templateId": "Unknown:Template", "key": ["Alice", "abc123"]}]""", - ).take(3) - .runWith(collectResultsAsTextMessage) - } yield inside(clientMsg) { case warning +: result +: heartbeats => - warning should include("""{"warnings":{"unknownTemplateIds":["Unk""") - result should include(""""owner":"Alice"""") - result should include(""""number":"abc123"""") - Inspectors.forAll(heartbeats)(assertHeartbeat) - } - } - - "query endpoint should send error msg when receiving malformed message" in withHttpService { - (uri, _, _) => - val clientMsg = singleClientQueryStream(jwt, uri, "{}") - .runWith(collectResultsAsTextMessageSkipOffsetTicks) - - val result = Await.result(clientMsg, 10.seconds) - - result should have size 1 - val errorResponse = decodeErrorResponse(result.head) - errorResponse.status shouldBe StatusCodes.BadRequest - errorResponse.errors should have size 1 - } - - "fetch endpoint should send error msg when receiving malformed message" in withHttpService { - (uri, _, _) => - val clientMsg = singleClientFetchStream(jwt, uri, """[abcdefg!]""") - .runWith(collectResultsAsTextMessageSkipOffsetTicks) - - val result = Await.result(clientMsg, 10.seconds) - - result should have size 1 - val errorResponse = decodeErrorResponse(result.head) - errorResponse.status shouldBe StatusCodes.BadRequest - errorResponse.errors should have size 1 - } - - private def exercisePayload(cid: domain.ContractId, amount: BigDecimal = BigDecimal("42.42")) = { - import json.JsonProtocol._ - import spray.json._ - Map( - "templateId" -> "Iou:Iou".toJson, - "contractId" -> cid.toJson, - "choice" -> "Iou_Split".toJson, - "argument" -> Map("splitAmount" -> amount).toJson, - ).toJson - } - - "query should receive deltas as contracts are archived/created" in withHttpService { - (uri, _, _) => - import spray.json._ - - val initialCreate = initialIouCreate(uri) - - val query = - """[ - {"templateIds": ["Iou:Iou"], "query": {"amount": {"%lte": 50}}}, - {"templateIds": ["Iou:Iou"], "query": {"amount": {"%gt": 50}}}, - {"templateIds": ["Iou:Iou"]} - ]""" - - @com.github.ghik.silencer.silent("evtsWrapper.*never used") - def resp( - iouCid: domain.ContractId, - kill: UniqueKillSwitch, - ): Sink[JsValue, Future[ShouldHaveEnded]] = { - val dslSyntax = Consume.syntax[JsValue] - import dslSyntax._ - Consume - .interpret( - for { - ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne - _ = (ctid: String) shouldBe (iouCid.unwrap: String) - _ <- liftF( - postJsonRequest( - uri.withPath(Uri.Path("/v1/exercise")), - exercisePayload(domain.ContractId(ctid)), - headersWithAuth, - ) map { case (statusCode, _) => - statusCode.isSuccess shouldBe true - } - ) - - ContractDelta(Vector(), _, Some(offset)) <- readOne - - (preOffset, consumedCtid) = (offset, ctid) - evtsWrapper @ ContractDelta( - Vector((fstId, fst), (sndId, snd)), - Vector(observeConsumed), - Some(lastSeenOffset), - ) <- readOne - (liveStartOffset, msgCount) = { - observeConsumed.contractId should ===(consumedCtid) - Set(fstId, sndId, consumedCtid) should have size 3 - inside(evtsWrapper) { case JsObject(obj) => - inside(obj get "events") { - case Some( - JsArray( - Vector( - Archived(_, _), - Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)), - Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)), - ) - ) - ) => - Set((amt1, ixes1), (amt2, ixes2)) should ===( - Set( - (BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))), - (BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))), - ) - ) - } - } - (preOffset, 2) - } - - _ = kill.shutdown() - heartbeats <- drain - hbCount = (heartbeats.iterator.map { - case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset - }.toSet + lastSeenOffset).size - 1 - } yield - // don't count empty events block if lastSeenOffset does not change - ShouldHaveEnded( - liveStartOffset = liveStartOffset, - msgCount = msgCount + hbCount, - lastSeenOffset = lastSeenOffset, - ) - ) - } - - for { - creation <- initialCreate - _ = creation._1 shouldBe a[StatusCodes.Success] - iouCid = getContractId(getResult(creation._2)) - (kill, source) = singleClientQueryStream(jwt, uri, query) - .viaMat(KillSwitches.single)(Keep.right) - .preMaterialize() - lastState <- source via parseResp runWith resp(iouCid, kill) - liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) => - lastSeen.unwrap should be > liveStart.unwrap - liveStart - } - rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset)) - via parseResp).take(1) runWith remainingDeltas - } yield inside(rescan) { - case (Vector((fstId, fst @ _), (sndId, snd @ _)), Vector(observeConsumed), Some(_)) => - Set(fstId, sndId, observeConsumed.contractId) should have size 3 - } - } - - "multi-party query should receive deltas as contracts are archived/created" in withHttpService { - (uri, encoder, _) => - import spray.json._ - - val f1 = - postCreateCommand( - accountCreateCommand(domain.Party("Alice"), "abc123"), - encoder, - uri, - headers = headersWithPartyAuth(List("Alice")), - ) - val f2 = - postCreateCommand( - accountCreateCommand(domain.Party("Bob"), "def456"), - encoder, - uri, - headers = headersWithPartyAuth(List("Bob")), - ) - - val query = - """[ - {"templateIds": ["Account:Account"]} - ]""" - - def resp( - cid1: domain.ContractId, - cid2: domain.ContractId, - kill: UniqueKillSwitch, - ): Sink[JsValue, Future[ShouldHaveEnded]] = { - val dslSyntax = Consume.syntax[JsValue] - import dslSyntax._ - Consume.interpret( - for { - Vector((account1, _), (account2, _)) <- readAcsN(2) - _ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2) - ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne - _ <- liftF( - postCreateCommand( - accountCreateCommand(domain.Party("Alice"), "abc234"), - encoder, - uri, - headers = headersWithPartyAuth(List("Alice")), - ) - ) - ContractDelta(Vector((_, aliceAccount)), _, Some(_)) <- readOne - _ = inside(aliceAccount) { case JsObject(obj) => - inside((obj get "owner", obj get "number")) { - case (Some(JsString(owner)), Some(JsString(number))) => - owner shouldBe "Alice" - number shouldBe "abc234" - } - } - _ <- liftF( - postCreateCommand( - accountCreateCommand(domain.Party("Bob"), "def567"), - encoder, - uri, - headers = headersWithPartyAuth(List("Bob")), - ) - ) - ContractDelta(Vector((_, bobAccount)), _, Some(lastSeenOffset)) <- readOne - _ = inside(bobAccount) { case JsObject(obj) => - inside((obj get "owner", obj get "number")) { - case (Some(JsString(owner)), Some(JsString(number))) => - owner shouldBe "Bob" - number shouldBe "def567" - } - } - _ = kill.shutdown() - heartbeats <- drain - hbCount = (heartbeats.iterator.map { - case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset - }.toSet + lastSeenOffset).size - 1 - } yield ( - // don't count empty events block if lastSeenOffset does not change - ShouldHaveEnded( - liveStartOffset = liveStartOffset, - msgCount = 5 + hbCount, - lastSeenOffset = lastSeenOffset, - ), - ) - ) - } - - for { - r1 <- f1 - _ = r1._1 shouldBe a[StatusCodes.Success] - cid1 = getContractId(getResult(r1._2)) - - r2 <- f2 - _ = r2._1 shouldBe a[StatusCodes.Success] - cid2 = getContractId(getResult(r2._2)) - - (kill, source) = singleClientQueryStream( - jwtForParties(List("Alice", "Bob"), List(), testId), - uri, - query, - ).viaMat(KillSwitches.single)(Keep.right).preMaterialize() - lastState <- source via parseResp runWith resp(cid1, cid2, kill) - liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) => - lastSeen.unwrap should be > liveStart.unwrap - liveStart - } - rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset)) - via parseResp).take(1) runWith remainingDeltas - } yield inside(rescan) { case (Vector(_), _, Some(_)) => - succeed - } - } - - "fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService { - (uri, encoder, _) => - val templateId = domain.TemplateId(None, "Account", "Account") - def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = { - import spray.json._, json.JsonProtocol._ - List( - Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson) - ++ contractIdAtOffset - .map(ocid => contractIdAtOffsetKey -> ocid.toJson) - .toList - ).toJson.compactPrint - } - val f1 = - postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri) - val f2 = - postCreateCommand(accountCreateCommand(domain.Party("Alice"), "def456"), encoder, uri) - - def resp( - cid1: domain.ContractId, - cid2: domain.ContractId, - kill: UniqueKillSwitch, - ): Sink[JsValue, Future[ShouldHaveEnded]] = { - val dslSyntax = Consume.syntax[JsValue] - import dslSyntax._ - Consume.interpret( - for { - ContractDelta(Vector((cid, c)), Vector(), None) <- readOne - _ = (cid: String) shouldBe (cid1.unwrap: String) - ctid <- liftF(postArchiveCommand(templateId, cid2, encoder, uri).flatMap { - case (statusCode, _) => - statusCode.isSuccess shouldBe true - postArchiveCommand(templateId, cid1, encoder, uri).map { case (statusCode, _) => - statusCode.isSuccess shouldBe true - cid - } - }) - - ContractDelta(Vector(), _, Some(offset)) <- readOne - (off, archivedCid) = (offset, ctid) - - ContractDelta(Vector(), Vector(observeArchivedCid), Some(lastSeenOffset)) <- readOne - (liveStartOffset, msgCount) = { - (observeArchivedCid.contractId.unwrap: String) shouldBe (archivedCid: String) - (observeArchivedCid.contractId: domain.ContractId) shouldBe (cid1: domain.ContractId) - (off, 0) - } - - _ = kill.shutdown() - heartbeats <- drain - hbCount = (heartbeats.iterator.map { - case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset - }.toSet + lastSeenOffset).size - 1 - - } yield - // don't count empty events block if lastSeenOffset does not change - ShouldHaveEnded( - liveStartOffset = liveStartOffset, - msgCount = msgCount + hbCount, - lastSeenOffset = lastSeenOffset, - ) - ) - } - - for { - r1 <- f1 - _ = r1._1 shouldBe a[StatusCodes.Success] - cid1 = getContractId(getResult(r1._2)) - - r2 <- f2 - _ = r2._1 shouldBe a[StatusCodes.Success] - cid2 = getContractId(getResult(r2._2)) - - (kill, source) = singleClientFetchStream(jwt, uri, fetchRequest()) - .viaMat(KillSwitches.single)(Keep.right) - .preMaterialize() - - lastState <- source - .via(parseResp) runWith resp(cid1, cid2, kill) - - liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 0, lastSeen) => - lastSeen.unwrap should be > liveStart.unwrap - liveStart - } - - // check contractIdAtOffsets' effects on phantom filtering - resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) { - case (abcHint, expectArchives) => - (singleClientFetchStream( - jwt, - uri, - fetchRequest(abcHint), - Some(liveOffset), - ) - via parseResp) - .take(2) - .runWith(remainingDeltas) - .map { case (creates, archives, _) => - creates shouldBe empty - archives should have size expectArchives - } - } - - } yield resumes.foldLeft(1 shouldBe 1)((_, a) => a) - } - - "fetch multiple keys should work" in withHttpService { (uri, encoder, _) => - def create(account: String): Future[domain.ContractId] = - for { - r <- postCreateCommand(accountCreateCommand(domain.Party("Alice"), account), encoder, uri) - } yield { - assert(r._1.isSuccess) - getContractId(getResult(r._2)) - } - def archive(id: domain.ContractId): Future[Assertion] = - for { - r <- postArchiveCommand(domain.TemplateId(None, "Account", "Account"), id, encoder, uri) - } yield { - assert(r._1.isSuccess) - } - val req = - """ - |[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, - | {"templateId": "Account:Account", "key": ["Alice", "def456"]}] - |""".stripMargin - val futureResults = - singleClientFetchStream(jwt, uri, req) - .via(parseResp) - .filterNot(isOffsetTick) - .take(4) - .runWith(Sink.seq[JsValue]) - - for { - cid1 <- create("abc123") - _ <- create("abc124") - _ <- create("abc125") - cid2 <- create("def456") - _ <- archive(cid2) - _ <- archive(cid1) - results <- futureResults - } yield { - val expected: Seq[JsValue] = { - import spray.json._ - Seq( - """ - |{"events":[{"created":{"payload":{"number":"abc123"}}}]} - |""".stripMargin.parseJson, - """ - |{"events":[{"created":{"payload":{"number":"def456"}}}]} - |""".stripMargin.parseJson, - """ - |{"events":[{"archived":{}}]} - |""".stripMargin.parseJson, - """ - |{"events":[{"archived":{}}]} - |""".stripMargin.parseJson, - ) - } - results should matchJsValues(expected) - - } - } - - "multi-party fetch-by-key query should receive deltas as contracts are archived/created" in withHttpService { - (uri, encoder, _) => - import spray.json._ - - val templateId = domain.TemplateId(None, "Account", "Account") - - val f1 = - postCreateCommand( - accountCreateCommand(domain.Party("Alice"), "abc123"), - encoder, - uri, - headers = headersWithPartyAuth(List("Alice")), - ) - val f2 = - postCreateCommand( - accountCreateCommand(domain.Party("Bob"), "def456"), - encoder, - uri, - headers = headersWithPartyAuth(List("Bob")), - ) - - val query = - """[ - {"templateId": "Account:Account", "key": ["Alice", "abc123"]}, - {"templateId": "Account:Account", "key": ["Bob", "def456"]} - ]""" - - def resp( - cid1: domain.ContractId, - cid2: domain.ContractId, - kill: UniqueKillSwitch, - ): Sink[JsValue, Future[ShouldHaveEnded]] = { - val dslSyntax = Consume.syntax[JsValue] - import dslSyntax._ - Consume.interpret( - for { - Vector((account1, _), (account2, _)) <- readAcsN(2) - _ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2) - ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne - _ <- liftF(postArchiveCommand(templateId, cid1, encoder, uri)) - ContractDelta(Vector(), Vector(archivedCid1), Some(_)) <- readOne - _ = archivedCid1.contractId shouldBe cid1 - _ <- liftF( - postArchiveCommand( - templateId, - cid2, - encoder, - uri, - headers = headersWithPartyAuth(List("Bob")), - ) - ) - ContractDelta(Vector(), Vector(archivedCid2), Some(lastSeenOffset)) <- readOne - _ = archivedCid2.contractId shouldBe cid2 - _ = kill.shutdown() - heartbeats <- drain - hbCount = (heartbeats.iterator.map { - case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset - }.toSet + lastSeenOffset).size - 1 - } yield ( - // don't count empty events block if lastSeenOffset does not change - ShouldHaveEnded( - liveStartOffset = liveStartOffset, - msgCount = 5 + hbCount, - lastSeenOffset = lastSeenOffset, - ), - ) - ) - } - - for { - r1 <- f1 - _ = r1._1 shouldBe a[StatusCodes.Success] - cid1 = getContractId(getResult(r1._2)) - - r2 <- f2 - _ = r2._1 shouldBe a[StatusCodes.Success] - cid2 = getContractId(getResult(r2._2)) - - (kill, source) = singleClientFetchStream( - jwtForParties(List("Alice", "Bob"), List(), testId), - uri, - query, - ).viaMat(KillSwitches.single)(Keep.right).preMaterialize() - lastState <- source via parseResp runWith resp(cid1, cid2, kill) - liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) => - lastSeen.unwrap should be > liveStart.unwrap - liveStart - } - rescan <- (singleClientFetchStream( - jwtForParties(List("Alice", "Bob"), List(), testId), - uri, - query, - Some(liveOffset), - ) - via parseResp).take(2) runWith remainingDeltas - } yield inside(rescan) { case (Vector(), Vector(_, _), Some(_)) => - succeed - } - } - - /** Consume ACS blocks expecting `createCount` contracts. Fail if there - * are too many contracts. - */ - private[this] def readAcsN(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = { - val dslSyntax = Consume.syntax[JsValue] - import dslSyntax._ - def go(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = - if (createCount <= 0) point(Vector.empty) - else - for { - ContractDelta(creates, Vector(), None) <- readOne - found = creates.size - if found <= createCount - tail <- if (found < createCount) go(createCount - found) else point(Vector.empty) - } yield creates ++ tail - go(createCount) - } - - "fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService { - (uri, _, _) => - singleClientFetchStream(jwt, uri, "[]") - .runWith(collectResultsAsTextMessageSkipOffsetTicks) - .map { clientMsgs => - inside(clientMsgs) { case Seq(errorMsg) => - val errorResponse = decodeErrorResponse(errorMsg) - errorResponse.status shouldBe StatusCodes.BadRequest - inside(errorResponse.errors) { case List(error) => - error should include("must be a JSON array with at least 1 element") - } - } - }: Future[Assertion] - } - - "query on a bunch of random splits should yield consistent results" in withHttpService { - (uri, _, _) => - val splitSample = SplitSeq.gen.map(_ map (BigDecimal(_))).sample.get - val query = - """[ - {"templateIds": ["Iou:Iou"]} - ]""" - val (kill, source) = singleClientQueryStream(jwt, uri, query) - .viaMat(KillSwitches.single)(Keep.right) - .preMaterialize() - source - .via(parseResp) - .map(iouSplitResult) - .filterNot(_ == \/-((Vector(), Vector()))) // liveness marker/heartbeat - .runWith(Consume.interpret(trialSplitSeq(uri, splitSample, kill))) - } - - private def trialSplitSeq( - serviceUri: Uri, - ss: SplitSeq[BigDecimal], - kill: UniqueKillSwitch, - ): Consume.FCC[IouSplitResult, Assertion] = { - val dslSyntax = Consume.syntax[IouSplitResult] - import SplitSeq._ - import dslSyntax._ - def go( - createdCid: domain.ContractId, - ss: SplitSeq[BigDecimal], - ): Consume.FCC[IouSplitResult, Assertion] = ss match { - case Leaf(_) => - point(1 shouldBe 1) - case Node(_, l, r) => - for { - (StatusCodes.OK, _) <- liftF( - postJsonRequest( - serviceUri.withPath(Uri.Path("/v1/exercise")), - exercisePayload(createdCid, l.x), - headersWithAuth, - ) - ) - - \/-((Vector((cid1, amt1), (cid2, amt2)), Vector(archival))) <- readOne - (lCid, rCid) = { - archival should ===(createdCid) - Set(amt1, amt2) should ===(Set(l.x, r.x)) - if (amt1 == l.x) (cid1, cid2) else (cid2, cid1) - } - - _ <- go(lCid, l) - last <- go(rCid, r) - } yield last - } - - val initialPayload = { - import spray.json._, json.JsonProtocol._ - Map( - "templateId" -> "Iou:Iou".toJson, - "payload" -> Map( - "observers" -> List[String]().toJson, - "issuer" -> "Alice".toJson, - "amount" -> ss.x.toJson, - "currency" -> "USD".toJson, - "owner" -> "Alice".toJson, - ).toJson, - ).toJson - } - for { - (StatusCodes.OK, _) <- liftF( - postJsonRequest( - serviceUri.withPath(Uri.Path("/v1/create")), - initialPayload, - headersWithAuth, - ) - ) - \/-((Vector((genesisCid, amt)), Vector())) <- readOne - _ = amt should ===(ss.x) - last <- go(genesisCid, ss) - _ = kill.shutdown() - } yield last - } - - private def iouSplitResult(jsv: JsValue): IouSplitResult = jsv match { - case ContractDelta(creates, archives, _) => - creates traverse { - case (cid, JsObject(fields)) => - fields get "amount" collect { case JsString(amt) => - (domain.ContractId(cid), BigDecimal(amt)) - } - case _ => None - } map ((_, archives map (_.contractId))) toRightDisjunction jsv - case _ => -\/(jsv) - } - - "ContractKeyStreamRequest" - { - import spray.json._, json.JsonProtocol._ - val baseVal = - domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue) - val baseMap = baseVal.toJson.asJsObject.fields - val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi"))) - val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull)) - - "initial JSON reader" - { - type T = domain.ContractKeyStreamRequest[Unit, JsValue] - - "shares EnrichedContractKey format" in { - JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal)) - } - - "errors on contractIdAtOffset presence" in { - a[DeserializationException] shouldBe thrownBy { - withSome.convertTo[T] - } - a[DeserializationException] shouldBe thrownBy { - withNone.convertTo[T] - } - } - } - - "resuming JSON reader" - { - type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue] - - "shares EnrichedContractKey format" in { - JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal)) - } - - "distinguishes null and string" in { - withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal)) - withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal)) - } - } - } - - private def wsConnectRequest[M]( - uri: Uri, - subprotocol: Option[String], - input: Source[Message, NotUsed], - ) = - Http().singleWebSocketRequest( - request = WebSocketRequest(uri = uri, subprotocol = subprotocol), - clientFlow = dummyFlow(input), - ) - - private def assertHeartbeat(str: String): Assertion = - inside( - SprayJson - .decode[EventsBlock](str) - ) { case \/-(eventsBlock) => - eventsBlock.events shouldBe Vector.empty[JsValue] - inside(eventsBlock.offset) { - case Some(JsString(offset)) => - offset.length should be > 0 - case Some(JsNull) => - Succeeded - } - } - - private def decodeErrorResponse(str: String): domain.ErrorResponse = { - import json.JsonProtocol._ - inside(SprayJson.decode[domain.ErrorResponse](str)) { case \/-(e) => - e - } - } - - private def decodeServiceWarning(str: String): domain.ServiceWarning = { - import json.JsonProtocol._ - inside(SprayJson.decode[domain.AsyncWarningsWrapper](str)) { case \/-(w) => - w.warnings - } - } -} - final class WebsocketServiceIntegrationTest extends AbstractWebsocketServiceIntegrationTest { override def jdbcConfig = None } @@ -942,3 +10,9 @@ final class WebsocketServiceIntegrationTest extends AbstractWebsocketServiceInte final class WebsocketServiceWithPostgresIntTest extends AbstractWebsocketServiceIntegrationTest with HttpServicePostgresInt + +/* TODO SC +final class WebsocketServiceWithOracleIntTest + extends AbstractWebsocketServiceIntegrationTest + with HttpServiceOracleInt + */ diff --git a/ledger-service/http-json/src/it/scala/http/AbstractHttpServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala similarity index 99% rename from ledger-service/http-json/src/it/scala/http/AbstractHttpServiceIntegrationTest.scala rename to ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala index ef61f1ae14..e18a2cfaf8 100644 --- a/ledger-service/http-json/src/it/scala/http/AbstractHttpServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala @@ -72,7 +72,7 @@ object AbstractHttpServiceIntegrationTestFuns { @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging { - this: AsyncFreeSpec with Matchers with Inside with StrictLogging => + this: AsyncTestSuite with Matchers with Inside => import AbstractHttpServiceIntegrationTestFuns._ import json.JsonProtocol._ import HttpServiceTestFixture._ @@ -475,6 +475,52 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging { val command = accountCreateCommand(domain.Party("Alice"), "abc123") postCreateCommand(command, encoder, serviceUri) } + + protected def jsObject(s: String): JsObject = { + val r: JsonError \/ JsObject = for { + jsVal <- SprayJson.parse(s).leftMap(e => JsonError(e.shows)) + jsObj <- SprayJson.mustBeJsObject(jsVal) + } yield jsObj + r.valueOr(e => fail(e.shows)) + } + + protected def searchExpectOk( + commands: List[domain.CreateCommand[v.Record]], + query: JsObject, + uri: Uri, + encoder: DomainJsonEncoder, + headers: List[HttpHeader] = headersWithAuth, + ): Future[List[domain.ActiveContract[JsValue]]] = { + search(commands, query, uri, encoder, headers).map(expectOk(_)) + } + + protected def search( + commands: List[domain.CreateCommand[v.Record]], + query: JsObject, + uri: Uri, + encoder: DomainJsonEncoder, + headers: List[HttpHeader] = headersWithAuth, + ): Future[ + domain.SyncResponse[List[domain.ActiveContract[JsValue]]] + ] = { + commands.traverse(c => postCreateCommand(c, encoder, uri, headers)).flatMap { rs => + rs.map(_._1) shouldBe List.fill(commands.size)(StatusCodes.OK) + postJsonRequest(uri.withPath(Uri.Path("/v1/query")), query, headers).flatMap { + case (_, output) => + FutureUtil + .toFuture(decode1[domain.SyncResponse, List[domain.ActiveContract[JsValue]]](output)) + } + } + } + + private[http] def expectOk[R](resp: domain.SyncResponse[R]): R = resp match { + case ok: domain.OkResponse[_] => + ok.status shouldBe StatusCodes.OK + ok.warnings shouldBe empty + ok.result + case err: domain.ErrorResponse => + fail(s"Expected OK response, got: $err") + } } @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) @@ -717,52 +763,6 @@ abstract class AbstractHttpServiceIntegrationTest }: Future[Assertion] } - protected def jsObject(s: String): JsObject = { - val r: JsonError \/ JsObject = for { - jsVal <- SprayJson.parse(s).leftMap(e => JsonError(e.shows)) - jsObj <- SprayJson.mustBeJsObject(jsVal) - } yield jsObj - r.valueOr(e => fail(e.shows)) - } - - private def expectOk[R](resp: domain.SyncResponse[R]): R = resp match { - case ok: domain.OkResponse[_] => - ok.status shouldBe StatusCodes.OK - ok.warnings shouldBe empty - ok.result - case err: domain.ErrorResponse => - fail(s"Expected OK response, got: $err") - } - - protected def searchExpectOk( - commands: List[domain.CreateCommand[v.Record]], - query: JsObject, - uri: Uri, - encoder: DomainJsonEncoder, - headers: List[HttpHeader] = headersWithAuth, - ): Future[List[domain.ActiveContract[JsValue]]] = { - search(commands, query, uri, encoder, headers).map(expectOk(_)) - } - - protected def search( - commands: List[domain.CreateCommand[v.Record]], - query: JsObject, - uri: Uri, - encoder: DomainJsonEncoder, - headers: List[HttpHeader] = headersWithAuth, - ): Future[ - domain.SyncResponse[List[domain.ActiveContract[JsValue]]] - ] = { - commands.traverse(c => postCreateCommand(c, encoder, uri, headers)).flatMap { rs => - rs.map(_._1) shouldBe List.fill(commands.size)(StatusCodes.OK) - postJsonRequest(uri.withPath(Uri.Path("/v1/query")), query, headers).flatMap { - case (_, output) => - FutureUtil - .toFuture(decode1[domain.SyncResponse, List[domain.ActiveContract[JsValue]]](output)) - } - } - } - protected def searchAllExpectOk( uri: Uri, headers: List[HttpHeader] = headersWithAuth, diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala new file mode 100644 index 0000000000..28dec1256a --- /dev/null +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala @@ -0,0 +1,936 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.http + +import akka.NotUsed +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest} +import akka.http.scaladsl.model.{StatusCodes, Uri} +import akka.stream.{KillSwitches, UniqueKillSwitch} +import akka.stream.scaladsl.{Keep, Sink, Source} +import com.daml.http.json.SprayJson +import com.typesafe.scalalogging.StrictLogging +import org.scalatest._ +import org.scalatest.freespec.AsyncFreeSpec +import org.scalatest.matchers.should.Matchers +import scalaz.std.option._ +import scalaz.std.vector._ +import scalaz.syntax.std.option._ +import scalaz.syntax.tag._ +import scalaz.syntax.traverse._ +import scalaz.{-\/, \/-} +import spray.json.{JsNull, JsObject, JsString, JsValue} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) +abstract class AbstractWebsocketServiceIntegrationTest + extends AsyncFreeSpec + with Matchers + with Inside + with StrictLogging + with AbstractHttpServiceIntegrationTestFuns + with BeforeAndAfterAll { + + import HttpServiceTestFixture._ + import WebsocketTestFixture._ + + override def staticContentConfig: Option[StaticContentConfig] = None + + override def useTls = UseTls.NoTls + + override def wsConfig: Option[WebsocketConfig] = Some(Config.DefaultWsConfig) + + private val baseQueryInput: Source[Message, NotUsed] = + Source.single(TextMessage.Strict("""{"templateIds": ["Account:Account"]}""")) + + private val fetchRequest = + """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}]""" + + private val baseFetchInput: Source[Message, NotUsed] = + Source.single(TextMessage.Strict(fetchRequest)) + + List( + SimpleScenario("query", Uri.Path("/v1/stream/query"), baseQueryInput), + SimpleScenario("fetch", Uri.Path("/v1/stream/fetch"), baseFetchInput), + ).foreach { scenario => + s"${scenario.id} request with valid protocol token should allow client subscribe to stream" in withHttpService { + (uri, _, _) => + wsConnectRequest( + uri.copy(scheme = "ws").withPath(scenario.path), + validSubprotocol(jwt), + scenario.input, + )._1 flatMap (x => x.response.status shouldBe StatusCodes.SwitchingProtocols) + } + + s"${scenario.id} request with invalid protocol token should be denied" in withHttpService { + (uri, _, _) => + wsConnectRequest( + uri.copy(scheme = "ws").withPath(scenario.path), + Option("foo"), + scenario.input, + )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized) + } + + s"${scenario.id} request without protocol token should be denied" in withHttpService { + (uri, _, _) => + wsConnectRequest( + uri.copy(scheme = "ws").withPath(scenario.path), + None, + scenario.input, + )._1 flatMap (x => x.response.status shouldBe StatusCodes.Unauthorized) + } + + s"two ${scenario.id} requests over the same WebSocket connection are NOT allowed" in withHttpService { + (uri, _, _) => + val input = scenario.input.mapConcat(x => List(x, x)) + val webSocketFlow = + Http().webSocketClientFlow( + WebSocketRequest( + uri = uri.copy(scheme = "ws").withPath(scenario.path), + subprotocol = validSubprotocol(jwt), + ) + ) + input + .via(webSocketFlow) + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + .flatMap { msgs => + inside(msgs) { case Seq(errorMsg) => + val error = decodeErrorResponse(errorMsg) + error shouldBe domain.ErrorResponse( + List("Multiple requests over the same WebSocket connection are not allowed."), + None, + StatusCodes.BadRequest, + ) + } + } + } + } + + List( + SimpleScenario( + "query", + Uri.Path("/v1/stream/query"), + Source.single(TextMessage.Strict("""{"templateIds": ["AA:BB"]}""")), + ), + SimpleScenario( + "fetch", + Uri.Path("/v1/stream/fetch"), + Source.single(TextMessage.Strict("""[{"templateId": "AA:BB", "key": ["k", "v"]}]""")), + ), + ).foreach { scenario => + s"${scenario.id} report UnknownTemplateIds and error when cannot resolve any template ID" in withHttpService { + (uri, _, _) => + val webSocketFlow = + Http().webSocketClientFlow( + WebSocketRequest( + uri = uri.copy(scheme = "ws").withPath(scenario.path), + subprotocol = validSubprotocol(jwt), + ) + ) + scenario.input + .via(webSocketFlow) + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + .flatMap { msgs => + inside(msgs) { case Seq(warningMsg, errorMsg) => + val warning = decodeServiceWarning(warningMsg) + inside(warning) { case domain.UnknownTemplateIds(ids) => + ids shouldBe List(domain.TemplateId(None, "AA", "BB")) + } + val error = decodeErrorResponse(errorMsg) + error shouldBe domain.ErrorResponse( + List(ErrorMessages.cannotResolveAnyTemplateId), + None, + StatusCodes.BadRequest, + ) + } + } + } + } + + "query endpoint should publish transactions when command create is completed" in withHttpService { + (uri, _, _) => + for { + _ <- initialIouCreate(uri) + + clientMsg <- singleClientQueryStream( + jwt, + uri, + """{"templateIds": ["Iou:Iou"]}""", + ).take(2) + .runWith(collectResultsAsTextMessage) + } yield inside(clientMsg) { case result +: heartbeats => + result should include(""""issuer":"Alice"""") + result should include(""""amount":"999.99"""") + Inspectors.forAll(heartbeats)(assertHeartbeat) + } + } + + "fetch endpoint should publish transactions when command create is completed" in withHttpService { + (uri, encoder, _) => + for { + _ <- initialAccountCreate(uri, encoder) + + clientMsg <- singleClientFetchStream(jwt, uri, fetchRequest) + .take(2) + .runWith(collectResultsAsTextMessage) + } yield inside(clientMsg) { case result +: heartbeats => + result should include(""""owner":"Alice"""") + result should include(""""number":"abc123"""") + result should not include (""""offset":"""") + Inspectors.forAll(heartbeats)(assertHeartbeat) + } + } + + "query endpoint should warn on unknown template IDs" in withHttpService { (uri, _, _) => + for { + _ <- initialIouCreate(uri) + + clientMsg <- singleClientQueryStream( + jwt, + uri, + """{"templateIds": ["Iou:Iou", "Unknown:Template"]}""", + ).take(3) + .runWith(collectResultsAsTextMessage) + } yield inside(clientMsg) { case warning +: result +: heartbeats => + warning should include("\"warnings\":{\"unknownTemplateIds\":[\"Unk") + result should include("\"issuer\":\"Alice\"") + Inspectors.forAll(heartbeats)(assertHeartbeat) + } + } + + "fetch endpoint should warn on unknown template IDs" in withHttpService { (uri, encoder, _) => + for { + _ <- initialAccountCreate(uri, encoder) + + clientMsg <- singleClientFetchStream( + jwt, + uri, + """[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, {"templateId": "Unknown:Template", "key": ["Alice", "abc123"]}]""", + ).take(3) + .runWith(collectResultsAsTextMessage) + } yield inside(clientMsg) { case warning +: result +: heartbeats => + warning should include("""{"warnings":{"unknownTemplateIds":["Unk""") + result should include(""""owner":"Alice"""") + result should include(""""number":"abc123"""") + Inspectors.forAll(heartbeats)(assertHeartbeat) + } + } + + "query endpoint should send error msg when receiving malformed message" in withHttpService { + (uri, _, _) => + val clientMsg = singleClientQueryStream(jwt, uri, "{}") + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + + val result = Await.result(clientMsg, 10.seconds) + + result should have size 1 + val errorResponse = decodeErrorResponse(result.head) + errorResponse.status shouldBe StatusCodes.BadRequest + errorResponse.errors should have size 1 + } + + "fetch endpoint should send error msg when receiving malformed message" in withHttpService { + (uri, _, _) => + val clientMsg = singleClientFetchStream(jwt, uri, """[abcdefg!]""") + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + + val result = Await.result(clientMsg, 10.seconds) + + result should have size 1 + val errorResponse = decodeErrorResponse(result.head) + errorResponse.status shouldBe StatusCodes.BadRequest + errorResponse.errors should have size 1 + } + + private def exercisePayload(cid: domain.ContractId, amount: BigDecimal = BigDecimal("42.42")) = { + import json.JsonProtocol._ + import spray.json._ + Map( + "templateId" -> "Iou:Iou".toJson, + "contractId" -> cid.toJson, + "choice" -> "Iou_Split".toJson, + "argument" -> Map("splitAmount" -> amount).toJson, + ).toJson + } + + "query should receive deltas as contracts are archived/created" in withHttpService { + (uri, _, _) => + import spray.json._ + + val initialCreate = initialIouCreate(uri) + + val query = + """[ + {"templateIds": ["Iou:Iou"], "query": {"amount": {"%lte": 50}}}, + {"templateIds": ["Iou:Iou"], "query": {"amount": {"%gt": 50}}}, + {"templateIds": ["Iou:Iou"]} + ]""" + + @com.github.ghik.silencer.silent("evtsWrapper.*never used") + def resp( + iouCid: domain.ContractId, + kill: UniqueKillSwitch, + ): Sink[JsValue, Future[ShouldHaveEnded]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + Consume + .interpret( + for { + ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne + _ = (ctid: String) shouldBe (iouCid.unwrap: String) + _ <- liftF( + postJsonRequest( + uri.withPath(Uri.Path("/v1/exercise")), + exercisePayload(domain.ContractId(ctid)), + headersWithAuth, + ) map { case (statusCode, _) => + statusCode.isSuccess shouldBe true + } + ) + + ContractDelta(Vector(), _, Some(offset)) <- readOne + + (preOffset, consumedCtid) = (offset, ctid) + evtsWrapper @ ContractDelta( + Vector((fstId, fst), (sndId, snd)), + Vector(observeConsumed), + Some(lastSeenOffset), + ) <- readOne + (liveStartOffset, msgCount) = { + observeConsumed.contractId should ===(consumedCtid) + Set(fstId, sndId, consumedCtid) should have size 3 + inside(evtsWrapper) { case JsObject(obj) => + inside(obj get "events") { + case Some( + JsArray( + Vector( + Archived(_, _), + Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)), + Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)), + ) + ) + ) => + Set((amt1, ixes1), (amt2, ixes2)) should ===( + Set( + (BigDecimal("42.42"), Vector(BigDecimal(0), BigDecimal(2))), + (BigDecimal("957.57"), Vector(BigDecimal(1), BigDecimal(2))), + ) + ) + } + } + (preOffset, 2) + } + + _ = kill.shutdown() + heartbeats <- drain + hbCount = (heartbeats.iterator.map { + case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset + }.toSet + lastSeenOffset).size - 1 + } yield + // don't count empty events block if lastSeenOffset does not change + ShouldHaveEnded( + liveStartOffset = liveStartOffset, + msgCount = msgCount + hbCount, + lastSeenOffset = lastSeenOffset, + ) + ) + } + + for { + creation <- initialCreate + _ = creation._1 shouldBe a[StatusCodes.Success] + iouCid = getContractId(getResult(creation._2)) + (kill, source) = singleClientQueryStream(jwt, uri, query) + .viaMat(KillSwitches.single)(Keep.right) + .preMaterialize() + lastState <- source via parseResp runWith resp(iouCid, kill) + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) => + lastSeen.unwrap should be > liveStart.unwrap + liveStart + } + rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset)) + via parseResp).take(1) runWith remainingDeltas + } yield inside(rescan) { + case (Vector((fstId, fst @ _), (sndId, snd @ _)), Vector(observeConsumed), Some(_)) => + Set(fstId, sndId, observeConsumed.contractId) should have size 3 + } + } + + "multi-party query should receive deltas as contracts are archived/created" in withHttpService { + (uri, encoder, _) => + import spray.json._ + + val f1 = + postCreateCommand( + accountCreateCommand(domain.Party("Alice"), "abc123"), + encoder, + uri, + headers = headersWithPartyAuth(List("Alice")), + ) + val f2 = + postCreateCommand( + accountCreateCommand(domain.Party("Bob"), "def456"), + encoder, + uri, + headers = headersWithPartyAuth(List("Bob")), + ) + + val query = + """[ + {"templateIds": ["Account:Account"]} + ]""" + + def resp( + cid1: domain.ContractId, + cid2: domain.ContractId, + kill: UniqueKillSwitch, + ): Sink[JsValue, Future[ShouldHaveEnded]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + Consume.interpret( + for { + Vector((account1, _), (account2, _)) <- readAcsN(2) + _ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2) + ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne + _ <- liftF( + postCreateCommand( + accountCreateCommand(domain.Party("Alice"), "abc234"), + encoder, + uri, + headers = headersWithPartyAuth(List("Alice")), + ) + ) + ContractDelta(Vector((_, aliceAccount)), _, Some(_)) <- readOne + _ = inside(aliceAccount) { case JsObject(obj) => + inside((obj get "owner", obj get "number")) { + case (Some(JsString(owner)), Some(JsString(number))) => + owner shouldBe "Alice" + number shouldBe "abc234" + } + } + _ <- liftF( + postCreateCommand( + accountCreateCommand(domain.Party("Bob"), "def567"), + encoder, + uri, + headers = headersWithPartyAuth(List("Bob")), + ) + ) + ContractDelta(Vector((_, bobAccount)), _, Some(lastSeenOffset)) <- readOne + _ = inside(bobAccount) { case JsObject(obj) => + inside((obj get "owner", obj get "number")) { + case (Some(JsString(owner)), Some(JsString(number))) => + owner shouldBe "Bob" + number shouldBe "def567" + } + } + _ = kill.shutdown() + heartbeats <- drain + hbCount = (heartbeats.iterator.map { + case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset + }.toSet + lastSeenOffset).size - 1 + } yield ( + // don't count empty events block if lastSeenOffset does not change + ShouldHaveEnded( + liveStartOffset = liveStartOffset, + msgCount = 5 + hbCount, + lastSeenOffset = lastSeenOffset, + ), + ) + ) + } + + for { + r1 <- f1 + _ = r1._1 shouldBe a[StatusCodes.Success] + cid1 = getContractId(getResult(r1._2)) + + r2 <- f2 + _ = r2._1 shouldBe a[StatusCodes.Success] + cid2 = getContractId(getResult(r2._2)) + + (kill, source) = singleClientQueryStream( + jwtForParties(List("Alice", "Bob"), List(), testId), + uri, + query, + ).viaMat(KillSwitches.single)(Keep.right).preMaterialize() + lastState <- source via parseResp runWith resp(cid1, cid2, kill) + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) => + lastSeen.unwrap should be > liveStart.unwrap + liveStart + } + rescan <- (singleClientQueryStream(jwt, uri, query, Some(liveOffset)) + via parseResp).take(1) runWith remainingDeltas + } yield inside(rescan) { case (Vector(_), _, Some(_)) => + succeed + } + } + + "fetch should receive deltas as contracts are archived/created, filtering out phantom archives" in withHttpService { + (uri, encoder, _) => + val templateId = domain.TemplateId(None, "Account", "Account") + def fetchRequest(contractIdAtOffset: Option[Option[domain.ContractId]] = None) = { + import spray.json._, json.JsonProtocol._ + List( + Map("templateId" -> "Account:Account".toJson, "key" -> List("Alice", "abc123").toJson) + ++ contractIdAtOffset + .map(ocid => contractIdAtOffsetKey -> ocid.toJson) + .toList + ).toJson.compactPrint + } + val f1 = + postCreateCommand(accountCreateCommand(domain.Party("Alice"), "abc123"), encoder, uri) + val f2 = + postCreateCommand(accountCreateCommand(domain.Party("Alice"), "def456"), encoder, uri) + + def resp( + cid1: domain.ContractId, + cid2: domain.ContractId, + kill: UniqueKillSwitch, + ): Sink[JsValue, Future[ShouldHaveEnded]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + Consume.interpret( + for { + ContractDelta(Vector((cid, c)), Vector(), None) <- readOne + _ = (cid: String) shouldBe (cid1.unwrap: String) + ctid <- liftF(postArchiveCommand(templateId, cid2, encoder, uri).flatMap { + case (statusCode, _) => + statusCode.isSuccess shouldBe true + postArchiveCommand(templateId, cid1, encoder, uri).map { case (statusCode, _) => + statusCode.isSuccess shouldBe true + cid + } + }) + + ContractDelta(Vector(), _, Some(offset)) <- readOne + (off, archivedCid) = (offset, ctid) + + ContractDelta(Vector(), Vector(observeArchivedCid), Some(lastSeenOffset)) <- readOne + (liveStartOffset, msgCount) = { + (observeArchivedCid.contractId.unwrap: String) shouldBe (archivedCid: String) + (observeArchivedCid.contractId: domain.ContractId) shouldBe (cid1: domain.ContractId) + (off, 0) + } + + _ = kill.shutdown() + heartbeats <- drain + hbCount = (heartbeats.iterator.map { + case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset + }.toSet + lastSeenOffset).size - 1 + + } yield + // don't count empty events block if lastSeenOffset does not change + ShouldHaveEnded( + liveStartOffset = liveStartOffset, + msgCount = msgCount + hbCount, + lastSeenOffset = lastSeenOffset, + ) + ) + } + + for { + r1 <- f1 + _ = r1._1 shouldBe a[StatusCodes.Success] + cid1 = getContractId(getResult(r1._2)) + + r2 <- f2 + _ = r2._1 shouldBe a[StatusCodes.Success] + cid2 = getContractId(getResult(r2._2)) + + (kill, source) = singleClientFetchStream(jwt, uri, fetchRequest()) + .viaMat(KillSwitches.single)(Keep.right) + .preMaterialize() + + lastState <- source + .via(parseResp) runWith resp(cid1, cid2, kill) + + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 0, lastSeen) => + lastSeen.unwrap should be > liveStart.unwrap + liveStart + } + + // check contractIdAtOffsets' effects on phantom filtering + resumes <- Future.traverse(Seq((None, 2L), (Some(None), 0L), (Some(Some(cid1)), 1L))) { + case (abcHint, expectArchives) => + (singleClientFetchStream( + jwt, + uri, + fetchRequest(abcHint), + Some(liveOffset), + ) + via parseResp) + .take(2) + .runWith(remainingDeltas) + .map { case (creates, archives, _) => + creates shouldBe empty + archives should have size expectArchives + } + } + + } yield resumes.foldLeft(1 shouldBe 1)((_, a) => a) + } + + "fetch multiple keys should work" in withHttpService { (uri, encoder, _) => + def create(account: String): Future[domain.ContractId] = + for { + r <- postCreateCommand(accountCreateCommand(domain.Party("Alice"), account), encoder, uri) + } yield { + assert(r._1.isSuccess) + getContractId(getResult(r._2)) + } + def archive(id: domain.ContractId): Future[Assertion] = + for { + r <- postArchiveCommand(domain.TemplateId(None, "Account", "Account"), id, encoder, uri) + } yield { + assert(r._1.isSuccess) + } + val req = + """ + |[{"templateId": "Account:Account", "key": ["Alice", "abc123"]}, + | {"templateId": "Account:Account", "key": ["Alice", "def456"]}] + |""".stripMargin + val futureResults = + singleClientFetchStream(jwt, uri, req) + .via(parseResp) + .filterNot(isOffsetTick) + .take(4) + .runWith(Sink.seq[JsValue]) + + for { + cid1 <- create("abc123") + _ <- create("abc124") + _ <- create("abc125") + cid2 <- create("def456") + _ <- archive(cid2) + _ <- archive(cid1) + results <- futureResults + } yield { + val expected: Seq[JsValue] = { + import spray.json._ + Seq( + """ + |{"events":[{"created":{"payload":{"number":"abc123"}}}]} + |""".stripMargin.parseJson, + """ + |{"events":[{"created":{"payload":{"number":"def456"}}}]} + |""".stripMargin.parseJson, + """ + |{"events":[{"archived":{}}]} + |""".stripMargin.parseJson, + """ + |{"events":[{"archived":{}}]} + |""".stripMargin.parseJson, + ) + } + results should matchJsValues(expected) + + } + } + + "multi-party fetch-by-key query should receive deltas as contracts are archived/created" in withHttpService { + (uri, encoder, _) => + import spray.json._ + + val templateId = domain.TemplateId(None, "Account", "Account") + + val f1 = + postCreateCommand( + accountCreateCommand(domain.Party("Alice"), "abc123"), + encoder, + uri, + headers = headersWithPartyAuth(List("Alice")), + ) + val f2 = + postCreateCommand( + accountCreateCommand(domain.Party("Bob"), "def456"), + encoder, + uri, + headers = headersWithPartyAuth(List("Bob")), + ) + + val query = + """[ + {"templateId": "Account:Account", "key": ["Alice", "abc123"]}, + {"templateId": "Account:Account", "key": ["Bob", "def456"]} + ]""" + + def resp( + cid1: domain.ContractId, + cid2: domain.ContractId, + kill: UniqueKillSwitch, + ): Sink[JsValue, Future[ShouldHaveEnded]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + Consume.interpret( + for { + Vector((account1, _), (account2, _)) <- readAcsN(2) + _ = Seq(account1, account2) should contain theSameElementsAs Seq(cid1, cid2) + ContractDelta(Vector(), _, Some(liveStartOffset)) <- readOne + _ <- liftF(postArchiveCommand(templateId, cid1, encoder, uri)) + ContractDelta(Vector(), Vector(archivedCid1), Some(_)) <- readOne + _ = archivedCid1.contractId shouldBe cid1 + _ <- liftF( + postArchiveCommand( + templateId, + cid2, + encoder, + uri, + headers = headersWithPartyAuth(List("Bob")), + ) + ) + ContractDelta(Vector(), Vector(archivedCid2), Some(lastSeenOffset)) <- readOne + _ = archivedCid2.contractId shouldBe cid2 + _ = kill.shutdown() + heartbeats <- drain + hbCount = (heartbeats.iterator.map { + case ContractDelta(Vector(), Vector(), Some(currentOffset)) => currentOffset + }.toSet + lastSeenOffset).size - 1 + } yield ( + // don't count empty events block if lastSeenOffset does not change + ShouldHaveEnded( + liveStartOffset = liveStartOffset, + msgCount = 5 + hbCount, + lastSeenOffset = lastSeenOffset, + ), + ) + ) + } + + for { + r1 <- f1 + _ = r1._1 shouldBe a[StatusCodes.Success] + cid1 = getContractId(getResult(r1._2)) + + r2 <- f2 + _ = r2._1 shouldBe a[StatusCodes.Success] + cid2 = getContractId(getResult(r2._2)) + + (kill, source) = singleClientFetchStream( + jwtForParties(List("Alice", "Bob"), List(), testId), + uri, + query, + ).viaMat(KillSwitches.single)(Keep.right).preMaterialize() + lastState <- source via parseResp runWith resp(cid1, cid2, kill) + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 5, lastSeen) => + lastSeen.unwrap should be > liveStart.unwrap + liveStart + } + rescan <- (singleClientFetchStream( + jwtForParties(List("Alice", "Bob"), List(), testId), + uri, + query, + Some(liveOffset), + ) + via parseResp).take(2) runWith remainingDeltas + } yield inside(rescan) { case (Vector(), Vector(_, _), Some(_)) => + succeed + } + } + + /** Consume ACS blocks expecting `createCount` contracts. Fail if there + * are too many contracts. + */ + private[this] def readAcsN(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + def go(createCount: Int): Consume.FCC[JsValue, Vector[(String, JsValue)]] = + if (createCount <= 0) point(Vector.empty) + else + for { + ContractDelta(creates, Vector(), None) <- readOne + found = creates.size + if found <= createCount + tail <- if (found < createCount) go(createCount - found) else point(Vector.empty) + } yield creates ++ tail + go(createCount) + } + + "fetch should should return an error if empty list of (templateId, key) pairs is passed" in withHttpService { + (uri, _, _) => + singleClientFetchStream(jwt, uri, "[]") + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + .map { clientMsgs => + inside(clientMsgs) { case Seq(errorMsg) => + val errorResponse = decodeErrorResponse(errorMsg) + errorResponse.status shouldBe StatusCodes.BadRequest + inside(errorResponse.errors) { case List(error) => + error should include("must be a JSON array with at least 1 element") + } + } + }: Future[Assertion] + } + + "query on a bunch of random splits should yield consistent results" in withHttpService { + (uri, _, _) => + val splitSample = SplitSeq.gen.map(_ map (BigDecimal(_))).sample.get + val query = + """[ + {"templateIds": ["Iou:Iou"]} + ]""" + val (kill, source) = singleClientQueryStream(jwt, uri, query) + .viaMat(KillSwitches.single)(Keep.right) + .preMaterialize() + source + .via(parseResp) + .map(iouSplitResult) + .filterNot(_ == \/-((Vector(), Vector()))) // liveness marker/heartbeat + .runWith(Consume.interpret(trialSplitSeq(uri, splitSample, kill))) + } + + private def trialSplitSeq( + serviceUri: Uri, + ss: SplitSeq[BigDecimal], + kill: UniqueKillSwitch, + ): Consume.FCC[IouSplitResult, Assertion] = { + val dslSyntax = Consume.syntax[IouSplitResult] + import SplitSeq._ + import dslSyntax._ + def go( + createdCid: domain.ContractId, + ss: SplitSeq[BigDecimal], + ): Consume.FCC[IouSplitResult, Assertion] = ss match { + case Leaf(_) => + point(1 shouldBe 1) + case Node(_, l, r) => + for { + (StatusCodes.OK, _) <- liftF( + postJsonRequest( + serviceUri.withPath(Uri.Path("/v1/exercise")), + exercisePayload(createdCid, l.x), + headersWithAuth, + ) + ) + + \/-((Vector((cid1, amt1), (cid2, amt2)), Vector(archival))) <- readOne + (lCid, rCid) = { + archival should ===(createdCid) + Set(amt1, amt2) should ===(Set(l.x, r.x)) + if (amt1 == l.x) (cid1, cid2) else (cid2, cid1) + } + + _ <- go(lCid, l) + last <- go(rCid, r) + } yield last + } + + val initialPayload = { + import spray.json._, json.JsonProtocol._ + Map( + "templateId" -> "Iou:Iou".toJson, + "payload" -> Map( + "observers" -> List[String]().toJson, + "issuer" -> "Alice".toJson, + "amount" -> ss.x.toJson, + "currency" -> "USD".toJson, + "owner" -> "Alice".toJson, + ).toJson, + ).toJson + } + for { + (StatusCodes.OK, _) <- liftF( + postJsonRequest( + serviceUri.withPath(Uri.Path("/v1/create")), + initialPayload, + headersWithAuth, + ) + ) + \/-((Vector((genesisCid, amt)), Vector())) <- readOne + _ = amt should ===(ss.x) + last <- go(genesisCid, ss) + _ = kill.shutdown() + } yield last + } + + private def iouSplitResult(jsv: JsValue): IouSplitResult = jsv match { + case ContractDelta(creates, archives, _) => + creates traverse { + case (cid, JsObject(fields)) => + fields get "amount" collect { case JsString(amt) => + (domain.ContractId(cid), BigDecimal(amt)) + } + case _ => None + } map ((_, archives map (_.contractId))) toRightDisjunction jsv + case _ => -\/(jsv) + } + + "ContractKeyStreamRequest" - { + import spray.json._, json.JsonProtocol._ + val baseVal = + domain.EnrichedContractKey(domain.TemplateId(Some("ab"), "cd", "ef"), JsString("42"): JsValue) + val baseMap = baseVal.toJson.asJsObject.fields + val withSome = JsObject(baseMap + (contractIdAtOffsetKey -> JsString("hi"))) + val withNone = JsObject(baseMap + (contractIdAtOffsetKey -> JsNull)) + + "initial JSON reader" - { + type T = domain.ContractKeyStreamRequest[Unit, JsValue] + + "shares EnrichedContractKey format" in { + JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest((), baseVal)) + } + + "errors on contractIdAtOffset presence" in { + a[DeserializationException] shouldBe thrownBy { + withSome.convertTo[T] + } + a[DeserializationException] shouldBe thrownBy { + withNone.convertTo[T] + } + } + } + + "resuming JSON reader" - { + type T = domain.ContractKeyStreamRequest[Option[Option[domain.ContractId]], JsValue] + + "shares EnrichedContractKey format" in { + JsObject(baseMap).convertTo[T] should ===(domain.ContractKeyStreamRequest(None, baseVal)) + } + + "distinguishes null and string" in { + withSome.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(Some("hi")), baseVal)) + withNone.convertTo[T] should ===(domain.ContractKeyStreamRequest(Some(None), baseVal)) + } + } + } + + private def wsConnectRequest[M]( + uri: Uri, + subprotocol: Option[String], + input: Source[Message, NotUsed], + ) = + Http().singleWebSocketRequest( + request = WebSocketRequest(uri = uri, subprotocol = subprotocol), + clientFlow = dummyFlow(input), + ) + + private def assertHeartbeat(str: String): Assertion = + inside( + SprayJson + .decode[EventsBlock](str) + ) { case \/-(eventsBlock) => + eventsBlock.events shouldBe Vector.empty[JsValue] + inside(eventsBlock.offset) { + case Some(JsString(offset)) => + offset.length should be > 0 + case Some(JsNull) => + Succeeded + } + } + + private def decodeErrorResponse(str: String): domain.ErrorResponse = { + import json.JsonProtocol._ + inside(SprayJson.decode[domain.ErrorResponse](str)) { case \/-(e) => + e + } + } + + private def decodeServiceWarning(str: String): domain.ServiceWarning = { + import json.JsonProtocol._ + inside(SprayJson.decode[domain.AsyncWarningsWrapper](str)) { case \/-(w) => + w.warnings + } + } +} diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Config.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Config.scala index 1ce0d0e13b..b671d9dc86 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Config.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Config.scala @@ -105,6 +105,9 @@ private[http] final case class JdbcConfig( ) private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig") { + import dbbackend.ContractDao.supportedJdbcDriverNames + + private[this] def supportedDriversHelp = supportedJdbcDriverNames mkString ", " implicit val showInstance: Show[JdbcConfig] = Show.shows(a => s"JdbcConfig(driver=${a.driver}, url=${a.url}, user=${a.user}, createSchema=${a.createSchema})" @@ -112,8 +115,8 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig" lazy val help: String = "Contains comma-separated key-value pairs. Where:\n" + - s"${indent}driver -- JDBC driver class name, only org.postgresql.Driver supported right now,\n" + - s"${indent}url -- JDBC connection URL, only jdbc:postgresql supported right now,\n" + + s"${indent}driver -- JDBC driver class name, $supportedDriversHelp supported right now,\n" + + s"${indent}url -- JDBC connection URL,\n" + s"${indent}user -- database user name,\n" + s"${indent}password -- database user password,\n" + s"${indent}createSchema -- boolean flag, if set to true, the process will re-create database schema and terminate immediately.\n" + @@ -136,6 +139,11 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig" override def create(x: Map[String, String]): Either[String, JdbcConfig] = for { driver <- requiredField(x)("driver") + _ <- Either.cond( + supportedJdbcDriverNames(driver), + (), + s"$driver unsupported. Supported drivers: $supportedDriversHelp", + ) url <- requiredField(x)("url") user <- requiredField(x)("user") password <- requiredField(x)("password") diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala index 0c9d044eeb..69b40ba5a3 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala @@ -19,12 +19,10 @@ import spray.json.{JsNull, JsValue} import scala.concurrent.ExecutionContext -class ContractDao(xa: Connection.T) { +class ContractDao private (xa: Connection.T)(implicit val jdbcDriver: SupportedJdbcDriver) { implicit val logHandler: log.LogHandler = Slf4jLogHandler(classOf[ContractDao]) - implicit val jdbcDriver: SupportedJdbcDriver = SupportedJdbcDriver.Postgres - def transact[A](query: ConnectionIO[A]): IO[A] = query.transact(xa) @@ -33,10 +31,25 @@ class ContractDao(xa: Connection.T) { } object ContractDao { + private[this] val supportedJdbcDrivers = Map( + "org.postgresql.Driver" -> SupportedJdbcDriver.Postgres, + "oracle.jdbc.OracleDriver" -> SupportedJdbcDriver.Oracle, + ) + + lazy val supportedJdbcDriverNames = supportedJdbcDrivers.keySet filter { d => + scala.util.Try(Class forName d).isSuccess + } + def apply(jdbcDriver: String, jdbcUrl: String, username: String, password: String)(implicit ec: ExecutionContext ): ContractDao = { val cs: ContextShift[IO] = IO.contextShift(ec) + implicit val sjd: SupportedJdbcDriver = supportedJdbcDrivers.getOrElse( + jdbcDriver, + throw new IllegalArgumentException( + s"JDBC driver $jdbcDriver is not one of ${supportedJdbcDrivers.keySet}" + ), + ) new ContractDao(Connection.connect(jdbcDriver, jdbcUrl, username, password)(cs)) } diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/SupportedJdbcDriver.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/SupportedJdbcDriver.scala index 5b8c4ae509..ff3738a498 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/SupportedJdbcDriver.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/SupportedJdbcDriver.scala @@ -37,4 +37,15 @@ object SupportedJdbcDriver { Set(postgres_class23.UNIQUE_VIOLATION.value, ContractDao.StaleOffsetException.SqlState), ) } + + val Oracle: SupportedJdbcDriver = { + // import doobie.postgres.implicits.unliftedStringArrayType // TODO s11 just a thought + implicit val qqq: doobie.Meta[Array[String]] = + doobie.Meta[Int].timap(Array.fill(_)("x"))(_.length) + new SupportedJdbcDriver( + label = "Oracle", + queries = Queries.Oracle, + retrySqlStates = Set( /*s11 TODO, */ ContractDao.StaleOffsetException.SqlState), + ) + } } diff --git a/maven_install_2.12.json b/maven_install_2.12.json index d63342f6aa..471b380765 100644 --- a/maven_install_2.12.json +++ b/maven_install_2.12.json @@ -1,6 +1,6 @@ { "dependency_tree": { - "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 1301820626, + "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 702334853, "conflict_resolution": {}, "dependencies": [ { @@ -1530,6 +1530,28 @@ "sha256": "3ae59d060ba74fbfd511423e0ce6b85cb5e65699a6b7cd331b8ab80dde76198e", "url": "https://repo1.maven.org/maven2/com/lihaoyi/upickle-core_2.12/1.2.0/upickle-core_2.12-1.2.0-sources.jar" }, + { + "coord": "com.oracle.database.jdbc:ojdbc8:19.8.0.0", + "dependencies": [], + "directDependencies": [], + "file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar", + "mirror_urls": [ + "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar" + ], + "sha256": "3ddde85f9a33d774c23b930b4ee27acb4d412c7c8c3aab4655bc1ee72339ac8d", + "url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar" + }, + { + "coord": "com.oracle.database.jdbc:ojdbc8:jar:sources:19.8.0.0", + "dependencies": [], + "directDependencies": [], + "file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar", + "mirror_urls": [ + "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar" + ], + "sha256": "25a9cb395eee4096d117e8146a48dfa2c748eea7884b0fa01d8a1b99f399d01d", + "url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar" + }, { "coord": "com.rabbitmq:amqp-client:5.5.3", "dependencies": [], diff --git a/maven_install_2.13.json b/maven_install_2.13.json index f4b0666227..11c8b00ba7 100644 --- a/maven_install_2.13.json +++ b/maven_install_2.13.json @@ -1,6 +1,6 @@ { "dependency_tree": { - "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 1532588487, + "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": -1217143180, "conflict_resolution": {}, "dependencies": [ { @@ -1486,6 +1486,28 @@ "sha256": "047ffe7c529b7cbbd3efa7e35a7ce27d3c4ffae05fa77f1ff4cfd46a8b40a52a", "url": "https://repo1.maven.org/maven2/com/lihaoyi/upickle-core_2.13/1.2.0/upickle-core_2.13-1.2.0-sources.jar" }, + { + "coord": "com.oracle.database.jdbc:ojdbc8:19.8.0.0", + "dependencies": [], + "directDependencies": [], + "file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar", + "mirror_urls": [ + "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar" + ], + "sha256": "3ddde85f9a33d774c23b930b4ee27acb4d412c7c8c3aab4655bc1ee72339ac8d", + "url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0.jar" + }, + { + "coord": "com.oracle.database.jdbc:ojdbc8:jar:sources:19.8.0.0", + "dependencies": [], + "directDependencies": [], + "file": "v1/https/repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar", + "mirror_urls": [ + "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar" + ], + "sha256": "25a9cb395eee4096d117e8146a48dfa2c748eea7884b0fa01d8a1b99f399d01d", + "url": "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.8.0.0/ojdbc8-19.8.0.0-sources.jar" + }, { "coord": "com.rabbitmq:amqp-client:5.5.3", "dependencies": [],