Add retries to Data.read (#11269)

* Add retries to HTTP Get requests

A quick solution to random network failures for GET HTTP requests.
Should reduce the number of IOExceptions that users see while fetching
data.

* Use homemade retry logic for http requests

* Add retries to whole Data.read

Previously, we added retries only to fetch HTTP_Request. That was
insufficient as intermittent errors might happen while reading body's
stream.

Enhanced our simple server's crash endpoint to allow for different kind
of failures as well as simulate random failures.

* Remove retries from Java

Increased the scope of retries in the previous commit.

* nit

* Address PR comments

* PR comments

* Remove builtin
This commit is contained in:
Hubert Plociniczak 2024-10-10 10:25:16 +02:00 committed by GitHub
parent 669ac97d9e
commit 468b643aad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 104 additions and 8 deletions

View File

@ -5,11 +5,13 @@ import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Enso_Cloud.Data_Link.Data_Link
import project.Enso_Cloud.Data_Link_Helpers
import project.Error.Error
import project.Errors.Deprecated.Deprecated
import project.Errors.Problem_Behavior.Problem_Behavior
import project.Metadata.Display
import project.Metadata.Widget
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Error.HTTP_Error
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.URI.URI
import project.Warning.Warning
@ -20,6 +22,8 @@ from project.Metadata.Choice import Option
from project.Metadata.Widget import Single_Choice
from project.System.File_Format import Auto_Detect, format_types
polyglot java import java.lang.Thread
## PRIVATE
looks_like_uri path:Text -> Boolean =
(path.starts_with "http://" Case_Sensitivity.Insensitive) || (path.starts_with "https://" Case_Sensitivity.Insensitive)
@ -28,8 +32,19 @@ looks_like_uri path:Text -> Boolean =
A common implementation for fetching a resource and decoding it,
following encountered data links.
fetch_following_data_links (uri:URI) (method:HTTP_Method = HTTP_Method.Get) (headers:Vector = []) format =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format
fetch_and_decode =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format
error_handler attempt =
caught_error ->
exponential_backoff = [100, 200, 400]
if method != HTTP_Method.Get || attempt >= exponential_backoff.length then Error.throw caught_error else
sleep_time = exponential_backoff . get attempt
Thread.sleep sleep_time
fetch_and_decode . catch HTTP_Error (error_handler (attempt + 1))
fetch_and_decode . catch HTTP_Error (error_handler 0)
## PRIVATE
Decodes a HTTP response, handling data link access.

View File

@ -2,6 +2,7 @@
Internal threading utilities used for working with threads.
import project.Any.Any
import project.Data.Numbers.Integer
## PRIVATE
ADVANCED

View File

@ -591,7 +591,7 @@ add_specs suite_builder =
## Checking this error partially as a warning - I spent a lot of time debugging why I'm getting such an error.
Apparently it happens when the httpbin server was crashing without sending any response.
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <| Test.with_retries <|
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash")
err.should_fail_with Request_Error
err.catch.error_type . should_equal "java.io.IOException"
@ -600,10 +600,20 @@ add_specs suite_builder =
I think it may be worth adding, because it may be really quite confusing for end users who get that kind of error.
err.catch.message . should_equal "HTTP/1.1 header parser received no bytes"
group_builder.specify "should be able to handle IO errors" pending="TODO: Currently I was unable to figure out a way to test such errors" <|
# how to trigger this error???
err = Data.fetch "TODO"
group_builder.specify "should be able to handle occasional server crashes and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?success_every=2")
r1.should_succeed
r1.should_be_a Response
group_builder.specify "should be able to handle server crash that closes stream abruptly" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash?type=stream")
err.should_fail_with HTTP_Error
err.catch.message . should_equal "An IO error has occurred: java.io.IOException: closed"
group_builder.specify "should be able to handle occasional abrupt stream closures and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?type=stream&success_every=2")
r1.should_succeed
r1.should_be_a Response
suite_builder.group "Http Auth" group_builder->
group_builder.specify "should support Basic user+password authentication" pending=pending_has_url <| Test.with_retries <|

View File

@ -2,13 +2,83 @@ package org.enso.shttp.test_helpers;
import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.http.client.utils.URIBuilder;
import org.enso.shttp.SimpleHttpHandler;
public class CrashingTestHandler extends SimpleHttpHandler {
private int requests = 0;
@Override
protected void doHandle(HttpExchange exchange) throws IOException {
// This exception will be logged by SimpleHttpHandler, but that's OK - let's know that this
// Exceptions will be logged by SimpleHttpHandler, but that's OK - let's know that this
// crash is happening.
throw new RuntimeException("This handler crashes on purpose.");
URI uri = exchange.getRequestURI();
URIBuilder builder = new URIBuilder(uri);
final String successEveryParam = "success_every";
final String crashTypeParam = "type";
int successEvery = 0;
CrashType crashType = CrashType.RUNTIME;
for (var queryPair : builder.getQueryParams()) {
if (queryPair.getName().equals(successEveryParam)) {
successEvery = Integer.decode(queryPair.getValue());
} else if (queryPair.getName().equals(crashTypeParam)) {
crashType =
switch (queryPair.getValue()) {
case "stream" -> CrashType.STREAM;
default -> CrashType.RUNTIME;
};
}
}
if (successEvery == 0) {
// Reset counter
requests = 0;
}
boolean shouldSucceed = successEvery == (requests + 1);
switch (crashType) {
case RUNTIME:
if (shouldSucceed) {
// return OK, reset
requests = 0;
exchange.sendResponseHeaders(200, -1);
exchange.close();
break;
} else {
requests += 1;
throw new RuntimeException("This handler crashes on purpose.");
}
case STREAM:
byte[] responseData = "Crash and Burn".getBytes();
exchange.sendResponseHeaders(200, responseData.length);
try {
if (shouldSucceed) {
requests = 0;
// return OK, reset
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length);
}
} else {
requests += 1;
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length / 2);
}
}
} finally {
exchange.close();
}
break;
}
}
enum CrashType {
RUNTIME, // Simulate an internal server crash
STREAM // Simulate a crash by abruptly closing response's body stream
}
}