# Portions Copyright (c) Meta Platforms, Inc. and affiliates. # # This software may be used and distributed according to the terms of the # GNU General Public License version 2. # archival.py - revision archival for mercurial # # Copyright 2006 Vadim Gelfer # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import gzip import os import struct import tarfile import time import zipfile import zlib from . import error, formatter, progress, pycompat, util, vfs as vfsmod from .i18n import _ # from unzip source code: _UNX_IFREG = 0x8000 _UNX_IFLNK = 0xA000 def tidyprefix(dest, kind, prefix): """choose prefix to use for names in archive. make sure prefix is safe for consumers.""" if prefix: prefix = util.normpath(prefix) else: if not isinstance(dest, str): raise ValueError("dest must be string if no prefix") prefix = os.path.basename(dest) lower = prefix.lower() for sfx in exts.get(kind, []): if lower.endswith(sfx): prefix = prefix[: -len(sfx)] break lpfx = os.path.normpath(util.localpath(prefix)) prefix = util.pconvert(lpfx) if not prefix.endswith("/"): prefix += "/" # Drop the leading '.' path component if present, so Windows can read the # zip files (issue4634) if prefix.startswith("./"): prefix = prefix[2:] if prefix.startswith("../") or os.path.isabs(lpfx) or "/../" in prefix: raise error.Abort(_("archive prefix contains illegal components")) return prefix exts = { "tar": [".tar"], "tbz2": [".tbz2", ".tar.bz2"], "tgz": [".tgz", ".tar.gz"], "zip": [".zip"], } def guesskind(dest): for kind, extensions in pycompat.iteritems(exts): if any(dest.endswith(ext) for ext in extensions): return kind return None def _rootctx(repo): # repo[0] may be hidden for rev in repo: return repo[rev] return repo["null"] def buildmetadata(ctx): """build content of .sl_archival.txt""" repo = ctx.repo() default = ( r"repo: {root}\n" r'node: {ifcontains(rev, revset("wdir()"),' r'"{p1node}{dirty}", "{node}")}\n' r"branch: {branch|utf8}\n" ) opts = {"template": repo.ui.config("experimental", "archivemetatemplate", default)} out = pycompat.stringutf8io() fm = formatter.formatter(repo.ui, out, "archive", opts) fm.startitem() fm.context(ctx=ctx) fm.data(root=_rootctx(repo).hex()) if ctx.rev() is None: dirty = "" if ctx.dirty(missing=True): dirty = "+" fm.data(dirty=dirty) fm.end() return pycompat.encodeutf8(out.getvalue()) class tarit(object): """write archive to tar file or stream. can write uncompressed, or compress with gzip or bzip2.""" def __init__(self, dest, mtime, kind=""): self.mtime = mtime self.fileobj = None def taropen(mode, name="", fileobj=None): if kind == "gz": mode = mode[0] if not fileobj: fileobj = open(name, mode + "b") gzfileobj = gzip.GzipFile( name, mode + "b", zlib.Z_BEST_COMPRESSION, fileobj, mtime=mtime ) self.fileobj = gzfileobj return tarfile.TarFile.taropen(name, mode, gzfileobj) else: return tarfile.open(name, mode + kind, fileobj) if isinstance(dest, str): self.z = taropen("w:", name=dest) else: self.z = taropen("w|", fileobj=dest) def addfile(self, name, mode, islink, data): i = tarfile.TarInfo(name) i.mtime = self.mtime i.size = len(data) if islink: i.type = tarfile.SYMTYPE i.mode = 0o777 i.linkname = pycompat.decodeutf8(data) data = None i.size = 0 else: i.mode = mode data = pycompat.stringio(data) self.z.addfile(i, data) def done(self): self.z.close() if self.fileobj: self.fileobj.close() class tellable(object): """provide tell method for zipfile.ZipFile when writing to http response file object.""" def __init__(self, fp): self.fp = fp self.offset = 0 def __getattr__(self, key): return getattr(self.fp, key) def write(self, s): self.fp.write(s) self.offset += len(s) def tell(self): return self.offset class zipit(object): """write archive to zip file or stream. can write uncompressed, or compressed with deflate.""" def __init__(self, dest, mtime, compress=True): if not isinstance(dest, str): try: dest.tell() except (AttributeError, IOError): dest = tellable(dest) self.z = zipfile.ZipFile( dest, "w", compress and zipfile.ZIP_DEFLATED or zipfile.ZIP_STORED ) # Python's zipfile module emits deprecation warnings if we try # to store files with a date before 1980. epoch = 315532800 # calendar.timegm((1980, 1, 1, 0, 0, 0, 1, 1, 0)) if mtime < epoch: mtime = epoch self.mtime = mtime self.date_time = time.gmtime(mtime)[:6] def addfile(self, name, mode, islink, data): i = zipfile.ZipInfo(name, self.date_time) i.compress_type = self.z.compression # unzip will not honor unix file modes unless file creator is # set to unix (id 3). i.create_system = 3 ftype = _UNX_IFREG if islink: mode = 0o777 ftype = _UNX_IFLNK i.external_attr = (mode | ftype) << 16 # add "extended-timestamp" extra block, because zip archives # without this will be extracted with unexpected timestamp, # if TZ is not configured as GMT i.extra += struct.pack( "