sapling/tests/test_hgsubversion_util.py

919 lines
28 KiB
Python
Raw Normal View History

# no-check-code -- see T24862348
# flake8: noqa
import difflib
import errno
2009-09-16 06:34:53 +04:00
import gettext
import imp
2008-10-08 03:42:43 +04:00
import os
import shutil
import stat
2009-09-16 06:34:53 +04:00
import subprocess
import sys
import tarfile
import tempfile
import unittest
2008-10-08 03:42:43 +04:00
from hgext.hgsubversion import compathacks, svnrepo, svnwrap, util
from mercurial import (
cmdutil,
commands,
context,
dispatch as dispatchmod,
encoding,
extensions,
hg,
i18n,
node,
scmutil,
ui,
util as hgutil,
)
2014-06-03 04:54:02 +04:00
# Probe for changeset-obsolescence support. The helpers further down toggle
# ``obsolete._enabled`` directly, so a Mercurial whose ``obsolete`` module
# lacks that flag is treated exactly like one without the module at all:
# ``obsolete`` becomes None and obsolete-mode tests are skipped.
try:
    from mercurial import obsolete

    # Attribute probe only; the value itself is not used here.
    obsolete._enabled
except (ImportError, AttributeError):
    obsolete = None
# Resolve the exception class used to report a skipped test: prefer the one
# shipped with unittest, fall back to nose's when nose is already loaded, and
# otherwise use None to signal that skipping is unsupported.
SkipTest = getattr(unittest, "SkipTest", None)
if SkipTest is None:
    SkipTest = sys.modules["nose"].SkipTest if "nose" in sys.modules else None
# Documentation for subprocess.Popen() says:
# "Note that on Windows, you cannot set close_fds to true and
# also redirect the standard handles by setting stdin, stdout or
# stderr."
canCloseFds = "win32" not in sys.platform

if "win32" not in sys.platform:

    def kill_process(popen_obj):
        """Forcibly kill the child behind ``popen_obj`` by sending signal 9."""
        os.kill(popen_obj.pid, 9)


else:
    import ctypes
    from ctypes.wintypes import BOOL, DWORD, HANDLE, UINT

    def win_status_check(result, func, args):
        """ctypes ``errcheck`` hook: raise ``WinError`` on a failed call.

        Returns ``args`` unchanged so ctypes carries on with its normal
        result processing.
        """
        # ``not result`` instead of ``== 0``: with a pointer-like restype
        # such as HANDLE, ctypes hands a NULL result over as None, which
        # would slip past an ``== 0`` comparison.
        if not result:
            raise ctypes.WinError()
        return args

    def WINAPI(returns, func, *params):
        """Declare the prototype of the Win32 function ``func``.

        ``params`` alternates (ctypes type, parameter name); only the types
        are applied, the names are there for readability. ``returns`` is the
        ctypes result type. The configured function is returned.
        """
        assert len(params) % 2 == 0
        func.argtypes = tuple(params[0::2])
        # The attribute ctypes honours is ``restype``; the previous
        # ``resvalue`` was silently ignored, leaving the default int result.
        func.restype = returns
        func.errcheck = win_status_check
        return func

    # dwDesiredAccess
    PROCESS_TERMINATE = 0x0001

    OpenProcess = WINAPI(
        HANDLE,
        ctypes.windll.kernel32.OpenProcess,
        DWORD,
        "dwDesiredAccess",
        BOOL,
        "bInheritHandle",
        DWORD,
        "dwProcessId",
    )

    CloseHandle = WINAPI(BOOL, ctypes.windll.kernel32.CloseHandle, HANDLE, "hObject")

    TerminateProcess = WINAPI(
        BOOL,
        ctypes.windll.kernel32.TerminateProcess,
        HANDLE,
        "hProcess",
        UINT,
        "uExitCode",
    )

    def kill_process(popen_obj):
        """Terminate the child behind ``popen_obj`` with exit code 1."""
        phnd = OpenProcess(PROCESS_TERMINATE, False, popen_obj.pid)
        try:
            TerminateProcess(phnd, 1)
        finally:
            # Always release the process handle, even if termination failed.
            CloseHandle(phnd)
