mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Fix the case when start offset > end offset (#4283)
* Fix the case when start offset > end offset Actually skip the call completely, all needed data supposed to be already in the query-store, fetched by a concurrent thread/client. Moving the check into the `getCreatesAndArchivesSince` call CHANGELOG_BEGIN CHANGELOG_END * Remove flaky, want to see the test failing. * reformat * Add AbsoluteOffsetOrdering so we can handle both offset formats * Code review comments * generators WIP * generators WIP * testing the laws * remove `implicitly` call
This commit is contained in:
parent
b472b530af
commit
9f57994f2d
@ -115,8 +115,6 @@ da_scala_test(
|
||||
":Account.dar",
|
||||
"//docs:quickstart-model.dar",
|
||||
],
|
||||
# See https://github.com/digital-asset/daml/issues/4281
|
||||
flaky = True,
|
||||
resources = glob(["src/test/resources/**/*"]),
|
||||
scalacopts = hj_scalacopts,
|
||||
deps = [
|
||||
|
@ -197,20 +197,10 @@ private class ContractsFetch(
|
||||
)
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
|
||||
case AbsoluteBookmark(x) =>
|
||||
// TODO(Leo/Stephen): figure out why it happens, can't reproduce it locally
|
||||
// https://github.com/digital-asset/daml/blob/e2a5b264750dd9d96a50c8fd180a08d6f2eb0860/ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/stores/SandboxIndexAndWriteService.scala#L275
|
||||
if (x.unwrap.toLong > absEnd.off.value.toLong) {
|
||||
logger.warn(
|
||||
s"Trying to fetch transactions with LedgerBegin($x) > LedgerEnd($absEnd). Returning empty stream.")
|
||||
val stepsAndOffset = builder add transactionsFollowingBoundary(_ => Source.empty)
|
||||
stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(offset))
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
} else {
|
||||
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
|
||||
stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(offset))
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
}
|
||||
case AbsoluteBookmark(_) =>
|
||||
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
|
||||
stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(offset))
|
||||
(stepsAndOffset.out0, stepsAndOffset.out1)
|
||||
}
|
||||
|
||||
val transactInsertsDeletes = Flow
|
||||
|
@ -80,7 +80,21 @@ object LedgerClientJwt {
|
||||
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_END))
|
||||
|
||||
def getCreatesAndArchivesSince(client: LedgerClient): GetCreatesAndArchivesSince =
|
||||
(jwt, filter, offset, terminates) =>
|
||||
client.transactionClient
|
||||
.getTransactions(offset, terminates.toOffset, filter, verbose = true, token = bearer(jwt))
|
||||
(jwt, filter, offset, terminates) => {
|
||||
val end = terminates.toOffset
|
||||
if (skipRequest(offset, end))
|
||||
Source.empty[Transaction]
|
||||
else
|
||||
client.transactionClient
|
||||
.getTransactions(offset, terminates.toOffset, filter, verbose = true, token = bearer(jwt))
|
||||
}
|
||||
|
||||
private def skipRequest(start: LedgerOffset, end: Option[LedgerOffset]): Boolean = {
|
||||
import com.digitalasset.http.util.LedgerOffsetUtil.AbsoluteOffsetOrdering
|
||||
(start.value, end.map(_.value)) match {
|
||||
case (s: LedgerOffset.Value.Absolute, Some(e: LedgerOffset.Value.Absolute)) =>
|
||||
AbsoluteOffsetOrdering.gteq(s, e)
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
// Copyright (c) 2020 The DAML Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.http.util
|
||||
|
||||
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import scalaz.{-\/, \/, \/-}
|
||||
|
||||
object LedgerOffsetUtil {
|
||||
|
||||
private val LongEitherLongLongOrdering: Ordering[Long \/ (Long, Long)] = {
|
||||
import scalaz.std.tuple._
|
||||
import scalaz.std.anyVal._
|
||||
scalaz.Order[Long \/ (Long, Long)].toScalaOrdering
|
||||
}
|
||||
|
||||
implicit val AbsoluteOffsetOrdering: Ordering[LedgerOffset.Value.Absolute] =
|
||||
Ordering.by(parseOffset)(LongEitherLongLongOrdering)
|
||||
|
||||
private def parseOffset(a: LedgerOffset.Value.Absolute): Long \/ (Long, Long) = {
|
||||
val offset: String = a.value
|
||||
offset.split('-') match {
|
||||
case Array(_, a2, a3) =>
|
||||
\/-((a2.toLong, a3.toLong))
|
||||
case Array(a1) =>
|
||||
-\/(a1.toLong)
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"Expected either numeric or composite offset in the format: '<block-hash>-<block-height>-<event-id>'," +
|
||||
s" got: ${offset: String}"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
@ -145,4 +145,17 @@ object Generators {
|
||||
Gen.identifier.map(JsString(_): JsValue),
|
||||
Gen.posNum[Int].map(JsNumber(_): JsValue)
|
||||
)
|
||||
|
||||
def absoluteLedgerOffsetVal: Gen[lav1.ledger_offset.LedgerOffset.Value.Absolute] =
|
||||
Gen.oneOf(IntAbsoluteLedgerOffsetVal, CompositeAbsoluteLedgerOffsetVal)
|
||||
|
||||
def IntAbsoluteLedgerOffsetVal: Gen[lav1.ledger_offset.LedgerOffset.Value.Absolute] =
|
||||
Gen.posNum[Int].map(n => lav1.ledger_offset.LedgerOffset.Value.Absolute(n.toString))
|
||||
|
||||
def CompositeAbsoluteLedgerOffsetVal: Gen[lav1.ledger_offset.LedgerOffset.Value.Absolute] =
|
||||
for {
|
||||
a1 <- Gen.identifier
|
||||
a2 <- Gen.posNum[Int]
|
||||
a3 <- Gen.posNum[Int]
|
||||
} yield lav1.ledger_offset.LedgerOffset.Value.Absolute(s"${a1: String}-${a2: Int}-${a3: Int}")
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
// Copyright (c) 2020 The DAML Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.http.util
|
||||
|
||||
import com.digitalasset.daml.lf.data.FlatSpecCheckLaws
|
||||
import com.digitalasset.http.Generators
|
||||
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import org.scalatest.prop.GeneratorDrivenPropertyChecks
|
||||
import org.scalatest.{FlatSpec, Matchers}
|
||||
import scalaz.scalacheck.ScalazProperties
|
||||
|
||||
class LedgerOffsetUtilTest
|
||||
extends FlatSpec
|
||||
with Matchers
|
||||
with FlatSpecCheckLaws
|
||||
with GeneratorDrivenPropertyChecks {
|
||||
|
||||
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
|
||||
PropertyCheckConfiguration(minSuccessful = 100)
|
||||
|
||||
import LedgerOffsetUtilTest._
|
||||
|
||||
behavior of LedgerOffsetUtil.AbsoluteOffsetOrdering.getClass.getSimpleName
|
||||
|
||||
checkLaws(ScalazProperties.order.laws[LedgerOffset.Value.Absolute])
|
||||
}
|
||||
|
||||
object LedgerOffsetUtilTest {
|
||||
import org.scalacheck.Arbitrary
|
||||
|
||||
implicit val arbAbsoluteOffset: Arbitrary[LedgerOffset.Value.Absolute] = Arbitrary(
|
||||
Generators.absoluteLedgerOffsetVal)
|
||||
|
||||
implicit val scalazOrder: scalaz.Order[LedgerOffset.Value.Absolute] =
|
||||
scalaz.Order.fromScalaOrdering(LedgerOffsetUtil.AbsoluteOffsetOrdering)
|
||||
}
|
Loading…
Reference in New Issue
Block a user