Switch to mysql.connector

Summary:
The old MySQLdb library did not support IPv6 so we've switched to using
mysql.connector, which is more recent and does support IPv6.

The most notable change in functionality is that it returns byte arrays instead
of strings, so I had to add a custom type converter that returns strings.

Test Plan: Ran the tests (after installing the new rpm)

Reviewers: #sourcecontrol

Differential Revision: https://phabricator.fb.com/D2665344
This commit is contained in:
Durham Goode 2015-11-17 13:34:00 -08:00
parent 0d62bc2a7a
commit 883b577953
2 changed files with 65 additions and 39 deletions

100
hgsql.py
View File

@ -37,12 +37,17 @@ from mercurial.i18n import _
from mercurial.extensions import wrapfunction, wrapcommand
from mercurial import changelog, error, cmdutil, revlog, localrepo, transaction
from mercurial import wireproto, bookmarks, repair, commands, hg, mdiff, phases
from mercurial import util, changegroup, exchange, bundle2
import MySQLdb, struct, time, Queue, threading, _mysql_exceptions
from MySQLdb import cursors
from mercurial import util, changegroup, exchange, bundle2, bundlerepo
from mercurial import demandimport
import struct, time, Queue, threading
import warnings
import sys
# mysql.connector does not import nicely with the demandimporter, so temporarily
# disable it.
with demandimport.deactivated():
import mysql.connector
cmdtable = {}
command = cmdutil.command(cmdtable)
testedwith = 'internal'
@ -282,8 +287,7 @@ class sqlcontext(object):
repo.sqlclose()
self._active = False
except (_mysql_exceptions.ProgrammingError,
_mysql_exceptions.OperationalError):
except mysql.connector.errors.Error:
# Only raise sql exceptions if the wrapped code threw no exception
if type is None:
raise
@ -316,10 +320,14 @@ def wraprepo(repo):
retry = 3
while True:
try:
self.sqlconn = MySQLdb.connect(**self.sqlargs)
self.sqlconn = mysql.connector.connect(force_ipv6=True,
**self.sqlargs)
# The default behavior is to return byte arrays, when we
# need strings. This custom convert returns strings.
self.sqlconn.set_converter_class(CustomConverter)
break
except (_mysql_exceptions.ProgrammingError,
_mysql_exceptions.OperationalError):
except mysql.connector.errors.Error:
# mysql can be flakey occasionally, so do some minimal
# retrying.
retry -= 1
@ -327,16 +335,17 @@ def wraprepo(repo):
raise
time.sleep(0.2)
self.sqlconn.autocommit(False)
waittimeout = self.ui.config('hgsql', 'waittimeout', '300')
waittimeout = self.sqlconn.escape_string("%s" % (waittimeout,))
waittimeout = self.sqlconn.converter.escape('%s' % waittimeout)
self.locktimeout = self.ui.config('hgsql', 'locktimeout', '60')
self.locktimeout = self.sqlconn.escape_string("%s" %
(self.locktimeout,))
self.sqlconn.query("SET wait_timeout=%s" % waittimeout)
self.sqlconn.query("SET innodb_lock_wait_timeout=%s" %
self.locktimeout)
self.locktimeout = self.sqlconn.converter.escape('%s' %
self.locktimeout)
self.sqlcursor = self.sqlconn.cursor()
self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout)
self.sqlcursor.execute("SET innodb_lock_wait_timeout=%s" %
self.locktimeout)
def sqlclose(self):
with warnings.catch_warnings():
@ -346,38 +355,44 @@ def wraprepo(repo):
self.sqlcursor = None
self.sqlconn = None
def _lockname(self, name):
lockname = "%s_%s" % (name, self.sqlreponame)
return self.sqlconn.converter.escape(lockname)
def sqllock(self, name):
escapedname = self.sqlconn.escape_string("%s_%s" %
(name, self.sqlreponame))
lockname = self._lockname(name)
# cast to int to prevent passing bad sql data
self.sqlconn.query("SELECT GET_LOCK('%s', %s)" %
(escapedname, self.locktimeout))
result = self.sqlconn.store_result().fetch_row()[0][0]
self.sqlcursor.execute("SELECT GET_LOCK('%s', %s)" %
(lockname, self.locktimeout))
result = int(self.sqlcursor.fetchall()[0][0])
if result != 1:
raise util.Abort("timed out waiting for mysql repo lock (%s)" % escapedname)
raise util.Abort("timed out waiting for mysql repo lock (%s)" %
lockname)
self.heldlocks.add(name)
def hassqllock(self, name):
if not name in self.heldlocks:
return False
escapedname = self.sqlconn.escape_string("%s_%s" % (name, self.sqlreponame))
self.sqlconn.query("SELECT IS_USED_LOCK('%s')" % (escapedname,))
lockheldby = self.sqlconn.store_result().fetch_row()[0][0]
lockname = self._lockname(name)
self.sqlcursor.execute("SELECT IS_USED_LOCK('%s')" % (lockname,))
lockheldby = self.sqlcursor.fetchall()[0][0]
if lockheldby == None:
raise Exception("unable to check %s lock" % escapedname)
raise Exception("unable to check %s lock" % lockname)
self.sqlconn.query("SELECT CONNECTION_ID()")
myconnectid = self.sqlconn.store_result().fetch_row()[0][0]
self.sqlcursor.execute("SELECT CONNECTION_ID()")
myconnectid = self.sqlcursor.fetchall()[0][0]
if myconnectid == None:
raise Exception("unable to read connection id")
return lockheldby == myconnectid
def sqlunlock(self, name):
escapedname = self.sqlconn.escape_string("%s_%s" % (name, self.sqlreponame))
self.sqlconn.query("SELECT RELEASE_LOCK('%s')" % (escapedname,))
self.sqlconn.store_result().fetch_row()
lockname = self._lockname(name)
self.sqlcursor.execute("SELECT RELEASE_LOCK('%s')" % (lockname,))
self.sqlcursor.fetchall()
self.heldlocks.discard(name)
def transaction(self, *args, **kwargs):
@ -405,7 +420,7 @@ def wraprepo(repo):
If it returns False, the heads and bookmarks match the database.
"""
self.sqlcursor.execute("""SELECT namespace, name, value
FROM revision_references WHERE repo = %s""", (self.sqlreponame))
FROM revision_references WHERE repo = %s""", (self.sqlreponame,))
sqlheads = set()
sqlbookmarks = {}
tip = -1
@ -513,7 +528,7 @@ def wraprepo(repo):
bm.clear()
self.sqlcursor.execute("""SELECT name, value FROM revision_references
WHERE namespace = 'bookmarks' AND repo = %s""",
(self.sqlreponame))
(self.sqlreponame,))
for name, node in self.sqlcursor:
node = bin(node)
if node in self:
@ -773,7 +788,7 @@ def wraprepo(repo):
# Validate that we are appending to the correct linkrev
cursor.execute("""SELECT value FROM revision_references WHERE repo = %s AND
namespace = 'tip'""", reponame)
namespace = 'tip'""", (reponame,))
tipresults = cursor.fetchall()
if len(tipresults) == 0:
maxlinkrev = -1
@ -864,13 +879,12 @@ def wraprepo(repo):
sqlargs = {}
sqlargs['host'] = ui.config("hgsql", "host")
sqlargs['db'] = ui.config("hgsql", "database")
sqlargs['database'] = ui.config("hgsql", "database")
sqlargs['user'] = ui.config("hgsql", "user")
sqlargs['port'] = ui.configint("hgsql", "port")
password = ui.config("hgsql", "password", "")
if password:
sqlargs['passwd'] = password
sqlargs['cursorclass'] = cursors.SSCursor
sqlargs['password'] = password
repo.sqlargs = sqlargs
@ -1246,7 +1260,7 @@ def bookmarkwrite(orig, self):
try:
cursor = repo.sqlcursor
cursor.execute("""DELETE FROM revision_references WHERE repo = %s AND
namespace = 'bookmarks'""", (repo.sqlreponame))
namespace = 'bookmarks'""", (repo.sqlreponame,))
for k, v in self.iteritems():
cursor.execute("""INSERT INTO revision_references(repo, namespace, name, value)
@ -1410,3 +1424,15 @@ def sqlstrip(ui, rev, *args, **opts):
lock.release()
if wlock:
wlock.release()
class CustomConverter(mysql.connector.conversion.MySQLConverter):
"""Ensure that all values being returned are returned as python string
(versus the default byte arrays)."""
def _STRING_to_python(self, value, dsc=None):
return str(value)
def _VAR_STRING_to_python(self, value, dsc=None):
return str(value)
def _BLOB_to_python(self, value, dsc=None):
return str(value)

View File

@ -5,9 +5,9 @@ DBNAME=`echo $DBHOSTPORT | cut -d : -f 3`
DBUSER=`echo $DBHOSTPORT | cut -d : -f 4`
DBPASS=`echo $DBHOSTPORT | cut -d : -f 5-`
mysql -h $DBHOST -P $DBPORT -u $DBUSER -p$DBPASS -e "
mysql -h $DBHOST -P $DBPORT -u $DBUSER -p"$DBPASS" -e "
CREATE DATABASE IF NOT EXISTS $DBNAME;" 2>/dev/null
mysql -h $DBHOST -P $DBPORT -D $DBNAME -u $DBUSER -p$DBPASS -e '
mysql -h $DBHOST -P $DBPORT -D $DBNAME -u $DBUSER -p"$DBPASS" -e '
DROP TABLE IF EXISTS revisions;
CREATE TABLE IF NOT EXISTS revisions(