# Fixtures that need to be pulled at a subdirectory of the repo path.
# Keys are fixture file names; values are the path (with leading slash)
# inside the loaded Subversion repository.
subdir = {
    "truncatedhistory.svndump": "/project2",
    "fetch_missing_files_subdir.svndump": "/foo",
    "empty_dir_in_trunk_not_repo_root.svndump": "/project",
    "project_root_not_repo_root.svndump": "/dummyproj",
    "project_name_with_space.svndump": "/project name",
    # The next two spell the same non-ASCII name: once as raw UTF-8 bytes,
    # once percent-encoded.
    "non_ascii_path_1.svndump": "/b\xC3\xB8b",
    "non_ascii_path_2.svndump": "/b%C3%B8b",
    "subdir_is_file_prefix.svndump": "/flaf",
    "renames_with_prefix.svndump": "/prefix",
}
# Map defining the layouts of the fixtures we can use with the custom layout.
# Keys are fixture file names; each value is a dict that presumably maps a
# Mercurial branch name to its Subversion path -- verify against the users of
# this table.
# These are really popular layouts, so I gave them names.
trunk_only = {"default": "trunk"}
trunk_dev_branch = {"default": "trunk", "dev_branch": "branches/dev_branch"}
custom = {
    "addspecial.svndump": {"default": "trunk", "foo": "branches/foo"},
    "binaryfiles.svndump": trunk_only,
    "branch_create_with_dir_delete.svndump": trunk_dev_branch,
    "branch_delete_parent_dir.svndump": trunk_dev_branch,
    "branchmap.svndump": {
        "default": "trunk",
        "badname": "branches/badname",
        "feature": "branches/feature",
    },
    "branch_prop_edit.svndump": trunk_dev_branch,
    "branch_rename_to_trunk.svndump": {
        "default": "trunk",
        "dev_branch": "branches/dev_branch",
        "old_trunk": "branches/old_trunk",
    },
    "copies.svndump": trunk_only,
    "copyafterclose.svndump": {"default": "trunk", "test": "branches/test"},
    "copybeforeclose.svndump": {"default": "trunk", "test": "branches/test"},
    "delentries.svndump": trunk_only,
    "delete_restore_trunk.svndump": trunk_only,
    "empty_dir_in_trunk_not_repo_root.svndump": trunk_only,
    "executebit.svndump": trunk_only,
    "filecase.svndump": trunk_only,
    "file_not_in_trunk_root.svndump": trunk_only,
    "project_name_with_space.svndump": trunk_dev_branch,
    "pushrenames.svndump": trunk_only,
    "rename_branch_parent_dir.svndump": trunk_dev_branch,
    "renamedproject.svndump": {"default": "trunk", "branch": "branches/branch"},
    "renames.svndump": {"default": "trunk", "branch1": "branches/branch1"},
    "renames_with_prefix.svndump": {"default": "trunk", "branch1": "branches/branch1"},
    "replace_branch_with_branch.svndump": {
        "default": "trunk",
        "branch1": "branches/branch1",
        "branch2": "branches/branch2",
    },
    "replace_trunk_with_branch.svndump": {"default": "trunk", "test": "branches/test"},
    "revert.svndump": trunk_only,
    "siblingbranchfix.svndump": {
        "default": "trunk",
        "wrongbranch": "branches/wrongbranch",
    },
    "simple_branch.svndump": {"default": "trunk", "the_branch": "branches/the_branch"},
    "spaces-in-path.svndump": trunk_dev_branch,
    "symlinks.svndump": trunk_only,
    "truncatedhistory.svndump": trunk_only,
    "unorderedbranch.svndump": {"default": "trunk", "branch": "branches/branch"},
    "unrelatedbranch.svndump": {
        "default": "trunk",
        "branch1": "branches/branch1",
        "branch2": "branches/branch2",
    },
}

# Absolute path of the "fixtures" directory that sits next to this file.
FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)), "fixtures")
2008-10-08 03:42:43 +04:00
def getlocalpeer(repo):
    """Return the local repository object behind ``repo``.

    ``repo.local()`` is used when the attribute exists. If it does not exist,
    or the call yields a plain bool instead of an object, ``repo`` itself is
    returned.
    """
    try:
        local = repo.local
    except AttributeError:
        return repo
    peer = local()
    return repo if isinstance(peer, bool) else peer
def repolen(repo, svnonly=False):
    """Count the revisions of ``repo`` that are not obsolete.

    Without obsolescence support this equals ``len(repo)``. When ``svnonly``
    is true, only revisions for which ``util.getsvnrev`` returns a true value
    (i.e. those converted from Subversion) are counted.
    """
    # Materialising every revision is crude, but the test repositories
    # tend to be rather small.
    live = set(repo)
    if obsolete:
        live -= obsolete.getrevs(repo, "obsolete")
    if not svnonly:
        return len(live)
    return sum(1 for rev in live if util.getsvnrev(repo[rev]))
def _makeskip(name, message):
    """Build a stand-in test named ``name`` that raises SkipTest(message).

    Returns None when no SkipTest class is available.
    """
    if not SkipTest:
        return None

    def skip(*args, **kwargs):
        raise SkipTest(message)

    skip.__name__ = name
    return skip
