2013-06-21 21:14:29 +04:00
|
|
|
# fileserverclient.py - client for communicating with the cache process
|
2013-05-07 03:44:04 +04:00
|
|
|
#
|
|
|
|
# Copyright 2013 Facebook, Inc.
|
|
|
|
#
|
|
|
|
# This software may be used and distributed according to the terms of the
|
|
|
|
# GNU General Public License version 2 or any later version.
|
|
|
|
|
|
|
|
from mercurial.i18n import _
|
2013-06-08 02:13:58 +04:00
|
|
|
from mercurial import util, sshpeer
|
2013-05-07 03:59:05 +04:00
|
|
|
import os, socket, lz4, time
|
|
|
|
|
|
|
|
# Statistics for debugging.  These are module-level so they aggregate across
# every fileserverclient instance in the process; close() prints a summary
# when remotefilelog.debug is enabled.
fetchcost = 0       # total wall-clock seconds spent inside request()
fetches = 0         # number of prefetch() calls that had to hit the network
fetched = 0         # number of individual file versions requested
fetchedbytes = 0    # total bytes received
contentbytes = 0    # portion of fetchedbytes that was file content
metadatabytes = 0   # portion of fetchedbytes that was metadata

# Progress-bar topic, translated once at import time.
_downloading = _('downloading')

# Module-level client slot.
# NOTE(review): nothing in this file assigns it besides this initializer —
# presumably the extension's setup code installs the shared instance; confirm.
client = None
|
|
|
|
|
2013-05-18 05:08:53 +04:00
|
|
|
def getcachekey(file, id):
    """Return the relative cache path for *file* at *id*.

    The filename is hashed (sha1 hex) to keep cache paths short and
    filesystem-safe; the node id becomes the entry name inside that
    directory: ``<sha1(file)>/<id>``.
    """
    return os.path.join(util.sha1(file).hexdigest(), id)
