From 32863d5aa74878831645ebb6b087715ec1cb41f0 Mon Sep 17 00:00:00 2001
From: Durham Goode
Date: Thu, 23 Jan 2014 19:22:49 -0800
Subject: [PATCH] General clean up and maxrowsize configuration

Fixes up some comments and function descriptions. Adds hgsql.maxrowsize
configuration option with a default value of 1MB.
---
 hgsql.py | 107 +++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 72 insertions(+), 35 deletions(-)

diff --git a/hgsql.py b/hgsql.py
index d647a74a80..221d665c9b 100644
--- a/hgsql.py
+++ b/hgsql.py
@@ -47,7 +47,6 @@ testedwith = 'internal'
 
 bookmarklock = 'bookmark_lock'
 commitlock = 'commit_lock'
-maxrecordsize = 1024 * 1024
 disableinitialsync = False
 
 class CorruptionException(Exception):
@@ -80,8 +79,10 @@ def reposetup(ui, repo):
             def noop():
                 pass
             executewithsql(repo, noop)
+    else:
+        raise Exception("hgsql extension installed but not enabled")
 
-# Handle incoming commits
+# Incoming commits are only allowed via push and pull
 def unbundle(orig, *args, **kwargs):
     repo = args[0]
     return executewithsql(repo, orig, commitlock, *args, **kwargs)
@@ -91,10 +92,18 @@ def pull(orig, *args, **kwargs):
     return executewithsql(repo, orig, commitlock, *args, **kwargs)
 
 def executewithsql(repo, action, lock=None, *args, **kwargs):
+    """Executes the given action while having a SQL connection open.
+    If a lock is specified, that lock is held for the duration of the
+    action.
+    """
+    # executewithsql can be executed in a nested scenario (ex: writing
+    # bookmarks during a pull), so track whether this call performed
+    # the connect.
     connected = False
     if not repo.sqlconn:
         repo.sqlconnect()
         connected = True
+
     if lock:
         repo.sqllock(lock)
 
@@ -114,8 +123,8 @@
         except _mysql_exceptions.ProgrammingError, ex:
             if success:
                 raise
-            # if the action caused an exception, hide sql cleanup exceptions,
-            # so the real exception is propagated up
+            # If the action caused an exception, hide sql cleanup exceptions,
+            # so the real exception is propagated up.
             pass
 
     return result
@@ -129,7 +138,7 @@ def wraprepo(repo):
                 raise Exception("SQL cursor already open without connection")
             self.sqlconn = MySQLdb.connect(**self.sqlargs)
             self.sqlconn.autocommit(False)
-            self.sqlconn.query("SET SESSION wait_timeout=300;")
+            self.sqlconn.query("SET wait_timeout=300")
             self.sqlcursor = self.sqlconn.cursor()
 
         def sqlclose(self):
@@ -267,6 +276,10 @@
                 lock.release()
 
         def fetchthread(self, queue, abort, fetchstart, fetchend):
