mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-05 03:56:26 +03:00
e40aad897f
* move most files * update CI configuration
563 lines
26 KiB
Diff
563 lines
26 KiB
Diff
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
|
|
index 57741a8f28..6673149a20 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
|
|
@@ -56,15 +56,16 @@ public final class RemoteCacheClientFactory {
|
|
RemoteOptions options,
|
|
@Nullable Credentials creds,
|
|
Path workingDirectory,
|
|
- DigestUtil digestUtil)
|
|
+ DigestUtil digestUtil,
|
|
+ RemoteRetrier retrier)
|
|
throws IOException {
|
|
Preconditions.checkNotNull(workingDirectory, "workingDirectory");
|
|
if (isHttpCache(options) && isDiskCache(options)) {
|
|
return createDiskAndHttpCache(
|
|
- workingDirectory, options.diskCache, options, creds, digestUtil);
|
|
+ workingDirectory, options.diskCache, options, creds, digestUtil, retrier);
|
|
}
|
|
if (isHttpCache(options)) {
|
|
- return createHttp(options, creds, digestUtil);
|
|
+ return createHttp(options, creds, digestUtil, retrier);
|
|
}
|
|
if (isDiskCache(options)) {
|
|
return createDiskCache(
|
|
@@ -80,7 +81,7 @@ public final class RemoteCacheClientFactory {
|
|
}
|
|
|
|
private static RemoteCacheClient createHttp(
|
|
- RemoteOptions options, Credentials creds, DigestUtil digestUtil) {
|
|
+ RemoteOptions options, Credentials creds, DigestUtil digestUtil, RemoteRetrier retrier) {
|
|
Preconditions.checkNotNull(options.remoteCache, "remoteCache");
|
|
|
|
try {
|
|
@@ -99,6 +100,7 @@ public final class RemoteCacheClientFactory {
|
|
options.remoteVerifyDownloads,
|
|
ImmutableList.copyOf(options.remoteHeaders),
|
|
digestUtil,
|
|
+ retrier,
|
|
creds);
|
|
} else {
|
|
throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy);
|
|
@@ -111,6 +113,7 @@ public final class RemoteCacheClientFactory {
|
|
options.remoteVerifyDownloads,
|
|
ImmutableList.copyOf(options.remoteHeaders),
|
|
digestUtil,
|
|
+ retrier,
|
|
creds);
|
|
}
|
|
} catch (Exception e) {
|
|
@@ -137,7 +140,8 @@ public final class RemoteCacheClientFactory {
|
|
PathFragment diskCachePath,
|
|
RemoteOptions options,
|
|
Credentials cred,
|
|
- DigestUtil digestUtil)
|
|
+ DigestUtil digestUtil,
|
|
+ RemoteRetrier retrier)
|
|
throws IOException {
|
|
Path cacheDir =
|
|
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
|
|
@@ -145,7 +149,7 @@ public final class RemoteCacheClientFactory {
|
|
cacheDir.createDirectoryAndParents();
|
|
}
|
|
|
|
- RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
|
|
+ RemoteCacheClient httpCache = createHttp(options, cred, digestUtil, retrier);
|
|
return createDiskAndRemoteClient(
|
|
workingDirectory,
|
|
diskCachePath,
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
|
|
index 0e165d1521..ff33a84afb 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
|
|
@@ -64,9 +64,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
|
|
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
|
|
import com.google.devtools.build.lib.packages.TargetUtils;
|
|
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
|
|
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
|
|
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
|
|
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
|
|
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
|
|
+import com.google.devtools.build.lib.remote.http.HttpException;
|
|
import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
|
|
import com.google.devtools.build.lib.remote.options.RemoteOptions;
|
|
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
|
|
@@ -103,6 +105,7 @@ import io.reactivex.rxjava3.plugins.RxJavaPlugins;
|
|
import java.io.IOException;
|
|
import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
+import java.nio.channels.ClosedChannelException;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
@@ -232,7 +235,31 @@ public final class RemoteModule extends BlazeModule {
|
|
remoteOptions,
|
|
creds,
|
|
Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
|
|
- digestUtil);
|
|
+ digestUtil,
|
|
+ new RemoteRetrier(
|
|
+ remoteOptions,
|
|
+ (e) -> {
|
|
+ boolean retry = false;
|
|
+ if (e instanceof ClosedChannelException) {
|
|
+ retry = true;
|
|
+ } else if (e instanceof HttpException) {
|
|
+ retry = true;
|
|
+ } else if (e instanceof IOException) {
|
|
+ String msg = e.getMessage().toLowerCase();
|
|
+ if (msg.contains("connection reset by peer")) {
|
|
+ retry = true;
|
|
+ }
|
|
+ }
|
|
+ if (retry) {
|
|
+ System.err.println("RETRYING: " + e.toString());
|
|
+ } else if (!(e instanceof CacheNotFoundException)) {
|
|
+ System.err.println("NOT RETRYING: " + e.toString());
|
|
+ }
|
|
+ return retry;
|
|
+ },
|
|
+ retryScheduler,
|
|
+ Retrier.ALLOW_ALL_CALLS)
|
|
+ );
|
|
} catch (IOException e) {
|
|
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
|
|
return;
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
|
|
index 9ce71c7c52..5c7f2d0728 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
|
|
@@ -20,6 +20,7 @@ java_library(
|
|
deps = [
|
|
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
|
|
"//src/main/java/com/google/devtools/build/lib/remote/common",
|
|
+ "//src/main/java/com/google/devtools/build/lib/remote:Retrier",
|
|
"//src/main/java/com/google/devtools/build/lib/remote/util",
|
|
"//src/main/java/com/google/devtools/build/lib/vfs",
|
|
"//third_party:auth",
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
|
|
index a2e4abf9d8..93843a91dc 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
|
|
@@ -25,12 +25,18 @@ final class DownloadCommand {
|
|
private final boolean casDownload;
|
|
private final Digest digest;
|
|
private final OutputStream out;
|
|
+ private final long offset;
|
|
|
|
- DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
|
|
+ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) {
|
|
this.uri = Preconditions.checkNotNull(uri);
|
|
this.casDownload = casDownload;
|
|
this.digest = Preconditions.checkNotNull(digest);
|
|
this.out = Preconditions.checkNotNull(out);
|
|
+ this.offset = offset;
|
|
+ }
|
|
+
|
|
+ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
|
|
+ this(uri, casDownload, digest, out, 0);
|
|
}
|
|
|
|
public URI uri() {
|
|
@@ -48,4 +54,6 @@ final class DownloadCommand {
|
|
public OutputStream out() {
|
|
return out;
|
|
}
|
|
+
|
|
+ public long offset() { return offset; }
|
|
}
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
|
|
index 3cbf6cf47f..4453d5fcc6 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
|
|
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
|
|
import com.google.common.util.concurrent.ListenableFuture;
|
|
import com.google.common.util.concurrent.MoreExecutors;
|
|
import com.google.common.util.concurrent.SettableFuture;
|
|
+import com.google.devtools.build.lib.remote.RemoteRetrier;
|
|
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
|
|
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
|
|
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
|
|
@@ -82,6 +83,7 @@ import java.util.NoSuchElementException;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.function.Function;
|
|
import java.util.regex.Pattern;
|
|
import javax.annotation.Nullable;
|
|
@@ -130,6 +132,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
private final boolean useTls;
|
|
private final boolean verifyDownloads;
|
|
private final DigestUtil digestUtil;
|
|
+ private final RemoteRetrier retrier;
|
|
|
|
private final Object closeLock = new Object();
|
|
|
|
@@ -151,6 +154,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
boolean verifyDownloads,
|
|
ImmutableList<Entry<String, String>> extraHttpHeaders,
|
|
DigestUtil digestUtil,
|
|
+ RemoteRetrier retrier,
|
|
@Nullable final Credentials creds)
|
|
throws Exception {
|
|
return new HttpCacheClient(
|
|
@@ -162,6 +166,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
verifyDownloads,
|
|
extraHttpHeaders,
|
|
digestUtil,
|
|
+ retrier,
|
|
creds,
|
|
null);
|
|
}
|
|
@@ -174,6 +179,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
boolean verifyDownloads,
|
|
ImmutableList<Entry<String, String>> extraHttpHeaders,
|
|
DigestUtil digestUtil,
|
|
+ RemoteRetrier retrier,
|
|
@Nullable final Credentials creds)
|
|
throws Exception {
|
|
|
|
@@ -187,6 +193,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
verifyDownloads,
|
|
extraHttpHeaders,
|
|
digestUtil,
|
|
+ retrier,
|
|
creds,
|
|
domainSocketAddress);
|
|
} else if (Epoll.isAvailable()) {
|
|
@@ -199,6 +206,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
verifyDownloads,
|
|
extraHttpHeaders,
|
|
digestUtil,
|
|
+ retrier,
|
|
creds,
|
|
domainSocketAddress);
|
|
} else {
|
|
@@ -215,6 +223,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
boolean verifyDownloads,
|
|
ImmutableList<Entry<String, String>> extraHttpHeaders,
|
|
DigestUtil digestUtil,
|
|
+ RemoteRetrier retrier,
|
|
@Nullable final Credentials creds,
|
|
@Nullable SocketAddress socketAddress)
|
|
throws Exception {
|
|
@@ -283,6 +292,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
this.extraHttpHeaders = extraHttpHeaders;
|
|
this.verifyDownloads = verifyDownloads;
|
|
this.digestUtil = digestUtil;
|
|
+ this.retrier = retrier;
|
|
}
|
|
|
|
@SuppressWarnings("FutureReturnValueIgnored")
|
|
@@ -459,6 +469,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
@SuppressWarnings("FutureReturnValueIgnored")
|
|
private ListenableFuture<Void> get(Digest digest, final OutputStream out, boolean casDownload) {
|
|
final AtomicBoolean dataWritten = new AtomicBoolean();
|
|
+ AtomicLong bytesDownloaded = new AtomicLong();
|
|
OutputStream wrappedOut =
|
|
new OutputStream() {
|
|
// OutputStream.close() does nothing, which is what we want to ensure that the
|
|
@@ -468,12 +479,14 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
@Override
|
|
public void write(byte[] b, int offset, int length) throws IOException {
|
|
dataWritten.set(true);
|
|
+ bytesDownloaded.addAndGet(length);
|
|
out.write(b, offset, length);
|
|
}
|
|
|
|
@Override
|
|
public void write(int b) throws IOException {
|
|
dataWritten.set(true);
|
|
+ bytesDownloaded.incrementAndGet();
|
|
out.write(b);
|
|
}
|
|
|
|
@@ -482,57 +495,59 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
out.flush();
|
|
}
|
|
};
|
|
- DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut);
|
|
- SettableFuture<Void> outerF = SettableFuture.create();
|
|
- acquireDownloadChannel()
|
|
- .addListener(
|
|
- (Future<Channel> channelPromise) -> {
|
|
- if (!channelPromise.isSuccess()) {
|
|
- outerF.setException(channelPromise.cause());
|
|
- return;
|
|
- }
|
|
+ return retrier.executeAsync(() -> {
|
|
+ DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get());
|
|
+ SettableFuture<Void> outerF = SettableFuture.create();
|
|
+ acquireDownloadChannel()
|
|
+ .addListener(
|
|
+ (Future<Channel> channelPromise) -> {
|
|
+ if (!channelPromise.isSuccess()) {
|
|
+ outerF.setException(channelPromise.cause());
|
|
+ return;
|
|
+ }
|
|
|
|
- Channel ch = channelPromise.getNow();
|
|
- ch.writeAndFlush(downloadCmd)
|
|
- .addListener(
|
|
- (f) -> {
|
|
- try {
|
|
- if (f.isSuccess()) {
|
|
- outerF.set(null);
|
|
- } else {
|
|
- Throwable cause = f.cause();
|
|
- // cause can be of type HttpException, because Netty uses
|
|
- // Unsafe.throwException to
|
|
- // re-throw a checked exception that hasn't been declared in the method
|
|
- // signature.
|
|
- if (cause instanceof HttpException) {
|
|
- HttpResponse response = ((HttpException) cause).response();
|
|
- if (!dataWritten.get() && authTokenExpired(response)) {
|
|
- // The error is due to an auth token having expired. Let's try
|
|
- // again.
|
|
- try {
|
|
- refreshCredentials();
|
|
- getAfterCredentialRefresh(downloadCmd, outerF);
|
|
+ Channel ch = channelPromise.getNow();
|
|
+ ch.writeAndFlush(downloadCmd)
|
|
+ .addListener(
|
|
+ (f) -> {
|
|
+ try {
|
|
+ if (f.isSuccess()) {
|
|
+ outerF.set(null);
|
|
+ } else {
|
|
+ Throwable cause = f.cause();
|
|
+ // cause can be of type HttpException, because Netty uses
|
|
+ // Unsafe.throwException to
|
|
+ // re-throw a checked exception that hasn't been declared in the method
|
|
+ // signature.
|
|
+ if (cause instanceof HttpException) {
|
|
+ HttpResponse response = ((HttpException) cause).response();
|
|
+ if (!dataWritten.get() && authTokenExpired(response)) {
|
|
+ // The error is due to an auth token having expired. Let's try
|
|
+ // again.
|
|
+ try {
|
|
+ refreshCredentials();
|
|
+ getAfterCredentialRefresh(downloadCmd, outerF);
|
|
+ return;
|
|
+ } catch (IOException e) {
|
|
+ cause.addSuppressed(e);
|
|
+ } catch (RuntimeException e) {
|
|
+ logger.atWarning().withCause(e).log("Unexpected exception");
|
|
+ cause.addSuppressed(e);
|
|
+ }
|
|
+ } else if (cacheMiss(response.status())) {
|
|
+ outerF.setException(new CacheNotFoundException(digest));
|
|
return;
|
|
- } catch (IOException e) {
|
|
- cause.addSuppressed(e);
|
|
- } catch (RuntimeException e) {
|
|
- logger.atWarning().withCause(e).log("Unexpected exception");
|
|
- cause.addSuppressed(e);
|
|
}
|
|
- } else if (cacheMiss(response.status())) {
|
|
- outerF.setException(new CacheNotFoundException(digest));
|
|
- return;
|
|
}
|
|
+ outerF.setException(cause);
|
|
}
|
|
- outerF.setException(cause);
|
|
+ } finally {
|
|
+ releaseDownloadChannel(ch);
|
|
}
|
|
- } finally {
|
|
- releaseDownloadChannel(ch);
|
|
- }
|
|
- });
|
|
- });
|
|
- return outerF;
|
|
+ });
|
|
+ });
|
|
+ return outerF;
|
|
+ });
|
|
}
|
|
|
|
@SuppressWarnings("FutureReturnValueIgnored")
|
|
@@ -674,20 +689,21 @@ public final class HttpCacheClient implements RemoteCacheClient {
|
|
@Override
|
|
public ListenableFuture<Void> uploadFile(
|
|
RemoteActionExecutionContext context, Digest digest, Path file) {
|
|
- try {
|
|
- return uploadAsync(
|
|
- digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
|
|
- } catch (IOException e) {
|
|
- // Can be thrown from file.getInputStream.
|
|
- return Futures.immediateFailedFuture(e);
|
|
- }
|
|
+ return retrier.executeAsync(() -> {
|
|
+ try {
|
|
+ return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
|
|
+ } catch (IOException e) {
|
|
+ // Can be thrown from file.getInputStream.
|
|
+ return Futures.immediateFailedFuture(e);
|
|
+ }
|
|
+ });
|
|
}
|
|
|
|
@Override
|
|
public ListenableFuture<Void> uploadBlob(
|
|
RemoteActionExecutionContext context, Digest digest, ByteString data) {
|
|
- return uploadAsync(
|
|
- digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
|
|
+ return retrier.executeAsync(() -> uploadAsync(
|
|
+ digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true));
|
|
}
|
|
|
|
@Override
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
|
|
index 50d83d138a..f38dad965f 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
|
|
@@ -20,24 +20,18 @@ import com.google.common.collect.ImmutableList;
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelPromise;
|
|
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
|
-import io.netty.handler.codec.http.HttpContent;
|
|
-import io.netty.handler.codec.http.HttpHeaderNames;
|
|
-import io.netty.handler.codec.http.HttpHeaderValues;
|
|
-import io.netty.handler.codec.http.HttpMethod;
|
|
-import io.netty.handler.codec.http.HttpObject;
|
|
-import io.netty.handler.codec.http.HttpRequest;
|
|
-import io.netty.handler.codec.http.HttpResponse;
|
|
-import io.netty.handler.codec.http.HttpResponseStatus;
|
|
-import io.netty.handler.codec.http.HttpUtil;
|
|
-import io.netty.handler.codec.http.HttpVersion;
|
|
-import io.netty.handler.codec.http.LastHttpContent;
|
|
+import io.netty.handler.codec.http.*;
|
|
import io.netty.handler.timeout.ReadTimeoutException;
|
|
import io.netty.util.internal.StringUtil;
|
|
+
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.OutputStream;
|
|
import java.util.Map.Entry;
|
|
+import java.util.Optional;
|
|
+import java.util.OptionalInt;
|
|
+import java.util.regex.Matcher;
|
|
+import java.util.regex.Pattern;
|
|
|
|
/** ChannelHandler for downloads. */
|
|
final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
@@ -51,6 +45,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
private long contentLength = -1;
|
|
/** the path header in the http request */
|
|
private String path;
|
|
+ /** the offset at which to download */
|
|
+ private long offset;
|
|
+ /** the bytes to skip in a full or chunked response */
|
|
+ private OptionalInt skipBytes;
|
|
|
|
public HttpDownloadHandler(
|
|
Credentials credentials, ImmutableList<Entry<String, String>> extraHttpHeaders) {
|
|
@@ -93,7 +91,25 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
if (contentLengthSet) {
|
|
contentLength = HttpUtil.getContentLength(response);
|
|
}
|
|
- downloadSucceeded = response.status().equals(HttpResponseStatus.OK);
|
|
+ boolean full_content = response.status().equals(HttpResponseStatus.OK);
|
|
+ boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT);
|
|
+ if (full_content) {
|
|
+ if (offset != 0) {
|
|
+ // We requested a range but the server replied with a full response.
|
|
+ // We need to skip `offset` bytes of the response.
|
|
+ if (!skipBytes.isPresent()) {
|
|
+ // This is the first chunk, or the full response.
|
|
+ skipBytes = OptionalInt.of((int)offset);
|
|
+ }
|
|
+ }
|
|
+ } else if (partial_content) {
|
|
+ Optional<HttpException> error = validateContentRangeHeader(response.headers());
|
|
+ if (error.isPresent()) {
|
|
+ failAndClose(error.get(), ctx);
|
|
+ return;
|
|
+ }
|
|
+ }
|
|
+ downloadSucceeded = full_content || partial_content;
|
|
if (!downloadSucceeded) {
|
|
out = new ByteArrayOutputStream();
|
|
}
|
|
@@ -105,6 +121,15 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
|
|
ByteBuf content = ((HttpContent) msg).content();
|
|
int readableBytes = content.readableBytes();
|
|
+ if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) {
|
|
+ int skipNow = skipBytes.getAsInt();
|
|
+ if (skipNow >= readableBytes) {
|
|
+ skipNow = readableBytes;
|
|
+ }
|
|
+ content.readerIndex(content.readerIndex() + skipNow);
|
|
+ skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow);
|
|
+ readableBytes = readableBytes - skipNow;
|
|
+ }
|
|
content.readBytes(out, readableBytes);
|
|
bytesReceived += readableBytes;
|
|
if (msg instanceof LastHttpContent) {
|
|
@@ -137,7 +162,9 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
DownloadCommand cmd = (DownloadCommand) msg;
|
|
out = cmd.out();
|
|
path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload());
|
|
- HttpRequest request = buildRequest(path, constructHost(cmd.uri()));
|
|
+ offset = cmd.offset();
|
|
+ skipBytes = OptionalInt.empty();
|
|
+ HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset());
|
|
addCredentialHeaders(request, cmd.uri());
|
|
addExtraRemoteHeaders(request);
|
|
addUserAgentHeader(request);
|
|
@@ -159,16 +186,36 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
|
|
}
|
|
}
|
|
|
|
- private HttpRequest buildRequest(String path, String host) {
|
|
+ private HttpRequest buildRequest(String path, String host, long offset) {
|
|
HttpRequest httpRequest =
|
|
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
|
|
httpRequest.headers().set(HttpHeaderNames.HOST, host);
|
|
httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
|
|
httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*");
|
|
httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
|
|
+ if (offset != 0) {
|
|
+ httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset));
|
|
+ }
|
|
return httpRequest;
|
|
}
|
|
|
|
+ private Optional<HttpException> validateContentRangeHeader(HttpHeaders headers) {
|
|
+ if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) {
|
|
+ return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null));
|
|
+ }
|
|
+ Pattern pattern = Pattern.compile("bytes\\s+(?<start>[0-9]+)-(?<end>[0-9]+)/(?<size>[0-9]*|\\*)");
|
|
+ Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE));
|
|
+ if (!matcher.matches()) {
|
|
+ return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null));
|
|
+ }
|
|
+ long start = Long.valueOf(matcher.group("start"));
|
|
+ if (start != offset) {
|
|
+ return Optional.of(new HttpException(
|
|
+ response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null));
|
|
+ }
|
|
+ return Optional.empty();
|
|
+ }
|
|
+
|
|
private void succeedAndReset(ChannelHandlerContext ctx) {
|
|
// All resets must happen *before* completing the user promise. Otherwise there is a race
|
|
// condition, where this handler can be reused even though it is closed. In addition, if reset
|
|
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
|
|
index 89fde56046..6a2bfd5a50 100644
|
|
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
|
|
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
|
|
@@ -18,7 +18,7 @@ import io.netty.handler.codec.http.HttpResponse;
|
|
import java.io.IOException;
|
|
|
|
/** An exception that propagates the http status. */
|
|
-final class HttpException extends IOException {
|
|
+public final class HttpException extends IOException {
|
|
private final HttpResponse response;
|
|
|
|
HttpException(HttpResponse response, String message, Throwable cause) {
|