def requiresmodule(mod):
    """Return a decorator that skips the test when ``mod`` is None.

    A test decorated while ``mod`` is available is returned untouched;
    otherwise it is replaced by whatever ``_makeskip`` produces. A ``None``
    test (already erased by another decorator) stays ``None``.
    """
    available = mod is not None

    def decorator(fn):
        if fn is None:
            return None
        if available:
            return fn
        return _makeskip(fn.__name__, "missing required feature")

    return decorator
def requiresoption(option):
    """Skip a test if commands.clone does not take the specified option.

    Raises TypeError when ``option`` is not a string. The returned decorator
    yields the test unchanged when the option exists, a skipping stand-in
    when it does not, or None if skipping is unsupported.
    """
    if not isinstance(option, str):
        raise TypeError("requiresoption takes a string argument")

    def decorator(fn):
        cloneopts = cmdutil.findcmd("clone", commands.table)[1][1]
        if any(entry[1] == option for entry in cloneopts):
            return fn
        # no match found, so skip
        if SkipTest:
            return _makeskip(fn.__name__, "test requires clone to accept %s" % option)
        # no skipping support, so erase decorated method
        return None

    return decorator
def requiresreplay(method):
    """Skip a test in stupid mode.

    The wrapper runs ``method`` when ``self.stupid`` is false. In stupid mode
    it raises SkipTest if available and otherwise just returns None.
    """

    def test(self, *args, **kwargs):
        if not self.stupid:
            return method(self, *args, **kwargs)
        if SkipTest:
            raise SkipTest("test requires replay mode")

    test.__name__ = method.__name__
    return test
2009-10-17 07:33:41 +04:00
def filtermanifest(manifest):
    """Return the entries of ``manifest`` not listed in util.ignoredfiles."""
    ignored = util.ignoredfiles
    return [name for name in manifest if name not in ignored]
2009-10-17 07:33:41 +04:00
def fileurl(path):
    """Turn a filesystem path into an absolute ``file://`` URL."""
    absolute = os.path.abspath(path).replace(os.sep, "/")
    drive, rest = os.path.splitdrive(absolute)
    if drive:
        # In svn 1.7, the swig svn wrapper returns local svn URLs
        # with an uppercase drive letter, try to match that to
        # simplify svn info tests.
        drive = "/" + drive.upper()
    return "file://%s%s" % (drive, rest)
def _execute_and_get_env(script_path, env_vars):
    """Source the shell script at ``script_path`` and report its environment.

    The script is sourced in a shell, every name in ``env_vars`` is exported
    and ``env`` is run. On success the lines of that output which start with
    one of the names are returned as a ``{name: value}`` dict. Exit status 80
    raises SkipTest with the captured stdout; any other failure echoes the
    captured stderr and exits the process with the same status.
    """
    pieces = ["source", script_path]
    for var in env_vars:
        pieces += ["&&", "export", var]
    pieces += [";", "env"]

    proc = subprocess.Popen(
        " ".join(pieces), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
    )
    out, err = proc.communicate()
    status = proc.returncode

    if status == 80:
        # 80 is the return code for skipped tests from the shell scripts.
        raise SkipTest(out)
    if status != 0:
        sys.stderr.write(err)
        sys.stderr.flush()
        sys.exit(status)

    return dict(
        line.split("=", 1)
        for line in out.splitlines()
        if line.startswith(tuple(env_vars))
    )
class TestDb(object):
    """Database connection settings read from ``hgsql/library.sh``.

    Constructing an instance sources the script (located under the directory
    named by the TESTDIR environment variable) and captures the DB* variables
    it exports; each one is then exposed as a read-only property.
    """

    _TEST_DIR = os.environ["TESTDIR"]
    _LIBRARY_SH = os.path.join(_TEST_DIR, "hgsql", "library.sh")

    # Names of the environment variables captured from the script.
    _DBENGINE = "DBENGINE"
    _DBHOST = "DBHOST"
    _DBNAME = "DBNAME"
    _DBPASS = "DBPASS"
    _DBPORT = "DBPORT"
    _DBUSER = "DBUSER"

    def __init__(self):
        self._init_config()

    def _init_config(self):
        """Run the library script and remember the variables it exported."""
        wanted = (
            self._DBENGINE,
            self._DBHOST,
            self._DBNAME,
            self._DBPASS,
            self._DBPORT,
            self._DBUSER,
        )
        self._env = _execute_and_get_env(self._LIBRARY_SH, list(wanted))

    @property
    def engine(self):
        """Value of DBENGINE."""
        return self._env[self._DBENGINE]

    @property
    def host(self):
        """Value of DBHOST."""
        return self._env[self._DBHOST]

    @property
    def name(self):
        """Value of DBNAME."""
        return self._env[self._DBNAME]

    @property
    def password(self):
        """Value of DBPASS."""
        return self._env[self._DBPASS]

    @property
    def port(self):
        """Value of DBPORT."""
        return self._env[self._DBPORT]

    @property
    def user(self):
        """Value of DBUSER."""
        return self._env[self._DBUSER]
