mirror of https://github.com/digital-asset/daml.git
fix duplicate contract ids when loading scenarios with pass (#1105)

* rename --jdbcurl to --postgres-backend
* increment ledger end correctly when loading from scenarios

fixes #1079

parent 99a7b24b3b
commit 186cf14729
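For context on the fix: the sandbox derives fresh transaction ids from its ledger end, and scenario loading used to advance that end by exactly one per stored entry. A scenario containing pass leaves gaps in the scenario runner's transaction ids, so the loaded ledger end could lag behind the last scenario transaction id and the next submission would mint a duplicate contract id. A minimal, self-contained sketch of that arithmetic (plain Scala; the object and value names are illustrative, not sandbox APIs):

object LedgerEndGapSketch extends App {
  // Scenario transaction ids as produced by the scenario runner; the
  // gap at 1 comes from a `pass` (time advance) step.
  val scenarioTxIds = List(0L, 2L, 3L)

  // Buggy load: bump the ledger end once per stored entry.
  val buggyLedgerEnd = scenarioTxIds.size.toLong // 3
  // The next transaction would get id 3, which the scenario already used.
  assert(scenarioTxIds.contains(buggyLedgerEnd))

  // Fixed load: the ledger end must exceed the largest scenario tx id.
  val fixedLedgerEnd = scenarioTxIds.max + 1 // 4
  assert(!scenarioTxIds.contains(fixedLedgerEnd))
  println(s"buggy next id = $buggyLedgerEnd, fixed next id = $fixedLedgerEnd")
}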
@@ -65,6 +65,14 @@ Java Bindings
 
 See `issue #1092 <https://github.com/digital-asset/daml/issues/1092>`__ for details.
 
+Ledger
+~~~~~~
+
+- Renamed ``--jdbcurl`` to ``--sql-backend-jdbcurl``. Left ``--jdbcurl`` in place for backwards compat.
+
+- Fixed issue when loading scenarios making use of ``pass`` into the sandbox, see
+  `#1079 <https://github.com/digital-asset/daml/pull/1079>`_.
+
 .. _release-0-12-17:
 
 0.12.17 - 2019-05-10
@@ -39,7 +39,7 @@ To set this up, you must:
 
 This is because Sandbox manages its own database schema, applying migrations if necessary when upgrading versions.
 
-To start Sandbox using persistence, pass an ``--jdbcurl <value>`` option, where ``<value>`` is a valid jdbc url containing the username, password and database name to connect to.
+To start Sandbox using persistence, pass an ``--postgres-backend <value>`` option, where ``<value>`` is a valid jdbc url containing the username, password and database name to connect to.
 
 Here is an example for such a url: ``jdbc:postgresql://localhost/test?user=fred&password=secret``
 
@@ -9,6 +9,7 @@ import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Sink, Source}
 import com.digitalasset.api.util.TimeProvider
+import com.digitalasset.daml.lf.data.ImmArray
 import com.digitalasset.daml.lf.engine.Engine
 import com.digitalasset.ledger.server.LedgerApiServer.LedgerApiServer
 import com.digitalasset.platform.sandbox.banner.Banner
@@ -16,6 +17,7 @@ import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext}
 import com.digitalasset.platform.sandbox.metrics.MetricsManager
 import com.digitalasset.platform.sandbox.services.SandboxResetService
 import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
+import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryWithLedgerEndIncrement
 import com.digitalasset.platform.sandbox.stores.ledger._
 import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
 import com.digitalasset.platform.server.services.testing.TimeServiceBackend
@@ -191,11 +193,10 @@ object SandboxApplication {
   }
 
   // if requested, initialize the ledger state with the given scenario
-  private def createInitialState(
-      config: SandboxConfig,
-      context: SandboxContext): (ActiveContractsInMemory, Seq[LedgerEntry], Option[Instant]) =
+  private def createInitialState(config: SandboxConfig, context: SandboxContext)
+    : (ActiveContractsInMemory, ImmArray[LedgerEntryWithLedgerEndIncrement], Option[Instant]) =
     config.scenario match {
-      case None => (ActiveContractsInMemory.empty, Nil, None)
+      case None => (ActiveContractsInMemory.empty, ImmArray.empty, None)
       case Some(scenario) =>
         val (acs, records, ledgerTime) =
          ScenarioLoader.fromScenario(context.packageContainer, scenario)
@@ -68,7 +68,8 @@ object Cli {
       .action((x, c) => c.copy(scenario = Some(x)))
       .text(
         "If set, the sandbox will execute the given scenario on startup and store all the contracts created by it. " +
-          "Two formats are supported: Module.Name:Entity.Name (preferred) and Module.Name.Entity.Name (deprecated, will print a warning when used).")
+          "Note that when using --postgres-backend the scenario will be ran only if starting from a fresh database, _not_ when resuming from an existing one. " +
+          "Two identifier formats are supported: Module.Name:Entity.Name (preferred) and Module.Name.Entity.Name (deprecated, will print a warning when used).")
 
     arg[File]("<archive>...")
       .unbounded()
@@ -102,7 +103,12 @@ object Cli {
 
     opt[String]("jdbcurl")
       .optional()
-      .text("The JDBC connection URL to a Postgres database containing the username and password as well. If missing the Sandbox will use an in memory store.")
+      .text("This flag is deprecated -- please use --sql-backend-jdbcurl.")
       .action((url, config) => config.copy(jdbcUrl = Some(url)))
 
+    opt[String]("sql-backend-jdbcurl")
+      .optional()
+      .text("The JDBC connection URL to a Postgres database containing the username and password as well. If present, the Sandbox will use the database to persist its data.")
+      .action((url, config) => config.copy(jdbcUrl = Some(url)))
+
     //TODO (robert): Think about all implications of allowing users to set the ledger ID.
@@ -9,6 +9,7 @@ import akka.NotUsed
 import akka.stream.Materializer
 import akka.stream.scaladsl.Source
 import com.digitalasset.api.util.TimeProvider
+import com.digitalasset.daml.lf.data.ImmArray
 import com.digitalasset.daml.lf.transaction.Node.GlobalKey
 import com.digitalasset.daml.lf.value.Value
 import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
@@ -20,10 +21,10 @@ import com.digitalasset.ledger.backend.api.v1.{
 import com.digitalasset.platform.sandbox.metrics.MetricsManager
 import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
 import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
+import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryWithLedgerEndIncrement
 import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger
 import com.digitalasset.platform.sandbox.stores.ledger.sql.{SqlLedger, SqlStartMode}
 
-import scala.collection.immutable
 import scala.concurrent.Future
 
 /** Defines all the functionalities a Ledger needs to provide */
@@ -66,7 +67,7 @@ object Ledger {
       ledgerId: String,
       timeProvider: TimeProvider,
       acs: ActiveContractsInMemory,
-      ledgerEntries: Seq[LedgerEntry]): Ledger =
+      ledgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement]): Ledger =
     new InMemoryLedger(ledgerId, timeProvider, acs, ledgerEntries)
 
   /**
@@ -84,18 +85,11 @@ object Ledger {
       jdbcUrl: String,
       ledgerId: String,
       timeProvider: TimeProvider,
-      ledgerEntries: Seq[LedgerEntry],
+      ledgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement],
       queueDepth: Int,
       startMode: SqlStartMode
   )(implicit mat: Materializer, mm: MetricsManager): Future[Ledger] =
-    //TODO (robert): casting from Seq to immutable.Seq, make ledgerEntries immutable throughout the Sandbox?
-    SqlLedger(
-      jdbcUrl,
-      Some(ledgerId),
-      timeProvider,
-      immutable.Seq(ledgerEntries: _*),
-      queueDepth,
-      startMode)
+    SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, ledgerEntries, queueDepth, startMode)
 
   /** Wraps the given Ledger adding metrics around important calls */
   def metered(ledger: Ledger)(implicit mm: MetricsManager): Ledger = MeteredLedger(ledger)
@@ -6,8 +6,7 @@ package com.digitalasset.platform.sandbox.stores.ledger
 import java.time.Instant
 
 import com.digitalasset.daml.lf.PureCompiledPackages
-import com.digitalasset.daml.lf.data.Ref
-import com.digitalasset.daml.lf.data.Time
+import com.digitalasset.daml.lf.data._
 import com.digitalasset.daml.lf.engine.DeprecatedIdentifier
 import com.digitalasset.daml.lf.lfpackage.Ast
 import com.digitalasset.daml.lf.lfpackage.Ast.{DDataType, DValue, Definition}
@@ -25,14 +24,31 @@ import scala.collection.breakOut
 import scala.collection.mutable.ArrayBuffer
 import scalaz.syntax.std.map._
 
+import scala.annotation.tailrec
+
 object ScenarioLoader {
   private val logger = LoggerFactory.getLogger(this.getClass)
 
-  def fromScenario(
-      packages: DamlPackageContainer,
-      scenario: String): (ActiveContractsInMemory, Seq[LedgerEntry], Instant) = {
+  /** When loading from the scenario, we also specify by how much to bump the
+    * ledger end after each entry. This is because in the scenario transaction
+    * ids there might be "gaps" due to passTime instructions (and possibly
+    * others in the future).
+    *
+    * Note that this matters because our ledger implementation typically derives
+    * the transaction ids from the ledger end. So, the ledger end must be
+    * greater than the latest transaction id produced by the scenario runner,
+    * otherwise we'll get duplicates. See
+    * <https://github.com/digital-asset/daml/issues/1079>.
+    */
+  case class LedgerEntryWithLedgerEndIncrement(entry: LedgerEntry, increment: Long)
+
+  def fromScenario(packages: DamlPackageContainer, scenario: String)
+    : (ActiveContractsInMemory, ImmArray[LedgerEntryWithLedgerEndIncrement], Instant) = {
     val (scenarioLedger, scenarioRef) = buildScenarioLedger(packages, scenario)
-    val ledgerEntries = new ArrayBuffer[LedgerEntry](scenarioLedger.scenarioSteps.size)
+    // we store the tx id since later we need to recover how much to bump the
+    // ledger end by, and here the transaction id _is_ the ledger end.
+    val ledgerEntries =
+      new ArrayBuffer[(TransactionId, LedgerEntry)](scenarioLedger.scenarioSteps.size)
     type Acc = (ActiveContractsInMemory, Time.Timestamp, Option[TransactionId])
     val (acs, time, txId) =
       scenarioLedger.scenarioSteps.iterator
@@ -40,8 +56,24 @@ object ScenarioLoader {
           case ((acs, time, mbOldTxId), (stepId @ _, step)) =>
             executeScenarioStep(ledgerEntries, scenarioRef, acs, time, mbOldTxId, stepId, step)
         }
-    // increment the last transaction id returned by the ledger, since we start from there
-    (acs, ledgerEntries, time.toInstant)
+    // now decorate the entries with what the next increment is
+    @tailrec
+    def decorateWithIncrement(
+        processed: BackStack[LedgerEntryWithLedgerEndIncrement],
+        toProcess: ImmArray[(TransactionId, LedgerEntry)])
+      : ImmArray[LedgerEntryWithLedgerEndIncrement] =
+      toProcess match {
+        case ImmArray() => processed.toImmArray
+        // the last one just bumps by 1 -- it does not matter as long as it's
+        // positive
+        case ImmArrayCons((_, entry), ImmArray()) =>
+          (processed :+ LedgerEntryWithLedgerEndIncrement(entry, 1)).toImmArray
+        case ImmArrayCons((entryTxId, entry), entries @ ImmArrayCons((nextTxId, next), _)) =>
+          decorateWithIncrement(
+            processed :+ LedgerEntryWithLedgerEndIncrement(entry, (nextTxId - entryTxId).toLong),
+            entries)
+      }
+    (acs, decorateWithIncrement(BackStack.empty, ImmArray(ledgerEntries)), time.toInstant)
   }
 
   private def buildScenarioLedger(
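The decoration pass above turns the recorded (transaction id, entry) pairs into entries tagged with how far the ledger end must jump after each one. A simplified sketch of the same recursion over plain Scala lists, standing in for the ImmArray/BackStack version (the decorate name is mine, not the repo's):

// Pairs each ledger entry with how far the ledger end must advance after
// storing it: the distance to the next scenario transaction id, and 1 for
// the last entry (any positive bump works there).
def decorate[E](entries: List[(Long, E)]): List[(E, Long)] =
  entries match {
    case Nil => Nil
    case (_, entry) :: Nil => List((entry, 1L))
    case (txId, entry) :: (rest @ ((nextTxId, _) :: _)) =>
      (entry, nextTxId - txId) :: decorate(rest)
  }

// e.g. tx ids 0, 2, 3 (a `pass` consumed id 1) give increments 2, 1, 1,
// so replaying them advances the ledger end to 4, past the last tx id:
// decorate(List((0L, "a"), (2L, "b"), (3L, "c")))
//   == List(("a", 2L), ("b", 1L), ("c", 1L))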
@@ -146,7 +178,7 @@ object ScenarioLoader {
   }
 
   private def executeScenarioStep(
-      ledger: ArrayBuffer[LedgerEntry],
+      ledger: ArrayBuffer[(TransactionId, LedgerEntry)],
       scenarioRef: Ref.DefinitionRef,
       acs: ActiveContractsInMemory,
       time: Time.Timestamp,
@@ -190,17 +222,20 @@ object ScenarioLoader {
               case (nid, parties) => (nodeIdWithHash(nid), parties)
             }
             ledger +=
-              Transaction(
-                transactionId,
-                transactionId,
-                "scenario-loader",
-                richTransaction.committer,
-                workflowId,
-                time.toInstant,
-                time.toInstant,
-                recordTx,
-                recordDisclosure.transform((_, v) => v.toSet[String])
-              )
+              (
+                (
+                  txId,
+                  Transaction(
+                    transactionId,
+                    transactionId,
+                    "scenario-loader",
+                    richTransaction.committer,
+                    workflowId,
+                    time.toInstant,
+                    time.toInstant,
+                    recordTx,
+                    recordDisclosure.transform((_, v) => v.toSet[String])
+                  )))
             (newAcs, time, Some(txId))
           case Left(err) =>
             throw new RuntimeException(s"Error when augmenting acs at step $stepId: $err")
@@ -8,6 +8,7 @@ import java.time.Instant
 import akka.NotUsed
 import akka.stream.scaladsl.Source
 import com.digitalasset.api.util.TimeProvider
+import com.digitalasset.daml.lf.data.ImmArray
 import com.digitalasset.daml.lf.transaction.Node
 import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
 import com.digitalasset.ledger.api.domain.{ApplicationId, CommandId}
@@ -21,6 +22,7 @@ import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdForm
 import com.digitalasset.platform.sandbox.stores.{ActiveContracts, ActiveContractsInMemory}
 import com.digitalasset.platform.sandbox.stores.deduplicator.Deduplicator
 import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry.{Checkpoint, Rejection}
+import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryWithLedgerEndIncrement
 import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry, LedgerSnapshot}
 import org.slf4j.LoggerFactory
 
@@ -33,14 +35,18 @@ class InMemoryLedger(
     val ledgerId: String,
     timeProvider: TimeProvider,
     acs0: ActiveContractsInMemory,
-    ledgerEntries: Seq[LedgerEntry])
+    ledgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement])
     extends Ledger {
 
   private val logger = LoggerFactory.getLogger(this.getClass)
 
   private val entries = {
     val l = new LedgerEntries[LedgerEntry](_.toString)
-    ledgerEntries.foreach(l.publish)
+    ledgerEntries.foreach {
+      case LedgerEntryWithLedgerEndIncrement(entry, increment) =>
+        l.publishWithLedgerEndIncrement(entry, increment)
+        ()
+    }
     l
   }
 
@@ -22,9 +22,11 @@ private[ledger] class LedgerEntries[T](identify: T => String) {
   // Tuple of (ledger end cursor, ledger map). There is never an entry for the initial cursor. End is inclusive.
   private val state = new AtomicReference(Entries(ledgerBeginning, TreeMap.empty))
 
-  private def store(item: T): Long = {
+  private def store(item: T, increment: Long): Long = {
+    require(increment >= 1, s"Non-positive ledger increment $increment")
     val Entries(newOffset, _) = state.updateAndGet({
-      case Entries(ledgerEnd, ledger) => Entries(ledgerEnd + 1, ledger + (ledgerEnd -> item))
+      case Entries(ledgerEnd, ledger) =>
+        Entries(ledgerEnd + increment, ledger + (ledgerEnd -> item))
     })
     if (logger.isTraceEnabled())
       logger.trace("Recording `{}` at offset `{}`", identify(item), newOffset)
@@ -43,8 +45,12 @@ private[ledger] class LedgerEntries[T](identify: T => String) {
   def getSource(offset: Option[Long]): Source[(Long, T), NotUsed] =
     dispatcher.startingAt(offset.getOrElse(ledgerBeginning))
 
-  def publish(item: T): Long = {
-    val newHead = store(item)
+  def publish(item: T): Long =
+    publishWithLedgerEndIncrement(item, 1)
+
+  /** Use this if you want to bump the ledger end by some non-standard amount. */
+  def publishWithLedgerEndIncrement(item: T, increment: Long): Long = {
+    val newHead = store(item, increment)
     dispatcher.signalNewHead(newHead)
     newHead
   }
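publishWithLedgerEndIncrement only changes the bookkeeping: the entry is stored at the current ledger end while the end itself may advance by more than one, leaving deliberate gaps in the offsets. A self-contained sketch of that counter, simplified from LedgerEntries (no dispatcher; the OffsetLog name is hypothetical):

import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.TreeMap

final class OffsetLog[T] {
  private case class State(end: Long, log: TreeMap[Long, T])
  private val state = new AtomicReference(State(0L, TreeMap.empty[Long, T]))

  // Stores `item` at the current end and advances the end by `increment`,
  // so entries replayed from a scenario can leave gaps in the offsets.
  def publish(item: T, increment: Long = 1L): Long = {
    require(increment >= 1, s"non-positive increment $increment")
    state.updateAndGet(s => State(s.end + increment, s.log + (s.end -> item))).end
  }

  def entries: TreeMap[Long, T] = state.get.log
}

// e.g. publishing three entries with increments 2, 1, 1 stores them at
// offsets 0, 2, 3 and leaves the ledger end at 4.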
@@ -10,6 +10,7 @@ import akka.stream.scaladsl.{GraphDSL, Keep, MergePreferred, Sink, Source, Sourc
 import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult, SourceShape}
 import akka.{Done, NotUsed}
 import com.digitalasset.api.util.TimeProvider
+import com.digitalasset.daml.lf.data.{ImmArray, ImmArrayCons}
 import com.digitalasset.daml.lf.transaction.Node
 import com.digitalasset.daml.lf.value.Value
 import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractId}
@@ -26,6 +27,7 @@ import com.digitalasset.platform.sandbox.config.LedgerIdGenerator
 import com.digitalasset.platform.sandbox.metrics.MetricsManager
 import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
 import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
+import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryWithLedgerEndIncrement
 import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode.{
   AlwaysReset,
   ContinueIfExists
@@ -41,7 +43,7 @@ import com.digitalasset.platform.sandbox.stores.ledger.sql.util.DbDispatcher
 import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry, LedgerSnapshot}
 import org.slf4j.LoggerFactory
 
-import scala.collection.immutable
+import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
@@ -68,7 +70,7 @@ object SqlLedger {
       jdbcUrl: String,
       ledgerId: Option[String],
       timeProvider: TimeProvider,
-      ledgerEntries: immutable.Seq[LedgerEntry],
+      initialLedgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement],
       queueDepth: Int,
       startMode: SqlStartMode = SqlStartMode.ContinueIfExists)(
       implicit mat: Materializer,
@@ -86,10 +88,12 @@ object SqlLedger {
 
     val sqlLedgerFactory = SqlLedgerFactory(ledgerDao)
 
-    for {
-      sqlLedger <- sqlLedgerFactory.createSqlLedger(ledgerId, timeProvider, startMode, queueDepth)
-      _ <- sqlLedger.loadStartingState(ledgerEntries)
-    } yield sqlLedger
+    sqlLedgerFactory.createSqlLedger(
+      ledgerId,
+      timeProvider,
+      startMode,
+      initialLedgerEntries,
+      queueDepth)
   }
 }
 
@@ -113,6 +117,7 @@ private class SqlLedger(
 
   @volatile
   private var headRef: Long = headAtInitialization
+
   // the reason for modelling persistence as a reactive pipeline is to avoid having race-conditions between the
   // moving ledger-end, the async persistence operation and the dispatcher head notification
   private val (checkpointQueue, persistenceQueue): (
@@ -178,24 +183,6 @@ private class SqlLedger(
     ledgerDao.close()
   }
 
-  private def loadStartingState(ledgerEntries: immutable.Seq[LedgerEntry]): Future[Unit] =
-    if (ledgerEntries.nonEmpty) {
-      logger.info("initializing ledger with scenario output")
-      implicit val ec: ExecutionContext = DEC
-      //ledger entries must be persisted via the transactionQueue!
-      val fDone = Source(ledgerEntries)
-        .mapAsync(1) { ledgerEntry =>
-          enqueue(_ => ledgerEntry)
-        }
-        .runWith(Sink.ignore)
-
-      // Note: the active contract set stored in the SQL database is updated through the insertion of ledger entries.
-      // The given active contract set is ignored.
-      for {
-        _ <- fDone
-      } yield ()
-    } else Future.successful(())
-
   override def ledgerEntries(offset: Option[Long]): Source[(Long, LedgerEntry), NotUsed] = {
     dispatcher.startingAt(offset.getOrElse(0))
   }
@@ -304,6 +291,8 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
     * be equal to the one in the database.
     * @param timeProvider to get the current time when sequencing transactions
     * @param startMode whether we should start with a clean state or continue where we left off
+    * @param initialLedgerEntries The initial ledger entries -- usually provided by the scenario runner. Will only be
+    *                             used if starting from a fresh database.
     * @param queueDepth the depth of the buffer for persisting entries. When gets full, the system will signal back-pressure
     *                   upstream
     * @return a compliant Ledger implementation
@@ -312,6 +301,7 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
       initialLedgerId: Option[String],
       timeProvider: TimeProvider,
       startMode: SqlStartMode,
+      initialLedgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement],
       queueDepth: Int)(implicit mat: Materializer): Future[SqlLedger] = {
     @SuppressWarnings(Array("org.wartremover.warts.ExplicitImplicitTypes"))
     implicit val ec = DEC
@@ -320,9 +310,9 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
       case AlwaysReset =>
        for {
          _ <- reset()
-          ledgerId <- initialize(initialLedgerId)
+          ledgerId <- initialize(initialLedgerId, initialLedgerEntries)
        } yield ledgerId
-      case ContinueIfExists => initialize(initialLedgerId)
+      case ContinueIfExists => initialize(initialLedgerId, initialLedgerEntries)
     }
 
     for {
@@ -334,32 +324,71 @@ private class SqlLedgerFactory(ledgerDao: LedgerDao) {
   private def reset(): Future[Unit] =
     ledgerDao.reset()
 
-  private def initialize(initialLedgerId: Option[String]): Future[String] = initialLedgerId match {
-    case Some(initialId) =>
-      ledgerDao
-        .lookupLedgerId()
-        .flatMap {
-          case Some(foundLedgerId) if (foundLedgerId == initialId) =>
-            ledgerFound(foundLedgerId)
-          case Some(foundLedgerId) =>
-            val errorMsg =
-              s"Ledger id mismatch. Ledger id given ('$initialId') is not equal to the existing one ('$foundLedgerId')!"
-            logger.error(errorMsg)
-            sys.error(errorMsg)
-          case None =>
-            doInit(initialId).map(_ => initialId)(DEC)
-        }(DEC)
+  private def initialize(
+      initialLedgerId: Option[String],
+      initialLedgerEntries: ImmArray[LedgerEntryWithLedgerEndIncrement]): Future[String] = {
+    // Note that here we only store the ledger entry and we do not update anything else, such as the
+    // headRef. We are also not concerned with heartbeats / checkpoints. This is OK since this initialization
+    // step happens before we start up the sql ledger at all, so it's running in isolation.
+    @tailrec
+    def processInitialLedgerEntries(
+        prevResult: Future[Unit],
+        ledgerEnd: Long,
+        entries: ImmArray[LedgerEntryWithLedgerEndIncrement]): Future[Unit] =
+      entries match {
+        case ImmArray() =>
+          prevResult
+        case ImmArrayCons(LedgerEntryWithLedgerEndIncrement(entry, increment), entries) =>
+          processInitialLedgerEntries(
+            prevResult.flatMap { _ =>
+              ledgerDao
+                .storeLedgerEntry(ledgerEnd, ledgerEnd + increment, entry)
+                .map { _ =>
+                  ()
+                }(DEC)
+            }(DEC),
+            ledgerEnd + increment,
+            entries
+          )
+      }
 
-    case None =>
-      logger.info("No ledger id given. Looking for existing ledger in database.")
-      ledgerDao
-        .lookupLedgerId()
-        .flatMap {
-          case Some(foundLedgerId) => ledgerFound(foundLedgerId)
-          case None =>
-            val randomLedgerId = LedgerIdGenerator.generateRandomId()
-            doInit(randomLedgerId).map(_ => randomLedgerId)(DEC)
-        }(DEC)
+    initialLedgerId match {
+      case Some(initialId) =>
+        ledgerDao
+          .lookupLedgerId()
+          .flatMap {
+            case Some(foundLedgerId) if (foundLedgerId == initialId) =>
+              if (initialLedgerEntries.nonEmpty) {
+                logger.warn(
+                  s"Initial ledger entries provided, presumably from scenario, but I'm picking up from an existing database, and thus they will not be used")
+              }
+              ledgerFound(foundLedgerId)
+            case Some(foundLedgerId) =>
+              val errorMsg =
+                s"Ledger id mismatch. Ledger id given ('$initialId') is not equal to the existing one ('$foundLedgerId')!"
+              logger.error(errorMsg)
+              sys.error(errorMsg)
+            case None =>
+              if (initialLedgerEntries.nonEmpty) {
+                logger.info(
+                  s"Initializing ledger with ${initialLedgerEntries.length} ledger entries")
+              }
+              processInitialLedgerEntries(doInit(initialId), 0, initialLedgerEntries).map { _ =>
+                initialId
+              }(DEC)
+          }(DEC)
+
+      case None =>
+        logger.info("No ledger id given. Looking for existing ledger in database.")
+        ledgerDao
+          .lookupLedgerId()
+          .flatMap {
+            case Some(foundLedgerId) => ledgerFound(foundLedgerId)
+            case None =>
+              val randomLedgerId = LedgerIdGenerator.generateRandomId()
+              doInit(randomLedgerId).map(_ => randomLedgerId)(DEC)
+          }(DEC)
     }
+  }
 
   private def ledgerFound(foundLedgerId: String) = {
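On the SQL path the same arithmetic becomes explicit offset ranges: each initial entry is stored at the running ledger end and the end advances by that entry's increment, mirroring the storeLedgerEntry(ledgerEnd, ledgerEnd + increment, entry) calls above. A small sketch of just the offset computation (plain Scala; the offsetSlots name is illustrative):

// Computes the (entry, offset, newLedgerEnd) slot for each initial entry,
// given (entry, increment) pairs, starting from ledger end 0.
def offsetSlots[E](entries: List[(E, Long)]): (List[(E, Long, Long)], Long) =
  entries.foldLeft((List.empty[(E, Long, Long)], 0L)) {
    case ((acc, ledgerEnd), (entry, increment)) =>
      ((entry, ledgerEnd, ledgerEnd + increment) :: acc, ledgerEnd + increment)
  } match { case (acc, end) => (acc.reverse, end) }

// e.g. entries with increments 2, 1, 1 occupy ranges (0,2), (2,3), (3,4) and
// leave the ledger end at 4 -- matching what the in-memory ledger computes.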
@@ -10,7 +10,9 @@ import com.digitalasset.platform.sandbox.metrics.MetricsManager
 import com.digitalasset.platform.sandbox.persistence.{PostgresFixture, PostgresResource}
 import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
 import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode
-import com.digitalasset.platform.sandbox.stores.ledger.{Ledger, LedgerEntry}
+import com.digitalasset.platform.sandbox.stores.ledger.Ledger
+import com.digitalasset.daml.lf.data.ImmArray
+import com.digitalasset.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryWithLedgerEndIncrement
 
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
@@ -31,7 +33,7 @@ object LedgerResource {
       ledgerId: String,
       timeProvider: TimeProvider,
       acs: ActiveContractsInMemory = ActiveContractsInMemory.empty,
-      entries: Seq[LedgerEntry] = Nil): Resource[Ledger] =
+      entries: ImmArray[LedgerEntryWithLedgerEndIncrement] = ImmArray.empty): Resource[Ledger] =
     LedgerResource.resource(
       () =>
         Future.successful(
@@ -61,7 +63,7 @@ object LedgerResource {
           postgres.value.jdbcUrl,
           ledgerId,
           timeProvider,
-          Nil,
+          ImmArray.empty,
           128,
           SqlStartMode.AlwaysReset))
       ledger.setup()
@@ -3,6 +3,9 @@
 
 package com.digitalasset.platform.sandbox
 
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
 import akka.stream.scaladsl.Sink
 import com.digitalasset.ledger.api.testing.utils.MockMessages.transactionFilter
 import com.digitalasset.ledger.api.testing.utils.{
@@ -24,6 +27,7 @@ import com.digitalasset.ledger.client.services.acs.ActiveContractSetClient
 import com.digitalasset.ledger.client.services.commands.SynchronousCommandClient
 import com.digitalasset.ledger.client.services.transactions.TransactionClient
 import com.digitalasset.platform.sandbox.services.{SandboxFixture, TestCommands}
+import com.google.protobuf.timestamp.Timestamp
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Millis, Span}
 import org.scalatest.{Matchers, Suite, WordSpec}
@@ -93,6 +97,18 @@ abstract class ScenarioLoadingITBase
   private def extractEvents(event: Event) =
     event.event.created.toSet
 
+  lazy val dummyRequest = {
+    // we need to adjust the time of the request because we pass 10
+    // days in the test scenario.
+    val letInstant = Instant.EPOCH.plus(10, ChronoUnit.DAYS)
+    val let = Timestamp(letInstant.getEpochSecond, letInstant.getNano)
+    val mrt = Timestamp(let.seconds + 30L, let.nanos)
+    dummyCommands(ledgerId, "commandId1").update(
+      _.commands.ledgerEffectiveTime := let,
+      _.commands.maximumRecordTime := mrt
+    )
+  }
+
   "ScenarioLoading" when {
 
     "contracts have been created" should {
@@ -110,7 +126,7 @@ abstract class ScenarioLoadingITBase
           lookForContract(events, templateIds.dummyWithParam)
           lookForContract(events, templateIds.dummyFactory)
 
-          resp.last should equal(GetActiveContractsResponse("3", "", Seq.empty, None))
+          resp.last should equal(GetActiveContractsResponse("4", "", Seq.empty, None))
         }
       }
 
@@ -135,20 +151,19 @@ abstract class ScenarioLoadingITBase
       }
 
       "does not recycle contract ids" in {
-        val req = dummyCommands(ledgerId, "commandId1")
-        whenReady(submitRequest(SubmitAndWaitRequest(commands = req.commands))) { _ =>
+        whenReady(submitRequest(SubmitAndWaitRequest(commands = dummyRequest.commands))) { _ =>
          whenReady(getSnapshot()) { resp =>
            val responses = resp.init // last response is just ledger offset
            val events = responses.flatMap(extractEvents)
            val contractIds = events.map(_.contractId).toSet
-            contractIds shouldBe Set("#2:0", "#3:1", "#3:0", "#1:0", "#0:0", "#3:2")
+            // note how we skip #1 because of the `pass` that is in the scenario.
+            contractIds shouldBe Set("#2:0", "#4:2", "#3:0", "#4:1", "#0:0", "#4:0")
          }
        }
      }
 
       "event ids are the same as contract ids (ACS)" in {
-        val req = dummyCommands(ledgerId, "commandId1")
-        whenReady(submitRequest(SubmitAndWaitRequest(commands = req.commands))) { _ =>
+        whenReady(submitRequest(SubmitAndWaitRequest(commands = dummyRequest.commands))) { _ =>
          whenReady(getSnapshot()) { resp =>
            val responses = resp.init // last response is just ledger offset
            val events = responses.flatMap(extractEvents)
@@ -8,6 +8,7 @@ import java.time.Instant
 
 import akka.stream.ActorMaterializer
 import com.digitalasset.api.util.{TimeProvider, ToleranceWindow}
+import com.digitalasset.daml.lf.data.ImmArray
 import com.digitalasset.daml.lf.engine.Engine
 import com.digitalasset.platform.sandbox.config.DamlPackageContainer
 import com.digitalasset.platform.sandbox.services.SandboxSubmissionService
@@ -42,7 +43,7 @@ trait TestHelpers {
       "sandbox ledger",
       TimeProvider.Constant(Instant.EPOCH),
       ActiveContractsInMemory.empty,
-      Nil)
+      ImmArray.empty)
     SandboxSubmissionService.createApiService(
       damlPackageContainer,
       IdentifierResolver(pkgId => Future.successful(damlPackageContainer.getPackage(pkgId))),
@@ -101,11 +101,16 @@ class CliSpec extends WordSpec with Matchers {
       checkOption(Array(s"--ledgerid", ledgerId), _.copy(ledgerIdMode = Predefined(ledgerId)))
     }
 
-    "parse the jdbc url when given" in {
+    "parse the jdbcurl (deprecated) when given" in {
       val jdbcUrl = "jdbc:postgresql://localhost:5432/test?user=test"
       checkOption(Array(s"--jdbcurl", jdbcUrl), _.copy(jdbcUrl = Some(jdbcUrl)))
     }
 
+    "parse the sql backend flag when given" in {
+      val jdbcUrl = "jdbc:postgresql://localhost:5432/test?user=test"
+      checkOption(Array(s"--sql-backend-jdbcurl", jdbcUrl), _.copy(jdbcUrl = Some(jdbcUrl)))
+    }
+
   }
 
 }
@@ -22,13 +22,17 @@ trait PostgresAroundAll extends PostgresAround with BeforeAndAfterAll {
   self: org.scalatest.Suite =>
 
   override protected def beforeAll(): Unit = {
-    super.beforeAll()
+    // we start pg before running the rest because _generally_ the database
+    // needs to be up before everything else. this is relevant for
+    // ScenarioLoadingITPostgres at least. we could muck with the mixin
+    // order but this was easier...
     postgresFixture = startEphemeralPg()
+    super.beforeAll()
   }
 
   override protected def afterAll(): Unit = {
-    stopAndCleanUp(postgresFixture.tempDir, postgresFixture.dataDir)
     super.afterAll()
+    stopAndCleanUp(postgresFixture.tempDir, postgresFixture.dataDir)
   }
 }
 
@@ -36,13 +40,17 @@ trait PostgresAroundEach extends PostgresAround with BeforeAndAfterEach {
   self: org.scalatest.Suite =>
 
   override protected def beforeEach(): Unit = {
-    super.beforeEach()
+    // we start pg before running the rest because _generally_ the database
+    // needs to be up before everything else. this is relevant for
+    // ScenarioLoadingITPostgres at least. we could muck with the mixin
+    // order but this was easier...
     postgresFixture = startEphemeralPg()
+    super.beforeEach()
  }
 
   override protected def afterEach(): Unit = {
-    stopAndCleanUp(postgresFixture.tempDir, postgresFixture.dataDir)
     super.afterEach()
+    stopAndCleanUp(postgresFixture.tempDir, postgresFixture.dataDir)
   }
 
 }
@@ -5,6 +5,8 @@
 daml 1.2
 module Test where
 
+import DA.Time
+
 template Dummy
   with
     operator : Party
@@ -400,6 +402,7 @@ testScenario : Scenario ()
 testScenario = do
   party <- getParty "party"
   submit party (create Dummy with operator = party)
+  pass (days 10)
   submit party (create DummyWithParam with operator = party)
   submit party (create DummyFactory with operator = party)
   return ()
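The updated expectations in ScenarioLoadingITBase follow from this scheme: the scenario's transactions get ids 0, 2 and 3 (the pass (days 10) step consumes id 1), loading leaves the ledger end at 4, and the next submission's three creates therefore mint #4:0, #4:1 and #4:2. A quick self-check of that arithmetic (plain Scala; the names are illustrative):

object ContractIdSketch extends App {
  val scenarioTxIds = List(0L, 2L, 3L) // id 1 is consumed by `pass (days 10)`
  val increments = List(2L, 1L, 1L) // distances to the next id, 1 for the last
  val ledgerEndAfterLoad = increments.sum // 4
  // Three creates in the next transaction yield node ids 0, 1, 2:
  val newContractIds = (0 to 2).map(n => s"#$ledgerEndAfterLoad:$n")
  assert(newContractIds == Vector("#4:0", "#4:1", "#4:2"))
  println(newContractIds.mkString(", "))
}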
@@ -0,0 +1,12 @@
+// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.digitalasset.platform.sandbox
+
+import com.digitalasset.platform.sandbox.config.SandboxConfig
+import com.digitalasset.platform.sandbox.persistence.PostgresAroundEach
+
+class ScenarioLoadingITPostgres extends ScenarioLoadingITBase with PostgresAroundEach {
+  override def config: SandboxConfig = super.config.copy(jdbcUrl = Some(postgresFixture.jdbcUrl))
+  override def scenario: Option[String] = Some("Test:testScenario")
+}
@@ -7,6 +7,7 @@ import com.digitalasset.api.util.TimeProvider
 import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
 import com.digitalasset.platform.sandbox.MetricsAround
 import com.digitalasset.platform.sandbox.persistence.PostgresAroundEach
+import com.digitalasset.daml.lf.data.ImmArray
 import org.scalatest.concurrent.AsyncTimeLimitedTests
 import org.scalatest.time.Span
 import org.scalatest.{AsyncWordSpec, Matchers}
@@ -31,7 +32,7 @@ class SqlLedgerSpec
         jdbcUrl = postgresFixture.jdbcUrl,
         ledgerId = None,
         timeProvider = TimeProvider.UTC,
-        ledgerEntries = Nil,
+        initialLedgerEntries = ImmArray.empty,
         queueDepth)
 
       ledgerF.map { ledger =>
@@ -46,7 +47,7 @@ class SqlLedgerSpec
         jdbcUrl = postgresFixture.jdbcUrl,
         ledgerId = Some(ledgerId),
         timeProvider = TimeProvider.UTC,
-        ledgerEntries = Nil,
+        initialLedgerEntries = ImmArray.empty,
         queueDepth)
 
       ledgerF.map { ledger =>
@@ -62,19 +63,19 @@ class SqlLedgerSpec
           jdbcUrl = postgresFixture.jdbcUrl,
           ledgerId = Some(ledgerId),
           timeProvider = TimeProvider.UTC,
-          ledgerEntries = Nil,
+          initialLedgerEntries = ImmArray.empty,
           queueDepth)
         ledger2 <- SqlLedger(
           jdbcUrl = postgresFixture.jdbcUrl,
           ledgerId = Some(ledgerId),
           timeProvider = TimeProvider.UTC,
-          ledgerEntries = Nil,
+          initialLedgerEntries = ImmArray.empty,
           queueDepth)
         ledger3 <- SqlLedger(
           jdbcUrl = postgresFixture.jdbcUrl,
           ledgerId = None,
           timeProvider = TimeProvider.UTC,
-          ledgerEntries = Nil,
+          initialLedgerEntries = ImmArray.empty,
           queueDepth)
       } yield {
         ledger1.ledgerId should not be equal(ledgerId)
@@ -90,14 +91,14 @@ class SqlLedgerSpec
           jdbcUrl = postgresFixture.jdbcUrl,
           ledgerId = Some("TheLedger"),
           timeProvider = TimeProvider.UTC,
-          ledgerEntries = Nil,
+          initialLedgerEntries = ImmArray.empty,
           queueDepth
         )
         _ <- SqlLedger(
           jdbcUrl = postgresFixture.jdbcUrl,
           ledgerId = Some("AnotherLedger"),
           timeProvider = TimeProvider.UTC,
-          ledgerEntries = Nil,
+          initialLedgerEntries = ImmArray.empty,
           queueDepth
         )
       } yield (())