Fix rare out of order transaction delivery on the ledger api (#8336)

* Fix out of order transaction delivery on the ledger api

Akka's mergeSubstreams does not guarantee the order of the output.
In rare cases the output order does not match the input order, even
though we know that the input events are in the correct order
(otherwise groupContiguous would deliver transactions with random
events from other transactions).

This change removes the need for substreams by collecting the events
for a transaction inside statefulMapConcat. Losing mergeSubstreams'
parallelism is likely not a problem, because we need to send out
transactions in the correct order anyway.
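The statefulMapConcat approach described above amounts to carrying the current group as explicit state while walking the input in order. As an illustration only (names are hypothetical, no Akka dependency, not the committed code), the core grouping logic might be sketched with plain collections like this:

```scala
// Hypothetical sketch: group contiguous elements that share a key, preserving
// input order, the way a statefulMapConcat-based groupContiguous would.
object ContiguousGrouping {
  def groupContiguousList[A, K](xs: List[A])(by: A => K): List[Vector[A]] =
    xs.foldLeft(List.empty[Vector[A]]) {
      // Same key as the group under construction: extend it.
      case (group :: rest, a) if by(group.head) == by(a) =>
        (group :+ a) :: rest
      // Key changed (or first element): start a new group.
      case (acc, a) =>
        Vector(a) :: acc
    }.reverse // groups were accumulated in reverse
}
```

Because the fold visits elements strictly in input order and never reorders completed groups, the output order is the input order by construction, which is exactly the property mergeSubstreams failed to provide.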

Fixes #7521.

CHANGELOG_BEGIN
Ledger API: Fixed an issue that rarely caused transactions to be sent out of order. See #7521
CHANGELOG_END

* Use mergeSubstreamsWithParallelism(1)

* Use concatSubstreams

* Add test that can trigger the error reliably

* Address https://github.com/digital-asset/daml/pull/8336#discussion_r545760676

* Update copyright notice in GroupContiguousHeavySpec.scala

* Remove link to PR

Co-authored-by: Stefano Baghino <stefano.baghino@digitalasset.com>
Co-authored-by: Robert Autenrieth <robert.autenrieth@digitalasset.com>
Gerolf Seitz 2021-01-08 20:12:19 +01:00 committed by GitHub
parent 859d8f44d9
commit 3f54258a22
3 changed files with 51 additions and 7 deletions


@@ -248,12 +248,6 @@ da_scala_test_suite(
"//ledger/test-common/test-certificates",
openssl_executable,
],
# this is not flaky for the usual "flaky tests" reasons. There is actually
# an ordering issue in transaction streams; some test failures are listed in
# https://github.com/digital-asset/daml/issues/7521 and the linked issues.
# We've marked it flaky because we don't want to collect failing cases
# anymore, and do not have a solution yet.
flaky = True,
jvm_flags = [
"-Djava.security.debug=\"certpath ocsp\"", # This facilitates debugging of the OCSP checks mechanism
],


@@ -51,6 +51,12 @@ package object events {
* contiguous stretch of the input [[Source]]. Well suited to perform group-by
* operations of streams where [[K]] attributes are either sorted or at least
* show up in blocks.
*
* Implementation detail: this method _must_ use concatSubstreams instead of
* mergeSubstreams to prevent the substreams from being processed in parallel,
* which could cause the outputs to be delivered in a different order.
*
* Docs: https://doc.akka.io/docs/akka/2.6.10/stream/stream-substream.html#groupby
*/
private[events] def groupContiguous[A, K, Mat](source: Source[A, Mat])(
by: A => K): Source[Vector[A], Mat] =
@@ -68,7 +74,7 @@ package object events {
.splitWhen(_._2)
.map(_._1)
.fold(Vector.empty[A])(_ :+ _)
.mergeSubstreams
.concatSubstreams
// Dispatches the call to either function based on the cardinality of the input
// This is mostly designed to route requests to queries specialized for single/multi-party subs
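The `.splitWhen(_._2)` step above splits the stream on a boolean flag computed upstream that marks where a new group begins. As a plain-Scala illustration (the helper name is hypothetical, not part of the commit), that marking logic might look like:

```scala
// Hypothetical sketch of the "start a new group here" flag that splitWhen
// consumes: true exactly when an element's key differs from its predecessor's.
object GroupStartMarking {
  def markGroupStarts[A, K](xs: Vector[A])(by: A => K): Vector[(A, Boolean)] =
    xs.zipWithIndex.map {
      case (a, 0) => (a, true)                   // first element opens a group
      case (a, i) => (a, by(a) != by(xs(i - 1))) // compare with predecessor
    }
}
```

Splitting on these flags yields one substream per contiguous run of equal keys. concatSubstreams then drains each substream to completion before starting the next, so the groups are emitted in the order their opening flags appear; mergeSubstreams instead interleaves whichever substream happens to be ready first, which is the source of the reordering this commit fixes.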


@@ -0,0 +1,44 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.dao.events

import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.util.Random

final class GroupContiguousHeavySpec
    extends AnyFlatSpec
    with Matchers
    with ScalaFutures
    with AkkaBeforeAndAfterAll {

  behavior of "groupContiguous (heavy)"

  override def spanScaleFactor: Double = 100 // Very high timeout as we stress test this operator

  // This error condition is extremely difficult to trigger and currently
  // purely randomized data seems to do the trick.
  // See: https://github.com/digital-asset/daml/pull/8336
  it should "keep the order of the input even with items of widely skewed size" in {
    for (_ <- 1 to 50) {
      val pairs = Iterator.range(0, 1000).flatMap { outerKey =>
        Iterator.tabulate(Random.nextInt(100) + 10) { innerKey =>
          val payload = 0.toChar.toString * (Random.nextInt(100) + 10)
          outerKey -> f"$outerKey%05d:$innerKey%05d:$payload:"
        }
      }
      val grouped = groupContiguous(Source.fromIterator(() => pairs))(by = _._1)
      whenReady(grouped.runWith(Sink.seq[Vector[(Int, String)]])) {
        _.flatMap(_.map(_._2)) shouldBe sorted
      }
    }
  }
}
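The test's final `shouldBe sorted` check works because of the `%05d` zero-padding in the generated keys: fixed-width numeric fields make lexicographic (String) order agree with numeric order, so a flat list of the emitted strings is sorted if and only if the groups came out in input order. A minimal standalone illustration of that property:

```scala
// Zero-padded, fixed-width formatting makes string sort order match
// numeric sort order; without padding, "10" would sort before "2".
val padded = Vector(2, 10, 1).map(k => f"$k%05d")
// padded is Vector("00002", "00010", "00001")
assert(padded.sorted == Vector("00001", "00002", "00010"))
```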