infinitepush: refactor bundlestore

Summary:
Move all the store creation and access code into bundlestore.  Clean up the
interfaces to the index and store, removing the abstract base classes as no
code was being shared.

Reviewed By: DurhamG

Differential Revision: D15170164

fbshipit-source-id: f22ec4a266333b3100074b21cf9577c82f56e4c6
This commit is contained in:
Mark Thomas 2019-05-02 07:02:15 -07:00 committed by Facebook Github Bot
parent 104261775b
commit 522e0f2ec4
5 changed files with 153 additions and 275 deletions

View File

@ -100,7 +100,6 @@ Configs::
from __future__ import absolute_import
import collections
import contextlib
import errno
import functools
@ -144,7 +143,7 @@ from edenscm.mercurial import (
)
from edenscm.mercurial.commands import debug as debugcommands
from . import bundleparts, common, infinitepushcommands
from . import bundleparts, bundlestore, common, infinitepushcommands
copiedpart = bundleparts.copiedpart
@ -185,60 +184,6 @@ colortable = {
}
def _buildexternalbundlestore(ui):
    """Construct an externalbundlestore from the put/get binary config.

    Aborts if ``infinitepush.put_binary`` or ``infinitepush.get_binary``
    is unset.
    """
    putargs = ui.configlist("infinitepush", "put_args", [])
    putbinary = ui.config("infinitepush", "put_binary")
    if not putbinary:
        raise error.Abort("put binary is not specified")

    getargs = ui.configlist("infinitepush", "get_args", [])
    getbinary = ui.config("infinitepush", "get_binary")
    if not getbinary:
        raise error.Abort("get binary is not specified")

    # Imported lazily so the store module is only loaded when this
    # backend is actually selected.
    from . import store

    return store.externalbundlestore(putbinary, putargs, getbinary, getargs)
def _buildsqlindex(ui):
    """Construct a sqlindexapi from the ``infinitepush`` SQL config.

    Aborts unless ``infinitepush.sqlhost`` (``host:port:db:user:password``)
    and ``infinitepush.reponame`` are both set.
    """
    sqlhost = ui.config("infinitepush", "sqlhost")
    if not sqlhost:
        raise error.Abort(_("please set infinitepush.sqlhost"))
    host, port, db, user, password = sqlhost.split(":")

    reponame = ui.config("infinitepush", "reponame")
    if not reponame:
        raise error.Abort(_("please set infinitepush.reponame"))

    # Imported lazily so the mysql dependency is only needed when the
    # SQL index is actually selected.
    from . import sqlindexapi

    return sqlindexapi.sqlindexapi(
        reponame,
        host,
        port,
        db,
        user,
        password,
        ui.config("infinitepush", "logfile", ""),
        _getloglevel(ui),
        shorthasholdrevthreshold=ui.configint(
            "infinitepush", "shorthasholdrevthreshold", 60
        ),
        waittimeout=ui.configint("infinitepush", "waittimeout", 300),
        locktimeout=ui.configint("infinitepush", "locktimeout", 120),
    )
