diff --git a/__init__.py b/__init__.py index 40f8e0c0b4..fa0dea44fb 100644 --- a/__init__.py +++ b/__init__.py @@ -62,7 +62,11 @@ def cloneshallow(orig, ui, repo, *args, **opts): return orig(self, remote, requirements) wrapfunction(localrepo.localrepository, 'stream_in', stream_in_shallow) - orig(ui, repo, *args, **opts) + try: + orig(ui, repo, *args, **opts) + finally: + if opts.get('shallow'): + fileserverclient.client.close() def reposetup(ui, repo): if not isinstance(repo, localrepo.localrepository): @@ -77,9 +81,9 @@ def reposetup(ui, repo): if isshallowclient: setupclient(ui, repo) - if isserverenabled: - # support file content requests - wireproto.commands['getfiles'] = (getfiles, '') + + # support file content requests + wireproto.commands['getfiles'] = (getfiles, '') if isserverenabled or isshallowclient: # don't clone filelogs to shallow clients @@ -237,7 +241,8 @@ def getfiles(repo, proto): continue path = request[40:] - try: + cachepath = os.path.join('/data/users/durham/cache', path, hex(node)) + if not os.path.exists(cachepath): filectx = repo.filectx(path, fileid=node) text = filectx.data() @@ -265,9 +270,21 @@ def getfiles(repo, proto): text = lz4.compressHC("%d\0%s%s" % (len(text), text, ancestortext)) - except Exception, ex: - raise ex - text = "" + + dirname = os.path.dirname(cachepath) + if not os.path.exists(dirname): + os.makedirs(dirname) + f = open(cachepath, "w") + try: + f.write(text) + finally: + f.close() + + f = open(cachepath, "r") + try: + text = f.read() + finally: + f.close() yield '%d\n%s' % (len(text), text) diff --git a/fileserverclient.py b/fileserverclient.py index 02c5b79564..e4c525ee35 100644 --- a/fileserverclient.py +++ b/fileserverclient.py @@ -6,7 +6,7 @@ # GNU General Public License version 2 or any later version. from mercurial.i18n import _ -from mercurial import util +from mercurial import util, sshpeer import os, socket, lz4, time # Statistics for debugging @@ -18,6 +18,7 @@ contentbytes = 0 metadatabytes = 0 _downloading = _('downloading') +_memcache = 'scmmemcache'; client = None @@ -30,12 +31,10 @@ class fileserverclient(object): """ def __init__(self, ui): self.ui = ui - self.socket = None - self.buffer = '' - self.server = ui.config("remotefilelog", "serveraddress") - self.port = ui.configint("remotefilelog", "serverport") self.cachepath = ui.config("remotefilelog", "cachepath") + self.pipeo = self.pipei = self.pipee = None + if not os.path.exists(self.cachepath): os.makedirs(self.cachepath) @@ -46,16 +45,20 @@ class fileserverclient(object): If the connection fails, an exception is raised. """ - if not self.socket: + if not self.pipeo: self.connect() count = len(fileids) - request = "%d\1" % count + request = "get\n%d\n" % count + idmap = {} for file, id in fileids: pathhash = util.sha1(file).hexdigest() - request += "%s%s%s\1" % (id, pathhash, file) + fullid = "%s/%s" % (pathhash, id) + request += fullid + "\n" + idmap[fullid] = file - self.socket.sendall(request) + self.pipei.write(request) + self.pipei.flush() missing = [] total = count @@ -65,54 +68,78 @@ class fileserverclient(object): global metadatabytes global contentbytes - while count > 0: - count -= 1 - raw = self.readuntil() - - if not raw: + remote = None + missed = [] + count = 0 + while True: + missingid = self.pipeo.read(81) + if not missingid: raise util.Abort(_("error downloading file contents: " + "connection closed early")) - - id = raw[:40] - pathhash = raw[40:80] - size = int(raw[80:]) - - fetchedbytes += len(raw) + size + 1 - - if size == 0: - missing.append(id) + if missingid == "0000000000000000000000000000000000000000/0000000000000000000000000000000000000000": + break + if missingid.startswith("_hits_"): + parts = missingid.split("_") + count += int(parts[2]) + self.ui.progress(_downloading, count, total=total) continue - data = self.read(size) - data = lz4.decompress(data) + missed.append(missingid) - index = data.index('\0') - contentsize = int(data[:index]) + # fetch from the master + if not remote: + remote = sshpeer.sshpeer(self.ui, self.ui.config("paths", "default")) + remote._callstream("getfiles") - contentbytes += contentsize - metadatabytes += len(data) - contentsize + id = missingid[-40:] + file = idmap[missingid] + sshrequest = "%s%s\n" % (id, file) + remote.pipeo.write(sshrequest) + remote.pipeo.flush() - filecachepath = os.path.join(self.cachepath, pathhash) - if not os.path.exists(filecachepath): - os.makedirs(filecachepath) + # receive progress reports + #self.ui.progress(_downloading, total - count, total=total) - idcachepath = os.path.join(filecachepath, id) + count = total - len(missed) + self.ui.progress(_downloading, count, total=total) - f = open(idcachepath, "w") - try: - f.write(data) - finally: - f.close() + # receive cache misses from master + if missed: + # process remote + pipei = remote.pipei + for id in missed: + size = int(pipei.readline()[:-1]) + data = pipei.read(size) - self.ui.progress(_downloading, total - count, total=total) + count += 1 + self.ui.progress(_downloading, count, total=total) + + idcachepath = os.path.join(self.cachepath, id) + dirpath = os.path.dirname(idcachepath) + if not os.path.exists(dirpath): + os.makedirs(dirpath) + f = open(idcachepath, "w") + try: + f.write(lz4.decompress(data)) + finally: + f.close() + + remote.cleanup() + remote = None + + # send to memcache + count = len(missed) + request = "set\n%d\n%s\n" % (count, "\n".join(missed)) + + self.pipei.write(request) + self.pipei.flush() self.ui.progress(_downloading, None) return missing def connect(self): - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((self.server, self.port)) + self.pipei, self.pipeo, self.pipee, self.subprocess = util.popen4(_memcache) def close(self): if fetches: @@ -126,31 +153,16 @@ class fileserverclient(object): fetchcost, float(fetchedbytes) / 1024 / 1024 / max(0.001, fetchcost)) - if self.socket: - self.socket.sendall("\1\1") - self.socket.close() - self.socket = None - - def read(self, size): - while len(self.buffer) < size: - self.buffer += self.socket.recv(size) - - result = self.buffer[:size] - self.buffer = self.buffer[size:] - return result - - def readuntil(self, delimiter="\1"): - index = self.buffer.find(delimiter) - while index == -1: - new = self.socket.recv(4096) - if not new: - raise util.Abort(_("Connection closed early")) - self.buffer += new - index = self.buffer.find(delimiter) - - result = self.buffer[:index] - self.buffer = self.buffer[(index + 1):] - return result + # if the process is still open, close the pipes + if self.pipeo and self.subprocess.poll() == None: + self.pipei.write("exit\n") + self.pipei.close() + self.pipeo.close() + self.pipee.close() + del self.subprocess + self.pipeo = None + self.pipei = None + self.pipee = None def prefetch(self, storepath, fileids): """downloads the given file versions to the cache