remotefilelog: recreate cache process after fork

Summary:
If the main process forks, the connection to the cache client is likely
unusable. Let's drop that connection and we can recreate it later if needed.

Reviewed By: quark-zju

Differential Revision: D19796387

fbshipit-source-id: 59b0d3db9655d2233b55effcdf18cbd74a3f5edd
This commit is contained in:
Durham Goode 2020-02-07 13:18:45 -08:00 committed by Facebook Github Bot
parent 87bd36fb46
commit 6e338a97d4

View File

@ -140,14 +140,23 @@ class cacheconnection(object):
cache-specific implementation.
"""
def __init__(self, repo):
def __init__(self, repo, cachecommand):
self.pipeo = self.pipei = None
self.subprocess = None
self.connected = False
self.repo = repo
self._requested = None
self._cachecommand = cachecommand
self._pid = None
def connect(self, cachecommand):
def reconnect(self):
if not self.connected:
self._connect()
if self._pid != os.getpid():
self.close()
self._connect()
def _connect(self):
if self.pipeo:
raise error.Abort(_("cache connection already open"))
@ -156,7 +165,7 @@ class cacheconnection(object):
# hangs in cases where the cache process fills the stderr pipe
# buffer (since remotefilelog never reads from stderr).
self.subprocess = subprocess.Popen(
cachecommand,
self._cachecommand,
shell=True,
close_fds=util.closefds,
stdin=subprocess.PIPE,
@ -168,6 +177,7 @@ class cacheconnection(object):
self.pipeo = self.subprocess.stdout
self.bufferedpipeo = io.open(self.pipeo.fileno(), mode="rb", closefd=False)
self.connected = True
self._pid = os.getpid()
def close(self):
def tryclose(pipe):
@ -369,7 +379,25 @@ class fileserverclient(object):
self.debugoutput = ui.configbool("remotefilelog", "debug")
self.remotecache = cacheconnection(repo)
if self.cacheprocess:
options = ""
cachepath = shallowutil.getcachepackpath(
self.repo, constants.FILEPACK_CATEGORY
)
if self.ui.configbool("remotefilelog", "indexedlogdatastore"):
path = shallowutil.getindexedlogdatastorepath(self.repo)
options += "--indexedlog_dir %s" % path
if self.ui.configbool("remotefilelog", "indexedloghistorystore"):
path = shallowutil.getindexedloghistorystorepath(self.repo)
options += " --indexedloghistorystore_dir %s" % path
cmd = " ".join([self.cacheprocess, self.key, cachepath, options])
self.remotecache = cacheconnection(repo, cmd)
else:
self.remotecache = simplecache()
self.getpackclient = getpackclient(repo)
datastore = lazyfield("datastore")
@ -385,17 +413,17 @@ class fileserverclient(object):
return self.requestpacks(fileids, fetchdata, fetchhistory)
def updatecache(self, dpackpath, hpackpath):
if self.remotecache.connected:
# send to the memcache
if self.ui.configbool("remotefilelog", "updatesharedcache"):
if dpackpath:
self.remotecache.setdatapack([dpackpath])
if hpackpath:
self.remotecache.sethistorypack([hpackpath])
self.remotecache.reconnect()
# send to the memcache
if self.ui.configbool("remotefilelog", "updatesharedcache"):
if dpackpath:
self.remotecache.setdatapack([dpackpath])
if hpackpath:
self.remotecache.sethistorypack([hpackpath])
def requestpacks(self, fileids, fetchdata, fetchhistory):
if not self.remotecache.connected:
self.connect()
self.remotecache.reconnect()
perftrace.traceflag("packs")
cache = self.remotecache
fileslog = self.repo.fileslog
@ -573,54 +601,6 @@ class fileserverclient(object):
self.ui.metrics.gauge("http_gethistory_bytes_uploaded", stats.uploaded())
self.ui.metrics.gauge("http_gethistory_requests", stats.requests())
def connect(self):
if self.cacheprocess:
options = ""
cachepath = shallowutil.getcachepackpath(
self.repo, constants.FILEPACK_CATEGORY
)
if self.ui.configbool("remotefilelog", "indexedlogdatastore"):
path = shallowutil.getindexedlogdatastorepath(self.repo)
options += "--indexedlog_dir %s" % path
if self.ui.configbool("remotefilelog", "indexedloghistorystore"):
path = shallowutil.getindexedloghistorystorepath(self.repo)
options += " --indexedloghistorystore_dir %s" % path
cmd = " ".join([self.cacheprocess, self.key, cachepath, options])
self.remotecache.connect(cmd)
else:
# If no cache process is specified, we fake one that always
# returns cache misses. This enables tests to run easily
# and may eventually allow us to be a drop in replacement
# for the largefiles extension.
class simplecache(object):
def __init__(self):
self.missingids = []
self.connected = True
def close(self):
pass
def setdatapack(self, keys):
pass
def sethistorypack(self, keys):
pass
def getdatapack(self, keys):
self.missingids.append(keys)
def gethistorypack(self, keys):
self.missingids.append(keys)
def receive(self, prog=None):
missing = self.missingids.pop(0) if self.missingids else []
return set(missing)
self.remotecache = simplecache()
def close(self):
# Make it "run-tests.py -i" friendly
if util.istest():
@ -778,3 +758,35 @@ class fileserverclient(object):
"excess remotefilelog fetching:\n%s\n",
"".join(traceback.format_stack()),
)
# If no cache process is specified, we fake one that always
# returns cache misses. This enables tests to run easily
# and may eventually allow us to be a drop in replacement
# for the largefiles extension.
class simplecache(object):
def __init__(self):
self.missingids = []
self.connected = True
def close(self):
pass
def setdatapack(self, keys):
pass
def sethistorypack(self, keys):
pass
def getdatapack(self, keys):
self.missingids.append(keys)
def gethistorypack(self, keys):
self.missingids.append(keys)
def receive(self, prog=None):
missing = self.missingids.pop(0) if self.missingids else []
return set(missing)
def reconnect(self):
pass