sapling/fastmanifest/concurrency.py

213 lines
6.6 KiB
Python
Raw Normal View History

# concurrency.py
#
# Copyright 2016 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import errno
import resource
import os
import platform
import socket
import stat
import subprocess
import sys
import time
from mercurial import error
# Returns true if we're the original process, returns false if we're the child
# process.
#
# NOTE: This is extremely platform-specific code.
[fastmanifest] non-silent worker processes should flush their output streams before dying Summary: Noticed this while trying to debug something that since we call `_exit()`, we don't have the luxury of the standard flushing of output streams. If we have a silent worker that doesn't output enough to trigger an automatic flush, we might leave things in the buffer. Test Plan: pass unit tests. output something small after the fork via ui.log, and it is now flushed to console. ``` diff --git a/fastmanifest/cachemanager.py b/fastmanifest/cachemanager.py --- a/fastmanifest/cachemanager.py +++ b/fastmanifest/cachemanager.py @@ -175,6 +175,9 @@ return cache = fastmanifestcache.getinstance(repo.store.opener, ui) + if background: + ui.debug("hi") + computedrevs = scmutil.revrange(repo, revset) sortedrevs = sorted(computedrevs, key=lambda x:-x) if ui.configbool("fastmanifest", "randomorder", True): diff --git a/tests/test-fastmanifest.t b/tests/test-fastmanifest.t --- a/tests/test-fastmanifest.t +++ b/tests/test-fastmanifest.t @@ -24,6 +24,7 @@ > [fastmanifest] > cachecutoffdays=-1 > randomorder=False + > silentworker=False > EOF $ mkcommit a @@ -52,7 +53,7 @@ $ sleep 1 $ hg debugcachemanifest -a --background $ hg debugcachemanifest -a --background - $ hg debugcachemanifest -a --background + $ hg debugcachemanifest -a --background --debug $ sleep 1 $ hg debugcachemanifest --list fast7ab5760d084a24168f7595c38c00f4bbc2e308d9 (size 328 bytes) ``` With this change, we see the "hi". Without this change, we do not see the "hi" Reviewers: lcharignon Reviewed By: lcharignon Subscribers: mitrandir, mjpieters Differential Revision: https://phabricator.intern.facebook.com/D3387112 Signature: t1:3387112:1464987504:dc44129afb36e27c84dd6d6e52db8d82533b4341
2016-06-04 03:04:23 +03:00
def fork_worker(ui, repo, silent_worker):
    """Double-fork a detached worker process.

    Returns True in the original (calling) process and False in the detached
    worker. The intermediate child created by the first fork never returns:
    it starts a new session, forks the worker and exits via ``os._exit(0)``.

    ui: object whose ``fin``/``fout``/``ferr`` streams are replaced with
        os.devnull handles when ``silent_worker`` is true.
    repo: object whose ``ui`` attribute is re-pointed at ``ui`` when
        ``silent_worker`` is true.
    silent_worker: when true, every inherited file descriptor is closed in
        the child and the standard streams are reopened on os.devnull; when
        false, the child keeps the parent's descriptors.
    """
    if not silent_worker:
        # The child inherits a copy of any buffered output.  Flush before
        # forking so that content is only written *once*.
        sys.stdout.flush()
        sys.stderr.flush()

    if os.fork() > 0:
        # original process
        return True

    if silent_worker:
        # Close all inherited file descriptors.  The soft limit is the upper
        # bound, but an "unlimited" limit (RLIM_INFINITY, commonly -1 or a
        # huge value) is not a usable bound for closerange(), so fall back
        # to a conventional maximum in that case.
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
        if maxfd == resource.RLIM_INFINITY or maxfd < 0:
            maxfd = 1024
        os.closerange(0, maxfd)

        # Reopen the standard streams on the null device.  With everything
        # closed, these opens take the lowest free descriptors (0 and 1).
        ui.fin = sys.stdin = open(os.devnull, "r")
        nullout = open(os.devnull, "w")
        ui.fout = ui.ferr = sys.stdout = sys.stderr = nullout

        # Make sure descriptor 2 is also attached to the null device.
        # Otherwise it stays closed, the next file the worker opens is handed
        # fd 2, and anything writing to the raw stderr descriptor would end
        # up inside that file.
        os.dup2(nullout.fileno(), 2)
        repo.ui = ui

    # Detach from the controlling terminal, then fork again so the worker is
    # not a session leader; the intermediate process exits immediately
    # without running any Python-level cleanup.
    os.setsid()
    if os.fork() > 0:
        os._exit(0)

    return False