def _getloglevel(ui):
loglevel = ui.config("infinitepush", "loglevel", "DEBUG")
numeric_loglevel = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_loglevel, int):
raise error.Abort(_("invalid log level %s") % loglevel)
return numeric_loglevel
def _tryhoist(ui, remotebookmark):
"""returns a bookmarks with hoisted part removed
@ -273,37 +218,9 @@ def _debugbundle2part(orig, ui, part, all, **opts):
orig(ui, part, all, **opts)
class bundlestore(object):
    """Bundle storage for a repo: a raw bundle ``store`` plus an ``index``.

    Both backends are chosen by config: ``infinitepush.storetype``
    ("disk" or "external") and ``infinitepush.indextype`` ("disk" or
    "sql").  Any other value aborts.
    """

    def __init__(self, repo):
        self._repo = repo
        ui = self._repo.ui

        storetype = ui.config("infinitepush", "storetype", "")
        if storetype == "disk":
            from . import store

            self.store = store.filebundlestore(ui, self._repo)
        elif storetype == "external":
            self.store = _buildexternalbundlestore(ui)
        else:
            raise error.Abort(
                _("unknown infinitepush store type specified %s") % storetype
            )

        indextype = ui.config("infinitepush", "indextype", "")
        if indextype == "disk":
            from . import fileindexapi

            self.index = fileindexapi.fileindexapi(self._repo)
        elif indextype == "sql":
            self.index = _buildsqlindex(ui)
        else:
            raise error.Abort(
                _("unknown infinitepush index type specified %s") % indextype
            )
def reposetup(ui, repo):
    """Attach a bundlestore to local server repos at reposetup time."""
    if common.isserver(ui) and repo.local():
        # NOTE(review): the scraped diff left both the pre- and
        # post-refactor assignment here; keep only the refactored form
        # that goes through the bundlestore module (imported at top).
        repo.bundlestore = bundlestore.bundlestore(repo)
def uisetup(ui):

View File

@ -1,52 +1,43 @@
# Infinitepush Bundle Store
#
# Copyright 2016-2019 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""store for infinitepush bundles"""
# based on bundleheads extension by Gregory Szorc <gps@mozilla.com>
import abc
import hashlib
import os
import subprocess
from tempfile import NamedTemporaryFile
from edenscm.mercurial import error
from edenscm.mercurial.i18n import _
class BundleWriteException(Exception):
pass
from . import fileindex, sqlindex
class BundleReadException(Exception):
pass
class bundlestore(object):
    """Bundle storage for a repo: a raw bundle ``store`` plus an ``index``.

    ``infinitepush.storetype`` selects the store backend ("disk" or
    "external"); ``infinitepush.indextype`` selects the index backend
    ("disk" or "sql").  Any other value aborts.

    The refactor removed the old ``abstractbundlestore`` ABC whose
    remains were interleaved inside this class in the scraped diff; no
    code was shared through it, so it is gone.
    """

    def __init__(self, repo):
        storetype = repo.ui.config("infinitepush", "storetype", "")
        if storetype == "disk":
            self.store = filebundlestore(repo)
        elif storetype == "external":
            self.store = externalbundlestore(repo)
        else:
            raise error.Abort(
                _("unknown infinitepush store type specified %s") % storetype
            )

        indextype = repo.ui.config("infinitepush", "indextype", "")
        if indextype == "disk":
            self.index = fileindex.fileindex(repo)
        elif indextype == "sql":
            self.index = sqlindex.sqlindex(repo)
        else:
            raise error.Abort(
                _("unknown infinitepush index type specified %s") % indextype
            )
