Proposal 1

This commit is contained in:
Adam Riley 2024-10-18 17:45:37 +03:00
parent 27a535f6d0
commit 2652c8f65c
6 changed files with 64 additions and 64 deletions

View File

@ -8,7 +8,7 @@ import project.Column_Description.Column_Description
import project.DB_Table.DB_Table
import project.Internal.In_Transaction.In_Transaction
import project.SQL_Query.SQL_Query
from project.Internal.Upload.Operations.Internal_Core import internal_create_table_structure
from project.Internal.Upload.Operations.Internal_Core import Internal_Core_Helpers
## PRIVATE
Level of support of DDL statements inside of a transaction.
@ -47,7 +47,7 @@ type Transactional_Table_Description
This operation creates the tables regardless of the Output Context setting.
It is the responsibility of the caller to ensure that the operation may proceed.
private create self connection -> DB_Table = Context.Output.with_enabled <|
created_name = internal_create_table_structure connection self.name self.structure primary_key=self.primary_key temporary=self.temporary on_problems=self.on_problems
created_name = Internal_Core_Helpers.internal_create_table_structure connection self.name self.structure primary_key=self.primary_key temporary=self.temporary on_problems=self.on_problems
connection.query (SQL_Query.Table_Name created_name)
## PRIVATE

View File

@ -6,7 +6,7 @@ import Standard.Base.Runtime.Context
import project.SQL_Query.SQL_Query
from project.Errors import SQL_Error, Table_Already_Exists
from project.Internal.Upload.Operations.Internal_Core import internal_create_table_structure, resolve_temp_table_name
from project.Internal.Upload.Operations.Internal_Core import Internal_Core_Helpers
## PRIVATE
Creates a new database table with the provided structure and returns the name
@ -23,7 +23,7 @@ from project.Internal.Upload.Operations.Internal_Core import internal_create_tab
intercepting the 'already exists' error.
create_table_implementation connection table_name structure primary_key temporary allow_existing on_problems:Problem_Behavior =
connection.base_connection.maybe_run_maintenance
resolved_table_name = resolve_temp_table_name connection temporary table_name
resolved_table_name = Internal_Core_Helpers.resolve_temp_table_name connection temporary table_name
table_naming_helper = connection.base_connection.table_naming_helper
on_exists =
if allow_existing then connection.query (SQL_Query.Table_Name resolved_table_name) else Error.throw (Table_Already_Exists.Error resolved_table_name)
@ -42,7 +42,7 @@ create_table_implementation connection table_name structure primary_key temporar
will never return a name of a table that exists but
was created outside of a dry run.
connection.drop_table effective_table_name if_exists=True
internal_create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
Internal_Core_Helpers.internal_create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
case created_table_name.is_error of
False ->
if dry_run.not then connection.query (SQL_Query.Table_Name created_table_name) else

View File

@ -14,7 +14,7 @@ from project.Errors import SQL_Error
from project.Internal.Upload.Helpers.Argument_Checks import check_delete_rows_arguments
from project.Internal.Upload.Helpers.Check_Queries import check_duplicate_key_matches_for_delete, check_for_null_keys
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Operations.Internal_Core import internal_upload_in_memory_table, internal_upload_table, Table_Upload_Operation
from project.Internal.Upload.Operations.Internal_Core import Table_Upload_Operation, Internal_Core_Helpers
## PRIVATE
common_delete_rows (target_table : DB_Table) (key_values_to_delete : Table | DB_Table) (key_columns : Vector Text) (allow_duplicate_matches : Boolean) -> Integer =
@ -65,7 +65,7 @@ type Delete_Rows_Source
prepare connection (key_values_to_delete : DB_Table | Table) key_columns =
prepared_table = common_preprocess_source_table key_values_to_delete key_columns
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
copied_table = internal_upload_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Report_Error row_limit=Nothing
copied_table = Internal_Core_Helpers.internal_upload_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Report_Error row_limit=Nothing
Delete_Rows_Source.Temporary_DB_Table copied_table tmp_table_name
## PRIVATE
@ -96,7 +96,7 @@ type Delete_Rows_Dry_Run_Source
Delete_Rows_Dry_Run_Source.Existing_DB_Query prepared_table
_ : Table ->
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
upload_recipe = internal_upload_in_memory_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
upload_recipe = Internal_Core_Helpers.internal_upload_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
row_limit_exceeded = prepared_table.row_count > dry_run_row_limit
dry_run_message_suffix = case row_limit_exceeded of
False -> ""

View File

