sandbox: Avoid converting to byte arrays before deserializing. (#4865)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2020-03-06 16:40:26 +01:00 committed by GitHub
parent c6954c086b
commit a0f250a727
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 35 additions and 41 deletions

View File

@ -73,7 +73,6 @@ import com.digitalasset.platform.store.{
PersistenceEntry
}
import com.digitalasset.resources.ResourceOwner
import com.google.common.io.ByteStreams
import scalaz.syntax.tag._
import scala.collection.immutable
@ -90,10 +89,11 @@ private final case class ParsedEntry(
workflowId: Option[WorkflowId],
effectiveAt: Option[Date],
recordedAt: Option[Date],
transaction: Option[Array[Byte]],
transaction: Option[InputStream],
rejectionType: Option[String],
rejectionDesc: Option[String],
offset: Long)
offset: Long,
)
private final case class ParsedPartyData(
party: String,
@ -1084,7 +1084,7 @@ private class JdbcLedgerDao(
ledgerString("workflow_id")(emptyStringToNullColumn).? ~
date("effective_at").? ~
date("recorded_at").? ~
byteArray("transaction").? ~
binaryStream("transaction").? ~
str("rejection_type").? ~
str("rejection_description").? ~
long("ledger_offset")
@ -1255,9 +1255,9 @@ private class JdbcLedgerDao(
)
.as(binaryStream("contract").singleOpt)
}
.map(_.map { bytes =>
.map(_.map { contractStream =>
contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(bytes))
.deserializeContractInstance(contractStream)
.getOrElse(sys.error(s"failed to deserialize contract! cid:${contractId.coid}"))
})(executionContext)
@ -1280,7 +1280,7 @@ private class JdbcLedgerDao(
DivulgedContract(
absoluteCoid,
contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.deserializeContractInstance(contractStream)
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid")),
divulgences
)
@ -1299,7 +1299,7 @@ private class JdbcLedgerDao(
val divulgences = lookupDivulgences(coid)
val absoluteCoid = AbsoluteContractId(coid)
val contractInstance = contractSerializer
.deserializeContractInstance(ByteStreams.toByteArray(contractStream))
.deserializeContractInstance(contractStream)
.getOrElse(sys.error(s"failed to deserialize contract! cid:$coid"))
val signatories =
@ -1320,7 +1320,7 @@ private class JdbcLedgerDao(
keyStreamO.map(keyStream => {
val keyMaintainers = lookupKeyMaintainers(coid)
val keyValue = valueSerializer
.deserializeValue(ByteStreams.toByteArray(keyStream))
.deserializeValue(keyStream)
.getOrElse(sys.error(s"failed to deserialize key value! cid:$coid"))
.ensureNoCid
.fold(

View File

@ -3,8 +3,9 @@
package com.digitalasset.platform.store.serialization
import java.io.InputStream
import com.digitalasset.daml.lf.archive.{Decode, Reader}
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.transaction.{TransactionCoder, TransactionOuterClass}
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue}
import com.digitalasset.daml.lf.value.ValueCoder
@ -13,7 +14,7 @@ trait ContractSerializer {
def serializeContractInstance(coinst: ContractInst[VersionedValue[AbsoluteContractId]])
: Either[ValueCoder.EncodeError, Array[Byte]]
def deserializeContractInstance(blob: Array[Byte])
def deserializeContractInstance(stream: InputStream)
: Either[ValueCoder.DecodeError, ContractInst[VersionedValue[AbsoluteContractId]]]
}
@ -28,20 +29,13 @@ object ContractSerializer extends ContractSerializer {
.encodeContractInstance[AbsoluteContractId](ValueCoder.CidEncoder, coinst)
.map(_.toByteArray())
override def deserializeContractInstance(blob: Array[Byte])
override def deserializeContractInstance(stream: InputStream)
: Either[ValueCoder.DecodeError, ContractInst[VersionedValue[AbsoluteContractId]]] =
TransactionCoder
.decodeContractInstance[AbsoluteContractId](
ValueCoder.AbsCidDecoder,
TransactionOuterClass.ContractInstance.parseFrom(
Decode.damlLfCodedInputStreamFromBytes(blob, Reader.PROTOBUF_RECURSION_LIMIT))
Decode.damlLfCodedInputStream(stream, Reader.PROTOBUF_RECURSION_LIMIT))
)
private def toContractId(s: String) =
Ref.ContractIdString
.fromString(s)
.left
.map(e => ValueCoder.DecodeError(s"cannot decode contractId: $e"))
.map(AbsoluteContractId)
}

View File

@ -3,6 +3,8 @@
package com.digitalasset.platform.store.serialization
import java.io.InputStream
import com.digitalasset.daml.lf.archive.{Decode, Reader}
import com.digitalasset.daml.lf.transaction._
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, VersionedValue}
@ -16,7 +18,7 @@ trait TransactionSerializer {
transaction: GenTransaction[EventId, AbsoluteContractId, VersionedValue[AbsoluteContractId]])
: Either[EncodeError, Array[Byte]]
def deserializeTransaction(blob: Array[Byte]): Either[
def deserializeTransaction(stream: InputStream): Either[
DecodeError,
GenTransaction[EventId, AbsoluteContractId, VersionedValue[AbsoluteContractId]]]
@ -35,7 +37,7 @@ object TransactionSerializer extends TransactionSerializer {
)
.map(_.toByteArray())
override def deserializeTransaction(blob: Array[Byte]): Either[
override def deserializeTransaction(stream: InputStream): Either[
DecodeError,
GenTransaction[EventId, AbsoluteContractId, VersionedValue[AbsoluteContractId]]] =
TransactionCoder
@ -43,7 +45,7 @@ object TransactionSerializer extends TransactionSerializer {
TransactionCoder.EventIdDecoder,
ValueCoder.AbsCidDecoder,
TransactionOuterClass.Transaction.parseFrom(
Decode.damlLfCodedInputStreamFromBytes(blob, Reader.PROTOBUF_RECURSION_LIMIT))
Decode.damlLfCodedInputStream(stream, Reader.PROTOBUF_RECURSION_LIMIT))
)
.map(_.transaction)

View File

@ -3,17 +3,19 @@
package com.digitalasset.platform.store.serialization
import java.io.InputStream
import com.digitalasset.daml.lf.archive.{Decode, Reader}
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, VersionedValue}
import com.digitalasset.daml.lf.value.ValueCoder.DecodeError
import com.digitalasset.daml.lf.value.{ValueCoder, ValueOuterClass}
trait ValueSerializer {
def serializeValue(
value: VersionedValue[AbsoluteContractId]): Either[ValueCoder.EncodeError, Array[Byte]]
value: VersionedValue[AbsoluteContractId]
): Either[ValueCoder.EncodeError, Array[Byte]]
def deserializeValue(blob: Array[Byte]): Either[DecodeError, VersionedValue[AbsoluteContractId]]
def deserializeValue(stream: InputStream): Either[DecodeError, VersionedValue[AbsoluteContractId]]
}
/**
@ -28,18 +30,12 @@ object ValueSerializer extends ValueSerializer {
.map(_.toByteArray())
override def deserializeValue(
blob: Array[Byte]): Either[DecodeError, VersionedValue[AbsoluteContractId]] =
stream: InputStream
): Either[DecodeError, VersionedValue[AbsoluteContractId]] =
ValueCoder
.decodeVersionedValue(
ValueCoder.AbsCidDecoder,
ValueOuterClass.VersionedValue.parseFrom(
Decode.damlLfCodedInputStreamFromBytes(blob, Reader.PROTOBUF_RECURSION_LIMIT)))
private def toContractId(s: String) =
Ref.ContractIdString
.fromString(s)
.left
.map(e => DecodeError(s"cannot decode contractId: $e"))
.map(AbsoluteContractId)
Decode.damlLfCodedInputStream(stream, Reader.PROTOBUF_RECURSION_LIMIT)))
}

View File

@ -33,7 +33,7 @@ class V5_1__Populate_Event_Data extends BaseJavaMigration {
def next(): (String, Transaction) = {
val transactionId = rows.getString("transaction_id")
val transaction = TransactionSerializer
.deserializeTransaction(rows.getBytes("transaction"))
.deserializeTransaction(rows.getBinaryStream("transaction"))
.getOrElse(sys.error(s"failed to deserialize transaction $transactionId"))
hasNext = rows.next()

View File

@ -33,7 +33,7 @@ class V10_1__Populate_Event_Data extends BaseJavaMigration {
def next(): (String, Transaction) = {
val transactionId = rows.getString("transaction_id")
val transaction = TransactionSerializer
.deserializeTransaction(rows.getBytes("transaction"))
.deserializeTransaction(rows.getBinaryStream("transaction"))
.getOrElse(sys.error(s"failed to deserialize transaction $transactionId"))
hasNext = rows.next()

View File

@ -5,6 +5,7 @@
// 'db.migration.postgres' for postgres migrations
package db.migration.postgres
import java.io.InputStream
import java.sql.Connection
import java.util.Date
@ -493,10 +494,11 @@ class V2_1__Rebuild_Acs extends BaseJavaMigration {
workflowId: Option[WorkflowId],
effectiveAt: Option[Date],
recordedAt: Option[Date],
transaction: Option[Array[Byte]],
transaction: Option[InputStream],
rejectionType: Option[String],
rejectionDesc: Option[String],
offset: Long)
offset: Long,
)
private val EntryParser: RowParser[ParsedEntry] =
Macro.parser[ParsedEntry](

View File

@ -56,7 +56,7 @@ class V3__Recompute_Key_Hash extends BaseJavaMigration {
qualifiedName = Ref.QualifiedName.assertFromString(rows.getString("template_name"))
)
val key = ValueSerializer
.deserializeValue(rows.getBytes("contract_key"))
.deserializeValue(rows.getBinaryStream("contract_key"))
.fold(err => throw new IllegalArgumentException(err.errorMessage), identity)
.assertNoCid(coid => s"Found contract ID $coid in contract key")

View File

@ -56,7 +56,7 @@ class V4_1__Collect_Parties extends BaseJavaMigration {
def next(): (Long, Transaction) = {
val ledgerOffset = rows.getLong("ledger_offset")
val transaction = TransactionSerializer
.deserializeTransaction(rows.getBytes("transaction"))
.deserializeTransaction(rows.getBinaryStream("transaction"))
.getOrElse(
sys.error(s"failed to deserialize transaction with ledger offset $ledgerOffset"))