class filebundlestore(object):
@ -55,14 +46,10 @@ class filebundlestore(object):
meant for storing bundles somewhere on disk and on network filesystems
"""
def __init__(self, ui, repo):
self.ui = ui
self.repo = repo
self.storepath = ui.configpath("scratchbranch", "storepath")
def __init__(self, repo):
self.storepath = repo.ui.configpath("scratchbranch", "storepath")
if not self.storepath:
self.storepath = self.repo.localvfs.join(
"scratchbranches", "filebundlestore"
)
self.storepath = repo.localvfs.join("scratchbranches", "filebundlestore")
if not os.path.exists(self.storepath):
os.makedirs(self.storepath)
@ -96,8 +83,8 @@ class filebundlestore(object):
return f.read()
class externalbundlestore(object):
    def __init__(self, repo):
        """Bundle store backed by external put/get binaries.

        Configuration (all under ``infinitepush``):

        ``put_binary`` - path to a binary that uploads a bundle to the
        external storage and prints the key to stdout.

        ``put_args`` - extra args for ``put_binary``; a filename
        replacement field can be used for the bundle file.

        ``get_binary`` - path to a binary that accepts a file and a key
        (in that order), downloads the bundle from the store, and saves
        it to the file.

        ``get_args`` - extra args for ``get_binary``; the filename and
        '{handle}' replacement fields can be used.

        NOTE(review): the scrape rendered the filename placeholder as
        "(unknown)"; presumably it is '{filename}' -- confirm against
        the real source before relying on it.
        """
        ui = repo.ui

        # Path to the binary which uploads a bundle to the external
        # store and prints the key to stdout.
        self.put_binary = ui.config("infinitepush", "put_binary")
        if not self.put_binary:
            raise error.Abort("put binary is not specified")
        # Additional args to ``put_binary``.
        self.put_args = ui.configlist("infinitepush", "put_args", [])

        # Path to the binary which accepts a file and key (in that
        # order) and downloads the bundle from the store into the file.
        self.get_binary = ui.config("infinitepush", "get_binary")
        if not self.get_binary:
            raise error.Abort("get binary is not specified")
        # Additional args to ``get_binary``.
        self.get_args = ui.configlist("infinitepush", "get_args", [])
def _call_binary(self, args):
p = subprocess.Popen(
@ -135,15 +136,17 @@ class externalbundlestore(abstractbundlestore):
)
if returncode != 0:
raise BundleWriteException(
"Failed to upload to external store: %s" % stderr
raise error.Abort(
"Infinitepush failed to upload bundle to external store: %s"
% stderr
)
stdout_lines = stdout.splitlines()
if len(stdout_lines) == 1:
return stdout_lines[0]
else:
raise BundleWriteException(
"Bad output from %s: %s" % (self.put_binary, stdout)
raise error.Abort(
"Infinitepush received bad output from %s: %s"
% (self.put_binary, stdout)
)
def read(self, handle):
@ -158,7 +161,5 @@ class externalbundlestore(abstractbundlestore):
)
if returncode != 0:
raise BundleReadException(
"Failed to download from external store: %s" % stderr
)
raise error.Abort("Failed to download from external store: %s" % stderr)
return temp.read()

View File

@ -14,8 +14,8 @@
import os
import posixpath
from indexapi import indexapi, indexexception
from edenscm.mercurial import pycompat, util
from edenscm.mercurial import error, pycompat, util
from edenscm.mercurial.i18n import _
if pycompat.iswindows:
@ -33,9 +33,17 @@ else:
return path
class fileindexapi(indexapi):
class fileindex(object):
"""File-based backend for infinitepush index.
This is a context manager. All write operations should use:
with index:
index.addbookmark(...)
...
"""
def __init__(self, repo):
super(fileindexapi, self).__init__()
self._repo = repo
root = repo.ui.config("infinitepush", "indexpath")
if not root:
@ -55,29 +63,40 @@ class fileindexapi(indexapi):
self._lock.__exit__(exc_type, exc_val, exc_tb)
def addbundle(self, bundleid, nodesctx):
"""Record a bundleid containing the given nodes."""
for node in nodesctx:
nodepath = os.path.join(self._nodemap, node.hex())
self._write(nodepath, bundleid)
def addbookmark(self, bookmark, node):
"""Record a bookmark pointing to a particular node."""
bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
self._write(bookmarkpath, node)
def addmanybookmarks(self, bookmarks):
"""Record the contents of the ``bookmarks`` dict as bookmarks."""
for bookmark, node in bookmarks.items():
self.addbookmark(bookmark, node)
def deletebookmarks(self, patterns):
"""Delete all bookmarks that match any of the patterns in ``patterns``."""
for pattern in patterns:
for bookmark, _ in self._listbookmarks(pattern):
for bookmark, _node in self._listbookmarks(pattern):
bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
self._delete(bookmarkpath)
def getbundle(self, node):
"""Get the bundleid for a bundle that contains the given node."""
nodepath = os.path.join(self._nodemap, node)
return self._read(nodepath)
def getnodebyprefix(self, hashprefix):
"""Get the node that matches the given hash prefix.
If there is no match, returns None.
If there are multiple matches, raises an exception."""
vfs = self._repo.localvfs
if not vfs.exists(self._nodemap):
return None
@ -89,21 +108,27 @@ class fileindexapi(indexapi):
return None
if len(nodefiles) > 1:
raise indexexception(
("ambiguous identifier '%s'\n" % hashprefix)
+ "suggestion: provide longer commithash prefix"
raise error.Abort(
_(
"ambiguous identifier '%s'\n"
"suggestion: provide longer commithash prefix"
)
% hashprefix
)
return nodefiles[0]
def getnode(self, bookmark):
"""Get the node for the given bookmark."""
bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
return self._read(bookmarkpath)
def getbookmarks(self, query):
return sorted(self._listbookmarks(query))
def getbookmarks(self, pattern):
"""Get all bookmarks that match the pattern."""
return sorted(self._listbookmarks(pattern))
def saveoptionaljsonmetadata(self, node, jsonmetadata):
"""Save optional metadata for the given node."""
vfs = self._repo.localvfs
vfs.write(os.path.join(self._metadatamap, node), jsonmetadata)
@ -112,7 +137,7 @@ class fileindexapi(indexapi):
pattern = "re:^" + pattern[:-1] + ".*"
kind, pat, matcher = util.stringmatcher(pattern)
prefixlen = len(self._bookmarkmap) + 1
for dirpath, _, books in self._repo.localvfs.walk(self._bookmarkmap):
for dirpath, _dirs, books in self._repo.localvfs.walk(self._bookmarkmap):
for book in books:
bookmark = posixpath.join(dirpath, book)[prefixlen:]
if not matcher(bookmark):

View File

@ -1,75 +0,0 @@
# Infinite push
#
# Copyright 2016 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
class indexapi(object):
    """Interface for the infinitepush index.

    Instances are context managers; perform all write operations
    (deletebookmarks, addbookmark, ...) inside a ``with`` block:

        with index:
            index.deletebookmarks(...)
            ...
    """

    def __init__(self):
        """Initialize the metadata store connection."""

    def close(self):
        """Clean up the metadata store connection."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def addbundle(self, bundleid, nodesctx):
        """Record a bundleid and the node contexts it contains."""
        raise NotImplementedError()

    def addbookmark(self, bookmark, node):
        """Record a bookmark name -> hash mapping in the metadata store."""
        raise NotImplementedError()

    def addmanybookmarks(self, bookmarks):
        """Record each bookmark -> hash entry of the given dict."""
        raise NotImplementedError()

    def deletebookmarks(self, patterns):
        """Delete all bookmarks matching the given patterns."""
        raise NotImplementedError()

    def getbundle(self, node):
        """Return the bundleid of the bundle containing the given node."""
        raise NotImplementedError()

    def getnodebyprefix(self, prefix):
        """Return the node with the given hash prefix, or None.

        Raises an error for an ambiguous identifier."""
        raise NotImplementedError()

    def getnode(self, bookmark):
        """Return the node for the given bookmark, or None."""
        raise NotImplementedError()

    def getbookmarks(self, query):
        """Return the bookmarks that match the query."""
        raise NotImplementedError()

    def saveoptionaljsonmetadata(self, node, jsonmetadata):
        """Save optional metadata for a given node."""
        raise NotImplementedError()
