Drop any pretense of supporting svn 1.4.x.

The quality of the SWIG bindings is just too low for this to work reasonably. In theory, 1.4.x will be supported at a future date by ctypes.
This commit is contained in:
Augie Fackler 2008-11-01 12:23:07 -05:00
parent 062345fdbd
commit 80516ef041
3 changed files with 368 additions and 381 deletions

4
README
View File

@ -11,8 +11,8 @@ Subversion.
Installation Installation
------------ ------------
You need to have Subversion installed with the SWIG Python bindings. You need to have Subversion installed with the SWIG Python bindings from Subversion 1.5 or later.
You need a recent Mercurial. At present, the required memctx code is in mercurial_, crew_, and crew-stable_, but not mercurial-stable. Install Mercurial from one of mercurial or the crew repos if you have not already. Personally, I use crew_. Note that if you have Subversion 1.4, most functionality will probably work, but pushing and using replay (faster than diff) do not, so you should consider upgrading your Subversion installation or wait for the ctypes bindings to be supported. You need a recent Mercurial. At present, the required memctx code is in mercurial_, crew_, and crew-stable_, but not mercurial-stable. Install Mercurial from one of mercurial or the crew repos if you have not already. Personally, I use crew_.
.. _mercurial: http://selenic.com/repo/hg .. _mercurial: http://selenic.com/repo/hg
.. _crew: http://hg.intevation.org/mercurial/crew .. _crew: http://hg.intevation.org/mercurial/crew

View File

