mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 16:31:02 +03:00
repack: move to fcntllock based locking
Previously repack used the standard Mercurial symlink-based locking mechanism. This caused problems for our laptop users because symlink locking relies on the host name, and sometimes their hostnames change due to IT issues, which resulted in locks persisting forever and repack never running. The symlink-based locking scheme was also a problem in chroots, where two processes in different chroots could attempt to repack the same shared cache at the same time. Switching to an fcntl-based scheme solves these issues. Differential Revision: https://phab.mercurial-scm.org/D1543
This commit is contained in:
parent
18975bdc14
commit
7deafbdc67
@ -8,10 +8,13 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import errno
|
||||
import fcntl
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from mercurial.i18n import _
|
||||
from mercurial import (
|
||||
error,
|
||||
pycompat,
|
||||
)
|
||||
|
||||
@ -127,3 +130,42 @@ def replaceclass(container, classname):
|
||||
setattr(container, classname, cls)
|
||||
return cls
|
||||
return wrap
|
||||
|
||||
class fcntllock(object):
    """A non-blocking lock backed by fcntl file locking.

    fcntl locks are per-process: taking the same lock a second time from
    one process silently succeeds, and releasing any one of those
    acquisitions drops them all.  Callers must therefore be careful to
    create at most one instance per logical lock.
    """
    def __init__(self, opener, name, description):
        # Resolve the lock file path through the vfs-style opener.
        self.path = opener.join(name)
        self.description = description
        # File object holding the lock; None whenever the lock is not held.
        self.fp = None

    def __enter__(self):
        lockpath = self.path
        # Guard against re-entry from the same instance (see class docstring).
        if self.fp is not None:
            raise error.Abort(_("unable to re-enter lock '%s'") % lockpath)

        try:
            lockfp = open(lockpath, 'w+')
        except (IOError, OSError) as ex:
            raise error.Abort(_("unable to create lock file '%s': %s") %
                              (lockpath, str(ex)))
        self.fp = lockfp

        try:
            # Exclusive, non-blocking: fail immediately if another process
            # already holds the lock.
            fcntl.lockf(lockfp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            self.fp = None
            lockfp.close()
            raise error.LockHeld(errno.EAGAIN, lockpath, self.description, '')

        return self

    def __exit__(self, type, value, traceback):
        lockfp, self.fp = self.fp, None
        if lockfp is not None:
            # Closing the fd would release the lock anyway, but unlock
            # explicitly for clarity.
            fcntl.lockf(lockfp, fcntl.LOCK_UN)
            lockfp.close()
|
||||
|
@ -9,6 +9,7 @@ from __future__ import absolute_import
|
||||
from mercurial import error, filelog, revlog
|
||||
from mercurial.node import bin, hex, nullid, short
|
||||
from mercurial.i18n import _
|
||||
from hgext3rd import extutil
|
||||
from . import (
|
||||
constants,
|
||||
datapack,
|
||||
@ -18,7 +19,7 @@ from . import (
|
||||
shallowutil,
|
||||
)
|
||||
from .lz4wrapper import lz4decompress
|
||||
import hashlib, os
|
||||
import hashlib, os, time
|
||||
|
||||
def debugremotefilelog(ui, path, **opts):
|
||||
decompress = opts.get('decompress')
|
||||
@ -358,9 +359,12 @@ def debughistorypack(ui, path):
|
||||
short(p2node), short(linknode), copyfrom))
|
||||
|
||||
def debugwaitonrepack(repo):
    """Block until no background repack holds the repack lock."""
    acquired = False
    while not acquired:
        try:
            with extutil.fcntllock(repo.svfs, 'repacklock', ''):
                acquired = True
        except error.LockHeld:
            # Another process is still repacking; poll again shortly.
            time.sleep(0.1)
|
||||
|
||||
def debugwaitonprefetch(repo):
|
||||
with repo._lock(repo.svfs, "prefetchlock", True, None,
|
||||
|
@ -1,7 +1,7 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
from hgext3rd.extutil import runshellcommand
|
||||
from hgext3rd.extutil import runshellcommand, fcntllock
|
||||
from mercurial import (
|
||||
error,
|
||||
extensions,
|
||||
@ -412,8 +412,8 @@ class repacker(object):
|
||||
def run(self, targetdata, targethistory):
|
||||
ledger = repackledger()
|
||||
|
||||
with self.repo._lock(self.repo.svfs, "repacklock", False, None,
|
||||
None, _('repacking %s') % self.repo.origroot):
|
||||
with fcntllock(self.repo.svfs, "repacklock",
|
||||
_('repacking %s') % self.repo.origroot):
|
||||
self.repo.hook('prerepack')
|
||||
|
||||
# Populate ledger from source
|
||||
|
@ -8,11 +8,20 @@ import unittest
|
||||
|
||||
import silenttestrunner
|
||||
|
||||
from mercurial import (
|
||||
error,
|
||||
vfs,
|
||||
worker,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.path.insert(0, os.path.join(os.environ["TESTDIR"], "..", "hgext3rd"))
|
||||
|
||||
import extutil
|
||||
|
||||
locktimeout = 25
|
||||
locksuccess = 24
|
||||
|
||||
class ExtutilTests(unittest.TestCase):
|
||||
def testbgcommandnoblock(self):
|
||||
'''runbgcommand() should return without waiting for the process to
|
||||
@ -43,5 +52,32 @@ class ExtutilTests(unittest.TestCase):
|
||||
except OSError as ex:
|
||||
self.assertEqual(ex.errno, errno.EACCES)
|
||||
|
||||
def testfcntllock(self):
    """Holding an fcntllock must exclude other processes; releasing it
    must let them acquire it.

    Uses a forked child (otherprocesslock) so the exclusion is tested
    across real process boundaries, not just within one process.
    """
    testtmp = os.environ["TESTTMP"]
    opener = vfs.vfs(testtmp)
    name = 'testlock'

    with extutil.fcntllock(opener, name, 'testing a lock'):
        otherlock = self.otherprocesslock(opener, name)
        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(otherlock, locktimeout,
                         "other process should not have taken the lock")

    otherlock = self.otherprocesslock(opener, name)
    self.assertEqual(otherlock, locksuccess,
                     "other process should have taken the lock")
|
||||
|
||||
def otherprocesslock(self, opener, name):
    """Fork a child that attempts to take the lock once; return its
    exit status (locksuccess or locktimeout)."""
    pid = os.fork()
    if pid == 0:
        # Child process: report the outcome through the exit code.
        try:
            with extutil.fcntllock(opener, name, 'other process lock'):
                os._exit(locksuccess)
        except error.LockHeld:
            os._exit(locktimeout)
    # Parent: reap the child and decode its wait status.
    childpid, status = os.waitpid(pid, 0)
    return worker._exitstatus(status)  # convert wait status back to an int
|
||||
|
||||
if __name__ == '__main__':
|
||||
silenttestrunner.main(__name__)
|
||||
|
Loading…
Reference in New Issue
Block a user