This commit is contained in:
AdRiley 2024-11-07 16:02:54 +00:00 committed by GitHub
parent 676a7d4256
commit 014a2a7e6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 39 additions and 46 deletions

View File

@ -8,7 +8,7 @@ import project.Column_Description.Column_Description
import project.DB_Table.DB_Table
import project.Internal.In_Transaction.In_Transaction
import project.SQL_Query.SQL_Query
from project.Internal.Upload.Operations.Internal_Core import internal_create_table_structure
from project.Internal.Upload.Operations.Upload_Table import create_table_structure
## PRIVATE
Level of support of DDL statements inside of a transaction.
@ -47,7 +47,7 @@ type Transactional_Table_Description
This operation creates the tables regardless of the Output Context setting.
It is the responsibility of the caller to ensure that the operation may proceed.
private create self connection -> DB_Table = Context.Output.with_enabled <|
created_name = internal_create_table_structure connection self.name self.structure primary_key=self.primary_key temporary=self.temporary on_problems=self.on_problems
created_name = create_table_structure connection self.name self.structure primary_key=self.primary_key temporary=self.temporary on_problems=self.on_problems
connection.query (SQL_Query.Table_Name created_name)
## PRIVATE

View File

@ -6,7 +6,7 @@ import Standard.Base.Runtime.Context
import project.SQL_Query.SQL_Query
from project.Errors import SQL_Error, Table_Already_Exists
from project.Internal.Upload.Operations.Internal_Core import internal_create_table_structure, resolve_temp_table_name
from project.Internal.Upload.Operations.Upload_Table import create_table_structure, resolve_temp_table_name
## PRIVATE
Creates a new database table with the provided structure and returns the name
@ -42,7 +42,7 @@ create_table_implementation connection table_name structure primary_key temporar
will never return a name of a table that exists but
was created outside of a dry run.
connection.drop_table effective_table_name if_exists=True
internal_create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
create_table_structure connection effective_table_name structure primary_key effective_temporary on_problems
case created_table_name.is_error of
False ->
if dry_run.not then connection.query (SQL_Query.Table_Name created_table_name) else

View File

@ -14,7 +14,7 @@ from project.Errors import SQL_Error
from project.Internal.Upload.Helpers.Argument_Checks import check_delete_rows_arguments
from project.Internal.Upload.Helpers.Check_Queries import check_duplicate_key_matches_for_delete, check_for_null_keys
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Operations.Internal_Core import internal_upload_in_memory_table, internal_upload_table, Table_Upload_Operation
from project.Internal.Upload.Operations.Upload_Table import create_table_upload_operation, Table_Upload_Operation
## PRIVATE
common_delete_rows (target_table : DB_Table) (key_values_to_delete : Table | DB_Table) (key_columns : Vector Text) (allow_duplicate_matches : Boolean) -> Integer =
@ -65,7 +65,7 @@ type Delete_Rows_Source
prepare connection (key_values_to_delete : DB_Table | Table) key_columns =
prepared_table = common_preprocess_source_table key_values_to_delete key_columns
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
copied_table = internal_upload_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Report_Error row_limit=Nothing
copied_table = create_table_upload_operation prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Report_Error row_limit=Nothing
Delete_Rows_Source.Temporary_DB_Table copied_table tmp_table_name
## PRIVATE
@ -96,7 +96,7 @@ type Delete_Rows_Dry_Run_Source
Delete_Rows_Dry_Run_Source.Existing_DB_Query prepared_table
_ : Table ->
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
upload_recipe = internal_upload_in_memory_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
upload_recipe = create_table_upload_operation prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
row_limit_exceeded = prepared_table.row_count > dry_run_row_limit
dry_run_message_suffix = case row_limit_exceeded of
False -> ""

View File

