mirror of
https://github.com/enso-org/enso.git
synced 2024-12-22 18:38:11 +03:00
Enso_File integration update: Multi-part upload and presigned URL for download (#11440)
- Closes #11330 - Closes #11331
This commit is contained in:
parent
cf5326fbd1
commit
2619399799
@ -11,6 +11,7 @@ import project.Data.Text.Text
|
||||
import project.Data.Time.Date.Date
|
||||
import project.Data.Time.Date_Time.Date_Time
|
||||
import project.Data.Time.Time_Of_Day.Time_Of_Day
|
||||
import project.Data.Vector.No_Wrap
|
||||
import project.Data.Vector.Vector
|
||||
import project.Error.Error
|
||||
import project.Errors.Common.No_Such_Conversion
|
||||
@ -172,7 +173,7 @@ type JS_Object
|
||||
mapper = ObjectMapper.new
|
||||
new_object = mapper.createObjectNode
|
||||
keys = Vector.build builder->
|
||||
pairs.map pair->
|
||||
pairs.map on_problems=No_Wrap pair->
|
||||
case pair.first of
|
||||
text : Text ->
|
||||
## Ensure that any dataflow errors that could be stored in `pair.second` are propagated.
|
||||
|
@ -350,7 +350,10 @@ type Enso_File
|
||||
asset = Existing_Enso_Asset.get_asset_reference_for self
|
||||
response = case asset.asset_type of
|
||||
Enso_Asset_Type.File ->
|
||||
Utils.http_request HTTP_Method.Get (asset.internal_uri + "/contents")
|
||||
presigned_url = asset.get_file_description |> get_required_field "url" expected_type=Text
|
||||
# We are skipping the cache, because pre-signed URLs are unlikely to repeat, so caching them would be pointless.
|
||||
# Caching of cloud files tracked by https://github.com/enso-org/enso/issues/11439
|
||||
HTTP.fetch presigned_url HTTP_Method.Get cache_policy=..No_Cache
|
||||
Enso_Asset_Type.Data_Link ->
|
||||
Runtime.assert (open_options.contains Data_Link_Access.No_Follow)
|
||||
Utils.http_request HTTP_Method.Get asset.internal_uri
|
||||
|
@ -6,6 +6,7 @@ import project.Data.Dictionary.Dictionary
|
||||
import project.Data.Hashset.Hashset
|
||||
import project.Data.Json.Invalid_JSON
|
||||
import project.Data.Json.JS_Object
|
||||
import project.Data.Numbers.Integer
|
||||
import project.Data.Text.Text
|
||||
import project.Data.Vector.Vector
|
||||
import project.Enso_Cloud.Enso_File.Enso_Asset_Type
|
||||
@ -18,28 +19,27 @@ import project.Error.Error
|
||||
import project.Errors.File_Error.File_Error
|
||||
import project.Errors.Illegal_Argument.Illegal_Argument
|
||||
import project.Errors.Illegal_State.Illegal_State
|
||||
import project.Network.HTTP.HTTP
|
||||
import project.Network.HTTP.HTTP_Method.HTTP_Method
|
||||
import project.Network.HTTP.Request_Body.Request_Body
|
||||
import project.Network.HTTP.Request_Error
|
||||
import project.Network.URI.URI
|
||||
import project.Nothing.Nothing
|
||||
import project.Random.Random
|
||||
import project.Runtime
|
||||
import project.System.File.File
|
||||
import project.System.File.File_Access.File_Access
|
||||
import project.System.Output_Stream.Output_Stream
|
||||
from project.Data.Boolean import Boolean, False, True
|
||||
from project.Data.Text.Extensions import all
|
||||
from project.Enso_Cloud.Data_Link_Helpers import data_link_encoding, data_link_extension
|
||||
from project.Enso_Cloud.Public_Utils import get_required_field
|
||||
|
||||
polyglot java import java.lang.Thread
|
||||
|
||||
## PRIVATE
|
||||
upload_file (local_file : File) (destination : Enso_File) (replace_existing : Boolean) -> Enso_File =
|
||||
result = perform_upload destination replace_existing [local_file, destination]
|
||||
result.catch Enso_Cloud_Error error->
|
||||
is_source_file_not_found = case error of
|
||||
Enso_Cloud_Error.Connection_Error cause -> case cause of
|
||||
request_error : Request_Error -> request_error.error_type == 'java.io.FileNotFoundException'
|
||||
_ -> False
|
||||
_ -> False
|
||||
if is_source_file_not_found then Error.throw (File_Error.Not_Found local_file) else result
|
||||
perform_upload destination replace_existing [local_file, destination]
|
||||
|
||||
## PRIVATE
|
||||
A helper function that gathers the common logic for checking existence of
|
||||
@ -73,26 +73,58 @@ generic_create_asset (destination : Enso_File) (allow_existing : Boolean) (creat
|
||||
|
||||
## PRIVATE
|
||||
`generate_request_body_and_result` should return a pair,
|
||||
where the first element is the request body and the second element is the result to be returned.
|
||||
where the first element is the file to be uploaded and the second element is the result to be returned.
|
||||
It is executed lazily, only after all pre-conditions are successfully met.
|
||||
perform_upload (destination : Enso_File) (allow_existing : Boolean) (~generate_request_body_and_result) =
|
||||
generic_create_asset destination allow_existing parent_directory_asset-> existing_asset-> error_handlers->
|
||||
if existing_asset.is_nothing.not && existing_asset.asset_type != Enso_Asset_Type.File then Error.throw (Illegal_Argument.Error "The destination must be a path to a file, not "+existing_asset.asset_type.to_text+".") else
|
||||
existing_asset_id = existing_asset.if_not_nothing <| existing_asset.id
|
||||
file_name = destination.name
|
||||
base_uri = URI.from Utils.files_api
|
||||
. add_query_argument "parent_directory_id" parent_directory_asset.id
|
||||
. add_query_argument "file_name" file_name
|
||||
full_uri = case existing_asset_id of
|
||||
Nothing -> base_uri
|
||||
_ -> base_uri . add_query_argument "file_id" existing_asset_id
|
||||
base_uri = (URI.from Utils.files_api) / "upload"
|
||||
|
||||
pair = generate_request_body_and_result
|
||||
payload = pair.first : File
|
||||
result = pair.second
|
||||
file_size = payload.size
|
||||
|
||||
Asset_Cache.invalidate destination
|
||||
response = Utils.http_request_as_json HTTP_Method.Post full_uri pair.first error_handlers=error_handlers
|
||||
response.if_not_error <|
|
||||
id = get_required_field "id" response expected_type=Text
|
||||
Asset_Cache.update destination (Existing_Enso_Asset.new id file_name) . if_not_error <|
|
||||
pair.second
|
||||
upload_start_payload = JS_Object.from_pairs [["fileName", file_name], ["size", file_size]]
|
||||
upload_setup = Utils.http_request_as_json HTTP_Method.Post (base_uri / "start") upload_start_payload error_handlers=error_handlers
|
||||
|
||||
upload_setup.if_not_error <|
|
||||
presigned_urls = get_required_field "presignedUrls" upload_setup expected_type=Vector
|
||||
|
||||
# Metadata to be passed to `upload/end`
|
||||
upload_id = get_required_field "uploadId" upload_setup expected_type=Text
|
||||
source_path = get_required_field "sourcePath" upload_setup expected_type=Text
|
||||
|
||||
# Currently we upload chunks one-by-one, in the future this could be done in parallel.
|
||||
chunk_size = (file_size / presigned_urls.length).ceil
|
||||
parts = payload.with_input_stream [File_Access.Read] input_stream->
|
||||
presigned_urls.map_with_index i-> part_url->
|
||||
chunk_bytes = input_stream.read_n_bytes chunk_size
|
||||
request_body = Request_Body.Byte_Array chunk_bytes
|
||||
response = _send_chunk_with_retries part_url request_body
|
||||
e_tag = response.get_header "ETag" if_missing=(Error.throw (Illegal_State.Error "The ETag header is missing in the multipart upload response."))
|
||||
JS_Object.from_pairs [["partNumber", i + 1], ["eTag", e_tag]]
|
||||
|
||||
upload_end_payload = JS_Object.from_pairs <|
|
||||
[["uploadId", upload_id], ["sourcePath", source_path]]
|
||||
+ [["parts", parts]]
|
||||
+ [["fileName", file_name], ["parentDirectoryId", parent_directory_asset.id]]
|
||||
+ (if existing_asset.is_nothing.not then [["assetId", existing_asset.id]] else [])
|
||||
response = Utils.http_request_as_json HTTP_Method.Post (base_uri / "end") upload_end_payload error_handlers=error_handlers
|
||||
response.if_not_error <|
|
||||
id = get_required_field "id" response expected_type=Text
|
||||
Asset_Cache.update destination (Existing_Enso_Asset.new id file_name) . if_not_error <|
|
||||
result
|
||||
|
||||
private _send_chunk_with_retries uri request_body attempt:Integer=0 =
|
||||
result = HTTP.post uri request_body HTTP_Method.Put
|
||||
exponential_backoff = [100, 200, 400]
|
||||
if result.is_error.not || (attempt >= exponential_backoff.length) then result else
|
||||
sleep_time = exponential_backoff.at attempt
|
||||
Thread.sleep sleep_time
|
||||
@Tail_Call _send_chunk_with_retries uri request_body attempt+1
|
||||
|
||||
## PRIVATE
|
||||
Creates a directory at the given path, also creating parent directories if needed.
|
||||
|
@ -260,6 +260,9 @@ private _resolve_body body:Request_Body hash_function =
|
||||
## ToDo: Support hashing a file.
|
||||
hash = if hash_function.is_nothing then "" else Unimplemented.throw "Hashing a file body is not yet supported."
|
||||
Resolved_Body.Value (body_publishers.ofFile path) Nothing hash
|
||||
Request_Body.Byte_Array bytes ->
|
||||
hash = if hash_function.is_nothing then "" else hash_function bytes
|
||||
Resolved_Body.Value (body_publishers.ofByteArray bytes) Nothing hash
|
||||
Request_Body.Form_Data form_data url_encoded ->
|
||||
_resolve_form_body form_data url_encoded hash_function
|
||||
Request_Body.Empty ->
|
||||
|
@ -43,6 +43,12 @@ type Request_Body
|
||||
- file: The file to send.
|
||||
Binary (file:File=(Missing_Argument.throw "file"))
|
||||
|
||||
## PRIVATE
|
||||
ADVANCED
|
||||
Raw bytes array to be sent as binary data.
|
||||
This is mostly used for internal purposes.
|
||||
Byte_Array bytes
|
||||
|
||||
## Request body with form data.
|
||||
|
||||
Arguments:
|
||||
@ -61,10 +67,12 @@ type Request_Body
|
||||
default_content_type_header : Header | Nothing
|
||||
default_content_type_header self =
|
||||
case self of
|
||||
Request_Body.Text _ _ _ -> Header.content_type "text/plain" encoding=Encoding.utf_8
|
||||
Request_Body.Json _ -> Header.content_type "application/json"
|
||||
Request_Body.Binary _ -> Header.content_type "application/octet-stream"
|
||||
Request_Body.Form_Data _ url_encoded -> if url_encoded then Header.application_x_www_form_urlencoded else Nothing
|
||||
Request_Body.Text _ _ _ -> Header.content_type "text/plain" encoding=Encoding.utf_8
|
||||
Request_Body.Json _ -> Header.content_type "application/json"
|
||||
Request_Body.Binary _ -> Header.content_type "application/octet-stream"
|
||||
Request_Body.Byte_Array _ -> Header.content_type "application/octet-stream"
|
||||
Request_Body.Form_Data _ url_encoded ->
|
||||
if url_encoded then Header.application_x_www_form_urlencoded else Nothing
|
||||
Request_Body.Empty -> Nothing
|
||||
|
||||
## PRIVATE
|
||||
|
@ -433,19 +433,19 @@ Any.should_be_a self typ =
|
||||
if a.constructor == c then Spec_Result.Success else
|
||||
expected_type = c.declaring_type.qualified_name
|
||||
actual_type = Meta.get_qualified_type_name self
|
||||
message = "Expected a value of type "+expected_type+", built with constructor "+c.name+", but got a value of type "+actual_type+", built with constructor "+a.constructor.name+" instead (at "+loc+")."
|
||||
message = "Expected a value of type "+expected_type+", built with constructor "+c.name+", but got a value ["+self.to_text+"] of type "+actual_type+", built with constructor "+a.constructor.name+" instead (at "+loc+")."
|
||||
Test.fail message
|
||||
_ ->
|
||||
expected_type = c.declaring_type.qualified_name
|
||||
actual_type = Meta.get_qualified_type_name self
|
||||
message = "Expected a value of type "+expected_type+", built with constructor "+c.name+", but got a value of type "+actual_type+" instead (at "+loc+")."
|
||||
message = "Expected a value of type "+expected_type+", built with constructor "+c.name+", but got a value ["+self.to_text+"] of type "+actual_type+" instead (at "+loc+")."
|
||||
Test.fail message
|
||||
meta_type : Meta.Type ->
|
||||
ok = self.is_a typ || self==typ
|
||||
if ok then Spec_Result.Success else
|
||||
expected_type = meta_type.qualified_name
|
||||
actual_type = Meta.get_qualified_type_name self
|
||||
message = "Expected a value of type "+expected_type+" but got a value of type "+actual_type+" instead (at "+loc+")."
|
||||
message = "Expected a value of type "+expected_type+" but got a value ["+self.to_text+"] of type "+actual_type+" instead (at "+loc+")."
|
||||
Test.fail message
|
||||
# Workaround for 0-argument atom constructors which 'unapplies' them.
|
||||
atom : Meta.Atom ->
|
||||
@ -456,13 +456,13 @@ Any.should_be_a self typ =
|
||||
ok = self.is_a typ
|
||||
if ok then Spec_Result.Success else
|
||||
actual_type = Meta.get_qualified_type_name self
|
||||
message = "Expected a value of Java class "+typ.to_text+" but got a value of type "+actual_type+" instead (at "+loc+")."
|
||||
message = "Expected a value of Java class "+typ.to_text+" but got a value ["+self.to_text+"] of type "+actual_type+" instead (at "+loc+")."
|
||||
Test.fail message
|
||||
Meta.Primitive.Value (b : Boolean) ->
|
||||
ok = self == b
|
||||
if ok then Spec_Result.Success else
|
||||
actual_type = Meta.get_qualified_type_name self
|
||||
message = "Expected a value of "+typ.to_text+" but got a value of type "+actual_type+" instead (at "+loc+")."
|
||||
message = "Expected a value of "+typ.to_text+" but got a value ["+self.to_text+"] of type "+actual_type+" instead (at "+loc+")."
|
||||
Test.fail message
|
||||
_ -> fail_on_wrong_arg_type
|
||||
|
||||
|
@ -63,8 +63,8 @@ maybe_grey_text (text : Text) (config : Suite_Config) =
|
||||
if config.use_ansi_colors then (grey text) else text
|
||||
|
||||
## Print result for a single Spec run
|
||||
print_single_result : Test_Result -> Suite_Config -> Nothing
|
||||
print_single_result (test_result : Test_Result) (config : Suite_Config) =
|
||||
print_single_result : Text -> Test_Result -> Suite_Config -> Nothing
|
||||
print_single_result (group_name : Text) (test_result : Test_Result) (config : Suite_Config) =
|
||||
times_suffix =
|
||||
times = test_result.time_taken.total_milliseconds.to_text + "ms"
|
||||
"[" + times + "]"
|
||||
@ -79,7 +79,7 @@ print_single_result (test_result : Test_Result) (config : Suite_Config) =
|
||||
txt = " - " + test_result.spec_name + " " + times_suffix
|
||||
IO.println (maybe_green_text txt config)
|
||||
Spec_Result.Failure msg details ->
|
||||
report_github_error_message test_result.spec_name msg
|
||||
report_github_error_message group_name+": "+test_result.spec_name msg
|
||||
IO.println ""
|
||||
txt = " - [FAILED] " + test_result.spec_name + " " + times_suffix
|
||||
IO.println (maybe_red_text txt config)
|
||||
@ -94,7 +94,7 @@ print_single_result (test_result : Test_Result) (config : Suite_Config) =
|
||||
## PRIVATE
|
||||
Reports an error message to show up as a note in GitHub Actions,
|
||||
only if running in the GitHub Actions environment.
|
||||
report_github_error_message (title : Text) (message : Text) =
|
||||
report_github_error_message (~title : Text) (~message : Text) =
|
||||
is_enabled = Environment.get "GITHUB_ACTIONS" == "true"
|
||||
if is_enabled then
|
||||
IO.println (generate_github_error_annotation title message)
|
||||
@ -102,8 +102,8 @@ report_github_error_message (title : Text) (message : Text) =
|
||||
## PRIVATE
|
||||
Generates a GitHub Actions annotation for a failing test.
|
||||
generate_github_error_annotation (title : Text) (message : Text) =
|
||||
sanitize_message txt = txt.replace '\n' '%0A'
|
||||
sanitize_parameter txt = txt . replace '\n' '%0A' . replace ',' '%2C'
|
||||
sanitize_message txt = txt.replace '\n' '%0A' . replace '::' '%3A%3A'
|
||||
sanitize_parameter txt = sanitize_message txt . replace ',' '%2C'
|
||||
|
||||
location_match = Regex.compile "\(at ((?:[A-Za-z]:)?[^:]+):([0-9\-]+):([0-9\-]+)\)" . match message
|
||||
split_on_dash start_field_name end_field_name text =
|
||||
@ -215,7 +215,7 @@ print_group_report group_name test_results config builder =
|
||||
True -> maybe_red_text ("[FAILED] " + group_description) config
|
||||
False -> maybe_green_text group_description config
|
||||
test_results.each result->
|
||||
print_single_result result config
|
||||
print_single_result group_name result config
|
||||
|
||||
## PRIVATE
|
||||
Escape Text for XML
|
||||
|
@ -138,6 +138,23 @@ add_specs suite_builder setup:Cloud_Tests_Setup = suite_builder.group "Enso Clou
|
||||
"hi!".utf_8.write_bytes f . should_succeed
|
||||
f.read ..Plain_Text . should_equal "hi!"
|
||||
|
||||
group_builder.specify "should be able to upload and download big files" <|
|
||||
f = test_root.get / "big_file.txt"
|
||||
# Let's say 42MB is big enough of a stress test
|
||||
kilobytes_to_write = 42 * 1024
|
||||
one_kilobyte = "A".repeat 1024 . utf_8
|
||||
r = f.with_output_stream [File_Access.Write] output_stream->
|
||||
0.up_to kilobytes_to_write . each _->
|
||||
output_stream.write_bytes one_kilobyte
|
||||
r.should_succeed
|
||||
# Check that the uploaded size is as expected
|
||||
f.size . should_equal (kilobytes_to_write * 1024)
|
||||
|
||||
# And now try to download it
|
||||
v = f.read Bytes
|
||||
v.should_be_a Vector
|
||||
v.length . should_equal f.size
|
||||
|
||||
group_builder.specify "does not currently support append" <|
|
||||
f = test_root.get / "written_file3.txt"
|
||||
Test.expect_panic Unimplemented <|
|
||||
|
Loading…
Reference in New Issue
Block a user