daml/nix/bazel-retry-cache.patch
Claudio Bley 8c02057eb8
bazel: Retry cache operation for IOException "Operation timed out" (#14274)
We have seen cache failures caused by:
```
2022-06-13T19:02:13.0964410Z NOT RETRYING: java.io.IOException: Operation timed out
```
which later resulted in:
```
2022-06-13T19:05:42.4633080Z WARNING: Reading from Remote Cache:
2022-06-13T19:05:42.4635340Z com.google.devtools.build.lib.remote.BulkTransferException: Operation timed out
2022-06-13T19:05:42.4636670Z 	at com.google.devtools.build.lib.remote.RemoteCache.waitForBulkTransfer(RemoteCache.java:291)
2022-06-13T19:05:42.4638600Z 	at com.google.devtools.build.lib.remote.RemoteCache.download(RemoteCache.java:466)
2022-06-13T19:05:42.4640330Z 	at com.google.devtools.build.lib.remote.RemoteExecutionService.downloadOutputs(RemoteExecutionService.java:383)
2022-06-13T19:05:42.4643000Z 	at com.google.devtools.build.lib.remote.RemoteSpawnCache.lookup(RemoteSpawnCache.java:120)
2022-06-13T19:05:42.4645110Z 	at com.google.devtools.build.lib.exec.AbstractSpawnStrategy.exec(AbstractSpawnStrategy.java:139)
2022-06-13T19:05:42.4647270Z 	at com.google.devtools.build.lib.exec.AbstractSpawnStrategy.exec(AbstractSpawnStrategy.java:106)
2022-06-13T19:05:42.4649070Z 	at com.google.devtools.build.lib.actions.SpawnStrategy.beginExecution(SpawnStrategy.java:47)
2022-06-13T19:05:42.4650750Z 	at com.google.devtools.build.lib.exec.SpawnStrategyResolver.beginExecution(SpawnStrategyResolver.java:65)
2022-06-13T19:05:42.4655610Z 	at com.google.devtools.build.lib.analysis.actions.SpawnAction.beginExecution(SpawnAction.java:331)
2022-06-13T19:05:42.4657770Z 	at com.google.devtools.build.lib.actions.Action.execute(Action.java:127)
2022-06-13T19:05:42.4667910Z 	at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$5.execute(SkyframeActionExecutor.java:855)
2022-06-13T19:05:42.4669320Z 	at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$ActionRunner.continueAction(SkyframeActionExecutor.java:1016)
2022-06-13T19:05:42.4670870Z 	at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$ActionRunner.run(SkyframeActionExecutor.java:975)
2022-06-13T19:05:42.4672520Z 	at com.google.devtools.build.lib.skyframe.ActionExecutionState.runStateMachine(ActionExecutionState.java:129)
2022-06-13T19:05:42.4673950Z 	at com.google.devtools.build.lib.skyframe.ActionExecutionState.getResultOrDependOnFuture(ActionExecutionState.java:81)
2022-06-13T19:05:42.4675420Z 	at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor.executeAction(SkyframeActionExecutor.java:472)
2022-06-13T19:05:42.4677050Z 	at com.google.devtools.build.lib.skyframe.ActionExecutionFunction.checkCacheAndExecuteIfNeeded(ActionExecutionFunction.java:834)
2022-06-13T19:05:42.4678230Z 	at com.google.devtools.build.lib.skyframe.ActionExecutionFunction.compute(ActionExecutionFunction.java:307)
2022-06-13T19:05:42.4679530Z 	at com.google.devtools.build.skyframe.AbstractParallelEvaluator$Evaluate.run(AbstractParallelEvaluator.java:477)
2022-06-13T19:05:42.4680390Z 	at com.google.devtools.build.lib.concurrent.AbstractQueueVisitor$WrappedRunnable.run(AbstractQueueVisitor.java:398)
2022-06-13T19:05:42.4681260Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-06-13T19:05:42.4682100Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-06-13T19:05:42.4683170Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2022-06-13T19:05:42.4683650Z 	Suppressed: java.io.IOException: Operation timed out
2022-06-13T19:05:42.4684210Z 		at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
2022-06-13T19:05:42.4685100Z 		at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
2022-06-13T19:05:42.4685990Z 		at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
2022-06-13T19:05:42.4686710Z 		at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
2022-06-13T19:05:42.4687740Z 		at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
2022-06-13T19:05:42.4688740Z 		at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
2022-06-13T19:05:42.4689620Z 		at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
2022-06-13T19:05:42.4690290Z 		at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
2022-06-13T19:05:42.4692060Z 		at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
2022-06-13T19:05:42.4693020Z 		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
2022-06-13T19:05:42.4693700Z 		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
2022-06-13T19:05:42.4694350Z 		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
2022-06-13T19:05:42.4695020Z 		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
2022-06-13T19:05:42.4695740Z 		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
2022-06-13T19:05:42.4696750Z 		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
2022-06-13T19:05:42.4698000Z 		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2022-06-13T19:05:42.4699170Z 		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2022-06-13T19:05:42.4700320Z 		... 1 more
```

This was caused by a timeout on the connection to the cache server while
downloading cached artifacts; the operation should have been retried (see [1]).

CHANGELOG_BEGIN
CHANGELOG_END

[1]: https://gist.github.com/cocreature/98481cd41029e73d1bc2a26a7661e07a
2022-06-28 13:37:42 +02:00

565 lines
26 KiB
Diff

diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
index 57741a8f28..6673149a20 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -56,15 +56,16 @@ public final class RemoteCacheClientFactory {
RemoteOptions options,
@Nullable Credentials creds,
Path workingDirectory,
- DigestUtil digestUtil)
+ DigestUtil digestUtil,
+ RemoteRetrier retrier)
throws IOException {
Preconditions.checkNotNull(workingDirectory, "workingDirectory");
if (isHttpCache(options) && isDiskCache(options)) {
return createDiskAndHttpCache(
- workingDirectory, options.diskCache, options, creds, digestUtil);
+ workingDirectory, options.diskCache, options, creds, digestUtil, retrier);
}
if (isHttpCache(options)) {
- return createHttp(options, creds, digestUtil);
+ return createHttp(options, creds, digestUtil, retrier);
}
if (isDiskCache(options)) {
return createDiskCache(
@@ -80,7 +81,7 @@ public final class RemoteCacheClientFactory {
}
private static RemoteCacheClient createHttp(
- RemoteOptions options, Credentials creds, DigestUtil digestUtil) {
+ RemoteOptions options, Credentials creds, DigestUtil digestUtil, RemoteRetrier retrier) {
Preconditions.checkNotNull(options.remoteCache, "remoteCache");
try {
@@ -99,6 +100,7 @@ public final class RemoteCacheClientFactory {
options.remoteVerifyDownloads,
ImmutableList.copyOf(options.remoteHeaders),
digestUtil,
+ retrier,
creds);
} else {
throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy);
@@ -111,6 +113,7 @@ public final class RemoteCacheClientFactory {
options.remoteVerifyDownloads,
ImmutableList.copyOf(options.remoteHeaders),
digestUtil,
+ retrier,
creds);
}
} catch (Exception e) {
@@ -137,7 +140,8 @@ public final class RemoteCacheClientFactory {
PathFragment diskCachePath,
RemoteOptions options,
Credentials cred,
- DigestUtil digestUtil)
+ DigestUtil digestUtil,
+ RemoteRetrier retrier)
throws IOException {
Path cacheDir =
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
@@ -145,7 +149,7 @@ public final class RemoteCacheClientFactory {
cacheDir.createDirectoryAndParents();
}
- RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
+ RemoteCacheClient httpCache = createHttp(options, cred, digestUtil, retrier);
return createDiskAndRemoteClient(
workingDirectory,
diskCachePath,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 350e1afa51..db81481b60 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -59,9 +59,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
import com.google.devtools.build.lib.packages.TargetUtils;
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
+import com.google.devtools.build.lib.remote.http.HttpException;
import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -95,6 +97,7 @@ import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -213,7 +216,33 @@ public final class RemoteModule extends BlazeModule {
remoteOptions,
creds,
Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
- digestUtil);
+ digestUtil,
+ new RemoteRetrier(
+ remoteOptions,
+ (e) -> {
+ boolean retry = false;
+ if (e instanceof ClosedChannelException) {
+ retry = true;
+ } else if (e instanceof HttpException) {
+ retry = true;
+ } else if (e instanceof IOException) {
+ String msg = e.getMessage().toLowerCase();
+ if (msg.contains("connection reset by peer")) {
+ retry = true;
+ } else if (msg.contains("operation timed out")) {
+ retry = true;
+ }
+ }
+ if (retry) {
+ System.err.println("RETRYING: " + e.toString());
+ } else if (!(e instanceof CacheNotFoundException)) {
+ System.err.println("NOT RETRYING: " + e.toString());
+ }
+ return retry;
+ },
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS)
+ );
} catch (IOException e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
index 9ce71c7c52..5c7f2d0728 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
@@ -20,6 +20,7 @@ java_library(
deps = [
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
"//src/main/java/com/google/devtools/build/lib/remote/common",
+ "//src/main/java/com/google/devtools/build/lib/remote:Retrier",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//third_party:auth",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
index a2e4abf9d8..93843a91dc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
@@ -25,12 +25,18 @@ final class DownloadCommand {
private final boolean casDownload;
private final Digest digest;
private final OutputStream out;
+ private final long offset;
- DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
+ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) {
this.uri = Preconditions.checkNotNull(uri);
this.casDownload = casDownload;
this.digest = Preconditions.checkNotNull(digest);
this.out = Preconditions.checkNotNull(out);
+ this.offset = offset;
+ }
+
+ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
+ this(uri, casDownload, digest, out, 0);
}
public URI uri() {
@@ -48,4 +54,6 @@ final class DownloadCommand {
public OutputStream out() {
return out;
}
+
+ public long offset() { return offset; }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 1efecd3bb1..3f360dda14 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.remote.RemoteRetrier;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
@@ -84,6 +85,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -132,6 +134,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
private final boolean useTls;
private final boolean verifyDownloads;
private final DigestUtil digestUtil;
+ private final RemoteRetrier retrier;
private final Object closeLock = new Object();
@@ -153,6 +156,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
DigestUtil digestUtil,
+ RemoteRetrier retrier,
@Nullable final Credentials creds)
throws Exception {
return new HttpCacheClient(
@@ -164,6 +168,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
verifyDownloads,
extraHttpHeaders,
digestUtil,
+ retrier,
creds,
null);
}
@@ -176,6 +181,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
DigestUtil digestUtil,
+ RemoteRetrier retrier,
@Nullable final Credentials creds)
throws Exception {
@@ -189,6 +195,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
verifyDownloads,
extraHttpHeaders,
digestUtil,
+ retrier,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
@@ -201,6 +208,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
verifyDownloads,
extraHttpHeaders,
digestUtil,
+ retrier,
creds,
domainSocketAddress);
} else {
@@ -217,6 +225,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
DigestUtil digestUtil,
+ RemoteRetrier retrier,
@Nullable final Credentials creds,
@Nullable SocketAddress socketAddress)
throws Exception {
@@ -285,6 +294,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
this.extraHttpHeaders = extraHttpHeaders;
this.verifyDownloads = verifyDownloads;
this.digestUtil = digestUtil;
+ this.retrier = retrier;
}
@SuppressWarnings("FutureReturnValueIgnored")
@@ -461,6 +471,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
@SuppressWarnings("FutureReturnValueIgnored")
private ListenableFuture<Void> get(Digest digest, final OutputStream out, boolean casDownload) {
final AtomicBoolean dataWritten = new AtomicBoolean();
+ AtomicLong bytesDownloaded = new AtomicLong();
OutputStream wrappedOut =
new OutputStream() {
// OutputStream.close() does nothing, which is what we want to ensure that the
@@ -470,12 +481,14 @@ public final class HttpCacheClient implements RemoteCacheClient {
@Override
public void write(byte[] b, int offset, int length) throws IOException {
dataWritten.set(true);
+ bytesDownloaded.addAndGet(length);
out.write(b, offset, length);
}
@Override
public void write(int b) throws IOException {
dataWritten.set(true);
+ bytesDownloaded.incrementAndGet();
out.write(b);
}
@@ -484,57 +497,59 @@ public final class HttpCacheClient implements RemoteCacheClient {
out.flush();
}
};
- DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut);
- SettableFuture<Void> outerF = SettableFuture.create();
- acquireDownloadChannel()
- .addListener(
- (Future<Channel> channelPromise) -> {
- if (!channelPromise.isSuccess()) {
- outerF.setException(channelPromise.cause());
- return;
- }
+ return retrier.executeAsync(() -> {
+ DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get());
+ SettableFuture<Void> outerF = SettableFuture.create();
+ acquireDownloadChannel()
+ .addListener(
+ (Future<Channel> channelPromise) -> {
+ if (!channelPromise.isSuccess()) {
+ outerF.setException(channelPromise.cause());
+ return;
+ }
- Channel ch = channelPromise.getNow();
- ch.writeAndFlush(downloadCmd)
- .addListener(
- (f) -> {
- try {
- if (f.isSuccess()) {
- outerF.set(null);
- } else {
- Throwable cause = f.cause();
- // cause can be of type HttpException, because Netty uses
- // Unsafe.throwException to
- // re-throw a checked exception that hasn't been declared in the method
- // signature.
- if (cause instanceof HttpException) {
- HttpResponse response = ((HttpException) cause).response();
- if (!dataWritten.get() && authTokenExpired(response)) {
- // The error is due to an auth token having expired. Let's try
- // again.
- try {
- refreshCredentials();
- getAfterCredentialRefresh(downloadCmd, outerF);
+ Channel ch = channelPromise.getNow();
+ ch.writeAndFlush(downloadCmd)
+ .addListener(
+ (f) -> {
+ try {
+ if (f.isSuccess()) {
+ outerF.set(null);
+ } else {
+ Throwable cause = f.cause();
+ // cause can be of type HttpException, because Netty uses
+ // Unsafe.throwException to
+ // re-throw a checked exception that hasn't been declared in the method
+ // signature.
+ if (cause instanceof HttpException) {
+ HttpResponse response = ((HttpException) cause).response();
+ if (!dataWritten.get() && authTokenExpired(response)) {
+ // The error is due to an auth token having expired. Let's try
+ // again.
+ try {
+ refreshCredentials();
+ getAfterCredentialRefresh(downloadCmd, outerF);
+ return;
+ } catch (IOException e) {
+ cause.addSuppressed(e);
+ } catch (RuntimeException e) {
+ logger.atWarning().withCause(e).log("Unexpected exception");
+ cause.addSuppressed(e);
+ }
+ } else if (cacheMiss(response.status())) {
+ outerF.setException(new CacheNotFoundException(digest));
return;
- } catch (IOException e) {
- cause.addSuppressed(e);
- } catch (RuntimeException e) {
- logger.atWarning().withCause(e).log("Unexpected exception");
- cause.addSuppressed(e);
}
- } else if (cacheMiss(response.status())) {
- outerF.setException(new CacheNotFoundException(digest));
- return;
}
+ outerF.setException(cause);
}
- outerF.setException(cause);
+ } finally {
+ releaseDownloadChannel(ch);
}
- } finally {
- releaseDownloadChannel(ch);
- }
- });
- });
- return outerF;
+ });
+ });
+ return outerF;
+ });
}
@SuppressWarnings("FutureReturnValueIgnored")
@@ -673,20 +688,21 @@ public final class HttpCacheClient implements RemoteCacheClient {
@Override
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
- try {
- return uploadAsync(
- digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
- } catch (IOException e) {
- // Can be thrown from file.getInputStream.
- return Futures.immediateFailedFuture(e);
- }
+ return retrier.executeAsync(() -> {
+ try {
+ return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
+ } catch (IOException e) {
+ // Can be thrown from file.getInputStream.
+ return Futures.immediateFailedFuture(e);
+ }
+ });
}
@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
- return uploadAsync(
- digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
+ return retrier.executeAsync(() -> uploadAsync(
+ digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
index 50d83d138a..f38dad965f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
@@ -20,24 +20,18 @@ import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.HttpContent;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpObject;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.internal.StringUtil;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/** ChannelHandler for downloads. */
final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
@@ -51,6 +45,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
private long contentLength = -1;
/** the path header in the http request */
private String path;
+ /** the offset at which to download */
+ private long offset;
+ /** the bytes to skip in a full or chunked response */
+ private OptionalInt skipBytes;
public HttpDownloadHandler(
Credentials credentials, ImmutableList<Entry<String, String>> extraHttpHeaders) {
@@ -93,7 +91,25 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
if (contentLengthSet) {
contentLength = HttpUtil.getContentLength(response);
}
- downloadSucceeded = response.status().equals(HttpResponseStatus.OK);
+ boolean full_content = response.status().equals(HttpResponseStatus.OK);
+ boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT);
+ if (full_content) {
+ if (offset != 0) {
+ // We requested a range but the server replied with a full response.
+ // We need to skip `offset` bytes of the response.
+ if (!skipBytes.isPresent()) {
+ // This is the first chunk, or the full response.
+ skipBytes = OptionalInt.of((int)offset);
+ }
+ }
+ } else if (partial_content) {
+ Optional<HttpException> error = validateContentRangeHeader(response.headers());
+ if (error.isPresent()) {
+ failAndClose(error.get(), ctx);
+ return;
+ }
+ }
+ downloadSucceeded = full_content || partial_content;
if (!downloadSucceeded) {
out = new ByteArrayOutputStream();
}
@@ -105,6 +121,15 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
ByteBuf content = ((HttpContent) msg).content();
int readableBytes = content.readableBytes();
+ if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) {
+ int skipNow = skipBytes.getAsInt();
+ if (skipNow >= readableBytes) {
+ skipNow = readableBytes;
+ }
+ content.readerIndex(content.readerIndex() + skipNow);
+ skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow);
+ readableBytes = readableBytes - skipNow;
+ }
content.readBytes(out, readableBytes);
bytesReceived += readableBytes;
if (msg instanceof LastHttpContent) {
@@ -137,7 +162,9 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
DownloadCommand cmd = (DownloadCommand) msg;
out = cmd.out();
path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload());
- HttpRequest request = buildRequest(path, constructHost(cmd.uri()));
+ offset = cmd.offset();
+ skipBytes = OptionalInt.empty();
+ HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset());
addCredentialHeaders(request, cmd.uri());
addExtraRemoteHeaders(request);
addUserAgentHeader(request);
@@ -159,16 +186,36 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
}
}
- private HttpRequest buildRequest(String path, String host) {
+ private HttpRequest buildRequest(String path, String host, long offset) {
HttpRequest httpRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
httpRequest.headers().set(HttpHeaderNames.HOST, host);
httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*");
httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
+ if (offset != 0) {
+ httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset));
+ }
return httpRequest;
}
+ private Optional<HttpException> validateContentRangeHeader(HttpHeaders headers) {
+ if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) {
+ return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null));
+ }
+ Pattern pattern = Pattern.compile("bytes\\s+(?<start>[0-9]+)-(?<end>[0-9]+)/(?<size>[0-9]*|\\*)");
+ Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE));
+ if (!matcher.matches()) {
+ return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null));
+ }
+ long start = Long.valueOf(matcher.group("start"));
+ if (start != offset) {
+ return Optional.of(new HttpException(
+ response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null));
+ }
+ return Optional.empty();
+ }
+
private void succeedAndReset(ChannelHandlerContext ctx) {
// All resets must happen *before* completing the user promise. Otherwise there is a race
// condition, where this handler can be reused even though it is closed. In addition, if reset
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
index 89fde56046..6a2bfd5a50 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
@@ -18,7 +18,7 @@ import io.netty.handler.codec.http.HttpResponse;
import java.io.IOException;
/** An exception that propagates the http status. */
-final class HttpException extends IOException {
+public final class HttpException extends IOException {
private final HttpResponse response;
HttpException(HttpResponse response, String message, Throwable cause) {