+            """Fetches every revision from fetchstart to fetchend (inclusive)
+            and places them on the queue. This function is meant to run on a
+            background thread and listens to the abort event to abort early.
+ """ ui = self.ui clrev = fetchstart chunksize = 1000 @@ -280,7 +293,7 @@ def wraprepo(repo): AND linkrev > %s AND linkrev < %s ORDER BY linkrev ASC""", (self.sqlreponame, clrev - 1, maxrev)) - # put split chunks back together + # Put split chunks back together into a single revision groupedrevdata = {} for revdata in self.sqlcursor: name = revdata[0] @@ -312,12 +325,12 @@ def wraprepo(repo): queue.put(revdata) clrev += chunksize - if (clrev - fetchstart) % 5000 == 0: - ui.debug("Queued %s\n" % (clrev)) queue.put(False) def committodb(self): + """Commits all pending revisions to the database + """ if self.sqlconn == None: raise Exception("invalid repo change - only hg push and pull" + " are allowed") @@ -328,24 +341,26 @@ def wraprepo(repo): reponame = self.sqlreponame cursor = self.sqlcursor maxcommitsize = self.maxcommitsize + maxrowsize = self.maxrowsize self._validatependingrevs() - datasize = 0 - - # Commit to db try: + datasize = 0 for revision in self.pendingrevs: path, linkrev, rev, node, entry, data0, data1 = revision start = 0 chunk = 0 datalen = len(data1) - chunkcount = datalen / maxrecordsize - if datalen % maxrecordsize != 0 or datalen == 0: + chunkcount = datalen / maxrowsize + if datalen % maxrowsize != 0 or datalen == 0: chunkcount += 1 + + # We keep row size down by breaking large revisions down into + # smaller chunks. while chunk == 0 or start < len(data1): - end = min(len(data1), start + maxrecordsize) + end = min(len(data1), start + maxrowsize) datachunk = data1[start:end] cursor.execute("""INSERT INTO revisions(repo, path, chunk, chunkcount, linkrev, rev, node, entry, data0, data1, createdtime) @@ -384,7 +399,7 @@ def wraprepo(repo): del self.pendingrevs[:] def _validatependingrevs(self): - """validates that the current pending revs will be valid when + """Validates that the current pending revisions will be valid when written to the database. """ reponame = self.sqlreponame @@ -411,7 +426,7 @@ def wraprepo(repo): # Since MySQL can only hold so much data in a transaction, we allow # committing across multiple db transactions. That means if # the commit is interrupted, the next transaction needs to clean - # up, which we do here. + # up bad revisions. cursor.execute("""DELETE FROM revisions WHERE repo = %s AND linkrev > %s""", (reponame, maxlinkrev)) @@ -464,6 +479,7 @@ def wraprepo(repo): "revisions before adding a commit") % (len(expectedrevs))) ui = repo.ui + sqlargs = {} sqlargs['host'] = ui.config("hgsql", "host") sqlargs['db'] = ui.config("hgsql", "database") @@ -474,19 +490,24 @@ def wraprepo(repo): sqlargs['passwd'] = password sqlargs['cursorclass'] = cursors.SSCursor + repo.sqlargs = sqlargs + repo.sqlreponame = ui.config("hgsql", "reponame") if not repo.sqlreponame: raise Exception("missing hgsql.reponame") - repo.sqlargs = sqlargs + repo.maxcommitsize = ui.configbytes("hgsql", "maxcommitsize", 52428800) + repo.maxrowsize = ui.configbytes("hgsql", "maxrowsize", 1048576) repo.sqlconn = None repo.sqlcursor = None repo.disablesync = False repo.pendingrevs = [] - repo.maxcommitsize = ui.configbytes("hgsql", "maxcommitsize", 52428800) repo.__class__ = sqllocalrepo class bufferedopener(object): + """Opener implementation that buffers all writes in memory until + flush or close is called. 
+ """ def __init__(self, opener, path, mode): self.opener = opener self.path = path @@ -496,7 +517,7 @@ class bufferedopener(object): def write(self, value): if self.closed: - raise Exception("") + raise Exception("attempted to write to a closed bufferedopener") self.buffer.append(value) def flush(self): @@ -513,16 +534,22 @@ class bufferedopener(object): self.closed = True def addentries(repo, queue, transaction): + """Reads new rev entries from a queue and writes them to a buffered + revlog. At the end it flushes all the new data to disk. + """ opener = repo.sopener revlogs = {} def writeentry(revdata): + # Instantiates pseudo-revlogs for us to write data directly to path, chunk, chunkcount, link, entry, data0, data1 = revdata revlog = revlogs.get(path) if not revlog: revlog = EntryRevlog(opener, path) revlogs[path] = revlog + # Replace the existing openers with buffered ones so we can + # perform the flush to disk all at once at the end. if not hasattr(revlog, 'ifh') or revlog.ifh.closed: dfh = None if not revlog._inline: @@ -533,9 +560,7 @@ def addentries(repo, queue, transaction): revlog.addentry(transaction, revlog.ifh, revlog.dfh, entry, data0, data1) - revlog.dirty = True - clrev = len(repo) leftover = None exit = False @@ -561,7 +586,7 @@ def addentries(repo, queue, transaction): if linkrev == currentlinkrev: revisions.append(revdata) elif linkrev < currentlinkrev: - raise Exception("SQL data is not in linkrev order") + raise CorruptionException("SQL data is not in linkrev order") else: leftover = revdata currentlinkrev = linkrev @@ -573,8 +598,6 @@ def addentries(repo, queue, transaction): for revdata in revisions: writeentry(revdata) - clrev += 1 - # Flush filelogs, then manifest, then changelog changelog = revlogs.pop("00changelog.i", None) manifest = revlogs.pop("00manifest.i", None) @@ -594,12 +617,18 @@ def addentries(repo, queue, transaction): flushrevlog(changelog) class EntryRevlog(revlog.revlog): + """Pseudo-revlog implementation that allows applying data directly to + the end of the revlog. + """ def addentry(self, transaction, ifh, dfh, entry, data0, data1): curr = len(self) offset = self.end(curr - 1) e = struct.unpack(revlog.indexformatng, entry) offsettype, datalen, textlen, base, link, p1r, p2r, node = e + + # The first rev has special metadata encoded in it that should be + # stripped before being added to the index. if curr == 0: elist = list(e) type = revlog.gettype(offsettype) @@ -607,11 +636,13 @@ class EntryRevlog(revlog.revlog): elist[0] = offsettype e = tuple(elist) - # Verify that the revlog is in a good state + # Verify that the rev's parents and base appear earlier in the revlog if p1r >= curr or p2r >= curr: - raise CorruptionException("parent revision is not in revlog: %s" % self.indexfile) + raise CorruptionException("parent revision is not in revlog: %s" % + self.indexfile) if base > curr: - raise CorruptionException("base revision is not in revlog: %s" % self.indexfile) + raise CorruptionException("base revision is not in revlog: %s" % + self.indexfile) expectedoffset = revlog.getoffset(offsettype) if expectedoffset != 0 and expectedoffset != offset: @@ -639,9 +670,8 @@ class EntryRevlog(revlog.revlog): self.checkinlinesize(transaction, ifh) def addgroup(orig, self, bundle, linkmapper, transaction): - """ - copy paste of revlog.addgroup, but we ensure that the revisions are added - in linkrev order. + """Copy paste of revlog.addgroup, but we ensure that the revisions are + added in linkrev order. 
""" # track the base of the current delta log content = [] @@ -810,6 +840,7 @@ commands.norepo += " sqlrecover" @command('^sqlrecover', [ ('f', 'force', '', _('strips as far back as necessary'), ''), + ('', 'no-backup', '', _('does not produce backup bundles for strips'), ''), ], _('hg sqlrecover')) def sqlrecover(ui, *args, **opts): """ @@ -839,13 +870,19 @@ def sqlrecover(ui, *args, **opts): reposize = len(repo) stripsize = 10 - while iscorrupt(): - if reposize > len(repo) + 10000: - ui.warn("unable to fix repo after stripping 10000 commits (use -f to strip more)") + while iscorrupt() and len(repo) > 0: + if not opts.get('force') and reposize > len(repo) + 10000: + ui.warn("unable to fix repo after stripping 10000 commits " + + "(use -f to strip more)") + striprev = max(0, len(repo) - stripsize) nodelist = [repo[striprev].node()] - repair.strip(ui, repo, nodelist, backup="all", topic="sqlrecover") - stripsize *= 5 + ui.status("stripping back to %s commits" % (striprev)) + + backup = "none" if opts.get("no-backup") else "all" + repair.strip(ui, repo, nodelist, backup=backup, topic="sqlrecover") + + stripsize = min(stripsize * 5, 10000) if len(repo) == 0: ui.warn(_("unable to fix repo corruption\n"))