@ -11,8 +11,10 @@ from svn import core
from svn import delta from svn import delta
from svn import ra from svn import ra
svn_config = core.svn_config_get_config(None) if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) < (1, 5, 0):
raise ImportError, 'You must have Subversion 1.5.0 or newer and matching SWIG bindings.'
svn_config = core.svn_config_get_config(None)
class RaCallbacks(ra.Callbacks): class RaCallbacks(ra.Callbacks):
def open_tmp_file(self, pool): def open_tmp_file(self, pool):
(fd, fn) = tempfile.mkstemp() (fd, fn) = tempfile.mkstemp()
@ -22,6 +24,7 @@ class RaCallbacks(ra.Callbacks):
def get_client_string(self, pool): def get_client_string(self, pool):
return 'hgsubversion' return 'hgsubversion'
def user_pass_prompt(realm, default_username, ms, pool): def user_pass_prompt(realm, default_username, ms, pool):
creds = core.svn_auth_cred_simple_t() creds = core.svn_auth_cred_simple_t()
creds.may_save = ms creds.may_save = ms
@ -115,8 +118,7 @@ class SubversionRepo(object):
callbacks.auth_baton = self.auth_baton callbacks.auth_baton = self.auth_baton
self.callbacks = callbacks self.callbacks = callbacks
self.ra = ra.open2(self.svn_url.encode('utf-8'), callbacks, self.ra = ra.open2(self.svn_url.encode('utf-8'), callbacks,
svn_config) svn_config, self.pool)
@property @property
def HEAD(self): def HEAD(self):
@ -242,7 +244,7 @@ class SubversionRepo(object):
if not stop: if not stop:
stop = self.HEAD stop = self.HEAD
while stop > start: while stop > start:
ra.get_log(self.ra, ra.get_log(self.ra,
paths, paths,
start+1, start+1,
stop, stop,

View File

@ -16,389 +16,374 @@ import push_cmd
import test_util import test_util
import time import time
# push fails in 1.4-SWIG-land.
push_works = False
try:
import csvn
push_works = True
except ImportError:
from svn import core
if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) >= (1, 5, 0):
push_works = True
if push_works: class PushOverSvnserveTests(unittest.TestCase):
class PushOverSvnserveTests(unittest.TestCase): def setUp(self):
def setUp(self): self.oldwd = os.getcwd()
self.oldwd = os.getcwd() self.tmpdir = tempfile.mkdtemp('svnwrap_test')
self.tmpdir = tempfile.mkdtemp('svnwrap_test') self.repo_path = '%s/testrepo' % self.tmpdir
self.repo_path = '%s/testrepo' % self.tmpdir self.wc_path = '%s/testrepo_wc' % self.tmpdir
self.wc_path = '%s/testrepo_wc' % self.tmpdir test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump')
test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump') open(os.path.join(self.repo_path, 'conf', 'svnserve.conf'),
open(os.path.join(self.repo_path, 'conf', 'svnserve.conf'), 'w').write('[general]\nanon-access=write\n[sasl]\n')
'w').write('[general]\nanon-access=write\n[sasl]\n') # Paranoia: we try and connect to localhost on 3689 before we start
# Paranoia: we try and connect to localhost on 3689 before we start # svnserve. If it is running, we force the test to fail early.
# svnserve. If it is running, we force the test to fail early. user_has_own_svnserve = False
user_has_own_svnserve = False try:
try: s = socket.socket()
s = socket.socket() s.settimeout(0.3)
s.settimeout(0.3) s.connect(('localhost', 3690))
s.connect(('localhost', 3690)) s.close()
s.close() user_has_own_svnserve = True
user_has_own_svnserve = True except:
except: pass
pass if user_has_own_svnserve:
if user_has_own_svnserve: assert False, ('You appear to be running your own svnserve!'
assert False, ('You appear to be running your own svnserve!' ' You can probably ignore this test failure.')
' You can probably ignore this test failure.') args = ['svnserve', '-d', '--foreground', '-r', self.repo_path]
args = ['svnserve', '-d', '--foreground', '-r', self.repo_path] self.svnserve_pid = os.spawnvp(os.P_NOWAIT, 'svnserve', args)
self.svnserve_pid = os.spawnvp(os.P_NOWAIT, 'svnserve', args) time.sleep(2)
time.sleep(2) fetch_command.fetch_revisions(ui.ui(),
fetch_command.fetch_revisions(ui.ui(), svn_url='svn://localhost/',
svn_url='svn://localhost/', hg_repo_path=self.wc_path)
hg_repo_path=self.wc_path)
def tearDown(self): def tearDown(self):
shutil.rmtree(self.tmpdir) shutil.rmtree(self.tmpdir)
os.chdir(self.oldwd) os.chdir(self.oldwd)
os.system('kill -9 %d' % self.svnserve_pid) os.system('kill -9 %d' % self.svnserve_pid)
# define this as a property so that it reloads anytime we need it # define this as a property so that it reloads anytime we need it
@property @property
def repo(self): def repo(self):
return hg.repository(ui.ui(), self.wc_path) return hg.repository(ui.ui(), self.wc_path)
def test_push_to_default(self, commit=True): def test_push_to_default(self, commit=True):
repo = self.repo repo = self.repo
old_tip = repo['tip'].node() old_tip = repo['tip'].node()
expected_parent = repo['default'].node() expected_parent = repo['default'].node()
def file_callback(repo, memctx, path): def file_callback(repo, memctx, path):
if path == 'adding_file': if path == 'adding_file':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['adding_file'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'default',})
new_hash = repo.commitctx(ctx)
if not commit:
return # some tests use this test as an extended setup.
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='svn://localhost/')
tip = self.repo['tip']
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(tip.parents()[0].node(), expected_parent)
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.branch(), 'default')
class PushTests(unittest.TestCase):
def setUp(self):
self.oldwd = os.getcwd()
self.tmpdir = tempfile.mkdtemp('svnwrap_test')
self.repo_path = '%s/testrepo' % self.tmpdir
self.wc_path = '%s/testrepo_wc' % self.tmpdir
test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump')
fetch_command.fetch_revisions(ui.ui(),
svn_url='file://%s' % self.repo_path,
hg_repo_path=self.wc_path)
# define this as a property so that it reloads anytime we need it
@property
def repo(self):
return hg.repository(ui.ui(), self.wc_path)
def tearDown(self):
shutil.rmtree(self.tmpdir)
os.chdir(self.oldwd)
def test_push_to_default(self, commit=True):
repo = self.repo
old_tip = repo['tip'].node()
expected_parent = repo['default'].node()
def file_callback(repo, memctx, path):
if path == 'adding_file':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['adding_file'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'default',})
new_hash = repo.commitctx(ctx)
if not commit:
return # some tests use this test as an extended setup.
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(tip.parents()[0].node(), expected_parent)
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.branch(), 'default')
def test_push_two_revs(self):
# set up some work for us
self.test_push_to_default(commit=False)
repo = self.repo
old_tip = repo['tip'].node()
expected_parent = repo['tip'].parents()[0].node()
def file_callback(repo, memctx, path):
if path == 'adding_file2':
return context.memfilectx(path=path,
data='foo2',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['adding_file2'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'default',})
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), old_tip)
self.assertNotEqual(tip.parents()[0].node(), old_tip)
self.assertEqual(tip.parents()[0].parents()[0].node(), expected_parent)
self.assertEqual(tip['adding_file2'].data(), 'foo2')
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.parents()[0]['adding_file'].data(), 'foo')
try:
self.assertEqual(tip.parents()[0]['adding_file2'].data(), 'foo')
assert False, "this is impossible, adding_file2 should not be in this manifest."
except revlog.LookupError, e:
pass
self.assertEqual(tip.branch(), 'default')
def test_push_to_branch(self):
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'adding_file':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['the_branch'].node(), node.nullid),
'automated test',
['adding_file'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'the_branch',})
new_hash = repo.commitctx(ctx)
#commands.update(ui.ui(), self.repo, node='tip')
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.branch(), 'the_branch')
def test_delete_file(self):
repo = self.repo
def file_callback(repo, memctx, path):
raise IOError()
old_files = set(repo['default'].manifest().keys())
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['alpha'],
file_callback,
'an author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertEqual(old_files,
set(tip.manifest().keys() + ['alpha']))
self.assert_('alpha' not in tip.manifest())
def test_push_executable_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'gamma':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=True,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assert_('@' in tip.user())
self.assertEqual(tip['gamma'].flags(), 'x')
self.assertEqual(tip['gamma'].data(), 'foo')
self.assertEqual([x for x in tip.manifest().keys() if 'x' not in
tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
def test_push_symlink_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'gamma':
return context.memfilectx(path=path,
data='foo',
islink=True,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['gamma'].flags(), 'l')
self.assertEqual(tip['gamma'].data(), 'foo')
self.assertEqual([x for x in tip.manifest().keys() if 'l' not in
tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
def test_push_with_new_dir(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'newdir/gamma':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['newdir/gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['newdir/gamma'].data(), 'foo')
def test_push_existing_file_newly_execute(self, execute=True,
link=False, expected_flags='x'):
self.test_push_to_default()
repo = self.repo
def file_callback(repo, memctx, path):
return context.memfilectx(path=path, return context.memfilectx(path=path,
data='foo', data='foo',
islink=link, islink=False,
isexec=execute, isexec=False,
copied=False) copied=False)
ctx = context.memctx(repo, raise IOError()
(repo['default'].node(), node.nullid), ctx = context.memctx(repo,
'message', (repo['default'].node(), node.nullid),
['alpha', ], 'automated test',
file_callback, ['adding_file'],
'author', file_callback,
'2008-1-1 00:00:00 -0500', 'an_author',
{'branch': 'default', }) '2008-10-07 20:59:48 -0500',
new_hash = repo.commitctx(ctx) {'branch': 'default',})
hg.update(repo, repo['tip'].node()) new_hash = repo.commitctx(ctx)
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, if not commit:
hg_repo_path=self.wc_path, return # some tests use this test as an extended setup.
svn_url='file://' + self.repo_path) hg.update(repo, repo['tip'].node())
tip = self.repo['tip'] push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
self.assertNotEqual(tip.node(), new_hash) hg_repo_path=self.wc_path,
self.assertEqual(tip['alpha'].data(), 'foo') svn_url='svn://localhost/')
self.assertEqual(tip.parents()[0]['alpha'].flags(), '') tip = self.repo['tip']
self.assertEqual(tip['alpha'].flags(), expected_flags) self.assertNotEqual(tip.node(), old_tip)
# while we're here, double check pushing an already-executable file self.assertEqual(tip.parents()[0].node(), expected_parent)
# works self.assertEqual(tip['adding_file'].data(), 'foo')
repo = self.repo self.assertEqual(tip.branch(), 'default')
def file_callback(repo, memctx, path):
return context.memfilectx(path=path,
data='bar', class PushTests(unittest.TestCase):
islink=link, def setUp(self):
isexec=execute, self.oldwd = os.getcwd()
copied=False) self.tmpdir = tempfile.mkdtemp('svnwrap_test')
ctx = context.memctx(repo, self.repo_path = '%s/testrepo' % self.tmpdir
(repo['default'].node(), node.nullid), self.wc_path = '%s/testrepo_wc' % self.tmpdir
'message', test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump')
['alpha', ], fetch_command.fetch_revisions(ui.ui(),
file_callback, svn_url='file://%s' % self.repo_path,
'author', hg_repo_path=self.wc_path)
'2008-1-1 00:00:00 -0500',
{'branch': 'default', }) # define this as a property so that it reloads anytime we need it
new_hash = repo.commitctx(ctx) @property
hg.update(repo, repo['tip'].node()) def repo(self):
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, return hg.repository(ui.ui(), self.wc_path)
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path) def tearDown(self):
tip = self.repo['tip'] shutil.rmtree(self.tmpdir)
self.assertNotEqual(tip.node(), new_hash) os.chdir(self.oldwd)
self.assertEqual(tip['alpha'].data(), 'bar')
self.assertEqual(tip.parents()[0]['alpha'].flags(), expected_flags) def test_push_to_default(self, commit=True):
self.assertEqual(tip['alpha'].flags(), expected_flags) repo = self.repo
old_tip = repo['tip'].node()
expected_parent = repo['default'].node()
def file_callback(repo, memctx, path):
if path == 'adding_file':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['adding_file'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'default',})
new_hash = repo.commitctx(ctx)
if not commit:
return # some tests use this test as an extended setup.
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), old_tip)
self.assertEqual(tip.parents()[0].node(), expected_parent)
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.branch(), 'default')
def test_push_two_revs(self):
# set up some work for us
self.test_push_to_default(commit=False)
repo = self.repo
old_tip = repo['tip'].node()
expected_parent = repo['tip'].parents()[0].node()
def file_callback(repo, memctx, path):
if path == 'adding_file2':
return context.memfilectx(path=path,
data='foo2',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['adding_file2'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'default',})
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), old_tip)
self.assertNotEqual(tip.parents()[0].node(), old_tip)
self.assertEqual(tip.parents()[0].parents()[0].node(), expected_parent)
self.assertEqual(tip['adding_file2'].data(), 'foo2')
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.parents()[0]['adding_file'].data(), 'foo')
try:
self.assertEqual(tip.parents()[0]['adding_file2'].data(), 'foo')
assert False, "this is impossible, adding_file2 should not be in this manifest."
except revlog.LookupError, e:
pass
self.assertEqual(tip.branch(), 'default')
def test_push_to_branch(self):
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'adding_file':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['the_branch'].node(), node.nullid),
'automated test',
['adding_file'],
file_callback,
'an_author',
'2008-10-07 20:59:48 -0500',
{'branch': 'the_branch',})
new_hash = repo.commitctx(ctx)
#commands.update(ui.ui(), self.repo, node='tip')
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://'+self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['adding_file'].data(), 'foo')
self.assertEqual(tip.branch(), 'the_branch')
def test_delete_file(self):
repo = self.repo
def file_callback(repo, memctx, path):
raise IOError()
old_files = set(repo['default'].manifest().keys())
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'automated test',
['alpha'],
file_callback,
'an author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertEqual(old_files,
set(tip.manifest().keys() + ['alpha']))
self.assert_('alpha' not in tip.manifest())
def test_push_executable_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'gamma':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=True,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assert_('@' in tip.user())
self.assertEqual(tip['gamma'].flags(), 'x')
self.assertEqual(tip['gamma'].data(), 'foo')
self.assertEqual([x for x in tip.manifest().keys() if 'x' not in
tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
def test_push_symlink_file(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'gamma':
return context.memfilectx(path=path,
data='foo',
islink=True,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['gamma'].flags(), 'l')
self.assertEqual(tip['gamma'].data(), 'foo')
self.assertEqual([x for x in tip.manifest().keys() if 'l' not in
tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
def test_push_with_new_dir(self):
self.test_push_to_default(commit=True)
repo = self.repo
def file_callback(repo, memctx, path):
if path == 'newdir/gamma':
return context.memfilectx(path=path,
data='foo',
islink=False,
isexec=False,
copied=False)
raise IOError()
ctx = context.memctx(repo,
(repo['tip'].node(), node.nullid),
'message',
['newdir/gamma', ],
file_callback,
'author',
'2008-10-29 21:26:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['newdir/gamma'].data(), 'foo')
def test_push_existing_file_newly_execute(self, execute=True,
link=False, expected_flags='x'):
self.test_push_to_default()
repo = self.repo
def file_callback(repo, memctx, path):
return context.memfilectx(path=path,
data='foo',
islink=link,
isexec=execute,
copied=False)
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'message',
['alpha', ],
file_callback,
'author',
'2008-1-1 00:00:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['alpha'].data(), 'foo')
self.assertEqual(tip.parents()[0]['alpha'].flags(), '')
self.assertEqual(tip['alpha'].flags(), expected_flags)
# while we're here, double check pushing an already-executable file
# works
repo = self.repo
def file_callback(repo, memctx, path):
return context.memfilectx(path=path,
data='bar',
islink=link,
isexec=execute,
copied=False)
ctx = context.memctx(repo,
(repo['default'].node(), node.nullid),
'message',
['alpha', ],
file_callback,
'author',
'2008-1-1 00:00:00 -0500',
{'branch': 'default', })
new_hash = repo.commitctx(ctx)
hg.update(repo, repo['tip'].node())
push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
hg_repo_path=self.wc_path,
svn_url='file://' + self.repo_path)
tip = self.repo['tip']
self.assertNotEqual(tip.node(), new_hash)
self.assertEqual(tip['alpha'].data(), 'bar')
self.assertEqual(tip.parents()[0]['alpha'].flags(), expected_flags)
self.assertEqual(tip['alpha'].flags(), expected_flags)
else:
class PushTests(unittest.TestCase):
"""Dummy so the test runner doesn't get upset.
"""
pass
def suite(): def suite():
test_classes = [PushTests, PushOverSvnserveTests] test_classes = [PushTests, PushOverSvnserveTests]