Add json api endpoint refresh cache (#16987)

* added new end point to refresh the cache

* formatting

* returning old logic

* added logic to update the cache with a specific offset provided in the body

* formatting

* addressed comments

* formatting

* formatting

* formatting

* Return unit instead of List for processing refresh

* last changes on logic

* formatting

* simplify conversion

* comments addressed
This commit is contained in:
Erwin Ramirez 2023-06-23 19:49:34 +10:00 committed by GitHub
parent bce7726576
commit bba554481b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 262 additions and 15 deletions

View File

@ -5,14 +5,13 @@ package com.daml.http.dbbackend
import com.daml.lf.data.Ref
import com.daml.nonempty
import nonempty.{NonEmpty, +-:}
import nonempty.{+-:, NonEmpty}
import nonempty.NonEmptyReturningOps._
import doobie._
import doobie.implicits._
import scala.annotation.nowarn
import scala.collection.immutable.{Seq => ISeq, SortedMap}
import scalaz.{@@, Cord, Functor, OneAnd, Tag, \/, -\/, \/-}
import scala.collection.immutable.{SortedMap, Seq => ISeq}
import scalaz.{-\/, @@, Cord, Functor, OneAnd, Tag, \/, \/-}
import scalaz.Digit._0
import scalaz.syntax.foldable._
import scalaz.syntax.functor._
@ -193,9 +192,31 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
tpid <- sql"""SELECT tpid FROM $templateIdTableName
WHERE (package_id = $packageId AND template_module_name = $moduleName
AND template_entity_name = $entityName)""".query[SurrogateTpId].unique
} yield tpid
// Returns a map from templates to the latest seen offset per party.
// Key: (package_id, template_module_name, template_entity_name);
// value: non-empty map of party -> last_offset, restricted to rows whose
// last_offset is strictly below `offsetLimitToRefresh` — i.e. the templates
// whose cached offsets are stale relative to that limit.
// NOTE(review): `o.last_offset < $offsetLimitToRefresh` is a string/lexicographic
// comparison in SQL — assumes offsets use a fixed-width, order-preserving
// encoding; confirm against the offset format used by the ledger.
final def templateOffsetsOlderThan(offsetLimitToRefresh: String)(implicit
log: LogHandler
): ConnectionIO[
Map[(String, String, String), NonEmpty[Map[String, String]]]
] = {
// Join per-party ledger offsets with the template-id lookup table so the
// result can be keyed by the human-readable template identifier.
val allOffsetsQuery =
sql"""
SELECT o.party, o.last_offset, t.package_id, t.template_module_name, t.template_entity_name
FROM $ledgerOffsetTableName o
JOIN $templateIdTableName t
ON t.tpid = o.tpid
WHERE o.last_offset < $offsetLimitToRefresh
"""
allOffsetsQuery
.query[(String, String, String, String, String)]
.to[Vector]
.map {
// Group rows by template id (columns 3-5); groupBy1 guarantees each
// group is non-empty, hence the NonEmpty[Map[...]] in the result type.
_.groupBy1(x => (x._3, x._4, x._5))
// Within each template, collapse the rows into party -> last_offset.
.transform((_, tpos) => tpos.map(x => (x._1, x._2)).toMap)
}
}
protected def insertTemplateIdIfNotExists(
packageId: String,
moduleName: String,

View File

@ -4,7 +4,7 @@
package com.daml.http
abstract class HttpServiceWithOracleIntTest(override val disableContractPayloadIndexing: Boolean)
extends QueryStoreAndAuthDependentIntegrationTest
extends WithQueryStoreSetTest
with HttpServiceOracleInt {
override final def constrainedJsonQueries = !disableContractPayloadIndexing

View File

@ -11,7 +11,7 @@ import scala.concurrent.Future
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
abstract class HttpServiceWithPostgresIntTest
extends QueryStoreAndAuthDependentIntegrationTest
extends WithQueryStoreSetTest
with PostgresAroundAll
with HttpServicePostgresInt {

View File

@ -124,6 +124,88 @@ trait AbstractHttpServiceIntegrationTestFunsCustomToken
}
// Integration tests for the /v1/refresh/cache endpoint; mixed into the
// query-store-backed HTTP service test suites (Postgres and Oracle).
trait WithQueryStoreSetTest extends QueryStoreAndAuthDependentIntegrationTest {
import HttpServiceTestFixture.archiveCommand
import json.JsonProtocol._
"refresh cache endpoint" - {
// Create a contract, archive it behind the cache's back, then expect the
// refresh endpoint to report the offset it refreshed up to.
"should return latest offset when the cache is outdated" in withHttpService { fixture =>
import fixture.encoder
// Helper: archive the given Iou contract via /v1/exercise and assert
// the response describes the archival.
def archiveIou(headers: List[HttpHeader], contractId: domain.ContractId) = {
val reference = domain.EnrichedContractId(Some(TpId.Iou.Iou), contractId)
val exercise = archiveCommand(reference)
val exerciseJson: JsValue = encodeExercise(encoder)(exercise)
fixture
.postJsonRequest(Uri.Path("/v1/exercise"), exerciseJson, headers)
.parseResponse[domain.ExerciseResponse[JsValue]]
.flatMap(inside(_) { case domain.OkResponse(exercisedResponse, _, StatusCodes.OK) =>
assertExerciseResponseArchivedContract(exercisedResponse, exercise)
})
}
for {
(alice, aliceHeaders) <- fixture.getUniquePartyAndAuthHeaders("Alice")
searchDataSet = genSearchDataSet(alice)
// Seed the query store (and thus the cache) with exactly one matching contract.
contractIds <- searchExpectOk(
searchDataSet,
jsObject(
"""{"templateIds": ["Iou:Iou"], "query": {"currency": "EUR", "amount": "111.11"}}"""
),
fixture,
aliceHeaders,
).map { acl: List[domain.ActiveContract.ResolvedCtTyId[JsValue]] =>
acl.size shouldBe 1
acl.map(a => objectField(a.payload, "currency")) shouldBe List(Some(JsString("EUR")))
acl.map(a => objectField(a.payload, "amount")) shouldBe List(Some(JsString("111.11")))
acl.map(_.contractId)
}
// Archive the contracts so the cached state becomes outdated.
_ <- contractIds.traverse(archiveIou(aliceHeaders, _))
res <-
fixture
.postJsonRequest(Uri.Path("/v1/refresh/cache"), jsObject("{}"), aliceHeaders)
.parseResponse[JsValue]
} yield {
// Response body is a singleton array like [{"refreshedAt":"<hex offset>"}].
inside(res) { case domain.OkResponse(s, _, StatusCodes.OK) =>
assert(s.toString.matches("""\[\{\"refreshedAt\":\"[0-9a-f]*\"\}\]"""))
}
}
}
// Same call when nothing was archived: refresh is a no-op but still
// reports the offset it refreshed up to.
"should return latest offset when the cache was up to date" in withHttpService { fixture =>
for {
res1 <- fixture.getUniquePartyAndAuthHeaders("Alice")
(alice, aliceHeaders) = res1
searchDataSet = genSearchDataSet(alice)
_ <- searchExpectOk(
searchDataSet,
jsObject(
"""{"templateIds": ["Iou:Iou"], "query": {"currency": "EUR", "amount": "111.11"}}"""
),
fixture,
aliceHeaders,
).map { acl: List[domain.ActiveContract.ResolvedCtTyId[JsValue]] =>
acl.size shouldBe 1
acl.map(a => objectField(a.payload, "currency")) shouldBe List(Some(JsString("EUR")))
acl.map(a => objectField(a.payload, "amount")) shouldBe List(Some(JsString("111.11")))
acl.map(_.contractId)
}
res <-
fixture
.postJsonRequest(Uri.Path("/v1/refresh/cache"), jsObject("{}"), aliceHeaders)
.parseResponse[JsValue]
} yield {
inside(res) { case domain.OkResponse(s, _, StatusCodes.OK) =>
assert(s.toString.matches("""\[\{\"refreshedAt\":\"[0-9a-f]*\"\}\]"""))
}
}
}
}
}
/** Tests that may behave differently depending on
*
* 1. whether custom or user tokens are used, ''and''
@ -1363,7 +1445,7 @@ abstract class QueryStoreAndAuthDependentIntegrationTest
} yield succeed
}
private def assertExerciseResponseArchivedContract(
protected def assertExerciseResponseArchivedContract(
exerciseResponse: domain.ExerciseResponse[JsValue],
exercise: domain.ExerciseCommand.OptionalPkg[v.Value, domain.EnrichedContractId],
): Assertion =
@ -1770,6 +1852,7 @@ abstract class QueryStoreAndAuthDependentIntegrationTest
_ <- queryUsers(alice)
} yield succeed
}
}
/** Tests that don't exercise the query store at all, but exercise different

View File

@ -252,6 +252,49 @@ private class ContractsFetch(
}
}
/** Refreshes the query-store cache for every template whose stored per-party
  * offset is older than `offsetLimitToRefresh`, fetching events up to
  * `ledgerEnd`.
  *
  * @param jwt                  token used for the ledger fetch
  * @param ledgerId             ledger to fetch from
  * @param ledgerEnd            absolute offset to fetch up to
  * @param offsetLimitToRefresh templates with any party offset below this are refreshed
  * @return a ConnectionIO that performs the refresh and yields Unit
  */
def fetchAndRefreshCache(
    jwt: Jwt,
    ledgerId: LedgerApiDomain.LedgerId,
    ledgerEnd: Terminates.AtAbsolute,
    offsetLimitToRefresh: domain.Offset,
)(implicit
    ec: ExecutionContext,
    mat: Materializer,
    lc: LoggingContextOf[InstanceUUID with RequestID],
    metrics: HttpJsonApiMetrics,
): ConnectionIO[Unit] = {
  import sjd.q.queries
  import cats.syntax.traverse._
  debugLogActionWithMetrics(
    s"cache refresh for templates older than offset: $offsetLimitToRefresh",
    metrics.Db.warmCache,
  ) {
    for {
      oldTemplates <- queries.templateOffsetsOlderThan(offsetLimitToRefresh.unwrap)
      _ = logger.debug(s"refreshing the cache for ${oldTemplates.size} templates")
      // traverse replaces the original map + toList + sequence: one pass,
      // same sequential ConnectionIO semantics.
      _ <- oldTemplates.toList.traverse {
        case ((packageId, moduleName, entityName), partyOffsetsRaw) =>
          val templateId = ContractTypeId.Template(packageId, moduleName, entityName)
          // Convert raw DB strings into their domain wrappers.
          val partyOffsets = partyOffsetsRaw.map { case (p, o) =>
            (domain.Party(p), domain.Offset(o))
          }.toMap
          val fetchContext = FetchContext(jwt, ledgerId, partyOffsets.keySet)
          contractsFromOffsetIo(
            fetchContext,
            templateId,
            partyOffsets,
            true, // NOTE(review): positional boolean — name this argument at call site once the parameter name is confirmed
            ledgerEnd,
          )
      }
    } yield ()
  }
}
private def prepareCreatedEventStorage(
ce: lav1.event.CreatedEvent,
d: ContractTypeId.Resolved,
@ -316,8 +359,8 @@ private class ContractsFetch(
debugLogActionWithMetrics(
s"cache refresh for templateId: $templateId",
metrics.Db.cacheUpdate,
metrics.Db.cacheUpdateStarted,
metrics.Db.cacheUpdateFailed,
Some(metrics.Db.cacheUpdateStarted),
Some(metrics.Db.cacheUpdateFailed),
) {
val graph = RunnableGraph.fromGraph(
GraphDSL.createGraph(
@ -392,10 +435,10 @@ private class ContractsFetch(
private def debugLogActionWithMetrics[T, C](
actionDescription: String,
timer: Timer,
startedCounter: Counter,
failedCounter: Counter,
optStartedCounter: Option[Counter] = None,
optFailedCounter: Option[Counter] = None,
)(block: => T)(implicit lc: LoggingContextOf[C]): T = {
startedCounter.inc()
optStartedCounter.foreach(_.inc())
val timerHandler = timer.startAsync()
val startTime = System.nanoTime()
logger.debug(s"Starting $actionDescription")
@ -404,7 +447,7 @@ private class ContractsFetch(
block
} catch {
case e: Exception =>
failedCounter.inc()
optFailedCounter.foreach(_.inc())
logger.error(
s"Failed $actionDescription after ${(System.nanoTime() - startTime) / 1000000L}ms because: $e"
)

View File

@ -10,7 +10,12 @@ import akka.stream.Materializer
import com.daml.lf
import com.daml.http.LedgerClientJwt.Terminates
import com.daml.http.dbbackend.ContractDao
import com.daml.http.domain.{ContractTypeId, GetActiveContractsRequest, JwtPayload}
import com.daml.http.domain.{
ContractTypeId,
GetActiveContractsRequest,
JwtPayload,
RefreshCacheRequest,
}
import ContractTypeId.toLedgerApiValue
import com.daml.http.json.JsonProtocol.LfValueCodec
import com.daml.http.query.ValuePredicate
@ -306,6 +311,46 @@ class ContractsService(
}
}
/** Handles a cache-refresh request: determines the current ledger end and, if a
  * query store is configured, refreshes all stale template offsets up to the
  * requested offset (defaulting to the ledger end).
  *
  * @return OkResponse with a single-element source carrying the offset the
  *         cache was refreshed to, or an ErrorResponse when no query store is
  *         configured or the ledger is still at the beginning.
  */
def refreshCache(
    jwt: Jwt,
    jwtPayload: JwtPayload,
    refreshCacheRequest: RefreshCacheRequest,
)(implicit
    lc: LoggingContextOf[InstanceUUID with RequestID],
    metrics: HttpJsonApiMetrics,
): Future[domain.SyncResponse[Source[Error \/ domain.RefreshCacheResult, NotUsed]]] = {
  val ledgerId = toLedgerId(jwtPayload.ledgerId)
  getTermination(jwt, ledgerId)(lc).map { optLedgerEnd =>
    optLedgerEnd.cata(
      { ledgerEnd =>
        daoAndFetch.cata(
          { case (dao, fetchService) =>
            val response: Source[Error \/ domain.RefreshCacheResult, NotUsed] = {
              // Absent an explicit offset in the request body, refresh up to
              // the current ledger end.
              val offsetLimitToRefresh = refreshCacheRequest.offset.getOrElse(ledgerEnd.toDomain)
              val futureValue =
                dao
                  .transact(
                    fetchService
                      .fetchAndRefreshCache(jwt, ledgerId, ledgerEnd, offsetLimitToRefresh)
                  )
                  .unsafeToFuture()
              Source
                .future(futureValue)
                .map { _ =>
                  \/.right(domain.RefreshCacheResult(offsetLimitToRefresh))
                }
            }
            domain.OkResponse(response)
          },
          mkErrorResponse("No query-storage is set to update cache", None),
        )
      }, {
        // Fixed grammar in the user-facing message ("do not" -> "does not").
        mkErrorResponse("Ledger is at the beginning, cache does not have anything to update", None)
      },
    )
  }
}
private def handleResolvedQueryErrors(
warnings: Option[domain.UnknownTemplateIds]
): domain.ResolvedQuery.Unsupported => domain.ErrorResponse = unsuppoerted =>

View File

@ -333,6 +333,7 @@ class Endpoints(
path("create-and-exercise") apply toRoute(
createAndExercise(req)
),
path("refresh" / "cache") apply toRoute(refreshCache(req)),
path("query") apply toRoute(query(req)),
path("fetch") apply toRoute(fetch(req)),
path("user") apply toPostRoute(req, getUser),

View File

@ -173,6 +173,10 @@ package domain {
readAs: Option[NonEmptyList[Party]],
)
// Body of POST /v1/refresh/cache. When `offset` is absent the service
// refreshes up to the current ledger end (see ContractsService.refreshCache).
final case class RefreshCacheRequest(
offset: Option[domain.Offset]
)
// Streaming search request: each query is paired with its position index
// so results can be matched back to the originating query.
final case class SearchForeverRequest(
queriesWithPos: NonEmptyList[(SearchForeverQuery, Int)]
)
@ -453,6 +457,8 @@ package domain {
completionOffset: CompletionOffset,
)
// Result of a cache refresh: the offset the cache was refreshed up to.
final case class RefreshCacheResult(refreshedAt: Offset)
object PartyDetails {
def fromLedgerApi(p: com.daml.ledger.api.domain.PartyDetails): PartyDetails =
PartyDetails(Party(p.party), p.displayName, p.isLocal)

View File

@ -149,6 +149,46 @@ private[http] final class ContractList(
} yield res
}.run
/** Endpoint handler for POST /v1/refresh/cache: authenticates the request,
  * decodes the optional target offset from the body, and delegates to
  * ContractsService.refreshCache, serializing the result to JSON.
  */
def refreshCache(req: HttpRequest)(implicit
    lc: LoggingContextOf[InstanceUUID with RequestID],
    metrics: HttpJsonApiMetrics,
): Future[Error \/ SearchResult[Error \/ JsValue]] = {
  for {
    it <- inputAndJwtPayload[JwtPayload](req).leftMap(identity[Error])
    (jwt, jwtPayload, reqBody) = it
    response <- withJwtPayloadLoggingContext(jwtPayload) { implicit lc =>
      val result = for {
        // Decode the request body; a malformed body is reported as invalid user input.
        cmd <- SprayJson
          .decode[domain.RefreshCacheRequest](reqBody)
          .liftErr[Error](InvalidUserInput)
      } yield withEnrichedLoggingContext(
        LoggingContextOf.label[domain.RefreshCacheRequest],
        "cmd" -> cmd.toString,
      ).run { implicit lc =>
        logger.debug("Starting refresh cache to the latest offset")
        contractsService
          .refreshCache(jwt, jwtPayload, cmd)
          .map(
            domain.SyncResponse.covariant.map(_)(
              _.via(handleSourceFailure)
                .map {
                  case x @ -\/(error) =>
                    logger.error(s"Error refreshing the cache with error: $error")
                    x
                  case x @ \/-(_) =>
                    // No interpolation needed — plain literal instead of s"...".
                    logger.debug("Successfully refreshed cache")
                    x.flatMap(
                      toJsValue[domain.RefreshCacheResult](_)
                    )
                }
            )
          )
      }
      eitherT(result.sequence)
    }
  } yield response
}.run
private def handleSourceFailure[E, A](implicit
E: IntoEndpointsError[E]
): Flow[E \/ A, Error \/ A, NotUsed] =

View File

@ -400,6 +400,10 @@ object JsonProtocol extends JsonProtocolLow {
requestJsonReaderPlusOne(ReadersKey)(domain.GetActiveContractsRequest)
}
// Reader for the /v1/refresh/cache request body (single optional "offset" field).
implicit val RefreshCacheRequestFormat: RootJsonReader[domain.RefreshCacheRequest] =
  jsonFormat1(domain.RefreshCacheRequest)
implicit val SearchForeverQueryFormat: RootJsonReader[domain.SearchForeverQuery] = {
val OffsetKey = "offset"
requestJsonReaderPlusOne(OffsetKey)(domain.SearchForeverQuery)
@ -571,6 +575,9 @@ object JsonProtocol extends JsonProtocolLow {
// JSON codec for domain.ExerciseResponse (3 fields, derived via jsonFormat3).
implicit val ExerciseResponseFormat: RootJsonFormat[domain.ExerciseResponse[JsValue]] =
jsonFormat3(domain.ExerciseResponse[JsValue])
// JSON codec for RefreshCacheResult; serializes as {"refreshedAt": <offset>}
// (field name taken from the case class, per spray-json jsonFormat1).
implicit val RefreshCacheResultFormat: RootJsonFormat[domain.RefreshCacheResult] =
jsonFormat1(domain.RefreshCacheResult)
// JSON codec for domain.CreateCommandResponse (8 fields, derived via jsonFormat8).
implicit val CreateCommandResponseFormat: RootJsonFormat[domain.CreateCommandResponse[JsValue]] =
jsonFormat8(domain.CreateCommandResponse[JsValue])

View File

@ -58,6 +58,7 @@ class HttpJsonApiMetrics(
/** * cache metrics **
*/
val warmCache: Timer = defaultMetricsFactory.timer(prefix :+ "warm_cache")
val cacheUpdate: Timer = defaultMetricsFactory.timer(prefix :+ "cache_update")
// The cache update completed count can be derived from the count of the `cacheUpdate` timer
val cacheUpdateStarted: Counter =