class looselock(object):
    """A loose lock. If the lock is held and the lockfile is recent, then we
    immediately fail. If the lockfile is older than X seconds, where
    X=stealtime, then we touch the lockfile and proceed. This is slightly
    vulnerable to a thundering herd, as a bunch of callers that arrive at the
    expiration may all proceed.

    The lock is re-entrant on a single instance: nested lock() calls are
    counted and the lockfile is only removed by the matching outermost
    unlock().  Instances can also be used as context managers.

    The ``vfs`` object must provide ``makelock``, ``join``, ``lstat``,
    ``utime`` and ``unlink``.
    """

    # hostname, looked up once and shared by every instance
    _host = None

    def __init__(self, vfs, lockname, stealtime=10.0):
        """Create an unheld lock.

        vfs: filesystem abstraction used for all lockfile operations.
        lockname: name of the lockfile, as understood by ``vfs``.
        stealtime: age in seconds after which an existing lockfile may be
            stolen.
        """
        self.vfs = vfs
        self.lockname = lockname
        self.stealtime = stealtime
        # nesting depth for a lock we created ourselves
        self.refcount = 0
        # nesting depth for a lock we stole from a stale lockfile
        self.stealcount = 0

    def _trylock(self, lockcontents):
        """Attempt to acquire a lock.

        Raise error.LockHeld if the lock is already held.

        Raises error.LockUnavailable if the lock could not be acquired for any
        other reason.

        This is an internal API, and shouldn't be called externally.
        """
        try:
            self.vfs.makelock(lockcontents, self.lockname)
        except (OSError, IOError) as ex:
            if ex.errno == errno.EEXIST:
                raise error.LockHeld(ex.errno,
                                     self.vfs.join(self.lockname),
                                     self.lockname,
                                     "unimplemented")
            raise error.LockUnavailable(ex.errno,
                                        ex.strerror,
                                        self.vfs.join(self.lockname),
                                        self.lockname)

    def lock(self):
        """Attempt to acquire a lock.

        Raise error.LockHeld if the lock is already held and the lock is too
        recent to be stolen.

        Raises error.LockUnavailable if the lock could not be acquired for any
        other reason.

        Returns self, so the result can be used as a context manager target.
        """
        if self.stealcount > 0:
            # we stole the lock, so we should continue stealing.
            self.stealcount += 1
            return self

        if self.refcount > 0:
            # We already own the lockfile.  Calling makelock() again would
            # collide with our own file (EEXIST) and be misreported as the
            # lock being held by somebody else, so just nest.
            self.refcount += 1
            return self

        if looselock._host is None:
            looselock._host = socket.gethostname()
        lockcontents = '%s:%s' % (looselock._host, os.getpid())

        try:
            self._trylock(lockcontents)
        except error.LockHeld:
            # how old is the file?
            steal = False
            try:
                fstat = self.vfs.lstat(self.lockname)
                mtime = fstat[stat.ST_MTIME]
                if time.time() - mtime > self.stealtime:
                    # touch the file so other callers see it as fresh
                    self.vfs.utime(self.lockname)
                    steal = True
                else:
                    # too recent to steal: propagate the LockHeld
                    raise
            except OSError as ex:
                if ex.errno == errno.ENOENT:
                    # the lockfile vanished after makelock() failed
                    steal = True
                else:
                    raise

            if steal:
                # we shouldn't have any hard references
                assert self.refcount == 0

                # bump the stealcount
                self.stealcount += 1
        else:
            self.refcount += 1

        return self

    def unlock(self):
        """Releases a lock.

        Nested acquisitions only decrement the matching counter; the
        outermost release removes the lockfile (a missing lockfile is
        ignored) and resets both counters.  Calling this on an unheld lock
        does nothing.
        """
        if self.stealcount > 1:
            self.stealcount -= 1
            return
        elif self.refcount > 1:
            self.refcount -= 1
            return
        elif self.refcount == 1 or self.stealcount == 1:
            # delete the file
            try:
                self.vfs.unlink(self.lockname)
            except OSError as ex:
                if ex.errno != errno.ENOENT:
                    raise
            self.refcount = 0
            self.stealcount = 0

    def held(self):
        """Return True if this instance currently holds (or stole) the lock."""
        return (self.stealcount != 0 or
                self.refcount != 0)

    def __enter__(self):
        return self.lock()

    def __exit__(self, exc_type, exc_value, exc_tb):
        return self.unlock()
# Copied from the hgext/logtoprocess.py extension
if platform.system() == 'Windows':
    # Windows has no fork(), but a process can be created detached from
    # its parent:
    # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx
    # The stdlib does not export a constant for this flag.
    DETACHED_PROCESS = 0x00000008
    _creationflags = DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP

    def runshellcommand(script, env):
        """Start ``script`` with environment ``env`` as a detached process."""
        # close_fds cannot be combined with redirecting stdin here; that is
        # probably fine since a detached process has no console connection.
        popenargs = {
            'env': env,
            'close_fds': True,
            'creationflags': _creationflags,
        }
        subprocess.Popen(script, **popenargs)
else:
    def runshellcommand(script, env):
        """Start ``script`` with environment ``env`` fully detached.

        Returns None in the calling process.  The forked child never
        returns: it spawns the command and then terminates via os._exit(0).
        """
        # Double-fork to completely detach from the parent process; based on
        # http://code.activestate.com/recipes/278731
        if os.fork():
            # parent: nothing more to do
            return

        # subprocess.Popen() performs the second fork; the only extra step
        # needed is to put the new process into its own session.
        if sys.version_info < (3, 2):
            sessionargs = {'preexec_fn': os.setsid}
        else:
            sessionargs = {'start_new_session': True}

        try:
            # All three standard streams go to devnull so the subprocess
            # cannot interfere with mercurial's own streams.
            subprocess.Popen(
                script,
                stdout=open(os.devnull, 'w'),
                stderr=open(os.devnull, 'w'),
                stdin=open(os.devnull, 'r'),
                env=env,
                close_fds=True,
                **sessionargs)
        finally:
            # This child's job is done; it must exit here rather than
            # continue running the hg process.
            os._exit(0)