|
|
|
|
|
2013-05-07 03:44:04 +04:00
|
|
|
class fileserverclient(object):
    """A client for requesting files from the remote file server.

    Talks to a local cache helper process (remotefilelog.cacheprocess)
    over pipes, and falls back to the repo's default ssh peer for cache
    misses.  Fetched blobs are written into ``self.cachepath``.
    """

    def __init__(self, ui):
        self.ui = ui
        # Shared on-disk cache directory for fetched file revisions.
        self.cachepath = ui.config("remotefilelog", "cachepath")
        # Command used to spawn the cache helper process (see connect()).
        self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
        # When set, close() prints fetch statistics.
        self.debugoutput = ui.configbool("remotefilelog", "debug")

        # Pipes to the cache process; None until connect() runs.
        # Note the naming: pipei is written to and pipeo is read from
        # (i/o relative to the child process, as with sshpeer).
        self.pipeo = self.pipei = self.pipee = None

        if not os.path.exists(self.cachepath):
            os.makedirs(self.cachepath)

    def request(self, repo, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the self.cachepath.

        A list of nodes that the server couldn't find is returned.
        If the connection fails, an exception is raised.
        """
        # Lazily spawn the cache helper on first use.
        if not self.pipeo:
            self.connect()

        # Protocol: "get\n<count>\n<pathhash>/<id>\n..." to the cache
        # process; it replies one line per cache miss, "_hits_<n>_" progress
        # lines, and a terminating "0" line.
        count = len(fileids)
        request = "get\n%d\n" % count
        idmap = {}
        for file, id in fileids:
            pathhash = util.sha1(file).hexdigest()
            fullid = "%s/%s" % (pathhash, id)
            request += fullid + "\n"
            idmap[fullid] = file

        self.pipei.write(request)
        self.pipei.flush()

        # NOTE(review): ``missing`` is returned but never appended to in the
        # visible code — every cache miss is re-fetched from the master
        # below, so an empty result means "all found"; confirm intended.
        missing = []
        total = count
        self.ui.progress(_downloading, 0, total=count)

        global fetchedbytes
        global metadatabytes
        global contentbytes

        remote = None
        missed = []
        count = 0
        while True:
            missingid = self.pipeo.readline()[:-1]
            if not missingid:
                # EOF before the "0" terminator: the helper died.
                raise util.Abort(_("error downloading file contents: " +
                                   "connection closed early"))
            if missingid == "0":
                break
            if missingid.startswith("_hits_"):
                # receive progress reports: "_hits_<count>_..."
                parts = missingid.split("_")
                count += int(parts[2])
                self.ui.progress(_downloading, count, total=total)
                continue

            missed.append(missingid)

            # fetch from the master (lazily open one ssh peer and stream
            # all requests down its getfiles channel)
            if not remote:
                remote = sshpeer.sshpeer(self.ui,
                                         self.ui.config("paths", "default"))
                remote._callstream("getfiles")

            # ids are "<pathhash>/<40-char hex node>"; the server wants
            # "<node><filename>\n".
            id = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (id, file)
            remote.pipeo.write(sshrequest)
            remote.pipeo.flush()

        count = total - len(missed)
        self.ui.progress(_downloading, count, total=total)

        # receive cache misses from master
        if missed:
            # process remote: one "<size>\n<lz4 data>" frame per request,
            # in the order we asked for them.
            pipei = remote.pipei
            for id in missed:
                size = int(pipei.readline()[:-1])
                data = pipei.read(size)

                count += 1
                self.ui.progress(_downloading, count, total=total)

                idcachepath = os.path.join(self.cachepath, id)
                dirpath = os.path.dirname(idcachepath)
                if not os.path.exists(dirpath):
                    os.makedirs(dirpath)
                f = open(idcachepath, "w")
                try:
                    f.write(lz4.decompress(data))
                finally:
                    f.close()

            remote.cleanup()
            remote = None

            # send to memcache so the cache process stores what we fetched
            count = len(missed)
            request = "set\n%d\n%s\n" % (count, "\n".join(missed))

            self.pipei.write(request)
            self.pipei.flush()

        self.ui.progress(_downloading, None)

        # mark ourselves as a user of this cache
        repospath = os.path.join(self.cachepath, "repos")
        reposfile = open(repospath, 'a')
        try:
            reposfile.write(os.path.dirname(repo.path) + "\n")
        finally:
            # fix: previously leaked on a failed write
            reposfile.close()

        return missing

    def connect(self):
        """Spawn the cache helper process and keep its pipes."""
        cmd = "%s %s" % (self.cacheprocess, self.cachepath)
        self.pipei, self.pipeo, self.pipee, self.subprocess = util.popen4(cmd)

    def close(self):
        """Print debug stats (if enabled) and shut down the cache process."""
        if fetches and self.debugoutput:
            print ("%s fetched over %d fetches - %0.2f MB (%0.2f MB content / %0.2f MB metadata) " +
                "over %0.2fs = %0.2f MB/s") % (
                    fetched,
                    fetches,
                    float(fetchedbytes) / 1024 / 1024,
                    float(contentbytes) / 1024 / 1024,
                    float(metadatabytes) / 1024 / 1024,
                    fetchcost,
                    # max() guards against division by zero on instant runs
                    float(fetchedbytes) / 1024 / 1024 / max(0.001, fetchcost))

        # if the process is still open, close the pipes
        # (fix: identity comparison with None, not ``== None``)
        if self.pipeo and self.subprocess.poll() is None:
            # ask the helper to exit cleanly, then reap it
            self.pipei.write("exit\n")
            self.pipei.close()
            self.pipeo.close()
            self.pipee.close()
            self.subprocess.wait()
            del self.subprocess
            self.pipeo = None
            self.pipei = None
            self.pipee = None

    def prefetch(self, repo, fileids):
        """downloads the given file versions to the cache
        """
        storepath = repo.sopener.vfs.base
        missingids = []
        for file, id in fileids:
            # hack
            if file == '.hgtags':
                continue

            # Skip anything already present in the shared cache or in the
            # repo's local store.
            key = getcachekey(file, id)
            idcachepath = os.path.join(self.cachepath, key)
            idlocalpath = os.path.join(storepath, 'data', key)
            if os.path.exists(idcachepath) or os.path.exists(idlocalpath):
                continue

            missingids.append((file, id))

        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(repo, missingids)
            if missingids:
                raise util.Abort(_("unable to download %d files") %
                                 len(missingids))
            fetchcost += time.time() - start