sq/libsq/files/cache.go
Neil O'Toole 9a0b9b7a9c
Ingest checksum issue (#378)
- Refactor `Downloader.Get`.
2024-01-28 23:55:25 -07:00

474 lines
14 KiB
Go

package files
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/ioz"
"github.com/neilotoole/sq/libsq/core/ioz/checksum"
"github.com/neilotoole/sq/libsq/core/ioz/lockfile"
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/lg/lga"
"github.com/neilotoole/sq/libsq/core/options"
"github.com/neilotoole/sq/libsq/core/progress"
"github.com/neilotoole/sq/libsq/core/stringz"
"github.com/neilotoole/sq/libsq/source"
"github.com/neilotoole/sq/libsq/source/drivertype"
"github.com/neilotoole/sq/libsq/source/location"
)
// OptCacheLockTimeout is the time allowed to acquire a cache lock.
//
// See also: driver.OptIngestCache.
var OptCacheLockTimeout = options.NewDuration(
"cache.lock.timeout",
"",
0,
time.Second*5,
"Wait timeout to acquire cache lock",
`Wait timeout to acquire cache lock. During this period, retry will occur
if the lock is already held by another process. If zero, no retry occurs.`,
)
// CacheDir returns the cache dir. It is not guaranteed that the
// returned dir exists.
func (fs *Files) CacheDir() string {
return fs.cacheDir
}
// TempDir returns the temp dir. It is not guaranteed that the
// returned dir exists.
func (fs *Files) TempDir() string {
return fs.tempDir
}
// CacheDirFor gets the cache dir for handle. It is not guaranteed
// that the returned dir exists or is accessible.
func (fs *Files) CacheDirFor(src *source.Source) (dir string, err error) {
handle := src.Handle
if err = source.ValidHandle(handle); err != nil {
return "", errz.Wrapf(err, "cache dir: invalid handle: %s", handle)
}
if handle == source.StdinHandle {
// stdin is different input every time, so we need a unique
// cache dir. In practice, stdin probably isn't using this function.
handle += "_" + stringz.UniqN(32)
}
dir = filepath.Join(
fs.cacheDir,
"sources",
filepath.Join(strings.Split(strings.TrimPrefix(handle, "@"), "/")...),
fs.sourceHash(src),
)
return dir, nil
}
// WriteIngestChecksum is invoked (after successful ingestion) to write the
// checksum of the source document file vs the ingest DB. Thus, if the source
// document changes, the checksum will no longer match, and the ingest DB
// will be considered invalid.
func (fs *Files) WriteIngestChecksum(ctx context.Context, src, backingSrc *source.Source) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
log := lg.FromContext(ctx)
ingestFilePath, err := fs.filepath(src)
if err != nil {
return err
}
if location.TypeOf(src.Location) == location.TypeHTTP {
// If the source is remote, check if there was a download,
// and if so, make sure it's completed.
stream, ok := fs.streams[src.Handle]
if ok {
select {
case <-stream.Filled():
case <-stream.Done():
case <-ctx.Done():
return ctx.Err()
}
if err = stream.Err(); err != nil && !errors.Is(err, io.EOF) {
return err
}
}
// If we got this far, either there's no stream, or the stream is done,
// which means that the download cache has been updated, and contains
// the fresh cached body file that we'll use to calculate the checksum.
// So, we'll go ahead and do the checksum stuff below.
}
// Now, we need to write a checksum file that contains the computed checksum
// value from ingestFilePath.
var sum checksum.Checksum
if sum, err = checksum.ForFile(ingestFilePath); err != nil {
log.Warn("Failed to compute checksum for source file; caching not in effect",
lga.Src, src, lga.Dest, backingSrc, lga.Path, ingestFilePath, lga.Err, err)
return err
}
var checksumsPath string
if _, _, checksumsPath, err = fs.CachePaths(src); err != nil {
return err
}
if err = checksum.WriteFile(checksumsPath, sum, ingestFilePath); err != nil {
log.Warn("Failed to write checksum; file caching not in effect",
lga.Src, src, lga.Dest, backingSrc, lga.Path, ingestFilePath, lga.Err, err)
}
return err
}
// CachedBackingSourceFor returns the underlying backing source for src, if
// it exists. If it does not exist, ok returns false.
func (fs *Files) CachedBackingSourceFor(ctx context.Context, src *source.Source) (backingSrc *source.Source,
ok bool, err error,
) {
fs.mu.Lock()
defer fs.mu.Unlock()
switch location.TypeOf(src.Location) {
case location.TypeFile:
return fs.cachedBackingSourceForFile(ctx, src)
case location.TypeHTTP:
return fs.cachedBackingSourceForRemoteFile(ctx, src)
default:
return nil, false, errz.Errorf("caching not applicable for source: %s", src.Handle)
}
}
// cachedBackingSourceForFile returns the underlying cached backing
// source for src, if it exists.
func (fs *Files) cachedBackingSourceForFile(ctx context.Context, src *source.Source) (*source.Source, bool, error) {
if _, dlFileExists := fs.downloadedFiles[src.Handle]; !dlFileExists {
// It's NOT the case that the file has just been downloaded.
// So, we check if there's an active download happening...
if _, dlStreamExists := fs.streams[src.Handle]; dlStreamExists {
lg.FromContext(ctx).Debug("Source has download stream, so backing cache DB not valid", lga.Src, src)
return nil, false, nil
}
}
_, cacheDBPath, checksumsPath, err := fs.CachePaths(src)
if err != nil {
return nil, false, err
}
if !ioz.FileAccessible(checksumsPath) {
return nil, false, nil
}
mChecksums, err := checksum.ReadFile(checksumsPath)
if err != nil {
return nil, false, err
}
srcFilepath, err := fs.filepath(src)
if err != nil {
return nil, false, err
}
cachedChecksum, ok := mChecksums[srcFilepath]
if !ok {
return nil, false, nil
}
srcChecksum, err := checksum.ForFile(srcFilepath)
if err != nil {
return nil, false, err
}
if srcChecksum != cachedChecksum {
return nil, false, nil
}
// The checksums match, so we can use the cached DB,
// if it exists.
if !ioz.FileAccessible(cacheDBPath) {
return nil, false, nil
}
backingSrc := &source.Source{
Handle: src.Handle + "_cached",
Location: "sqlite3://" + cacheDBPath,
Type: drivertype.SQLite,
}
lg.FromContext(ctx).Debug("Found cached backing source DB", lga.Src, src, "backing_src", backingSrc)
return backingSrc, true, nil
}
// cachedBackingSourceForRemoteFile returns the underlying cached backing
// source for src, if it exists.
func (fs *Files) cachedBackingSourceForRemoteFile(ctx context.Context, src *source.Source) (*source.Source,
bool, error,
) {
log := lg.FromContext(ctx)
downloadedFile, _, err := fs.maybeStartDownload(ctx, src, true)
if err != nil {
return nil, false, err
}
if downloadedFile == "" {
log.Debug("No cached download file", lga.Src, src)
return nil, false, nil
}
log.Debug("Found cached download file", lga.Src, src, lga.Path, downloadedFile)
return fs.cachedBackingSourceForFile(ctx, src)
}
// CachePaths returns the paths to the cache files for src.
// There is no guarantee that these files exist, or are accessible.
// It's just the paths.
func (fs *Files) CachePaths(src *source.Source) (srcCacheDir, cacheDB, checksums string, err error) {
if srcCacheDir, err = fs.CacheDirFor(src); err != nil {
return "", "", "", err
}
checksums = filepath.Join(srcCacheDir, "checksums.txt")
cacheDB = filepath.Join(srcCacheDir, "cache.sqlite.db")
return srcCacheDir, cacheDB, checksums, nil
}
// sourceHash generates a hash for src. The hash is based on the
// member fields of src, with special handling for src.Options.
// Only the opts that affect data ingestion (options.TagIngestMutate)
// are incorporated in the hash. The generated hash is used to
// determine the cache dir for src. Thus, if a source is mutated
// (e.g. the remote http location changes), a new cache dir results.
func (fs *Files) sourceHash(src *source.Source) string {
if src == nil {
return ""
}
buf := bytes.Buffer{}
buf.WriteString(src.Handle)
buf.WriteString(string(src.Type))
buf.WriteString(src.Location)
buf.WriteString(src.Catalog)
buf.WriteString(src.Schema)
mUsedKeys := make(map[string]any)
if src.Options != nil {
keys := src.Options.Keys()
for _, k := range keys {
opt := fs.optRegistry.Get(k)
switch {
case opt == nil,
!opt.IsSet(src.Options),
!opt.HasTag(options.TagIngestMutate):
continue
default:
}
buf.WriteString(k)
v := src.Options[k]
buf.WriteString(fmt.Sprintf("%v", v))
mUsedKeys[k] = v
}
}
sum := checksum.Sum(buf.Bytes())
return sum
}
// cacheLockFor returns the lock file for src's cache.
func (fs *Files) cacheLockFor(src *source.Source) (lockfile.Lockfile, error) {
cacheDir, err := fs.CacheDirFor(src)
if err != nil {
return "", errz.Wrapf(err, "cache lock for %s", src.Handle)
}
lf, err := lockfile.New(filepath.Join(cacheDir, "pid.lock"))
if err != nil {
return "", errz.Wrapf(err, "cache lock for %s", src.Handle)
}
return lf, nil
}
// CacheLockAcquire acquires the cache lock for src. The caller must invoke
// the returned unlock func.
func (fs *Files) CacheLockAcquire(ctx context.Context, src *source.Source) (unlock func(), err error) {
lock, err := fs.cacheLockFor(src)
if err != nil {
return nil, err
}
lockTimeout := OptCacheLockTimeout.Get(options.FromContext(ctx))
log := lg.FromContext(ctx).With(lga.Src, src, lga.Timeout, lockTimeout, lga.Lock, lock)
log.Debug("Acquiring cache lock for source")
bar := progress.FromContext(ctx).NewTimeoutWaiter(
src.Handle+": acquire lock",
time.Now().Add(lockTimeout),
)
err = lock.Lock(ctx, lockTimeout)
bar.Stop()
if err != nil {
return nil, errz.Wrap(err, src.Handle+": acquire cache lock")
}
return func() {
if err = lock.Unlock(); err != nil {
log.Warn("Failed to release cache lock", lga.Err, err)
}
}, nil
}
// CacheClearAll clears the entire cache dir.
// Note that this operation is distinct from Files.doCacheSweep.
func (fs *Files) CacheClearAll(ctx context.Context) error {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.doCacheClearAll(ctx)
}
// CacheClearSource clears the ingest cache for src. If arg downloads is true,
// the source's download dir is also cleared. The caller should typically
// first acquire the cache lock for src via Files.cacheLockFor.
func (fs *Files) CacheClearSource(ctx context.Context, src *source.Source, clearDownloads bool) error {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.doCacheClearSource(ctx, src, clearDownloads)
}
func (fs *Files) doCacheClearSource(ctx context.Context, src *source.Source, clearDownloads bool) error {
cacheDir, err := fs.CacheDirFor(src)
if err != nil {
return err
}
if !ioz.DirExists(cacheDir) {
return nil
}
entries, err := os.ReadDir(cacheDir)
if err != nil {
return errz.Wrapf(err, "%s: clear cache", src.Handle)
}
for _, entry := range entries {
switch entry.Name() {
case "pid.lock":
continue
case "download":
if !clearDownloads {
continue
}
default:
}
if err = os.RemoveAll(filepath.Join(cacheDir, entry.Name())); err != nil {
return errz.Wrapf(err, "%s: clear cache", src.Handle)
}
}
lg.FromContext(ctx).
With("clear_downloads", clearDownloads, lga.Src, src, lga.Dir, cacheDir).
Info("Cleared source cache")
return nil
}
func (fs *Files) doCacheClearAll(ctx context.Context) error {
log := lg.FromContext(ctx).With(lga.Dir, fs.cacheDir)
log.Debug("Clearing cache dir")
if !ioz.DirExists(fs.cacheDir) {
log.Debug("Cache dir does not exist")
return nil
}
// Instead of directly deleting the existing cache dir, we first
// move it to /tmp, and then try to delete it. This should probably
// help with the situation where another sq instance has an open pid
// lock in the cache dir.
tmpDir := DefaultTempDir()
if err := ioz.RequireDir(tmpDir); err != nil {
return errz.Wrap(err, "cache clear")
}
relocateDir := filepath.Join(tmpDir, "dead_cache_"+stringz.Uniq8())
if err := os.Rename(fs.cacheDir, relocateDir); err != nil {
return errz.Wrap(err, "cache clear: relocate")
}
if err := os.RemoveAll(relocateDir); err != nil {
log.Warn("Could not delete relocated cache dir", lga.Path, relocateDir, lga.Err, err)
}
// Recreate the cache dir.
if err := ioz.RequireDir(fs.cacheDir); err != nil {
return errz.Wrap(err, "cache clear")
}
return nil
}
// doCacheSweep sweeps the cache dir, making a best-effort attempt
// to remove any empty directories. Note that this operation is
// distinct from Files.CacheClearAll.
//
// REVISIT: This doesn't really do as much as desired. It should
// also be able to detect orphaned src cache dirs and delete those.
func (fs *Files) doCacheSweep() {
ctx, cancelFn := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancelFn()
log := fs.log.With(lga.Dir, fs.cacheDir)
if unlock, err := fs.cfgLockFn(ctx); err != nil {
log.Error("Sweep cache dir: failed to acquire config lock", lga.Lock, fs.cfgLockFn, lga.Err, err)
return
} else {
defer unlock()
}
count, err := ioz.PruneEmptyDirTree(ctx, fs.cacheDir)
if err != nil {
log.Warn("Problem sweeping cache dir", lga.Err, err, "deleted_dirs", count)
return
}
log.Info("Swept cache dir", "deleted_dirs", count)
}
// DefaultCacheDir returns the sq cache dir. This is generally
// in USER_CACHE_DIR/*/sq, but could also be in TEMP_DIR/*/sq/cache
// or similar. It is not guaranteed that the returned dir exists
// or is accessible.
func DefaultCacheDir() (dir string) {
var err error
if dir, err = os.UserCacheDir(); err != nil {
// Some systems may not have a user cache dir, so we fall back
// to the system temp dir.
dir = filepath.Join(DefaultTempDir(), "cache")
return dir
}
dir = filepath.Join(dir, "sq")
return dir
}
// DefaultTempDir returns the default sq temp dir. It is not
// guaranteed that the returned dir exists or is accessible.
func DefaultTempDir() (dir string) {
return filepath.Join(os.TempDir(), "sq")
}