@ -9,7 +9,7 @@ import project.Internal.DDL_Transaction
from project.Errors import SQL_Error, Table_Already_Exists
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors
from project.Internal.Upload.Operations.Internal_Core import internal_upload_table, resolve_temp_table_name
from project.Internal.Upload.Operations.Upload_Table import create_table_upload_operation, resolve_temp_table_name
## PRIVATE
select_into_table_implementation source_table connection table_name primary_key temporary on_problems:Problem_Behavior =
@ -45,6 +45,6 @@ select_into_table_implementation source_table connection table_name primary_key
## PRIVATE
upload_table_in_transaction source_table connection table_name primary_key temporary on_problems row_limit =
upload_operation = internal_upload_table source_table connection table_name primary_key=primary_key temporary=temporary on_problems=on_problems row_limit=row_limit
upload_operation = create_table_upload_operation source_table connection table_name primary_key=primary_key temporary=temporary on_problems=on_problems row_limit=row_limit
DDL_Transaction.run_transaction_with_tables connection [upload_operation.table_description] _->
upload_operation.perform_upload

View File

@ -18,7 +18,7 @@ from project.Internal.Upload.Helpers.Argument_Checks import all
from project.Internal.Upload.Helpers.Check_Queries import check_for_null_keys_if_any_keys_set, check_multiple_rows_match
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors
from project.Internal.Upload.Operations.Internal_Core import internal_upload_table
from project.Internal.Upload.Operations.Upload_Table import create_table_upload_operation
## PRIVATE
common_update_table (source_table : DB_Table | Table) (target_table : DB_Table) update_action key_columns error_on_missing_columns on_problems:Problem_Behavior =
@ -32,8 +32,8 @@ common_update_table (source_table : DB_Table | Table) (target_table : DB_Table)
row_limit = if dry_run then dry_run_row_limit else Nothing
structure_hint = target_table.select_columns source_table.column_names reorder=True . columns . map c->
Column_Description.Value c.name c.value_type
# We ignore non-critical problems in `internal_upload_table` because we already checked the structure.
tmp_table_uploader = internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns structure_hint=structure_hint temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Ignore row_limit=row_limit
# We ignore non-critical problems in `create_table_upload_operation` because we already checked the structure.
tmp_table_uploader = create_table_upload_operation source_table connection tmp_table_name primary_key=effective_key_columns structure_hint=structure_hint temporary=True remove_after_transaction=True on_problems=Problem_Behavior.Ignore row_limit=row_limit
DDL_Transaction.run_transaction_with_tables connection [tmp_table_uploader.table_description] _-> Context.Output.with_enabled <|
## The table was only created but not yet uploaded.
Now we are in transaction so we can atomically check the source_table and only after it meets criteria - upload that table contents to tmp.

View File

