Execution Context integration for Database write operations (#7072)

Closes #6887
Radosław Waśko 2023-06-27 17:51:21 +02:00 committed by GitHub
parent 477dd82670
commit 2bac9cc844
42 changed files with 1603 additions and 449 deletions

View File

@ -499,6 +499,7 @@
- [Implemented `Table.update_database_table`.][7035]
- [Added AWS credential support and initial S3 list buckets API.][6973]
- [Removed `module` argument from `enso_project` and other minor tweaks.][7052]
- [Integrated Database write operations with Execution Contexts.][7072]
[debug-shortcuts]:
https://github.com/enso-org/enso/blob/develop/app/gui/docs/product/shortcuts.md#debug
@ -721,6 +722,7 @@
[6973]: https://github.com/enso-org/enso/pull/6973
[7035]: https://github.com/enso-org/enso/pull/7035
[7052]: https://github.com/enso-org/enso/pull/7052
[7072]: https://github.com/enso-org/enso/pull/7072
#### Enso Compiler

View File

@ -1289,7 +1289,7 @@ lazy val runtime = (project in file("engine/runtime"))
"org.typelevel" %% "cats-core" % catsVersion,
"junit" % "junit" % junitVersion % Test,
"com.novocode" % "junit-interface" % junitIfVersion % Test exclude ("junit", "junit-dep"),
"com.lihaoyi" %% "fansi" % fansiVersion % "provided"
"com.lihaoyi" %% "fansi" % fansiVersion
),
Compile / compile / compileInputs := (Compile / compile / compileInputs)
.dependsOn(CopyTruffleJAR.preCompileTask)
@ -1917,8 +1917,8 @@ lazy val `std-base` = project
Compile / packageBin / artifactPath :=
`base-polyglot-root` / "std-base.jar",
libraryDependencies ++= Seq(
"org.graalvm.truffle" % "truffle-api" % graalVersion % "provided",
"org.netbeans.api" % "org-openide-util-lookup" % netbeansApiVersion % "provided"
"org.graalvm.sdk" % "graal-sdk" % graalVersion % "provided",
"org.netbeans.api" % "org-openide-util-lookup" % netbeansApiVersion % "provided"
),
Compile / packageBin := Def.task {
val result = (Compile / packageBin).value
@ -1944,8 +1944,8 @@ lazy val `common-polyglot-core-utils` = project
Compile / packageBin / artifactPath :=
`base-polyglot-root` / "common-polyglot-core-utils.jar",
libraryDependencies ++= Seq(
"com.ibm.icu" % "icu4j" % icuVersion,
"org.graalvm.truffle" % "truffle-api" % graalVersion % "provided"
"com.ibm.icu" % "icu4j" % icuVersion,
"org.graalvm.sdk" % "graal-sdk" % graalVersion % "provided"
)
)
@ -1957,7 +1957,7 @@ lazy val `enso-test-java-helpers` = project
Compile / packageBin / artifactPath :=
file("test/Tests/polyglot/java/helpers.jar"),
libraryDependencies ++= Seq(
"org.graalvm.truffle" % "truffle-api" % graalVersion % "provided"
"org.graalvm.sdk" % "graal-sdk" % graalVersion % "provided"
),
Compile / packageBin := Def.task {
val result = (Compile / packageBin).value
@ -1990,7 +1990,7 @@ lazy val `std-table` = project
(Antlr4 / sourceManaged).value / "main" / "antlr4"
},
libraryDependencies ++= Seq(
"org.graalvm.truffle" % "truffle-api" % graalVersion % "provided",
"org.graalvm.sdk" % "graal-sdk" % graalVersion % "provided",
"org.netbeans.api" % "org-openide-util-lookup" % netbeansApiVersion % "provided",
"com.univocity" % "univocity-parsers" % univocityParsersVersion,
"org.apache.poi" % "poi-ooxml" % poiOoxmlVersion,
@ -2068,6 +2068,7 @@ lazy val `std-database` = project
Compile / packageBin / artifactPath :=
`database-polyglot-root` / "std-database.jar",
libraryDependencies ++= Seq(
"org.graalvm.sdk" % "graal-sdk" % graalVersion % "provided",
"org.netbeans.api" % "org-openide-util-lookup" % netbeansApiVersion % "provided",
"org.xerial" % "sqlite-jdbc" % sqliteVersion,
"org.postgresql" % "postgresql" % "42.4.0"

View File

@ -86,6 +86,16 @@ The license information can be found along with the copyright notices.
Copyright notices related to this dependency can be found in the directory `com.ibm.icu.icu4j-73.1`.
'fansi_2.13', licensed under the MIT, is distributed with the engine.
The license information can be found along with the copyright notices.
Copyright notices related to this dependency can be found in the directory `com.lihaoyi.fansi_2.13-0.4.0`.
'sourcecode_2.13', licensed under the MIT, is distributed with the engine.
The license information can be found along with the copyright notices.
Copyright notices related to this dependency can be found in the directory `com.lihaoyi.sourcecode_2.13-0.3.0`.
'decline_2.13', licensed under the Apache-2.0, is distributed with the engine.
The license file can be found at `licenses/APACHE2.0`.
Copyright notices related to this dependency can be found in the directory `com.monovore.decline_2.13-2.4.1`.

View File

@ -0,0 +1,25 @@
License
=======
The MIT License (MIT)
Copyright (c) 2016 Li Haoyi (haoyi.sg@gmail.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,25 @@
License
=======
The MIT License (MIT)
Copyright (c) 2014 Li Haoyi (haoyi.sg@gmail.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -42,7 +42,7 @@ type Redshift_Details
java_props.setProperty pair.first pair.second
jdbc_connection = JDBC_Connection.create self.jdbc_url properties
Connection.Value jdbc_connection Redshift_Dialect.redshift
Connection.new jdbc_connection Redshift_Dialect.redshift
## PRIVATE
Provides the jdbc url for the connection.

View File

@ -141,7 +141,7 @@ type Panic
Print the `Cleaning...` message regardless of whether the action succeeds.
do_cleanup =
IO.println "Cleaning..."
Panic.with_finally do_cleanup <|
Panic.with_finalizer do_cleanup <|
Panic.throw (Illegal_State.Error "Foo")
with_finalizer : Any -> Any -> Any
with_finalizer ~finalizer ~action =

View File

@ -1,7 +1,11 @@
from Standard.Base import all
import Standard.Base.Errors.Common.Forbidden_Operation
import Standard.Base.Errors.Common.Dry_Run_Operation
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Runtime.Context as Execution_Context
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import Standard.Base.Runtime.Ref.Ref
import Standard.Base.Metadata.Widget
from Standard.Base.Metadata.Widget import Single_Choice, Vector_Editor
@ -12,6 +16,7 @@ import Standard.Table.Data.Table.Table as Materialized_Table
import Standard.Table.Data.Type.Value_Type.Value_Type
import project.Data.Column_Description.Column_Description
import project.Data.Dialect.Dialect
import project.Data.SQL_Query.SQL_Query
import project.Data.SQL_Statement.SQL_Statement
import project.Data.SQL_Type.SQL_Type
@ -22,11 +27,12 @@ import project.Internal.IR.SQL_Expression.SQL_Expression
import project.Internal.IR.Query.Query
import project.Internal.SQL_Type_Reference.SQL_Type_Reference
import project.Internal.Statement_Setter.Statement_Setter
import project.Internal.Hidden_Table_Registry
from project.Internal.Result_Set import read_column, result_set_to_table
from project.Internal.JDBC_Connection import handle_sql_errors
from project.Internal.JDBC_Connection import JDBC_Connection, handle_sql_errors
from project.Errors import SQL_Error, Table_Not_Found, Table_Already_Exists
from project.Internal.Upload_Table import create_table_structure
from project.Internal.Upload_Table import create_table_implementation, first_column_name_in_structure
polyglot java import java.lang.UnsupportedOperationException
polyglot java import java.util.UUID
@ -40,7 +46,27 @@ type Connection
- jdbc_connection: the resource managing the underlying JDBC
connection.
- dialect: the dialect associated with the database we are connected to.
Value jdbc_connection dialect
- supports_large_update: whether the connection should try to use
`executeLargeUpdate`. Set to `True` by default; if the operation
fails with `UnsupportedOperationException`, it is updated to `False`.
- hidden_table_registry: a registry of hidden tables that are not
shown to the user, but are used internally by the dry-run system.
Value jdbc_connection dialect supports_large_update hidden_table_registry
## PRIVATE
Constructs a new Connection.
Arguments:
- jdbc_connection: the resource managing the underlying JDBC
connection.
- dialect: the dialect associated with the database we are connected to.
- try_large_update: whether the connection should try to use
`executeLargeUpdate`.
new : JDBC_Connection -> Dialect -> Boolean -> Connection
new jdbc_connection dialect try_large_update=True =
registry = Hidden_Table_Registry.new
Connection.Value jdbc_connection dialect (Ref.new try_large_update) registry
## PRIVATE
Closes the connection releasing the underlying database resources
@ -125,17 +151,43 @@ type Connection
@schema make_schema_selector
tables : Text -> Text -> Text -> Vector Text | Text | Nothing -> Boolean -> Materialized_Table
tables self name_like=Nothing database=self.database schema=Nothing types=self.dialect.default_table_types all_fields=False =
self.get_tables_advanced name_like=name_like database=database schema=schema types=types all_fields=all_fields include_hidden=False
## PRIVATE
A helper that allows access to all tables in a database, including
hidden ones.
Later, once nodes can have expandable arguments, we can merge this with
`tables`, marking the `include_hidden` argument as expandable.
get_tables_advanced self name_like=Nothing database=self.database schema=Nothing types=self.dialect.default_table_types all_fields=False include_hidden=False =
self.maybe_run_maintenance
types_vector = case types of
Nothing -> Nothing
_ : Vector -> types
_ -> [types]
name_map = Map.from_vector [["TABLE_CAT", "Database"], ["TABLE_SCHEM", "Schema"], ["TABLE_NAME", "Name"], ["TABLE_TYPE", "Type"], ["REMARKS", "Description"], ["TYPE_CAT", "Type Database"], ["TYPE_SCHEM", "Type Schema"], ["TYPE_NAME", "Type Name"]]
self.jdbc_connection.with_metadata metadata->
result = self.jdbc_connection.with_metadata metadata->
table = Managed_Resource.bracket (metadata.getTables database schema name_like types_vector) .close result_set->
result_set_to_table result_set self.dialect.make_column_fetcher_for_type
renamed = table.rename_columns name_map
if all_fields then renamed else
renamed.select_columns ["Database", "Schema", "Name", "Type", "Description"]
case include_hidden of
True -> result
False ->
hidden_tables = self.hidden_table_registry.list_hidden_tables
result.filter "Name" (Filter_Condition.Not_In hidden_tables)
## PRIVATE
Checks if the table with the given name exists in the database.
table_exists : Text -> Boolean
table_exists self table_name =
# We fetch tables whose name is like the requested `table_name`.
tables = self.get_tables_advanced name_like=table_name database=self.database schema=Nothing types=Nothing all_fields=False include_hidden=True
## If the name contains special patterns (`_` and `%` are wildcards in
`LIKE` patterns), this may match more than the desired table, so
instead of a `not_empty` check, we check if the exact name is
contained.
tables.at "Name" . to_vector . contains table_name
## PRIVATE
Set up a query returning a Table object, which can be used to work with
@ -159,7 +211,7 @@ type Connection
query self query alias="" = case query of
_ : Text ->
result = self.query alias=alias <|
if (all_known_table_names self).contains query then (SQL_Query.Table_Name query) else
if self.table_exists query then (SQL_Query.Table_Name query) else
SQL_Query.Raw_SQL query
result.catch SQL_Error sql_error->
case self.dialect.is_probably_a_query query of
@ -173,14 +225,8 @@ type Connection
ctx = Context.for_query raw_sql name
Database_Table_Module.make_table self name columns ctx
SQL_Query.Table_Name name ->
result = handle_sql_errors <|
ctx = Context.for_table name (if alias == "" then name else alias)
statement = self.dialect.generate_sql (Query.Select Nothing ctx)
statement_setter = self.dialect.get_statement_setter
columns = self.fetch_columns statement statement_setter
Database_Table_Module.make_table self name columns ctx
result.catch SQL_Error sql_error->
Error.throw (Table_Not_Found.Error name sql_error treated_as_query=False extra_message="")
make_table_for_name self name alias
## PRIVATE
Execute the query and load the results into memory as a Table.
@ -189,6 +235,14 @@ type Connection
- query: name of the table or sql statement to query.
If supplied as `Text`, the name is checked against the `tables` list to determine if it is a table or a query.
- limit: the maximum number of rows to return.
? Side Effects
Note that the `read` method runs without restrictions even when the
output context is disabled, but it can technically cause side effects
if it is provided with a DML query. It is usually preferred to use
`execute_update` for DML queries; if they are supposed to return
results, the `read` should be wrapped in an execution context check.
@query make_table_name_selector
read : Text | SQL_Query -> Integer | Nothing -> Materialized_Table ! Table_Not_Found
read self query limit=Nothing =
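For illustration, a caller that needs a DML query to respect the context can add the check itself. A minimal sketch, using only names already imported in this file (`Execution_Context`, `Forbidden_Operation`); the helper name is hypothetical:
    guarded_read connection sql limit=Nothing =
        if Execution_Context.Output.is_enabled.not then Error.throw (Forbidden_Operation.Error "Refusing to run a DML query while the Output context is disabled.") else
            connection.read sql limit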
@ -199,9 +253,7 @@ type Connection
the new table.
Arguments:
- table_name: the name of the table to create. If not provided, a random
name will be generated for temporary tables. If `temporary=False`, then
a name must be provided.
- table_name: the name of the table to create.
- structure: the structure of the table, provided as either an existing
`Table` (no data will be copied) or a `Vector` of `Column_Description`.
- primary_key: the names of the columns to use as the primary key. The
@ -230,11 +282,17 @@ type Connection
structure provided, `Missing_Input_Columns` error is raised.
- An `SQL_Error` may be reported if there is a failure on the database
side.
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed
and no permanent changes occur. The operation will test for errors
(like missing columns) and if successful, return a temporary table with
a `Dry_Run_Operation` warning attached.
@structure make_structure_creator
create_table : Text | Nothing -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text | Nothing = Nothing) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
created_table_name = create_table_structure self table_name structure primary_key temporary allow_existing on_problems
self.query (SQL_Query.Table_Name created_table_name)
create_table : Text -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_name_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
create_table_implementation self table_name structure primary_key temporary allow_existing on_problems
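A sketch of the dry-run behaviour described above; the table name and the column description value are illustrative assumptions:
    example_dry_run connection =
        Execution_Context.Output.with_disabled <|
            # No permanent table is created; a temporary table with a
            # `Dry_Run_Operation` warning attached is returned instead.
            connection.create_table "my_table" [Column_Description.Value "x" Value_Type.Integer]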
## PRIVATE
Internal read function for a statement with optional types.
@ -274,15 +332,123 @@ type Connection
representing the query to execute.
execute_update : Text | SQL_Statement -> Integer
execute_update self query =
statement_setter = self.dialect.get_statement_setter
self.jdbc_connection.with_prepared_statement query statement_setter stmt->
Panic.catch UnsupportedOperationException stmt.executeLargeUpdate _->
stmt.executeUpdate
if Execution_Context.Output.is_enabled.not then Error.throw (Forbidden_Operation.Error "Executing update queries is forbidden as the Output context is disabled.") else
statement_setter = self.dialect.get_statement_setter
self.jdbc_connection.with_prepared_statement query statement_setter stmt->
case self.supports_large_update.get of
True -> Panic.catch UnsupportedOperationException stmt.executeLargeUpdate _->
self.supports_large_update.put False
stmt.executeUpdate
False -> stmt.executeUpdate
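With this gate in place, disabling the Output context makes updates fail fast. A minimal sketch (the SQL text is illustrative):
    example connection =
        Execution_Context.Output.with_disabled <|
            # Fails with `Forbidden_Operation` before any SQL reaches the database.
            connection.execute_update "DELETE FROM orders"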
## PRIVATE
drop_table : Text -> Nothing
drop_table self table_name =
self.execute_update (self.dialect.generate_sql (Query.Drop_Table table_name))
Drops a table.
Arguments:
- table_name: the name of the table to drop.
- if_exists: if set to `True`, the operation will not fail if the table
does not exist. Defaults to `False`.
drop_table : Text -> Boolean -> Nothing
drop_table self table_name if_exists=False =
self.execute_update (self.dialect.generate_sql (Query.Drop_Table table_name if_exists))
## PRIVATE
Returns the base `Connection` instance.
Used so that internal helper functions do not need to be replicated
on the 'subclasses'.
base_connection : Connection
base_connection self = self
## PRIVATE
If no thread (including the current one) is currently running operations
on the connection, maintenance will be performed.
Currently, this consists of removing dry run tables that are no longer
used.
This method should be run by most database operations to ensure that
unused tables are cleaned at some point.
All errors are swallowed and not propagated, so it is safe to call this
method anywhere. There is no point in calling this method inside of
critical sections, as then it will not do anything.
maybe_run_maintenance self =
callback _ =
Hidden_Table_Registry.run_maintenance_table_cleanup self
self.jdbc_connection.run_maintenance_action_if_possible callback
## PRIVATE
max_table_name_length : Integer | Nothing
max_table_name_length self =
reported = self.jdbc_connection.with_metadata .getMaxTableNameLength
if reported == 0 then Nothing else reported
## PRIVATE
Generates a temporary table name for the given table name, used for dry
runs.
The table name is 'stable', meaning that the same name will be returned
for the given input `table_name` on subsequent calls, unless the user
creates a clashing table in the meantime.
The table name is guaranteed to be unique for the database at the time it
is generated - this is used to ensure that the dry run tables never
overwrite pre-existing user data.
! Safety
It is safe to drop/overwrite the table returned by this method, as it
can be assumed that it was not created by the user. It either does not
(yet) exist, or if it exists, it is present in our hidden table
registry - but that means it was created by Enso as a hidden table.
generate_dry_run_table_name : Text -> Text
generate_dry_run_table_name self table_name =
max_length = (self.max_table_name_length.if_nothing 60) - 1
go ix =
prefix = "enso-dry-run-" + if ix == 0 then "" else ix.to_text + "-"
## This check ensures that if all possible names are taken, the
method will not loop forever but report an error. It should never
occur in practice - it would mean that the Database contains
unimaginable amounts of dry run tables or has an impractically small
table name length limit.
if prefix.length > max_length then Error.throw (Illegal_State.Error "Reached the table name length limit ("+max_length.to_text+") while trying to find an unused table name. It seems that all possible names are already taken. The Database may need to be cleaned up for dry runs to work.") else
name = (prefix + table_name) . take max_length
## The dry run name is ok if it is already registered (that means it
may exist in the Database, but it was created by other dry runs
and is safe to overwrite) or if it does not exist in the database.
name_ok = (self.hidden_table_registry.is_registered name) || (self.table_exists name . not)
if name_ok then name else
@Tail_Call go (ix + 1)
go 0
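For example, assuming a 60-character name limit and no pre-existing clashes, the generated names proceed as follows (illustrative):
    connection.generate_dry_run_table_name "orders"   # "enso-dry-run-orders"
    # If a user table already holds that name, subsequent candidates are tried:
    # "enso-dry-run-1-orders", "enso-dry-run-2-orders", and so on.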
## PRIVATE
Generates a random table name that does not currently exist in the
database.
generate_random_table_name : Text -> Text
generate_random_table_name self prefix="enso-table-" =
max_length = self.max_table_name_length
minimum_randomness = 5
maximum_retries = 20^minimum_randomness
if max_length.is_nothing.not && (prefix.length + minimum_randomness > max_length) then Error.throw (Illegal_State.Error "The prefix has length "+prefix.length.to_text+" while the maximum table name length is "+max_length.to_text+" - there is not enough space to safely generate random names.") else
go ix =
if ix > maximum_retries then Error.throw (Illegal_State.Error "Could not generate a non-assigned random table name after "+maximum_retries.to_text+" retries. Giving up.") else
base_name = prefix + Random.random_uuid
name = if max_length.is_nothing then base_name else base_name.take max_length
if self.table_exists name . not then name else
@Tail_Call go (ix + 1)
go 0
## PRIVATE
Creates a Table reference that refers to a table with the given name.
Once all references to the table with this name are destroyed, the table
will be marked for removal and dropped at the next maintenance.
internal_allocate_dry_run_table : Text -> Database_Table
internal_allocate_dry_run_table self table_name =
ref = self.hidden_table_registry.make_reference table_name
make_table_for_name self table_name table_name ref
## PRIVATE
make_table_types_selector : Connection -> Widget
@ -295,11 +461,6 @@ make_schema_selector connection =
schemas_without_nothing = connection.schemas.filter Filter_Condition.Not_Nothing
Single_Choice values=(schemas_without_nothing.map t-> Option t t.pretty)+[Option "any schema" "Nothing"]
## PRIVATE
all_known_table_names connection =
tables = connection.tables name_like=Nothing database=connection.database schema=Nothing types=Nothing all_fields=False
tables.at "Name" . to_vector
## PRIVATE
make_table_name_selector : Connection -> Widget
make_table_name_selector connection =
@ -313,6 +474,12 @@ make_structure_creator =
Vector_Editor item_editor=item_editor item_default=item_editor.values.first.value display=Display.Always
## PRIVATE
first_column_in_structure structure = case structure of
_ : Vector -> structure.first.name
_ -> structure.column_names.first
make_table_for_name connection name alias internal_temporary_keep_alive_reference=Nothing =
result = handle_sql_errors <|
ctx = Context.for_table name (if alias == "" then name else alias) internal_temporary_keep_alive_reference
statement = connection.dialect.generate_sql (Query.Select Nothing ctx)
statement_setter = connection.dialect.get_statement_setter
columns = connection.fetch_columns statement statement_setter
Database_Table_Module.make_table connection name columns ctx
result.catch SQL_Error sql_error->
Error.throw (Table_Not_Found.Error name sql_error treated_as_query=False extra_message="")

View File

@ -33,6 +33,7 @@ from Standard.Table.Internal.Filter_Condition_Helpers import make_filter_column
from Standard.Table.Errors import all
import project.Data.Column.Column
import project.Data.SQL_Query.SQL_Query
import project.Data.SQL_Statement.SQL_Statement
import project.Data.SQL_Type.SQL_Type
import project.Internal.Helpers
@ -46,7 +47,7 @@ import project.Internal.IR.SQL_Join_Kind.SQL_Join_Kind
import project.Internal.IR.Query.Query
import project.Internal.SQL_Type_Reference.SQL_Type_Reference
from project.Errors import Unsupported_Database_Operation, Integrity_Error, Unsupported_Name
from project.Errors import Unsupported_Database_Operation, Integrity_Error, Unsupported_Name, Table_Not_Found
import project.Connection.Connection.Connection
polyglot java import java.sql.JDBCType
@ -1821,28 +1822,24 @@ type Table
Table.Value self.name self.connection internal_columns ctx
## PRIVATE
Inserts a new row to the table.
Arguments:
- values: The values making up the row of the table.
It actually modifies the underlying table in the database. It can only
be called on the Table if no operations modifying it have been performed
like modifying, removing or adding columns, filtering, grouping etc.
insert : Vector Any -> Nothing
insert self values =
table_name = case self.context.from_spec of
From_Spec.Table name _ -> name
_ -> Error.throw <| Illegal_State.Error "Inserting can only be performed on tables as returned by `query`, any further processing is not allowed."
# TODO [RW] before removing the PRIVATE tag, add a check that no bad stuff was done to the table as described above
pairs = self.internal_columns.zip values col-> value->
[col.name, SQL_Expression.Constant value]
query = self.connection.dialect.generate_sql <| Query.Insert table_name pairs
affected_rows = self.connection.execute_update query
case affected_rows == 1 of
False -> Error.throw <| Illegal_State.Error "The update unexpectedly affected "+affected_rows.to_text+" rows."
True -> Nothing
## PRIVATE
Checks if this table is a 'trivial query'.
A trivial query is a result of `connection.query` that has not been
further processed. If there are any columns that are added or removed, or
any other operations like join or aggregate are performed, the resulting
table is no longer trivial.
Some operations, like writing to tables, require their target to be a
trivial query.
is_trivial_query : Boolean ! Table_Not_Found
is_trivial_query self =
trivial_counterpart = self.connection.query (SQL_Query.Table_Name self.name)
trivial_counterpart.if_not_error <|
if self.context != trivial_counterpart.context then False else
column_descriptor internal_column = [internal_column.name, internal_column.expression]
my_columns = self.internal_columns.map column_descriptor
trivial_columns = trivial_counterpart.internal_columns.map column_descriptor
my_columns == trivial_columns
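To illustrate the distinction (a sketch; the table name is illustrative and `select_columns` stands in for any transforming operation):
    example connection =
        t = connection.query (SQL_Query.Table_Name "orders")
        t.is_trivial_query                          # True - a direct result of `query`
        t.select_columns ["a"] . is_trivial_query   # False - the table was further processed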
## PRIVATE
Provides a simplified text representation for display in the REPL and errors.

View File

@ -16,9 +16,7 @@ from project.Internal.Upload_Table import all
Arguments:
- connection: the database connection to use. The table will be created in
the database and schema associated with this connection.
- table_name: the name of the table to create. If not provided, a random name
will be generated for temporary tables. If `temporary=False`, then a name
must be provided.
- table_name: the name of the table to create.
- primary_key: the names of the columns to use as the primary key. The first
column from the table is used by default. If it is set to `Nothing` or an
empty vector, no primary key will be created.
@ -45,16 +43,25 @@ from project.Internal.Upload_Table import all
If an error has been raised, the table is not created (that may not always
apply to `SQL_Error`).
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed and
no permanent changes occur. The operation checks for errors like missing
columns, and returns a temporary table containing a sample of the input
with a `Dry_Run_Operation` warning attached.
More expensive checks, like clashing keys, are performed only on the sample
of rows, so errors may still occur when the output action is enabled.
@primary_key Widget_Helpers.make_column_name_vector_selector
Table.select_into_database_table : Connection -> Text|Nothing -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Table ! Table_Already_Exists | Inexact_Type_Coercion | Missing_Input_Columns | Non_Unique_Primary_Key | SQL_Error | Illegal_Argument
Table.select_into_database_table self connection table_name=Nothing primary_key=[self.columns.first.name] temporary=False on_problems=Problem_Behavior.Report_Warning = Panic.recover SQL_Error <|
upload_database_table self connection table_name primary_key temporary on_problems
Table.select_into_database_table : Connection -> Text -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Table ! Table_Already_Exists | Inexact_Type_Coercion | Missing_Input_Columns | Non_Unique_Primary_Key | SQL_Error | Illegal_Argument
Table.select_into_database_table self connection (table_name : Text) primary_key=[self.columns.first.name] temporary=False on_problems=Problem_Behavior.Report_Warning =
select_into_table_implementation self connection table_name primary_key temporary on_problems
## Updates the target table with the contents of this table.
Arguments:
- connection: the database connection of the target table.
- table_name: the name of the table to update.
- target_table: the target table to update. It must be a database table.
- update_action: specifies the update strategy - how to handle existing, new
and missing rows.
- key_columns: the names of the columns used to identify and correlate rows from
@ -100,6 +107,17 @@ Table.select_into_database_table self connection table_name=Nothing primary_key=
column can be widened to a 64-bit integer column, but not vice versa
(because larger numbers could not fit the smaller type and the type of the
column in the target table cannot be changed).
Table.update_database_table : Connection -> Text -> Update_Action -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Table ! Table_Not_Found | Unmatched_Columns | Missing_Input_Columns | Column_Type_Mismatch | SQL_Error | Illegal_Argument
Table.update_database_table self connection (table_name : Text) (update_action : Update_Action = Update_Action.Update_Or_Insert) (key_columns : Vector | Nothing = default_key_columns connection table_name) (error_on_missing_columns : Boolean = False) (on_problems : Problem_Behavior = Problem_Behavior.Report_Warning) =
common_update_table self connection table_name update_action key_columns error_on_missing_columns on_problems
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed and
no permanent changes occur. The operation checks for errors like missing
columns or mismatched types and if successful, returns the target table
unchanged with a `Dry_Run_Operation` warning attached.
More expensive checks, like clashing keys or unmatched rows, are performed
only on a sample of rows, so errors may still occur when the output action
is enabled.
Table.update_database_table : Table -> Update_Action -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Table ! Table_Not_Found | Unmatched_Columns | Missing_Input_Columns | Column_Type_Mismatch | SQL_Error | Illegal_Argument
Table.update_database_table self target_table (update_action : Update_Action = Update_Action.Update_Or_Insert) (key_columns : Vector | Nothing = default_key_columns target_table) (error_on_missing_columns : Boolean = False) (on_problems : Problem_Behavior = Problem_Behavior.Report_Warning) =
common_update_table self target_table update_action key_columns error_on_missing_columns on_problems
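A sketch of the dry-run path for updates (assuming `Context` is `Standard.Base.Runtime.Context`, as imported elsewhere in this PR; names are illustrative):
    example source target_table =
        Context.Output.with_disabled <|
            # Validates columns and types, but leaves the target unchanged; the
            # returned table carries a `Dry_Run_Operation` warning.
            source.update_database_table target_table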

View File

@ -17,9 +17,7 @@ from project.Internal.Upload_Table import all
Arguments:
- connection: the database connection to use. The table will be created in
the database and schema associated with this connection.
- table_name: the name of the table to create. If not provided, a random name
will be generated for temporary tables. If `temporary=False`, then a name
must be provided.
- table_name: the name of the table to create.
- primary_key: the names of the columns to use as the primary key. The first
column from the table is used by default. If it is set to `Nothing` or an
empty vector, no primary key will be created.
@ -46,16 +44,25 @@ from project.Internal.Upload_Table import all
If an error has been raised, the table is not created (that may not always
apply to `SQL_Error`).
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed and
no permanent changes occur. The operation checks for errors like missing
columns, and returns a temporary table containing a sample of the input
with a `Dry_Run_Operation` warning attached.
More expensive checks, like clashing keys, are performed only on the sample
of rows, so errors may still occur when the output action is enabled.
@primary_key Widget_Helpers.make_column_name_vector_selector
Table.select_into_database_table : Connection -> Text|Nothing -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists | Inexact_Type_Coercion | Missing_Input_Columns | Non_Unique_Primary_Key | SQL_Error | Illegal_Argument
Table.select_into_database_table self connection table_name=Nothing primary_key=[self.columns.first.name] temporary=False on_problems=Problem_Behavior.Report_Warning =
upload_in_memory_table self connection table_name primary_key temporary on_problems
Table.select_into_database_table : Connection -> Text -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists | Inexact_Type_Coercion | Missing_Input_Columns | Non_Unique_Primary_Key | SQL_Error | Illegal_Argument
Table.select_into_database_table self connection (table_name : Text) primary_key=[self.columns.first.name] temporary=False on_problems=Problem_Behavior.Report_Warning =
select_into_table_implementation self connection table_name primary_key temporary on_problems
## Updates the target table with the contents of this table.
Arguments:
- connection: the database connection of the target table.
- table_name: the name of the table to update.
- target_table: the target table to update. It must be a database table.
- update_action: specifies the update strategy - how to handle existing, new
and missing rows.
- key_columns: the names of the columns used to identify and correlate rows from
@ -101,6 +108,17 @@ Table.select_into_database_table self connection table_name=Nothing primary_key=
column can be widened to a 64-bit integer column, but not vice versa
(because larger numbers could not fit the smaller type and the type of the
column in the target table cannot be changed).
Table.update_database_table : Connection -> Text -> Update_Action -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Database_Table ! Table_Not_Found | Unmatched_Columns | Missing_Input_Columns | Column_Type_Mismatch | SQL_Error | Illegal_Argument
Table.update_database_table self connection (table_name : Text) (update_action : Update_Action = Update_Action.Update_Or_Insert) (key_columns : Vector | Nothing = default_key_columns connection table_name) (error_on_missing_columns : Boolean = False) (on_problems : Problem_Behavior = Problem_Behavior.Report_Warning) =
common_update_table self connection table_name update_action key_columns error_on_missing_columns on_problems
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed and
no permanent changes occur. The operation checks for errors like missing
columns or mismatched types and if successful, returns the target table
unchanged with a `Dry_Run_Operation` warning attached.
More expensive checks, like clashing keys or unmatched rows, are performed
only on a sample of rows, so errors may still occur when the output action
is enabled.
Table.update_database_table : Database_Table -> Update_Action -> Vector Text | Nothing -> Boolean -> Problem_Behavior -> Database_Table ! Table_Not_Found | Unmatched_Columns | Missing_Input_Columns | Column_Type_Mismatch | SQL_Error | Illegal_Argument
Table.update_database_table self target_table (update_action : Update_Action = Update_Action.Update_Or_Insert) (key_columns : Vector | Nothing = default_key_columns target_table) (error_on_missing_columns : Boolean = False) (on_problems : Problem_Behavior = Problem_Behavior.Report_Warning) =
common_update_table self target_table update_action key_columns error_on_missing_columns on_problems

View File

@ -307,7 +307,7 @@ alias dialect name =
- from_spec: A description of the FROM clause.
generate_from_part : Internal_Dialect -> From_Spec -> Builder
generate_from_part dialect from_spec = case from_spec of
From_Spec.Table name as_name ->
From_Spec.Table name as_name _ ->
dialect.wrap_identifier name ++ alias dialect as_name
From_Spec.Query raw_sql as_name ->
Builder.code raw_sql . paren ++ alias dialect as_name
@ -434,8 +434,9 @@ generate_query dialect query = case query of
generate_insert_query dialect table_name pairs
Query.Create_Table name columns primary_key temporary ->
generate_create_table dialect name columns primary_key temporary
Query.Drop_Table name ->
Builder.code "DROP TABLE " ++ dialect.wrap_identifier name
Query.Drop_Table name if_exists ->
maybe_if_exists = if if_exists then Builder.code "IF EXISTS " else Builder.empty
Builder.code "DROP TABLE " ++ maybe_if_exists ++ dialect.wrap_identifier name
Query.Insert_From_Select table_name column_names select_query -> case select_query of
Query.Select _ _ ->
inner_query = generate_query dialect select_query

View File

@ -0,0 +1,85 @@
from Standard.Base import all
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import Standard.Base.Runtime.Ref.Ref
polyglot java import org.enso.database.dryrun.HiddenTableReferenceCounter
## PRIVATE
A reference to a hidden table that keeps it alive.
Once all references to a particular hidden table are garbage collected, the
hidden table is marked for deletion.
type Hidden_Table_Reference
Reference (parent : Hidden_Table_Registry) (table_name : Text)
## PRIVATE
A registry that keeps track of temporary hidden tables.
These tables will all be destroyed once the connection is closed, but to
avoid accumulating too many of them, the registry allows dropping them more
eagerly.
Moreover, the registry keeps track of which tables were created by Enso,
allowing us to avoid dropping tables with similar names that were created by
the user.
type Hidden_Table_Registry
## PRIVATE
Registry (reference_counter : HiddenTableReferenceCounter)
## PRIVATE
Creates a new reference to the table with the given name.
Once this and any other references to this table name are garbage
collected, the table will be scheduled for disposal and removed on the
next `run_maintenance_table_cleanup` invocation (unless the table is
'brought back to life' by new references being introduced).
make_reference : Text -> Managed_Resource
make_reference self table_name =
self.reference_counter.increment table_name
reference = Hidden_Table_Reference.Reference self table_name
Managed_Resource.register reference dispose_reference
## PRIVATE
Lists all tables that were added to the registry by calling
`make_reference` and not yet removed.
list_hidden_tables : Vector Text
list_hidden_tables self =
Vector.from_polyglot_array self.reference_counter.getKnownTables
## PRIVATE
Checks if the given table name is registered in the registry.
is_registered : Text -> Boolean
is_registered self table_name =
self.reference_counter.isRegistered table_name
## PRIVATE
Creates a new hidden table registry instance.
new : Hidden_Table_Registry
new =
Hidden_Table_Registry.Registry (HiddenTableReferenceCounter.new)
## PRIVATE
Utility method for disposing of references. Provided to avoid accidental
scope capture with `Managed_Resource` finalizers.
dispose_reference : Any -> Nothing
dispose_reference reference =
registry = reference.parent
registry.reference_counter.decrement reference.table_name
## PRIVATE
Drops all temporary hidden tables that have been marked for removal and not
brought back to life.
This method must be run in a critical section guaranteeing that no other
operations will be performed on the associated connection in parallel. Thanks
to running it in a critical section, there is no risk that a table that is
just being dropped will come back to life in the meantime - since no other
code can be creating tables on this connection at the same time.
run_maintenance_table_cleanup connection =
registry = connection.hidden_table_registry
reference_counter = registry.reference_counter
tables_scheduled_for_removal = Vector.from_polyglot_array reference_counter.getTablesScheduledForRemoval
tables_scheduled_for_removal.each table_name->
# The table may not exist if the transaction that created it was rolled back. We just ignore such cases.
connection.drop_table table_name if_exists=True
reference_counter.markAsDropped table_name
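A sketch of the intended lifecycle (the table name is illustrative):
    example connection =
        registry = connection.hidden_table_registry
        ref = registry.make_reference "enso-dry-run-orders"
        # While `ref` is reachable, the table is kept alive and excluded from
        # `connection.tables`. Once `ref` is garbage collected, the count drops
        # to zero and the next `run_maintenance_table_cleanup` drops the table.
        ref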

View File

@ -17,9 +17,11 @@ type Context
Arguments:
- table_name: The name of the table for which the context is being created.
- alias: An alias name to use for table within the query.
for_table : Text -> Text -> Context
for_table table_name alias=table_name =
Context.Value (From_Spec.Table table_name alias) [] [] [] Nothing Nothing
- internal_temporary_keep_alive_reference: See `From_Spec.Table` for more
details.
for_table : Text -> Text -> Any -> Context
for_table table_name alias=table_name internal_temporary_keep_alive_reference=Nothing =
Context.Value (From_Spec.Table table_name alias internal_temporary_keep_alive_reference=internal_temporary_keep_alive_reference) [] [] [] Nothing Nothing
## PRIVATE

View File

@ -20,7 +20,11 @@ type From_Spec
parts of the query, this is especially useful for example in
self-joins, allowing to differentiate between different instances of
the same table.
Table (table_name : Text) (alias : Text)
- internal_temporary_keep_alive_reference: a reference that can be used
to track the lifetime of a temporary dry-run table. Once this context
is garbage collected, the temporary table may be dropped.
See `Hidden_Table_Registry` for more details.
Table (table_name : Text) (alias : Text) internal_temporary_keep_alive_reference=Nothing
## PRIVATE
@ -75,3 +79,24 @@ type From_Spec
- alias: the name upon which the results of this sub-query can be
referred to in other parts of the query.
Sub_Query (columns : Vector (Pair Text SQL_Expression)) (context : Context) (alias : Text)
## PRIVATE
type From_Spec_Comparator
## PRIVATE
Special handling to ignore the alias and internal temporary keep alive
reference when comparing two `From_Spec.Table` values.
compare x y = case x of
From_Spec.Table table_name _ _ -> case y of
From_Spec.Table other_table_name _ _ ->
if table_name == other_table_name then Ordering.Equal else Nothing
_ -> Nothing
_ -> Default_Comparator.compare x y
## PRIVATE
hash x = case x of
From_Spec.Table table_name _ _ ->
Default_Comparator.hash table_name
_ -> Default_Comparator.hash x
Comparable.from (_ : From_Spec) = From_Spec_Comparator
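The effect is that two references to the same table compare equal even when aliased differently; a sketch (table names illustrative):
    (From_Spec.Table "orders" "o1") == (From_Spec.Table "orders" "o2")    # True - alias ignored
    (From_Spec.Table "orders" "o") == (From_Spec.Table "payments" "o")    # False - different tables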

View File

@ -47,7 +47,11 @@ type Query
## PRIVATE
An SQL query that drops a table.
Drop_Table (table_name:Text)
Arguments:
- table_name: the name of the table to drop.
- if_exists: if `True`, an `IF EXISTS` clause will be added.
Drop_Table (table_name:Text) (if_exists:Boolean)
## PRIVATE
An INSERT INTO ... SELECT query that allows to insert results of a query

View File

@ -3,6 +3,7 @@ import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Errors.Unimplemented.Unimplemented
import Standard.Base.Runtime.Managed_Resource.Managed_Resource
import Standard.Base.Runtime.Context
import Standard.Table.Data.Table.Table as Materialized_Table
import Standard.Table.Data.Type.Value_Type.Value_Type
@ -25,10 +26,34 @@ polyglot java import java.sql.SQLException
polyglot java import java.sql.SQLTimeoutException
polyglot java import org.enso.database.JDBCProxy
polyglot java import org.enso.database.dryrun.OperationSynchronizer
type JDBC_Connection
## PRIVATE
Value connection_resource
Arguments:
- connection_resource: a `Managed_Resource` containing the Java
Connection instance.
- operation_synchronizer: a helper for synchronizing access to the underlying Connection.
Value connection_resource operation_synchronizer
## PRIVATE
Runs the provided action ensuring that no other thread is working with
this Connection concurrently.
synchronized self ~action =
# We save and restore context information. This is a workaround for bug #7117.
restore_context context =
saved_setting = context.is_enabled
~action ->
case saved_setting of
True -> context.with_enabled action
False -> context.with_disabled action
restore_output = restore_context Context.Output
restore_input = restore_context Context.Input
callback _ =
restore_input <|
restore_output <|
action
self.operation_synchronizer.runSynchronizedAction callback
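The save/restore matters because the synchronizer may run the callback without propagating the caller's context settings (bug #7117). A sketch of the observable behaviour:
    example jdbc_connection =
        Context.Output.with_disabled <|
            jdbc_connection.synchronized <|
                # Still sees the Output context as disabled, because `synchronized`
                # re-applies the caller's settings around the callback.
                Context.Output.is_enabled   # False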
## PRIVATE
Closes the connection releasing the underlying database resources
@ -36,7 +61,7 @@ type JDBC_Connection
The connection is not usable afterwards.
close : Nothing
close self =
close self = self.synchronized <|
self.connection_resource . finalize
## PRIVATE
@ -44,28 +69,32 @@ type JDBC_Connection
Open the connection to the database, then run the action wrapping any
SQL errors.
with_connection : (Connection -> Any) -> Any
with_connection self ~action =
handle_sql_errors <|
self.connection_resource.with action
with_connection self action = self.synchronized <|
self.connection_resource.with action
## PRIVATE
Runs the provided callback only if no thread is currently inside a
`synchronized` critical section (including the current thread).
run_maintenance_action_if_possible : (Nothing -> Any) -> Nothing
run_maintenance_action_if_possible self callback =
self.operation_synchronizer.runMaintenanceActionIfPossible callback
## PRIVATE
Open the connection to the database, then run the action passing the
database's metadata wrapping any SQL errors.
with_metadata : (DatabaseMetaData -> Any) -> Any
with_metadata self ~action =
handle_sql_errors <|
self.connection_resource.with connection->
metadata = connection.getMetaData
action metadata
with_metadata self ~action = handle_sql_errors <| self.with_connection connection->
metadata = connection.getMetaData
action metadata
## PRIVATE
Runs the provided action with a prepared statement, adding contextual
information to any thrown SQL errors.
with_prepared_statement : Text | SQL_Statement -> Statement_Setter -> (PreparedStatement -> Any) -> Any
with_prepared_statement self query statement_setter action =
prepare template values = self.connection_resource.with java_connection->
with_prepared_statement self query statement_setter action = self.synchronized <|
prepare template values = self.with_connection java_connection->
stmt = java_connection.prepareStatement template
handle_illegal_state caught_panic =
Error.throw (Illegal_Argument.Error caught_panic.payload.message)
@ -126,7 +155,8 @@ type JDBC_Connection
running this function (so if it was off before, this method may not
change anything).
run_without_autocommit : Any -> Any
run_without_autocommit self ~action =
run_without_autocommit self ~action = handle_sql_errors <|
# The whole block is already `synchronized` by `with_connection`.
self.with_connection java_connection->
default_autocommit = java_connection.getAutoCommit
Managed_Resource.bracket (java_connection.setAutoCommit False) (_ -> java_connection.setAutoCommit default_autocommit) _->
@ -161,11 +191,14 @@ type JDBC_Connection
It is the caller's responsibility to call this method from within a
transaction to ensure consistency.
batch_insert : Text -> Statement_Setter -> Materialized_Table -> Integer -> Nothing
batch_insert self insert_template statement_setter table batch_size =
self.with_connection java_connection->
batch_insert : Text -> Statement_Setter -> Materialized_Table -> Integer -> Integer | Nothing -> Nothing
batch_insert self insert_template statement_setter table batch_size row_limit=Nothing =
In_Transaction.ensure_in_transaction <| self.with_connection java_connection-> handle_sql_errors related_query=insert_template <|
Managed_Resource.bracket (java_connection.prepareStatement insert_template) .close stmt->
num_rows = table.row_count
table_row_count = table.row_count
num_rows = case row_limit of
Nothing -> table_row_count
limit : Integer -> Math.min table_row_count limit
columns = table.columns
check_rows updates_array expected_size =
updates = Vector.from_polyglot_array updates_array
@ -179,7 +212,6 @@ type JDBC_Connection
stmt.addBatch
if (row_id+1 % batch_size) == 0 then check_rows stmt.executeBatch batch_size
if (num_rows % batch_size) != 0 then check_rows stmt.executeBatch (num_rows % batch_size)
java_connection.commit
## PRIVATE
@ -199,7 +231,8 @@ create url properties = handle_sql_errors <|
java_connection = JDBCProxy.getConnection url java_props
resource = Managed_Resource.register java_connection close_connection
JDBC_Connection.Value resource
synchronizer = OperationSynchronizer.new
JDBC_Connection.Value resource synchronizer
## PRIVATE

View File

@ -19,9 +19,10 @@ import project.Internal.IR.Query.Query
import project.Internal.JDBC_Connection
import project.Internal.SQL_Type_Reference.SQL_Type_Reference
from project.Connection.Connection import make_table_types_selector, make_schema_selector, make_table_name_selector, first_column_in_structure, make_structure_creator
from project.Connection.Connection import make_table_types_selector, make_schema_selector, make_table_name_selector, make_structure_creator
from project.Errors import SQL_Error, Table_Not_Found, Table_Already_Exists
from project.Internal.Result_Set import read_column
from project.Internal.Upload_Table import first_column_name_in_structure
type Postgres_Connection
@ -133,6 +134,14 @@ type Postgres_Connection
- query: name of the table or sql statement to query.
If supplied as `Text`, the name is checked against the `tables` list to determine if it is a table or a query.
- limit: the maximum number of rows to return.
? Side Effects
Note that the `read` method runs without restrictions even when the
output context is disabled, but it can technically cause side effects
if it is provided with a DML query. It is usually preferred to use
`execute_update` for DML queries; if they are supposed to return
results, the `read` should be wrapped in an execution context check.
@query make_table_name_selector
read : Text | SQL_Query -> Integer | Nothing -> Materialized_Table ! Table_Not_Found
read self query limit=Nothing = self.connection.read query limit
@ -141,9 +150,7 @@ type Postgres_Connection
the new table.
Arguments:
- table_name: the name of the table to create. If not provided, a random
name will be generated for temporary tables. If `temporary=False`, then
a name must be provided.
- table_name: the name of the table to create.
- structure: the structure of the table, provided as either an existing
`Table` (no data will be copied) or a `Vector` of `Column_Description`.
- primary_key: the names of the columns to use as the primary key. The
@ -172,9 +179,16 @@ type Postgres_Connection
structure provided, `Missing_Input_Columns` error is raised.
- An `SQL_Error` may be reported if there is a failure on the database
side.
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed
and no permanent changes occur. The operation will test for errors
(like missing columns) and if successful, return a temporary table with
a `Dry_Run_Operation` warning attached.
@structure make_structure_creator
create_table : Text | Nothing -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text | Nothing = Nothing) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
create_table : Text -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_name_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
self.connection.create_table table_name structure primary_key temporary allow_existing on_problems
## ADVANCED
@ -199,9 +213,23 @@ type Postgres_Connection
jdbc_connection self = self.connection.jdbc_connection
## PRIVATE
drop_table : Text -> Nothing
drop_table self table_name =
self.connection.drop_table table_name
Drops a table.
Arguments:
- table_name: the name of the table to drop.
- if_exists: if set to `True`, the operation will not fail if the table
does not exist. Defaults to `False`.
drop_table : Text -> Boolean -> Nothing
drop_table self table_name if_exists=False =
self.connection.drop_table table_name if_exists
## PRIVATE
Returns the base `Connection` instance.
Used so that internal helper functions do not need to be replicated
on the 'subclasses'.
base_connection : Connection
base_connection self = self.connection
## PRIVATE
@ -214,4 +242,4 @@ type Postgres_Connection
create : Text -> Vector -> (Text -> Text -> Postgres_Connection) -> Postgres_Connection
create url properties make_new =
jdbc_connection = JDBC_Connection.create url properties
Postgres_Connection.Value (Connection.Value jdbc_connection Dialect.postgres) make_new
Postgres_Connection.Value (Connection.new jdbc_connection Dialect.postgres) make_new

View File

@ -19,8 +19,9 @@ import project.Internal.IR.Query.Query
import project.Internal.JDBC_Connection
import project.Internal.SQL_Type_Reference.SQL_Type_Reference
from project.Connection.Connection import make_table_types_selector, make_schema_selector, make_table_name_selector, first_column_in_structure, make_structure_creator
from project.Connection.Connection import make_table_types_selector, make_schema_selector, make_table_name_selector, make_structure_creator
from project.Errors import SQL_Error, Table_Not_Found, Table_Already_Exists
from project.Internal.Upload_Table import first_column_name_in_structure
type SQLite_Connection
## PRIVATE
@ -127,6 +128,14 @@ type SQLite_Connection
- query: name of the table or sql statement to query.
If supplied as `Text`, the name is checked against the `tables` list to determine if it is a table or a query.
- limit: the maximum number of rows to return.
? Side Effects
Note that the `read` method runs without restrictions even when the
output context is disabled, but it can technically cause side effects
if it is provided with a DML query. It is usually preferred to use
`execute_update` for DML queries; if they are supposed to return
results, the `read` should be wrapped in an execution context check.
@query make_table_name_selector
read : Text | SQL_Query -> Integer | Nothing -> Materialized_Table ! Table_Not_Found
read self query limit=Nothing = self.connection.read query limit
@ -135,9 +144,7 @@ type SQLite_Connection
the new table.
Arguments:
- table_name: the name of the table to create. If not provided, a random
name will be generated for temporary tables. If `temporary=False`, then
a name must be provided.
- table_name: the name of the table to create.
- structure: the structure of the table, provided as either an existing
`Table` (no data will be copied) or a `Vector` of `Column_Description`.
- primary_key: the names of the columns to use as the primary key. The
@ -166,9 +173,16 @@ type SQLite_Connection
structure provided, `Missing_Input_Columns` error is raised.
- An `SQL_Error` may be reported if there is a failure on the database
side.
? Dry Run if Output disabled
If performing output actions is disabled, only a dry run is performed
and no permanent changes occur. The operation will test for errors
(like missing columns) and if successful, return a temporary table with
a `Dry_Run_Operation` warning attached.
@structure make_structure_creator
create_table : Text | Nothing -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text | Nothing = Nothing) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
create_table : Text -> Vector Column_Description | Database_Table | Materialized_Table -> Vector Text | Nothing -> Boolean -> Boolean -> Problem_Behavior -> Database_Table ! Table_Already_Exists
create_table self (table_name : Text) (structure : Vector Column_Description | Database_Table | Materialized_Table) (primary_key : (Vector Text | Nothing) = [first_column_name_in_structure structure]) (temporary : Boolean = False) (allow_existing : Boolean = False) (on_problems:Problem_Behavior = Problem_Behavior.Report_Warning) =
self.connection.create_table table_name structure primary_key temporary allow_existing on_problems
## ADVANCED
@ -193,9 +207,23 @@ type SQLite_Connection
jdbc_connection self = self.connection.jdbc_connection
## PRIVATE
drop_table : Text -> Nothing
drop_table self table_name =
self.connection.drop_table table_name
Drops a table.
Arguments:
- table_name: the name of the table to drop.
- if_exists: if set to `True`, the operation will not fail if the table
does not exist. Defaults to `False`.
drop_table : Text -> Boolean -> Nothing
drop_table self table_name if_exists=False =
self.connection.drop_table table_name if_exists
## PRIVATE
Returns the base `Connection` instance.
Used so that internal helper functions do not need to be replicated
on the 'subclasses'.
base_connection : Connection
base_connection self = self.connection
## PRIVATE
@ -207,4 +235,4 @@ type SQLite_Connection
create : Text -> Vector -> SQLite_Connection
create url properties =
jdbc_connection = JDBC_Connection.create url properties
SQLite_Connection.Value (Connection.Value jdbc_connection Dialect.sqlite)
SQLite_Connection.Value (Connection.new jdbc_connection Dialect.sqlite)

View File

@ -1,6 +1,9 @@
from Standard.Base import all
from Standard.Base.Random import random_uuid
import Standard.Base.Errors.Common.Dry_Run_Operation
import Standard.Base.Errors.Common.Forbidden_Operation
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.Runtime.Context
import Standard.Table.Data.Table.Table as In_Memory_Table
from Standard.Table import Aggregate_Column, Join_Kind, Value_Type, Column_Selector
@ -17,51 +20,93 @@ import project.Internal.In_Transaction.In_Transaction
import project.Internal.IR.Create_Column_Descriptor.Create_Column_Descriptor
import project.Internal.IR.Query.Query
import project.Internal.IR.SQL_Expression.SQL_Expression
from project.Connection.Connection import all_known_table_names
from project.Errors import all
from project.Internal.Result_Set import result_set_to_table
## PRIVATE
Creates a new database table with the provided structure and returns the name
of the created table.
create_table_structure connection table_name structure primary_key temporary allow_existing on_problems =
case table_name.is_nothing.not && all_known_table_names connection . contains table_name of
The user-facing function that handles the dry-run logic.
create_table_implementation connection table_name structure primary_key temporary allow_existing on_problems = Panic.recover SQL_Error <|
connection.base_connection.maybe_run_maintenance
case connection.base_connection.table_exists table_name of
True ->
if allow_existing then table_name else Error.throw (Table_Already_Exists.Error table_name)
if allow_existing then connection.query (SQL_Query.Table_Name table_name) else Error.throw (Table_Already_Exists.Error table_name)
False ->
effective_table_name = resolve_effective_table_name table_name temporary
aligned_structure = align_structure structure
if aligned_structure.is_empty then Error.throw (Illegal_Argument.Error "An empty table cannot be created: the `structure` must consist of at least one column description.") else
resolved_primary_key = resolve_primary_key aligned_structure primary_key
create_table_statement = prepare_create_table_statement connection effective_table_name aligned_structure resolved_primary_key temporary on_problems
update_result = create_table_statement.if_not_error <|
connection.execute_update create_table_statement
update_result.if_not_error <|
effective_table_name
dry_run = Context.Output.is_enabled.not
effective_table_name = if dry_run.not then table_name else connection.base_connection.generate_dry_run_table_name table_name
effective_temporary = temporary || dry_run
created_table_name = Context.Output.with_enabled <|
connection.jdbc_connection.run_within_transaction <|
if dry_run then
## This temporary table can be safely dropped if it
exists, because if it exists, it can only have been
created by a previous dry run. `generate_dry_run_table_name`
will never return the name of a table that exists but
was created outside of a dry run.
connection.drop_table table_name if_exists=True
internal_create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
if dry_run.not then connection.query (SQL_Query.Table_Name created_table_name) else
created_table = connection.base_connection.internal_allocate_dry_run_table created_table_name
warning = Dry_Run_Operation.Warning "Only a dry run of `create_table` has occurred, creating a temporary table ("+created_table_name.pretty+") instead of the actual one."
Warning.attach warning created_table
## PRIVATE
Assumes that the output context is enabled.
Does not check if the table already exists - so if it does, it may fail with
`SQL_Error`. The caller should perform the check for better error handling.
internal_create_table_structure connection table_name structure primary_key temporary on_problems =
aligned_structure = align_structure structure
resolved_primary_key = resolve_primary_key aligned_structure primary_key
create_table_statement = prepare_create_table_statement connection table_name aligned_structure resolved_primary_key temporary on_problems
check_transaction_ddl_support connection
update_result = create_table_statement.if_not_error <|
connection.execute_update create_table_statement
update_result.if_not_error <|
table_name
## PRIVATE
A helper that can upload a table from any backend to a database.
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_table source_table connection table_name primary_key temporary on_problems =
internal_upload_table source_table connection table_name primary_key temporary on_problems=Problem_Behavior.Report_Error row_limit=Nothing =
case source_table of
_ : In_Memory_Table ->
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems row_limit
_ : Database_Table ->
internal_upload_database_table source_table connection table_name primary_key temporary on_problems
internal_upload_database_table source_table connection table_name primary_key temporary on_problems row_limit
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)
## PRIVATE
upload_in_memory_table source_table connection table_name primary_key temporary on_problems =
Panic.recover SQL_Error <| handle_upload_errors <|
connection.jdbc_connection.run_within_transaction <|
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems
select_into_table_implementation source_table connection table_name primary_key temporary on_problems =
connection.base_connection.maybe_run_maintenance
real_target_already_exists = connection.base_connection.table_exists table_name
if real_target_already_exists then Error.throw (Table_Already_Exists.Error table_name) else
Panic.recover SQL_Error <| handle_upload_errors <|
dry_run = Context.Output.is_enabled.not
connection.jdbc_connection.run_within_transaction <| case dry_run of
False ->
internal_upload_table source_table connection table_name primary_key temporary on_problems=on_problems row_limit=Nothing
True ->
tmp_table_name = connection.base_connection.generate_dry_run_table_name table_name
table = Context.Output.with_enabled <|
## This temporary table can be safely dropped if it
exists, because if it exists, it can only have been
created by a previous dry run. `generate_dry_run_table_name`
will never return the name of a table that exists but
was created outside of a dry run.
connection.drop_table tmp_table_name if_exists=True
internal_upload_table source_table connection tmp_table_name primary_key temporary on_problems=on_problems row_limit=dry_run_row_limit
temporary_table = connection.base_connection.internal_allocate_dry_run_table table.name
warning = Dry_Run_Operation.Warning "Only a dry run of `select_into_database_table` was performed - a temporary table ("+tmp_table_name+") was created, containing a sample of the data."
Warning.attach warning temporary_table
## PRIVATE
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems =
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems row_limit =
In_Transaction.ensure_in_transaction <|
created_table_name = create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary allow_existing=False on_problems=on_problems
created_table_name = internal_create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary on_problems=on_problems
column_names = source_table.column_names
## `created_table_name.if_not_error` is used to ensure that if there are
@ -72,34 +117,33 @@ internal_upload_in_memory_table source_table connection table_name primary_key t
internal_translate_known_upload_errors source_table connection primary_key <|
insert_template = make_batched_insert_template connection created_table_name column_names
statement_setter = connection.dialect.get_statement_setter
Panic.rethrow <| connection.jdbc_connection.batch_insert insert_template statement_setter source_table default_batch_size
Panic.rethrow <|
connection.jdbc_connection.batch_insert insert_template statement_setter source_table batch_size=default_batch_size row_limit=row_limit
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name created_table_name)
## PRIVATE
upload_database_table source_table connection table_name primary_key temporary on_problems =
Panic.recover SQL_Error <| handle_upload_errors <|
connection.jdbc_connection.run_within_transaction <|
internal_upload_database_table source_table connection table_name primary_key temporary on_problems
## PRIVATE
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_database_table source_table connection table_name primary_key temporary on_problems =
internal_upload_database_table source_table connection table_name primary_key temporary on_problems row_limit =
In_Transaction.ensure_in_transaction <|
connection_check = if source_table.connection.jdbc_connection == connection.jdbc_connection then True else
Error.throw (Unsupported_Database_Operation.Error "The Database table to be uploaded must come from the same connection as the one on which the new table is being created. Cross-connection uploads are currently not supported. To work around this, you can first `.read` the table into memory and then upload it from memory to a different connection.")
connection_check.if_not_error <|
created_table_name = create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary allow_existing=False on_problems=on_problems
# Warning: in some DBs, calling a DDL query in a transaction may commit it. We may need to add special handling for this.
created_table_name = internal_create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary on_problems=on_problems
upload_status = created_table_name.if_not_error <|
internal_translate_known_upload_errors source_table connection primary_key <|
effective_source_table = case row_limit of
Nothing -> source_table
_ : Integer -> source_table.limit row_limit
## We need to ensure that the columns in this statement are
matching positionally the columns in the newly created
table. But we create both from the same source table, so
that is guaranteed.
copy_into_statement = connection.dialect.generate_sql <|
Query.Insert_From_Select created_table_name source_table.column_names source_table.to_select_query
Query.Insert_From_Select created_table_name effective_source_table.column_names effective_source_table.to_select_query
Panic.rethrow <| connection.execute_update copy_into_statement
upload_status.if_not_error <|
@ -113,7 +157,7 @@ resolve_primary_key structure primary_key = case primary_key of
_ : Vector -> if primary_key.is_empty then Nothing else
validated = primary_key.map key->
if key.is_a Text then key else
Error.throw (Illegal_Argument.Error "Primary key must be a vector of column names.")
Error.throw (Illegal_Argument.Error ("Primary key must be a vector of column names, instead got a " + (Meta.type_of key . to_display_text)))
validated.if_not_error <|
column_names = Set.from_vector (structure.map .name)
missing_columns = (Set.from_vector primary_key).difference column_names
@ -169,12 +213,29 @@ raise_duplicated_primary_key_error source_table primary_key original_panic =
## PRIVATE
align_structure : Database_Table | In_Memory_Table | Vector Column_Description -> Vector Column_Description
align_structure table_or_columns = case table_or_columns of
_ : Vector -> table_or_columns.map def->
if def.is_a Column_Description . not then Error.throw (Illegal_Argument.Error "The structure must be an existing Table or vector of Column_Description.") else
def
_ -> table_or_columns.columns.map column->
vector : Vector -> if vector.is_empty then Error.throw (Illegal_Argument.Error "A table with no columns cannot be created. The `structure` must consist of at least one column description.") else
vector.map def-> case def of
_ : Column_Description -> def
_ : Function ->
Error.throw (Illegal_Argument.Error "The structure should be a vector of Column_Description. Maybe some arguments of Column_Description are missing?")
_ ->
Error.throw (Illegal_Argument.Error "The structure must be an existing Table or vector of Column_Description.")
table : Database_Table -> structure_from_existing_table table
table : In_Memory_Table -> structure_from_existing_table table
## PRIVATE
structure_from_existing_table table =
table.columns.map column->
Column_Description.Value column.name column.value_type
## PRIVATE
Returns the name of the first column in the provided table structure.
It also verifies that the structure is correct.
Used to provide the default value for `primary_key` in `create_table`.
first_column_name_in_structure structure =
aligned = align_structure structure
aligned.first.name
## PRIVATE
Creates a statement that will create a table with structure determined by the
provided columns.
@ -192,13 +253,6 @@ prepare_create_table_statement connection table_name columns primary_key tempora
connection.dialect.generate_sql <|
Query.Create_Table table_name column_descriptors primary_key temporary
## PRIVATE
Generates a random table name if it was `Nothing`, provided that is allowed (temporary=True).
resolve_effective_table_name table_name temporary = case table_name of
Nothing -> if temporary then "temporary-table-"+random_uuid else
Error.throw (Illegal_Argument.Error "A name must be provided when creating a non-temporary table.")
_ : Text -> table_name
## PRIVATE
The recommended batch size seems to be between 50 and 100.
See: https://docs.oracle.com/cd/E18283_01/java.112/e16548/oraperf.htm#:~:text=batch%20sizes%20in%20the%20general%20range%20of%2050%20to%20100
@ -214,62 +268,78 @@ make_batched_insert_template connection table_name column_names =
template
## PRIVATE
common_update_table source_table connection table_name update_action key_columns error_on_missing_columns on_problems =
Panic.recover SQL_Error <| handle_upload_errors <|
connection.jdbc_connection.run_within_transaction <|
target_table = connection.query (SQL_Query.Table_Name table_name)
# We catch the `Table_Not_Found` error and handle it specially, if the error was different, it will just get passed through further.
handle_error = target_table.catch Table_Not_Found error->
# Rethrow the error with more info.
msg_suffix = " Use `Connection.create_table` to create a table before trying to append to it."
new_error = error.with_changed_extra_message msg_suffix
Error.throw new_error
if target_table.is_error then handle_error else
tmp_table_name = "temporary-source-table-"+random_uuid
tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=key_columns temporary=True on_problems=Problem_Behavior.Report_Error
tmp_table.if_not_error <|
resulting_table = append_to_existing_table tmp_table target_table update_action key_columns error_on_missing_columns on_problems
connection.drop_table tmp_table.name
resulting_table
common_update_table (source_table : In_Memory_Table | Database_Table) (target_table : In_Memory_Table | Database_Table) update_action key_columns error_on_missing_columns on_problems =
check_target_table_for_update target_table <|
connection = target_table.connection
Panic.recover SQL_Error <| handle_upload_errors <|
connection.jdbc_connection.run_within_transaction <|
effective_key_columns = if key_columns.is_nothing then [] else key_columns
check_update_arguments_structure_match source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <|
tmp_table_name = connection.base_connection.generate_random_table_name "enso-temp-source-table-"
dry_run = Context.Output.is_enabled.not
row_limit = if dry_run then dry_run_row_limit else Nothing
Context.Output.with_enabled <|
tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns temporary=True on_problems=Problem_Behavior.Report_Error row_limit=row_limit
tmp_table.if_not_error <|
resulting_table = append_to_existing_table tmp_table target_table update_action effective_key_columns dry_run=dry_run
## We don't need to drop the table if append panics, because
all of this happens within a transaction, so in case the
above fails, the whole transaction will be rolled back.
connection.drop_table tmp_table.name
if dry_run.not then resulting_table else
warning = Dry_Run_Operation.Warning "Only a dry run of `update_database_table` was performed - the target table has been returned unchanged."
Warning.attach warning resulting_table
## PRIVATE
check_target_table_for_update target_table ~action = case target_table of
_ : In_Memory_Table -> Error.throw (Illegal_Argument.Error "The target table must be a Database table.")
_ : Database_Table -> if target_table.is_trivial_query . not then Error.throw (Illegal_Argument.Error "The target table must be a simple table reference, like returned by `Connection.query`, without any changes like joins, aggregations or even column modifications.") else
action
## PRIVATE
Assumes that `source_table` is a simple table query without any filters,
joins and other composite operations - if a complex query is needed, it
should be first materialized into a temporary table.
append_to_existing_table source_table target_table update_action key_columns error_on_missing_columns on_problems = In_Transaction.ensure_in_transaction <|
effective_key_columns = if key_columns.is_nothing then [] else key_columns
check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <|
helper = Append_Helper.Context source_table target_table effective_key_columns
upload_status = case update_action of
Update_Action.Insert ->
helper.check_already_existing_rows <|
helper.insert_rows source_table
Update_Action.Update ->
helper.check_rows_unmatched_in_target <|
helper.check_multiple_target_rows_match <|
helper.update_common_rows
Update_Action.Update_Or_Insert ->
If `dry_run` is set to True, only the checks are performed; the
operations that would actually modify the target table are skipped.
append_to_existing_table source_table target_table update_action key_columns dry_run = In_Transaction.ensure_in_transaction <|
helper = Append_Helper.Context source_table target_table key_columns dry_run
upload_status = case update_action of
Update_Action.Insert ->
helper.check_already_existing_rows <|
helper.insert_rows source_table
Update_Action.Update ->
helper.check_rows_unmatched_in_target <|
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
Update_Action.Align_Records ->
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
helper.delete_unmatched_target_rows
upload_status.if_not_error target_table
Update_Action.Update_Or_Insert ->
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
Update_Action.Align_Records ->
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
helper.delete_unmatched_target_rows
upload_status.if_not_error target_table
## PRIVATE
type Append_Helper
## PRIVATE
Context source_table target_table key_columns
Context source_table target_table key_columns dry_run
## PRIVATE
connection self = self.target_table.connection
## PRIVATE
Runs the action only if running in normal mode.
In dry run mode, it will just return `Nothing`.
if_not_dry_run self ~action = if self.dry_run then Nothing else action
## PRIVATE
The update only affects matched rows, unmatched rows are ignored.
update_common_rows self =
update_common_rows self = self.if_not_dry_run <|
update_statement = self.connection.dialect.generate_sql <|
Query.Update_From_Table self.target_table.name self.source_table.name self.source_table.column_names self.key_columns
Panic.rethrow <| self.connection.execute_update update_statement
@ -280,23 +350,23 @@ type Append_Helper
Behaviour is ill-defined if any of the rows already exist in the target.
If only new rows are supposed to be inserted, they have to be filtered
before inserting.
insert_rows self table_to_insert =
insert_rows self table_to_insert = self.if_not_dry_run <|
insert_statement = self.connection.dialect.generate_sql <|
Query.Insert_From_Select self.target_table.name table_to_insert.column_names table_to_insert.to_select_query
Panic.rethrow <| self.connection.execute_update insert_statement
## PRIVATE
Finds rows that are present in the source but not in the target.
new_source_rows self =
self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive
## PRIVATE
Deletes rows from target table that were not present in the source.
delete_unmatched_target_rows self =
delete_unmatched_target_rows self = self.if_not_dry_run <|
delete_statement = self.connection.dialect.generate_sql <|
Query.Delete_Unmatched_Rows self.target_table.name self.source_table.name self.key_columns
Panic.rethrow <| self.connection.execute_update delete_statement
## PRIVATE
Finds rows that are present in the source but not in the target.
new_source_rows self =
self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive
## PRIVATE
Checks if any rows from the source table already exist in the target, and
if they do - raises an error.
@ -342,7 +412,7 @@ type Append_Helper
- all columns in `source_table` have a corresponding column in `target_table`
(with the same name),
- all `key_columns` are present in both source and target tables.
check_update_arguments source_table target_table key_columns update_action error_on_missing_columns on_problems ~action =
check_update_arguments_structure_match source_table target_table key_columns update_action error_on_missing_columns on_problems ~action =
check_source_column source_column =
# The column must exist because it was verified earlier.
target_column = target_table.get source_column.name
@ -370,10 +440,11 @@ check_update_arguments source_table target_table key_columns update_action error
on_problems.attach_problems_before problems action
## PRIVATE
default_key_columns connection table_name =
keys = get_primary_key connection table_name
keys.catch Any _->
Error.throw (Illegal_Argument.Error "Could not determine the primary key for table "+table_name+". Please provide it explicitly.")
default_key_columns (table : Database_Table | In_Memory_Table) =
check_target_table_for_update table <|
keys = get_primary_key table
keys.catch Any _->
Error.throw (Illegal_Argument.Error "Could not determine the primary key for table "+table.name+". Please provide it explicitly.")
## PRIVATE
@ -390,13 +461,32 @@ default_key_columns connection table_name =
UNION both `sqlite_schema` and `temp.sqlite_schema` tables to get results
for both temporary and permanent tables.
TODO [RW] fix keys for SQLite temporary tables and test it
get_primary_key connection table_name =
connection.query (SQL_Query.Table_Name table_name) . if_not_error <|
connection.jdbc_connection.with_connection java_connection->
rs = java_connection.getMetaData.getPrimaryKeys Nothing Nothing table_name
keys_table = result_set_to_table rs connection.dialect.make_column_fetcher_for_type
# The names of the columns are sometimes lowercase and sometimes uppercase, so we do a case insensitive select first.
selected = keys_table.select_columns [Column_Selector.By_Name "COLUMN_NAME", Column_Selector.By_Name "KEY_SEQ"] reorder=True
key_column_names = selected.order_by 1 . at 0 . to_vector
if key_column_names.is_empty then Nothing else key_column_names
TODO [RW] fix keys for SQLite temporary tables #7037
get_primary_key table =
connection = table.connection
connection.jdbc_connection.with_metadata metadata->
rs = metadata.getPrimaryKeys Nothing Nothing table.name
keys_table = result_set_to_table rs connection.dialect.make_column_fetcher_for_type
# The names of the columns are sometimes lowercase and sometimes uppercase, so we do a case insensitive select first.
selected = keys_table.select_columns [Column_Selector.By_Name "COLUMN_NAME", Column_Selector.By_Name "KEY_SEQ"] reorder=True
key_column_names = selected.order_by 1 . at 0 . to_vector
if key_column_names.is_empty then Nothing else key_column_names
## PRIVATE
dry_run_row_limit = 1000
## PRIVATE
Verifies that the used driver supports transactional DDL statements.
Currently, all our drivers should support them. This check is added so that
when we add a new driver, we don't forget to verify that it supports
transactional DDL statements - if it does not, we will need to add
additional logic to our code.
It is a panic because it is never expected to happen in user code - if it
happens, it is a bug in our code.
check_transaction_ddl_support connection =
supports_ddl = connection.jdbc_connection.with_metadata metadata->
metadata.supportsDataDefinitionAndDataManipulationTransactions
if supports_ddl.not then
Panic.throw (Illegal_State.Error "The connection "+connection.to_text+" does not support transactional DDL statements. Our current implementation of table updates relies on transactional DDL. To support this driver, the logic needs to be amended.")

View File

@ -506,3 +506,38 @@ Any.should_contain self element frames_to_skip=0 =
Error.should_contain : Any -> Integer -> Test_Result
Error.should_contain self _ frames_to_skip=0 =
Test.fail_match_on_unexpected_error self 1+frames_to_skip
## Asserts that the `self` value does not contain an element.
Arguments:
- element: The element to check.
- frames_to_skip (optional, advanced): used to alter the location which is
displayed as the source of this error.
This method delegates to the `contains` method of `self` and will use the
rules of the particular type - be it a `Vector`, `Text` or any custom type
implementing a method `contains : a -> Boolean`.
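> Example
  Check that a vector does not contain a given element.
      [1, 2, 3].should_not_contain 4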
Any.should_not_contain : Any -> Integer -> Test_Result
Any.should_not_contain self element frames_to_skip=0 =
loc = Meta.get_source_location 1+frames_to_skip
contains_result = Panic.catch No_Such_Method (self.contains element) caught_panic->
if caught_panic.payload.method_name != "contains" then Panic.throw caught_panic else
msg = "The value (" + self.to_text + ") does not support the method `contains` (at " + loc + ")."
Test.fail msg
if contains_result.not then Test_Result.Success else
msg = "The value (" + self.to_text + ") contained the element (" + element.to_text + "), but it was expected to not contain it (at " + loc + ")."
Test.fail msg
## Asserts that the `self` value does not contain an element.
Arguments:
- element: The element to check.
- frames_to_skip (optional, advanced): used to alter the location which is
displayed as the source of this error.
This method delegates to the `contains` method of `self` and will use the
rules of the particular type - be it a `Vector`, `Text` or any custom type
implementing a method `contains : a -> Boolean`.
Error.should_not_contain : Any -> Integer -> Test_Result
Error.should_not_contain self _ frames_to_skip=0 =
Test.fail_match_on_unexpected_error self 1+frames_to_skip

View File

@ -0,0 +1,116 @@
package org.enso.database.dryrun;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* A helper for the Enso part of the registry of hidden tables.
*
* <p>It guarantees safety of table reference counting, even when the actions are run from multiple
* threads.
*
* <p>Safety of dropping tables scheduled for removal has to be ensured by the user. Currently, we
* assume that all database operations run only on the main thread. Thus, once maintenance
* starts, no database operation can `increment` a table and thereby bring back a table scheduled
* for removal, so all tables scheduled for removal are safe to drop as long as no further
* database operations are performed in the meantime. The only thing that can happen concurrently
* (because finalizers are run at safepoints interrupting the main thread) is a finalizer marking
* a table as disposed by calling `decrement` - but that may only schedule additional tables for
* removal. It is fine if such a table is not removed by a maintenance cycle that is already
* running - it will simply be handled by the next cycle.
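*
* <p>A sketch of the expected lifecycle (the table name is illustrative; the actual {@code DROP
* TABLE} is issued elsewhere, by the maintenance task):
*
* <pre>{@code
* HiddenTableReferenceCounter counter = new HiddenTableReferenceCounter();
* counter.increment("enso-dry-run-1"); // a dry run table was materialized
* counter.decrement("enso-dry-run-1"); // its finalizer ran; the count dropped to 0
* for (String name : counter.getTablesScheduledForRemoval()) {
*   // ... drop the table on the database side ...
*   counter.markAsDropped(name);
* }
* }</pre>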
*/
public class HiddenTableReferenceCounter {
private final Map<String, Integer> tableRefCounts = new HashMap<>();
/** Increments the counter for a given table name. */
public synchronized void increment(String name) {
tableRefCounts.compute(name, (k, c) -> c == null ? 1 : c + 1);
}
/**
* Decrements the counter for a given table name.
*
* <p>If the counter reaches 0, the table is not yet removed but it is scheduled for removal at
* next maintenance.
*/
public synchronized void decrement(String name) {
tableRefCounts.compute(
name,
(k, c) -> {
if (c == null) {
throw new IllegalStateException(
"The table "
+ name
+ " was not found in the hidden table registry. Reference counter decrement without a paired "
+ "increment?");
}
int newCount = c - 1;
if (newCount < 0) {
throw new IllegalStateException(
"The table "
+ name
+ " already had reference count "
+ c
+ ", but it was decremented again.");
} else {
return newCount;
}
});
}
/**
* Checks if the given table name is currently present in the registry.
*
* <p>A table is 'registered' even if its reference count has dropped to zero, as long as it has
* not been disposed yet.
*/
public synchronized boolean isRegistered(String name) {
return tableRefCounts.containsKey(name);
}
/**
* Returns the list of tables that have no remaining references and should be removed.
*
* <p>Nothing is yet removed from the registry.
*/
public synchronized List<String> getTablesScheduledForRemoval() {
return tableRefCounts.entrySet().stream()
.filter(e -> e.getValue() == 0)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
/**
* Marks that the table has been successfully dropped. Only tables scheduled for removal should be
* dropped.
*
* <p>No other database operations should be allowed between the moment
* `getTablesScheduledForRemoval` is invoked and the moment its tables are dropped - as a database
* operation can 'bring back to life' a table that was scheduled for removal and 'unschedule' it.
*/
public synchronized void markAsDropped(String name) {
Integer existingCount = tableRefCounts.remove(name);
if (existingCount == null) {
throw new IllegalStateException(
"Table " + name + " was marked as removed but it was not present in the " + "registry!");
}
if (existingCount > 0) {
throw new IllegalStateException(
"Table "
+ name
+ " was marked as removed but it still had reference count "
+ existingCount
+ "!");
}
}
/** Returns all tables that were ever added to the registry and have not yet been dropped. */
public synchronized List<String> getKnownTables() {
return new ArrayList<>(tableRefCounts.keySet());
}
}

View File

@ -0,0 +1,69 @@
package org.enso.database.dryrun;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.logging.Logger;
import org.graalvm.polyglot.Value;
/**
* A helper for ensuring that only one database operation is running at a time, and that maintenance
* operations requested by finalizers of dry run tables are executed outside of transactions.
*
* <p>Additionally, it allows running maintenance actions when no regular actions are being run.
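*
* <p>A sketch of intended use ({@code runQuery} and {@code dropScheduledTables} are hypothetical
* callbacks standing in for Enso-provided actions that return a polyglot {@link Value}):
*
* <pre>{@code
* OperationSynchronizer synchronizer = new OperationSynchronizer();
* // A regular database operation - only one such action runs at a time:
* Value result = synchronizer.runSynchronizedAction(nestingLevel -> runQuery());
* // Opportunistic cleanup - runs only when no regular action is in progress:
* synchronizer.runMaintenanceActionIfPossible(unused -> dropScheduledTables());
* }</pre>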
*/
public class OperationSynchronizer {
/** This lock guarantees that only one thread can access the connection at a time. */
private final ReentrantLock lock = new ReentrantLock();
private int nestingLevel = 0;
/**
* Runs the provided action ensuring that the current thread is the only one accessing the
* critical section (in this case: the connection).
*
* <p>Due to the use of a reentrant lock, this method may be entered recursively. We exploit that
* heavily - every 'primitive' operation is wrapped in this, but 'bulk' actions like whole
* transactions are themselves wrapped too (so that a transaction cannot be interrupted).
*
* <p>Note: the return type is Value and not Object to preserve Enso-specific additional
* information like warnings or dataflow errors; converting to Object could lose some of it.
*/
public Value runSynchronizedAction(Function<Integer, Value> action) {
lock.lock();
try {
nestingLevel++;
return action.apply(nestingLevel);
} finally {
nestingLevel--;
lock.unlock();
}
}
/**
* Runs the provided maintenance action if no regular actions are currently running on this or
* other threads.
*
* <p>If a regular action is currently being executed, this method will exit without doing
* anything. Conversely, the maintenance action is allowed to run regular synchronized actions
* inside of it.
*/
public void runMaintenanceActionIfPossible(Function<Void, Value> maintenanceAction) {
if (lock.tryLock()) {
try {
if (nestingLevel == 0) {
nestingLevel++;
try {
maintenanceAction.apply(null);
} catch (Exception e) {
Logger.getLogger("enso-std-database")
.severe("A maintenance action failed with exception: " + e.getMessage());
} finally {
nestingLevel--;
}
}
} finally {
lock.unlock();
}
}
}
}

View File

@ -37,7 +37,7 @@ public class DateBuilder extends TypedBuilderImpl<LocalDate> {
}
@Override
public Storage<LocalDate> seal() {
protected Storage<LocalDate> doSeal() {
return new DateStorage(data, currentSize);
}
}

View File

@ -33,7 +33,7 @@ public class DateTimeBuilder extends TypedBuilderImpl<ZonedDateTime> {
}
@Override
public Storage<ZonedDateTime> seal() {
protected Storage<ZonedDateTime> doSeal() {
return new DateTimeStorage(data, currentSize);
}
}

View File

@ -108,16 +108,11 @@ public class LongBuilder extends NumericBuilder {
* @param data the integer to append
*/
public void appendLong(long data) {
int wasSize = currentSize;
int wasLength = this.data.length;
if (currentSize >= this.data.length) {
grow();
}
if (currentSize >= this.data.length) {
throw new IllegalStateException("currentSize=" + currentSize + "; wasSize=" + wasSize + "; wasLength=" + wasLength + "; data.length=" + this.data.length);
}
assert currentSize < this.data.length;
appendRawNoGrow(data);
}

View File

@ -59,7 +59,7 @@ public abstract class NumericBuilder extends TypedBuilder {
protected void ensureFreeSpaceFor(int additionalSize) {
if (currentSize + additionalSize > data.length) {
grow(currentSize + additionalSize);
resize(currentSize + additionalSize);
}
}
@ -110,10 +110,10 @@ public abstract class NumericBuilder extends TypedBuilder {
desiredCapacity = currentSize + 1;
}
grow(desiredCapacity);
resize(desiredCapacity);
}
protected void grow(int desiredCapacity) {
protected void resize(int desiredCapacity) {
this.data = Arrays.copyOf(data, desiredCapacity);
}
}

View File

@ -67,7 +67,7 @@ public class ObjectBuilder extends TypedBuilder {
@Override
public void appendBulkStorage(Storage<?> storage) {
if (currentSize + storage.size() > data.length) {
grow(currentSize + storage.size());
resize(currentSize + storage.size());
}
if (storage instanceof SpecializedStorage<?> specializedStorage) {
@ -88,6 +88,7 @@ public class ObjectBuilder extends TypedBuilder {
@Override
public Storage<Object> seal() {
resize(currentSize);
return new ObjectStorage(data, currentSize);
}
@ -96,7 +97,7 @@ public class ObjectBuilder extends TypedBuilder {
}
public void setCurrentSize(int currentSize) {
if (currentSize > data.length) grow(currentSize);
if (currentSize > data.length) resize(currentSize);
this.currentSize = currentSize;
}
@ -120,10 +121,10 @@ public class ObjectBuilder extends TypedBuilder {
desiredCapacity = currentSize + 1;
}
grow(desiredCapacity);
resize(desiredCapacity);
}
private void grow(int desiredCapacity) {
private void resize(int desiredCapacity) {
this.data = Arrays.copyOf(data, desiredCapacity);
}
}

View File

@ -32,7 +32,7 @@ public class StringBuilder extends TypedBuilderImpl<String> {
}
@Override
public Storage<String> seal() {
protected Storage<String> doSeal() {
return new StringStorage(data, currentSize);
}
}

View File

@ -33,7 +33,7 @@ public class TimeOfDayBuilder extends TypedBuilderImpl<LocalTime> {
}
@Override
public Storage<LocalTime> seal() {
protected Storage<LocalTime> doSeal() {
return new TimeOfDayStorage(data, currentSize);
}
}

View File

@ -101,10 +101,22 @@ public abstract class TypedBuilderImpl<T> extends TypedBuilder {
desiredCapacity = currentSize + 1;
}
grow(desiredCapacity);
resize(desiredCapacity);
}
private void grow(int desiredCapacity) {
private void resize(int desiredCapacity) {
this.data = Arrays.copyOf(data, desiredCapacity);
}
protected abstract Storage<T> doSeal();
@Override
public Storage<T> seal() {
// We resize the array to the exact size, because we want to avoid index out of bounds errors.
// Most of the time, the builder was initialized with the right size anyway - the only
// exceptions are e.g. reading results from a database, where the count is unknown.
// In the future we may rely on smarter storage for sparse columns.
resize(currentSize);
return doSeal();
}
}

View File

@ -100,35 +100,37 @@ postgres_specific_spec connection db_name setup =
connection.execute_update 'DROP TABLE "'+tinfo+'";'
tinfo = Name_Generator.random_name "Tinfo"
connection.execute_update 'CREATE TEMPORARY TABLE "'+tinfo+'" ("strs" VARCHAR, "ints" INTEGER, "bools" BOOLEAN, "reals" REAL, "doubles" DOUBLE PRECISION)'
connection.execute_update 'CREATE TEMPORARY TABLE "'+tinfo+'" ("strs" VARCHAR, "ints" BIGINT, "bools" BOOLEAN, "doubles" DOUBLE PRECISION)'
Test.group "[PostgreSQL] Info" <|
t = connection.query (SQL_Query.Table_Name tinfo)
t.insert ["a", Nothing, False, 1.2, 0.000000000001]
t.insert ["abc", Nothing, Nothing, 1.3, Nothing]
t.insert ["def", 42, True, 1.4, 10]
row1 = ["a", Nothing, False, 1.2]
row2 = ["abc", Nothing, Nothing, 1.3]
row3 = ["def", 42, True, 1.4]
Panic.rethrow <|
(Table.from_rows ["strs", "ints", "bools", "doubles"] [row1, row2, row3]).update_database_table t update_action=Update_Action.Insert
Test.specify "should return Table information" <|
i = t.info
i.at "Column" . to_vector . should_equal ["strs", "ints", "bools", "reals", "doubles"]
i.at "Items Count" . to_vector . should_equal [3, 1, 2, 3, 2]
i.at "Value Type" . to_vector . should_equal [default_text, Value_Type.Integer Bits.Bits_32, Value_Type.Boolean, Value_Type.Float Bits.Bits_32, Value_Type.Float Bits.Bits_64]
i.at "Column" . to_vector . should_equal ["strs", "ints", "bools", "doubles"]
i.at "Items Count" . to_vector . should_equal [3, 1, 2, 3]
i.at "Value Type" . to_vector . should_equal [default_text, Value_Type.Integer, Value_Type.Boolean, Value_Type.Float]
Test.specify "should return Table information, also for aggregated results" <|
i = t.aggregate [Concatenate "strs", Sum "ints", Count_Distinct "bools"] . info
i.at "Column" . to_vector . should_equal ["Concatenate strs", "Sum ints", "Count Distinct bools"]
i.at "Items Count" . to_vector . should_equal [1, 1, 1]
i.at "Value Type" . to_vector . should_equal [default_text, Value_Type.Integer, Value_Type.Integer]
i.at "Value Type" . to_vector . should_equal [default_text, Value_Type.Decimal, Value_Type.Integer]
Test.specify "should infer standard types correctly" <|
t.at "strs" . value_type . is_text . should_be_true
t.at "ints" . value_type . is_integer . should_be_true
t.at "bools" . value_type . is_boolean . should_be_true
t.at "reals" . value_type . is_floating_point . should_be_true
t.at "doubles" . value_type . is_floating_point . should_be_true
Test.group "[PostgreSQL] Dialect-specific codegen" <|
Test.specify "should generate queries for the Distinct operation" <|
t = connection.query (SQL_Query.Table_Name tinfo)
code_template = 'SELECT "{Tinfo}"."strs" AS "strs", "{Tinfo}"."ints" AS "ints", "{Tinfo}"."bools" AS "bools", "{Tinfo}"."reals" AS "reals", "{Tinfo}"."doubles" AS "doubles" FROM (SELECT DISTINCT ON ("{Tinfo}_inner"."strs") "{Tinfo}_inner"."strs" AS "strs", "{Tinfo}_inner"."ints" AS "ints", "{Tinfo}_inner"."bools" AS "bools", "{Tinfo}_inner"."reals" AS "reals", "{Tinfo}_inner"."doubles" AS "doubles" FROM (SELECT "{Tinfo}"."strs" AS "strs", "{Tinfo}"."ints" AS "ints", "{Tinfo}"."bools" AS "bools", "{Tinfo}"."reals" AS "reals", "{Tinfo}"."doubles" AS "doubles" FROM "{Tinfo}" AS "{Tinfo}") AS "{Tinfo}_inner") AS "{Tinfo}"'
code_template = 'SELECT "{Tinfo}"."strs" AS "strs", "{Tinfo}"."ints" AS "ints", "{Tinfo}"."bools" AS "bools", "{Tinfo}"."doubles" AS "doubles" FROM (SELECT DISTINCT ON ("{Tinfo}_inner"."strs") "{Tinfo}_inner"."strs" AS "strs", "{Tinfo}_inner"."ints" AS "ints", "{Tinfo}_inner"."bools" AS "bools", "{Tinfo}_inner"."doubles" AS "doubles" FROM (SELECT "{Tinfo}"."strs" AS "strs", "{Tinfo}"."ints" AS "ints", "{Tinfo}"."bools" AS "bools", "{Tinfo}"."doubles" AS "doubles" FROM "{Tinfo}" AS "{Tinfo}") AS "{Tinfo}_inner") AS "{Tinfo}"'
expected_code = code_template.replace "{Tinfo}" tinfo
t.distinct ["strs"] . to_sql . prepare . should_equal [expected_code, []]
connection.execute_update 'DROP TABLE "'+tinfo+'"'

View File

@ -4,7 +4,7 @@ import Standard.Base.Runtime.Ref.Ref
import Standard.Table.Data.Type.Value_Type.Bits
from Standard.Table import Table, Value_Type
from Standard.Database import Database, SQL_Query
from Standard.Database import all
from Standard.AWS import Redshift_Details, AWS_Credential
@ -20,9 +20,12 @@ redshift_specific_spec connection =
tinfo = Name_Generator.random_name "Tinfo"
connection.execute_update 'CREATE TEMPORARY TABLE "'+tinfo+'" ("strs" VARCHAR, "ints" INTEGER, "bools" BOOLEAN, "reals" REAL)'
t = connection.query (SQL_Query.Table_Name tinfo)
t.insert ["a", Nothing, False, 1.2]
t.insert ["abc", Nothing, Nothing, 1.3]
t.insert ["def", 42, True, 1.4]
row1 = ["a", Nothing, False, 1.2]
row2 = ["abc", Nothing, Nothing, 1.3]
row3 = ["def", 42, True, 1.4]
Panic.rethrow <|
(Table.from_rows ["strs", "ints", "bools", "reals"] [row1, row2, row3]).update_database_table t update_action=Update_Action.Insert
Test.specify "should return Table information" <|
i = t.info
i.at "Column" . to_vector . should_equal ["strs", "ints", "bools", "reals"]

View File

@ -92,9 +92,12 @@ sqlite_specific_spec prefix connection =
connection.execute_update 'CREATE TABLE "'+tinfo+'" ("strs" VARCHAR, "ints" INTEGER, "bools" BOOLEAN, "reals" REAL)'
Test.group prefix+"Metadata" <|
t = connection.query (SQL_Query.Table_Name tinfo)
t.insert ["a", Nothing, False, 1.2]
t.insert ["abc", Nothing, Nothing, 1.3]
t.insert ["def", 42, True, 1.4]
row1 = ["a", Nothing, False, 1.2]
row2 = ["abc", Nothing, Nothing, 1.3]
row3 = ["def", 42, True, 1.4]
Panic.rethrow <|
(Table.from_rows ["strs", "ints", "bools", "reals"] [row1, row2, row3]).update_database_table t update_action=Update_Action.Insert
Test.specify "should return Table information" <|
i = t.info
i.at "Column" . to_vector . should_equal ["strs", "ints", "bools", "reals"]

View File

@ -4,10 +4,14 @@ from Standard.Table import Value_Type
from Standard.Database import all
from Standard.Database.Errors import all
import Standard.Database.Internal.IR.Query.Query
import Standard.Database.Internal.IR.SQL_Expression.SQL_Expression
from Standard.Test import Test, Test_Suite, Problems
import Standard.Test.Extensions
import project.Database.Helpers.Name_Generator
type My_Error
Error
@ -17,41 +21,52 @@ main = Test_Suite.run_main <|
spec connection prefix =
Test.group prefix+"Transaction Support" <|
simple_table_structure = [Column_Description.Value "X" Value_Type.Integer]
insert_value table_name x =
pairs = [["X", SQL_Expression.Constant x]]
sql = connection.dialect.generate_sql <| Query.Insert table_name pairs
connection.execute_update sql . should_succeed
Test.specify "should commit the changes after the transaction returns a regular value" <|
t1 = connection.create_table table_name=Nothing structure=simple_table_structure temporary=True
table_name = Name_Generator.random_name "transaction-test-1"
t1 = connection.create_table table_name=table_name structure=simple_table_structure temporary=True
t1.should_succeed
r1 = connection.jdbc_connection.run_within_transaction <|
t1.insert [1] . should_succeed
insert_value table_name 1
42
r1.should_equal 42
t1.at "X" . to_vector . should_equal [1]
connection.drop_table table_name
Test.specify "should rollback the changes after the inner action panics" <|
t1 = connection.create_table table_name=Nothing structure=simple_table_structure temporary=True
table_name = Name_Generator.random_name "transaction-test-2"
t1 = connection.create_table table_name=table_name structure=simple_table_structure temporary=True
t1.should_succeed
Test.expect_panic_with matcher=My_Error <|
connection.jdbc_connection.run_within_transaction <|
t1.insert [1] . should_succeed
insert_value table_name 1
Panic.throw My_Error.Error
t1.at "X" . to_vector . should_equal []
connection.drop_table table_name
Test.specify "should rollback the changes if the inner action returns a dataflow error" <|
t1 = connection.create_table table_name=Nothing structure=simple_table_structure temporary=True
table_name = Name_Generator.random_name "transaction-test-3"
t1 = connection.create_table table_name=table_name structure=simple_table_structure temporary=True
t1.should_succeed
r1 = connection.jdbc_connection.run_within_transaction <|
t1.insert [1] . should_succeed
insert_value table_name 1
Error.throw My_Error.Error
r1.should_fail_with My_Error
t1.at "X" . to_vector . should_equal []
connection.drop_table table_name
Test.specify "should commit the changes even if the inner action return value has warnings attached" <|
t1 = connection.create_table table_name=Nothing structure=simple_table_structure temporary=True
table_name = Name_Generator.random_name "transaction-test-4"
t1 = connection.create_table table_name=table_name structure=simple_table_structure temporary=True
t1.should_succeed
r1 = connection.jdbc_connection.run_within_transaction <|
t1.insert [1] . should_succeed
insert_value table_name 1
result = 43
with_warnings = Warning.attach My_Error.Error result
with_warnings
@ -59,3 +74,4 @@ spec connection prefix =
Problems.expect_only_warning My_Error r1
t1.at "X" . to_vector . should_equal [1]
connection.drop_table table_name

View File

@ -14,6 +14,8 @@ from Standard.Database.Errors import Unsupported_Database_Operation
from Standard.Test import Problems, Test, Test_Suite
import Standard.Test.Extensions
import project.Database.Helpers.Name_Generator
spec =
connection = Database.connect (SQLite In_Memory)
make_table prefix columns =
@ -104,7 +106,7 @@ spec =
Test.specify "does not support creating tables with date/time values" <|
t = Table.new [["a", [Date.now]], ["b", [Time_Of_Day.now]], ["c", [Date_Time.now]]]
r1 = t.select_into_database_table connection temporary=True
r1 = t.select_into_database_table connection table_name=(Name_Generator.random_name "date-time-table") temporary=True
r1.should_fail_with Unsupported_Database_Operation
Test.specify "should be able to infer types for all supported operations" <|

File diff suppressed because it is too large

View File

@ -0,0 +1 @@
LICENSE

View File

@ -0,0 +1 @@
/com-lihaoyi/fansi/blob/master/LICENSE

View File

@ -0,0 +1 @@
/com-lihaoyi/sourcecode/blob/master/LICENSE

View File

@ -1,3 +1,3 @@
57D3FECE4A826A4D447995504980012D29E6FF665FA3049D93C0A342D2B56C61
62D7111156F2837511DAE7CA11AEA5129ABD14236AA13070393304B4887F271F
BCF336A04354336D1ED20ABB66D645C8FA6CFB62E53F1D8F736DDEAC7A52B90E
98C3544E2C945D2D0ABB06AE304D72A1366215108703BFB232A4EC5084F93433
0