diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
index 57741a8f28..6673149a20 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -56,15 +56,16 @@ public final class RemoteCacheClientFactory {
       RemoteOptions options,
       @Nullable Credentials creds,
       Path workingDirectory,
-      DigestUtil digestUtil)
+      DigestUtil digestUtil,
+      RemoteRetrier retrier)
       throws IOException {
     Preconditions.checkNotNull(workingDirectory, "workingDirectory");
     if (isHttpCache(options) && isDiskCache(options)) {
       return createDiskAndHttpCache(
-          workingDirectory, options.diskCache, options, creds, digestUtil);
+          workingDirectory, options.diskCache, options, creds, digestUtil, retrier);
     }
     if (isHttpCache(options)) {
-      return createHttp(options, creds, digestUtil);
+      return createHttp(options, creds, digestUtil, retrier);
     }
     if (isDiskCache(options)) {
       return createDiskCache(
@@ -80,7 +81,7 @@ public final class RemoteCacheClientFactory {
   }
 
   private static RemoteCacheClient createHttp(
-      RemoteOptions options, Credentials creds, DigestUtil digestUtil) {
+      RemoteOptions options, Credentials creds, DigestUtil digestUtil, RemoteRetrier retrier) {
     Preconditions.checkNotNull(options.remoteCache, "remoteCache");
 
     try {
@@ -99,6 +100,7 @@ public final class RemoteCacheClientFactory {
             options.remoteVerifyDownloads,
             ImmutableList.copyOf(options.remoteHeaders),
             digestUtil,
+            retrier,
             creds);
       } else {
         throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy);
@@ -111,6 +113,7 @@ public final class RemoteCacheClientFactory {
             options.remoteVerifyDownloads,
             ImmutableList.copyOf(options.remoteHeaders),
             digestUtil,
+            retrier,
             creds);
       }
     } catch (Exception e) {
@@ -137,7 +140,8 @@ public final class RemoteCacheClientFactory {
       PathFragment diskCachePath,
       RemoteOptions options,
       Credentials cred,
-      DigestUtil digestUtil)
+      DigestUtil digestUtil,
+      RemoteRetrier retrier)
       throws IOException {
     Path cacheDir =
         workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
@@ -145,7 +149,7 @@ public final class RemoteCacheClientFactory {
       cacheDir.createDirectoryAndParents();
     }
 
-    RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
+    RemoteCacheClient httpCache = createHttp(options, cred, digestUtil, retrier);
     return createDiskAndRemoteClient(
         workingDirectory,
         diskCachePath,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 0e165d1521..ff33a84afb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -64,9 +64,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
 import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
 import com.google.devtools.build.lib.packages.TargetUtils;
 import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
 import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
+import com.google.devtools.build.lib.remote.http.HttpException;
 import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -103,6 +105,7 @@ import io.reactivex.rxjava3.plugins.RxJavaPlugins;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import
java.nio.channels.ClosedChannelException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -232,7 +235,30 @@ public final class RemoteModule extends BlazeModule {
                 remoteOptions,
                 creds,
                 Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
-                digestUtil);
+                digestUtil,
+                new RemoteRetrier(
+                    remoteOptions,
+                    (e) -> {
+                      // A cache miss is a definitive answer, not a transient failure.
+                      if (e instanceof CacheNotFoundException) {
+                        return false;
+                      }
+                      // NOTE(review): this retries every HttpException, including 4xx
+                      // responses that cannot succeed on retry -- confirm this is intended.
+                      if (e instanceof ClosedChannelException || e instanceof HttpException) {
+                        return true;
+                      }
+                      if (!(e instanceof IOException)) {
+                        return false;
+                      }
+                      // getMessage() is null for exceptions created without a detail message.
+                      String msg = e.getMessage();
+                      return msg != null
+                          && msg.toLowerCase(java.util.Locale.ROOT)
+                              .contains("connection reset by peer");
+                    },
+                    retryScheduler,
+                    Retrier.ALLOW_ALL_CALLS));
       } catch (IOException e) {
         handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
         return;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
index 9ce71c7c52..5c7f2d0728 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD
@@ -20,6 +20,7 @@ java_library(
     deps = [
         "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
         "//src/main/java/com/google/devtools/build/lib/remote/common",
+        "//src/main/java/com/google/devtools/build/lib/remote:Retrier",
         "//src/main/java/com/google/devtools/build/lib/remote/util",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//third_party:auth",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
index a2e4abf9d8..93843a91dc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
+++
b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
@@ -25,12 +25,25 @@ final class DownloadCommand {
   private final boolean casDownload;
   private final Digest digest;
   private final OutputStream out;
+  private final long offset;
 
-  DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
+  /**
+   * Creates a download command that starts at {@code offset}.
+   *
+   * @param offset byte offset at which the download should start; must not be negative
+   */
+  DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) {
+    Preconditions.checkArgument(offset >= 0, "offset must not be negative: %s", offset);
     this.uri = Preconditions.checkNotNull(uri);
     this.casDownload = casDownload;
     this.digest = Preconditions.checkNotNull(digest);
     this.out = Preconditions.checkNotNull(out);
+    this.offset = offset;
+  }
+
+  /** Creates a download command that starts at offset 0. */
+  DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
+    this(uri, casDownload, digest, out, 0);
   }
 
   public URI uri() {
@@ -48,4 +61,9 @@ final class DownloadCommand {
   public OutputStream out() {
     return out;
   }
+
+  /** Returns the byte offset at which the download should start. */
+  public long offset() {
+    return offset;
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 3cbf6cf47f..4453d5fcc6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.remote.RemoteRetrier;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
@@ -82,6 +83,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import
java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -130,6 +132,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
   private final boolean useTls;
   private final boolean verifyDownloads;
   private final DigestUtil digestUtil;
+  private final RemoteRetrier retrier;
 
   private final Object closeLock = new Object();
 
@@ -151,6 +154,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
       boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
       DigestUtil digestUtil,
+      RemoteRetrier retrier,
       @Nullable final Credentials creds)
       throws Exception {
     return new HttpCacheClient(
@@ -162,6 +166,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
         verifyDownloads,
         extraHttpHeaders,
         digestUtil,
+        retrier,
         creds,
         null);
   }
@@ -174,6 +179,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
       boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
       DigestUtil digestUtil,
+      RemoteRetrier retrier,
       @Nullable final Credentials creds)
       throws Exception {
 
@@ -187,6 +193,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
           verifyDownloads,
           extraHttpHeaders,
           digestUtil,
+          retrier,
           creds,
           domainSocketAddress);
     } else if (Epoll.isAvailable()) {
@@ -199,6 +206,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
           verifyDownloads,
           extraHttpHeaders,
           digestUtil,
+          retrier,
           creds,
           domainSocketAddress);
     } else {
@@ -215,6 +223,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
       boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
       DigestUtil digestUtil,
+      RemoteRetrier retrier,
       @Nullable final Credentials creds,
       @Nullable SocketAddress socketAddress)
       throws Exception {
@@ -283,6 +292,7 @@ public final class HttpCacheClient implements RemoteCacheClient {
     this.extraHttpHeaders = extraHttpHeaders;
     this.verifyDownloads = verifyDownloads;
     this.digestUtil = digestUtil;
+    this.retrier = retrier;
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
@@ -459,6 +469,8 @@ public final class HttpCacheClient implements RemoteCacheClient {
   @SuppressWarnings("FutureReturnValueIgnored")
   private ListenableFuture<Void> get(Digest digest, final OutputStream out, boolean casDownload) {
     final AtomicBoolean dataWritten = new AtomicBoolean();
+    // Counts the bytes handed to `out` so far, so that a retry can resume at that offset.
+    AtomicLong bytesDownloaded = new AtomicLong();
     OutputStream wrappedOut =
         new OutputStream() {
           // OutputStream.close() does nothing, which is what we want to ensure that the
@@ -468,12 +480,14 @@ public final class HttpCacheClient implements RemoteCacheClient {
           @Override
           public void write(byte[] b, int offset, int length) throws IOException {
             dataWritten.set(true);
+            bytesDownloaded.addAndGet(length);
             out.write(b, offset, length);
           }
 
           @Override
           public void write(int b) throws IOException {
             dataWritten.set(true);
+            bytesDownloaded.incrementAndGet();
             out.write(b);
           }
 
@@ -482,57 +496,60 @@ public final class HttpCacheClient implements RemoteCacheClient {
             out.flush();
           }
         };
-    DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut);
-    SettableFuture<Void> outerF = SettableFuture.create();
-    acquireDownloadChannel()
-        .addListener(
-            (Future<Channel> channelPromise) -> {
-              if (!channelPromise.isSuccess()) {
-                outerF.setException(channelPromise.cause());
-                return;
-              }
+    return retrier.executeAsync(() -> {
+      DownloadCommand downloadCmd =
+          new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get());
+      SettableFuture<Void> outerF = SettableFuture.create();
+      acquireDownloadChannel()
+          .addListener(
+              (Future<Channel> channelPromise) -> {
+                if (!channelPromise.isSuccess()) {
+                  outerF.setException(channelPromise.cause());
+                  return;
+                }
 
-              Channel ch = channelPromise.getNow();
-              ch.writeAndFlush(downloadCmd)
-                  .addListener(
-                      (f) -> {
-                        try {
-                          if (f.isSuccess()) {
-                            outerF.set(null);
-                          } else {
-                            Throwable cause = f.cause();
-                            // cause can be of type HttpException, because Netty uses
-                            // Unsafe.throwException to
-                            // re-throw a checked exception that hasn't been declared in the method
-                            // signature.
-                            if (cause instanceof HttpException) {
-                              HttpResponse response = ((HttpException) cause).response();
-                              if (!dataWritten.get() && authTokenExpired(response)) {
-                                // The error is due to an auth token having expired. Let's try
-                                // again.
-                                try {
-                                  refreshCredentials();
-                                  getAfterCredentialRefresh(downloadCmd, outerF);
+                Channel ch = channelPromise.getNow();
+                ch.writeAndFlush(downloadCmd)
+                    .addListener(
+                        (f) -> {
+                          try {
+                            if (f.isSuccess()) {
+                              outerF.set(null);
+                            } else {
+                              Throwable cause = f.cause();
+                              // cause can be of type HttpException, because Netty uses
+                              // Unsafe.throwException to
+                              // re-throw a checked exception that hasn't been declared in the method
+                              // signature.
+                              if (cause instanceof HttpException) {
+                                HttpResponse response = ((HttpException) cause).response();
+                                if (!dataWritten.get() && authTokenExpired(response)) {
+                                  // The error is due to an auth token having expired. Let's try
+                                  // again.
+                                  try {
+                                    refreshCredentials();
+                                    getAfterCredentialRefresh(downloadCmd, outerF);
+                                    return;
+                                  } catch (IOException e) {
+                                    cause.addSuppressed(e);
+                                  } catch (RuntimeException e) {
+                                    logger.atWarning().withCause(e).log("Unexpected exception");
+                                    cause.addSuppressed(e);
+                                  }
+                                } else if (cacheMiss(response.status())) {
+                                  outerF.setException(new CacheNotFoundException(digest));
                                   return;
-                                } catch (IOException e) {
-                                  cause.addSuppressed(e);
-                                } catch (RuntimeException e) {
-                                  logger.atWarning().withCause(e).log("Unexpected exception");
-                                  cause.addSuppressed(e);
                                 }
-                              } else if (cacheMiss(response.status())) {
-                                outerF.setException(new CacheNotFoundException(digest));
-                                return;
                               }
+                              outerF.setException(cause);
                             }
-                            outerF.setException(cause);
+                          } finally {
+                            releaseDownloadChannel(ch);
                           }
-                        } finally {
-                          releaseDownloadChannel(ch);
-                        }
-                      });
-            });
-    return outerF;
+                        });
+              });
+      return outerF;
+    });
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
@@ -674,20 +691,22 @@ public final class HttpCacheClient implements RemoteCacheClient {
   @Override
   public ListenableFuture<Void> uploadFile(
       RemoteActionExecutionContext context, Digest digest, Path file) {
-    try {
-      return uploadAsync(
-          digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
-    } catch (IOException e) {
-      // Can be thrown from file.getInputStream.
-      return Futures.immediateFailedFuture(e);
-    }
+    return retrier.executeAsync(() -> {
+      try {
+        return uploadAsync(
+            digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
+      } catch (IOException e) {
+        // Can be thrown from file.getInputStream.
+        return Futures.immediateFailedFuture(e);
+      }
+    });
   }
 
   @Override
   public ListenableFuture<Void> uploadBlob(
       RemoteActionExecutionContext context, Digest digest, ByteString data) {
-    return uploadAsync(
-        digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
+    return retrier.executeAsync(() -> uploadAsync(
+        digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true));
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
index 50d83d138a..f38dad965f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
@@ -20,24 +20,28 @@ import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.netty.util.internal.StringUtil;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /** ChannelHandler for downloads. */
 final class HttpDownloadHandler extends AbstractHttpHandler {
@@ -51,6 +55,14 @@ final class HttpDownloadHandler extends AbstractHttpHandler {
   private long contentLength = -1;
   /** the path header in the http request */
   private String path;
+  /** the offset at which to download */
+  private long offset;
+  /** the number of leading response bytes still to be dropped; negative until it is known */
+  private long skipBytes;
+
+  /** Matches a {@code Content-Range} value such as {@code bytes 42-99/100}. */
+  private static final Pattern CONTENT_RANGE_PATTERN =
+      Pattern.compile("bytes\\s+(?<start>[0-9]+)-(?<end>[0-9]+)/(?<size>[0-9]*|\\*)");
 
   public HttpDownloadHandler(
       Credentials credentials, ImmutableList<Entry<String, String>> extraHttpHeaders) {
@@ -93,7 +105,23 @@ final class HttpDownloadHandler extends AbstractHttpHandler {
       if (contentLengthSet) {
         contentLength = HttpUtil.getContentLength(response);
       }
-      downloadSucceeded = response.status().equals(HttpResponseStatus.OK);
+      boolean fullContent = response.status().equals(HttpResponseStatus.OK);
+      boolean partialContent = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT);
+      if (fullContent) {
+        if (offset != 0 && skipBytes < 0) {
+          // We requested a range but the server replied with a full response, so the first
+          // `offset` bytes of the response have to be dropped.
+          // skipBytes is a long: casting the offset to int would truncate it beyond 2 GiB.
+          skipBytes = offset;
+        }
+      } else if (partialContent) {
+        Optional<HttpException> error = validateContentRangeHeader(response.headers());
+        if (error.isPresent()) {
+          failAndClose(error.get(), ctx);
+          return;
+        }
+      }
+      downloadSucceeded = fullContent || partialContent;
       if (!downloadSucceeded) {
         out = new ByteArrayOutputStream();
       }
@@ -105,6 +133,13 @@ final class HttpDownloadHandler extends AbstractHttpHandler {
 
       ByteBuf content = ((HttpContent) msg).content();
       int readableBytes = content.readableBytes();
+      if (skipBytes > 0) {
+        // Drop as much of the unwanted prefix as this chunk contains.
+        int skipNow = (int) Math.min(skipBytes, readableBytes);
+        content.skipBytes(skipNow);
+        skipBytes -= skipNow;
+        readableBytes -= skipNow;
+      }
       content.readBytes(out, readableBytes);
       bytesReceived += readableBytes;
       if (msg instanceof LastHttpContent) {
@@ -137,7 +172,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler {
     DownloadCommand cmd = (DownloadCommand) msg;
     out = cmd.out();
     path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload());
-    HttpRequest request = buildRequest(path, constructHost(cmd.uri()));
+    offset = cmd.offset();
+    // Negative means the amount to skip has not been determined for this request yet.
+    skipBytes = -1;
+    HttpRequest request = buildRequest(path, constructHost(cmd.uri()), offset);
     addCredentialHeaders(request, cmd.uri());
     addExtraRemoteHeaders(request);
     addUserAgentHeader(request);
@@ -159,16 +197,54 @@ final class HttpDownloadHandler extends AbstractHttpHandler {
     }
   }
 
-  private HttpRequest buildRequest(String path, String host) {
+  private HttpRequest buildRequest(String path, String host, long offset) {
     HttpRequest httpRequest =
         new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
     httpRequest.headers().set(HttpHeaderNames.HOST, host);
     httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
     httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*");
     httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
+    if (offset != 0) {
+      // Ask only for the bytes from `offset` to the end of the blob.
+      httpRequest
+          .headers()
+          .set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset));
+    }
     return httpRequest;
   }
 
+  /**
+   * Checks that {@code headers} carry a {@code Content-Range} that starts at {@code offset}.
+   *
+   * @return the error to fail the download with, or empty if the header is acceptable
+   */
+  private Optional<HttpException> validateContentRangeHeader(HttpHeaders headers) {
+    String contentRange = headers.get(HttpHeaderNames.CONTENT_RANGE);
+    if (contentRange == null) {
+      return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null));
+    }
+    Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
+    if (!matcher.matches()) {
+      return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null));
+    }
+    long start;
+    try {
+      start = Long.parseLong(matcher.group("start"));
+    } catch (NumberFormatException e) {
+      // All digits, but too many of them to fit into a long.
+      return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", e));
+    }
+    if (start != offset) {
+      return Optional.of(
+          new HttpException(
+              response,
+              String.format(
+                  "Unexpected 'Content-Range' start: Expected %d but got %d", offset, start),
+              null));
+    }
+    return Optional.empty();
+  }
+
   private void succeedAndReset(ChannelHandlerContext ctx) {
     // All resets must happen *before* completing the user promise. Otherwise there is a race
     // condition, where this handler can be reused even though it is closed. In addition, if reset
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
index 89fde56046..6a2bfd5a50 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java
@@ -18,7 +18,7 @@ import io.netty.handler.codec.http.HttpResponse;
 import java.io.IOException;
 
 /** An exception that propagates the http status.
*/
-final class HttpException extends IOException {
+public final class HttpException extends IOException {
   private final HttpResponse response;
 
   HttpException(HttpResponse response, String message, Throwable cause) {