Switch from InputStream to Byte Array at the binary content JDBC transport (#11072)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Marton Nagy 2021-09-29 21:44:24 +02:00 committed by GitHub
parent 6ae3afa8fb
commit 735c3090a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 70 additions and 71 deletions

View File

@ -3,7 +3,7 @@
package com.daml.platform.store.appendonlydao.events
import java.io.InputStream
import java.io.ByteArrayInputStream
import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.dao.events.ContractStateEvent
@ -69,7 +69,7 @@ object ContractStateEventsReader {
private def getCachedOrDecompressContract(
contractId: ContractId,
templateId: events.Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
maybeCreateArgumentCompression: Option[Int],
lfValueTranslation: LfValueTranslation,
): Contract = {
@ -88,7 +88,7 @@ object ContractStateEventsReader {
private def decompressKey(
templateId: events.Identifier,
maybeCreateKeyValue: Option[InputStream],
maybeCreateKeyValue: Option[Array[Byte]],
maybeCreateKeyValueCompression: Option[Int],
): Option[Key] =
for {
@ -99,8 +99,8 @@ object ContractStateEventsReader {
keyValue = decompressAndDeserialize(createKeyValueCompression, createKeyValue)
} yield Key.assertBuild(templateId, keyValue.value)
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))
case class CreateMissingError(field: String) extends NoStackTrace {
override def getMessage: String =

View File

@ -3,7 +3,7 @@
package com.daml.platform.store.appendonlydao.events
import java.io.InputStream
import java.io.ByteArrayInputStream
import java.time.Instant
import com.codahale.metrics.Timer
@ -174,7 +174,7 @@ private[appendonlydao] object ContractsReader {
private def toContract(
contractId: ContractId,
templateId: String,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
decompressionTimer: Timer,
deserializationTimer: Timer,
@ -182,7 +182,7 @@ private[appendonlydao] object ContractsReader {
val decompressed =
Timed.value(
timer = decompressionTimer,
value = createArgumentCompression.decompress(createArgument),
value = createArgumentCompression.decompress(new ByteArrayInputStream(createArgument)),
)
val deserialized =
Timed.value(

View File

@ -3,7 +3,7 @@
package com.daml.platform.store.appendonlydao.events
import java.io.InputStream
import java.io.ByteArrayInputStream
import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.ledger.api.v1.value.{
@ -255,8 +255,8 @@ final class LfValueTranslation(
private def eventKey(s: String) =
LfValueTranslationCache.EventCache.Key(EventId.assertFromString(s))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))
def enricher: ValueEnricher = {
// Note: LfValueTranslation is used by JdbcLedgerDao for both serialization and deserialization.

View File

@ -3,8 +3,6 @@
package com.daml.platform.store.appendonlydao.events
import java.io.InputStream
import com.daml.ledger.api.v1.event.{
ArchivedEvent => PbArchivedEvent,
CreatedEvent => PbCreatedEvent,
@ -51,9 +49,9 @@ object Raw {
*/
sealed abstract class Created[E](
val partial: PbCreatedEvent,
val createArgument: InputStream,
val createArgument: Array[Byte],
val createArgumentCompression: Compression.Algorithm,
val createKeyValue: Option[InputStream],
val createKeyValue: Option[Array[Byte]],
val createKeyValueCompression: Compression.Algorithm,
) extends Raw[E] {
protected def wrapInEvent(event: PbCreatedEvent): E
@ -97,9 +95,9 @@ object Raw {
final class Created private[Raw] (
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbFlatEvent](
raw,
@ -118,12 +116,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.FlatEvent.Created =
@ -185,9 +183,9 @@ object Raw {
final class Created(
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbTreeEvent](
raw,
@ -206,12 +204,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.TreeEvent.Created =
@ -234,9 +232,9 @@ object Raw {
final class Exercised(
val partial: PbExercisedEvent,
val exerciseArgument: InputStream,
val exerciseArgument: Array[Byte],
val exerciseArgumentCompression: Compression.Algorithm,
val exerciseResult: Option[InputStream],
val exerciseResult: Option[Array[Byte]],
val exerciseResultCompression: Compression.Algorithm,
) extends TreeEvent {
override def applyDeserialization(
@ -259,9 +257,9 @@ object Raw {
templateId: Identifier,
exerciseConsuming: Boolean,
exerciseChoice: String,
exerciseArgument: InputStream,
exerciseArgument: Array[Byte],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: ArraySeq[String],
exerciseChildEventIds: ArraySeq[String],

View File

@ -3,7 +3,7 @@
package com.daml.platform.store.appendonlydao.events
import java.io.InputStream
import java.io.ByteArrayInputStream
import com.daml.lf.data.Ref
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
@ -50,14 +50,14 @@ object TransactionLogUpdatesReader {
Compression.Algorithm
.assertLookup(raw.exerciseArgumentCompression)
.decompress(
raw.exerciseArgument.mandatory("exercise_argument")
new ByteArrayInputStream(raw.exerciseArgument.mandatory("exercise_argument"))
)
),
exerciseResult = raw.exerciseResult.map { inputStream =>
exerciseResult = raw.exerciseResult.map { byteArray =>
ValueSerializer.deserializeValue(
Compression.Algorithm
.assertLookup(raw.exerciseResultCompression)
.decompress(inputStream)
.decompress(new ByteArrayInputStream(byteArray))
)
},
consuming = raw.eventKind == EventKind.ConsumingExercise,
@ -102,8 +102,8 @@ object TransactionLogUpdatesReader {
throw InvalidEventKind(unknownKind)
}
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))
final case class FieldMissingError(field: String) extends RuntimeException {
override def getMessage: String = s"Missing mandatory field $field"

View File

@ -3,7 +3,6 @@
package com.daml.platform.store.backend
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
@ -347,16 +346,16 @@ object StorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Instant],
)
case class RawContract(
templateId: String,
createArgument: InputStream,
createArgumentCompression: Option[Int],
class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)
case class RawContractStateEvent(
@ -364,9 +363,9 @@ object StorageBackend {
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Instant],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
@ -386,17 +385,17 @@ object StorageBackend {
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[InputStream],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],

View File

@ -3,11 +3,10 @@
package com.daml.platform.store.backend.common
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
@ -105,8 +104,8 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private val rejectionStatusCodeColumn: RowParser[Int] = int("rejection_status_code")
private val rejectionStatusMessageColumn: RowParser[String] = str("rejection_status_message")
private val rejectionStatusDetailsColumn: RowParser[Option[InputStream]] =
binaryStream("rejection_status_details").?
private val rejectionStatusDetailsColumn: RowParser[Option[Array[Byte]]] =
byteArray("rejection_status_details").?
private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~
@ -140,7 +139,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private def buildStatusProto(
rejectionStatusCode: Int,
rejectionStatusMessage: String,
rejectionStatusDetails: Option[InputStream],
rejectionStatusDetails: Option[Array[Byte]],
): StatusProto =
StatusProto.of(
rejectionStatusCode,
@ -149,10 +148,11 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
)
private def parseRejectionStatusDetails(
rejectionStatusDetails: Option[InputStream]
rejectionStatusDetails: Option[Array[Byte]]
): Seq[any.Any] =
rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.map(StatusDetails.parseFrom)
.map(_.details)
.getOrElse(Seq.empty)
def pruneCompletions(

View File

@ -5,7 +5,8 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{
@ -138,7 +139,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
private val fullDetailsContractRowParser: RowParser[StorageBackend.RawContractState] =
(str("template_id").?
~ flatEventWitnessesColumn("flat_event_witnesses")
~ binaryStream("create_argument").?
~ byteArray("create_argument").?
~ int("create_argument_compression").?
~ int("event_kind") ~ instantFromMicros("ledger_effective_time").?)
.map(SqlParser.flatten)
@ -171,9 +172,9 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
contractId("contract_id") ~
identifier("template_id").? ~
instantFromMicros("ledger_effective_time").? ~
binaryStream("create_key_value").? ~
byteArray("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
byteArray("create_argument").? ~
int("create_argument_compression").? ~
long("event_sequential_id") ~
flatEventWitnessesColumn("flat_event_witnesses") ~
@ -221,10 +222,12 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
private val contractRowParser: RowParser[StorageBackend.RawContract] =
(str("template_id")
~ binaryStream("create_argument")
~ byteArray("create_argument")
~ int("create_argument_compression").?)
.map(SqlParser.flatten)
.map(StorageBackend.RawContract.tupled)
.map { case (templateId, createArgument, createArgumentCompression) =>
new StorageBackend.RawContract(templateId, createArgument, createArgumentCompression)
}
protected def activeContractSqlLiteral(
contractId: ContractId,

View File

@ -3,10 +3,9 @@
package com.daml.platform.store.backend.common
import java.io.InputStream
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.SqlParser.{array, bool, byteArray, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
@ -78,21 +77,21 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
array[String]("event_witnesses")
private type CreatedEventRow =
SharedRow ~ InputStream ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~
Option[InputStream] ~ Option[Int]
SharedRow ~ Array[Byte] ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~
Option[Array[Byte]] ~ Option[Int]
private val createdEventRow: RowParser[CreatedEventRow] =
sharedRow ~
binaryStream("create_argument") ~
byteArray("create_argument") ~
int("create_argument_compression").? ~
array[String]("create_signatories") ~
array[String]("create_observers") ~
str("create_agreement_text").? ~
binaryStream("create_key_value").? ~
byteArray("create_key_value").? ~
int("create_key_value_compression").?
private type ExercisedEventRow =
SharedRow ~ Boolean ~ String ~ InputStream ~ Option[Int] ~ Option[InputStream] ~ Option[Int] ~
SharedRow ~ Boolean ~ String ~ Array[Byte] ~ Option[Int] ~ Option[Array[Byte]] ~ Option[Int] ~
Array[String] ~ Array[String]
private val exercisedEventRow: RowParser[ExercisedEventRow] = {
@ -100,9 +99,9 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
sharedRow ~
bool("exercise_consuming") ~
str("exercise_choice") ~
binaryStream("exercise_argument") ~
byteArray("exercise_argument") ~
int("exercise_argument_compression").? ~
binaryStream("exercise_result").? ~
byteArray("exercise_result").? ~
int("exercise_result_compression").? ~
array[String]("exercise_actors") ~
array[String]("exercise_child_event_ids")
@ -521,17 +520,17 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
array[String]("create_signatories").? ~
array[String]("create_observers").? ~
str("create_agreement_text").? ~
binaryStream("create_key_value").? ~
byteArray("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
byteArray("create_argument").? ~
int("create_argument_compression").? ~
array[String]("tree_event_witnesses") ~
array[String]("flat_event_witnesses") ~
array[String]("submitters").? ~
str("exercise_choice").? ~
binaryStream("exercise_argument").? ~
byteArray("exercise_argument").? ~
int("exercise_argument_compression").? ~
binaryStream("exercise_result").? ~
byteArray("exercise_result").? ~
int("exercise_result_compression").? ~
array[String]("exercise_actors").? ~
array[String]("exercise_child_event_ids").? ~