mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Precompute scala pb serialized size at creation [DPP-1301] (#15408)
* Precompute ScalaPB serializedSize for stream messages changelog_begin Introduces a gRPC streams optimization targeting complex protobuf payloads. When enabled, it can allow up to 60-80% throughput increase for GetTransactions/GetTransactionTrees/GetActiveContracts. The optimization is toggleable by an additional config parameter for the API server: `api-server.optimize-grpc-streams-throughput` changelog_end * Address Marton's review comment * Update ledger/participant-integration-api/src/main/scala/platform/store/ScalaPbStreamingOptimizations.scala Co-authored-by: mziolekda <marcin.ziolek@digitalasset.com> * Rename to withPrecomputedSerializedSize Co-authored-by: mziolekda <marcin.ziolek@digitalasset.com>
This commit is contained in:
parent
132211242d
commit
ce464c2421
@ -0,0 +1,35 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.store
|
||||
|
||||
import scalapb.GeneratedMessage
|
||||
import scala.util.chaining._
|
||||
|
||||
object ScalaPbStreamingOptimizations {
|
||||
implicit class ScalaPbMessageWithPrecomputedSerializedSize[
|
||||
ScalaPbMsg <: GeneratedMessage with AnyRef
|
||||
](scalaPbMsg: ScalaPbMsg) {
|
||||
|
||||
/** Optimization for gRPC streams throughput.
|
||||
*
|
||||
* gRPC internal logic marshalls the protobuf response payloads sequentially before
|
||||
* sending them over the wire (see io.grpc.ServerCallImpl.sendMessageInternal), imposing as limit
|
||||
* the maximum marshalling throughput of a payload type.
|
||||
*
|
||||
* We've observed empirically that ScalaPB-generated message classes have associated marshallers
|
||||
* with significant latencies when encoding complex payloads (e.g. [[com.daml.ledger.api.v1.transaction_service.GetTransactionTreesResponse]]),
|
||||
* with the gRPC marshalling bottleneck appearing in some performance tests.
|
||||
*
|
||||
* To alleviate the problem, we can leverage the fact that ScalaPB message classes have the serializedSize value memoized,
|
||||
* (see [[scalapb.GeneratedMessage.writeTo]]), whose computation is roughly half of the entire marshalling step.
|
||||
*
|
||||
* This optimization method takes advantage of the memoized value and forces the message's serializedSize computation,
|
||||
* roughly doubling the maximum theoretical ScalaPB stream throughput over the gRPC server layer.
|
||||
*
|
||||
* @return A new message [[scalapb.GeneratedMessage]] with precomputed serializedSize.
|
||||
*/
|
||||
def withPrecomputedSerializedSize(): ScalaPbMsg =
|
||||
scalaPbMsg.tap(_.serializedSize)
|
||||
}
|
||||
}
|
@ -23,6 +23,8 @@ import com.daml.platform.ApiOffset
|
||||
import com.daml.platform.api.v1.event.EventOps.{EventOps, TreeEventOps}
|
||||
import com.daml.platform.store.backend.EventStorageBackend.Entry
|
||||
|
||||
import com.daml.platform.store.ScalaPbStreamingOptimizations._
|
||||
|
||||
// TODO append-only: FIXME: move to the right place
|
||||
object EventsTable {
|
||||
|
||||
@ -52,7 +54,9 @@ object EventsTable {
|
||||
def toGetTransactionsResponse(
|
||||
events: Vector[Entry[Event]]
|
||||
): List[GetTransactionsResponse] =
|
||||
flatTransaction(events).toList.map(tx => GetTransactionsResponse(Seq(tx)))
|
||||
flatTransaction(events).toList.map(tx =>
|
||||
GetTransactionsResponse(Seq(tx)).withPrecomputedSerializedSize()
|
||||
)
|
||||
|
||||
def toGetFlatTransactionResponse(
|
||||
events: Vector[Entry[Event]]
|
||||
@ -70,7 +74,7 @@ object EventsTable {
|
||||
offset = "", // only the last response will have an offset.
|
||||
workflowId = entry.workflowId,
|
||||
activeContracts = Seq(entry.event.getCreated),
|
||||
)
|
||||
).withPrecomputedSerializedSize()
|
||||
case entry =>
|
||||
throw IndexErrors.DatabaseErrors.ResultSetError
|
||||
.Reject(
|
||||
@ -134,7 +138,9 @@ object EventsTable {
|
||||
def toGetTransactionTreesResponse(
|
||||
events: Vector[Entry[TreeEvent]]
|
||||
): List[GetTransactionTreesResponse] =
|
||||
transactionTree(events).toList.map(tx => GetTransactionTreesResponse(Seq(tx)))
|
||||
transactionTree(events).toList.map(tx =>
|
||||
GetTransactionTreesResponse(Seq(tx)).withPrecomputedSerializedSize()
|
||||
)
|
||||
|
||||
def toGetTransactionResponse(
|
||||
events: Vector[Entry[TreeEvent]]
|
||||
|
@ -33,6 +33,8 @@ import com.google.protobuf.timestamp.Timestamp
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import com.google.protobuf.ByteString
|
||||
|
||||
import com.daml.platform.store.ScalaPbStreamingOptimizations._
|
||||
|
||||
private[events] object TransactionLogUpdatesConversions {
|
||||
object ToFlatTransaction {
|
||||
def filter(
|
||||
@ -78,7 +80,9 @@ private[events] object TransactionLogUpdatesConversions {
|
||||
eventProjectionProperties,
|
||||
lfValueTranslation,
|
||||
)
|
||||
.map(transaction => GetTransactionsResponse(Seq(transaction)))
|
||||
.map(transaction =>
|
||||
GetTransactionsResponse(Seq(transaction)).withPrecomputedSerializedSize()
|
||||
)
|
||||
|
||||
def toGetFlatTransactionResponse(
|
||||
transactionLogUpdate: TransactionLogUpdate,
|
||||
@ -247,7 +251,7 @@ private[events] object TransactionLogUpdatesConversions {
|
||||
executionContext: ExecutionContext,
|
||||
): TransactionLogUpdate.TransactionAccepted => Future[GetTransactionTreesResponse] =
|
||||
toTransactionTree(_, requestingParties, eventProjectionProperties, lfValueTranslation)
|
||||
.map(txTree => GetTransactionTreesResponse(Seq(txTree)))
|
||||
.map(txTree => GetTransactionTreesResponse(Seq(txTree)).withPrecomputedSerializedSize())
|
||||
|
||||
private def toTransactionTree(
|
||||
transactionAccepted: TransactionLogUpdate.TransactionAccepted,
|
||||
|
Loading…
Reference in New Issue
Block a user