mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-04 00:36:58 +03:00
avoiding linear searching for transactions from genesis (#994)
* avoiding linear searching for transactions from genesis * validating transaction ids * one more failing test nailed down * fixing off by 1 error * docs + release notes * fixing EventIdFormatterSpec * fixing broken validation test
This commit is contained in:
parent
a0338bdc6a
commit
4abc18d8a1
@ -8,6 +8,7 @@ This page contains release notes for the SDK.
|
||||
|
||||
HEAD — ongoing
|
||||
--------------
|
||||
- Making transaction lookups performant so we can handle such requests for large ledgers as well
|
||||
|
||||
- Sandbox: Transactions with a record time that is after the maximum record time (as provided in the original command)
|
||||
are now properly rejected instead of committed to the ledger: `#987 <https://github.com/digital-asset/daml/issues/987>`_
|
||||
|
@ -5,6 +5,7 @@ package com.digitalasset.ledger.backend.api.v1
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.ledger.backend.api.v1.LedgerSyncEvent.AcceptedTransaction
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
@ -72,9 +73,9 @@ trait LedgerBackend extends AutoCloseable {
|
||||
* on reconnects to the Ledger API that they are connected to the same
|
||||
* ledger and can therefore expect to receive the same data on calls that
|
||||
* return append-only data. It is expected to be:
|
||||
* (1) immutable over the lifetime of a [[LedgerBackend]] instance,
|
||||
* (2) globally unique with high-probability,
|
||||
* (3) matching the regexp [a-zA-Z0-9]+.
|
||||
* (1) immutable over the lifetime of a [[LedgerBackend]] instance,
|
||||
* (2) globally unique with high-probability,
|
||||
* (3) matching the regexp [a-zA-Z0-9]+.
|
||||
*
|
||||
* Implementations where Participant nodes share a global view on all
|
||||
* transactions in the ledger (e.g, via a blockchain) are expected to use
|
||||
@ -101,17 +102,17 @@ trait LedgerBackend extends AutoCloseable {
|
||||
|
||||
/** Return the stream of ledger events starting from and including the given offset.
|
||||
*
|
||||
* @param offset: the ledger offset starting from which events should be streamed.
|
||||
* @param offset : the ledger offset starting from which events should be streamed.
|
||||
*
|
||||
* The stream only terminates if there was an error.
|
||||
* The stream only terminates if there was an error.
|
||||
*
|
||||
* Two calls to this method with the same arguments are related such
|
||||
* that
|
||||
* (1) all events are delivered in the same order, but
|
||||
* (2) [[LedgerSyncEvent.RejectedCommand]] and [[LedgerSyncEvent.Heartbeat]] events can be elided if their
|
||||
* recordTime is equal to the preceding event.
|
||||
* This rule provides implementors with the freedom to not persist
|
||||
* [[LedgerSyncEvent.RejectedCommand]] and [[LedgerSyncEvent.Heartbeat]] events.
|
||||
* Two calls to this method with the same arguments are related such
|
||||
* that
|
||||
* (1) all events are delivered in the same order, but
|
||||
* (2) [[LedgerSyncEvent.RejectedCommand]] and [[LedgerSyncEvent.Heartbeat]] events can be elided if their
|
||||
* recordTime is equal to the preceding event.
|
||||
* This rule provides implementors with the freedom to not persist
|
||||
* [[LedgerSyncEvent.RejectedCommand]] and [[LedgerSyncEvent.Heartbeat]] events.
|
||||
*
|
||||
*/
|
||||
def ledgerSyncEvents(offset: Option[LedgerSyncOffset] = None): Source[LedgerSyncEvent, NotUsed]
|
||||
@ -153,4 +154,7 @@ trait LedgerBackend extends AutoCloseable {
|
||||
*
|
||||
*/
|
||||
def getCurrentLedgerEnd: Future[LedgerSyncOffset]
|
||||
|
||||
/** Looks up a transaction by its id. */
|
||||
def getTransactionById(transactionId: TransactionId): Future[Option[AcceptedTransaction]]
|
||||
}
|
||||
|
@ -157,14 +157,14 @@ class TransactionServiceRequestValidator(
|
||||
req: GetTransactionByIdRequest): Result[transaction.GetTransactionByIdRequest] = {
|
||||
for {
|
||||
ledgerId <- matchId(req.ledgerId)
|
||||
_ <- requireNonEmptyString(req.transactionId, "transaction_id")
|
||||
trId <- requireNumber(req.transactionId, "transaction_id")
|
||||
_ <- requireNonEmpty(req.requestingParties, "requesting_parties")
|
||||
parties <- requireParties(req.requestingParties)
|
||||
_ <- requireKnownParties(parties)
|
||||
} yield {
|
||||
transaction.GetTransactionByIdRequest(
|
||||
ledgerId,
|
||||
domain.TransactionId(req.transactionId),
|
||||
domain.TransactionId(trId.toString),
|
||||
parties,
|
||||
req.traceContext.map(toBrave))
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import com.digitalasset.platform.server.api.validation.ErrorFactories._
|
||||
import io.grpc.StatusRuntimeException
|
||||
|
||||
import scala.language.higherKinds
|
||||
import scala.util.Try
|
||||
|
||||
trait FieldValidations {
|
||||
|
||||
@ -19,6 +20,12 @@ trait FieldValidations {
|
||||
if (s.nonEmpty) Right(s)
|
||||
else Left(missingField(fieldName))
|
||||
|
||||
def requireNumber(s: String, fieldName: String): Either[StatusRuntimeException, Long] =
|
||||
for {
|
||||
s <- requireNonEmptyString(s, fieldName)
|
||||
number <- Try(s.toLong).toEither.left.map(t => invalidField(fieldName, t.getMessage))
|
||||
} yield number
|
||||
|
||||
def requirePackageId(
|
||||
s: String,
|
||||
fieldName: String): Either[StatusRuntimeException, Ref.PackageId] =
|
||||
|
@ -30,7 +30,7 @@ trait ValidatorTestUtils extends Matchers with Inside with OptionValues { self:
|
||||
protected val party = Ref.Party.assertFromString("party")
|
||||
protected val verbose = false
|
||||
protected val eventId = "eventId"
|
||||
protected val transactionId = "transactionId"
|
||||
protected val transactionId = "42"
|
||||
protected val offsetOrdering = Ordering.by[domain.LedgerOffset.Absolute, Int](_.value.toInt)
|
||||
protected val ledgerEnd = domain.LedgerOffset.Absolute("1000")
|
||||
|
||||
|
@ -11,7 +11,7 @@ import scala.util.Try
|
||||
|
||||
object SandboxEventIdFormatter {
|
||||
|
||||
case class TransactionIdWithIndex(transactionId: String, nodeId: Transaction.NodeId)
|
||||
case class TransactionIdWithIndex(transactionId: Long, nodeId: Transaction.NodeId)
|
||||
|
||||
def makeAbsCoid(transactionId: String)(coid: Lf.ContractId): Lf.AbsoluteContractId = coid match {
|
||||
case a @ Lf.AbsoluteContractId(_) => a
|
||||
@ -34,8 +34,10 @@ object SandboxEventIdFormatter {
|
||||
case Array(transactionId, index) =>
|
||||
transactionId.splitAt(1) match {
|
||||
case ("#", transId) =>
|
||||
Try(index.toInt).toOption
|
||||
.map(ix => TransactionIdWithIndex(transId, Transaction.NodeId.unsafeFromIndex(ix)))
|
||||
(for {
|
||||
ix <- Try(index.toInt)
|
||||
tId <- Try(transId.toLong)
|
||||
} yield TransactionIdWithIndex(tId, Transaction.NodeId.unsafeFromIndex(ix))).toOption
|
||||
case _ => None
|
||||
}
|
||||
case _ => None
|
||||
|
@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.api.util.TimestampConversion._
|
||||
import com.digitalasset.daml.lf.data.Ref.Party
|
||||
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
|
||||
@ -75,7 +75,6 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
request)
|
||||
|
||||
val eventFilter = EventFilter.byTemplates(request.filter)
|
||||
val requestingParties = request.filter.filtersByParty.keys.toList
|
||||
|
||||
transactionPipeline
|
||||
.run(request.begin, request.end)
|
||||
@ -161,7 +160,9 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
.withDescription(s"invalid eventId: ${request.eventId}")
|
||||
.asRuntimeException())) {
|
||||
case TransactionIdWithIndex(transactionId, index) =>
|
||||
lookUpTreeByTransactionId(TransactionId(transactionId), request.requestingParties)
|
||||
lookUpTreeByTransactionId(
|
||||
TransactionId(transactionId.toString),
|
||||
request.requestingParties)
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,7 +182,9 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
.withDescription(s"invalid eventId: ${request.eventId}")
|
||||
.asRuntimeException())) {
|
||||
case TransactionIdWithIndex(transactionId, index) =>
|
||||
lookUpFlatByTransactionId(TransactionId(transactionId), request.requestingParties)
|
||||
lookUpFlatByTransactionId(
|
||||
TransactionId(transactionId.toString),
|
||||
request.requestingParties)
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,13 +201,9 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
|
||||
private def lookUpTreeByTransactionId(
|
||||
transactionId: TransactionId,
|
||||
requestingParties: Set[Party]): Future[Option[VisibleTransaction]] = {
|
||||
requestingParties: Set[Party]): Future[Option[VisibleTransaction]] =
|
||||
transactionPipeline
|
||||
.run(LedgerOffset.LedgerBegin, Some(LedgerOffset.LedgerEnd))
|
||||
.collect {
|
||||
case t: AcceptedTransaction if t.transactionId == transactionId => t
|
||||
}
|
||||
.runWith(Sink.headOption)
|
||||
.getTransactionById(transactionId.unwrap)
|
||||
.flatMap {
|
||||
case Some(trans) =>
|
||||
Future.successful(toResponseIfVisible(requestingParties, trans))
|
||||
@ -215,17 +214,12 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
.withDescription(s"$transactionId could not be found")
|
||||
.asRuntimeException())
|
||||
}
|
||||
}
|
||||
|
||||
private def lookUpFlatByTransactionId(
|
||||
transactionId: TransactionId,
|
||||
requestingParties: Set[Party]): Future[Option[PTransaction]] = {
|
||||
requestingParties: Set[Party]): Future[Option[PTransaction]] =
|
||||
transactionPipeline
|
||||
.run(LedgerOffset.LedgerBegin, Some(LedgerOffset.LedgerEnd))
|
||||
.collect {
|
||||
case t: AcceptedTransaction if t.transactionId == transactionId => t
|
||||
}
|
||||
.runWith(Sink.headOption)
|
||||
.getTransactionById(transactionId.unwrap)
|
||||
.flatMap {
|
||||
case Some(trans) =>
|
||||
val eventFilter = EventFilter.byTemplates(
|
||||
@ -239,7 +233,6 @@ class SandboxTransactionService private (val ledgerBackend: LedgerBackend, paral
|
||||
.withDescription(s"$transactionId could not be found")
|
||||
.asRuntimeException())
|
||||
}
|
||||
}
|
||||
|
||||
private def toTransactionWithMeta(trans: AcceptedTransaction) =
|
||||
TransactionWithMeta(
|
||||
|
@ -7,9 +7,11 @@ import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.ledger.api.domain.LedgerOffset
|
||||
import com.digitalasset.ledger.backend.api.v1.LedgerSyncEvent.AcceptedTransaction
|
||||
import com.digitalasset.ledger.backend.api.v1.{LedgerBackend, LedgerSyncEvent}
|
||||
import com.digitalasset.ledger.backend.api.v1.{LedgerBackend, LedgerSyncEvent, TransactionId}
|
||||
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
|
||||
import com.digitalasset.platform.server.services.transaction.{OffsetHelper, OffsetSection}
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
protected class TransactionPipeline(ledgerBackend: LedgerBackend) {
|
||||
@ -31,15 +33,21 @@ protected class TransactionPipeline(ledgerBackend: LedgerBackend) {
|
||||
|
||||
subscribeUntil
|
||||
.fold(eventStream)(su => eventStream.untilRequired(su.toLong))
|
||||
.collect {
|
||||
// the offset we get from LedgerBackend is the actual offset of the entry. We need to return the next one
|
||||
// however on the API so clients can resubscribe with the received offset without getting duplicates
|
||||
case t: AcceptedTransaction => t.copy(offset = (t.offset.toLong + 1).toString)
|
||||
}
|
||||
.collect { case t: AcceptedTransaction => increaseOffset(t) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getTransactionById(transactionId: TransactionId): Future[Option[AcceptedTransaction]] =
|
||||
ledgerBackend
|
||||
.getTransactionById(transactionId)
|
||||
.map(_.map(increaseOffset))(DEC)
|
||||
|
||||
// the offset we get from LedgerBackend is the actual offset of the entry. We need to return the next one
|
||||
// however on the API so clients can resubscribe with the received offset without getting duplicates
|
||||
private def increaseOffset(t: AcceptedTransaction) =
|
||||
t.copy(offset = (t.offset.toLong + 1).toString)
|
||||
|
||||
private def getOffsetHelper(ledgerEnd: String) = {
|
||||
new OffsetHelper[String] {
|
||||
override def fromOpaque(opaque: String): Try[String] = Success(opaque)
|
||||
@ -52,7 +60,6 @@ protected class TransactionPipeline(ledgerBackend: LedgerBackend) {
|
||||
java.lang.Long.compare(o1.toLong, o2.toLong)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object TransactionPipeline {
|
||||
|
@ -12,7 +12,11 @@ import com.digitalasset.api.util.TimeProvider
|
||||
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
|
||||
import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubmission}
|
||||
import com.digitalasset.ledger.backend.api.v1.{
|
||||
SubmissionResult,
|
||||
TransactionId,
|
||||
TransactionSubmission
|
||||
}
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
|
||||
@ -40,6 +44,9 @@ trait Ledger extends AutoCloseable {
|
||||
def publishHeartbeat(time: Instant): Future[Unit]
|
||||
|
||||
def publishTransaction(transactionSubmission: TransactionSubmission): Future[SubmissionResult]
|
||||
|
||||
def lookupTransaction(
|
||||
transactionId: TransactionId): Future[Option[(Long, LedgerEntry.Transaction)]]
|
||||
}
|
||||
|
||||
object Ledger {
|
||||
|
@ -10,7 +10,11 @@ import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
|
||||
import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubmission}
|
||||
import com.digitalasset.ledger.backend.api.v1.{
|
||||
SubmissionResult,
|
||||
TransactionId,
|
||||
TransactionSubmission
|
||||
}
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
|
||||
@ -42,6 +46,10 @@ private class MeteredLedger(ledger: Ledger, mm: MetricsManager) extends Ledger {
|
||||
transactionSubmission: TransactionSubmission): Future[SubmissionResult] =
|
||||
mm.timedFuture("Ledger:publishTransaction", ledger.publishTransaction(transactionSubmission))
|
||||
|
||||
override def lookupTransaction(
|
||||
transactionId: TransactionId): Future[Option[(Long, LedgerEntry.Transaction)]] =
|
||||
mm.timedFuture("Ledger:lookupTransaction", ledger.lookupTransaction(transactionId))
|
||||
|
||||
override def close(): Unit = {
|
||||
ledger.close()
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ import com.digitalasset.ledger.backend.api.v1.LedgerSyncEvent.{
|
||||
RejectedCommand
|
||||
}
|
||||
import com.digitalasset.ledger.backend.api.v1._
|
||||
import com.digitalasset.platform.common.util.DirectExecutionContext
|
||||
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -27,9 +27,8 @@ class SandboxLedgerBackend(ledger: Ledger)(implicit mat: Materializer) extends L
|
||||
private class SandboxSubmissionHandle extends SubmissionHandle {
|
||||
override def abort: Future[Unit] = Future.successful(())
|
||||
|
||||
override def submit(submitted: TransactionSubmission): Future[SubmissionResult] = {
|
||||
override def submit(submitted: TransactionSubmission): Future[SubmissionResult] =
|
||||
ledger.publishTransaction(submitted)
|
||||
}
|
||||
|
||||
private[this] def canSeeContract(
|
||||
submitter: Party,
|
||||
@ -45,12 +44,12 @@ class SandboxLedgerBackend(ledger: Ledger)(implicit mat: Materializer) extends L
|
||||
.lookupContract(contractId)
|
||||
.map(_.collect {
|
||||
case ac if canSeeContract(submitter, ac) => ac.contract
|
||||
})(DirectExecutionContext)
|
||||
})(DEC)
|
||||
|
||||
override def lookupContractKey(
|
||||
submitter: Party,
|
||||
key: Node.GlobalKey): Future[Option[Value.AbsoluteContractId]] = {
|
||||
implicit val ec: ExecutionContext = DirectExecutionContext
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
ledger.lookupKey(key).flatMap {
|
||||
// note that we need to check visibility for keys, too, otherwise we leak the existence of a non-divulged
|
||||
// contract if we return `Some`.
|
||||
@ -96,7 +95,7 @@ class SandboxLedgerBackend(ledger: Ledger)(implicit mat: Materializer) extends L
|
||||
ac.witnesses
|
||||
)
|
||||
|
||||
private def toLedgerSyncEvent(offset: Long, item: LedgerEntry): LedgerSyncEvent = {
|
||||
private def toLedgerSyncEvent(offset: Long, item: LedgerEntry): LedgerSyncEvent =
|
||||
item match {
|
||||
case LedgerEntry.Rejection(
|
||||
recordTime,
|
||||
@ -111,37 +110,49 @@ class SandboxLedgerBackend(ledger: Ledger)(implicit mat: Materializer) extends L
|
||||
rejectionReason,
|
||||
offset.toString,
|
||||
Some(applicationId))
|
||||
case LedgerEntry.Transaction(
|
||||
commandId,
|
||||
transactionId,
|
||||
applicationId,
|
||||
submittingParty,
|
||||
workflowId,
|
||||
ledgerEffectiveTime,
|
||||
recordedAt,
|
||||
transaction,
|
||||
explicitDisclosure) =>
|
||||
AcceptedTransaction(
|
||||
transaction,
|
||||
transactionId,
|
||||
Some(submittingParty),
|
||||
ledgerEffectiveTime,
|
||||
recordedAt,
|
||||
offset.toString,
|
||||
workflowId,
|
||||
explicitDisclosure.mapValues(_.map(Ref.Party.assertFromString)),
|
||||
Some(applicationId),
|
||||
Some(commandId)
|
||||
)
|
||||
case t: LedgerEntry.Transaction => toAcceptedTransaction(offset, t)
|
||||
case LedgerEntry.Checkpoint(recordedAt) =>
|
||||
Heartbeat(
|
||||
recordedAt,
|
||||
offset.toString
|
||||
)
|
||||
}
|
||||
|
||||
override def close(): Unit =
|
||||
ledger.close()
|
||||
|
||||
private def toAcceptedTransaction(offset: Long, t: LedgerEntry.Transaction) = t match {
|
||||
case LedgerEntry.Transaction(
|
||||
commandId,
|
||||
transactionId,
|
||||
applicationId,
|
||||
submittingParty,
|
||||
workflowId,
|
||||
ledgerEffectiveTime,
|
||||
recordedAt,
|
||||
transaction,
|
||||
explicitDisclosure) =>
|
||||
AcceptedTransaction(
|
||||
transaction,
|
||||
transactionId,
|
||||
Some(submittingParty),
|
||||
ledgerEffectiveTime,
|
||||
recordedAt,
|
||||
offset.toString,
|
||||
workflowId,
|
||||
explicitDisclosure.mapValues(_.map(Ref.Party.assertFromString)),
|
||||
Some(applicationId),
|
||||
Some(commandId)
|
||||
)
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
ledger.close()
|
||||
}
|
||||
override def getTransactionById(
|
||||
transactionId: TransactionId): Future[Option[AcceptedTransaction]] =
|
||||
ledger
|
||||
.lookupTransaction(transactionId)
|
||||
.map(_.map {
|
||||
case (offset, t) =>
|
||||
toAcceptedTransaction(offset, t)
|
||||
})(DEC)
|
||||
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId}
|
||||
import com.digitalasset.ledger.backend.api.v1.{
|
||||
RejectionReason,
|
||||
SubmissionResult,
|
||||
TransactionId,
|
||||
TransactionSubmission
|
||||
}
|
||||
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
|
||||
@ -159,4 +160,13 @@ class InMemoryLedger(
|
||||
|
||||
override def close(): Unit = ()
|
||||
|
||||
override def lookupTransaction(
|
||||
transactionId: TransactionId): Future[Option[(Long, LedgerEntry.Transaction)]] =
|
||||
Future.successful(
|
||||
entries
|
||||
.getEntryAt(transactionId.toLong)
|
||||
.collect[(Long, LedgerEntry.Transaction)] {
|
||||
case t: LedgerEntry.Transaction =>
|
||||
(transactionId.toLong, t) // the transaction id is also the offset
|
||||
})
|
||||
}
|
||||
|
@ -52,4 +52,7 @@ private[ledger] class LedgerEntries[T](identify: T => String) {
|
||||
def ledgerBeginning: Long = 0L
|
||||
|
||||
def ledgerEnd: Long = state.get().ledgerEnd
|
||||
|
||||
def getEntryAt(offset: Long): Option[T] =
|
||||
state.get.items.get(offset)
|
||||
}
|
||||
|
@ -16,11 +16,12 @@ import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
|
||||
import com.digitalasset.ledger.backend.api.v1.{
|
||||
RejectionReason,
|
||||
SubmissionResult,
|
||||
TransactionId,
|
||||
TransactionSubmission
|
||||
}
|
||||
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
|
||||
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.RangeSource
|
||||
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
|
||||
import com.digitalasset.platform.common.util.{DirectExecutionContext => DEC}
|
||||
import com.digitalasset.platform.sandbox.config.LedgerIdGenerator
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
|
||||
@ -72,7 +73,7 @@ object SqlLedger {
|
||||
startMode: SqlStartMode = SqlStartMode.ContinueIfExists)(
|
||||
implicit mat: Materializer,
|
||||
mm: MetricsManager): Future[Ledger] = {
|
||||
implicit val ec: ExecutionContext = DE
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
|
||||
val dbDispatcher = DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
|
||||
val ledgerDao = LedgerDao.metered(
|
||||
@ -125,7 +126,7 @@ private class SqlLedger(
|
||||
val checkpointQueue = Source.queue[Long => LedgerEntry](1, OverflowStrategy.dropHead)
|
||||
val persistenceQueue = Source.queue[Long => LedgerEntry](queueDepth, OverflowStrategy.dropNew)
|
||||
|
||||
implicit val ec: ExecutionContext = DE
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
|
||||
val mergedSources = Source.fromGraph(GraphDSL.create(checkpointQueue, persistenceQueue) {
|
||||
case (q1Mat, q2Mat) =>
|
||||
@ -155,13 +156,13 @@ private class SqlLedger(
|
||||
val offset = startOffset + i
|
||||
ledgerDao
|
||||
.storeLedgerEntry(offset, offset + 1, ledgerEntryGen(offset))
|
||||
.map(_ => ())(DE)
|
||||
.map(_ => ())(DEC)
|
||||
})
|
||||
.map { _ =>
|
||||
//note that we can have holes in offsets in case of the storing of an entry failed for some reason
|
||||
headRef = startOffset + queue.length //updating the headRef
|
||||
dispatcher.signalNewHead(headRef) //signalling downstream subscriptions
|
||||
}(DE)
|
||||
}(DEC)
|
||||
}
|
||||
.toMat(Sink.ignore)(
|
||||
Keep.left[
|
||||
@ -180,7 +181,7 @@ private class SqlLedger(
|
||||
private def loadStartingState(ledgerEntries: immutable.Seq[LedgerEntry]): Future[Unit] =
|
||||
if (ledgerEntries.nonEmpty) {
|
||||
logger.info("initializing ledger with scenario output")
|
||||
implicit val ec: ExecutionContext = DE
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
//ledger entries must be persisted via the transactionQueue!
|
||||
val fDone = Source(ledgerEntries)
|
||||
.mapAsync(1) { ledgerEntry =>
|
||||
@ -204,13 +205,13 @@ private class SqlLedger(
|
||||
override def snapshot(): Future[LedgerSnapshot] =
|
||||
//TODO (robert): SQL DAO does not know about ActiveContract, this method does a (trivial) mapping from DAO Contract to Ledger ActiveContract. Intended? The DAO layer was introduced its own Contract abstraction so it can also reason read archived ones if it's needed. In hindsight, this might be necessary at all so we could probably collapse the two
|
||||
ledgerDao.getActiveContractSnapshot
|
||||
.map(s => LedgerSnapshot(s.offset, s.acs.map(c => (c.contractId, c.toActiveContract))))(DE)
|
||||
.map(s => LedgerSnapshot(s.offset, s.acs.map(c => (c.contractId, c.toActiveContract))))(DEC)
|
||||
|
||||
override def lookupContract(
|
||||
contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]] =
|
||||
ledgerDao
|
||||
.lookupActiveContract(contractId)
|
||||
.map(_.map(c => c.toActiveContract))(DE)
|
||||
.map(_.map(c => c.toActiveContract))(DEC)
|
||||
|
||||
override def lookupKey(key: Node.GlobalKey): Future[Option[AbsoluteContractId]] =
|
||||
ledgerDao.lookupKey(key)
|
||||
@ -218,7 +219,7 @@ private class SqlLedger(
|
||||
override def publishHeartbeat(time: Instant): Future[Unit] =
|
||||
checkpointQueue
|
||||
.offer(_ => LedgerEntry.Checkpoint(time))
|
||||
.map(_ => ())(DE) //this never pushes back, see createQueues above!
|
||||
.map(_ => ())(DEC) //this never pushes back, see createQueues above!
|
||||
|
||||
override def publishTransaction(tx: TransactionSubmission): Future[SubmissionResult] =
|
||||
enqueue { offset =>
|
||||
@ -277,8 +278,18 @@ private class SqlLedger(
|
||||
Failure(new IllegalStateException("queue closed"))
|
||||
case Success(QueueOfferResult.Failure(e)) => Failure(e)
|
||||
case Failure(f) => Failure(f)
|
||||
}(DE)
|
||||
}(DEC)
|
||||
}
|
||||
|
||||
override def lookupTransaction(
|
||||
transactionId: TransactionId): Future[Option[(Long, LedgerEntry.Transaction)]] =
|
||||
ledgerDao
|
||||
.lookupLedgerEntry(transactionId.toLong)
|
||||
.map(_.collect[(Long, LedgerEntry.Transaction)] {
|
||||
case t: LedgerEntry.Transaction =>
|
||||
(transactionId.toLong, t) // the transaction is also the offset
|
||||
})(DEC)
|
||||
|
||||
}
|
||||
|
||||
private class SqlLedgerFactory(ledgerDao: LedgerDao) {
|
||||
@ -303,7 +314,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
|
||||
startMode: SqlStartMode,
|
||||
queueDepth: Int)(implicit mat: Materializer): Future[SqlLedger] = {
|
||||
@SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
|
||||
implicit val ec = DE
|
||||
implicit val ec = DEC
|
||||
|
||||
def init() = startMode match {
|
||||
case AlwaysReset =>
|
||||
@ -336,8 +347,8 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
|
||||
logger.error(errorMsg)
|
||||
sys.error(errorMsg)
|
||||
case None =>
|
||||
doInit(initialId).map(_ => initialId)(DE)
|
||||
}(DE)
|
||||
doInit(initialId).map(_ => initialId)(DEC)
|
||||
}(DEC)
|
||||
|
||||
case None =>
|
||||
logger.info("No ledger id given. Looking for existing ledger in database.")
|
||||
@ -347,8 +358,8 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
|
||||
case Some(foundLedgerId) => ledgerFound(foundLedgerId)
|
||||
case None =>
|
||||
val randomLedgerId = LedgerIdGenerator.generateRandomId()
|
||||
doInit(randomLedgerId).map(_ => randomLedgerId)(DE)
|
||||
}(DE)
|
||||
doInit(randomLedgerId).map(_ => randomLedgerId)(DEC)
|
||||
}(DEC)
|
||||
}
|
||||
|
||||
private def ledgerFound(foundLedgerId: String) = {
|
||||
|
@ -11,7 +11,7 @@ import org.scalatest.{Matchers, WordSpec}
|
||||
class EventIdFormatterSpec extends WordSpec with Matchers with ScalaFutures {
|
||||
|
||||
"EventIdFormatter" should {
|
||||
val transactionId = "SOME_TRANSACTION_ID"
|
||||
val transactionId = "42"
|
||||
val index: Transaction.NodeId = Transaction.NodeId.unsafeFromIndex(42)
|
||||
val referenceEventID = s"#$transactionId:${index.index}"
|
||||
|
||||
@ -21,7 +21,7 @@ class EventIdFormatterSpec extends WordSpec with Matchers with ScalaFutures {
|
||||
|
||||
"split an eventId into a transactionId and an index" in {
|
||||
SandboxEventIdFormatter.split(referenceEventID) should equal(
|
||||
Some(TransactionIdWithIndex(transactionId, index)))
|
||||
Some(TransactionIdWithIndex(transactionId.toLong, index)))
|
||||
}
|
||||
|
||||
"return None when parsing an invalid argument" in {
|
||||
|
Loading…
Reference in New Issue
Block a user