mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 08:18:15 +03:00
4a0cbf9fe4
The only consumer of the worker pool code today is `hg update`. Previously, the algorithm to partition work to each worker process preserved input list ordering. We'd take the first N elements, then the next N elements, etc. Measurements on mozilla-central demonstrate this isn't an optimal partitioning strategy. I added debug code to print when workers were exiting. When performing a working copy update on a previously empty working copy of mozilla-central, I noticed that process lifetimes were all over the map. One worker would complete after 7s. Many would complete after 12s. And another worker would often take >16s. This behavior occurred for many worker process counts and was more pronounced on some than others. What I suspect is happening is some workers end up with lots of small files and others with large files. This is because the update code passes in actions according to sorted filenames. And, directories under tend to accumulate similar files. For example, test directories often consist of many small test files and media directories contain binary (often larger) media files. This patch changes the partitioning algorithm to select every Nth element from the input list. Each worker thus has a similar composition of files to operate on. The result of this change is that worker processes now all tend to exit around the same time. The possibility of a long pole due to being unlucky and receiving all the large files has been mitigated. Overall execution time seems to drop, but not by a statistically significant amount on mozilla-central. However, repositories with directories containing many large files will likely show a drop. There shouldn't be any regressions due to partial manifest decoding because the update code already iterates the manifest to determine what files to operate on, so the manifest should already be decoded.
163 lines
4.5 KiB
Python
163 lines
4.5 KiB
Python
# worker.py - master-slave parallelism support
|
|
#
|
|
# Copyright 2013 Facebook, Inc.
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2 or any later version.
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import errno
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
|
|
from .i18n import _
|
|
from . import error
|
|
|
|
def countcpus():
|
|
'''try to count the number of CPUs on the system'''
|
|
|
|
# posix
|
|
try:
|
|
n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
|
|
if n > 0:
|
|
return n
|
|
except (AttributeError, ValueError):
|
|
pass
|
|
|
|
# windows
|
|
try:
|
|
n = int(os.environ['NUMBER_OF_PROCESSORS'])
|
|
if n > 0:
|
|
return n
|
|
except (KeyError, ValueError):
|
|
pass
|
|
|
|
return 1
|
|
|
|
def _numworkers(ui):
|
|
s = ui.config('worker', 'numcpus')
|
|
if s:
|
|
try:
|
|
n = int(s)
|
|
if n >= 1:
|
|
return n
|
|
except ValueError:
|
|
raise error.Abort(_('number of cpus must be an integer'))
|
|
return min(max(countcpus(), 4), 32)
|
|
|
|
if os.name == 'posix':
|
|
_startupcost = 0.01
|
|
else:
|
|
_startupcost = 1e30
|
|
|
|
def worthwhile(ui, costperop, nops):
|
|
'''try to determine whether the benefit of multiple processes can
|
|
outweigh the cost of starting them'''
|
|
linear = costperop * nops
|
|
workers = _numworkers(ui)
|
|
benefit = linear - (_startupcost * workers + linear / workers)
|
|
return benefit >= 0.15
|
|
|
|
def worker(ui, costperarg, func, staticargs, args):
|
|
'''run a function, possibly in parallel in multiple worker
|
|
processes.
|
|
|
|
returns a progress iterator
|
|
|
|
costperarg - cost of a single task
|
|
|
|
func - function to run
|
|
|
|
staticargs - arguments to pass to every invocation of the function
|
|
|
|
args - arguments to split into chunks, to pass to individual
|
|
workers
|
|
'''
|
|
if worthwhile(ui, costperarg, len(args)):
|
|
return _platformworker(ui, func, staticargs, args)
|
|
return func(*staticargs + (args,))
|
|
|
|
def _posixworker(ui, func, staticargs, args):
|
|
rfd, wfd = os.pipe()
|
|
workers = _numworkers(ui)
|
|
oldhandler = signal.getsignal(signal.SIGINT)
|
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
pids, problem = [], [0]
|
|
for pargs in partition(args, workers):
|
|
pid = os.fork()
|
|
if pid == 0:
|
|
signal.signal(signal.SIGINT, oldhandler)
|
|
try:
|
|
os.close(rfd)
|
|
for i, item in func(*(staticargs + (pargs,))):
|
|
os.write(wfd, '%d %s\n' % (i, item))
|
|
os._exit(0)
|
|
except KeyboardInterrupt:
|
|
os._exit(255)
|
|
# other exceptions are allowed to propagate, we rely
|
|
# on lock.py's pid checks to avoid release callbacks
|
|
pids.append(pid)
|
|
pids.reverse()
|
|
os.close(wfd)
|
|
fp = os.fdopen(rfd, 'rb', 0)
|
|
def killworkers():
|
|
# if one worker bails, there's no good reason to wait for the rest
|
|
for p in pids:
|
|
try:
|
|
os.kill(p, signal.SIGTERM)
|
|
except OSError as err:
|
|
if err.errno != errno.ESRCH:
|
|
raise
|
|
def waitforworkers():
|
|
for _pid in pids:
|
|
st = _exitstatus(os.wait()[1])
|
|
if st and not problem[0]:
|
|
problem[0] = st
|
|
killworkers()
|
|
t = threading.Thread(target=waitforworkers)
|
|
t.start()
|
|
def cleanup():
|
|
signal.signal(signal.SIGINT, oldhandler)
|
|
t.join()
|
|
status = problem[0]
|
|
if status:
|
|
if status < 0:
|
|
os.kill(os.getpid(), -status)
|
|
sys.exit(status)
|
|
try:
|
|
for line in fp:
|
|
l = line.split(' ', 1)
|
|
yield int(l[0]), l[1][:-1]
|
|
except: # re-raises
|
|
killworkers()
|
|
cleanup()
|
|
raise
|
|
cleanup()
|
|
|
|
def _posixexitstatus(code):
|
|
'''convert a posix exit status into the same form returned by
|
|
os.spawnv
|
|
|
|
returns None if the process was stopped instead of exiting'''
|
|
if os.WIFEXITED(code):
|
|
return os.WEXITSTATUS(code)
|
|
elif os.WIFSIGNALED(code):
|
|
return -os.WTERMSIG(code)
|
|
|
|
if os.name != 'nt':
|
|
_platformworker = _posixworker
|
|
_exitstatus = _posixexitstatus
|
|
|
|
def partition(lst, nslices):
|
|
'''partition a list into N slices of roughly equal size
|
|
|
|
The current strategy takes every Nth element from the input. If
|
|
we ever write workers that need to preserve grouping in input
|
|
we should consider allowing callers to specify a partition strategy.
|
|
'''
|
|
for i in range(nslices):
|
|
yield lst[i::nslices]
|