# hgsql.py
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
#CREATE TABLE revs(
#id INT(2) NOT NULL AUTO_INCREMENT PRIMARY KEY,
#path VARCHAR(256) NOT NULL,
#linkrev INT NOT NULL,
#entry BINARY(64) NOT NULL,
#data0 CHAR(1) NOT NULL,
#data1 LONGBLOB NOT NULL,
#createdtime DATETIME NOT NULL,
#INDEX linkrev_index (linkrev)
#);

#CREATE TABLE headsbookmarks(
#id INT(2) NOT NULL AUTO_INCREMENT PRIMARY KEY,
#node char(40) NOT NULL,
#name VARCHAR(256) UNIQUE
#);

# SET OPTION SQL_BIG_SELECTS = 1;

from mercurial.node import bin, hex, nullid, nullrev
from mercurial.i18n import _
from mercurial.extensions import wrapfunction, wrapcommand
from mercurial import changelog, error, cmdutil, revlog, localrepo, transaction
from mercurial import wireproto, bookmarks, repair, commands, hg, mdiff

import MySQLdb, struct, time
from MySQLdb import cursors
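
# NOTE: `dbargs` is assumed to be defined elsewhere in this module (e.g.
# built from the [hgsql] config section); it holds the keyword arguments
# passed to MySQLdb.connect() throughout this file.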

cmdtable = {}
command = cmdutil.command(cmdtable)
testedwith = 'internal'

disablesync = False

class CorruptionException(Exception):
pass

def uisetup(ui):
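    """Installs the wrappers that keep the local repo and the database in
    sync."""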
wrapcommand(commands.table, 'pull', pull)
wrapfunction(revlog.revlog, '_addrevision', addrevision)
wrapfunction(revlog.revlog, 'addgroup', addgroup)
wrapfunction(localrepo, 'instance', repoinstance)
wrapfunction(transaction.transaction, '_abort', transactionclose)
wrapfunction(transaction.transaction, 'close', transactionclose)
wrapfunction(wireproto, 'unbundle', unbundle)
wrapfunction(bookmarks.bmstore, 'write', bookmarkwrite)
wireproto.commands['unbundle'] = (wireproto.unbundle, 'heads')

def repoinstance(orig, *args):
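    """Wraps localrepo.instance so every newly constructed repo is synced
    against the database."""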
global disablesync
repo = orig(*args)
if repo.ui.configbool("hgsql", "enabled") and not disablesync:
conn = MySQLdb.connect(**dbargs)
cur = conn.cursor()
try:
syncdb(repo, cur)
finally:
cur.close()
conn.close()
return repo

def reposetup(ui, repo):
if repo.ui.configbool("hgsql", "enabled"):
ui.setconfig("hooks", "pretxnchangegroup.remotefilelog", pretxnchangegroup)
ui.setconfig("hooks", "pretxncommit.remotefilelog", pretxnchangegroup)

# Sync with db
def needsync(repo, cur):
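    """Returns True if the heads or bookmarks recorded in the database differ
    from those in the local repo."""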
    # Fetch the heads and bookmarks currently recorded in the db
cur.execute("SELECT * FROM headsbookmarks")
sqlheads = set()
sqlbookmarks = {}
for _, node, name in cur:
if not name:
sqlheads.add(bin(node))
else:
sqlbookmarks[name] = bin(node)
heads = repo.heads()
bookmarks = repo._bookmarks
if (not sqlheads or len(heads) != len(sqlheads) or
len(bookmarks) != len(sqlbookmarks)):
return True
for head in sqlheads:
if head not in heads:
return True
for bookmark in sqlbookmarks:
        if (bookmark not in bookmarks or
            bookmarks[bookmark] != sqlbookmarks[bookmark]):
return True
return False

def syncdb(repo, cur):
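    """Fetches any revisions and bookmarks from the database that are missing
    locally and writes them directly into the repo's revlogs."""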
global disablesync
if not needsync(repo, cur):
return
repo.ui.debug("syncing with mysql\n")
lock = None
try:
lock = repo.lock(wait=False)
except error.LockHeld:
# If the lock is held, someone else is doing the pull for us.
# Wait until they are done.
# TODO: I don't think this is actually true...
lock = repo.lock()
lock.release()
return
transaction = repo.transaction("syncdb")
revlogs = {}
try:
# Refresh the changelog now that we have the lock
del repo.changelog
cl = repo.changelog
clrev = len(cl) - 1
count = 1
chunksize = 5000
while count:
# Fetch new revs from db
2013-10-30 02:55:43 +04:00
cur.execute("SELECT * FROM revs WHERE linkrev > %s AND linkrev < %s ORDER BY linkrev ASC", (clrev, clrev + chunksize))
2013-10-29 03:09:47 +04:00
# Add the new revs
newentries = addentries(repo, cur, transaction, revlogs)
clrev += chunksize - 1
if newentries > 35000 and chunksize > 1000:
chunksize -= 100
if newentries < 25000:
chunksize += 100
count += newentries
if count > 50000 or newentries == 0:
#print "Flushing (chunksize %s)" % chunksize
count = 1
for revlog in revlogs.itervalues():
if not revlog.ifh.closed:
revlog.ifh.flush()
revlog.ifh.close()
if revlog.dfh and not revlog.dfh.closed:
revlog.dfh.flush()
revlog.dfh.close()
revlogs = {}
if newentries == 0:
break
transaction.close()
finally:
for revlog in revlogs.itervalues():
if not revlog.ifh.closed:
revlog.ifh.close()
if revlog.dfh and not revlog.dfh.closed:
revlog.dfh.close()
transaction.release()
lock.release()
del repo.changelog
disablesync = True
try:
cur.execute("SELECT * FROM headsbookmarks WHERE name IS NOT NULL")
bm = repo._bookmarks
bm.clear()
for _, node, name in cur:
node = bin(node)
if node in repo:
bm[name] = node
bm.write()
finally:
disablesync = False

def addentries(repo, revisions, transaction, revlogs):
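    """Writes the given database revision rows into their revlogs: filelogs
    first, then the manifest, then the changelog, mirroring Mercurial's
    normal write order. Returns the number of rows processed."""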
opener = repo.sopener
def writeentry(revdata):
_, path, link, entry, data0, data1, createdtime = revdata
revlog = revlogs.get(path)
if not revlog:
revlog = EntryRevlog(opener, path)
revlogs[path] = revlog
if not hasattr(revlog, 'ifh') or revlog.ifh.closed:
dfh = None
if not revlog._inline:
dfh = opener(revlog.datafile, "a")
ifh = opener(revlog.indexfile, "a+")
revlog.ifh = ifh
revlog.dfh = dfh
revlog.addentry(transaction, revlog.ifh, revlog.dfh, entry,
data0, data1)
count = 0
# Write filelogs first, then manifests, then changelogs,
# just like Mercurial does normally.
changelog = []
manifest = []
for revdata in revisions:
count += 1
if revdata[1] == "00changelog.i":
changelog.append(revdata)
elif revdata[1] == "00manifest.i":
manifest.append(revdata)
else:
writeentry(revdata)
for revdata in manifest:
writeentry(revdata)
for revdata in changelog:
writeentry(revdata)
return count

class EntryRevlog(revlog.revlog):
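    """A revlog that can append a pre-built index entry and its raw delta
    data verbatim, exactly as stored in the database."""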
def addentry(self, transaction, ifh, dfh, entry, data0, data1):
curr = len(self)
offset = self.end(curr)
e = struct.unpack(revlog.indexformatng, entry)
offsettype, datalen, textlen, base, link, p1r, p2r, node = e
if curr == 0:
elist = list(e)
type = revlog.gettype(offsettype)
offsettype = revlog.offset_type(0, type)
elist[0] = offsettype
e = tuple(elist)
# Verify that the revlog is in a good state
        if p1r >= curr or p2r >= curr:
            raise CorruptionException("parent revision is not in revlog: %s" %
                                      self.indexfile)
        if base > curr:
            raise CorruptionException("base revision is not in revlog: %s" %
                                      self.indexfile)
expectedoffset = revlog.getoffset(offsettype)
actualoffset = self.end(curr - 1)
if expectedoffset != 0 and expectedoffset != actualoffset:
raise CorruptionException("revision offset doesn't match prior length " +
"(%s offset vs %s length): %s" %
(expectedoffset, actualoffset, self.indexfile))
if node not in self.nodemap:
self.index.insert(-1, e)
self.nodemap[node] = len(self) - 1
if not self._inline:
transaction.add(self.datafile, offset)
transaction.add(self.indexfile, curr * len(entry))
if data0:
dfh.write(data0)
dfh.write(data1)
ifh.write(entry)
else:
offset += curr * self._io.size
transaction.add(self.indexfile, offset, curr)
ifh.write(entry)
ifh.write(data0)
ifh.write(data1)
self.checkinlinesize(transaction, ifh)

# Handle incoming commits

conn = None
cur = None

def unbundle(orig, repo, proto, heads):
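    """Wraps wireproto.unbundle: takes the global MySQL write lock and syncs
    from the database before accepting the incoming bundle."""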
global conn
global cur
conn = MySQLdb.connect(**dbargs)
conn.query("SELECT GET_LOCK('commit_lock', 60)")
result = conn.store_result().fetch_row()[0][0]
if result != 1:
raise Exception("unable to obtain write lock")
cur = conn.cursor()
try:
syncdb(repo, cur)
return orig(repo, proto, heads)
finally:
cur.close()
conn.query("SELECT RELEASE_LOCK('commit_lock')")
conn.close()
cur = None
conn = None
def pull(orig, ui, repo, source="default", **opts):
global conn
global cur
conn = MySQLdb.connect(**dbargs)
conn.query("SELECT GET_LOCK('commit_lock', 60)")
result = conn.store_result().fetch_row()[0][0]
if result != 1:
raise Exception("unable to obtain write lock")
cur = conn.cursor()
try:
syncdb(repo, cur)
return orig(ui, repo, source, **opts)
finally:
cur.close()
conn.query("SELECT RELEASE_LOCK('commit_lock')")
conn.close()
cur = None
conn = None

pending = []
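# pending holds the revision rows captured by the addrevision wrapper until
# the pretxnchangegroup hook flushes them to the database.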

class interceptopener(object):
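    """Wraps a file object and invokes a callback with every chunk that is
    written, delegating all other attribute access to the underlying file."""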
def __init__(self, fp, onwrite):
object.__setattr__(self, 'fp', fp)
object.__setattr__(self, 'onwrite', onwrite)
def write(self, data):
self.fp.write(data)
self.onwrite(data)
def __getattr__(self, attr):
return getattr(self.fp, attr)
def __setattr__(self, attr, value):
return setattr(self.fp, attr, value)
def __delattr__(self, attr):
return delattr(self.fp, attr)

def addrevision(orig, self, node, text, transaction, link, p1, p2,
cachedelta, ifh, dfh):
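    """Wraps revlog._addrevision: captures the index entry and data chunks as
    they are written and queues them on pending for the SQL commit hook."""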
entry = []
data0 = []
data1 = []
def iwrite(data):
if not entry:
# sometimes data0 is skipped
if data0 and not data1:
data1.append(data0[0])
del data0[:]
entry.append(data)
elif not data0:
data0.append(data)
elif not data1:
data1.append(data)
def dwrite(data):
if not data0:
data0.append(data)
elif not data1:
data1.append(data)
iopener = interceptopener(ifh, iwrite)
dopener = interceptopener(dfh, dwrite) if dfh else None
result = orig(self, node, text, transaction, link, p1, p2, cachedelta,
iopener, dopener)
    pending.append((-1, self.indexfile, link, entry[0],
                    data0[0] if data0 else '', data1[0]))
return result

def addgroup(orig, self, bundle, linkmapper, transaction):
"""
copy paste of revlog.addgroup, but we ensure that the revisions are added
in linkrev order.
"""
# track the base of the current delta log
content = []
node = None
r = len(self)
end = 0
if r:
end = self.end(r - 1)
ifh = self.opener(self.indexfile, "a+")
isize = r * self._io.size
if self._inline:
transaction.add(self.indexfile, end + isize, r)
dfh = None
else:
transaction.add(self.indexfile, isize, r)
transaction.add(self.datafile, end)
dfh = self.opener(self.datafile, "a")
try:
# loop through our set of deltas
chunkdatas = []
chunkmap = {}
lastlinkrev = -1
reorder = False
chain = None
while True:
chunkdata = bundle.deltachunk(chain)
if not chunkdata:
break
node = chunkdata['node']
cs = chunkdata['cs']
link = linkmapper(cs)
if link < lastlinkrev:
reorder = True
lastlinkrev = link
chunkdatas.append((link, chunkdata))
chunkmap[node] = chunkdata
chain = node
if reorder:
chunkdatas = sorted(chunkdatas)
fulltexts = {}
def getfulltext(node):
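            # Rebuild the full text of node by walking delta bases back to a
            # known text (already computed, present in the revlog, or
            # nullid), then applying the deltas forward.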
if node in fulltexts:
return fulltexts[node]
if node in self.nodemap:
return self.revision(node)
chunkdata = chunkmap[node]
deltabase = chunkdata['deltabase']
delta = chunkdata['delta']
deltachain = []
currentbase = deltabase
while True:
if currentbase in fulltexts:
deltachain.append(fulltexts[currentbase])
break
elif currentbase in self.nodemap:
deltachain.append(self.revision(currentbase))
break
elif currentbase == nullid:
break
else:
deltachunk = chunkmap[currentbase]
currentbase = deltachunk['deltabase']
deltachain.append(deltachunk['delta'])
prevtext = deltachain.pop()
while deltachain:
prevtext = mdiff.patch(prevtext, deltachain.pop())
fulltext = mdiff.patch(prevtext, delta)
fulltexts[node] = fulltext
return fulltext
reorders = 0
visited = set()
prevnode = self.node(len(self) - 1)
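        # Second pass: if a chunk's delta base is neither in the revlog nor
        # among the chunks processed so far (possible after reordering),
        # recompute its delta against the previously added node.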
for link, chunkdata in chunkdatas:
node = chunkdata['node']
deltabase = chunkdata['deltabase']
            if (deltabase not in self.nodemap and
                deltabase not in visited):
fulltext = getfulltext(node)
ptext = getfulltext(prevnode)
delta = mdiff.textdiff(ptext, fulltext)
chunkdata['delta'] = delta
chunkdata['deltabase'] = prevnode
reorders += 1
prevnode = node
visited.add(node)
for link, chunkdata in chunkdatas:
node = chunkdata['node']
p1 = chunkdata['p1']
p2 = chunkdata['p2']
cs = chunkdata['cs']
deltabase = chunkdata['deltabase']
delta = chunkdata['delta']
content.append(node)
link = linkmapper(cs)
if node in self.nodemap:
# this can happen if two branches make the same change
continue
for p in (p1, p2):
if p not in self.nodemap:
raise LookupError(p, self.indexfile,
_('unknown parent'))
if deltabase not in self.nodemap:
raise LookupError(deltabase, self.indexfile,
_('unknown delta base'))
baserev = self.rev(deltabase)
self._addrevision(node, None, transaction, link,
p1, p2, (baserev, delta), ifh, dfh)
if not dfh and not self._inline:
# addrevision switched from inline to conventional
# reopen the index
ifh.close()
dfh = self.opener(self.datafile, "a")
ifh = self.opener(self.indexfile, "a")
finally:
if dfh:
dfh.close()
ifh.close()
return content

def pretxnchangegroup(ui, repo, *args, **kwargs):
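    """pretxn hook that writes the pending revisions and the new set of repo
    heads to the database, committing them in a single transaction."""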
    if conn is None:
        raise Exception("invalid repo change - only hg push and pull "
                        "are allowed")
# Commit to db
try:
for revision in pending:
_, path, linkrev, entry, data0, data1 = revision
cur.execute("""INSERT INTO revs(path, linkrev, entry, data0, data1, createdtime)
VALUES(%s, %s, %s, %s, %s, %s)""", (path, linkrev, entry, data0, data1, time.strftime('%Y-%m-%d %H:%M:%S')))
cur.execute("""DELETE FROM headsbookmarks WHERE name IS NULL""")
for head in repo.heads():
cur.execute("""INSERT INTO headsbookmarks(node) VALUES(%s)""",
(hex(head)))
conn.commit()
except Exception:
conn.rollback()
raise
finally:
del pending[:]

def bookmarkwrite(orig, self):
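    """Wraps bookmark writing: mirrors the full bookmark set into the
    headsbookmarks table while holding the MySQL bookmark lock."""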
if disablesync:
return orig(self)
conn = MySQLdb.connect(**dbargs)
conn.query("SELECT GET_LOCK('bookmark_lock', 60)")
result = conn.store_result().fetch_row()[0][0]
if result != 1:
raise Exception("unable to obtain write lock")
try:
cur = conn.cursor()
cur.execute("""DELETE FROM headsbookmarks WHERE name IS NOT NULL""")
for k, v in self.iteritems():
cur.execute("""INSERT INTO headsbookmarks(node, name) VALUES(%s, %s)""",
(hex(v), k))
conn.commit()
return orig(self)
finally:
cur.close()
conn.query("SELECT RELEASE_LOCK('bookmark_lock')")
conn.close()

def transactionclose(orig, self):
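    """Wraps transaction close/abort: once the transaction has fully closed,
    drop any revisions still queued for the database so stale entries are
    never committed later."""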
result = orig(self)
if self.count == 0:
del pending[:]
return result

# recover must be a norepo command because loading the repo fails
commands.norepo += " sqlrecover"
@command('^sqlrecover', [
    ('f', 'force', None, _('strips as far back as necessary')),
    ], _('hg sqlrecover'))
def sqlrecover(ui, *args, **opts):
"""
Strips commits from the local repo until it is back in sync with the SQL
server.
"""
global disablesync
disablesync = True
repo = hg.repository(ui, ui.environ['PWD'])
def iscorrupt():
conn = MySQLdb.connect(**dbargs)
cur = conn.cursor()
try:
syncdb(repo, cur)
except CorruptionException:
return True
finally:
cur.close()
conn.close()
return False
reposize = len(repo)
stripsize = 10
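    # Strip progressively larger chunks off the tip until syncdb stops
    # raising CorruptionException.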
    while iscorrupt():
        if reposize > len(repo) + 10000 and not opts.get('force'):
            ui.warn(_("unable to fix repo after stripping 10000 commits "
                      "(use -f to strip more)\n"))
            return
        striprev = max(0, len(repo) - stripsize)
        nodelist = [repo[striprev].node()]
        repair.strip(ui, repo, nodelist, backup="none", topic="sqlrecover")
        stripsize *= 5
if len(repo) == 0:
ui.warn(_("unable to fix repo corruption\n"))
elif len(repo) == reposize:
ui.status(_("local repo was not corrupt - no action taken\n"))
else:
ui.status(_("local repo now matches SQL\n"))