diff --git a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala index 6a98393261..e19432078c 100644 --- a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala +++ b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala @@ -5,14 +5,13 @@ package com.daml.http.dbbackend import com.daml.lf.data.Ref import com.daml.nonempty -import nonempty.{NonEmpty, +-:} +import nonempty.{+-:, NonEmpty} import nonempty.NonEmptyReturningOps._ - import doobie._ import doobie.implicits._ import scala.annotation.nowarn -import scala.collection.immutable.{Seq => ISeq, SortedMap} -import scalaz.{@@, Cord, Functor, OneAnd, Tag, \/, -\/, \/-} +import scala.collection.immutable.{SortedMap, Seq => ISeq} +import scalaz.{-\/, @@, Cord, Functor, OneAnd, Tag, \/, \/-} import scalaz.Digit._0 import scalaz.syntax.foldable._ import scalaz.syntax.functor._ @@ -193,9 +192,31 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im tpid <- sql"""SELECT tpid FROM $templateIdTableName WHERE (package_id = $packageId AND template_module_name = $moduleName AND template_entity_name = $entityName)""".query[SurrogateTpId].unique - } yield tpid + // Returns a map from templates to the latest seen offset per party + final def templateOffsetsOlderThan(offsetLimitToRefresh: String)(implicit + log: LogHandler + ): ConnectionIO[ + Map[(String, String, String), NonEmpty[Map[String, String]]] + ] = { + val allOffsetsQuery = + sql""" + SELECT o.party, o.last_offset, t.package_id, t.template_module_name, t.template_entity_name + FROM $ledgerOffsetTableName o + JOIN $templateIdTableName t + ON t.tpid = o.tpid + WHERE o.last_offset < $offsetLimitToRefresh + """ + allOffsetsQuery + .query[(String, String, String, String, String)] + .to[Vector] + .map { + _.groupBy1(x => (x._3, x._4, x._5)) + .transform((_, tpos) => tpos.map(x => (x._1, x._2)).toMap) + } + } + protected def insertTemplateIdIfNotExists( packageId: String, moduleName: String, diff --git a/ledger-service/http-json-oracle/src/itlib/scala/http/HttpServiceWithOracleIntTest.scala b/ledger-service/http-json-oracle/src/itlib/scala/http/HttpServiceWithOracleIntTest.scala index f2f14b3014..1f9aecfdea 100644 --- a/ledger-service/http-json-oracle/src/itlib/scala/http/HttpServiceWithOracleIntTest.scala +++ b/ledger-service/http-json-oracle/src/itlib/scala/http/HttpServiceWithOracleIntTest.scala @@ -4,7 +4,7 @@ package com.daml.http abstract class HttpServiceWithOracleIntTest(override val disableContractPayloadIndexing: Boolean) - extends QueryStoreAndAuthDependentIntegrationTest + extends WithQueryStoreSetTest with HttpServiceOracleInt { override final def constrainedJsonQueries = !disableContractPayloadIndexing diff --git a/ledger-service/http-json/src/it/scala/http/HttpServiceWithPostgresIntTest.scala b/ledger-service/http-json/src/it/scala/http/HttpServiceWithPostgresIntTest.scala index e037aaa365..73ec2dec5c 100644 --- a/ledger-service/http-json/src/it/scala/http/HttpServiceWithPostgresIntTest.scala +++ b/ledger-service/http-json/src/it/scala/http/HttpServiceWithPostgresIntTest.scala @@ -11,7 +11,7 @@ import scala.concurrent.Future @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) abstract class HttpServiceWithPostgresIntTest - extends QueryStoreAndAuthDependentIntegrationTest + extends WithQueryStoreSetTest with PostgresAroundAll with HttpServicePostgresInt { diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala index ffbd4866f6..c9ba5ab782 100644 --- a/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala @@ -124,6 +124,88 @@ trait AbstractHttpServiceIntegrationTestFunsCustomToken } +trait WithQueryStoreSetTest extends QueryStoreAndAuthDependentIntegrationTest { + import HttpServiceTestFixture.archiveCommand + import json.JsonProtocol._ + + "refresh cache endpoint" - { + "should return latest offset when the cache is outdated" in withHttpService { fixture => + import fixture.encoder + def archiveIou(headers: List[HttpHeader], contractId: domain.ContractId) = { + val reference = domain.EnrichedContractId(Some(TpId.Iou.Iou), contractId) + val exercise = archiveCommand(reference) + val exerciseJson: JsValue = encodeExercise(encoder)(exercise) + fixture + .postJsonRequest(Uri.Path("/v1/exercise"), exerciseJson, headers) + .parseResponse[domain.ExerciseResponse[JsValue]] + .flatMap(inside(_) { case domain.OkResponse(exercisedResponse, _, StatusCodes.OK) => + assertExerciseResponseArchivedContract(exercisedResponse, exercise) + }) + } + + for { + (alice, aliceHeaders) <- fixture.getUniquePartyAndAuthHeaders("Alice") + searchDataSet = genSearchDataSet(alice) + contractIds <- searchExpectOk( + searchDataSet, + jsObject( + """{"templateIds": ["Iou:Iou"], "query": {"currency": "EUR", "amount": "111.11"}}""" + ), + fixture, + aliceHeaders, + ).map { acl: List[domain.ActiveContract.ResolvedCtTyId[JsValue]] => + acl.size shouldBe 1 + acl.map(a => objectField(a.payload, "currency")) shouldBe List(Some(JsString("EUR"))) + acl.map(a => objectField(a.payload, "amount")) shouldBe List(Some(JsString("111.11"))) + acl.map(_.contractId) + } + + _ <- contractIds.traverse(archiveIou(aliceHeaders, _)) + + res <- + fixture + .postJsonRequest(Uri.Path("/v1/refresh/cache"), jsObject("{}"), aliceHeaders) + .parseResponse[JsValue] + + } yield { + inside(res) { case domain.OkResponse(s, _, StatusCodes.OK) => + assert(s.toString.matches("""\[\{\"refreshedAt\":\"[0-9a-f]*\"\}\]""")) + } + } + } + + "should return latest offset when the cache was up to date" in withHttpService { fixture => + for { + res1 <- fixture.getUniquePartyAndAuthHeaders("Alice") + (alice, aliceHeaders) = res1 + searchDataSet = genSearchDataSet(alice) + _ <- searchExpectOk( + searchDataSet, + jsObject( + """{"templateIds": ["Iou:Iou"], "query": {"currency": "EUR", "amount": "111.11"}}""" + ), + fixture, + aliceHeaders, + ).map { acl: List[domain.ActiveContract.ResolvedCtTyId[JsValue]] => + acl.size shouldBe 1 + acl.map(a => objectField(a.payload, "currency")) shouldBe List(Some(JsString("EUR"))) + acl.map(a => objectField(a.payload, "amount")) shouldBe List(Some(JsString("111.11"))) + acl.map(_.contractId) + } + res <- + fixture + .postJsonRequest(Uri.Path("/v1/refresh/cache"), jsObject("{}"), aliceHeaders) + .parseResponse[JsValue] + + } yield { + inside(res) { case domain.OkResponse(s, _, StatusCodes.OK) => + assert(s.toString.matches("""\[\{\"refreshedAt\":\"[0-9a-f]*\"\}\]""")) + } + } + } + } +} + /** Tests that may behave differently depending on * * 1. whether custom or user tokens are used, ''and'' @@ -1363,7 +1445,7 @@ abstract class QueryStoreAndAuthDependentIntegrationTest } yield succeed } - private def assertExerciseResponseArchivedContract( + protected def assertExerciseResponseArchivedContract( exerciseResponse: domain.ExerciseResponse[JsValue], exercise: domain.ExerciseCommand.OptionalPkg[v.Value, domain.EnrichedContractId], ): Assertion = @@ -1770,6 +1852,7 @@ abstract class QueryStoreAndAuthDependentIntegrationTest _ <- queryUsers(alice) } yield succeed } + } /** Tests that don't exercise the query store at all, but exercise different diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala index be2f64ecb7..f4ba856f02 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala @@ -252,6 +252,49 @@ private class ContractsFetch( } } + def fetchAndRefreshCache( + jwt: Jwt, + ledgerId: LedgerApiDomain.LedgerId, + ledgerEnd: Terminates.AtAbsolute, + offsetLimitToRefresh: domain.Offset, + )(implicit + ec: ExecutionContext, + mat: Materializer, + lc: LoggingContextOf[InstanceUUID with RequestID], + metrics: HttpJsonApiMetrics, + ): ConnectionIO[Unit] = { + import sjd.q.queries + import cats.syntax.traverse._ + + debugLogActionWithMetrics( + s"cache refresh for templates older than offset: $offsetLimitToRefresh", + metrics.Db.warmCache, + ) { + + for { + oldTemplates <- queries.templateOffsetsOlderThan(offsetLimitToRefresh.unwrap) + _ = logger.debug(s"refreshing the cache for ${oldTemplates.size} templates") + _ <- oldTemplates + .map { case ((packageId, moduleName, entityName), partyOffsetsRaw) => + val templateId = ContractTypeId.Template(packageId, moduleName, entityName) + val partyOffsets = partyOffsetsRaw.map { case (p, o) => + (domain.Party(p), domain.Offset(o)) + }.toMap + val fetchContext = FetchContext(jwt, ledgerId, partyOffsets.keySet) + contractsFromOffsetIo( + fetchContext, + templateId, + partyOffsets, + true, + ledgerEnd, + ) + } + .toList + .sequence + } yield {} + } + } + private def prepareCreatedEventStorage( ce: lav1.event.CreatedEvent, d: ContractTypeId.Resolved, @@ -316,8 +359,8 @@ private class ContractsFetch( debugLogActionWithMetrics( s"cache refresh for templateId: $templateId", metrics.Db.cacheUpdate, - metrics.Db.cacheUpdateStarted, - metrics.Db.cacheUpdateFailed, + Some(metrics.Db.cacheUpdateStarted), + Some(metrics.Db.cacheUpdateFailed), ) { val graph = RunnableGraph.fromGraph( GraphDSL.createGraph( @@ -392,10 +435,10 @@ private class ContractsFetch( private def debugLogActionWithMetrics[T, C]( actionDescription: String, timer: Timer, - startedCounter: Counter, - failedCounter: Counter, + optStartedCounter: Option[Counter] = None, + optFailedCounter: Option[Counter] = None, )(block: => T)(implicit lc: LoggingContextOf[C]): T = { - startedCounter.inc() + optStartedCounter.foreach(_.inc()) val timerHandler = timer.startAsync() val startTime = System.nanoTime() logger.debug(s"Starting $actionDescription") @@ -404,7 +447,7 @@ private class ContractsFetch( block } catch { case e: Exception => - failedCounter.inc() + optFailedCounter.foreach(_.inc()) logger.error( s"Failed $actionDescription after ${(System.nanoTime() - startTime) / 1000000L}ms because: $e" ) diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsService.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsService.scala index 53af42189a..bf8f147595 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsService.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsService.scala @@ -10,7 +10,12 @@ import akka.stream.Materializer import com.daml.lf import com.daml.http.LedgerClientJwt.Terminates import com.daml.http.dbbackend.ContractDao -import com.daml.http.domain.{ContractTypeId, GetActiveContractsRequest, JwtPayload} +import com.daml.http.domain.{ + ContractTypeId, + GetActiveContractsRequest, + JwtPayload, + RefreshCacheRequest, +} import ContractTypeId.toLedgerApiValue import com.daml.http.json.JsonProtocol.LfValueCodec import com.daml.http.query.ValuePredicate @@ -306,6 +311,46 @@ class ContractsService( } } + def refreshCache( + jwt: Jwt, + jwtPayload: JwtPayload, + refreshCacheRequest: RefreshCacheRequest, + )(implicit + lc: LoggingContextOf[InstanceUUID with RequestID], + metrics: HttpJsonApiMetrics, + ): Future[domain.SyncResponse[Source[Error \/ domain.RefreshCacheResult, NotUsed]]] = { + val ledgerId = toLedgerId(jwtPayload.ledgerId) + getTermination(jwt, ledgerId)(lc).map { optLedgerEnd => + optLedgerEnd.cata( + { ledgerEnd => + daoAndFetch.cata( + { case (dao, fetchService) => + val response: Source[Error \/ domain.RefreshCacheResult, NotUsed] = { + val offsetLimitToRefresh = refreshCacheRequest.offset.getOrElse(ledgerEnd.toDomain) + val futureValue = + dao + .transact( + fetchService + .fetchAndRefreshCache(jwt, ledgerId, ledgerEnd, offsetLimitToRefresh) + ) + .unsafeToFuture() + Source + .future(futureValue) + .map { _ => + \/.right(domain.RefreshCacheResult(offsetLimitToRefresh)) + } + } + domain.OkResponse(response) + }, + mkErrorResponse("No query-storage is set to update cache", None), + ) + }, { + mkErrorResponse("Ledger is at the beginning, cache do not have anything to update", None) + }, + ) + } + } + private def handleResolvedQueryErrors( warnings: Option[domain.UnknownTemplateIds] ): domain.ResolvedQuery.Unsupported => domain.ErrorResponse = unsuppoerted => diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala index fbbae2b899..2ebf653e6a 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala @@ -333,6 +333,7 @@ class Endpoints( path("create-and-exercise") apply toRoute( createAndExercise(req) ), + path("refresh" / "cache") apply toRoute(refreshCache(req)), path("query") apply toRoute(query(req)), path("fetch") apply toRoute(fetch(req)), path("user") apply toPostRoute(req, getUser), diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala index f414fc5a77..d4daee8965 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala @@ -173,6 +173,10 @@ package domain { readAs: Option[NonEmptyList[Party]], ) + final case class RefreshCacheRequest( + offset: Option[domain.Offset] + ) + final case class SearchForeverRequest( queriesWithPos: NonEmptyList[(SearchForeverQuery, Int)] ) @@ -453,6 +457,8 @@ package domain { completionOffset: CompletionOffset, ) + final case class RefreshCacheResult(refreshedAt: Offset) + object PartyDetails { def fromLedgerApi(p: com.daml.ledger.api.domain.PartyDetails): PartyDetails = PartyDetails(Party(p.party), p.displayName, p.isLocal) diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/endpoints/ContractList.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/endpoints/ContractList.scala index 3071a6080a..95528a122a 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/endpoints/ContractList.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/endpoints/ContractList.scala @@ -149,6 +149,46 @@ private[http] final class ContractList( } yield res }.run + def refreshCache(req: HttpRequest)(implicit + lc: LoggingContextOf[InstanceUUID with RequestID], + metrics: HttpJsonApiMetrics, + ): Future[Error \/ SearchResult[Error \/ JsValue]] = { + for { + it <- inputAndJwtPayload[JwtPayload](req).leftMap(identity[Error]) + (jwt, jwtPayload, reqBody) = it + response <- withJwtPayloadLoggingContext(jwtPayload) { implicit lc => + val result = for { + cmd <- SprayJson + .decode[domain.RefreshCacheRequest](reqBody) + .liftErr[Error](InvalidUserInput) + } yield withEnrichedLoggingContext( + LoggingContextOf.label[domain.RefreshCacheRequest], + "cmd" -> cmd.toString, + ).run { implicit lc => + logger.debug("Starting refresh cache to the latest offset") + contractsService + .refreshCache(jwt, jwtPayload, cmd) + .map( + domain.SyncResponse.covariant.map(_)( + _.via(handleSourceFailure) + .map { + case x @ -\/(error) => + logger.error(s"Error refreshing the cache with error: $error") + x + case x @ \/-(_) => + logger.debug(s"Successfully refreshed cache") + x.flatMap( + toJsValue[domain.RefreshCacheResult](_) + ) + } + ) + ) + } + eitherT(result.sequence) + } + } yield response + }.run + private def handleSourceFailure[E, A](implicit E: IntoEndpointsError[E] ): Flow[E \/ A, Error \/ A, NotUsed] = diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala index 7ea990544c..d96f4fff6f 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala @@ -400,6 +400,10 @@ object JsonProtocol extends JsonProtocolLow { requestJsonReaderPlusOne(ReadersKey)(domain.GetActiveContractsRequest) } + implicit val RefreshCacheRequestFormat: RootJsonReader[domain.RefreshCacheRequest] = { + jsonFormat1(domain.RefreshCacheRequest) + } + implicit val SearchForeverQueryFormat: RootJsonReader[domain.SearchForeverQuery] = { val OffsetKey = "offset" requestJsonReaderPlusOne(OffsetKey)(domain.SearchForeverQuery) @@ -571,6 +575,9 @@ object JsonProtocol extends JsonProtocolLow { implicit val ExerciseResponseFormat: RootJsonFormat[domain.ExerciseResponse[JsValue]] = jsonFormat3(domain.ExerciseResponse[JsValue]) + implicit val RefreshCacheResultFormat: RootJsonFormat[domain.RefreshCacheResult] = + jsonFormat1(domain.RefreshCacheResult) + implicit val CreateCommandResponseFormat: RootJsonFormat[domain.CreateCommandResponse[JsValue]] = jsonFormat8(domain.CreateCommandResponse[JsValue]) diff --git a/ledger-service/metrics/src/main/scala/com/digitalasset/http/metrics/HttpJsonApiMetrics.scala b/ledger-service/metrics/src/main/scala/com/digitalasset/http/metrics/HttpJsonApiMetrics.scala index fbba1cf54d..501147b30e 100644 --- a/ledger-service/metrics/src/main/scala/com/digitalasset/http/metrics/HttpJsonApiMetrics.scala +++ b/ledger-service/metrics/src/main/scala/com/digitalasset/http/metrics/HttpJsonApiMetrics.scala @@ -58,6 +58,7 @@ class HttpJsonApiMetrics( /** * cache metrics ** */ + val warmCache: Timer = defaultMetricsFactory.timer(prefix :+ "warm_cache") val cacheUpdate: Timer = defaultMetricsFactory.timer(prefix :+ "cache_update") // The cache update completed count can be derived from the count of the `cacheUpdate` timer val cacheUpdateStarted: Counter =