@ -22,20 +22,57 @@ from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors,
from project.Internal.Upload.Helpers.Prepare_Structure import align_structure, validate_structure, verify_structure_hint
from project.Internal.Upload.Helpers.SQL_Helpers import make_batched_insert_template, prepare_create_table_statement
## PRIVATE
Assumes the output context is enabled for it to work.
Creates a table in the Database and returns its name.
internal_create_table_structure connection table_name structure primary_key temporary on_problems:Problem_Behavior -> Text =
aligned_structure = align_structure connection structure
resolved_primary_key = resolve_primary_key aligned_structure primary_key
validate_structure connection.base_connection.column_naming_helper aligned_structure <|
create_table_statement = prepare_create_table_statement connection table_name aligned_structure resolved_primary_key temporary on_problems
update_result = create_table_statement.if_not_error <|
log_sql_if_enabled connection.jdbc_connection create_table_statement.to_text
connection.execute create_table_statement
final_result = update_result.if_not_error table_name
final_result.catch SQL_Error sql_error->
if connection.dialect.get_error_mapper.is_table_already_exists_error sql_error then Error.throw (Table_Already_Exists.Error table_name) else final_result
type Internal_Core_Helpers
## PRIVATE
Assumes the output context is enabled for it to work.
Creates a table in the Database and returns its name.
internal_create_table_structure connection table_name structure primary_key temporary on_problems:Problem_Behavior -> Text =
aligned_structure = align_structure connection structure
resolved_primary_key = resolve_primary_key aligned_structure primary_key
validate_structure connection.base_connection.column_naming_helper aligned_structure <|
create_table_statement = prepare_create_table_statement connection table_name aligned_structure resolved_primary_key temporary on_problems
update_result = create_table_statement.if_not_error <|
log_sql_if_enabled connection.jdbc_connection create_table_statement.to_text
connection.execute create_table_statement
final_result = update_result.if_not_error table_name
final_result.catch SQL_Error sql_error->
if connection.dialect.get_error_mapper.is_table_already_exists_error sql_error then Error.throw (Table_Already_Exists.Error table_name) else final_result
## PRIVATE
resolve_temp_table_name connection temporary:Boolean table_name:Text -> Text =
case temporary of
False -> case table_name.starts_with "#" of
True -> Error.throw <| Illegal_Argument.Error ("Table name cannot start with '#': " + table_name)
False -> table_name
True -> case connection.dialect.temp_table_style of
Temp_Table_Style.Temporary_Table -> table_name
Temp_Table_Style.Hash_Prefix -> case table_name.starts_with "#" of
True -> table_name
False -> "#" + table_name
## PRIVATE
A helper to prepare an upload operation for a table.
Arguments:
- source_table: the table to be uploaded.
If it's a `DB_Table`, the query will be materialized as a new table.
If it's an In Memmory `Table`, the data will be uploaded to the newly created table.
- connection: the connection to the database.
- table_name: the name of the table to be created.
- primary_key: the primary key of the table to be created. Can be `Nothing` to set no key.
- temporary: if `True`, the table will be created as temporary.
- structure_hint: If set, it can be used to hint what types should be used for the columns of the table. Useful if the types slightly differ from the in-memory source types.
- row_limit: if set, only the first `row_limit` rows will be uploaded.
internal_upload_table : DB_Table | Table -> Connection -> Text -> Nothing | Vector Text -> Boolean -> Boolean -> Nothing | Vector Column_Description -> Problem_Behavior -> Integer | Nothing -> Table_Upload_Operation
internal_upload_table source_table connection (table_name : Text) (primary_key : Nothing | Vector Text) (temporary : Boolean) (remove_after_transaction : Boolean = False) structure_hint=Nothing (on_problems:Problem_Behavior=..Report_Error) (row_limit : Integer | Nothing = Nothing) -> Table_Upload_Operation =
resolved_table_name = Internal_Core_Helpers.resolve_temp_table_name connection temporary table_name
case source_table of
_ : Table ->
internal_upload_in_memory_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ : DB_Table ->
internal_upload_database_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)
## PRIVATE
We split uploading a table within a transaction into two steps:
@ -50,31 +87,6 @@ type Table_Upload_Operation
block which had the `table_description` passed to it.
perform_upload self -> DB_Table = self.internal_upload_callback Nothing
## PRIVATE
A helper to prepare an upload operation for a table.
Arguments:
- source_table: the table to be uploaded.
If it's a `DB_Table`, the query will be materialized as a new table.
If it's an In Memmory `Table`, the data will be uploaded to the newly created table.
- connection: the connection to the database.
- table_name: the name of the table to be created.
- primary_key: the primary key of the table to be created. Can be `Nothing` to set no key.
- temporary: if `True`, the table will be created as temporary.
- structure_hint: If set, it can be used to hint what types should be used for the columns of the table. Useful if the types slightly differ from the in-memory source types.
- row_limit: if set, only the first `row_limit` rows will be uploaded.
internal_upload_table : DB_Table | Table -> Connection -> Text -> Nothing | Vector Text -> Boolean -> Boolean -> Nothing | Vector Column_Description -> Problem_Behavior -> Integer | Nothing -> Table_Upload_Operation
internal_upload_table source_table connection (table_name : Text) (primary_key : Nothing | Vector Text) (temporary : Boolean) (remove_after_transaction : Boolean = False) structure_hint=Nothing (on_problems:Problem_Behavior=..Report_Error) (row_limit : Integer | Nothing = Nothing) -> Table_Upload_Operation =
resolved_table_name = resolve_temp_table_name connection temporary table_name
case source_table of
_ : Table ->
internal_upload_in_memory_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ : DB_Table ->
internal_upload_database_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)
## PRIVATE
internal_upload_in_memory_table (source_table : Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit = table_name.if_not_error <|
check_outside_transaction
@ -132,15 +144,3 @@ internal_upload_database_table (source_table : DB_Table) connection table_name p
check_outside_transaction =
if In_Transaction.is_in_transaction then
Panic.throw (Illegal_State.Error "Preparing Table_Upload_Operation should itself be called outside of transaction. This is a bug in the Database library.")
## PRIVATE
resolve_temp_table_name connection temporary:Boolean table_name:Text -> Text =
case temporary of
False -> case table_name.starts_with "#" of
True -> Error.throw <| Illegal_Argument.Error ("Table name cannot start with '#': " + table_name)
False -> table_name
True -> case connection.dialect.temp_table_style of
Temp_Table_Style.Temporary_Table -> table_name
Temp_Table_Style.Hash_Prefix -> case table_name.starts_with "#" of
True -> table_name
False -> "#" + table_name

View File

@ -9,12 +9,12 @@ import project.Internal.DDL_Transaction
from project.Errors import SQL_Error, Table_Already_Exists
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors
from project.Internal.Upload.Operations.Internal_Core import internal_upload_table, resolve_temp_table_name
from project.Internal.Upload.Operations.Internal_Core import Internal_Core_Helpers
## PRIVATE
select_into_table_implementation source_table connection table_name primary_key temporary on_problems:Problem_Behavior =
connection.base_connection.maybe_run_maintenance
resolved_table_name = resolve_temp_table_name connection temporary table_name
resolved_table_name = Internal_Core_Helpers.resolve_temp_table_name connection temporary table_name
table_naming_helper = connection.base_connection.table_naming_helper
table_naming_helper.verify_table_name resolved_table_name <|
Panic.recover SQL_Error <| handle_upload_errors <|
@ -45,6 +45,6 @@ select_into_table_implementation source_table connection table_name primary_key
## PRIVATE
upload_table_in_transaction source_table connection table_name primary_key temporary on_problems row_limit =
upload_operation = internal_upload_table source_table connection table_name primary_key=primary_key temporary=temporary on_problems=on_problems row_limit=row_limit
upload_operation = Internal_Core_Helpers.internal_upload_table source_table connection table_name primary_key=primary_key temporary=temporary on_problems=on_problems row_limit=row_limit
DDL_Transaction.run_transaction_with_tables connection [upload_operation.table_description] _->
upload_operation.perform_upload

View File

@ -18,7 +18,7 @@ from project.Internal.Upload.Helpers.Argument_Checks import all
from project.Internal.Upload.Helpers.Check_Queries import check_for_null_keys_if_any_keys_set, check_multiple_rows_match
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors
from project.Internal.Upload.Operations.Internal_Core import internal_upload_table
from project.Internal.Upload.Operations.Internal_Core import Internal_Core_Helpers
## PRIVATE
common_update_table (source_table : DB_Table | Table) (target_table : DB_Table) update_action key_columns error_on_missing_columns on_problems:Problem_Behavior =
@ -33,7 +33,7 @@ common_update_table (source_table : DB_Table | Table) (target_table : DB_Table)
structure_hint = target_table.select_columns source_table.column_names reorder=True . columns . map c->
Column_Description.Value c.name c.value_type
# We ignore non-critical problems in `internal_upload_table` because we already checked the structure.
tmp_table_uploader = internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns structure_hint=structure_hint temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Ignore row_limit=row_limit
tmp_table_uploader = Internal_Core_Helpers.internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns structure_hint=structure_hint temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Ignore row_limit=row_limit
DDL_Transaction.run_transaction_with_tables connection [tmp_table_uploader.table_description] _-> Context.Output.with_enabled <|
## The table was only created but not yet uploaded.
Now we are in transaction so we can atomically check the source_table and only after it meets criteria - upload that table contents to tmp.