General cleanup and maxrowsize configuration

Fixes up some comments and function descriptions.

Adds an hgsql.maxrowsize configuration option with a default value of 1MB; revision data larger than this is broken into multiple database rows.
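
For reference, the new option sits in the [hgsql] section alongside the existing settings read in this diff (host, database, reponame). A minimal config sketch, with placeholder values for everything except the new option:

    [hgsql]
    host = localhost
    database = hgsql
    reponame = myrepo
    # New in this commit: revision data larger than this is split
    # across multiple rows. Defaults to 1MB (1048576 bytes).
    maxrowsize = 1048576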
Durham Goode 2014-01-23 19:22:49 -08:00
parent bfe1b6458d
commit 32863d5aa7

hgsql.py (107 changed lines)

@@ -47,7 +47,6 @@ testedwith = 'internal'
bookmarklock = 'bookmark_lock'
commitlock = 'commit_lock'
-maxrecordsize = 1024 * 1024
disableinitialsync = False
class CorruptionException(Exception):
@@ -80,8 +79,10 @@ def reposetup(ui, repo):
def noop():
pass
executewithsql(repo, noop)
else:
raise Exception("hgsql extension installed but not enabled")
# Handle incoming commits
# Incoming commits are only allowed via push and pull
def unbundle(orig, *args, **kwargs):
repo = args[0]
return executewithsql(repo, orig, commitlock, *args, **kwargs)
@@ -91,10 +92,18 @@ def pull(orig, *args, **kwargs):
return executewithsql(repo, orig, commitlock, *args, **kwargs)
def executewithsql(repo, action, lock=None, *args, **kwargs):
"""Executes the given action while having a SQL connection open.
If a lock is specified, that lock is held for the duration of the
action.
"""
# executewithsql can be executed in a nested scenario (ex: writing
# bookmarks during a pull), so track whether this call performed
# the connect.
connected = False
if not repo.sqlconn:
repo.sqlconnect()
connected = True
if lock:
repo.sqllock(lock)
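
executewithsql can nest, as the new comment above notes (e.g. writing bookmarks during a pull), so only the outermost call should own the connection. A standalone sketch of that connect-once pattern, reusing the repo's sqlconnect/sqlclose seen elsewhere in this diff (the helper name is hypothetical):

    def withsqlconnection(repo, action, *args, **kwargs):
        # Only the frame that opened the connection closes it; nested
        # calls find repo.sqlconn already set and simply reuse it.
        connected = False
        if not repo.sqlconn:
            repo.sqlconnect()
            connected = True
        try:
            return action(*args, **kwargs)
        finally:
            if connected:
                repo.sqlclose()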
@@ -114,8 +123,8 @@ def executewithsql(repo, action, lock=None, *args, **kwargs):
except _mysql_exceptions.ProgrammingError, ex:
if success:
raise
-# if the action caused an exception, hide sql cleanup exceptions,
-# so the real exception is propagated up
+# If the action caused an exception, hide sql cleanup exceptions,
+# so the real exception is propagated up.
pass
return result
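
The comment rewritten above encodes a deliberate asymmetry: a failure during sql cleanup is only worth raising when the action itself succeeded; otherwise the action's original exception must win. A self-contained sketch of that pattern (names hypothetical):

    def runwithcleanup(action, cleanup):
        # If action raises, swallow cleanup errors so the original
        # traceback propagates; if action succeeds, a cleanup error
        # is the only failure and is re-raised.
        success = False
        try:
            result = action()
            success = True
            return result
        finally:
            try:
                cleanup()
            except Exception:
                if success:
                    raise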
@@ -129,7 +138,7 @@ def wraprepo(repo):
raise Exception("SQL cursor already open without connection")
self.sqlconn = MySQLdb.connect(**self.sqlargs)
self.sqlconn.autocommit(False)
self.sqlconn.query("SET SESSION wait_timeout=300;")
self.sqlconn.query("SET wait_timeout=300")
self.sqlcursor = self.sqlconn.cursor()
def sqlclose(self):
@@ -267,6 +276,10 @@ def wraprepo(repo):
lock.release()
def fetchthread(self, queue, abort, fetchstart, fetchend):
"""Fetches every revision from fetchstart to fetchend (inclusive)
and places them on the queue. This function is meant to run on a
background thread and listens to the abort event to abort early.
"""
ui = self.ui
clrev = fetchstart
chunksize = 1000
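
Per its new docstring, fetchthread is the producer half of a producer/consumer pair: it streams revision rows onto a queue in linkrev order, watches an abort event, and finishes with the False sentinel that appears later in this diff. A minimal sketch of that shape in modern Python (hypothetical names; the real thread reads from MySQL in 1000-revision chunks):

    import queue
    import threading

    def producer(q, abort, fetchstart, fetchend):
        # Stream items in order; bail out early if the consumer aborts.
        for rev in range(fetchstart, fetchend + 1):
            if abort.is_set():
                break
            q.put(rev)
        q.put(False)  # sentinel: finished (or aborted)

    q = queue.Queue(100)
    abort = threading.Event()
    t = threading.Thread(target=producer, args=(q, abort, 0, 10))
    t.start()
    while True:
        item = q.get()
        if item is False:
            break
    t.join()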
@@ -280,7 +293,7 @@ def wraprepo(repo):
AND linkrev > %s AND linkrev < %s ORDER BY linkrev ASC""",
(self.sqlreponame, clrev - 1, maxrev))
-# put split chunks back together
+# Put split chunks back together into a single revision
groupedrevdata = {}
for revdata in self.sqlcursor:
name = revdata[0]
@@ -312,12 +325,12 @@ def wraprepo(repo):
queue.put(revdata)
clrev += chunksize
if (clrev - fetchstart) % 5000 == 0:
ui.debug("Queued %s\n" % (clrev))
queue.put(False)
def committodb(self):
"""Commits all pending revisions to the database
"""
if self.sqlconn == None:
raise Exception("invalid repo change - only hg push and pull" +
" are allowed")
@@ -328,24 +341,26 @@ def wraprepo(repo):
reponame = self.sqlreponame
cursor = self.sqlcursor
maxcommitsize = self.maxcommitsize
+maxrowsize = self.maxrowsize
self._validatependingrevs()
datasize = 0
# Commit to db
try:
datasize = 0
for revision in self.pendingrevs:
path, linkrev, rev, node, entry, data0, data1 = revision
start = 0
chunk = 0
datalen = len(data1)
-chunkcount = datalen / maxrecordsize
-if datalen % maxrecordsize != 0 or datalen == 0:
+chunkcount = datalen / maxrowsize
+if datalen % maxrowsize != 0 or datalen == 0:
chunkcount += 1
+# We keep row size down by breaking large revisions down into
+# smaller chunks.
while chunk == 0 or start < len(data1):
-end = min(len(data1), start + maxrecordsize)
+end = min(len(data1), start + maxrowsize)
datachunk = data1[start:end]
cursor.execute("""INSERT INTO revisions(repo, path, chunk,
chunkcount, linkrev, rev, node, entry, data0, data1, createdtime)
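
The loop above is where maxrowsize takes effect: a revision's data becomes ceil(datalen / maxrowsize) rows, and an empty revision still produces one row so it is represented in the table. A standalone sketch of the arithmetic (written for Python 3, so // stands in for the original's integer /):

    def splitrows(data, maxrowsize=1048576):
        # Ceiling-style chunk count; empty data still yields one row.
        chunkcount = len(data) // maxrowsize
        if len(data) % maxrowsize != 0 or len(data) == 0:
            chunkcount += 1
        return [data[i * maxrowsize:(i + 1) * maxrowsize]
                for i in range(chunkcount)]

    # 2MB plus a 512-byte tail with the default 1MB rows -> 3 chunks,
    # the last holding just the 512-byte remainder.
    chunks = splitrows(b"x" * (2 * 1048576 + 512))
    assert len(chunks) == 3 and len(chunks[-1]) == 512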
@@ -384,7 +399,7 @@ def wraprepo(repo):
del self.pendingrevs[:]
def _validatependingrevs(self):
"""validates that the current pending revs will be valid when
"""Validates that the current pending revisions will be valid when
written to the database.
"""
reponame = self.sqlreponame
@@ -411,7 +426,7 @@ def wraprepo(repo):
# Since MySQL can only hold so much data in a transaction, we allow
# committing across multiple db transactions. That means if
# the commit is interrupted, the next transaction needs to clean
-# up, which we do here.
+# up bad revisions.
cursor.execute("""DELETE FROM revisions WHERE repo = %s AND
linkrev > %s""", (reponame, maxlinkrev))
@@ -464,6 +479,7 @@ def wraprepo(repo):
"revisions before adding a commit") % (len(expectedrevs)))
ui = repo.ui
sqlargs = {}
sqlargs['host'] = ui.config("hgsql", "host")
sqlargs['db'] = ui.config("hgsql", "database")
@@ -474,19 +490,24 @@ def wraprepo(repo):
sqlargs['passwd'] = password
sqlargs['cursorclass'] = cursors.SSCursor
+repo.sqlargs = sqlargs
repo.sqlreponame = ui.config("hgsql", "reponame")
if not repo.sqlreponame:
raise Exception("missing hgsql.reponame")
-repo.sqlargs = sqlargs
+repo.maxcommitsize = ui.configbytes("hgsql", "maxcommitsize", 52428800)
+repo.maxrowsize = ui.configbytes("hgsql", "maxrowsize", 1048576)
repo.sqlconn = None
repo.sqlcursor = None
repo.disablesync = False
repo.pendingrevs = []
-repo.maxcommitsize = ui.configbytes("hgsql", "maxcommitsize", 52428800)
repo.__class__ = sqllocalrepo
class bufferedopener(object):
"""Opener implementation that buffers all writes in memory until
flush or close is called.
"""
def __init__(self, opener, path, mode):
self.opener = opener
self.path = path
@@ -496,7 +517,7 @@ class bufferedopener(object):
def write(self, value):
if self.closed:
raise Exception("")
raise Exception("attempted to write to a closed bufferedopener")
self.buffer.append(value)
def flush(self):
@@ -513,16 +534,22 @@ class bufferedopener(object):
self.closed = True
def addentries(repo, queue, transaction):
"""Reads new rev entries from a queue and writes them to a buffered
revlog. At the end it flushes all the new data to disk.
"""
opener = repo.sopener
revlogs = {}
def writeentry(revdata):
+# Instantiates pseudo-revlogs for us to write data directly to
path, chunk, chunkcount, link, entry, data0, data1 = revdata
revlog = revlogs.get(path)
if not revlog:
revlog = EntryRevlog(opener, path)
revlogs[path] = revlog
+# Replace the existing openers with buffered ones so we can
+# perform the flush to disk all at once at the end.
if not hasattr(revlog, 'ifh') or revlog.ifh.closed:
dfh = None
if not revlog._inline:
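
bufferedopener, whose close path appears at the top of this hunk, exists so that the many small appends made while applying rows become one large write per file at flush time. A simplified standalone sketch of that buffering contract (not the extension's exact class):

    class BufferedWriter(object):
        # Collects writes in memory and hands them to the underlying
        # file in a single call on flush, mirroring bufferedopener.
        def __init__(self, fileobj):
            self.fileobj = fileobj
            self.buffer = []
            self.closed = False

        def write(self, value):
            if self.closed:
                raise Exception("attempted to write to a closed writer")
            self.buffer.append(value)

        def flush(self):
            if self.buffer:
                self.fileobj.write(b"".join(self.buffer))
                del self.buffer[:]

        def close(self):
            self.flush()
            self.closed = True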
@@ -533,9 +560,7 @@ def addentries(repo, queue, transaction):
revlog.addentry(transaction, revlog.ifh, revlog.dfh, entry,
data0, data1)
revlog.dirty = True
clrev = len(repo)
leftover = None
exit = False
@@ -561,7 +586,7 @@ def addentries(repo, queue, transaction):
if linkrev == currentlinkrev:
revisions.append(revdata)
elif linkrev < currentlinkrev:
raise Exception("SQL data is not in linkrev order")
raise CorruptionException("SQL data is not in linkrev order")
else:
leftover = revdata
currentlinkrev = linkrev
@@ -573,8 +598,6 @@ def addentries(repo, queue, transaction):
for revdata in revisions:
writeentry(revdata)
clrev += 1
# Flush filelogs, then manifest, then changelog
changelog = revlogs.pop("00changelog.i", None)
manifest = revlogs.pop("00manifest.i", None)
@@ -594,12 +617,18 @@ def addentries(repo, queue, transaction):
flushrevlog(changelog)
class EntryRevlog(revlog.revlog):
"""Pseudo-revlog implementation that allows applying data directly to
the end of the revlog.
"""
def addentry(self, transaction, ifh, dfh, entry, data0, data1):
curr = len(self)
offset = self.end(curr - 1)
e = struct.unpack(revlog.indexformatng, entry)
offsettype, datalen, textlen, base, link, p1r, p2r, node = e
+# The first rev has special metadata encoded in it that should be
+# stripped before being added to the index.
if curr == 0:
elist = list(e)
type = revlog.gettype(offsettype)
@@ -607,11 +636,13 @@ class EntryRevlog(revlog.revlog):
elist[0] = offsettype
e = tuple(elist)
-# Verify that the revlog is in a good state
+# Verify that the rev's parents and base appear earlier in the revlog
if p1r >= curr or p2r >= curr:
raise CorruptionException("parent revision is not in revlog: %s" % self.indexfile)
raise CorruptionException("parent revision is not in revlog: %s" %
self.indexfile)
if base > curr:
raise CorruptionException("base revision is not in revlog: %s" % self.indexfile)
raise CorruptionException("base revision is not in revlog: %s" %
self.indexfile)
expectedoffset = revlog.getoffset(offsettype)
if expectedoffset != 0 and expectedoffset != offset:
@@ -639,9 +670,8 @@ class EntryRevlog(revlog.revlog):
self.checkinlinesize(transaction, ifh)
def addgroup(orig, self, bundle, linkmapper, transaction):
"""
copy paste of revlog.addgroup, but we ensure that the revisions are added
in linkrev order.
"""Copy paste of revlog.addgroup, but we ensure that the revisions are
added in linkrev order.
"""
# track the base of the current delta log
content = []
@@ -810,6 +840,7 @@ commands.norepo += " sqlrecover"
@command('^sqlrecover', [
('f', 'force', '', _('strips as far back as necessary'), ''),
+('', 'no-backup', '', _('does not produce backup bundles for strips'), ''),
], _('hg sqlrecover'))
def sqlrecover(ui, *args, **opts):
"""
@@ -839,13 +870,19 @@ def sqlrecover(ui, *args, **opts):
reposize = len(repo)
stripsize = 10
-while iscorrupt():
-if reposize > len(repo) + 10000:
-ui.warn("unable to fix repo after stripping 10000 commits (use -f to strip more)")
+while iscorrupt() and len(repo) > 0:
+if not opts.get('force') and reposize > len(repo) + 10000:
+ui.warn("unable to fix repo after stripping 10000 commits " +
+"(use -f to strip more)")
striprev = max(0, len(repo) - stripsize)
nodelist = [repo[striprev].node()]
repair.strip(ui, repo, nodelist, backup="all", topic="sqlrecover")
stripsize *= 5
ui.status("stripping back to %s commits" % (striprev))
backup = "none" if opts.get("no-backup") else "all"
repair.strip(ui, repo, nodelist, backup=backup, topic="sqlrecover")
stripsize = min(stripsize * 5, 10000)
+if len(repo) == 0:
+ui.warn(_("unable to fix repo corruption\n"))