Use memcache as the store

Durham Goode 2013-06-07 15:13:58 -07:00
parent c116aa3a58
commit 4a768a3915
2 changed files with 103 additions and 74 deletions

@@ -62,7 +62,11 @@ def cloneshallow(orig, ui, repo, *args, **opts):
             return orig(self, remote, requirements)
         wrapfunction(localrepo.localrepository, 'stream_in', stream_in_shallow)
-    orig(ui, repo, *args, **opts)
+    try:
+        orig(ui, repo, *args, **opts)
+    finally:
+        if opts.get('shallow'):
+            fileserverclient.client.close()
 
 def reposetup(ui, repo):
     if not isinstance(repo, localrepo.localrepository):
@@ -77,9 +81,9 @@ def reposetup(ui, repo):
     if isshallowclient:
         setupclient(ui, repo)
 
     if isserverenabled:
-        # support file content requests
-        wireproto.commands['getfiles'] = (getfiles, '')
+        # support file content requests
+        wireproto.commands['getfiles'] = (getfiles, '')
 
     if isserverenabled or isshallowclient:
         # don't clone filelogs to shallow clients
@@ -237,7 +241,8 @@ def getfiles(repo, proto):
             continue
 
         path = request[40:]
         try:
-            filectx = repo.filectx(path, fileid=node)
-            text = filectx.data()
+            cachepath = os.path.join('/data/users/durham/cache', path, hex(node))
+            if not os.path.exists(cachepath):
+                filectx = repo.filectx(path, fileid=node)
+                text = filectx.data()
@@ -265,9 +270,21 @@ def getfiles(repo, proto):
                 text = lz4.compressHC("%d\0%s%s" %
                     (len(text), text, ancestortext))
         except Exception, ex:
-            raise ex
+            text = ""
+
+        dirname = os.path.dirname(cachepath)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+        f = open(cachepath, "w")
+        try:
+            f.write(text)
+        finally:
+            f.close()
+
+        f = open(cachepath, "r")
+        try:
+            text = f.read()
+        finally:
+            f.close()
 
         yield '%d\n%s' % (len(text), text)
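
For reference, each reply streamed by getfiles is framed as a decimal byte count, a newline, and then the payload; the payload decompresses to the raw content length, a NUL byte, the file text, and the ancestor metadata. A minimal client-side parser for one such reply could look like the sketch below (the function name and the file-like stream argument are illustrative, not part of the extension):

    import lz4

    def parse_getfiles_reply(stream):
        # Read the "<size>\n" frame header produced by the yield above.
        header = ''
        while not header.endswith('\n'):
            header += stream.read(1)
        size = int(header[:-1])
        if size == 0:
            return None, None  # the server could not produce the file
        payload = stream.read(size)
        # The payload decompresses to "<contentlen>\0<content><ancestortext>".
        raw = lz4.decompress(payload)
        index = raw.index('\0')
        contentlen = int(raw[:index])
        content = raw[index + 1:index + 1 + contentlen]
        ancestortext = raw[index + 1 + contentlen:]
        return content, ancestortext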

@@ -6,7 +6,7 @@
 # GNU General Public License version 2 or any later version.
 
 from mercurial.i18n import _
-from mercurial import util
+from mercurial import util, sshpeer
 import os, socket, lz4, time
 
 # Statistics for debugging
@@ -18,6 +18,7 @@ contentbytes = 0
 metadatabytes = 0
 
 _downloading = _('downloading')
+_memcache = 'scmmemcache';
 
 client = None
@@ -30,12 +31,10 @@ class fileserverclient(object):
     """
     def __init__(self, ui):
         self.ui = ui
-        self.socket = None
-        self.buffer = ''
-        self.server = ui.config("remotefilelog", "serveraddress")
-        self.port = ui.configint("remotefilelog", "serverport")
         self.cachepath = ui.config("remotefilelog", "cachepath")
+        self.pipeo = self.pipei = self.pipee = None
 
         if not os.path.exists(self.cachepath):
             os.makedirs(self.cachepath)
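
The pipe names used throughout this class follow what connect() (further down) receives from util.popen4: the child's stdin, stdout, and stderr, plus the process handle, in that order. A rough equivalent of that wiring using the standard subprocess module, assuming scmmemcache is a stdio helper available on $PATH:

    import subprocess

    # Sketch of the plumbing connect() gets from util.popen4; the
    # scmmemcache helper itself is external and not part of this diff.
    proc = subprocess.Popen(['scmmemcache'],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    pipei = proc.stdin   # requests ("get\n...", "set\n...", "exit\n") are written here
    pipeo = proc.stdout  # fixed-width response records are read from here
    pipee = proc.stderr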
@@ -46,16 +45,20 @@ class fileserverclient(object):
         If the connection fails, an exception is raised.
         """
-        if not self.socket:
+        if not self.pipeo:
             self.connect()
 
         count = len(fileids)
-        request = "%d\1" % count
+        request = "get\n%d\n" % count
+        idmap = {}
         for file, id in fileids:
             pathhash = util.sha1(file).hexdigest()
-            request += "%s%s%s\1" % (id, pathhash, file)
+            fullid = "%s/%s" % (pathhash, id)
+            request += fullid + "\n"
+            idmap[fullid] = file
 
-        self.socket.sendall(request)
+        self.pipei.write(request)
+        self.pipei.flush()
 
         missing = []
         total = count
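
On the wire, a get request is therefore the command name, the entry count, and one "<pathhash>/<node>" key per line. The helper on the other end of the pipe could parse this framing roughly as follows (a sketch only; the real scmmemcache implementation is external to this commit):

    def read_request(pipe):
        # "get" or "set", then a count, then <count> keys, one per line.
        command = pipe.readline()[:-1]
        count = int(pipe.readline()[:-1])
        keys = [pipe.readline()[:-1] for _ in range(count)]
        return command, keys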
@@ -65,54 +68,78 @@ class fileserverclient(object):
         global metadatabytes
         global contentbytes
 
-        while count > 0:
-            count -= 1
-            raw = self.readuntil()
-            if not raw:
+        remote = None
+        missed = []
+        count = 0
+        while True:
+            missingid = self.pipeo.read(81)
+            if not missingid:
                 raise util.Abort(_("error downloading file contents: " +
                                    "connection closed early"))
-
-            id = raw[:40]
-            pathhash = raw[40:80]
-            size = int(raw[80:])
-            fetchedbytes += len(raw) + size + 1
-
-            if size == 0:
-                missing.append(id)
+            if missingid == "0000000000000000000000000000000000000000/0000000000000000000000000000000000000000":
+                break
+            if missingid.startswith("_hits_"):
+                parts = missingid.split("_")
+                count += int(parts[2])
+                self.ui.progress(_downloading, count, total=total)
                 continue
 
-            data = self.read(size)
-            data = lz4.decompress(data)
+            missed.append(missingid)
 
-            index = data.index('\0')
-            contentsize = int(data[:index])
+            # fetch from the master
+            if not remote:
+                remote = sshpeer.sshpeer(self.ui, self.ui.config("paths", "default"))
+                remote._callstream("getfiles")
 
-            contentbytes += contentsize
-            metadatabytes += len(data) - contentsize
+            id = missingid[-40:]
+            file = idmap[missingid]
+            sshrequest = "%s%s\n" % (id, file)
+            remote.pipeo.write(sshrequest)
+            remote.pipeo.flush()
 
-            filecachepath = os.path.join(self.cachepath, pathhash)
-            if not os.path.exists(filecachepath):
-                os.makedirs(filecachepath)
+        # receive progress reports
+        #self.ui.progress(_downloading, total - count, total=total)
 
-            idcachepath = os.path.join(filecachepath, id)
+        count = total - len(missed)
+        self.ui.progress(_downloading, count, total=total)
 
-            f = open(idcachepath, "w")
-            try:
-                f.write(data)
-            finally:
-                f.close()
+        # receive cache misses from master
+        if missed:
+            # process remote
+            pipei = remote.pipei
+            for id in missed:
+                size = int(pipei.readline()[:-1])
+                data = pipei.read(size)
 
-            self.ui.progress(_downloading, total - count, total=total)
+                count += 1
+                self.ui.progress(_downloading, count, total=total)
+
+                idcachepath = os.path.join(self.cachepath, id)
+                dirpath = os.path.dirname(idcachepath)
+                if not os.path.exists(dirpath):
+                    os.makedirs(dirpath)
+                f = open(idcachepath, "w")
+                try:
+                    f.write(lz4.decompress(data))
+                finally:
+                    f.close()
+
+            remote.cleanup()
+            remote = None
+
+            # send to memcache
+            count = len(missed)
+            request = "set\n%d\n%s\n" % (count, "\n".join(missed))
+            self.pipei.write(request)
+            self.pipei.flush()
 
         self.ui.progress(_downloading, None)
 
         return missing
 
     def connect(self):
-        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.socket.connect((self.server, self.port))
+        self.pipei, self.pipeo, self.pipee, self.subprocess = util.popen4(_memcache)
 
     def close(self):
         if fetches:
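
The reply stream consumed above is framed in fixed 81-byte records, matching pipeo.read(81): one "<pathhash>/<node>" record per cache miss, "_hits_<n>_" progress records (presumably padded out to the same width), and an all-zeros key as the terminator. Note that the client never reads file data from the helper; hits are evidently materialized directly into the shared on-disk cache by the helper itself. A sketch of the emitting side under those assumptions ('out' and 'misskey' are hypothetical):

    RECORD = 81  # 40 hex chars + '/' + 40 hex chars
    SENTINEL = '0' * 40 + '/' + '0' * 40

    def write_record(pipe, record):
        # Pad short records (the _hits_ reports); '_' as the pad keeps
        # int(record.split('_')[2]) parsing intact on the client side.
        pipe.write(record.ljust(RECORD, '_'))

    write_record(out, '_hits_5_')  # five keys were found in memcache
    write_record(out, misskey)     # a "<pathhash>/<node>" key, already 81 bytes
    write_record(out, SENTINEL)    # end of misses
    out.flush()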
@@ -126,31 +153,16 @@ class fileserverclient(object):
             fetchcost,
             float(fetchedbytes) / 1024 / 1024 / max(0.001, fetchcost))
 
-        if self.socket:
-            self.socket.sendall("\1\1")
-            self.socket.close()
-            self.socket = None
-
-    def read(self, size):
-        while len(self.buffer) < size:
-            self.buffer += self.socket.recv(size)
-
-        result = self.buffer[:size]
-        self.buffer = self.buffer[size:]
-        return result
-
-    def readuntil(self, delimiter="\1"):
-        index = self.buffer.find(delimiter)
-        while index == -1:
-            new = self.socket.recv(4096)
-            if not new:
-                raise util.Abort(_("Connection closed early"))
-            self.buffer += new
-            index = self.buffer.find(delimiter)
-
-        result = self.buffer[:index]
-        self.buffer = self.buffer[(index + 1):]
-        return result
+        # if the process is still open, close the pipes
+        if self.pipeo and self.subprocess.poll() == None:
+            self.pipei.write("exit\n")
+            self.pipei.close()
+            self.pipeo.close()
+            self.pipee.close()
+            del self.subprocess
+            self.pipeo = None
+            self.pipei = None
+            self.pipee = None
 
     def prefetch(self, storepath, fileids):
         """downloads the given file versions to the cache