@ -22,10 +22,9 @@ from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors,
from project.Internal.Upload.Helpers.Prepare_Structure import align_structure, validate_structure, verify_structure_hint
from project.Internal.Upload.Helpers.SQL_Helpers import make_batched_insert_template, prepare_create_table_statement
## PRIVATE
Assumes the output context is enabled for it to work.
## Assumes the output context is enabled for it to work.
Creates a table in the Database and returns its name.
internal_create_table_structure connection table_name structure primary_key temporary on_problems:Problem_Behavior -> Text =
create_table_structure connection table_name structure primary_key temporary on_problems:Problem_Behavior -> Text =
aligned_structure = align_structure connection structure
resolved_primary_key = resolve_primary_key aligned_structure primary_key
validate_structure connection.base_connection.column_naming_helper aligned_structure <|
@ -37,21 +36,31 @@ internal_create_table_structure connection table_name structure primary_key temp
final_result.catch SQL_Error sql_error->
if connection.dialect.get_error_mapper.is_table_already_exists_error sql_error then Error.throw (Table_Already_Exists.Error table_name) else final_result
## PRIVATE
We split uploading a table within a transaction into two steps:
## Ensures the table has a valid temporary name for databases that have special naming rules for temporary tables.
Like SQLServer, where temporary tables must start with `#`.
resolve_temp_table_name connection temporary:Boolean table_name:Text -> Text =
case temporary of
False -> case table_name.starts_with "#" of
True -> Error.throw <| Illegal_Argument.Error ("Table name cannot start with '#': " + table_name)
False -> table_name
True -> case connection.dialect.temp_table_style of
Temp_Table_Style.Temporary_Table -> table_name
Temp_Table_Style.Hash_Prefix -> case table_name.starts_with "#" of
True -> table_name
False -> "#" + table_name
## We split uploading a table within a transaction into two steps:
1. creating the table structure, by passing the description to `DDL_Transaction.run_transaction_with_tables`,
2. uploading the data inside of the `run_transaction_with_tables` transactional block.
type Table_Upload_Operation
Value table_description:Transactional_Table_Description (internal_upload_callback : Nothing -> DB_Table)
## PRIVATE
Performs the actual data upload.
## Performs the actual data upload.
This method should be called inside of `run_transaction_with_tables`
block which had the `table_description` passed to it.
perform_upload self -> DB_Table = self.internal_upload_callback Nothing
## PRIVATE
A helper to prepare an upload operation for a table.
## A helper to prepare an upload operation for a table.
Arguments:
- source_table: the table to be uploaded.
@ -63,21 +72,19 @@ type Table_Upload_Operation
- temporary: if `True`, the table will be created as temporary.
- structure_hint: If set, it can be used to hint what types should be used for the columns of the table. Useful if the types slightly differ from the in-memory source types.
- row_limit: if set, only the first `row_limit` rows will be uploaded.
internal_upload_table : DB_Table | Table -> Connection -> Text -> Nothing | Vector Text -> Boolean -> Boolean -> Nothing | Vector Column_Description -> Problem_Behavior -> Integer | Nothing -> Table_Upload_Operation
internal_upload_table source_table connection (table_name : Text) (primary_key : Nothing | Vector Text) (temporary : Boolean) (remove_after_transaction : Boolean = False) structure_hint=Nothing (on_problems:Problem_Behavior=..Report_Error) (row_limit : Integer | Nothing = Nothing) -> Table_Upload_Operation =
create_table_upload_operation : DB_Table | Table -> Connection -> Text -> Nothing | Vector Text -> Boolean -> Boolean -> Nothing | Vector Column_Description -> Problem_Behavior -> Integer | Nothing -> Table_Upload_Operation
create_table_upload_operation source_table connection (table_name : Text) (primary_key : Nothing | Vector Text) (temporary : Boolean) (remove_after_transaction : Boolean = False) structure_hint=Nothing (on_problems:Problem_Behavior=..Report_Error) (row_limit : Integer | Nothing = Nothing) -> Table_Upload_Operation =
resolved_table_name = resolve_temp_table_name connection temporary table_name
case source_table of
_ : Table ->
internal_upload_in_memory_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_create_in_memory_table_upload_op source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ : DB_Table ->
internal_upload_database_table source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_create_database_table_upload_op source_table connection resolved_table_name primary_key temporary remove_after_transaction structure_hint on_problems row_limit
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)
## PRIVATE
internal_upload_in_memory_table (source_table : Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit = table_name.if_not_error <|
check_outside_transaction
private _create_in_memory_table_upload_op (source_table : Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit = table_name.if_not_error <|
_check_outside_transaction
verify_structure_hint structure_hint source_table.column_names
column_names = source_table.column_names
@ -98,9 +105,8 @@ internal_upload_in_memory_table (source_table : Table) connection table_name pri
Table_Upload_Operation.Value table_description callback
## PRIVATE
internal_upload_database_table (source_table : DB_Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit =
check_outside_transaction
private _create_database_table_upload_op (source_table : DB_Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit =
_check_outside_transaction
connection_check = if source_table.connection.jdbc_connection == connection.jdbc_connection then True else
Error.throw Different_Connections.Error
verify_structure_hint structure_hint source_table.column_names
@ -128,19 +134,6 @@ internal_upload_database_table (source_table : DB_Table) connection table_name p
Table_Upload_Operation.Value table_description callback
## PRIVATE
check_outside_transaction =
private _check_outside_transaction =
if In_Transaction.is_in_transaction then
Panic.throw (Illegal_State.Error "Preparing Table_Upload_Operation should itself be called outside of transaction. This is a bug in the Database library.")
## PRIVATE
resolve_temp_table_name connection temporary:Boolean table_name:Text -> Text =
case temporary of
False -> case table_name.starts_with "#" of
True -> Error.throw <| Illegal_Argument.Error ("Table name cannot start with '#': " + table_name)
False -> table_name
True -> case connection.dialect.temp_table_style of
Temp_Table_Style.Temporary_Table -> table_name
Temp_Table_Style.Hash_Prefix -> case table_name.starts_with "#" of
True -> table_name
False -> "#" + table_name