# sapling/eden/scm/tests/test-filecache.py
#
# 243 lines
# 5.7 KiB
# Python
# Raw Normal View History

from __future__ import absolute_import, print_function
import os
import subprocess
import sys
from edenscm.mercurial import (
extensions,
hg,
localrepo,
ui as uimod,
util,
vfs as vfsmod,
)
from hghave import require
require(["py2"])
# flake8: enable F821 check
#
# Summary:
# This check is useful and detects real errors (ex. fbconduit). Unfortunately
# `arc lint` will run it with both py2 and py3 so a lot of py2 builtins will
# still be warned.
#
# I didn't find a clean way to disable py3 check. So this diff tries to fix
# them. For `xrange`, the change was done by a script:
#
# ```
# import sys
# import redbaron
#
# headertypes = {'comment', 'endl', 'from_import', 'import', 'string',
#                'assignment', 'atomtrailers'}
#
# xrangefix = '''try:
#     xrange(0)
# except NameError:
#     xrange = range
#
# '''
#
# def isxrange(x):
#     try:
#         return x[0].value == 'xrange'
#     except Exception:
#         return False
#
# def main(argv):
#     for i, path in enumerate(argv):
#         print('(%d/%d) scanning %s' % (i + 1, len(argv), path))
#         content = open(path).read()
#         try:
#             red = redbaron.RedBaron(content)
#         except Exception:
#             print(' warning: failed to parse')
#             continue
#         hasxrange = red.find('atomtrailersnode', value=isxrange)
#         hasxrangefix = 'xrange = range' in content
#         if hasxrangefix or not hasxrange:
#             print(' no need to change')
#             continue
#
#         # find a place to insert the compatibility statement
#         changed = False
#         for node in red:
#             if node.type in headertypes:
#                 continue
#             # node.insert_before is an easier API, but it has bugs changing
#             # other "finally" and "except" positions. So do the insert
#             # manually.
#             #
#             # node.insert_before(xrangefix)
#             line = node.absolute_bounding_box.top_left.line - 1
#             lines = content.splitlines(1)
#             content = ''.join(lines[:line]) + xrangefix + ''.join(lines[line:])
#             changed = True
#             break
#         if changed:
#             # "content" is faster than "red.dumps()"
#             open(path, 'w').write(content)
#             print(' updated')
#
# if __name__ == "__main__":
#     sys.exit(main(sys.argv[1:]))
# ```
#
# For other py2 builtins that do not have a py3 equivalent, some `# noqa` were
# added as a workaround for now.
#
# Reviewed By: DurhamG
#
# Differential Revision: D6934535
#
# fbshipit-source-id: 546b62830af144bc8b46788d2e0fd00496838939
#
# 2018-02-10 04:31:44 +03:00
# Make the name ``xrange`` usable unconditionally: when looking it up raises
# NameError, bind it to ``range`` instead.
try:
    xrange
except NameError:
    xrange = range
class fakerepo(object):
    """Minimal repository stand-in used by the scenarios in this script.

    It carries a ``_filecache`` dict, three vfs attributes that all point at
    the same stub object, and a ``cached`` attribute wrapped by
    ``localrepo.repofilecache("x", "y")``.
    """

    class fakevfs(object):
        """vfs stub whose ``join`` hands the given path back untouched."""

        def join(self, p):
            return p

    # One stub instance is shared by all three vfs attributes.
    vfs = localvfs = sharedvfs = fakevfs()

    def __init__(self):
        self._filecache = {}

    def unfiltered(self):
        """Return this object itself."""
        return self

    def sjoin(self, p):
        """Return ``p`` unchanged."""
        return p

    @localrepo.repofilecache("x", "y")
    def cached(self):
        """Print "creating" and return a fixed string."""
        print("creating")
        return "string from function"

    def invalidate(self):
        """Delete every attribute named by a key of ``_filecache``.

        Names for which ``delattr`` raises AttributeError are skipped.
        """
        for name in self._filecache:
            try:
                delattr(self, name)
            except AttributeError:
                # nothing to delete under this name; carry on with the rest
                pass
def basic(repo):
    """Read ``repo.cached`` after a series of changes to the files "x" and "y".

    The first read happens with neither file touched.  Every later step
    optionally creates, rewrites, or atomically replaces a file, then calls
    ``repo.invalidate()``, prints a "* ..." description and reads
    ``repo.cached`` again.  With ``fakerepo`` a "creating" line appears in the
    output each time the decorated function actually runs.

    ``repo`` must provide a ``cached`` attribute and an ``invalidate()``
    method.  Files are written relative to the current working directory.
    """

    def rewrite(path, data=None):
        # Truncate ``path`` and, when ``data`` is given, write it.  The
        # ``with`` block closes the handle even if the write raises.
        with open(path, "w") as fobj:
            if data is not None:
                fobj.write(data)

    def replace(path, data):
        # Write ``data`` through a vfs file opened with atomictemp=True.
        # Closed explicitly (not via ``with``) so a failed write is never
        # followed by close().
        fobj = vfsmod.vfs(".")(path, "w", atomictemp=True)
        fobj.write(data)
        fobj.close()

    def recheck(message):
        # Common tail of every step: invalidate, announce, read the property.
        repo.invalidate()
        print(message)
        repo.cached

    print("* neither file exists")
    # calls function
    repo.cached

    # uses cache
    recheck("* neither file still exists")

    # create empty file; should recreate the object
    rewrite("x")
    recheck("* empty file x created")

    # should recreate the object
    rewrite("x", "a")
    recheck("* file x changed size")

    # stats file again, reuses object
    recheck("* nothing changed with either file")

    # atomic replace file, size doesn't change
    # hopefully st_mtime doesn't change as well so this doesn't use the cache
    # because of inode change
    replace("x", "b")
    recheck("* file x changed inode")

    # create empty file y; should recreate the object
    rewrite("y")
    recheck("* empty file y created")

    # should recreate the object
    rewrite("y", "A")
    recheck("* file y changed size")

    replace("y", "B")
    recheck("* file y changed inode")

    replace("x", "c")
    replace("y", "C")
    recheck("* both files changed inode")
