sapling/hgext/hgsubversion/verify.py
Lukasz Langa dfda82e492 Upgrade to 18.5b1
Summary: Mostly empty lines removed and added.  A few bugfixes on excessive line splitting.

Reviewed By: quark-zju

Differential Revision: D8199128

fbshipit-source-id: 90c1616061bfd7cfbba0b75f03f89683340374d5
2018-05-30 02:23:58 -07:00

241 lines
7.9 KiB
Python

# no-check-code -- see T24862348
import difflib
import posixpath
from mercurial import util as hgutil
from mercurial import error
from mercurial import progress
from mercurial import worker
import svnwrap
import svnrepo
import util
def verify(ui, repo, args=None, **opts):
"""verify current revision against Subversion repository
"""
if repo is None:
raise error.RepoError(
"There is no Mercurial repository" " here (.hg not found)"
)
ctx = repo[opts.get("rev", ".")]
if "close" in ctx.extra():
ui.write("cannot verify closed branch")
return 0
convert_revision = ctx.extra().get("convert_revision")
if convert_revision is None or not convert_revision.startswith("svn:"):
raise hgutil.Abort("revision %s not from SVN" % ctx)
if args:
url = repo.ui.expandpath(args[0])
else:
url = repo.ui.expandpath("default")
svn = svnrepo.svnremoterepo(ui, url).svn
meta = repo.svnmeta(svn.uuid, svn.subdir)
srev, branch, branchpath = meta.get_source_rev(ctx=ctx)
branchpath = branchpath[len(svn.subdir.lstrip("/")) :]
branchurl = ("%s/%s" % (url, branchpath)).strip("/")
ui.write("verifying %s against %s@%i\n" % (ctx, branchurl, srev))
def diff_file(path, svndata):
fctx = ctx[path]
if ui.verbose and not fctx.isbinary():
svndesc = "%s/%s/%s@%d" % (svn.svn_url, branchpath, path, srev)
hgdesc = "%s@%s" % (path, ctx)
for c in difflib.unified_diff(
svndata.splitlines(True), fctx.data().splitlines(True), svndesc, hgdesc
):
ui.note(c)
if opts.get("stupid", ui.configbool("hgsubversion", "stupid")):
svnfiles = set()
result = 0
hgfiles = set(ctx) - util.ignoredfiles
def verifydata(svndata):
svnworker = svnrepo.svnremoterepo(ui, url).svn
i = 0
res = True
for fn, type in svndata:
i += 1
if type != "f":
continue
fp = fn
if branchpath:
fp = branchpath + "/" + fn
data, mode = svnworker.get_file(posixpath.normpath(fp), srev)
try:
fctx = ctx[fn]
except error.LookupError:
yield i, "%s\0%r" % (fn, res)
continue
if not fctx.data() == data:
ui.write("difference in: %s\n" % fn)
diff_file(fn, data)
res = False
if not fctx.flags() == mode:
ui.write("wrong flags for: %s\n" % fn)
res = False
yield i, "%s\0%r" % (fn, res)
if url.startswith("file://"):
perarg = 0.00001
else:
perarg = 0.000001
svndata = svn.list_files(branchpath, srev)
w = worker.worker(repo.ui, perarg, verifydata, (), tuple(svndata))
with progress.bar(ui, "verify", total=len(hgfiles)) as prog:
for _, t in w:
prog.value += 1
fn, ok = t.split("\0", 2)
if not bool(ok):
result = 1
svnfiles.add(fn)
if hgfiles != svnfiles:
unexpected = hgfiles - svnfiles
for f in sorted(unexpected):
ui.write("unexpected file: %s\n" % f)
missing = svnfiles - hgfiles
for f in sorted(missing):
ui.write("missing file: %s\n" % f)
result = 1
else:
class VerifyEditor(svnwrap.Editor):
"""editor that verifies a repository against the given context."""
def __init__(self, ui, ctx, prog):
self.ui = ui
self.ctx = ctx
self.prog = prog
self.unexpected = set(ctx) - util.ignoredfiles
self.missing = set()
self.failed = False
self.prog._total = len(self.unexpected)
def open_root(self, base_revnum, pool=None):
pass
def add_directory(
self, path, parent_baton, copyfrom_path, copyfrom_revision, pool=None
):
self.file = None
self.props = None
def open_directory(self, path, parent_baton, base_revision, pool=None):
self.file = None
self.props = None
def add_file(
self,
path,
parent_baton=None,
copyfrom_path=None,
copyfrom_revision=None,
file_pool=None,
):
if path in self.unexpected:
self.unexpected.remove(path)
self.file = path
self.props = {}
else:
self.prog._total += 1
self.missing.add(path)
self.failed = True
self.file = None
self.props = None
self.prog.value += 1
def open_file(self, path, base_revnum):
raise NotImplementedError()
def apply_textdelta(self, file_baton, base_checksum, pool=None):
stream = svnwrap.SimpleStringIO(closing=False)
handler = svnwrap.apply_txdelta("", stream)
if not callable(handler):
raise hgutil.Abort(
"Error in Subversion bindings: " "cannot call handler!"
)
def txdelt_window(window):
handler(window)
# window being None means we're done
if window:
return
fctx = self.ctx[self.file]
hgdata = fctx.data()
svndata = stream.getvalue()
if "svn:executable" in self.props:
if fctx.flags() != "x":
self.ui.warn("wrong flags for: %s\n" % self.file)
self.failed = True
elif "svn:special" in self.props:
hgdata = "link " + hgdata
if fctx.flags() != "l":
self.ui.warn("wrong flags for: %s\n" % self.file)
self.failed = True
elif fctx.flags():
self.ui.warn("wrong flags for: %s\n" % self.file)
self.failed = True
if hgdata != svndata:
self.ui.warn("difference in: %s\n" % self.file)
diff_file(self.file, svndata)
self.failed = True
if self.file is not None:
return txdelt_window
def change_dir_prop(self, dir_baton, name, value, pool=None):
pass
def change_file_prop(self, file_baton, name, value, pool=None):
if self.props is not None:
self.props[name] = value
def close_file(self, file_baton, checksum, pool=None):
pass
def close_directory(self, dir_baton, pool=None):
pass
def delete_entry(self, path, revnum, pool=None):
raise NotImplementedError()
def check(self):
for f in self.unexpected:
self.ui.warn("unexpected file: %s\n" % f)
self.failed = True
for f in self.missing:
self.ui.warn("missing file: %s\n" % f)
self.failed = True
return not self.failed
with progress.bar(ui, "verify") as prog:
v = VerifyEditor(ui, ctx, prog)
svnrepo.svnremoterepo(ui, branchurl).svn.get_revision(srev, v)
result = 0 if v.check() else 1
return result