class _testui(ui.ui):
    """ui subclass that turns developer warnings into hard test failures."""

    def develwarn(self, msg, stacklevel=1, *args, **kwargs):
        """Raise on a develwarn when ``util.smartset`` is available.

        When ``util.smartset`` is None the warning is passed on to the stock
        implementation instead.
        """
        from hgext.hgsubversion import util

        if util.smartset is None:
            return ui.ui.develwarn(self, msg, stacklevel=stacklevel, *args, **kwargs)

        if args:
            config = args[0]
        else:
            config = kwargs.get("config")
        raise Exception("flunked develwarn: %r (%r)" % (msg, config))
def testui(stupid=False, layout="auto", startrev=0):
    """Create a quiet ``_testui`` configured for hgsubversion tests.

    Also sets HGPLAIN in Mercurial's environment mapping. ``stupid``,
    ``layout`` and ``startrev`` are stored under the ``hgsubversion`` section.
    """
    encoding.environ["HGPLAIN"] = "True"
    u = _testui()
    bools = {True: "true", False: "false"}
    settings = (
        ("ui", "quiet", bools[True]),
        ("ui", "username", "automated tests"),
        ("extensions", "hgsubversion", ""),
        ("hgsubversion", "layout", layout),
        ("hgsubversion", "nativerevs", True),
        ("hgsubversion", "startrev", startrev),
        ("hgsubversion", "stupid", bools[stupid]),
        ("devel", "all-warnings", True),
        ("subrepos", "hgsubversion:allowed", True),
    )
    for section, name, value in settings:
        u.setconfig(section, name, value)
    return u
def dispatch(cmd, ui=None, repo=None):
    """Run the Mercurial command line ``cmd`` and return dispatch's result.

    ``cmd`` is the argument list and must contain ``--quiet``. When the
    dispatch module provides a ``request`` class, the arguments are wrapped
    in one together with ``ui`` and ``repo``; otherwise the bare list is
    dispatched.
    """
    assert "--quiet" in cmd
    # The former fallback (``lambda x: x``) was invoked with ui=/repo=
    # keywords it does not accept, so it raised TypeError whenever
    # ``request`` was missing. Skip the wrapping step in that case instead.
    makerequest = getattr(dispatchmod, "request", None)
    if makerequest is not None:
        cmd = makerequest(cmd, ui=ui, repo=repo)
    return dispatchmod.dispatch(cmd)
def rmtree(path):
    """Remove the directory tree at ``path``, including read-only files."""
    # Read-only files cannot be removed under Windows, so make every file
    # writable before handing the tree to shutil.
    for root, _dirs, files in os.walk(path):
        for basename in files:
            target = os.path.join(root, basename)
            try:
                mode = os.stat(target).st_mode
            except OSError as e:
                # Entries that vanished (or dangle) are simply skipped.
                if e.errno != errno.ENOENT:
                    raise
                continue
            if not mode & stat.S_IWRITE:
                os.chmod(target, mode | stat.S_IWRITE)
    shutil.rmtree(path)
def hgclone(ui, source, dest, update=True, rev=None):
    """Call ``hg.clone`` with the signature matching the running Mercurial.

    Returns the ``(source, destination)`` pair produced by ``hg.clone``.
    """
    if getattr(hg, "peer", None):
        # Since 1.9 (d976542986d2) an options dict follows the ui argument.
        leading = (ui, {})
    else:
        leading = (ui,)
    src, dst = hg.clone(*(leading + (source, dest)), update=update, rev=rev)
    return src, dst
def svnls(repo_path, path, rev="HEAD"):
    """Recursively list ``path`` inside the svn repository at ``repo_path``.

    Runs ``svn ls -R`` at revision ``rev`` and returns the sorted entries
    with leading and trailing slashes stripped. Raises Exception, including
    svn's output, when the command fails.
    """
    path = repo_path + "/" + path
    path = util.normalize_url(fileurl(path))
    # str() so that integer revisions work, as they do in svnpropget().
    args = ["svn", "ls", "-r", str(rev), "-R", path]
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # stderr is merged into stdout above, so communicate() returns None for
    # it; svn's diagnostics are found in ``output``.
    output, _unused = p.communicate()
    if p.returncode:
        raise Exception("svn ls failed on %s: %r" % (path, output))
    return sorted(e.strip("/") for e in output.splitlines())
