sapling/eden/scm/tests/test-atomictempfile.py
Johan Schuijt-Li 1b7dd50dbf hg: don't create tempfiles longer than allowed on filesystem
Summary:
[svcscm@hg022.prn2 /data/scm/www]$ hg log -r master
    transaction abort!
    rollback completed
    abort: File name too long: '/data/scm/www-merge-generaldelta/.hg/store/data/flib/intern/____generated____/_graph_q_l_meerkat_step/flib/intern/entschema/generated/entity/bizapps/dcp__insight/application__config/_ent_d_c_p_insights_application_config_action.php/._g_q_l_g___intern_____set_permission_manager_domain_dcp_insights_application_config_data_____ent_d_c_p_insights_application_config_action____gen_perform_graph_q_l_set_permission_manager_domain_dcp_insights_application_config_mutation_coercer.php.i-U0sMqX~'

On hg022.prn2, www currently fails under certain circumstances because filenames longer than
255 characters are being created. I traced this back to mktempcopy and made sure that
filenames are truncated when they would otherwise be too long.

Reviewed By: quark-zju

Differential Revision: D18639433

fbshipit-source-id: 20e4b086ca5c775b6054b642ecedfb05efd00fb9
2019-11-21 10:41:02 -08:00

135 lines
4.2 KiB
Python

from __future__ import absolute_import
import glob
import os
import shutil
import tempfile
import unittest
from edenscm.mercurial import util
# Module-level alias so the tests below can call atomictempfile(...) directly.
atomictempfile = util.atomictempfile
# Base name shared by the tests. It is deliberately long, so that temp file
# names derived from it would go over the maximum file name length and the
# code under test is forced to deal with that.
filename = 253 * "A"

# Compatibility shim: keep the name ``xrange`` usable on interpreters that
# do not provide it as a builtin.
try:
    xrange
except NameError:
    xrange = range
class testatomictempfile(unittest.TestCase):
    """Tests for ``util.atomictempfile``.

    Every test works inside a private scratch directory that is created in
    ``setUp`` and removed in ``tearDown``. The default target path uses the
    module-level ``filename``, whose length forces the temp-file naming code
    to cope with names that would otherwise be too long.
    """

    def setUp(self):
        """Create the scratch directory and compute the target path."""
        self._testdir = tempfile.mkdtemp("atomictempfiletest")
        self._filename = os.path.join(self._testdir, filename)

    def tearDown(self):
        """Remove the scratch directory, ignoring errors during removal."""
        shutil.rmtree(self._testdir, ignore_errors=True)

    def testsimple(self):
        """The target appears only on close(); the temp file is then gone."""
        # The temp file is expected to be a dot-file next to the target.
        hidden = os.path.join(self._testdir, ".*")

        atomicfile = atomictempfile(self._filename)
        self.assertFalse(os.path.isfile(self._filename))
        tempfilename = atomicfile._tempname
        self.assertIn(tempfilename, glob.glob(hidden))

        atomicfile.write(b"argh\n")
        atomicfile.close()

        self.assertTrue(os.path.isfile(self._filename))
        # Use the same pattern as above: a pattern that cannot match the
        # temp name would make this check pass unconditionally.
        self.assertNotIn(tempfilename, glob.glob(hidden))

    def testdiscard(self):
        """discard() removes the temp file without making the write permanent."""
        atomicfile = atomictempfile(self._filename)
        tempdir, basename = os.path.split(atomicfile._tempname)

        atomicfile.write(b"yo\n")
        atomicfile.discard()

        self.assertFalse(os.path.isfile(self._filename))
        # Look in the directory that actually held the temp file, not in the
        # current working directory.
        self.assertNotIn(basename, os.listdir(tempdir))

    def testoops(self):
        """Bad arguments raise a plain TypeError, not infinite recursion."""
        with self.assertRaises(TypeError):
            atomictempfile()

    def testcheckambig(self):
        """checkambig=True avoids ambiguity of timestamp."""

        def atomicwrite(checkambig):
            f = atomictempfile(self._filename, checkambig=checkambig)
            # Bytes, like every other write in this test case.
            f.write(b"FOO")
            f.close()

        # try some times, because reproduction of ambiguity depends on
        # "filesystem time"
        for _ in xrange(5):
            atomicwrite(False)
            oldstat = os.stat(self._filename)
            if oldstat.st_ctime != oldstat.st_mtime:
                # subsequent changing never causes ambiguity
                continue

            repetition = 3

            # repeat atomic write with checkambig=True, to examine
            # whether st_mtime is advanced multiple times as expected
            for _ in xrange(repetition):
                atomicwrite(True)
            newstat = os.stat(self._filename)
            if oldstat.st_ctime != newstat.st_ctime:
                # timestamp ambiguity was naturally avoided while repetition
                continue

            # st_mtime should be advanced "repetition" times, because
            # all atomicwrite() occurred at same time (in sec).
            # st_mtime is a float, and "&" is not defined for floats, so the
            # masked arithmetic is done on whole seconds.
            expected = (int(oldstat.st_mtime) + repetition) & 0x7FFFFFFF
            self.assertEqual(int(newstat.st_mtime), expected)
            # no more examination is needed, if assumption above is true
            break
        else:
            # This platform seems too slow to examine anti-ambiguity
            # of file timestamp (or test happened to be executed at
            # bad timing). Exit silently in this case, because running
            # on other faster platforms can detect problems
            pass

    def testread(self):
        """Opening with mode="rb" exposes the existing content of the target."""
        with open(self._filename, "wb") as f:
            f.write(b"foobar\n")

        atomicfile = atomictempfile(self._filename, mode="rb")
        try:
            # assertEqual, not assertTrue: the second argument of assertTrue
            # is only the failure message and is never compared.
            self.assertEqual(atomicfile.read(), b"foobar\n")
        finally:
            atomicfile.discard()

    def testcontextmanagersuccess(self):
        """When the context closes, the file is closed"""
        # Keep the file inside the scratch directory so tearDown cleans it up.
        target = os.path.join(self._testdir, "foo")
        with atomictempfile(target) as f:
            self.assertFalse(os.path.isfile(target))
            f.write(b"argh\n")
        self.assertTrue(os.path.isfile(target))

    def testcontextmanagerfailure(self):
        """On exception, the file is discarded"""
        # Keep the file inside the scratch directory so tearDown cleans it up.
        target = os.path.join(self._testdir, "foo")
        with self.assertRaises(ValueError):
            with atomictempfile(target) as f:
                self.assertFalse(os.path.isfile(target))
                f.write(b"argh\n")
                raise ValueError
        self.assertFalse(os.path.isfile(target))
if __name__ == "__main__":
    # When executed as a script, hand this module's name to
    # silenttestrunner.main.
    import silenttestrunner as _runner

    _runner.main(__name__)