def test_filecache_synced():
    """Replay the rollback/commit sequence that once desynced the filecache.

    Creates a repository with one commit in the current directory through the
    ``hg`` command line, opens it, and then performs rollback + commit twice.
    """
    # test old behavior that caused filecached properties to go out of sync
    os.system("hg init && echo a >> a && hg ci -qAm.")
    repo = hg.repository(uimod.ui.load())

    # Round 1: the rollback clears the filecache, but changelog stays in
    # __dict__.
    # Round 2: the rollback comes along and touches the changelog externally
    # (file is moved); since changelog isn't under the filecache control
    # anymore, the old code didn't see that it changed and the following
    # commit got the old changelog without reconstructing it.
    for _round in range(2):
        repo.rollback()
        repo.commit(".")
def setbeforeget(repo):
    """Assign ``repo.cached`` directly and print it as "x" and "y" reappear.

    Removes the files "x" and "y" from the current working directory (both
    must exist), sets ``repo.cached`` to a string, and prints its value after
    each ``repo.invalidate()`` / file-creation step.  The value is assigned a
    second time midway through.

    ``repo`` must provide a writable ``cached`` attribute and an
    ``invalidate()`` method.  Raises OSError if "x" or "y" is missing.
    """
    os.remove("x")
    os.remove("y")

    repo.cached = "string set externally"
    repo.invalidate()
    print("* neither file exists")
    print(repo.cached)

    repo.invalidate()
    # ``with`` guarantees the handle is closed even if the write raises
    with open("x", "w") as fobj:
        fobj.write("a")
    print("* file x created")
    print(repo.cached)

    repo.cached = "string 2 set externally"
    repo.invalidate()
    print("* string set externally again")
    print(repo.cached)

    repo.invalidate()
    with open("y", "w") as fobj:
        fobj.write("b")
    print("* file y created")
    print(repo.cached)
def antiambiguity():
    """Check that ``vfsmod.checkambigatclosing`` advances st_mtime on close.

    Up to five attempts are made.  Each attempt rewrites the file
    "ambigcheck" in the current directory, appends to it ``repetition * 2``
    times through ``vfsmod.checkambigatclosing`` wrappers, and compares the
    resulting st_mtime with the expected value.  An attempt is abandoned when
    the timestamps show that no ambiguity can be observed.

    A diagnostic line is printed only when the final st_mtime differs from
    the expected value; otherwise the function is silent.
    """
    filename = "ambigcheck"
    # number of append rounds; every round closes the file twice
    repetition = 3

    # try some times, because reproduction of ambiguity depends on
    # "filesystem time"
    for _attempt in xrange(5):
        # ``with`` closes the handle even if the write raises
        with open(filename, "w") as fp:
            fp.write("FOO")

        # NOTE(review): oldstat is read via util.stat but newstat (below) via
        # os.stat - confirm both report timestamps in the same form.
        oldstat = util.stat(filename)
        if oldstat.st_ctime != oldstat.st_mtime:
            # subsequent changing never causes ambiguity
            continue

        # repeat changing via checkambigatclosing, to examine whether
        # st_mtime is advanced multiple times as expected
        for _round in xrange(repetition):
            # explicit closing
            fp = vfsmod.checkambigatclosing(open(filename, "a"))
            fp.write("FOO")
            fp.close()

            # implicit closing by "with" statement
            with vfsmod.checkambigatclosing(open(filename, "a")) as fp:
                fp.write("BAR")

        newstat = os.stat(filename)
        if oldstat.st_ctime != newstat.st_ctime:
            # timestamp ambiguity was naturally avoided while repetition
            continue

        # st_mtime should be advanced "repetition * 2" times, because
        # all changes occurred at same time (in sec)
        expected = (oldstat.st_mtime + repetition * 2) & 0x7FFFFFFF
        if newstat.st_mtime != expected:
            print(
                "'newstat.st_mtime %s is not %s (as %s + %s * 2)"
                % (newstat.st_mtime, expected, oldstat.st_mtime, repetition)
            )

        # no more examination is needed regardless of result
        break
    else:
        # This platform seems too slow to examine anti-ambiguity
        # of file timestamp (or test happened to be executed at
        # bad timing). Exit silently in this case, because running
        # on other faster platforms can detect problems
        pass
# Run the scenarios in a fixed order.  Each "name:" header is followed by a
# blank line and scenarios are separated by a blank line; the emitted text is
# the same as printing every header and blank line with its own call.
print("basic:\n")
basic(fakerepo())
print()
# this scenario runs without a header of its own
test_filecache_synced()
print("\nsetbeforeget:\n")
setbeforeget(fakerepo())
print("\nantiambiguity:\n")
antiambiguity()