Move server and debug logic into their own files

__init__.py was getting quite large. This change moves the server and debug
logic into their own files. Client-side logic remains in __init__.py.
Durham Goode 2013-11-25 16:36:44 -08:00
parent d9d4477013
commit 85e48b58fd
4 changed files with 553 additions and 515 deletions

__init__.py

@@ -8,7 +8,7 @@
testedwith = 'internal'
import fileserverclient, remotefilelog, remotefilectx, shallowstore, shallowrepo
import shallowbundle
import shallowbundle, debugcommands, remotefilelogserver
from mercurial.node import bin, hex, nullid, nullrev, short
from mercurial.i18n import _
from mercurial.extensions import wrapfunction
@@ -25,14 +25,12 @@ cmdtable = {}
command = cmdutil.command(cmdtable)
testedwith = 'internal'
remotefilelogreq = "remotefilelog"
repoclass = localrepo.localrepository
if util.safehasattr(repoclass, '_basesupported'):
repoclass._basesupported.add(remotefilelogreq)
repoclass._basesupported.add(shallowrepo.requirement)
else:
# hg <= 2.7
repoclass.supported.add(remotefilelogreq)
repoclass.supported.add(shallowrepo.requirement)
def uisetup(ui):
"""Wraps user facing Mercurial commands to swap them out with shallow versions.
@@ -41,13 +39,15 @@ def uisetup(ui):
entry[1].append(('', 'shallow', None,
_("create a shallow clone which uses remote file history")))
extensions.wrapcommand(commands.table, 'debugindex', debugindex)
extensions.wrapcommand(commands.table, 'debugindexdot', debugindexdot)
extensions.wrapcommand(commands.table, 'debugindex',
debugcommands.debugindex)
extensions.wrapcommand(commands.table, 'debugindexdot',
debugcommands.debugindexdot)
extensions.wrapcommand(commands.table, 'log', log)
# Prevent 'hg manifest --all'
def _manifest(orig, ui, repo, *args, **opts):
if remotefilelogreq in repo.requirements and opts.get('all'):
if shallowrepo.requirement in repo.requirements and opts.get('all'):
raise util.Abort(_("--all is not supported in a shallow repo"))
return orig(ui, repo, *args, **opts)
@@ -65,12 +65,12 @@ def cloneshallow(orig, ui, repo, *args, **opts):
self.__class__.__bases__ = (self.__class__.__bases__[0],
self.unfiltered().__class__)
requirements.add(remotefilelogreq)
requirements.add(shallowrepo.requirement)
# Replace remote.stream_out with a version that sends file
# patterns.
def stream_out_shallow(orig):
if remotefilelogreq in remote._capabilities():
if shallowrepo.requirement in remote._capabilities():
opts = {}
if self.includepattern:
opts['includepattern'] = '\0'.join(self.includepattern)
@@ -95,7 +95,7 @@ def reposetup(ui, repo):
return
isserverenabled = ui.configbool('remotefilelog', 'server')
isshallowclient = remotefilelogreq in repo.requirements
isshallowclient = shallowrepo.requirement in repo.requirements
if isserverenabled and isshallowclient:
raise Exception("Cannot be both a server and shallow client.")
@@ -104,153 +104,21 @@ def reposetup(ui, repo):
setupclient(ui, repo)
if isserverenabled:
setupserver(ui, repo)
def setupserver(ui, repo):
"""Sets up a normal Mercurial repo so it can serve files to shallow repos.
"""
onetimesetup(ui)
# don't send files to shallow clients during pulls
def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source):
caps = self._bundlecaps or []
if remotefilelogreq in caps:
# only send files that don't match the specified patterns
includepattern = None
excludepattern = None
for cap in (self._bundlecaps or []):
if cap.startswith("includepattern="):
includepattern = cap[len("includepattern="):].split('\0')
elif cap.startswith("excludepattern="):
excludepattern = cap[len("excludepattern="):].split('\0')
m = match.always(repo.root, '')
if includepattern or excludepattern:
m = match.match(repo.root, '', None,
includepattern, excludepattern)
changedfiles = list([f for f in changedfiles if not m(f)])
return orig(self, changedfiles, linknodes, commonrevs, source)
wrapfunction(changegroup.bundle10, 'generatefiles', generatefiles)
# add incoming hook to continuously generate file blobs
ui.setconfig("hooks", "changegroup.remotefilelog", incominghook)
remotefilelogserver.setupserver(ui, repo)
def setupclient(ui, repo):
if (not isinstance(repo, localrepo.localrepository) or
isinstance(repo, bundlerepo.bundlerepository)):
return
onetimesetup(ui)
# Even clients get the server setup since they need to have the
# wireprotocol endpoints registered.
remotefilelogserver.onetimesetup(ui)
onetimeclientsetup(ui)
shallowrepo.wraprepo(repo)
repo.store = shallowstore.wrapstore(repo.store)
onetime = False
def onetimesetup(ui):
"""Configures the wireprotocol for both clients and servers.
"""
global onetime
if onetime:
return
onetime = True
# support file content requests
wireproto.commands['getfiles'] = (getfiles, '')
class streamstate(object):
match = None
shallowremote = False
state = streamstate()
def stream_out_shallow(repo, proto, other):
includepattern = None
excludepattern = None
raw = other.get('includepattern')
if raw:
includepattern = raw.split('\0')
raw = other.get('excludepattern')
if raw:
excludepattern = raw.split('\0')
oldshallow = state.shallowremote
oldmatch = state.match
try:
state.shallowremote = True
state.match = match.always(repo.root, '')
if includepattern or excludepattern:
state.match = match.match(repo.root, '', None,
includepattern, excludepattern)
return wireproto.stream(repo, proto)
finally:
state.shallowremote = oldshallow
state.match = oldmatch
wireproto.commands['stream_out_shallow'] = (stream_out_shallow, '*')
# don't clone filelogs to shallow clients
def _walkstreamfiles(orig, repo):
if state.shallowremote:
# if we are shallow ourselves, stream our local commits
if remotefilelogreq in repo.requirements:
striplen = len(repo.store.path) + 1
readdir = repo.store.rawvfs.readdir
visit = [os.path.join(repo.store.path, 'data')]
while visit:
p = visit.pop()
for f, kind, st in readdir(p, stat=True):
fp = p + '/' + f
if kind == stat.S_IFREG:
if not fp.endswith('.i') and not fp.endswith('.d'):
n = util.pconvert(fp[striplen:])
yield (store.decodedir(n), n, st.st_size)
if kind == stat.S_IFDIR:
visit.append(fp)
# Return .d and .i files that do not match the shallow pattern
match = state.match or match.always(repo.root, '')
for (u, e, s) in repo.store.datafiles():
f = u[5:-2] # trim data/... and .i/.d
if not state.match(f):
yield (u, e, s)
for x in repo.store.topfiles():
yield x
elif remotefilelogreq in repo.requirements:
# don't allow cloning from a shallow repo to a full repo
# since it would require fetching every version of every
# file in order to create the revlogs.
raise util.Abort(_("Cannot clone from a shallow repo "
+ "to a full repo."))
else:
for x in orig(repo):
yield x
wrapfunction(wireproto, '_walkstreamfiles', _walkstreamfiles)
# We no longer use the getbundle_shallow command, but we must still
# support it for migration purposes.
def getbundleshallow(repo, proto, others):
bundlecaps = others.get('bundlecaps', '')
bundlecaps = set(bundlecaps.split(','))
bundlecaps.add('remotefilelog')
others['bundlecaps'] = ','.join(bundlecaps)
return wireproto.commands["getbundle"][0](repo, proto, others)
wireproto.commands["getbundle_shallow"] = (getbundleshallow, '*')
# expose remotefilelog capabilities
def capabilities(orig, repo, proto):
caps = orig(repo, proto)
if (remotefilelogreq in repo.requirements or
ui.configbool('remotefilelog', 'server')):
caps += " " + remotefilelogreq
return caps
wrapfunction(wireproto, 'capabilities', capabilities)
clientonetime = False
def onetimeclientsetup(ui):
global clientonetime
@@ -264,7 +132,7 @@ def onetimeclientsetup(ui):
def storewrapper(orig, requirements, path, vfstype):
s = orig(requirements, path, vfstype)
if remotefilelogreq in requirements:
if shallowrepo.requirement in requirements:
s = shallowstore.wrapstore(s)
return s
@@ -272,7 +140,7 @@ def onetimeclientsetup(ui):
# prefetch files before update
def applyupdates(orig, repo, actions, wctx, mctx, actx, overwrite):
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
manifest = mctx.manifest()
files = []
for f, m, args, msg in [a for a in actions if a[1] == 'g']:
@@ -284,7 +152,7 @@ def onetimeclientsetup(ui):
# prefetch files before mergecopies check
def mergecopies(orig, repo, c1, c2, ca):
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
m1 = c1.manifest()
m2 = c2.manifest()
ma = ca.manifest()
@@ -309,7 +177,7 @@ def onetimeclientsetup(ui):
# prefetch files before pathcopies check
def forwardcopies(orig, a, b):
repo = a._repo
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
mb = b.manifest()
missing = set(mb.iterkeys())
missing.difference_update(a.manifest().iterkeys())
@@ -339,7 +207,7 @@ def onetimeclientsetup(ui):
# prevent strip from stripping remotefilelogs
def _collectbrokencsets(orig, repo, files, striprev):
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
files = list([f for f in files if not repo.shallowmatch(f)])
return orig(repo, files, striprev)
wrapfunction(repair, '_collectbrokencsets', _collectbrokencsets)
@@ -376,7 +244,7 @@ def onetimeclientsetup(ui):
def filectx(orig, self, path, fileid=None, filelog=None):
if fileid is None:
fileid = self.filenode(path)
if (remotefilelogreq in self._repo.requirements and
if (shallowrepo.requirement in self._repo.requirements and
self._repo.shallowmatch(path)):
return remotefilectx.remotefilectx(self._repo, path,
fileid=fileid, changectx=self, filelog=filelog)
@@ -384,7 +252,7 @@ def onetimeclientsetup(ui):
wrapfunction(context.changectx, 'filectx', filectx)
def workingfilectx(orig, self, path, filelog=None):
if (remotefilelogreq in self._repo.requirements and
if (shallowrepo.requirement in self._repo.requirements and
self._repo.shallowmatch(path)):
return remotefilectx.remoteworkingfilectx(self._repo,
path, workingctx=self, filelog=filelog)
@@ -394,7 +262,7 @@ def onetimeclientsetup(ui):
# prefetch required revisions before a diff
def trydiff(orig, repo, revs, ctx1, ctx2, modified, added, removed,
copy, getfilectx, opts, losedatafn, prefix):
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
prefetch = []
mf1 = ctx1.manifest()
for fname in modified + added + removed:
@@ -434,140 +302,6 @@ def onetimeclientsetup(ui):
wrapfunction(cmdutil, 'revert', revert)
def createfileblob(filectx):
text = filectx.data()
ancestors = [filectx]
ancestors.extend([f for f in filectx.ancestors()])
ancestortext = ""
for ancestorctx in ancestors:
parents = ancestorctx.parents()
p1 = nullid
p2 = nullid
if len(parents) > 0:
p1 = parents[0].filenode()
if len(parents) > 1:
p2 = parents[1].filenode()
copyname = ""
rename = ancestorctx.renamed()
if rename:
copyname = rename[0]
linknode = ancestorctx.node()
ancestortext += "%s%s%s%s%s\0" % (
ancestorctx.filenode(), p1, p2, linknode,
copyname)
return "%d\0%s%s" % (len(text), text, ancestortext)
def getfiles(repo, proto):
"""A server api for requesting particular versions of particular files.
"""
if remotefilelogreq in repo.requirements:
raise util.Abort(_('cannot fetch remote files from shallow repo'))
def streamer():
fin = proto.fin
opener = repo.sopener
cachepath = repo.ui.config("remotefilelog", "servercachepath")
if not cachepath:
cachepath = os.path.join(repo.path, "remotefilelogcache")
# everything should be user & group read/writable
oldumask = os.umask(0o002)
try:
while True:
request = fin.readline()[:-1]
if not request:
break
node = bin(request[:40])
if node == nullid:
yield '0\n'
continue
path = request[40:]
filecachepath = os.path.join(cachepath, path, hex(node))
if not os.path.exists(filecachepath):
filectx = repo.filectx(path, fileid=node)
if filectx.node() == nullid:
repo.changelog = changelog.changelog(repo.sopener)
filectx = repo.filectx(path, fileid=node)
text = createfileblob(filectx)
text = lz4.compressHC(text)
dirname = os.path.dirname(filecachepath)
if not os.path.exists(dirname):
os.makedirs(dirname)
f = open(filecachepath, "w")
try:
f.write(text)
finally:
f.close()
f = open(filecachepath, "r")
try:
text = f.read()
finally:
f.close()
yield '%d\n%s' % (len(text), text)
# it would be better to only flush after processing a whole batch
# but currently we don't know if there are more requests coming
proto.fout.flush()
finally:
os.umask(oldumask)
return wireproto.streamres(streamer())
def incominghook(ui, repo, node, source, url, **kwargs):
"""Server hook that produces the shallow file blobs immediately after
a commit, in anticipation of them being requested soon.
"""
cachepath = repo.ui.config("remotefilelog", "servercachepath")
if not cachepath:
cachepath = os.path.join(repo.path, "remotefilelogcache")
heads = repo.revs("heads(%s::)" % node)
# everything should be user & group read/writable
oldumask = os.umask(0o002)
try:
count = 0
for head in heads:
mf = repo[head].manifest()
for filename, filenode in mf.iteritems():
filecachepath = os.path.join(cachepath, filename, hex(filenode))
if os.path.exists(filecachepath):
continue
# This can be a bit slow. Don't block the commit from returning
# on large commits.
if count > 500:
break
count += 1
filectx = repo.filectx(filename, fileid=filenode)
text = createfileblob(filectx)
text = lz4.compressHC(text)
dirname = os.path.dirname(filecachepath)
if not os.path.exists(dirname):
os.makedirs(dirname)
f = open(filecachepath, "w")
try:
f.write(text)
finally:
f.close()
finally:
os.umask(oldumask)
def getrenamedfn(repo, endrev=None):
rcache = {}
@@ -593,7 +327,7 @@ def getrenamedfn(repo, endrev=None):
return getrenamed
def walkfilerevs(orig, repo, match, follow, revs, fncache):
if not remotefilelogreq in repo.requirements:
if not shallowrepo.requirement in repo.requirements:
return orig(repo, match, follow, revs, fncache)
# remotefilelogs can't be walked in rev order, so throw.
@@ -633,7 +367,7 @@ def filelogrevset(orig, repo, subset, x):
a slower, more accurate result, use ``file()``.
"""
if not remotefilelogreq in repo.requirements:
if not shallowrepo.requirement in repo.requirements:
return orig(repo, subset, x)
# i18n: "filelog" is a keyword
@@ -663,105 +397,6 @@ def filelogrevset(orig, repo, subset, x):
return [r for r in subset if r in s]
def buildtemprevlog(repo, file):
# get filename key
filekey = util.sha1(file).hexdigest()
filedir = os.path.join(repo.path, 'store/data', filekey)
# sort all entries based on linkrev
fctxs = []
for filenode in os.listdir(filedir):
fctxs.append(repo.filectx(file, fileid=bin(filenode)))
fctxs = sorted(fctxs, key=lambda x: x.linkrev())
# add to revlog
temppath = repo.sjoin('data/temprevlog.i')
if os.path.exists(temppath):
os.remove(temppath)
r = filelog.filelog(repo.sopener, 'temprevlog')
class faket(object):
def add(self, a,b,c):
pass
t = faket()
for fctx in fctxs:
if fctx.node() not in repo:
continue
p = fctx.filelog().parents(fctx.filenode())
meta = {}
if fctx.renamed():
meta['copy'] = fctx.renamed()[0]
meta['copyrev'] = hex(fctx.renamed()[1])
r.add(fctx.data(), meta, t, fctx.linkrev(), p[0], p[1])
return r
def debugindex(orig, ui, repo, file_ = None, **opts):
"""dump the contents of an index file"""
if (opts.get('changelog') or opts.get('manifest') or
not remotefilelogreq in repo.requirements or
not repo.shallowmatch(file_)):
return orig(ui, repo, file_, **opts)
r = buildtemprevlog(repo, file_)
# debugindex like normal
format = opts.get('format', 0)
if format not in (0, 1):
raise util.Abort(_("unknown format %d") % format)
generaldelta = r.version & revlog.REVLOGGENERALDELTA
if generaldelta:
basehdr = ' delta'
else:
basehdr = ' base'
if format == 0:
ui.write(" rev offset length " + basehdr + " linkrev"
" nodeid p1 p2\n")
elif format == 1:
ui.write(" rev flag offset length"
" size " + basehdr + " link p1 p2"
" nodeid\n")
for i in r:
node = r.node(i)
if generaldelta:
base = r.deltaparent(i)
else:
base = r.chainbase(i)
if format == 0:
try:
pp = r.parents(node)
except Exception:
pp = [nullid, nullid]
ui.write("% 6d % 9d % 7d % 6d % 7d %s %s %s\n" % (
i, r.start(i), r.length(i), base, r.linkrev(i),
short(node), short(pp[0]), short(pp[1])))
elif format == 1:
pr = r.parentrevs(i)
ui.write("% 6d %04x % 8d % 8d % 8d % 6d % 6d % 6d % 6d %s\n" % (
i, r.flags(i), r.start(i), r.length(i), r.rawsize(i),
base, r.linkrev(i), pr[0], pr[1], short(node)))
def debugindexdot(orig, ui, repo, file_):
"""dump an index DAG as a graphviz dot file"""
if not remotefilelogreq in repo.requirements:
return orig(ui, repo, file_)
r = buildtemprevlog(repo, os.path.basename(file_)[:-2])
ui.write(("digraph G {\n"))
for i in r:
node = r.node(i)
pp = r.parents(node)
ui.write("\t%d -> %d\n" % (r.rev(pp[0]), i))
if pp[1] != nullid:
ui.write("\t%d -> %d\n" % (r.rev(pp[1]), i))
ui.write("}\n")
commands.norepo += " gc"
@@ -797,7 +432,7 @@ def gc(ui, *args, **opts):
# gc server cache
for repo in repos:
gcserver(ui, repo._repo)
remotefilelogserver.gcserver(ui, repo._repo)
def gcclient(ui, cachepath):
# get list of repos that use this cache
@@ -898,41 +533,6 @@ def gcclient(ui, cachepath):
(removed, count, float(originalsize) / 1024.0 / 1024.0 / 1024.0,
float(size) / 1024.0 / 1024.0 / 1024.0))
def gcserver(ui, repo):
if not repo.ui.configbool("remotefilelog", "server"):
return
neededfiles = set()
heads = repo.revs("heads(all())")
cachepath = repo.join("remotefilelogcache")
for head in heads:
mf = repo[head].manifest()
for filename, filenode in mf.iteritems():
filecachepath = os.path.join(cachepath, filename, hex(filenode))
neededfiles.add(filecachepath)
# delete unneeded older files
days = repo.ui.configint("remotefilelog", "serverexpiration", 30)
expiration = time.time() - (days * 24 * 60 * 60)
_removing = _("removing old server cache")
count = 0
ui.progress(_removing, count, unit="files")
for root, dirs, files in os.walk(cachepath):
for file in files:
filepath = os.path.join(root, file)
count += 1
ui.progress(_removing, count, unit="files")
if filepath in neededfiles:
continue
stat = os.stat(filepath)
if stat.st_mtime < expiration:
os.remove(filepath)
ui.progress(_removing, None)
def log(orig, ui, repo, *pats, **opts):
if pats and not opts.get("follow"):
isfile = True
@@ -949,97 +549,9 @@ def log(orig, ui, repo, *pats, **opts):
return orig(ui, repo, *pats, **opts)
commands.norepo += " debugremotefilelog"
@command('^debugremotefilelog', [
('d', 'decompress', None, _('decompress the filelog first')),
], _('hg debugremotefilelog <path>'))
def debugremotefilelog(ui, *args, **opts):
path = args[0]
decompress = opts.get('decompress')
size, firstnode, mapping = parsefileblob(path, decompress)
ui.status("size: %s bytes\n" % (size))
ui.status("path: %s \n" % (path))
ui.status("key: %s \n" % (short(firstnode)))
ui.status("\n")
ui.status("%12s => %12s %13s %13s %12s\n" %
("node", "p1", "p2", "linknode", "copyfrom"))
queue = [firstnode]
while queue:
node = queue.pop(0)
p1, p2, linknode, copyfrom = mapping[node]
ui.status("%s => %s %s %s %s\n" %
(short(node), short(p1), short(p2), short(linknode), copyfrom))
if p1 != nullid:
queue.append(p1)
if p2 != nullid:
queue.append(p2)
commands.norepo += " verifyremotefilelog"
@command('^verifyremotefilelog', [
('d', 'decompress', None, _('decompress the filelogs first')),
], _('hg verifyremotefilelogs <directory>'))
def verifyremotefilelog(ui, *args, **opts):
path = args[0]
decompress = opts.get('decompress')
for root, dirs, files in os.walk(path):
for file in files:
if file == "repos":
continue
filepath = os.path.join(root, file)
size, firstnode, mapping = parsefileblob(filepath, decompress)
for p1, p2, linknode, copyfrom in mapping.itervalues():
if linknode == nullid:
actualpath = os.path.relpath(root, path)
key = fileserverclient.getcachekey(actualpath, file)
ui.status("%s %s\n" % (key, os.path.relpath(filepath, path)))
def parsefileblob(path, decompress):
raw = None
f = open(path, "r")
try:
raw = f.read()
finally:
f.close()
if decompress:
raw = lz4.decompress(raw)
index = raw.index('\0')
size = int(raw[:index])
data = raw[(index + 1):(index + 1 + size)]
start = index + 1 + size
firstnode = None
mapping = {}
while start < len(raw):
divider = raw.index('\0', start + 80)
currentnode = raw[start:(start + 20)]
if not firstnode:
firstnode = currentnode
p1 = raw[(start + 20):(start + 40)]
p2 = raw[(start + 40):(start + 60)]
linknode = raw[(start + 60):(start + 80)]
copyfrom = raw[(start + 80):divider]
mapping[currentnode] = (p1, p2, linknode, copyfrom)
start = divider + 1
return size, firstnode, mapping
def revert(orig, ui, repo, ctx, parents, *pats, **opts):
# prefetch prior to reverting
if remotefilelogreq in repo.requirements:
if shallowrepo.requirement in repo.requirements:
files = []
m = scmutil.match(ctx, pats, opts)
mf = ctx.manifest()
@@ -1049,3 +561,19 @@ def revert(orig, ui, repo, ctx, parents, *pats, **opts):
fileserverclient.client.prefetch(repo, files)
return orig(ui, repo, ctx, parents, *pats, **opts)
commands.norepo += " debugremotefilelog"
@command('^debugremotefilelog', [
('d', 'decompress', None, _('decompress the filelog first')),
], _('hg debugremotefilelog <path>'))
def debugremotefilelog(ui, *args, **opts):
return debugcommands.debugremotefilelog(ui, *args, **opts)
commands.norepo += " verifyremotefilelog"
@command('^verifyremotefilelog', [
('d', 'decompress', None, _('decompress the filelogs first')),
], _('hg verifyremotefilelogs <directory>'))
def verifyremotefilelog(ui, *args, **opts):
return debugcommands.verifyremotefilelog(ui, *args, **opts)

debugcommands.py

@@ -0,0 +1,190 @@
# debugcommands.py - debug logic for remotefilelog
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from mercurial import util, filelog, revlog
from mercurial.node import bin, hex, nullid, nullrev, short
from mercurial.i18n import _
import fileserverclient, shallowrepo
import os, lz4
def debugremotefilelog(ui, *args, **opts):
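"""Dump a remotefilelog blob: its size, path and key, then the
ancestor records, walked breadth-first from the blob's first node.
"""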
path = args[0]
decompress = opts.get('decompress')
size, firstnode, mapping = parsefileblob(path, decompress)
ui.status("size: %s bytes\n" % (size))
ui.status("path: %s \n" % (path))
ui.status("key: %s \n" % (short(firstnode)))
ui.status("\n")
ui.status("%12s => %12s %13s %13s %12s\n" %
("node", "p1", "p2", "linknode", "copyfrom"))
queue = [firstnode]
while queue:
node = queue.pop(0)
p1, p2, linknode, copyfrom = mapping[node]
ui.status("%s => %s %s %s %s\n" %
(short(node), short(p1), short(p2), short(linknode), copyfrom))
if p1 != nullid:
queue.append(p1)
if p2 != nullid:
queue.append(p2)
def buildtemprevlog(repo, file):
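"""Reconstruct a temporary revlog from the loose remotefilelog
revisions of the given file so the revlog-based debug commands can
inspect it.
"""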
# get filename key
filekey = util.sha1(file).hexdigest()
filedir = os.path.join(repo.path, 'store/data', filekey)
# sort all entries based on linkrev
fctxs = []
for filenode in os.listdir(filedir):
fctxs.append(repo.filectx(file, fileid=bin(filenode)))
fctxs = sorted(fctxs, key=lambda x: x.linkrev())
# add to revlog
temppath = repo.sjoin('data/temprevlog.i')
if os.path.exists(temppath):
os.remove(temppath)
r = filelog.filelog(repo.sopener, 'temprevlog')
class faket(object):
def add(self, a,b,c):
pass
t = faket()
for fctx in fctxs:
if fctx.node() not in repo:
continue
p = fctx.filelog().parents(fctx.filenode())
meta = {}
if fctx.renamed():
meta['copy'] = fctx.renamed()[0]
meta['copyrev'] = hex(fctx.renamed()[1])
r.add(fctx.data(), meta, t, fctx.linkrev(), p[0], p[1])
return r
def debugindex(orig, ui, repo, file_ = None, **opts):
"""dump the contents of an index file"""
if (opts.get('changelog') or opts.get('manifest') or
not shallowrepo.requirement in repo.requirements or
not repo.shallowmatch(file_)):
return orig(ui, repo, file_, **opts)
r = buildtemprevlog(repo, file_)
# debugindex like normal
format = opts.get('format', 0)
if format not in (0, 1):
raise util.Abort(_("unknown format %d") % format)
generaldelta = r.version & revlog.REVLOGGENERALDELTA
if generaldelta:
basehdr = ' delta'
else:
basehdr = ' base'
if format == 0:
ui.write(" rev offset length " + basehdr + " linkrev"
" nodeid p1 p2\n")
elif format == 1:
ui.write(" rev flag offset length"
" size " + basehdr + " link p1 p2"
" nodeid\n")
for i in r:
node = r.node(i)
if generaldelta:
base = r.deltaparent(i)
else:
base = r.chainbase(i)
if format == 0:
try:
pp = r.parents(node)
except Exception:
pp = [nullid, nullid]
ui.write("% 6d % 9d % 7d % 6d % 7d %s %s %s\n" % (
i, r.start(i), r.length(i), base, r.linkrev(i),
short(node), short(pp[0]), short(pp[1])))
elif format == 1:
pr = r.parentrevs(i)
ui.write("% 6d %04x % 8d % 8d % 8d % 6d % 6d % 6d % 6d %s\n" % (
i, r.flags(i), r.start(i), r.length(i), r.rawsize(i),
base, r.linkrev(i), pr[0], pr[1], short(node)))
def debugindexdot(orig, ui, repo, file_):
"""dump an index DAG as a graphviz dot file"""
if not shallowrepo.requirement in repo.requirements:
return orig(ui, repo, file_)
r = buildtemprevlog(repo, os.path.basename(file_)[:-2])
ui.write(("digraph G {\n"))
for i in r:
node = r.node(i)
pp = r.parents(node)
ui.write("\t%d -> %d\n" % (r.rev(pp[0]), i))
if pp[1] != nullid:
ui.write("\t%d -> %d\n" % (r.rev(pp[1]), i))
ui.write("}\n")
def verifyremotefilelog(ui, *args, **opts):
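"""Scan every blob under a cache directory and report entries whose
ancestor records carry a null linknode.
"""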
path = args[0]
decompress = opts.get('decompress')
for root, dirs, files in os.walk(path):
for file in files:
if file == "repos":
continue
filepath = os.path.join(root, file)
size, firstnode, mapping = parsefileblob(filepath, decompress)
for p1, p2, linknode, copyfrom in mapping.itervalues():
if linknode == nullid:
actualpath = os.path.relpath(root, path)
key = fileserverclient.getcachekey(actualpath, file)
ui.status("%s %s\n" % (key, os.path.relpath(filepath, path)))
def parsefileblob(path, decompress):
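# Inverse of the server's createfileblob: read the size-prefixed text,
# then decode the "\0"-terminated ancestor records (four 20-byte nodes
# followed by the copy-from path) into a node -> metadata mapping.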
raw = None
f = open(path, "r")
try:
raw = f.read()
finally:
f.close()
if decompress:
raw = lz4.decompress(raw)
index = raw.index('\0')
size = int(raw[:index])
data = raw[(index + 1):(index + 1 + size)]
start = index + 1 + size
firstnode = None
mapping = {}
while start < len(raw):
divider = raw.index('\0', start + 80)
currentnode = raw[start:(start + 20)]
if not firstnode:
firstnode = currentnode
p1 = raw[(start + 20):(start + 40)]
p2 = raw[(start + 40):(start + 60)]
linknode = raw[(start + 60):(start + 80)]
copyfrom = raw[(start + 80):divider]
mapping[currentnode] = (p1, p2, linknode, copyfrom)
start = divider + 1
return size, firstnode, mapping

remotefilelogserver.py

@@ -0,0 +1,318 @@
# remotefilelogserver.py - server logic for a remotefilelog server
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from mercurial import wireproto, changegroup, match, util, changelog, store
from mercurial.extensions import wrapfunction
from mercurial.node import bin, hex, nullid, nullrev
from mercurial.i18n import _
import shallowrepo
import stat, os, lz4, time
def setupserver(ui, repo):
"""Sets up a normal Mercurial repo so it can serve files to shallow repos.
"""
onetimesetup(ui)
# don't send files to shallow clients during pulls
def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source):
caps = self._bundlecaps or []
if shallowrepo.requirement in caps:
# only send files that don't match the specified patterns
includepattern = None
excludepattern = None
for cap in (self._bundlecaps or []):
if cap.startswith("includepattern="):
includepattern = cap[len("includepattern="):].split('\0')
elif cap.startswith("excludepattern="):
excludepattern = cap[len("excludepattern="):].split('\0')
m = match.always(repo.root, '')
if includepattern or excludepattern:
m = match.match(repo.root, '', None,
includepattern, excludepattern)
changedfiles = list([f for f in changedfiles if not m(f)])
return orig(self, changedfiles, linknodes, commonrevs, source)
wrapfunction(changegroup.bundle10, 'generatefiles', generatefiles)
# add incoming hook to continuously generate file blobs
ui.setconfig("hooks", "changegroup.remotefilelog", incominghook)
onetime = False
def onetimesetup(ui):
"""Configures the wireprotocol for both clients and servers.
"""
global onetime
if onetime:
return
onetime = True
# support file content requests
wireproto.commands['getfiles'] = (getfiles, '')
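# Shared flags: set by stream_out_shallow for the duration of a stream
# clone and consulted by the wrapped _walkstreamfiles below.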
class streamstate(object):
match = None
shallowremote = False
state = streamstate()
def stream_out_shallow(repo, proto, other):
includepattern = None
excludepattern = None
raw = other.get('includepattern')
if raw:
includepattern = raw.split('\0')
raw = other.get('excludepattern')
if raw:
excludepattern = raw.split('\0')
oldshallow = state.shallowremote
oldmatch = state.match
try:
state.shallowremote = True
state.match = match.always(repo.root, '')
if includepattern or excludepattern:
state.match = match.match(repo.root, '', None,
includepattern, excludepattern)
return wireproto.stream(repo, proto)
finally:
state.shallowremote = oldshallow
state.match = oldmatch
wireproto.commands['stream_out_shallow'] = (stream_out_shallow, '*')
# don't clone filelogs to shallow clients
def _walkstreamfiles(orig, repo):
if state.shallowremote:
# if we are shallow ourselves, stream our local commits
if shallowrepo.requirement in repo.requirements:
striplen = len(repo.store.path) + 1
readdir = repo.store.rawvfs.readdir
visit = [os.path.join(repo.store.path, 'data')]
while visit:
p = visit.pop()
for f, kind, st in readdir(p, stat=True):
fp = p + '/' + f
if kind == stat.S_IFREG:
if not fp.endswith('.i') and not fp.endswith('.d'):
n = util.pconvert(fp[striplen:])
yield (store.decodedir(n), n, st.st_size)
if kind == stat.S_IFDIR:
visit.append(fp)
# Return .d and .i files that do not match the shallow pattern
match = state.match or match.always(repo.root, '')
for (u, e, s) in repo.store.datafiles():
f = u[5:-2] # trim data/... and .i/.d
if not state.match(f):
yield (u, e, s)
for x in repo.store.topfiles():
yield x
elif shallowrepo.requirement in repo.requirements:
# don't allow cloning from a shallow repo to a full repo
# since it would require fetching every version of every
# file in order to create the revlogs.
raise util.Abort(_("Cannot clone from a shallow repo "
+ "to a full repo."))
else:
for x in orig(repo):
yield x
wrapfunction(wireproto, '_walkstreamfiles', _walkstreamfiles)
# We no longer use the getbundle_shallow command, but we must still
# support it for migration purposes.
def getbundleshallow(repo, proto, others):
bundlecaps = others.get('bundlecaps', '')
bundlecaps = set(bundlecaps.split(','))
bundlecaps.add('remotefilelog')
others['bundlecaps'] = ','.join(bundlecaps)
return wireproto.commands["getbundle"][0](repo, proto, others)
wireproto.commands["getbundle_shallow"] = (getbundleshallow, '*')
# expose remotefilelog capabilities
def capabilities(orig, repo, proto):
caps = orig(repo, proto)
if (shallowrepo.requirement in repo.requirements or
ui.configbool('remotefilelog', 'server')):
caps += " " + shallowrepo.requirement
return caps
wrapfunction(wireproto, 'capabilities', capabilities)
def getfiles(repo, proto):
"""A server api for requesting particular versions of particular files.
"""
if shallowrepo.requirement in repo.requirements:
raise util.Abort(_('cannot fetch remote files from shallow repo'))
def streamer():
fin = proto.fin
opener = repo.sopener
cachepath = repo.ui.config("remotefilelog", "servercachepath")
if not cachepath:
cachepath = os.path.join(repo.path, "remotefilelogcache")
# everything should be user & group read/writable
oldumask = os.umask(0o002)
try:
while True:
request = fin.readline()[:-1]
if not request:
break
node = bin(request[:40])
if node == nullid:
yield '0\n'
continue
path = request[40:]
filecachepath = os.path.join(cachepath, path, hex(node))
if not os.path.exists(filecachepath):
filectx = repo.filectx(path, fileid=node)
if filectx.node() == nullid:
repo.changelog = changelog.changelog(repo.sopener)
filectx = repo.filectx(path, fileid=node)
text = createfileblob(filectx)
text = lz4.compressHC(text)
dirname = os.path.dirname(filecachepath)
if not os.path.exists(dirname):
os.makedirs(dirname)
f = open(filecachepath, "w")
try:
f.write(text)
finally:
f.close()
f = open(filecachepath, "r")
try:
text = f.read()
finally:
f.close()
yield '%d\n%s' % (len(text), text)
# it would be better to only flush after processing a whole batch
# but currently we don't know if there are more requests coming
proto.fout.flush()
finally:
os.umask(oldumask)
return wireproto.streamres(streamer())
def incominghook(ui, repo, node, source, url, **kwargs):
"""Server hook that produces the shallow file blobs immediately after
a commit, in anticipation of them being requested soon.
"""
cachepath = repo.ui.config("remotefilelog", "servercachepath")
if not cachepath:
cachepath = os.path.join(repo.path, "remotefilelogcache")
heads = repo.revs("heads(%s::)" % node)
# everything should be user & group read/writable
oldumask = os.umask(0o002)
try:
count = 0
for head in heads:
mf = repo[head].manifest()
for filename, filenode in mf.iteritems():
filecachepath = os.path.join(cachepath, filename, hex(filenode))
if os.path.exists(filecachepath):
continue
# This can be a bit slow. Don't block the commit from returning
# on large commits.
if count > 500:
break
count += 1
filectx = repo.filectx(filename, fileid=filenode)
text = createfileblob(filectx)
text = lz4.compressHC(text)
dirname = os.path.dirname(filecachepath)
if not os.path.exists(dirname):
os.makedirs(dirname)
f = open(filecachepath, "w")
try:
f.write(text)
finally:
f.close()
finally:
os.umask(oldumask)
def createfileblob(filectx):
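"""Serialize a file revision for the wire: "<text length>\0<text>"
followed by one "\0"-terminated record per ancestor holding its
filenode, p1, p2, linknode and copy source.
"""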
text = filectx.data()
ancestors = [filectx]
ancestors.extend([f for f in filectx.ancestors()])
ancestortext = ""
for ancestorctx in ancestors:
parents = ancestorctx.parents()
p1 = nullid
p2 = nullid
if len(parents) > 0:
p1 = parents[0].filenode()
if len(parents) > 1:
p2 = parents[1].filenode()
copyname = ""
rename = ancestorctx.renamed()
if rename:
copyname = rename[0]
linknode = ancestorctx.node()
ancestortext += "%s%s%s%s%s\0" % (
ancestorctx.filenode(), p1, p2, linknode,
copyname)
return "%d\0%s%s" % (len(text), text, ancestortext)
def gcserver(ui, repo):
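"""Delete cached blobs that no head manifest references once they are
older than the remotefilelog.serverexpiration window (30 days by
default).
"""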
if not repo.ui.configbool("remotefilelog", "server"):
return
neededfiles = set()
heads = repo.revs("heads(all())")
cachepath = repo.join("remotefilelogcache")
for head in heads:
mf = repo[head].manifest()
for filename, filenode in mf.iteritems():
filecachepath = os.path.join(cachepath, filename, hex(filenode))
neededfiles.add(filecachepath)
# delete unneeded older files
days = repo.ui.configint("remotefilelog", "serverexpiration", 30)
expiration = time.time() - (days * 24 * 60 * 60)
_removing = _("removing old server cache")
count = 0
ui.progress(_removing, count, unit="files")
for root, dirs, files in os.walk(cachepath):
for file in files:
filepath = os.path.join(root, file)
count += 1
ui.progress(_removing, count, unit="files")
if filepath in neededfiles:
continue
stat = os.stat(filepath)
if stat.st_mtime < expiration:
os.remove(filepath)
ui.progress(_removing, None)

shallowrepo.py

@@ -11,6 +11,8 @@ from mercurial import localrepo, context, mdiff, util, match
from mercurial.extensions import wrapfunction
import remotefilelog, remotefilectx, fileserverclient, shallowbundle, os
requirement = "remotefilelog"
def wraprepo(repo):
class shallowrepository(repo.__class__):
def file(self, f):