DPP-577 Use BIGINT instead of TIMESTAMP (#10761)

* Use bigint instead of timestamp

changelog_begin
changelog_end

* Add tests

* Fix deduplicateUntil

* Fix some places that still use timestamp

* Fix some places that still use timestamp

* DbDto uses only Long instead of Instant values

* Fix some places that still use Timestamp

* Remove unused classes

* Fix some places that still use Timestamp

Robert Autenrieth 2021-09-07 10:00:13 +02:00 committed by GitHub
parent cdf4bf1138
commit 116d6a5994
32 changed files with 431 additions and 226 deletions
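
For reference, the encoding used throughout this change stores timestamps as microseconds since the Unix epoch in a signed 64-bit column. A minimal sketch of the round trip, mirroring the instantFromMicros and instantToMicros helpers introduced further down:

import java.time.Instant
import java.util.concurrent.TimeUnit

// Instant -> microseconds since epoch (as written by the indexer).
def instantToMicros(i: Instant): Long =
  TimeUnit.SECONDS.toMicros(i.getEpochSecond) +
    TimeUnit.NANOSECONDS.toMicros(i.getNano.toLong)

// Microseconds since epoch -> Instant (as read back by the queries).
def instantFromMicros(micros: Long): Instant = {
  val seconds = TimeUnit.MICROSECONDS.toSeconds(micros)
  val microsOfSecond = micros - TimeUnit.SECONDS.toMicros(seconds)
  Instant.ofEpochSecond(seconds, TimeUnit.MICROSECONDS.toNanos(microsOfSecond))
}

// Lossless for micro-precision instants:
// instantFromMicros(instantToMicros(i)) == i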

View File

@ -1 +1 @@
a2d8ca60fa98745d92e698608532442fdfcd8d55417d810cdbeaea6c064fecdb
b91197c8bc9b1942ad70fcae54aa8b9ca1b0bf380e46b08c038a9bf3764fbc35

View File

@ -20,7 +20,7 @@ CREATE TABLE parameters (
---------------------------------------------------------------------------------------------------
CREATE TABLE configuration_entries (
ledger_offset VARCHAR PRIMARY KEY NOT NULL,
recorded_at TIMESTAMP NOT NULL,
recorded_at BIGINT NOT NULL,
submission_id VARCHAR NOT NULL,
typ VARCHAR NOT NULL,
configuration BYTEA NOT NULL,
@ -43,7 +43,7 @@ CREATE TABLE packages (
upload_id VARCHAR NOT NULL,
source_description VARCHAR,
package_size BIGINT NOT NULL,
known_since TIMESTAMP NOT NULL,
known_since BIGINT NOT NULL,
ledger_offset VARCHAR NOT NULL,
package BYTEA NOT NULL
);
@ -55,7 +55,7 @@ CREATE INDEX idx_packages_ledger_offset ON packages (ledger_offset);
---------------------------------------------------------------------------------------------------
CREATE TABLE package_entries (
ledger_offset VARCHAR PRIMARY KEY NOT NULL,
recorded_at TIMESTAMP NOT NULL,
recorded_at BIGINT NOT NULL,
submission_id VARCHAR,
typ VARCHAR NOT NULL,
rejection_reason VARCHAR,
@ -74,7 +74,7 @@ CREATE INDEX idx_package_entries ON package_entries (submission_id);
---------------------------------------------------------------------------------------------------
CREATE TABLE party_entries (
ledger_offset VARCHAR PRIMARY KEY NOT NULL,
recorded_at TIMESTAMP NOT NULL,
recorded_at BIGINT NOT NULL,
submission_id VARCHAR,
party VARCHAR,
display_name VARCHAR,
@ -97,7 +97,7 @@ CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, l
---------------------------------------------------------------------------------------------------
CREATE TABLE participant_command_submissions (
deduplication_key VARCHAR PRIMARY KEY NOT NULL,
deduplicate_until TIMESTAMP NOT NULL
deduplicate_until BIGINT NOT NULL
);
---------------------------------------------------------------------------------------------------
@ -105,7 +105,7 @@ CREATE TABLE participant_command_submissions (
---------------------------------------------------------------------------------------------------
CREATE TABLE participant_command_completions (
completion_offset VARCHAR NOT NULL,
record_time TIMESTAMP NOT NULL,
record_time BIGINT NOT NULL,
application_id VARCHAR NOT NULL,
submitters ARRAY NOT NULL,
command_id VARCHAR NOT NULL,
@ -122,7 +122,7 @@ CREATE TABLE participant_command_completions (
deduplication_offset VARCHAR,
deduplication_time_seconds BIGINT,
deduplication_time_nanos INT,
deduplication_start TIMESTAMP,
deduplication_start BIGINT,
-- The three columns below are `NULL` if the completion is for an accepted transaction.
-- The `rejection_status_details` column contains a Protocol-Buffers-serialized message of type
-- `daml.platform.index.StatusDetails`, containing the code, message, and further details
@ -188,7 +188,7 @@ CREATE INDEX participant_events_divulgence_contract_id_idx ON participant_events
CREATE TABLE participant_events_create (
-- * fixed-size columns first to avoid padding
event_sequential_id bigint NOT NULL, -- event identification: same ordering as event_offset
ledger_effective_time timestamp NOT NULL, -- transaction metadata
ledger_effective_time bigint NOT NULL, -- transaction metadata
node_index integer NOT NULL, -- event metadata
-- * event identification
@ -259,7 +259,7 @@ CREATE INDEX participant_events_create_create_key_hash_idx ON participant_events
CREATE TABLE participant_events_consuming_exercise (
-- * fixed-size columns first to avoid padding
event_sequential_id bigint NOT NULL, -- event identification: same ordering as event_offset
ledger_effective_time timestamp NOT NULL, -- transaction metadata
ledger_effective_time bigint NOT NULL, -- transaction metadata
node_index integer NOT NULL, -- event metadata
-- * event identification
@ -330,7 +330,7 @@ CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participan
CREATE TABLE participant_events_non_consuming_exercise (
-- * fixed-size columns first to avoid padding
event_sequential_id bigint NOT NULL, -- event identification: same ordering as event_offset
ledger_effective_time timestamp NOT NULL, -- transaction metadata
ledger_effective_time bigint NOT NULL, -- transaction metadata
node_index integer NOT NULL, -- event metadata
-- * event identification
@ -412,7 +412,7 @@ SELECT
event_sequential_id,
NULL::VARCHAR as event_offset,
NULL::VARCHAR as transaction_id,
NULL::timestamp without time zone as ledger_effective_time,
NULL::bigint as ledger_effective_time,
command_id,
workflow_id,
application_id,

View File

@ -1 +1 @@
188a48d9a296998d97c40e7d347916627bb92a6a1a10f390f0b08cf17653c465
0d6af3e9a9d4537558102484dca7768178ca310cfe62debb8a88f1b4312fc869

View File

@ -21,7 +21,7 @@ CREATE TABLE packages
-- The size of the archive payload (i.e., the serialized DAML-LF package), in bytes
package_size NUMBER not null,
-- The time when the package was added
known_since TIMESTAMP not null,
known_since NUMBER not null,
-- The ledger end at the time when the package was added
ledger_offset VARCHAR2(4000) not null,
-- The DAML-LF archive, serialized using the protobuf message `daml_lf.Archive`.
@ -35,7 +35,7 @@ CREATE INDEX packages_ledger_offset_idx ON packages(ledger_offset);
CREATE TABLE configuration_entries
(
ledger_offset VARCHAR2(4000) not null primary key,
recorded_at TIMESTAMP not null,
recorded_at NUMBER not null,
submission_id NVARCHAR2(1000) not null,
-- The type of entry, one of 'accept' or 'reject'.
typ NVARCHAR2(1000) not null,
@ -60,7 +60,7 @@ CREATE INDEX idx_configuration_submission ON configuration_entries (submission_i
CREATE TABLE package_entries
(
ledger_offset VARCHAR2(4000) not null primary key,
recorded_at TIMESTAMP not null,
recorded_at NUMBER not null,
-- SubmissionId for package to be uploaded
submission_id NVARCHAR2(1000),
-- The type of entry, one of 'accept' or 'reject'
@ -84,7 +84,7 @@ CREATE TABLE party_entries
-- The ledger end at the time when the party allocation was added
-- cannot add BLOB as primary key with Oracle
ledger_offset VARCHAR2(4000) primary key not null,
recorded_at TIMESTAMP not null,
recorded_at NUMBER not null,
-- SubmissionId for the party allocation
submission_id NVARCHAR2(1000),
-- party
@ -111,7 +111,7 @@ CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, l
CREATE TABLE participant_command_completions
(
completion_offset VARCHAR2(4000) NOT NULL,
record_time TIMESTAMP NOT NULL,
record_time NUMBER NOT NULL,
application_id NVARCHAR2(1000) NOT NULL,
-- The submission ID will be provided by the participant or driver if the application didn't provide one.
@ -126,7 +126,7 @@ CREATE TABLE participant_command_completions
deduplication_offset VARCHAR2(4000),
deduplication_time_seconds NUMBER,
deduplication_time_nanos NUMBER,
deduplication_start TIMESTAMP,
deduplication_start NUMBER,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) NOT NULL,
@ -144,7 +144,7 @@ CREATE TABLE participant_command_submissions
-- The deduplication key
deduplication_key NVARCHAR2(1000) primary key not null,
-- The time the command will stop being deduplicated
deduplicate_until TIMESTAMP not null
deduplicate_until NUMBER not null
);
---------------------------------------------------------------------------------------------------
@ -202,7 +202,7 @@ CREATE TABLE participant_events_create (
event_sequential_id NUMBER NOT NULL,
-- NOTE: this must be assigned sequentially by the indexer such that
-- for all events ev1, ev2 it holds that '(ev1.offset < ev2.offset) <=> (ev1.event_sequential_id < ev2.event_sequential_id)
ledger_effective_time TIMESTAMP NOT NULL,
ledger_effective_time NUMBER NOT NULL,
node_index INTEGER NOT NULL,
event_offset VARCHAR2(4000) NOT NULL,
@ -277,7 +277,7 @@ CREATE TABLE participant_events_consuming_exercise (
-- * transaction metadata
transaction_id VARCHAR2(4000) NOT NULL,
ledger_effective_time TIMESTAMP NOT NULL,
ledger_effective_time NUMBER NOT NULL,
command_id VARCHAR2(4000),
workflow_id VARCHAR2(4000),
application_id VARCHAR2(4000),
@ -342,7 +342,7 @@ CREATE TABLE participant_events_non_consuming_exercise (
-- NOTE: this must be assigned sequentially by the indexer such that
-- for all events ev1, ev2 it holds that '(ev1.offset < ev2.offset) <=> (ev1.event_sequential_id < ev2.event_sequential_id)
ledger_effective_time TIMESTAMP NOT NULL,
ledger_effective_time NUMBER NOT NULL,
node_index INTEGER NOT NULL,
event_offset VARCHAR2(4000) NOT NULL,
@ -404,7 +404,7 @@ SELECT cast(0 as SMALLINT) AS event_kind,
participant_events_divulgence.event_sequential_id,
cast(NULL as VARCHAR2(4000)) AS event_offset,
cast(NULL as VARCHAR2(4000)) AS transaction_id,
cast(NULL as TIMESTAMP) AS ledger_effective_time,
cast(NULL as NUMBER) AS ledger_effective_time,
participant_events_divulgence.command_id,
participant_events_divulgence.workflow_id,
participant_events_divulgence.application_id,

View File

@ -0,0 +1 @@
82a6b90fdb948db64fe47b0804ca61872d61acb9e71c37b3c1b7d293b450cff1

View File

@ -0,0 +1,185 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
---------------------------------------------------------------------------------------------------
-- V111: Convert all TIMESTAMP columns to BIGINT to solve issues with time zones
---------------------------------------------------------------------------------------------------
ALTER TABLE configuration_entries
ALTER COLUMN recorded_at
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM recorded_at) * 1000000;
ALTER TABLE packages
ALTER COLUMN known_since
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM known_since) * 1000000;
ALTER TABLE package_entries
ALTER COLUMN recorded_at
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM recorded_at) * 1000000;
ALTER TABLE party_entries
ALTER COLUMN recorded_at
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM recorded_at) * 1000000;
ALTER TABLE participant_command_submissions
ALTER COLUMN deduplicate_until
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM deduplicate_until) * 1000000;
ALTER TABLE participant_command_completions
ALTER COLUMN record_time
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM record_time) * 1000000;
ALTER TABLE participant_command_completions
ALTER COLUMN deduplication_start
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM deduplication_start) * 1000000;
DROP VIEW participant_events;
ALTER TABLE participant_events_create
ALTER COLUMN ledger_effective_time
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM ledger_effective_time) * 1000000;
ALTER TABLE participant_events_consuming_exercise
ALTER COLUMN ledger_effective_time
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM ledger_effective_time) * 1000000;
ALTER TABLE participant_events_non_consuming_exercise
ALTER COLUMN ledger_effective_time
SET DATA TYPE BIGINT USING EXTRACT(EPOCH FROM ledger_effective_time) * 1000000;
CREATE VIEW participant_events
AS
SELECT
0::smallint as event_kind,
event_sequential_id,
NULL::text as event_offset,
NULL::text as transaction_id,
NULL::bigint as ledger_effective_time,
command_id,
workflow_id,
application_id,
submitters,
NULL::integer as node_index,
NULL::text as event_id,
contract_id,
template_id,
NULL::text[] as flat_event_witnesses,
tree_event_witnesses,
create_argument,
NULL::text[] as create_signatories,
NULL::text[] as create_observers,
NULL::text as create_agreement_text,
NULL::bytea as create_key_value,
NULL::text as create_key_hash,
NULL::text as exercise_choice,
NULL::bytea as exercise_argument,
NULL::bytea as exercise_result,
NULL::text[] as exercise_actors,
NULL::text[] as exercise_child_event_ids,
create_argument_compression,
NULL::smallint as create_key_value_compression,
NULL::smallint as exercise_argument_compression,
NULL::smallint as exercise_result_compression
FROM participant_events_divulgence
UNION ALL
SELECT
10::smallint as event_kind,
event_sequential_id,
event_offset,
transaction_id,
ledger_effective_time,
command_id,
workflow_id,
application_id,
submitters,
node_index,
event_id,
contract_id,
template_id,
flat_event_witnesses,
tree_event_witnesses,
create_argument,
create_signatories,
create_observers,
create_agreement_text,
create_key_value,
create_key_hash,
NULL::text as exercise_choice,
NULL::bytea as exercise_argument,
NULL::bytea as exercise_result,
NULL::text[] as exercise_actors,
NULL::text[] as exercise_child_event_ids,
create_argument_compression,
create_key_value_compression,
NULL::smallint as exercise_argument_compression,
NULL::smallint as exercise_result_compression
FROM participant_events_create
UNION ALL
SELECT
20::smallint as event_kind,
event_sequential_id,
event_offset,
transaction_id,
ledger_effective_time,
command_id,
workflow_id,
application_id,
submitters,
node_index,
event_id,
contract_id,
template_id,
flat_event_witnesses,
tree_event_witnesses,
NULL::bytea as create_argument,
NULL::text[] as create_signatories,
NULL::text[] as create_observers,
NULL::text as create_agreement_text,
create_key_value,
NULL::text as create_key_hash,
exercise_choice,
exercise_argument,
exercise_result,
exercise_actors,
exercise_child_event_ids,
NULL::smallint as create_argument_compression,
create_key_value_compression,
exercise_argument_compression,
exercise_result_compression
FROM participant_events_consuming_exercise
UNION ALL
SELECT
25::smallint as event_kind,
event_sequential_id,
event_offset,
transaction_id,
ledger_effective_time,
command_id,
workflow_id,
application_id,
submitters,
node_index,
event_id,
contract_id,
template_id,
flat_event_witnesses,
tree_event_witnesses,
NULL::bytea as create_argument,
NULL::text[] as create_signatories,
NULL::text[] as create_observers,
NULL::text as create_agreement_text,
create_key_value,
NULL::text as create_key_hash,
exercise_choice,
exercise_argument,
exercise_result,
exercise_actors,
exercise_child_event_ids,
NULL::smallint as create_argument_compression,
create_key_value_compression,
exercise_argument_compression,
exercise_result_compression
FROM participant_events_non_consuming_exercise
;
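
A sanity check on the migration arithmetic above (a sketch, not part of the change): PostgreSQL's EXTRACT(EPOCH FROM ts) returns seconds including the fractional part, so multiplying by 1,000,000 produces the same microsecond value as the Scala-side instantToMicros:

import java.time.Instant

// Mirrors EXTRACT(EPOCH FROM ts) * 1000000 for a concrete value.
val ts = Instant.parse("2021-09-07T10:00:13.123456Z")
val epochSeconds = BigDecimal(ts.getEpochSecond) + BigDecimal(ts.getNano) / BigDecimal(1000000000L)
val micros = (epochSeconds * BigDecimal(1000000L)).toLongExact // 1631008813123456L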

View File

@ -7,6 +7,7 @@ import java.io.BufferedReader
import java.sql.{PreparedStatement, Timestamp, Types}
import java.time.Instant
import java.util.Date
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import anorm.Column.nonNull
@ -319,9 +320,19 @@ private[platform] object Conversions {
// Instant
def instant(name: String): RowParser[Instant] =
// TODO append-only: Delete after removing the mutating schema. The append-only schema only uses BIGINT for timestamps.
def instantFromTimestamp(name: String): RowParser[Instant] =
SqlParser.get[Date](name).map(_.toInstant)
def instantFromMicros(name: String): RowParser[Instant] =
SqlParser.get[Long](name).map(instantFromMicros)
def instantFromMicros(micros: Long): Instant = {
val seconds = TimeUnit.MICROSECONDS.toSeconds(micros)
val microsOfSecond = micros - TimeUnit.SECONDS.toMicros(seconds)
Instant.ofEpochSecond(seconds, TimeUnit.MICROSECONDS.toNanos(microsOfSecond))
}
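
A brief note on the helper above (illustrative, not part of the diff): TimeUnit truncates toward zero, so for pre-epoch values the nano adjustment becomes negative, which Instant.ofEpochSecond normalizes correctly:

// instantFromMicros(1631008813123456L) == Instant.parse("2021-09-07T10:00:13.123456Z")
// instantFromMicros(-1L) == Instant.parse("1969-12-31T23:59:59.999999Z")
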
// Hash
implicit object HashToStatement extends ToStatement[Hash] {

View File

@ -3,8 +3,6 @@
package com.daml.platform.store.backend
import java.time.Instant
import com.daml.scalautil.NeverEqualsOverride
sealed trait DbDto
@ -31,7 +29,7 @@ object DbDto {
final case class EventCreate(
event_offset: Option[String],
transaction_id: Option[String],
ledger_effective_time: Option[Instant],
ledger_effective_time: Option[Long],
command_id: Option[String],
workflow_id: Option[String],
application_id: Option[String],
@ -57,7 +55,7 @@ object DbDto {
consuming: Boolean,
event_offset: Option[String],
transaction_id: Option[String],
ledger_effective_time: Option[Instant],
ledger_effective_time: Option[Long],
command_id: Option[String],
workflow_id: Option[String],
application_id: Option[String],
@ -82,7 +80,7 @@ object DbDto {
final case class ConfigurationEntry(
ledger_offset: String,
recorded_at: Instant,
recorded_at: Long,
submission_id: String,
typ: String,
configuration: Array[Byte],
@ -91,7 +89,7 @@ object DbDto {
final case class PackageEntry(
ledger_offset: String,
recorded_at: Instant,
recorded_at: Long,
submission_id: Option[String],
typ: String,
rejection_reason: Option[String],
@ -102,14 +100,14 @@ object DbDto {
upload_id: String,
source_description: Option[String],
package_size: Long,
known_since: Instant,
known_since: Long,
ledger_offset: String,
_package: Array[Byte],
) extends DbDto
final case class PartyEntry(
ledger_offset: String,
recorded_at: Instant,
recorded_at: Long,
submission_id: Option[String],
party: Option[String],
display_name: Option[String],
@ -120,7 +118,7 @@ object DbDto {
final case class CommandCompletion(
completion_offset: String,
record_time: Instant,
record_time: Long,
application_id: String,
submitters: Set[String],
command_id: String,
@ -132,7 +130,7 @@ object DbDto {
deduplication_offset: Option[String],
deduplication_time_seconds: Option[Long],
deduplication_time_nanos: Option[Int],
deduplication_start: Option[Instant],
deduplication_start: Option[Long],
) extends DbDto
final case class CommandDeduplication(deduplication_key: String) extends DbDto

View File

@ -48,7 +48,7 @@ object UpdateToDbDto {
Iterator(
DbDto.ConfigurationEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = u.submissionId,
typ = JdbcLedgerDao.acceptType,
configuration = Configuration.encode(u.newConfiguration).toByteArray,
@ -60,7 +60,7 @@ object UpdateToDbDto {
Iterator(
DbDto.ConfigurationEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = u.submissionId,
typ = JdbcLedgerDao.rejectType,
configuration = Configuration.encode(u.proposedConfiguration).toByteArray,
@ -72,7 +72,7 @@ object UpdateToDbDto {
Iterator(
DbDto.PartyEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = u.submissionId,
party = Some(u.party),
display_name = Option(u.displayName),
@ -86,7 +86,7 @@ object UpdateToDbDto {
Iterator(
DbDto.PartyEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = Some(u.submissionId),
party = None,
display_name = None,
@ -104,7 +104,7 @@ object UpdateToDbDto {
upload_id = uploadId,
source_description = u.sourceDescription,
package_size = archive.getPayload.size.toLong,
known_since = u.recordTime.toInstant,
known_since = u.recordTime.micros,
ledger_offset = offset.toHexString,
_package = archive.toByteArray,
)
@ -112,7 +112,7 @@ object UpdateToDbDto {
val packageEntries = u.submissionId.iterator.map(submissionId =>
DbDto.PackageEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = Some(submissionId),
typ = JdbcLedgerDao.acceptType,
rejection_reason = None,
@ -124,7 +124,7 @@ object UpdateToDbDto {
Iterator(
DbDto.PackageEntry(
ledger_offset = offset.toHexString,
recorded_at = u.recordTime.toInstant,
recorded_at = u.recordTime.micros,
submission_id = Some(u.submissionId),
typ = JdbcLedgerDao.rejectType,
rejection_reason = Some(u.rejectionReason),
@ -156,7 +156,7 @@ object UpdateToDbDto {
DbDto.EventCreate(
event_offset = Some(offset.toHexString),
transaction_id = Some(u.transactionId),
ledger_effective_time = Some(u.transactionMeta.ledgerEffectiveTime.toInstant),
ledger_effective_time = Some(u.transactionMeta.ledgerEffectiveTime.micros),
command_id = u.optCompletionInfo.map(_.commandId),
workflow_id = u.transactionMeta.workflowId,
application_id = u.optCompletionInfo.map(_.applicationId),
@ -195,7 +195,7 @@ object UpdateToDbDto {
consuming = exercise.consuming,
event_offset = Some(offset.toHexString),
transaction_id = Some(u.transactionId),
ledger_effective_time = Some(u.transactionMeta.ledgerEffectiveTime.toInstant),
ledger_effective_time = Some(u.transactionMeta.ledgerEffectiveTime.micros),
command_id = u.optCompletionInfo.map(_.commandId),
workflow_id = u.transactionMeta.workflowId,
application_id = u.optCompletionInfo.map(_.applicationId),
@ -280,7 +280,7 @@ object UpdateToDbDto {
DbDto.CommandCompletion(
completion_offset = offset.toHexString,
record_time = recordTime.toInstant,
record_time = recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,

View File

@ -5,9 +5,8 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import java.util.Date
import anorm.SqlParser.{array, binaryStream, byteArray, date, flatten, int, long, str}
import anorm.SqlParser.{array, binaryStream, byteArray, flatten, int, long, str}
import anorm.{Macro, Row, RowParser, SQL, SimpleSql, SqlParser, SqlQuery, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.configuration.Configuration
@ -17,7 +16,7 @@ import com.daml.platform.store.Conversions.{
contractId,
eventId,
identifier,
instant,
instantFromMicros,
ledgerString,
offset,
}
@ -339,7 +338,7 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
packageId: String,
sourceDescription: Option[String],
size: Long,
knownSince: Date,
knownSince: Long,
)
private val PackageDataParser: RowParser[ParsedPackageData] =
@ -356,7 +355,7 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
.map(d =>
PackageId.assertFromString(d.packageId) -> PackageDetails(
d.size,
d.knownSince.toInstant,
instantFromMicros(d.knownSince),
d.sourceDescription,
)
)
@ -389,7 +388,7 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
private val packageEntryParser: RowParser[(Offset, PackageLedgerEntry)] =
(offset("ledger_offset") ~
date("recorded_at") ~
instantFromMicros("recorded_at") ~
ledgerString("submission_id").? ~
str("typ") ~
str("rejection_reason").?)
@ -397,10 +396,10 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
.map {
case (offset, recordTime, Some(submissionId), `acceptType`, None) =>
offset ->
PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTime.toInstant)
PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTime)
case (offset, recordTime, Some(submissionId), `rejectType`, Some(reason)) =>
offset ->
PackageLedgerEntry.PackageUploadRejected(submissionId, recordTime.toInstant, reason)
PackageLedgerEntry.PackageUploadRejected(submissionId, recordTime, reason)
case invalidRow =>
sys.error(s"packageEntryParser: invalid package entry row: $invalidRow")
}
@ -433,9 +432,8 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
private case class ParsedCommandData(deduplicateUntil: Instant)
private val CommandDataParser: RowParser[ParsedCommandData] =
Macro.parser[ParsedCommandData](
"deduplicate_until"
)
instantFromMicros("deduplicate_until")
.map(ParsedCommandData)
def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Instant =
SQL_SELECT_COMMAND
@ -450,7 +448,7 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
def removeExpiredDeduplicationData(currentTime: Instant)(connection: Connection): Unit = {
SQL_DELETE_EXPIRED_COMMANDS
.on("currentTime" -> currentTime)
.on("currentTime" -> Timestamp.instantToMicros(currentTime))
.execute()(connection)
()
}
@ -598,7 +596,7 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
eventId("event_id") ~
contractId("contract_id") ~
identifier("template_id").? ~
instant("ledger_effective_time").? ~
instantFromMicros("ledger_effective_time").? ~
array[String]("create_signatories").? ~
array[String]("create_observers").? ~
str("create_agreement_text").? ~

View File

@ -15,7 +15,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{instant, offset}
import com.daml.platform.store.Conversions.{instantFromMicros, offset}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.google.protobuf.any
import com.google.rpc.status.{Status => StatusProto}
@ -61,7 +61,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private val sharedColumns: RowParser[Offset ~ Instant ~ String ~ String ~ Option[String]] =
offset("completion_offset") ~
instant("record_time") ~
instantFromMicros("record_time") ~
str("command_id") ~
str("application_id") ~
str("submission_id").?
@ -77,7 +77,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private val deduplicationTimeNanosColumn: RowParser[Option[Int]] =
int("deduplication_time_nanos").?
private val deduplicationStartColumn: RowParser[Option[Instant]] =
instant("deduplication_start").?
instantFromMicros("deduplication_start").?
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
acceptedCommandSharedColumns ~

View File

@ -6,14 +6,14 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, get, int, long, str}
import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.{ResultSetParser, RowParser, SqlParser, ~}
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{
contractId,
flatEventWitnessesColumn,
identifier,
instant,
instantFromMicros,
offset,
}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
@ -99,7 +99,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
FROM create_and_divulged_contracts
WHERE NOT EXISTS (SELECT 1 FROM archival_event)
FETCH NEXT 1 ROW ONLY"""
.as(instant("ledger_effective_time").?.singleOpt)(connection)
.as(instantFromMicros("ledger_effective_time").?.singleOpt)(connection)
}
val queriedIds: List[(ContractId, Option[Option[Instant]])] = ids.toList
@ -137,7 +137,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
~ flatEventWitnessesColumn("flat_event_witnesses")
~ binaryStream("create_argument").?
~ int("create_argument_compression").?
~ int("event_kind") ~ get[Instant]("ledger_effective_time")(anorm.Column.columnToInstant).?)
~ int("event_kind") ~ instantFromMicros("ledger_effective_time").?)
.map(SqlParser.flatten)
.map(StorageBackend.RawContractState.tupled)
@ -167,7 +167,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
(int("event_kind") ~
contractId("contract_id") ~
identifier("template_id").? ~
instant("ledger_effective_time").? ~
instantFromMicros("ledger_effective_time").? ~
binaryStream("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~

View File

@ -11,7 +11,7 @@ import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.{RowParser, ~}
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{identifier, instant, offset}
import com.daml.platform.store.Conversions.{identifier, instantFromMicros, offset}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.events.{EventsTable, Identifier, Raw}
import com.daml.platform.store.backend.EventStorageBackend
@ -57,7 +57,7 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
long("event_sequential_id") ~
str("event_id") ~
str("contract_id") ~
instant("ledger_effective_time") ~
instantFromMicros("ledger_effective_time") ~
identifier("template_id") ~
str("command_id").? ~
str("workflow_id").? ~

View File

@ -6,6 +6,7 @@ package com.daml.platform.store.backend.common
import java.lang
import java.sql.PreparedStatement
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
@ -91,11 +92,10 @@ private[backend] case class BooleanOptional[FROM](extract: FROM => Option[Boolea
override def convert: Option[Boolean] => lang.Boolean = _.map(Boolean.box).orNull
}
private[backend] case class Timestamp[FROM](extract: FROM => Instant)
extends TrivialField[FROM, Instant]
private[backend] case class TimestampOptional[FROM](extract: FROM => Option[Instant])
extends TrivialOptionalField[FROM, Instant]
private[backend] object Timestamp {
def instantToMicros(i: Instant): Long =
TimeUnit.SECONDS.toMicros(i.getEpochSecond) + TimeUnit.NANOSECONDS.toMicros(i.getNano.toLong)
}
private[backend] case class StringArray[FROM](extract: FROM => Iterable[String])
extends Field[FROM, Iterable[String], Array[String]] {

View File

@ -6,11 +6,11 @@ package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.{RowParser, SQL, ~}
import anorm.SqlParser.{bool, date, flatten, str}
import anorm.SqlParser.{bool, flatten, str}
import com.daml.ledger.api.domain.PartyDetails
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{ledgerString, offset, party}
import com.daml.platform.store.Conversions.{instantFromMicros, ledgerString, offset, party}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.PartyStorageBackend
@ -32,7 +32,7 @@ trait PartyStorageBackendTemplate extends PartyStorageBackend {
private val partyEntryParser: RowParser[(Offset, PartyLedgerEntry)] = {
import com.daml.platform.store.Conversions.bigDecimalColumnToBoolean
(offset("ledger_offset") ~
date("recorded_at") ~
instantFromMicros("recorded_at") ~
ledgerString("submission_id").? ~
party("party").? ~
str("display_name").? ~
@ -54,7 +54,7 @@ trait PartyStorageBackendTemplate extends PartyStorageBackend {
offset ->
PartyLedgerEntry.AllocationAccepted(
submissionIdOpt,
recordTime.toInstant,
recordTime,
PartyDetails(party, displayNameOpt, isLocal),
)
case (
@ -69,7 +69,7 @@ trait PartyStorageBackendTemplate extends PartyStorageBackend {
) =>
offset -> PartyLedgerEntry.AllocationRejected(
submissionId,
recordTime.toInstant,
recordTime,
reason,
)
case invalidRow =>

View File

@ -4,7 +4,6 @@
package com.daml.platform.store.backend.common
import java.sql.Connection
import java.time.Instant
import com.daml.platform.store.backend.DbDto
@ -53,14 +52,6 @@ private[backend] object AppendOnlySchema {
def smallintOptional[FROM, _](extractor: FROM => Option[Int]): Field[FROM, Option[Int], _] =
SmallintOptional(extractor)
def timestamp[FROM, _](extractor: FROM => Instant): Field[FROM, Instant, _] =
Timestamp(extractor)
def timestampOptional[FROM, _](
extractor: FROM => Option[Instant]
): Field[FROM, Option[Instant], _] =
TimestampOptional(extractor)
def intOptional[FROM, _](extractor: FROM => Option[Int]): Field[FROM, Option[Int], _] =
IntOptional(extractor)
@ -101,7 +92,7 @@ private[backend] object AppendOnlySchema {
fieldStrategy.insert("participant_events_create")(
"event_offset" -> fieldStrategy.stringOptional(_.event_offset),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"ledger_effective_time" -> fieldStrategy.timestampOptional(_.ledger_effective_time),
"ledger_effective_time" -> fieldStrategy.bigintOptional(_.ledger_effective_time),
"command_id" -> fieldStrategy.stringOptional(_.command_id),
"workflow_id" -> fieldStrategy.stringOptional(_.workflow_id),
"application_id" -> fieldStrategy.stringOptional(_.application_id),
@ -133,7 +124,7 @@ private[backend] object AppendOnlySchema {
"event_offset" -> fieldStrategy.stringOptional(_.event_offset),
"contract_id" -> fieldStrategy.string(_.contract_id),
"transaction_id" -> fieldStrategy.stringOptional(_.transaction_id),
"ledger_effective_time" -> fieldStrategy.timestampOptional(_.ledger_effective_time),
"ledger_effective_time" -> fieldStrategy.bigintOptional(_.ledger_effective_time),
"node_index" -> fieldStrategy.intOptional(_.node_index),
"command_id" -> fieldStrategy.stringOptional(_.command_id),
"workflow_id" -> fieldStrategy.stringOptional(_.workflow_id),
@ -169,7 +160,7 @@ private[backend] object AppendOnlySchema {
val configurationEntries: Table[DbDto.ConfigurationEntry] =
fieldStrategy.insert("configuration_entries")(
"ledger_offset" -> fieldStrategy.string(_.ledger_offset),
"recorded_at" -> fieldStrategy.timestamp(_.recorded_at),
"recorded_at" -> fieldStrategy.bigint(_.recorded_at),
"submission_id" -> fieldStrategy.string(_.submission_id),
"typ" -> fieldStrategy.string(_.typ),
"configuration" -> fieldStrategy.bytea(_.configuration),
@ -179,7 +170,7 @@ private[backend] object AppendOnlySchema {
val packageEntries: Table[DbDto.PackageEntry] =
fieldStrategy.insert("package_entries")(
"ledger_offset" -> fieldStrategy.string(_.ledger_offset),
"recorded_at" -> fieldStrategy.timestamp(_.recorded_at),
"recorded_at" -> fieldStrategy.bigint(_.recorded_at),
"submission_id" -> fieldStrategy.stringOptional(_.submission_id),
"typ" -> fieldStrategy.string(_.typ),
"rejection_reason" -> fieldStrategy.stringOptional(_.rejection_reason),
@ -194,7 +185,7 @@ private[backend] object AppendOnlySchema {
"upload_id" -> fieldStrategy.string(_.upload_id),
"source_description" -> fieldStrategy.stringOptional(_.source_description),
"package_size" -> fieldStrategy.bigint(_.package_size),
"known_since" -> fieldStrategy.timestamp(_.known_since),
"known_since" -> fieldStrategy.bigint(_.known_since),
"ledger_offset" -> fieldStrategy.string(_.ledger_offset),
"package" -> fieldStrategy.bytea(_._package),
)
@ -202,7 +193,7 @@ private[backend] object AppendOnlySchema {
val partyEntries: Table[DbDto.PartyEntry] =
fieldStrategy.insert("party_entries")(
"ledger_offset" -> fieldStrategy.string(_.ledger_offset),
"recorded_at" -> fieldStrategy.timestamp(_.recorded_at),
"recorded_at" -> fieldStrategy.bigint(_.recorded_at),
"submission_id" -> fieldStrategy.stringOptional(_.submission_id),
"party" -> fieldStrategy.stringOptional(_.party),
"display_name" -> fieldStrategy.stringOptional(_.display_name),
@ -214,7 +205,7 @@ private[backend] object AppendOnlySchema {
val commandCompletions: Table[DbDto.CommandCompletion] =
fieldStrategy.insert("participant_command_completions")(
"completion_offset" -> fieldStrategy.string(_.completion_offset),
"record_time" -> fieldStrategy.timestamp(_.record_time),
"record_time" -> fieldStrategy.bigint(_.record_time),
"application_id" -> fieldStrategy.string(_.application_id),
"submitters" -> fieldStrategy.stringArray(_.submitters),
"command_id" -> fieldStrategy.string(_.command_id),
@ -226,7 +217,7 @@ private[backend] object AppendOnlySchema {
"deduplication_offset" -> fieldStrategy.stringOptional(_.deduplication_offset),
"deduplication_time_seconds" -> fieldStrategy.bigintOptional(_.deduplication_time_seconds),
"deduplication_time_nanos" -> fieldStrategy.intOptional(_.deduplication_time_nanos),
"deduplication_start" -> fieldStrategy.timestampOptional(_.deduplication_start),
"deduplication_start" -> fieldStrategy.bigintOptional(_.deduplication_start),
)
val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =

View File

@ -23,6 +23,7 @@ import com.daml.platform.store.backend.common.{
InitHookDataSourceProxy,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
}
import com.daml.platform.store.backend.{
DBLockStorageBackend,
@ -85,7 +86,7 @@ private[backend] object H2StorageBackend
|when matched and pcs.deduplicate_until < {submittedAt} then
| update set deduplicate_until={deduplicateUntil}""".stripMargin
def upsertDeduplicationEntry(
override def upsertDeduplicationEntry(
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
@ -93,8 +94,8 @@ private[backend] object H2StorageBackend
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> submittedAt,
"deduplicateUntil" -> deduplicateUntil,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)

View File

@ -3,37 +3,11 @@
package com.daml.platform.store.backend.oracle
import java.sql.{PreparedStatement, Timestamp}
import java.time.Instant
import java.sql.PreparedStatement
import com.daml.platform.store.backend.common.Field
import spray.json._
import spray.json.DefaultJsonProtocol._
private[oracle] case class OracleTimestamp[FROM](extract: FROM => Instant)
extends Field[FROM, Instant, Timestamp] {
override def convert: Instant => java.sql.Timestamp = Timestamp.from
override def prepareDataTemplate(
preparedStatement: PreparedStatement,
index: Int,
value: Timestamp,
): Unit = {
preparedStatement.setTimestamp(index, value)
}
}
private[oracle] case class OracleTimestampOptional[FROM](extract: FROM => Option[Instant])
extends Field[FROM, Option[Instant], java.sql.Timestamp] {
override def convert: Option[Instant] => java.sql.Timestamp =
_.map(java.sql.Timestamp.from).orNull
override def prepareDataTemplate(
preparedStatement: PreparedStatement,
index: Int,
value: Timestamp,
): Unit = {
preparedStatement.setTimestamp(index, value)
}
}
private[oracle] case class OracleStringArray[FROM](extract: FROM => Iterable[String])
extends Field[FROM, Iterable[String], String] {
override def convert: Iterable[String] => String = _.toList.toJson.compactPrint

View File

@ -3,8 +3,6 @@
package com.daml.platform.store.backend.oracle
import java.time.Instant
import com.daml.platform.store.backend.DbDto
import com.daml.platform.store.backend.common.AppendOnlySchema.FieldStrategy
import com.daml.platform.store.backend.common.{AppendOnlySchema, Field, Schema, Table}
@ -21,14 +19,6 @@ private[oracle] object OracleSchema {
): Field[FROM, Option[Iterable[String]], _] =
OracleStringArrayOptional(extractor)
override def timestamp[FROM, _](extractor: FROM => Instant): Field[FROM, Instant, _] =
OracleTimestamp(extractor)
override def timestampOptional[FROM, _](
extractor: FROM => Option[Instant]
): Field[FROM, Option[Instant], _] =
OracleTimestampOptional(extractor)
override def insert[FROM](tableName: String)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =

View File

@ -16,6 +16,7 @@ import com.daml.platform.store.backend.common.{
InitHookDataSourceProxy,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
}
import com.daml.platform.store.backend.{
DBLockStorageBackend,
@ -81,7 +82,7 @@ private[backend] object OracleStorageBackend
| insert (pcs.deduplication_key, pcs.deduplicate_until)
| values ({deduplicationKey}, {deduplicateUntil})""".stripMargin
def upsertDeduplicationEntry(
override def upsertDeduplicationEntry(
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
@ -89,8 +90,8 @@ private[backend] object OracleStorageBackend
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> submittedAt,
"deduplicateUntil" -> deduplicateUntil,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)

View File

@ -3,33 +3,8 @@
package com.daml.platform.store.backend.postgresql
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import com.daml.platform.store.backend.common.Field
private[postgresql] trait PGTimestampBase[FROM, TO] extends Field[FROM, TO, String] {
override def selectFieldExpression(inputFieldName: String): String =
s"$inputFieldName::timestamp"
private val PGTimestampFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
protected def convertBase: Instant => String =
_.atZone(ZoneOffset.UTC).toLocalDateTime
.format(PGTimestampFormat)
}
private[postgresql] case class PGTimestamp[FROM](extract: FROM => Instant)
extends PGTimestampBase[FROM, Instant] {
override def convert: Instant => String = convertBase
}
private[postgresql] case class PGTimestampOptional[FROM](extract: FROM => Option[Instant])
extends PGTimestampBase[FROM, Option[Instant]] {
override def convert: Option[Instant] => String = _.map(convertBase).orNull
}
private[postgresql] trait PGStringArrayBase[FROM, TO] extends Field[FROM, TO, String] {
override def selectFieldExpression(inputFieldName: String): String =
s"string_to_array($inputFieldName, '|')"

View File

@ -3,8 +3,6 @@
package com.daml.platform.store.backend.postgresql
import java.time.Instant
import com.daml.platform.store.backend.DbDto
import com.daml.platform.store.backend.common.AppendOnlySchema.FieldStrategy
import com.daml.platform.store.backend.common.{AppendOnlySchema, Field, Schema, Table}
@ -21,14 +19,6 @@ private[postgresql] object PGSchema {
): Field[FROM, Option[Iterable[String]], _] =
PGStringArrayOptional(extractor)
override def timestamp[FROM, _](extractor: FROM => Instant): Field[FROM, Instant, _] =
PGTimestamp(extractor)
override def timestampOptional[FROM, _](
extractor: FROM => Option[Instant]
): Field[FROM, Option[Instant], _] =
PGTimestampOptional(extractor)
override def smallintOptional[FROM, _](
extractor: FROM => Option[Int]
): Field[FROM, Option[Int], _] =

View File

@ -25,6 +25,7 @@ import com.daml.platform.store.backend.common.{
InitHookDataSourceProxy,
PartyStorageBackendTemplate,
QueryStrategy,
Timestamp,
}
import com.daml.platform.store.backend.{
DBLockStorageBackend,
@ -73,8 +74,8 @@ private[backend] object PostgresStorageBackend
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> submittedAt,
"deduplicateUntil" -> deduplicateUntil,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)

View File

@ -19,7 +19,9 @@ private[platform] object CommandCompletionsTable {
import SqlParser.{int, str}
private val sharedColumns: RowParser[Offset ~ Instant ~ String ~ String] =
offset("completion_offset") ~ instant("record_time") ~ str("command_id") ~ str("application_id")
offset("completion_offset") ~ instantFromTimestamp("record_time") ~ str("command_id") ~ str(
"application_id"
)
private val acceptedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~ str("transaction_id") map {

View File

@ -76,7 +76,7 @@ private[events] abstract class ContractsTable extends PostCommitValidationData {
} else {
SQL"select max(create_ledger_effective_time) as max_create_ledger_effective_time, count(*) as num_contracts from participant_contracts where participant_contracts.contract_id in ($ids)"
.as(
(instant("max_create_ledger_effective_time").? ~ int("num_contracts")).single
(instantFromTimestamp("max_create_ledger_effective_time").? ~ int("num_contracts")).single
.map {
case result ~ numContracts if numContracts == ids.size => Success(result)
case _ => Failure(ContractsTable.notFound(ids))

View File

@ -26,7 +26,7 @@ import com.daml.ledger.offset.Offset
import com.daml.platform.ApiOffset
import com.daml.platform.api.v1.event.EventOps.{EventOps, TreeEventOps}
import com.daml.platform.index.TransactionConversion
import com.daml.platform.store.Conversions.{identifier, instant, offset}
import com.daml.platform.store.Conversions.{identifier, instantFromTimestamp, offset}
import com.daml.platform.store.DbType
import com.google.protobuf.timestamp.Timestamp
@ -55,7 +55,7 @@ private[events] object EventsTable {
long("event_sequential_id") ~
str("event_id") ~
str("contract_id") ~
instant("ledger_effective_time") ~
instantFromTimestamp("ledger_effective_time") ~
identifier("template_id") ~
str("command_id").? ~
str("workflow_id").? ~

View File

@ -13,6 +13,7 @@ trait StorageBackendSuite
with StorageBackendTestsCompletions
with StorageBackendTestsReset
with StorageBackendTestsPruning
with StorageBackendTestsDBLock {
with StorageBackendTestsDBLock
with StorageBackendTestsTimestamps {
this: AsyncFlatSpec =>
}

View File

@ -11,6 +11,7 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.ledger.EventId
import com.daml.lf.transaction.NodeId
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
@ -27,7 +28,8 @@ private[backend] object StorageBackendTestValues {
def transactionIdFromOffset(x: Offset): Ref.LedgerString =
Ref.LedgerString.assertFromString(x.toHexString)
val someTime: Instant = Instant.now()
def timestampFromInstant(i: Instant): Timestamp = Timestamp.assertFromInstant(i)
val someTime: Timestamp = timestampFromInstant(Instant.now())
val someConfiguration: Configuration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofHours(23))
@ -60,7 +62,7 @@ private[backend] object StorageBackendTestValues {
): DbDto.ConfigurationEntry =
DbDto.ConfigurationEntry(
ledger_offset = offset.toHexString,
recorded_at = someTime,
recorded_at = someTime.micros,
submission_id = "submission_id",
typ = JdbcLedgerDao.acceptType,
configuration = Configuration.encode(configuration).toByteArray,
@ -73,7 +75,7 @@ private[backend] object StorageBackendTestValues {
isLocal: Boolean = true,
): DbDto.PartyEntry = DbDto.PartyEntry(
ledger_offset = offset.toHexString,
recorded_at = someTime,
recorded_at = someTime.micros,
submission_id = Some("submission_id"),
party = Some(party),
display_name = Some(party),
@ -87,14 +89,14 @@ private[backend] object StorageBackendTestValues {
upload_id = "upload_id",
source_description = Some("source_description"),
package_size = someArchive.getPayload.size.toLong,
known_since = someTime,
known_since = someTime.micros,
ledger_offset = offset.toHexString,
_package = someArchive.toByteArray,
)
def dtoPackageEntry(offset: Offset): DbDto.PackageEntry = DbDto.PackageEntry(
ledger_offset = offset.toHexString,
recorded_at = someTime,
recorded_at = someTime.micros,
submission_id = Some("submission_id"),
typ = JdbcLedgerDao.acceptType,
rejection_reason = None,
@ -110,12 +112,13 @@ private[backend] object StorageBackendTestValues {
signatory: String = "signatory",
observer: String = "observer",
commandId: String = UUID.randomUUID().toString,
ledgerEffectiveTime: Option[Timestamp] = Some(someTime),
): DbDto.EventCreate = {
val transactionId = transactionIdFromOffset(offset)
DbDto.EventCreate(
event_offset = Some(offset.toHexString),
transaction_id = Some(transactionId),
ledger_effective_time = Some(someTime),
ledger_effective_time = ledgerEffectiveTime.map(_.micros),
command_id = Some(commandId),
workflow_id = Some("workflow_id"),
application_id = Some(someApplicationId),
@ -158,7 +161,7 @@ private[backend] object StorageBackendTestValues {
consuming = consuming,
event_offset = Some(offset.toHexString),
transaction_id = Some(transactionId),
ledger_effective_time = Some(someTime),
ledger_effective_time = Some(someTime.micros),
command_id = Some(commandId),
workflow_id = Some("workflow_id"),
application_id = Some(someApplicationId),
@ -216,11 +219,11 @@ private[backend] object StorageBackendTestValues {
deduplicationOffset: Option[String] = None,
deduplicationTimeSeconds: Option[Long] = None,
deduplicationTimeNanos: Option[Int] = None,
deduplicationStart: Option[Instant] = None,
deduplicationStart: Option[Timestamp] = None,
): DbDto.CommandCompletion =
DbDto.CommandCompletion(
completion_offset = offset.toHexString,
record_time = someTime,
record_time = someTime.micros,
application_id = applicationId,
submitters = Set(submitter),
command_id = commandId,
@ -232,7 +235,7 @@ private[backend] object StorageBackendTestValues {
deduplication_offset = deduplicationOffset,
deduplication_time_seconds = deduplicationTimeSeconds,
deduplication_time_nanos = deduplicationTimeNanos,
deduplication_start = deduplicationStart,
deduplication_start = deduplicationStart.map(_.micros),
)
def dtoTransactionId(dto: DbDto): Ref.TransactionId = {

View File

@ -0,0 +1,83 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend
import java.sql.Connection
import java.time.Instant
import java.util.TimeZone
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.util.Success
private[backend] trait StorageBackendTestsTimestamps extends Matchers with StorageBackendSpec {
this: AsyncFlatSpec =>
behavior of "StorageBackend (timestamps)"
import StorageBackendTestValues._
it should "correctly read ledger effective time using maximumLedgerTime" in {
val let = timestampFromInstant(Instant.now)
val cid = com.daml.lf.value.Value.ContractId.V0.assertFromString("#1")
val create = dtoCreate(
offset = offset(1),
eventSequentialId = 1L,
contractId = cid.coid,
ledgerEffectiveTime = Some(let),
)
for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(Vector(create), _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)))
let1 <- executeSql(backend.maximumLedgerTime(Set(cid)))
let2 <- executeSql(withDefaultTimeZone("GMT-1")(backend.maximumLedgerTime(Set(cid))))
let3 <- executeSql(withDefaultTimeZone("GMT+1")(backend.maximumLedgerTime(Set(cid))))
} yield {
withClue("UTC") { let1 shouldBe Success(Some(let.toInstant)) }
withClue("GMT-1") { let2 shouldBe Success(Some(let.toInstant)) }
withClue("GMT+1") { let3 shouldBe Success(Some(let.toInstant)) }
}
}
it should "correctly read ledger effective time using rawEvents" in {
val let = timestampFromInstant(Instant.now)
val cid = com.daml.lf.value.Value.ContractId.V0.assertFromString("#1")
val create = dtoCreate(
offset = offset(1),
eventSequentialId = 1L,
contractId = cid.coid,
ledgerEffectiveTime = Some(let),
)
for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
_ <- executeSql(ingest(Vector(create), _))
_ <- executeSql(backend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)))
events1 <- executeSql(backend.rawEvents(0L, 1L))
events2 <- executeSql(withDefaultTimeZone("GMT-1")(backend.rawEvents(0L, 1L)))
events3 <- executeSql(withDefaultTimeZone("GMT+1")(backend.rawEvents(0L, 1L)))
} yield {
withClue("UTC") { events1.head.ledgerEffectiveTime shouldBe Some(let.toInstant) }
withClue("GMT-1") { events2.head.ledgerEffectiveTime shouldBe Some(let.toInstant) }
withClue("GMT+1") { events3.head.ledgerEffectiveTime shouldBe Some(let.toInstant) }
}
}
// Some JDBC operations depend on the JVM default time zone.
// In particular, TIMESTAMP WITHOUT TIME ZONE columns are interpreted in the local time zone of the client.
private def withDefaultTimeZone[T](tz: String)(f: Connection => T)(connection: Connection): T = {
val previousDefaultTimeZone = TimeZone.getDefault
TimeZone.setDefault(TimeZone.getTimeZone(tz))
try {
f(connection)
} finally {
TimeZone.setDefault(previousDefaultTimeZone)
}
}
}
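
The failure mode these tests guard against can be reproduced without a database. A minimal sketch (hypothetical values) of how java.sql.Timestamp ties wall-clock fields to the JVM default time zone, which is why TIMESTAMP WITHOUT TIME ZONE reads used to shift with the client's zone:

import java.time.Instant
import java.util.TimeZone

// java.sql.Timestamp interprets its wall-clock fields in the JVM default
// time zone, so the same stored value maps to a different Instant per zone.
def readAs(tz: String): Instant = {
  val previous = TimeZone.getDefault
  TimeZone.setDefault(TimeZone.getTimeZone(tz))
  try java.sql.Timestamp.valueOf("2021-09-07 10:00:13").toInstant
  finally TimeZone.setDefault(previous)
}

// readAs("GMT-1") == Instant.parse("2021-09-07T11:00:13Z")
// readAs("GMT+1") == Instant.parse("2021-09-07T09:00:13Z")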

View File

@ -23,7 +23,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers {
private val someParty = DbDto.PartyEntry(
ledger_offset = "",
recorded_at = null,
recorded_at = 0,
submission_id = null,
party = Some("party"),
display_name = None,

View File

@ -142,7 +142,7 @@ object SequentialWriteDaoSpec {
private val someParty = DbDto.PartyEntry(
ledger_offset = "",
recorded_at = null,
recorded_at = 0,
submission_id = null,
party = Some("party"),
display_name = None,

View File

@ -62,7 +62,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.ConfigurationEntry(
ledger_offset = someOffset.toHexString,
recorded_at = update.recordTime.toInstant,
recorded_at = update.recordTime.micros,
submission_id = update.submissionId,
typ = JdbcLedgerDao.acceptType,
configuration = Configuration.encode(update.newConfiguration).toByteArray,
@ -87,7 +87,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.ConfigurationEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = someSubmissionId,
typ = JdbcLedgerDao.rejectType,
configuration = Configuration.encode(someConfiguration).toByteArray,
@ -112,7 +112,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = Some(someSubmissionId),
party = Some(someParty),
display_name = Some(displayName),
@ -139,7 +139,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = None,
party = Some(someParty),
display_name = Some(displayName),
@ -165,7 +165,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.PartyEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = Some(someSubmissionId),
party = None,
display_name = None,
@ -194,7 +194,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
upload_id = someSubmissionId,
source_description = Some(sourceDescription),
package_size = someArchive1.getPayload.size.toLong,
known_since = someRecordTime.toInstant,
known_since = someRecordTime.micros,
ledger_offset = someOffset.toHexString,
_package = someArchive1.toByteArray,
),
@ -203,13 +203,13 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
upload_id = someSubmissionId,
source_description = Some(sourceDescription),
package_size = someArchive2.getPayload.size.toLong,
known_since = someRecordTime.toInstant,
known_since = someRecordTime.micros,
ledger_offset = someOffset.toHexString,
_package = someArchive2.toByteArray,
),
DbDto.PackageEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = Some(someSubmissionId),
typ = JdbcLedgerDao.acceptType,
rejection_reason = None,
@ -231,7 +231,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.PackageEntry(
ledger_offset = someOffset.toHexString,
recorded_at = someRecordTime.toInstant,
recorded_at = someRecordTime.micros,
submission_id = Some(someSubmissionId),
typ = JdbcLedgerDao.rejectType,
rejection_reason = Some(rejectionReason),
@ -254,7 +254,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = someRecordTime.toInstant,
record_time = someRecordTime.micros,
application_id = someApplicationId,
submitters = Set(someParty),
command_id = someCommandId,
@ -308,7 +308,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@ -331,7 +331,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
record_time = update.recordTime.toInstant,
record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@ -381,7 +381,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@ -404,7 +404,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -465,7 +465,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -489,7 +489,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -550,7 +550,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = false,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -574,7 +574,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -661,7 +661,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = false,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -692,7 +692,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = false,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -718,7 +718,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = false,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -742,7 +742,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -805,7 +805,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -844,7 +844,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -906,7 +906,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -931,7 +931,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -970,7 +970,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -1035,7 +1035,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
consuming = true,
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -1073,7 +1073,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -1153,7 +1153,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,
@@ -1202,7 +1202,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = None,
workflow_id = transactionMeta.workflowId,
application_id = None,
@@ -1270,7 +1270,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
dtos should contain theSameElementsInOrderAs List(
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = someRecordTime.toInstant,
+record_time = someRecordTime.micros,
application_id = someApplicationId,
submitters = Set(someParty),
command_id = someCommandId,
@@ -1333,7 +1333,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
DbDto.EventCreate(
event_offset = Some(someOffset.toHexString),
transaction_id = Some(update.transactionId),
-ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.toInstant),
+ledger_effective_time = Some(transactionMeta.ledgerEffectiveTime.micros),
command_id = Some(completionInfo.commandId),
workflow_id = transactionMeta.workflowId,
application_id = Some(completionInfo.applicationId),
@@ -1356,7 +1356,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
),
DbDto.CommandCompletion(
completion_offset = someOffset.toHexString,
-record_time = update.recordTime.toInstant,
+record_time = update.recordTime.micros,
application_id = completionInfo.applicationId,
submitters = completionInfo.actAs.toSet,
command_id = completionInfo.commandId,