This commit is contained in:
Adam Riley 2024-10-18 18:03:25 +03:00
parent 27a535f6d0
commit 876ab9f7c3
3 changed files with 65 additions and 61 deletions

View File

@ -14,7 +14,7 @@ from project.Errors import SQL_Error
from project.Internal.Upload.Helpers.Argument_Checks import check_delete_rows_arguments
from project.Internal.Upload.Helpers.Check_Queries import check_duplicate_key_matches_for_delete, check_for_null_keys
from project.Internal.Upload.Helpers.Constants import dry_run_row_limit
from project.Internal.Upload.Operations.Internal_Core import internal_upload_in_memory_table, internal_upload_table, Table_Upload_Operation
from project.Internal.Upload.Operations.Internal_Core import internal_upload_table, Table_Upload_Operation
## PRIVATE
common_delete_rows (target_table : DB_Table) (key_values_to_delete : Table | DB_Table) (key_columns : Vector Text) (allow_duplicate_matches : Boolean) -> Integer =
@ -96,7 +96,7 @@ type Delete_Rows_Dry_Run_Source
Delete_Rows_Dry_Run_Source.Existing_DB_Query prepared_table
_ : Table ->
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
upload_recipe = internal_upload_in_memory_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
upload_recipe = internal_upload_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True remove_after_transaction=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
row_limit_exceeded = prepared_table.row_count > dry_run_row_limit
dry_run_message_suffix = case row_limit_exceeded of
False -> ""

View File

@ -0,0 +1,61 @@
private
from Standard.Base import all
## PRIVATE
internal_upload_in_memory_table (source_table : Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit = table_name.if_not_error <|
check_outside_transaction
verify_structure_hint structure_hint source_table.column_names
column_names = source_table.column_names
insert_template = make_batched_insert_template connection table_name column_names
statement_setter = connection.dialect.get_statement_setter
structure = structure_hint.if_nothing source_table
aligned_structure = align_structure connection structure
expected_type_hints = aligned_structure.map .value_type
table_description = Transactional_Table_Description.Value table_name temporary=temporary structure=aligned_structure primary_key=primary_key remove_after_transaction=remove_after_transaction on_problems=on_problems
callback _ = In_Transaction.ensure_in_transaction <|
upload_status = internal_translate_known_upload_errors source_table connection primary_key <|
Panic.rethrow <|
connection.jdbc_connection.batch_insert insert_template statement_setter source_table batch_size=default_batch_size expected_type_hints=expected_type_hints row_limit=row_limit
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name table_name)
Table_Upload_Operation.Value table_description callback
## PRIVATE
internal_upload_database_table (source_table : DB_Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit =
check_outside_transaction
connection_check = if source_table.connection.jdbc_connection == connection.jdbc_connection then True else
Error.throw Different_Connections.Error
verify_structure_hint structure_hint source_table.column_names
connection_check.if_not_error <| table_name.if_not_error <|
structure = structure_hint.if_nothing source_table
aligned_structure = align_structure connection structure
table_description = Transactional_Table_Description.Value table_name temporary=temporary structure=aligned_structure primary_key=primary_key remove_after_transaction=remove_after_transaction on_problems=on_problems
effective_source_table = case row_limit of
Nothing -> source_table
_ : Integer -> source_table.limit row_limit
## We need to ensure that the columns in this statement are
matching positionally the columns in the newly created
table. But we create both from the same source table, so
that is guaranteed.
copy_into_statement = connection.dialect.generate_sql <|
Query.Insert_From_Select table_name effective_source_table.column_names effective_source_table.to_select_query
callback _ = In_Transaction.ensure_in_transaction <|
upload_status =
internal_translate_known_upload_errors source_table connection primary_key <|
Panic.rethrow <| connection.execute_update copy_into_statement
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name table_name)
Table_Upload_Operation.Value table_description callback
## PRIVATE
check_outside_transaction =
if In_Transaction.is_in_transaction then
Panic.throw (Illegal_State.Error "Preparing Table_Upload_Operation should itself be called outside of transaction. This is a bug in the Database library.")