def svnpropget(repo_path, path, prop, rev="HEAD"):
    """Return property ``prop`` of ``path`` in the svn repository.

    Runs ``svn propget`` at revision ``rev`` and returns the stripped output,
    or "" when svn reports that the property does not exist (W200017).
    Raises Exception, including svn's output, on any other failure.
    """
    path = repo_path + "/" + path
    path = util.normalize_url(fileurl(path))
    args = ["svn", "propget", "-r", str(rev), prop, path]
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # stderr is merged into stdout above, so communicate() returns None for
    # it; all of svn's messages are found in ``output``.
    output, _unused = p.communicate()
    if "W200017" in output:
        # subversion >= 1.9 changed 'no properties' to be an error, so let's
        # avoid that
        return ""
    if p.returncode:
        # This used to be guarded by the always-None stderr value and so
        # could never trigger; failures were returned as property values.
        raise Exception("svn propget failed on %s: %r" % (path, output))
    return output.strip()
def _obsolete_wrap(cls, name):
    """Add an obsolete-mode variant of test ``name`` to ``cls``.

    Attributes that are not ``test_*`` or are falsy are ignored. The variant
    is stored as ``"<name> obsolete"`` and runs the original test with
    ``obsolete._enabled`` switched on. Without obsolete support it is a
    skipping stand-in, or nothing at all when skipping is unsupported.
    """
    origfunc = getattr(cls, name)
    if not (name.startswith("test_") and origfunc):
        return

    if obsolete:

        def wrapper(self, *args, **opts):
            self.assertFalse(obsolete._enabled, "obsolete was already active")
            obsolete._enabled = True
            try:
                origfunc(self, *args, **opts)
                self.assertTrue(obsolete._enabled, "obsolete remains active")
            finally:
                obsolete._enabled = False

    else:
        wrapper = _makeskip(name, "obsolete not available")

    if not wrapper:
        return

    newname = name + " obsolete"
    wrapper.__name__ = newname
    wrapper.__module__ = origfunc.__module__
    doc = origfunc.__doc__
    if doc:
        wrapper.__doc__ = doc.strip().splitlines()[0] + " (obsolete)"

    # Never clobber an attribute that already exists on the class.
    assert getattr(cls, newname, None) is None
    setattr(cls, newname, wrapper)
def _stupid_wrap(cls, name):
origfunc = getattr(cls, name)
if not name.startswith("test_") or not origfunc:
return
def wrapper(self, *args, **opts):
self.assertFalse(self.stupid, "stupid mode was already active")
self.stupid = True
try:
origfunc(self, *args, **opts)
finally:
self.stupid = False
wrapper.__name__ = name + " stupid"
wrapper.__module__ = origfunc.__module__
if origfunc.__doc__:
firstline = origfunc.__doc__.strip().splitlines()[0]
wrapper.__doc__ = firstline + " (stupid)"
assert getattr(cls, wrapper.__name__, None) is None
setattr(cls, wrapper.__name__, wrapper)
class TestMeta(type):
    """Metaclass that adds obsolete/stupid variants of the test methods."""

    def __init__(cls, *args, **opts):
        def wrapall(wrapone):
            # dir() returns a fresh list, so attributes added by the
            # wrappers are not revisited during this pass.
            for origname in dir(cls):
                wrapone(cls, origname)

        if cls.obsolete_mode_tests:
            wrapall(_obsolete_wrap)

        # Stupid-mode variants are only generated for Subversion < 1.9.
        if cls.stupid_mode_tests and svnwrap.subversion_version < (1, 9, 0):
            wrapall(_stupid_wrap)

        return super(TestMeta, cls).__init__(*args, **opts)