class indexexception(Exception):
    """Raised for errors in infinitepush index operations."""

View File

@ -11,8 +11,16 @@ import time
import warnings
import mysql.connector
from indexapi import indexapi, indexexception
from edenscm.mercurial import util
from edenscm.mercurial import error, util
from edenscm.mercurial.i18n import _
def _getloglevel(ui):
loglevel = ui.config("infinitepush", "loglevel", "DEBUG")
numeric_loglevel = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_loglevel, int):
raise error.Abort(_("invalid log level %s") % loglevel)
return numeric_loglevel
def _convertbookmarkpattern(pattern):
@ -30,27 +38,29 @@ def _convertbookmarkpattern(pattern):
SEC_IN_DAY = 24 * 60 * 60
class sqlindexapi(indexapi):
"""
Sql backend for infinitepush index. See schema.sql
class sqlindex(object):
"""SQL-based backend for infinitepush index.
See schema.sql for the SQL schema.
This is a context manager. All write operations should use:
with index:
index.addbookmark(...)
...
"""
def __init__(
self,
reponame,
host,
port,
database,
user,
password,
logfile,
loglevel,
shorthasholdrevthreshold,
waittimeout=600,
locktimeout=120,
):
super(sqlindexapi, self).__init__()
def __init__(self, repo):
ui = repo.ui
sqlhost = ui.config("infinitepush", "sqlhost")
if not sqlhost:
raise error.Abort(_("please set infinitepush.sqlhost"))
reponame = ui.config("infinitepush", "reponame")
if not reponame:
raise error.Abort(_("please set infinitepush.reponame"))
self.reponame = reponame
host, port, database, user, password = sqlhost.split(":")
self.sqlargs = {
"host": host,
"port": port,
@ -60,21 +70,22 @@ class sqlindexapi(indexapi):
}
self.sqlconn = None
self.sqlcursor = None
if not logfile:
logfile = os.devnull
logfile = ui.config("infinitepush", "logfile", os.devnull)
logging.basicConfig(filename=logfile)
self.log = logging.getLogger()
self.log.setLevel(loglevel)
self.log.setLevel(_getloglevel(ui))
self._connected = False
self._waittimeout = waittimeout
self._locktimeout = locktimeout
self.shorthasholdrevthreshold = shorthasholdrevthreshold
self._waittimeout = ui.configint("infinitepush", "waittimeout", 300)
self._locktimeout = ui.configint("infinitepush", "locktimeout", 120)
self.shorthasholdrevthreshold = ui.configint(
"infinitepush", "shorthasholdrevthreshold", 60
)
def sqlconnect(self):
if self.sqlconn:
raise indexexception("SQL connection already open")
raise error.Abort("SQL connection already open")
if self.sqlcursor:
raise indexexception("SQL cursor already open without connection")
raise error.Abort("SQL cursor already open without connection")
retry = 3
while True:
try:
@ -123,8 +134,7 @@ class sqlindexapi(indexapi):
self.sqlconn.rollback()
def addbundle(self, bundleid, nodesctx):
"""Records bundles, mapping from node to bundle and metadata for nodes
"""
"""Record a bundleid containing the given nodes."""
if not self._connected:
self.sqlconnect()
@ -171,8 +181,7 @@ class sqlindexapi(indexapi):
)
def addbookmark(self, bookmark, node):
"""Takes a bookmark name and hash, and records mapping in the metadata
store."""
"""Record a bookmark pointing to a particular node."""
if not self._connected:
self.sqlconnect()
self.log.info(
@ -185,7 +194,7 @@ class sqlindexapi(indexapi):
)
def addmanybookmarks(self, bookmarks):
"""Records mapping of bookmarks and nodes"""
"""Record the contents of the ``bookmarks`` dict as bookmarks."""
if not self._connected:
self.sqlconnect()
@ -200,10 +209,7 @@ class sqlindexapi(indexapi):
)
def deletebookmarks(self, patterns):
"""Accepts list of bookmark patterns and deletes them.
If `commit` is set then bookmark will actually be deleted. Otherwise
deletion will be delayed until the end of transaction.
"""
"""Delete all bookmarks that match any of the patterns in ``patterns``."""
if not self._connected:
self.sqlconnect()
@ -219,7 +225,7 @@ class sqlindexapi(indexapi):
self.sqlcursor.execute(query, params=[self.reponame] + patterns)
def getbundle(self, node):
"""Returns the bundleid for the bundle that contains the given node."""
"""Get the bundleid for a bundle that contains the given node."""
if not self._connected:
self.sqlconnect()
self.log.info("GET BUNDLE %r %r" % (self.reponame, node))
@ -236,9 +242,11 @@ class sqlindexapi(indexapi):
return bundle
def getnodebyprefix(self, prefix):
"""Returns the node with the given hash prefix.
None if it doesn't exist.
Raise error for ambiguous identifier"""
"""Get the node that matches the given hash prefix.
If there is no match, returns None.
If there are multiple matches, raises an exception."""
if not self._connected:
self.sqlconnect()
self.log.info("GET NODE BY PREFIX %r %r" % (self.reponame, prefix))
@ -286,7 +294,7 @@ class sqlindexapi(indexapi):
)
if len(result) > 1:
raise indexexception(
raise error.Abort(
("ambiguous identifier '%s'\n" % prefix)
+ "#commitcloud suggestions are:\n"
+ formatdata(result)
@ -296,7 +304,7 @@ class sqlindexapi(indexapi):
revdate = result[0][3]
threshold = self.shorthasholdrevthreshold * SEC_IN_DAY
if time.time() - revdate > threshold:
raise indexexception(
raise error.Abort(
"commit '%s' is more than %d days old\n"
"description:\n%s"
"#commitcloud hint: if you would like to fetch this "
@ -315,9 +323,9 @@ class sqlindexapi(indexapi):
result = self.sqlcursor.fetchall()
if len(result) > 1:
raise indexexception(
("ambiguous identifier '%s'\n" % prefix)
+ "suggestion: provide longer commithash prefix"
raise error.Abort(
"ambiguous identifier '%s'\n"
"suggestion: provide longer commithash prefix" % prefix
)
# result not found
@ -332,7 +340,7 @@ class sqlindexapi(indexapi):
return node
def getnode(self, bookmark):
"""Returns the node for the given bookmark. None if it doesn't exist."""
"""Get the node for the given bookmark."""
if not self._connected:
self.sqlconnect()
self.log.info("GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark))
@ -349,6 +357,7 @@ class sqlindexapi(indexapi):
return node
def getbookmarks(self, query):
"""Get all bookmarks that match the pattern."""
if not self._connected:
self.sqlconnect()
self.log.info("QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query))
@ -371,6 +380,7 @@ class sqlindexapi(indexapi):
return bookmarks
def saveoptionaljsonmetadata(self, node, jsonmetadata):
"""Save optional metadata for the given node."""
if not self._connected:
self.sqlconnect()
self.log.info(