View File

@ -13,6 +13,7 @@ import project.Internal.DDL_Transaction.Transactional_Table_Description
import project.Internal.In_Transaction.In_Transaction
import project.Internal.IR.Query.Query
import project.SQL_Query.SQL_Query
from project.Dialect import Temp_Table_Style
from project.Errors import SQL_Error, Table_Already_Exists, Different_Connections
from project.Internal.JDBC_Connection import log_sql_if_enabled
@ -21,6 +22,7 @@ from project.Internal.Upload.Helpers.Constants import default_batch_size
from project.Internal.Upload.Helpers.Error_Helpers import handle_upload_errors, internal_translate_known_upload_errors
from project.Internal.Upload.Helpers.Prepare_Structure import align_structure, validate_structure, verify_structure_hint
from project.Internal.Upload.Helpers.SQL_Helpers import make_batched_insert_template, prepare_create_table_statement
from project.Internal.Upload.Operations.Internal.Internal_Core_Impl import all
## PRIVATE
Assumes the output context is enabled for it to work.
@ -74,65 +76,6 @@ internal_upload_table source_table connection (table_name : Text) (primary_key :
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)
## PRIVATE
internal_upload_in_memory_table (source_table : Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit = table_name.if_not_error <|
check_outside_transaction
verify_structure_hint structure_hint source_table.column_names
column_names = source_table.column_names
insert_template = make_batched_insert_template connection table_name column_names
statement_setter = connection.dialect.get_statement_setter
structure = structure_hint.if_nothing source_table
aligned_structure = align_structure connection structure
expected_type_hints = aligned_structure.map .value_type
table_description = Transactional_Table_Description.Value table_name temporary=temporary structure=aligned_structure primary_key=primary_key remove_after_transaction=remove_after_transaction on_problems=on_problems
callback _ = In_Transaction.ensure_in_transaction <|
upload_status = internal_translate_known_upload_errors source_table connection primary_key <|
Panic.rethrow <|
connection.jdbc_connection.batch_insert insert_template statement_setter source_table batch_size=default_batch_size expected_type_hints=expected_type_hints row_limit=row_limit
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name table_name)
Table_Upload_Operation.Value table_description callback
## PRIVATE
internal_upload_database_table (source_table : DB_Table) connection table_name primary_key temporary remove_after_transaction structure_hint on_problems:Problem_Behavior row_limit =
check_outside_transaction
connection_check = if source_table.connection.jdbc_connection == connection.jdbc_connection then True else
Error.throw Different_Connections.Error
verify_structure_hint structure_hint source_table.column_names
connection_check.if_not_error <| table_name.if_not_error <|
structure = structure_hint.if_nothing source_table
aligned_structure = align_structure connection structure
table_description = Transactional_Table_Description.Value table_name temporary=temporary structure=aligned_structure primary_key=primary_key remove_after_transaction=remove_after_transaction on_problems=on_problems
effective_source_table = case row_limit of
Nothing -> source_table
_ : Integer -> source_table.limit row_limit
## We need to ensure that the columns in this statement are
matching positionally the columns in the newly created
table. But we create both from the same source table, so
that is guaranteed.
copy_into_statement = connection.dialect.generate_sql <|
Query.Insert_From_Select table_name effective_source_table.column_names effective_source_table.to_select_query
callback _ = In_Transaction.ensure_in_transaction <|
upload_status =
internal_translate_known_upload_errors source_table connection primary_key <|
Panic.rethrow <| connection.execute_update copy_into_statement
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name table_name)
Table_Upload_Operation.Value table_description callback
## PRIVATE
check_outside_transaction =
if In_Transaction.is_in_transaction then
Panic.throw (Illegal_State.Error "Preparing Table_Upload_Operation should itself be called outside of transaction. This is a bug in the Database library.")
## PRIVATE
resolve_temp_table_name connection temporary:Boolean table_name:Text -> Text =
case temporary of