class TestBase(unittest.TestCase):
    """Base class for hgsubversion test cases.

    ``setUp`` gives every test a private temporary directory (used as the
    working directory), its own hgrc and Subversion config directory, and
    redirects ``ui.write_err`` to ``ui.write``. ``tearDown`` undoes all of
    it. The remaining methods load Subversion fixtures, clone them with
    Mercurial, commit and push changes, and inspect the results.
    """

    __metaclass__ = TestMeta

    # When true, TestMeta adds an "<name> obsolete" variant of each test_*.
    obsolete_mode_tests = False
    # When true, TestMeta adds an "<name> stupid" variant of each test_*
    # (only for Subversion older than 1.9).
    stupid_mode_tests = False
    # Set to True by the stupid-mode variants while they run.
    stupid = False

    def setUp(self):
        """Create the isolated per-test environment."""
        if "hgsubversion" in sys.modules:
            sys.modules["hgext_hgsubversion"] = sys.modules["hgsubversion"]

        # the Python 2.7 default of 640 is obnoxiously low
        self.maxDiff = 4096

        self.oldenv = dict(
            [(k, os.environ.get(k, None)) for k in ("LANG", "LC_ALL", "HGRCPATH")]
        )

        try:
            self.oldugettext = i18n._ugettext  # Mercurial >= 3.2
        except AttributeError:
            self.oldt = i18n.t
            os.environ["LANG"] = os.environ["LC_ALL"] = "C"
            i18n.t = gettext.translation("hg", i18n.localedir, fallback=True)
        else:
            os.environ["LANG"] = os.environ["LC_ALL"] = "C"
            i18n.setdatapath(hgutil.datapath)

        self.oldwd = os.getcwd()
        self.tmpdir = tempfile.mkdtemp(
            "svnwrap_test", dir=os.environ.get("HGSUBVERSION_TEST_TEMP", None)
        )
        os.chdir(self.tmpdir)
        self.hgrc = os.path.join(self.tmpdir, ".hgrc")
        os.environ["HGRCPATH"] = self.hgrc
        scmutil._rcpath = None
        # Close the file explicitly so its content is on disk before
        # Mercurial reads it, instead of relying on garbage collection.
        with open(self.hgrc, "w") as rc:
            rc.write("[ui]\nusername=test-user\n")
            for l in "[extensions]", "hgsubversion=":
                rc.write(l + "\n")

        self.repocount = 0
        self.wc_path = "%s/testrepo_wc" % self.tmpdir
        self.svn_wc = None

        self.config_dir = self.tmpdir
        svnwrap.common._svn_config_dir = self.config_dir
        self.setup_svn_config("")

        # Previously, we had a MockUI class that wrapped ui, and giving access
        # to the stream. The ui.pushbuffer() and ui.popbuffer() can be used
        # instead. Using the regular UI class, with all stderr redirected to
        # stdout ensures that the test setup is much more similar to usage
        # setups.
        self.patch = (ui.ui.write_err, ui.ui.write)
        setattr(ui.ui, self.patch[0].func_name, self.patch[1])

    def setup_svn_config(self, config):
        """Write ``config`` as the Subversion ``config`` file for this test."""
        c = open(self.config_dir + "/config", "w")
        try:
            c.write(config)
        finally:
            c.close()

    def _makerepopath(self):
        """Return a new, numbered repository path inside the temp dir."""
        self.repocount += 1
        return "%s/testrepo-%d" % (self.tmpdir, self.repocount)

    def tearDown(self):
        """Undo everything ``setUp`` changed and delete the temp dir."""
        for var, val in self.oldenv.iteritems():
            if val is None:
                # pop() rather than del: the test itself may already have
                # removed the variable.
                os.environ.pop(var, None)
            else:
                os.environ[var] = val
        try:
            i18n._ugettext = self.oldugettext  # Mercurial >= 3.2
        except AttributeError:
            i18n.t = self.oldt
        os.chdir(self.oldwd)
        try:
            rmtree(self.tmpdir)
        finally:
            # Restore write_err even if cleanup fails, so the redirect does
            # not leak into later tests.
            setattr(ui.ui, self.patch[0].func_name, self.patch[0])

    def ui(self, layout="auto"):
        """Return a test ui honouring the current stupid mode."""
        return testui(self.stupid, layout)

    def load_svndump(self, fixture_name):
        """Loads an svnadmin dump into a fresh repo. Return the svn repo
        path.
        """
        path = self._makerepopath()
        assert not os.path.exists(path)
        with open(os.path.join(FIXTURES, fixture_name)) as inp:
            svnwrap.create_and_load(path, inp)
        return path

    def load_repo_tarball(self, fixture_name):
        """Extracts a tarball of an svn repo and returns the svn repo path."""
        path = self._makerepopath()
        assert not os.path.exists(path)
        os.mkdir(path)
        tarball = tarfile.open(os.path.join(FIXTURES, fixture_name))
        try:
            # This is probably somewhat fragile, but I'm not sure how to
            # do better. In particular, I think it assumes that the tar
            # entries are in the right order and that directories appear
            # before their contents. This is a valid assumption for sane
            # tarballs, from what I can tell. In particular, for a simple
            # tarball of a svn repo with paths relative to the repo root,
            # it seems to work.
            for entry in tarball:
                tarball.extract(entry, path)
        finally:
            tarball.close()
        return path

    def clone(
        self,
        repo_path,
        subdir=None,
        layout="auto",
        startrev=0,
        externals=None,
        noupdate=True,
        dest=None,
        rev=None,
        config=None,
    ):
        """Clone the svn repository at ``repo_path`` into ``self.wc_path``.

        ``subdir`` defaults to "trunk" for the "single" layout and to the
        repository root otherwise. ``config`` is a dict of extra
        ``--config`` items; ``externals`` is stored there under
        ``hgsubversion.externals``. ``dest`` is accepted but not used.
        Returns ``self.wc_path``.
        """
        if layout == "single":
            if subdir is None:
                subdir = "trunk"
        elif subdir is None:
            subdir = ""
        projectpath = repo_path
        if subdir:
            projectpath += "/" + subdir

        cmd = [
            "clone",
            "--quiet",
            "--layout=%s" % layout,
            "--startrev=%s" % startrev,
            fileurl(projectpath),
            self.wc_path,
        ]
        if self.stupid:
            cmd.append("--stupid")
        if noupdate:
            cmd.append("--noupdate")
        if rev is not None:
            cmd.append("--rev=%s" % rev)

        config = dict(config or {})
        if externals:
            config["hgsubversion.externals"] = str(externals)
        # Prepending in reverse-sorted order leaves the --config options
        # sorted by key at the front of the command line.
        for k, v in reversed(sorted(config.iteritems())):
            cmd[:0] = ["--config", "%s=%s" % (k, v)]

        r = dispatch(cmd)
        assert not r, "clone of %s failed" % projectpath

        return self.wc_path

    def fetch(self, svn_repo_path, *args, **opts):
        """Clone ``svn_repo_path`` and return the resulting hg repository."""
        hg_repo_path = self.clone(svn_repo_path, *args, **opts)
        return hg.repository(testui(), hg_repo_path)

    def load(self, fixture_name):
        """Load a ``.svndump`` or ``tar.gz`` fixture; return the repo path."""
        if fixture_name.endswith(".svndump"):
            repo_path = self.load_svndump(fixture_name)
        elif fixture_name.endswith("tar.gz"):
            repo_path = self.load_repo_tarball(fixture_name)
        else:
            assert False, "Unknown fixture type"
        return repo_path

    def load_and_clone(self, fixture_name, *args, **opts):
        """Load a fixture and clone it; return (hg path, svn repo path)."""
        svn_repo_path = self.load(fixture_name)
        return self.clone(svn_repo_path, *args, **opts), svn_repo_path

    def load_and_fetch(self, fixture_name, *args, **opts):
        """Load a fixture and fetch it; return (hg repo, svn repo path)."""
        repo_path = self.load(fixture_name)
        return self.fetch(repo_path, *args, **opts), repo_path

    def _load_fixture_and_fetch(self, *args, **kwargs):
        """Like ``load_and_fetch`` but return only the hg repository."""
        repo, repo_path = self.load_and_fetch(*args, **kwargs)
        return repo

    def add_svn_rev(self, repo_path, changes):
        """Commit ``changes`` to the svn repository at ``repo_path``.

        changes is a dict of filename -> contents. A Subversion working
        copy is checked out on first use and reused afterwards.
        """
        if self.svn_wc is None:
            self.svn_wc = os.path.join(self.tmpdir, "testsvn_wc")
            subprocess.call(
                ["svn", "co", "-q", fileurl(repo_path), self.svn_wc],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )

        for filename, contents in changes.iteritems():
            # filenames are / separated
            filename = filename.replace("/", os.path.sep)
            filename = os.path.join(self.svn_wc, filename)
            # Close the file before svn looks at it.
            with open(filename, "w") as f:
                f.write(contents)
            # may be redundant
            subprocess.call(
                ["svn", "add", "-q", filename],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
        subprocess.call(
            ["svn", "commit", "-q", self.svn_wc, "-m", "test changes"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    # define this as a property so that it reloads anytime we need it
    @property
    def repo(self):
        return hg.repository(testui(), self.wc_path)

    def pushrevisions(self, expected_extra_back=0):
        """Push to Subversion and check how many revisions were gained.

        Asserts that the repository grew by exactly ``expected_extra_back``
        revisions and returns the result of ``commands.push``.
        """
        # ``self.repo`` builds a fresh repository (with a fresh ui) on every
        # access, so the stupid flag has to be set on the very object that
        # is handed to push; otherwise the setting is lost.
        repo = self.repo
        before = repolen(repo)
        repo.ui.setconfig("hgsubversion", "stupid", str(self.stupid))
        res = commands.push(repo.ui, repo)
        # Reload to observe what the push left on disk.
        after = repolen(self.repo)
        self.assertEqual(expected_extra_back, after - before)
        return res

    def svnco(self, repo_path, svnpath, rev, path):
        """Check out ``svnpath`` at ``rev`` into ``path`` below the hg clone.

        Raises Exception, including svn's output, when the checkout fails.
        """
        path = os.path.join(self.wc_path, path)
        subpath = os.path.dirname(path)
        if not os.path.isdir(subpath):
            os.makedirs(subpath)
        svnpath = fileurl(repo_path + "/" + svnpath)
        # str() so that integer revisions are accepted as well.
        args = ["svn", "co", "-r", str(rev), svnpath, path]
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # stderr is merged into stdout, so communicate() returns None for
        # it; svn's diagnostics are found in ``output``.
        output, _unused = p.communicate()
        if p.returncode:
            raise Exception("svn co failed on %s: %r" % (svnpath, output))

    def commitchanges(self, changes, parent="tip", message="automated test"):
        """Commit changes to mercurial directory

        'changes' is a sequence of tuples (source, dest, data). It can look
        like:
        - (source, source, data) to set source content to data
        - (source, dest, None) to set dest content to source one, and mark it as
        copied from source.
        - (source, dest, data) to set dest content to data, and mark it as copied
        from source.
        - (source, None, None) to remove source.

        The working directory is updated to the new changeset, whose node
        id is returned.
        """
        repo = self.repo
        parentctx = repo[parent]

        changed, removed = [], []
        for source, dest, newdata in changes:
            if dest is None:
                removed.append(source)
            else:
                changed.append(dest)

        def filectxfn(repo, memctx, path):
            if path in removed:
                return compathacks.filectxfn_deleted(memctx, path)
            # First change whose destination is this path.
            entry = [e for e in changes if path == e[1]][0]
            source, dest, newdata = entry
            if newdata is None:
                newdata = parentctx[source].data()
            copied = None
            if source != dest:
                copied = source
            return compathacks.makememfilectx(
                repo,
                memctx=memctx,
                path=dest,
                data=newdata,
                islink=False,
                isexec=False,
                copied=copied,
            )

        ctx = context.memctx(
            repo,
            (parentctx.node(), node.nullid),
            message,
            changed + removed,
            filectxfn,
            "an_author",
            "2008-10-07 20:59:48 -0500",
            {"branch": parentctx.branch()},
        )
        nodeid = repo.commitctx(ctx)
        # Reload the repository before updating to the new changeset.
        repo = self.repo
        hg.clean(repo, nodeid)
        return nodeid

    def assertchanges(self, changes, ctx):
        """Assert that all 'changes' (as defined in commitchanges())
        went into ctx.
        """
        for source, dest, data in changes:
            if dest is None:
                self.assertTrue(source not in ctx)
                continue
            self.assertTrue(dest in ctx)
            if data is None:
                data = ctx.parents()[0][source].data()
            self.assertEqual(ctx[dest].data(), data)
            if dest != source:
                copy = ctx[dest].renamed()
                self.assertEqual(copy[0], source)

    def assertMultiLineEqual(self, first, second, msg=None):
        """Assert that two multi-line strings are equal. (Based on Py3k code.)

        The unittest implementation is used when it exists; otherwise a
        unified diff of the two strings is added to the failure message.
        """
        try:
            return super(TestBase, self).assertMultiLineEqual(first, second, msg)
        except AttributeError:
            pass

        self.assertTrue(isinstance(first, str), ("First argument is not a string"))
        self.assertTrue(isinstance(second, str), ("Second argument is not a string"))

        if first != second:
            diff = "".join(
                difflib.unified_diff(
                    first.splitlines(True),
                    second.splitlines(True),
                    fromfile="a",
                    tofile="b",
                )
            )
            msg = "%s\n%s" % (msg or "", diff)
            raise self.failureException(msg)

    def getgraph(self, repo):
        """Helper function displaying a repository graph, especially
        useful when debugging comprehensive tests.
        """
        # Could be more elegant, but it works with stock hg
        _ui = testui()
        templ = """\
changeset: {rev}:{node|short} (r{svnrev})
branch: {branches}
tags: {tags}
summary: {desc|firstline}
files: {files}
"""
        _ui.pushbuffer()
        commands.log(_ui, repo, rev=None, template=templ, graph=True)
        return _ui.popbuffer()

    def svnlog(self, repo=None):
        """log of the remote Subversion repository corresponding to repo

        In order to make the format suitable for direct comparison in
        tests, we exclude dates and convert the path operations into
        a tuple.
        """
        if repo is None:
            repo = self.repo
        return [
            (
                r.revnum,
                r.message,
                dict(
                    (p, (op.action, op.copyfrom_path, int(op.copyfrom_rev)))
                    for (p, op) in r.paths.items()
                ),
            )
            for r in svnrepo.svnremoterepo(repo.ui).svn.revisions()
        ]

    def draw(self, repo):
        """Write the graph produced by ``getgraph`` to stdout."""
        sys.stdout.write(self.getgraph(repo))
def import_test(name):
    """Load the test module ``name`` from its dash-separated source file.

    The file name is derived from ``name`` by splitting on "_", inserting
    "hgsubversion" as the second component and joining with "-". It is
    looked up next to this file and then in its "comprehensive"
    subdirectory. Raises ImportError when neither location has it.
    """
    parts = name.split("_")
    parts.insert(1, "hgsubversion")
    testname = "%s.py" % "-".join(parts)

    here = os.path.dirname(os.path.abspath(__file__))
    for directory in (here, os.path.join(here, "comprehensive")):
        candidate = os.path.join(directory, testname)
        if os.path.exists(candidate):
            return imp.load_source(name, candidate)

    raise ImportError(
        "Could not import module %s from files like %s" % (name, testname)
    )