sapling/tests/test-hgsubversion-push-command.py

800 lines
27 KiB
Python
Raw Normal View History

# no-check-code -- see T24862348
from __future__ import absolute_import
import errno
import os
import random
import shutil
import socket
import subprocess
import sys
import time
import test_hgsubversion_util
from edenscm.hgext.hgsubversion import compathacks, util
from edenscm.mercurial import commands, context, hg, node, revlog, util as hgutil
class PushTests(test_hgsubversion_util.TestBase):
obsolete_mode_tests = True
def setUp(self):
test_hgsubversion_util.TestBase.setUp(self)
self.repo_path = self.load_and_fetch("simple_branch.svndump")[1]
def test_cant_push_empty_ctx(self):
repo = self.repo
def file_callback(repo, memctx, path):
if path == "adding_file":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=False,
copied=False,
)
raise IOError()
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
[],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
old_tip = repo["tip"].node()
self.pushrevisions()
tip = self.repo["tip"]
self.assertEqual(tip.node(), old_tip)
def test_push_add_of_added_upstream_gives_sane_error(self):
repo = self.repo
def file_callback(repo, memctx, path):
if path == "adding_file":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=False,
copied=False,
)
raise IOError()
p1 = repo["default"].node()
ctx = context.memctx(
repo,
(p1, node.nullid),
"automated test",
["adding_file"],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
old_tip = repo["tip"].node()
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), old_tip)
# This node adds the same file as the first one we added, and
# will be refused by the server for adding a file that already
# exists. We should respond with an error suggesting the user
# rebase.
ctx = context.memctx(
repo,
(p1, node.nullid),
"automated test",
["adding_file"],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
old_tip = repo["tip"].node()
try:
self.pushrevisions()
except hgutil.Abort as e:
assert "pull again and rebase" in str(e)
tip = self.repo["tip"]
self.assertEqual(tip.node(), old_tip)
def test_cant_push_with_changes(self):
repo = self.repo
def file_callback(repo, memctx, path):
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=False,
copied=False,
)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["adding_file"],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
# Touch an existing file
repo.wwrite("beta", "something else", "")
try:
self.pushrevisions()
except hgutil.Abort:
pass
tip = self.repo["tip"]
self.assertEqual(new_hash, tip.node())
def internal_push_over_svnserve(self, subdir="", commit=True):
repo_path = self.load_svndump("simple_branch.svndump")
open(os.path.join(repo_path, "conf", "svnserve.conf"), "w").write(
"[general]\nanon-access=write\n[sasl]\n"
)
self.port = random.randint(socket.IPPORT_USERRESERVED, 65535)
self.host = socket.gethostname()
# The `svnserve` binary appears to use the obsolete `gethostbyname(3)`
# function, which always returns an IPv4 address, even on hosts that
# support and expect IPv6. As a workaround, resolve the hostname
# within the test harness with `getaddrinfo(3)` to ensure that the
# client and server both use the same IPv4 or IPv6 address.
try:
addrinfo = socket.getaddrinfo(self.host, self.port)
2018-01-03 22:51:00 +03:00
except socket.gaierror:
# gethostname() can give a hostname that doesn't
# resolve. Seems bad, but let's fall back to `localhost` in
# that case and hope for the best.
self.host = "localhost"
addrinfo = socket.getaddrinfo(self.host, self.port)
# On macOS svn seems to have issues with IPv6 at least some of
# the time, so try and bias towards IPv4. This works because
# AF_INET is less than AF_INET6 on all platforms I've
# checked. Hopefully any platform where that's not true will
# be fine with IPv6 all the time. :)
selected = sorted(addrinfo)[0]
self.host = selected[4][0]
# If we're connecting via IPv6 the need to put brackets around the
# hostname in the URL.
ipv6 = selected[0] == socket.AF_INET6
# Ditch any interface information since that's not helpful in
# a URL
if ipv6 and ":" in self.host and "%" in self.host:
self.host = self.host.rsplit("%", 1)[0]
urlfmt = "svn://[%s]:%d/%s" if ipv6 else "svn://%s:%d/%s"
args = [
"svnserve",
"--daemon",
"--foreground",
"--listen-port=%d" % self.port,
"--listen-host=%s" % self.host,
"--root=%s" % repo_path,
]
svnserve = subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
2009-05-19 13:43:49 +04:00
self.svnserve_pid = svnserve.pid
try:
time.sleep(2)
import shutil
shutil.rmtree(self.wc_path)
commands.clone(
self.ui(),
urlfmt % (self.host, self.port, subdir),
self.wc_path,
noupdate=True,
)
repo = self.repo
old_tip = repo["tip"].node()
expected_parent = repo["default"].node()
def file_callback(repo, memctx, path):
if path == "adding_file":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
parents=(repo["default"].node(), node.nullid),
text="automated test",
files=["adding_file"],
filectxfn=file_callback,
user="an_author",
date="2008-10-07 20:59:48 -0500",
extra={"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
if not commit:
return # some tests use this test as an extended setup.
hg.update(repo, repo["tip"].node())
oldauthor = repo["tip"].user()
commands.push(repo.ui, repo)
tip = self.repo["tip"]
self.assertNotEqual(oldauthor, tip.user())
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(tip.parents()[0].node(), expected_parent)
self.assertEqual(tip["adding_file"].data(), "foo")
# unintended behaviour:
self.assertNotEqual("an_author", tip.user())
self.assertEqual("(no author)", tip.user().rsplit("@", 1)[0])
finally:
if sys.version_info >= (2, 6):
svnserve.kill()
else:
test_hgsubversion_util.kill_process(svnserve)
def test_push_over_svnserve(self):
self.internal_push_over_svnserve()
def test_push_over_svnserve_with_subdir(self):
self.internal_push_over_svnserve(subdir="///branches////the_branch/////")
def test_push_to_default(self, commit=True):
repo = self.repo
old_tip = repo["tip"].node()
expected_parent = repo["default"].node()
def file_callback(repo, memctx, path):
if path == "adding_file":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["adding_file"],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
if not commit:
return # some tests use this test as an extended setup.
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(node.hex(tip.parents()[0].node()), node.hex(expected_parent))
self.assertEqual(tip["adding_file"].data(), "foo")
def test_push_two_revs(self):
# set up some work for us
self.test_push_to_default(commit=False)
repo = self.repo
old_tip = repo["tip"].node()
expected_parent = repo["tip"].parents()[0].node()
def file_callback(repo, memctx, path):
if path == "adding_file2":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo2",
islink=False,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["adding_file2"],
file_callback,
"an_author",
"2008-10-07 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), old_tip)
self.assertNotEqual(tip.parents()[0].node(), old_tip)
self.assertEqual(tip.parents()[0].parents()[0].node(), expected_parent)
self.assertEqual(tip["adding_file2"].data(), "foo2")
self.assertEqual(tip["adding_file"].data(), "foo")
self.assertEqual(tip.parents()[0]["adding_file"].data(), "foo")
try:
self.assertEqual(tip.parents()[0]["adding_file2"].data(), "foo")
assert (
False
), "this is impossible, adding_file2 should not be in this manifest."
2018-01-03 22:51:00 +03:00
except revlog.LookupError:
pass
def test_delete_file(self):
repo = self.repo
def file_callback(repo, memctx, path):
return compathacks.filectxfn_deleted(memctx, path)
old_files = set(repo["default"].manifest().keys())
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["alpha"],
file_callback,
"an author",
"2008-10-29 21:26:00 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertEqual(old_files, set(tip.manifest().keys() + ["alpha"]))
self.assert_("alpha" not in tip.manifest())
def test_push_executable_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == "gamma":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=False,
isexec=True,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["tip"].node(), node.nullid),
"message",
["gamma"],
file_callback,
"author",
"2008-10-29 21:26:00 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.clean(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), new_hash)
self.assert_("@" in self.repo["tip"].user())
self.assertEqual(tip["gamma"].flags(), "x")
self.assertEqual(tip["gamma"].data(), "foo")
self.assertEqual(
sorted([x for x in tip.manifest().keys() if "x" not in tip[x].flags()]),
["adding_file", "alpha", "beta"],
)
def test_push_symlink_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == "gamma":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=True,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["tip"].node(), node.nullid),
"message",
["gamma"],
file_callback,
"author",
"2008-10-29 21:26:00 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
# grab a new repo instance (self.repo is an @property functions)
repo = self.repo
tip = repo["tip"]
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip["gamma"].flags(), "l")
self.assertEqual(tip["gamma"].data(), "foo")
self.assertEqual(
sorted([x for x in tip.manifest().keys() if "l" not in tip[x].flags()]),
["adding_file", "alpha", "beta"],
)
def file_callback2(repo, memctx, path):
if path == "gamma":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="a" * 129,
islink=True,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["tip"].node(), node.nullid),
"message",
["gamma"],
file_callback2,
"author",
"2014-08-08 20:11:41 -0700",
{"branch": "default"},
)
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
# grab a new repo instance (self.repo is an @property functions)
repo = self.repo
tip = repo["tip"]
self.assertEqual(tip["gamma"].flags(), "l")
self.assertEqual(tip["gamma"].data(), "a" * 129)
def file_callback3(repo, memctx, path):
if path == "gamma":
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="a" * 64 + "b" * 65,
islink=True,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["tip"].node(), node.nullid),
"message",
["gamma"],
file_callback3,
"author",
"2014-08-08 20:16:25 -0700",
{"branch": "default"},
)
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
repo = self.repo
tip = repo["tip"]
self.assertEqual(tip["gamma"].flags(), "l")
self.assertEqual(tip["gamma"].data(), "a" * 64 + "b" * 65)
def test_push_existing_file_newly_symlink(self):
self.test_push_existing_file_newly_execute(
execute=False, link=True, expected_flags="l"
)
def test_push_existing_file_newly_execute(
self, execute=True, link=False, expected_flags="x"
):
self.test_push_to_default()
repo = self.repo
def file_callback(repo, memctx, path):
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="foo",
islink=link,
isexec=execute,
copied=False,
)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"message",
["alpha"],
file_callback,
"author",
"2008-1-1 00:00:00 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip["alpha"].data(), "foo")
self.assertEqual(tip.parents()[0]["alpha"].flags(), "")
self.assertEqual(tip["alpha"].flags(), expected_flags)
# while we're here, double check pushing an already-executable file
# works
repo = self.repo
2008-11-22 01:21:19 +03:00
def file_callback2(repo, memctx, path):
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="bar",
islink=link,
isexec=execute,
copied=False,
)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"mutate already-special file alpha",
["alpha"],
file_callback2,
"author",
"2008-1-1 00:00:00 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip["alpha"].data(), "bar")
self.assertEqual(tip.parents()[0]["alpha"].flags(), expected_flags)
self.assertEqual(tip["alpha"].flags(), expected_flags)
# now test removing the property entirely
repo = self.repo
2008-11-22 01:21:19 +03:00
def file_callback3(repo, memctx, path):
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data="bar",
islink=False,
isexec=False,
copied=False,
)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"convert alpha back to regular file",
["alpha"],
file_callback3,
"author",
"2008-01-01 00:00:00 -0500",
{"branch": "default"},
)
new_hash = repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip["alpha"].data(), "bar")
self.assertEqual(tip.parents()[0]["alpha"].flags(), expected_flags)
self.assertEqual(tip["alpha"].flags(), "")
def test_push_outdated_base_text(self):
self.test_push_two_revs()
changes = [("adding_file", "adding_file", "different_content")]
par = self.repo["tip"].rev()
self.commitchanges(changes, parent=par)
self.pushrevisions()
changes = [("adding_file", "adding_file", "even_more different_content")]
self.commitchanges(changes, parent=par)
try:
self.pushrevisions()
assert False, "This should have aborted!"
except hgutil.Abort as e:
self.assertEqual(
e.args[0],
"Outgoing changesets parent is not at subversion "
"HEAD (svn error 160028)\n"
"(pull again and rebase on a newer revision)",
)
# verify that any pending transactions on the server got cleaned up
self.assertEqual(
[],
os.listdir(
os.path.join(self.tmpdir, "testrepo-1", "db", "transactions")
),
)
def test_push_encoding(self):
self.test_push_two_revs()
# Writing then rebasing UTF-8 filenames in a cp1252 windows console
# used to fail because hg internal encoding was being changed during
# the interactions with subversion, *and during the rebase*, which
# confused the dirstate and made it believe the file was deleted.
fn = "pi\xc3\xa8ce/test"
changes = [(fn, fn, "a")]
par = self.repo["tip"].rev()
self.commitchanges(changes, parent=par)
self.pushrevisions()
def test_push_emptying_changeset(self):
self.repo["tip"]
changes = [("alpha", None, None), ("beta", None, None)]
parent = self.repo["tip"].rev()
self.commitchanges(changes, parent=parent)
self.pushrevisions()
self.assertEqual(len(self.repo["tip"].manifest()), 0)
# Try to re-add a file after emptying the branch
changes = [("alpha", "alpha", "alpha")]
self.commitchanges(changes, parent=self.repo["tip"].rev())
self.pushrevisions()
self.assertEqual(["alpha"], list(self.repo["tip"].manifest()))
def test_push_without_pushing_children(self):
"""
Verify that a push of a nontip node, keeps the tip child
on top of the pushed commit.
"""
oldlen = test_hgsubversion_util.repolen(self.repo)
self.repo["default"].node()
changes = [("gamma", "gamma", "sometext")]
newhash1 = self.commitchanges(changes)
changes = [("delta", "delta", "sometext")]
2018-01-03 22:51:00 +03:00
self.commitchanges(changes)
# push only the first commit
repo = self.repo
hg.update(repo, newhash1)
commands.push(repo.ui, repo)
self.assertEqual(test_hgsubversion_util.repolen(self.repo), oldlen + 2)
# verify that the first commit is pushed, and the second is not
commit2 = self.repo["tip"]
self.assertEqual(commit2.files(), ("delta",))
2013-08-05 22:27:31 +04:00
self.assertEqual(util.getsvnrev(commit2), None)
commit1 = commit2.parents()[0]
self.assertEqual(commit1.files(), ("gamma",))
prefix = "svn:" + self.repo.svnmeta().uuid
self.assertEqual(util.getsvnrev(commit1), prefix + "/branches/the_branch@5")
def test_push_two_that_modify_same_file(self):
"""
Push performs a rebase if two commits touch the same file.
This test verifies that code path works.
"""
oldlen = test_hgsubversion_util.repolen(self.repo)
self.repo["default"].node()
changes = [("gamma", "gamma", "sometext")]
newhash = self.commitchanges(changes)
changes = [
("gamma", "gamma", "sometext\n moretext"),
("delta", "delta", "sometext\n moretext"),
]
newhash = self.commitchanges(changes)
repo = self.repo
hg.update(repo, newhash)
commands.push(repo.ui, repo)
self.assertEqual(test_hgsubversion_util.repolen(self.repo), oldlen + 2)
# verify that both commits are pushed
commit1 = self.repo["tip"]
self.assertEqual(commit1.files(), ("delta", "gamma"))
2013-08-05 22:27:31 +04:00
prefix = "svn:" + self.repo.svnmeta().uuid
self.assertEqual(util.getsvnrev(commit1), prefix + "/branches/the_branch@6")
commit2 = commit1.parents()[0]
self.assertEqual(commit2.files(), ("gamma",))
self.assertEqual(util.getsvnrev(commit2), prefix + "/branches/the_branch@5")
def test_push_in_subdir(self, commit=True):
repo = self.repo
old_tip = repo["tip"].node()
def file_callback(repo, memctx, path):
if path == "adding_file" or path == "newdir/new_file":
testData = "fooFirstFile"
if path == "newdir/new_file":
testData = "fooNewFile"
return compathacks.makememfilectx(
repo,
memctx=memctx,
path=path,
data=testData,
islink=False,
isexec=False,
copied=False,
)
raise IOError(errno.EINVAL, "Invalid operation: " + path)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["adding_file"],
file_callback,
"an_author",
"2012-12-13 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
p = os.path.join(repo.root, "newdir")
os.mkdir(p)
ctx = context.memctx(
repo,
(repo["default"].node(), node.nullid),
"automated test",
["newdir/new_file"],
file_callback,
"an_author",
"2012-12-13 20:59:48 -0500",
{"branch": "default"},
)
2018-01-03 22:51:00 +03:00
repo.commitctx(ctx)
hg.update(repo, repo["tip"].node())
self.pushrevisions()
tip = self.repo["tip"]
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(tip["adding_file"].data(), "fooFirstFile")
self.assertEqual(tip["newdir/new_file"].data(), "fooNewFile")
if __name__ == "__main__":
import silenttestrunner
silenttestrunner.main(__name__)