sq/libsq/files/internal/downloader/downloader.go
Neil O'Toole 181e128a2d
Release wrap up v0.47.0 (#377)
* Misc cleanup pre-release
2024-01-28 14:55:51 -07:00

594 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package downloader provides a mechanism for getting files from
// HTTP/S URLs, making use of a mostly RFC-compliant cache.
//
// The entrypoint is downloader.New.
//
// Acknowledgement: This package is a heavily customized fork
// of https://github.com/gregjones/httpcache, via bitcomplete/download.
package downloader
import (
"bufio"
"context"
"errors"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
"time"
"github.com/neilotoole/streamcache"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/ioz/checksum"
"github.com/neilotoole/sq/libsq/core/ioz/httpz"
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/lg/lga"
"github.com/neilotoole/sq/libsq/core/lg/lgm"
"github.com/neilotoole/sq/libsq/core/options"
"github.com/neilotoole/sq/libsq/core/progress"
)
var OptContinueOnError = options.NewBool(
"download.refresh.continue-on-error",
"",
false,
0,
true,
"Continue with stale download if refresh fails",
`Continue with stale download if refresh fails. This option applies if a download
is in the cache, but is considered stale, and a refresh attempt fails. If set to
true, the refresh error is logged, and the stale download is returned. This is a
sort of "Airplane Mode" for downloads: when true, sq continues with the cached
download when the network is unavailable. If false, an error is returned instead.`,
options.TagSource,
)
var OptCache = options.NewBool(
"download.cache",
"",
false,
0,
true,
"Cache downloads",
`Cache downloaded remote files. When false, the download cache is not used and
the file is re-downloaded on each command.`,
options.TagSource,
)
// State is an enumeration of caching states based on the cache-control
// values of the request and the response.
//
// - Uncached indicates the item is not cached.
// - Fresh indicates that the cached item can be returned.
// - Stale indicates that the cached item needs validating before it is returned.
// - Transparent indicates the cached item should not be used to fulfil the request.
//
// Because this is only a private cache, 'public' and 'private' in cache-control aren't
// significant. Similarly, smax-age isn't used.
type State int
const (
// Uncached indicates that the item is not cached.
Uncached State = iota
// Stale indicates that the cached item needs validating before it is returned.
Stale
// Fresh indicates the cached item can be returned.
Fresh
// Transparent indicates the cached item should not be used to fulfil the request.
Transparent
)
// String returns a string representation of State.
func (s State) String() string {
switch s {
case Uncached:
return "uncached"
case Stale:
return "stale"
case Fresh:
return "fresh"
case Transparent:
return "transparent"
default:
return "unknown"
}
}
// XFromCache is the header added to responses that are returned from the cache.
const XFromCache = "X-From-Stream"
// Downloader encapsulates downloading a file from a URL, using a local
// disk cache if possible. Downloader.Get makes uses of the Handler callback
// mechanism to facilitate early consumption of a download stream while the
// download is still in flight.
type Downloader struct {
// c is the HTTP client used to make requests.
c *http.Client
// cache implements the on-disk cache. If nil, caching is disabled.
// It will be created in dlDir.
cache *cache
// dlStream is the streamcache.Stream that is passed Handler.Uncached for an
// active download. This field is reset to nil on each call to Downloader.Get.
dlStream *streamcache.Stream
// name is a user-friendly name, such as a source handle like @data.
name string
// url is the URL of the download. It is parsed in downloader.New,
// thus is guaranteed to be valid.
url string
// dlDir is the directory in which the download cache is stored.
dlDir string
// mu guards concurrent access to the Downloader.
mu sync.Mutex
// continueOnError, if true, indicates that the downloader
// should server the cached file if a refresh attempt fails.
continueOnError bool
// markCachedResponses, if true, indicates that responses returned from the
// cache will be given an extra header, X-From-cache.
markCachedResponses bool
}
// New returns a new Downloader for url that caches downloads in dlDir.
// Arg name is a user-friendly label, such as a source handle like @data.
// The name may show up in logs, or progress indicators etc.
func New(name string, c *http.Client, dlURL, dlDir string) (*Downloader, error) {
_, err := url.ParseRequestURI(dlURL)
if err != nil {
return nil, errz.Wrap(err, "invalid download URL")
}
if dlDir, err = filepath.Abs(dlDir); err != nil {
return nil, errz.Err(err)
}
dl := &Downloader{
name: name,
c: c,
url: dlURL,
markCachedResponses: true,
continueOnError: true,
dlDir: dlDir,
}
return dl, nil
}
// Get attempts to get the remote file, invoking Handler as appropriate. Exactly
// one of the Handler methods will be invoked, one time.
//
// - If Handler.Uncached is invoked, a download stream has begun. Get will
// then block until the download is completed. The download resp.Body is
// written to cache, and on success, the filepath to the newly updated
// cache file is returned.
// If an error occurs during cache write, the error is logged, and Get
// returns the filepath of the previously cached download, if permitted
// by policy. If not permitted or not existing, empty string is returned.
// - If Handler.Cached is invoked, Get returns immediately afterwards with
// the filepath of the cached download (the same value provided to
// Handler.Cached).
// - If Handler.Error is invoked, there was an unrecoverable problem (e.g. a
// transport error, and there's no previous cache) and the download is
// unavailable. That error should be propagated up the stack. Get will
// return empty string.
//
// Get consults the context for options. In particular, it makes
// use of OptCache and OptContinueOnError.
func (dl *Downloader) Get(ctx context.Context, h Handler) (cacheFile string) {
dl.mu.Lock()
defer dl.mu.Unlock()
o := options.FromContext(ctx)
dl.continueOnError = OptContinueOnError.Get(o)
if OptCache.Get(o) {
dl.cache = &cache{dir: dl.dlDir}
}
req := dl.mustRequest(ctx)
lg.FromContext(ctx).Debug("Get download", lga.URL, dl.url)
cacheFile = dl.get(req, h)
return cacheFile
}
// get contains the main logic for getting the download.
// It invokes Handler as appropriate, and on success returns the
// filepath of the valid cached download file.
func (dl *Downloader) get(req *http.Request, h Handler) (cacheFile string) { //nolint:gocognit,funlen,cyclop
ctx := req.Context()
log := lg.FromContext(ctx)
dl.dlStream = nil
var fpBody string
if dl.cache != nil {
_, fpBody, _ = dl.cache.paths(req)
}
state := dl.state(req)
if state == Fresh && fpBody != "" {
// The cached response is fresh, so we can return it.
h.Cached(fpBody)
return fpBody
}
cacheable := dl.isCacheable(req)
var err error
var cachedResp *http.Response
if cacheable {
cachedResp, err = dl.cache.get(req.Context(), req) //nolint:bodyclose
}
var resp *http.Response
if cacheable && cachedResp != nil && err == nil { //nolint:nestif
if dl.markCachedResponses {
cachedResp.Header.Set(XFromCache, "1")
}
if varyMatches(cachedResp, req) {
// Can only use cached value if the new request doesn't Vary significantly
freshness := getFreshness(cachedResp.Header, req.Header)
if freshness == Fresh && fpBody != "" {
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, cachedResp.Body)
h.Cached(fpBody)
return fpBody
}
if freshness == Stale {
var req2 *http.Request
// Add validators if caller hasn't already done so
etag := cachedResp.Header.Get("etag")
if etag != "" && req.Header.Get("etag") == "" {
req2 = cloneRequest(req)
req2.Header.Set("if-none-match", etag)
}
lastModified := cachedResp.Header.Get("last-modified")
if lastModified != "" && req.Header.Get("last-modified") == "" {
if req2 == nil {
req2 = cloneRequest(req)
}
req2.Header.Set("if-modified-since", lastModified)
}
if req2 != nil {
req = req2
}
}
}
resp, err = dl.do(req) //nolint:bodyclose
switch {
case err == nil && req.Method == http.MethodGet && resp.StatusCode == http.StatusNotModified:
// Replace the 304 response with the one from cache, but update with some new headers
endToEndHeaders := getEndToEndHeaders(resp.Header)
for _, header := range endToEndHeaders {
cachedResp.Header[header] = resp.Header[header]
}
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
resp = cachedResp
case fpBody != "" && (err != nil || resp.StatusCode >= 500) &&
req.Method == http.MethodGet && canStaleOnError(cachedResp.Header, req.Header):
// In case of transport failure and stale-if-error activated, returns cached content
// when available.
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, cachedResp.Body)
log.Warn("Returning cached response due to transport failure", lga.Err, err)
h.Cached(fpBody)
return fpBody
default:
if err != nil && resp != nil && resp.StatusCode != http.StatusOK {
log.Warn("Unexpected HTTP status from server; will serve from cache if possible",
lga.Err, err, lga.Status, resp.StatusCode)
if fp := dl.cacheFileOnError(req, err); fp != "" {
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
h.Cached(fp)
return fp
}
}
if err != nil {
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
if fp := dl.cacheFileOnError(req, err); fp != "" {
h.Cached(fp)
return fp
}
h.Error(err)
return ""
}
if resp.StatusCode != http.StatusOK {
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
err = errz.Errorf("download: unexpected HTTP status: %s", httpz.StatusText(resp.StatusCode))
if fp := dl.cacheFileOnError(req, err); fp != "" {
h.Cached(fp)
return fp
}
h.Error(err)
return ""
}
}
} else {
reqCacheControl := parseCacheControl(req.Header)
if _, ok := reqCacheControl["only-if-cached"]; ok {
resp = newGatewayTimeoutResponse(req) //nolint:bodyclose
} else {
resp, err = dl.do(req) //nolint:bodyclose
if err != nil {
if fp := dl.cacheFileOnError(req, err); fp != "" {
h.Cached(fp)
return fp
}
h.Error(err)
return ""
}
}
}
if cacheable && canStore(parseCacheControl(req.Header), parseCacheControl(resp.Header)) {
for _, varyKey := range headerAllCommaSepValues(resp.Header, "vary") {
varyKey = http.CanonicalHeaderKey(varyKey)
fakeHeader := "X-Varied-" + varyKey
reqValue := req.Header.Get(varyKey)
if reqValue != "" {
resp.Header.Set(fakeHeader, reqValue)
}
}
if resp == cachedResp {
err = dl.cache.write(ctx, resp, true)
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
if err != nil {
log.Error("Failed to update cache header", lga.Dir, dl.cache.dir, lga.Err, err)
if fp := dl.cacheFileOnError(req, err); fp != "" {
h.Cached(fp)
return fp
}
h.Error(err)
return ""
}
if fpBody != "" {
h.Cached(fpBody)
return fpBody
}
} else if cachedResp != nil && cachedResp.Body != nil {
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, cachedResp.Body)
}
dl.dlStream = streamcache.New(resp.Body)
resp.Body = dl.dlStream.NewReader(ctx)
h.Uncached(dl.dlStream)
if err = dl.cache.write(req.Context(), resp, false); err != nil {
// We don't explicitly call Handler.Error: it would be "illegal" to do so
// anyway, because the Handler docs state that at most one Handler callback
// func is ever invoked.
//
// The cache write could fail for one of two reasons:
//
// - The download didn't complete successfully: that is, there was an error
// reading from resp.Body. In this case, that same error will be propagated
// to the Handler via the streamcache.Stream that was provided to Handler.Uncached.
// - The download completed, but there was a problem writing out the cache
// files (header, body, checksum). This is likely a very rare occurrence.
// In that case, any previous cache files are left untouched by cache.write,
// and all we do is log the error. If the cache is inconsistent, it will
// repair itself on next invocation, so it's not a big deal.
log.Warn("Failed to write download cache", lga.Dir, dl.cache.dir, lga.Err, err)
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
return ""
}
lg.WarnIfCloseError(log, lgm.CloseHTTPResponseBody, resp.Body)
return fpBody
}
// It's not cacheable, so we can just wrap resp.Body in a streamcache
// and return it.
dl.dlStream = streamcache.New(resp.Body)
resp.Body = nil // Unnecessary, but just to be explicit.
h.Uncached(dl.dlStream)
return ""
}
// do executes the request.
func (dl *Downloader) do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
log := lg.FromContext(ctx)
bar := progress.FromContext(ctx).NewWaiter(dl.name+": start download", true)
start := time.Now()
resp, err := dl.c.Do(req)
logResp(log, req, resp, time.Since(start), err)
bar.Stop()
if err != nil {
// Download timeout errors are typically wrapped in an url.Error, resulting
// in a message like:
//
// Get "https://example.com": http response header not received within 1ms timeout
//
// We want to trim off that `GET "URL"` prefix, but we only do that if
// there's a wrapped error beneath (which should be the case).
if errz.Has[*url.Error](err) && errors.Is(err, context.DeadlineExceeded) {
if e := errors.Unwrap(err); e != nil {
err = e
}
}
return nil, err
}
if resp.Body != nil && resp.Body != http.NoBody {
r := progress.NewReader(req.Context(), dl.name+": download", resp.ContentLength, resp.Body)
resp.Body, _ = r.(io.ReadCloser)
}
return resp, nil
}
// mustRequest creates a new request from dl.url. The url has already been
// parsed in download.New, so it's safe to ignore the error.
func (dl *Downloader) mustRequest(ctx context.Context) *http.Request {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, dl.url, nil)
if err != nil {
lg.FromContext(ctx).Error("Failed to create request", lga.URL, dl.url, lga.Err, err)
panic(err)
}
return req
}
// Clear deletes the cache.
func (dl *Downloader) Clear(ctx context.Context) error {
dl.mu.Lock()
defer dl.mu.Unlock()
if dl.cache == nil {
return nil
}
return dl.cache.clear(ctx)
}
// State returns the Downloader's cache state.
func (dl *Downloader) State(ctx context.Context) State {
dl.mu.Lock()
defer dl.mu.Unlock()
return dl.state(dl.mustRequest(ctx))
}
func (dl *Downloader) state(req *http.Request) State {
if !dl.isCacheable(req) {
return Uncached
}
ctx := req.Context()
log := lg.FromContext(ctx)
if !dl.cache.exists(req) {
return Uncached
}
fpHeader, _, _ := dl.cache.paths(req)
f, err := os.Open(fpHeader)
if err != nil {
log.Error(msgCloseCacheHeaderFile, lga.File, fpHeader, lga.Err, err)
return Uncached
}
defer lg.WarnIfCloseError(log, msgCloseCacheHeaderFile, f)
cachedResp, err := httpz.ReadResponseHeader(bufio.NewReader(f), nil) //nolint:bodyclose
if err != nil {
log.Error("Failed to read cached response header", lga.Err, err)
return Uncached
}
return getFreshness(cachedResp.Header, req.Header)
}
// Filesize returns the size of the downloaded file. This should
// only be invoked after the download has completed or is cached,
// as it may block until the download completes.
func (dl *Downloader) Filesize(ctx context.Context) (int64, error) {
dl.mu.Lock()
defer dl.mu.Unlock()
if dl.dlStream != nil {
// There's an active download, so we can get the filesize
// when the download completes.
size, err := dl.dlStream.Total(ctx)
return int64(size), err
}
if dl.cache == nil {
return 0, errz.New("download file size not available")
}
req := dl.mustRequest(ctx)
if !dl.cache.exists(req) {
// It's not in the cache.
return 0, errz.New("download file size not available")
}
// It's in the cache.
_, fp, _ := dl.cache.paths(req)
fi, err := os.Stat(fp)
if err != nil {
return 0, errz.Wrapf(err, "unable to stat cached download file: %s", fp)
}
return fi.Size(), nil
}
// CacheFile returns the path to the cached file, if it exists and has
// been fully downloaded.
func (dl *Downloader) CacheFile(ctx context.Context) (fp string, err error) {
dl.mu.Lock()
defer dl.mu.Unlock()
if dl.cache == nil {
return "", errz.Errorf("cache doesn't exist for: %s", dl.url)
}
req := dl.mustRequest(ctx)
if !dl.cache.exists(req) {
return "", errz.Errorf("no cache for: %s", dl.url)
}
_, fp, _ = dl.cache.paths(req)
return fp, nil
}
// cacheFileOnError returns the path to the cached file, if it exists,
// and is allowed to be returned on a refresh error. If not, empty
// string is returned.
func (dl *Downloader) cacheFileOnError(req *http.Request, err error) (fp string) {
if req == nil {
return ""
}
if dl.cache == nil {
return ""
}
if !dl.continueOnError {
return ""
}
if !dl.cache.exists(req) {
return ""
}
_, fp, _ = dl.cache.paths(req)
lg.FromContext(req.Context()).Warn("Returning possibly stale cached response due to download refresh error",
lga.Err, err, lga.Path, fp)
return fp
}
// Checksum returns the checksum of the cached download, if available.
func (dl *Downloader) Checksum(ctx context.Context) (sum checksum.Checksum, ok bool) {
dl.mu.Lock()
defer dl.mu.Unlock()
if dl.cache == nil {
return "", false
}
req := dl.mustRequest(ctx)
return dl.cache.cachedChecksum(req)
}
func (dl *Downloader) isCacheable(req *http.Request) bool {
if dl.cache == nil {
return false
}
return (req.Method == http.MethodGet || req.Method == http.MethodHead) && req.Header.Get("range") == ""
}