sapling/edenscm/hgext/hgsubversion/svnexternals.py
Kostia Balytskyi fb54cc5694 hgsubversion: move to absolute import
Summary:
Let's move hgsubversion to absolute_import, just to be consistent with the rest
of Mercurial codebase.

Reviewed By: markbt

Differential Revision: D15392154

fbshipit-source-id: e4c32939aff0616790828da508f3feea158669e1
2019-05-21 09:15:21 -07:00

472 lines
15 KiB
Python

# no-check-code -- see T24862348
from __future__ import absolute_import
import cStringIO
import os
import re
import shutil
import stat
import subprocess
from edenscm.mercurial import util as hgutil
from edenscm.mercurial.i18n import _
from . import util
try:
from edenscm.mercurial import scmutil
canonpath = scmutil.canonpath
except (ImportError, AttributeError):
from edenscm.mercurial import pathutil
canonpath = pathutil.canonpath
class externalsfile(dict):
"""Map svn directories to lists of externals entries.
"""
def __init__(self):
super(externalsfile, self).__init__()
self.encoding = "utf-8"
def __setitem__(self, key, value):
if value is None:
value = []
elif isinstance(value, basestring): # noqa: F821
value = value.splitlines()
if key == ".":
key = ""
if not value:
if key in self:
del self[key]
else:
super(externalsfile, self).__setitem__(key, value)
def write(self):
fp = cStringIO.StringIO()
for target in sorted(self):
lines = self[target]
if not lines:
continue
if not target:
target = "."
fp.write("[%s]\n" % target)
for l in lines:
l = " " + l + "\n"
fp.write(l)
return fp.getvalue()
def read(self, data):
self.clear()
fp = cStringIO.StringIO(data)
target = None
for line in fp.readlines():
if not line.strip():
continue
if line.startswith("["):
line = line.strip()
if line[-1] != "]":
raise hgutil.Abort("invalid externals section name: %s" % line)
target = line[1:-1]
if target == ".":
target = ""
elif line.startswith(" "):
line = line.rstrip("\n")
if target is None or not line:
continue
self.setdefault(target, []).append(line[1:])
def diff(ext1, ext2):
"""Compare 2 externalsfile and return a list of tuples like (dir,
value1, value2) where value1 is the external value in ext1 for dir
or None, and value2 the same in ext2.
"""
changes = []
for d in ext1:
if d not in ext2:
changes.append((d, "\n".join(ext1[d]), None))
elif ext1[d] != ext2[d]:
changes.append((d, "\n".join(ext1[d]), "\n".join(ext2[d])))
for d in ext2:
if d not in ext1:
changes.append((d, None, "\n".join(ext2[d])))
return changes
class BadDefinition(Exception):
pass
re_defold = re.compile(r"^\s*(.*?)\s+(?:-r\s*(\d+|\{REV\})\s+)?([a-zA-Z+]+://.*)\s*$")
re_defnew = re.compile(
r"^\s*(?:-r\s*(\d+|\{REV\})\s+)?((?:[a-zA-Z+]+://|\^/)\S*)\s+(\S+)\s*$"
)
re_scheme = re.compile(r"^[a-zA-Z+]+://")
def parsedefinition(line):
"""Parse an external definition line, return a tuple (path, rev, source)
or raise BadDefinition.
"""
# The parsing is probably not correct wrt path with whitespaces or
# potential quotes. svn documentation is not really talkative about
# these either.
pegrev, revgroup = None, 1
m = re_defnew.search(line)
if m:
rev, source, path = m.group(1, 2, 3)
if "@" in source:
source, pegrev = source.rsplit("@", 1)
else:
m = re_defold.search(line)
if not m:
raise BadDefinition()
revgroup = 2
path, rev, source = m.group(1, 2, 3)
try:
int(rev) # ensure revision is int()able, so we bail otherwise
norevline = line[: m.start(revgroup)] + "{REV}" + line[m.end(revgroup) :]
except (TypeError, ValueError):
norevline = line
return (path, rev, source, pegrev, norevline)
class RelativeSourceError(Exception):
pass
def resolvedots(url):
"""
Fix references that include .. entries.
Scans a URL for .. type entries and resolves them but will not allow any
number of ..s to take us out of domain so http://.. will raise an exception.
Tests, (Don't know how to construct a round trip for this so doctest):
>>> # Relative URL within servers svn area
>>> resolvedots(
... "http://some.svn.server/svn/some_repo/../other_repo")
'http://some.svn.server/svn/other_repo'
>>> # Complex One
>>> resolvedots(
... "http://some.svn.server/svn/repo/../other/repo/../../other_repo")
'http://some.svn.server/svn/other_repo'
>>> # Another Complex One
>>> resolvedots(
... "http://some.svn.server/svn/repo/dir/subdir/../../../other_repo/dir")
'http://some.svn.server/svn/other_repo/dir'
>>> # Last Complex One - SVN Allows this & seen it used even if it is BAD!
>>> resolvedots(
... "http://svn.server/svn/my_repo/dir/subdir/../../other_dir")
'http://svn.server/svn/my_repo/other_dir'
>>> # Outside the SVN Area might be OK
>>> resolvedots(
... "http://svn.server/svn/some_repo/../../other_svn_repo")
'http://svn.server/other_svn_repo'
>>> # Complex One
>>> resolvedots(
... "http://some.svn.server/svn/repo/../other/repo/../../other_repo")
'http://some.svn.server/svn/other_repo'
>>> # On another server is not a relative URL should give an exception
>>> resolvedots(
... "http://some.svn.server/svn/some_repo/../../../other_server")
Traceback (most recent call last):
...
RelativeSourceError: Relative URL cannot be to another server
"""
orig = url.split("/")
fixed = []
for item in orig:
if item != "..":
fixed.append(item)
elif len(fixed) > 3: # Don't allow things to go out of domain
fixed.pop()
else:
raise RelativeSourceError("Relative URL cannot be to another server")
return "/".join(fixed)
def resolvesource(ui, svnroot, source):
""" Resolve the source as either matching the scheme re or by resolving
relative URLs which start with ^ and my include relative .. references.
>>> root = 'http://some.svn.server/svn/some_repo'
>>> resolvesource(None, root, 'http://other.svn.server')
'http://other.svn.server'
>>> resolvesource(None, root, 'ssh://other.svn.server')
'ssh://other.svn.server'
>>> resolvesource(None, root, '^/other_repo')
'http://some.svn.server/svn/some_repo/other_repo'
>>> resolvesource(None, root, '^/sub_repo')
'http://some.svn.server/svn/some_repo/sub_repo'
>>> resolvesource(None, root, '^/../other_repo')
'http://some.svn.server/svn/other_repo'
>>> resolvesource(None, root, '^/../../../server/other_repo')
Traceback (most recent call last):
...
RelativeSourceError: Relative URL cannot be to another server
"""
if re_scheme.search(source):
return source
if source.startswith("^/"):
if svnroot is None:
raise RelativeSourceError()
return resolvedots(svnroot + source[1:])
ui.warn(_("ignoring unsupported non-fully qualified external: %r\n" % source))
return None
def parsedefinitions(ui, repo, svnroot, exts):
"""Return (targetdir, revision, source) tuples. Fail if nested
targetdirs are detected. source is an svn project URL.
"""
defs = []
for base in sorted(exts):
for line in exts[base]:
if not line.strip() or line.lstrip().startswith("#"):
# Ignore comments and blank lines
continue
try:
path, rev, source, pegrev, norevline = parsedefinition(line)
except BadDefinition:
ui.warn(_("ignoring invalid external definition: %r\n" % line))
continue
source = resolvesource(ui, svnroot, source)
if source is None:
continue
wpath = hgutil.pconvert(os.path.join(base, path))
wpath = canonpath(repo.root, "", wpath)
defs.append((wpath, rev, source, pegrev, norevline, base))
# Check target dirs are not nested
defs.sort()
for i, d in enumerate(defs):
for d2 in defs[i + 1 :]:
if d2[0].startswith(d[0] + "/"):
raise hgutil.Abort(
_("external directories cannot nest:\n%s\n%s") % (d[0], d2[0])
)
return defs
def computeactions(ui, repo, svnroot, ext1, ext2):
def listdefs(data):
defs = {}
exts = externalsfile()
exts.read(data)
for d in parsedefinitions(ui, repo, svnroot, exts):
defs[d[0]] = d
return defs
ext1 = listdefs(ext1)
ext2 = listdefs(ext2)
for wp1 in ext1:
if wp1 in ext2:
yield "u", ext2[wp1]
else:
yield "d", ext1[wp1]
for wp2 in ext2:
if wp2 not in ext1:
yield "u", ext2[wp2]
def getsvninfo(svnurl):
"""Return a tuple (url, root) for supplied svn URL or working
directory path.
"""
# Yes, this is ugly, but good enough for now
args = ["svn", "info", "--xml", svnurl]
shell = os.name == "nt"
p = subprocess.Popen(
args, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
stdout = p.communicate()[0]
if p.returncode:
raise hgutil.Abort(_("cannot get information about %s") % svnurl)
m = re.search(r"<root>(.*)</root>", stdout, re.S)
if not m:
raise hgutil.Abort(_("cannot find SVN repository root from %s") % svnurl)
root = m.group(1).rstrip("/")
m = re.search(r"<url>(.*)</url>", stdout, re.S)
if not m:
raise hgutil.Abort(_("cannot find SVN repository URL from %s") % svnurl)
url = m.group(1)
m = re.search(r'<entry[^>]+revision="([^"]+)"', stdout, re.S)
if not m:
raise hgutil.Abort(_("cannot find SVN revision from %s") % svnurl)
rev = m.group(1)
return url, root, rev
class externalsupdater:
def __init__(self, ui, repo):
self.repo = repo
self.ui = ui
def update(self, wpath, rev, source, pegrev):
path = self.repo.wvfs.join(wpath)
revspec = []
if rev:
revspec = ["-r", rev]
if os.path.isdir(path):
exturl, _extroot, extrev = getsvninfo(path)
# Comparing the source paths is not enough, but I don't
# know how to compare path+pegrev. The following update
# might fail if the path was replaced by another unrelated
# one. It can be fixed manually by deleting the externals
# and updating again.
if source == exturl:
if extrev != rev:
self.ui.status(
_("updating external on %s@%s\n")
% (wpath, rev or pegrev or "HEAD")
)
cwd = os.path.join(self.repo.root, path)
self.svn(["update"] + revspec, cwd)
return
self.delete(wpath)
cwd, dest = os.path.split(path)
cwd = os.path.join(self.repo.root, cwd)
if not os.path.isdir(cwd):
os.makedirs(cwd)
if not pegrev and rev:
pegrev = rev
if pegrev:
source = "%s@%s" % (source, pegrev)
self.ui.status(
_("fetching external %s@%s\n") % (wpath, rev or pegrev or "HEAD")
)
self.svn(["co"] + revspec + [source, dest], cwd)
def delete(self, wpath):
path = self.repo.wvfs.join(wpath)
if os.path.isdir(path):
self.ui.status(_("removing external %s\n") % wpath)
def onerror(function, path, excinfo):
if function is not os.remove:
raise
# read-only files cannot be unlinked under Windows
s = os.stat(path)
if (s.st_mode & stat.S_IWRITE) != 0:
raise
os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE)
os.remove(path)
shutil.rmtree(path, onerror=onerror)
return 1
def svn(self, args, cwd):
args = ["svn"] + args
self.ui.note(_("updating externals: %r, cwd=%s\n") % (args, cwd))
shell = os.name == "nt"
p = subprocess.Popen(
args, cwd=cwd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in p.stdout:
self.ui.debug(line)
p.wait()
if p.returncode != 0:
raise hgutil.Abort("subprocess '%s' failed" % " ".join(args))
def updateexternals(ui, args, repo, **opts):
"""update repository externals
"""
if len(args) > 2:
raise hgutil.Abort(_("updateexternals expects at most one changeset"))
node = None
if len(args) == 2:
svnurl = util.normalize_url(repo.ui.expandpath(args[0]))
args = args[1:]
else:
svnurl = util.normalize_url(repo.ui.expandpath("default"))
if args:
node = args[0]
svnroot = getsvninfo(svnurl)[1]
# Retrieve current externals status
try:
oldext = file(repo.sharedvfs.join("svn/externals"), "rb").read() # noqa: F821
except IOError:
oldext = ""
newext = ""
ctx = repo[node]
if ".hgsvnexternals" in ctx:
newext = ctx[".hgsvnexternals"].data()
updater = externalsupdater(ui, repo)
actions = computeactions(ui, repo, svnroot, oldext, newext)
for action, ext in actions:
if action == "u":
updater.update(ext[0], ext[1], ext[2], ext[3])
elif action == "d":
updater.delete(ext[0])
else:
raise hgutil.Abort(_("unknown update actions: %r") % action)
file(repo.sharedvfs.join("svn/externals"), "wb").write(newext) # noqa: F821
def getchanges(ui, repo, parentctx, exts):
"""Take a parent changectx and the new externals definitions as an
externalsfile and return a dictionary mapping the special file
hgsubversion needs for externals bookkeeping, to their new content
as raw bytes or None if the file has to be removed.
"""
mode = ui.config("hgsubversion", "externals", "svnexternals")
if mode == "svnexternals":
files = {".hgsvnexternals": None}
if exts:
files[".hgsvnexternals"] = exts.write()
elif mode == "subrepos":
raise hgutil.Abort(_("subrepos mode is no longer supported"))
elif mode == "ignore":
files = {}
else:
raise hgutil.Abort(_("unknown externals modes: %s") % mode)
# Should the really be updated?
updates = {}
for fn, data in files.iteritems():
if data is not None:
if fn not in parentctx or parentctx[fn].data() != data:
updates[fn] = data
else:
if fn in parentctx:
updates[fn] = None
return updates
def parse(ui, ctx):
"""Return the externals definitions stored in ctx as a (possibly empty)
externalsfile().
"""
external = externalsfile()
mode = ui.config("hgsubversion", "externals", "svnexternals")
if mode == "svnexternals":
if ".hgsvnexternals" in ctx:
external.read(ctx[".hgsvnexternals"].data())
elif mode == "subrepos":
raise hgutil.Abort(_("subrepos mode is no longer supported"))
elif mode == "ignore":
pass
else:
raise hgutil.Abort(_("unknown externals modes: %s") % mode)
return external
_notset = object()
if __name__ == "__main__":
import doctest
doctest.testmod()