sapling/edenscm/hgext/infinitepush/fileindex.py
Thomas Orozco cfaec40232 infinitepush: add replaybookmarksqueue
Summary:
This adds a new queue that records scratch bookmark changes so we can replay them into Mononoke.

There's not a lot going on in Mercurial (i.e. in this diff) to achieve this: we simply record bookmark changes as they happen in infinitepush, and allow for excluding some bookmarks. Specifically, we'll want to exclude backup branches, since a) there are far too many of them and b) they're deprecated in favor of Commit Cloud.
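As a rough sketch (hypothetical names, not the actual queue code in this diff), the recording step amounts to:

    def recordbookmarkchange(queue, bookmark, node, isbackup):
        # Skip backup branches: there are far too many of them, and
        # they are deprecated in favor of Commit Cloud.
        if isbackup:
            return
        queue.append((bookmark, node))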

Currently, this does not allow for replaying deletions. That would require further rework of how we delete things: right now we do it by matching on bookmark names in the DB, which means the Python side is not aware of exactly which bookmarks were deleted. I'm not sure how much use deletion is currently getting, but I'll research that and add it if necessary.

Finally, one thing that's worth calling out here is the `bookmark_hash` column in this table. This is here in case we need to scale out the replication of bookmarks across multiple workers.

Indeed, we'll always want the replication of any given bookmark to happen sequentially, so we should perform it in a single worker. However, if we have too many bookmarks to replicate, then that could become a bottleneck. If that happens, we'll want to scale out workers, which we can do by having each worker operate on separate bookmarks.

The `bookmark_hash` column lets us evenly divide the space of bookmarks across workers if that becomes necessary (e.g. we could have 16 workers, one for each first hex digit of the hash). We won't use `bookmark_hash` immediately, but it's very cheap to add (just compute one hash in Mercurial and store it in the table), so I'm adding it in this diff to avoid the friction of having to redeploy hg servers later.
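Purely for illustration (the worker count and helper names here are hypothetical, not part of this diff), splitting the hash space across 16 workers could look like:

    import hashlib

    NUM_WORKERS = 16  # one worker per first hex digit of the hash

    def bookmarkhash(bookmark):
        # Any stable hash of the bookmark name works for sharding.
        return hashlib.sha1(bookmark.encode("utf-8")).hexdigest()

    def workerfor(bookmark):
        # Each worker owns a disjoint slice of the hash space, so all
        # updates to a given bookmark go to the same worker, and are
        # therefore replayed sequentially.
        return int(bookmarkhash(bookmark)[0], 16) % NUM_WORKERS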

Reviewed By: StanislavGlebik

Differential Revision: D15778665

fbshipit-source-id: c34898c1a66e5bec08663a0887adca263222300d
2019-06-17 06:19:12 -07:00

# Infinite push
#
# Copyright 2016 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""
[infinitepush]
# Server-side option. Used only if indextype=disk.
# Filesystem path to the index store
indexpath = PATH
"""
import os
import posixpath
from edenscm.mercurial import error, pycompat, util
from edenscm.mercurial.i18n import _

if pycompat.iswindows:

    def _normalizepath(path):
        # Remove characters that are disallowed by Windows.
        # ":" can appear in some tests where the path is joined like:
        # "C:\\repo1\\.hg\\scratchbranches\\index\\bookmarkmap\\infinitepush/backups/test/HOSTNAME/C:\\repo2/heads"
        return path.replace(":", "")


else:

    def _normalizepath(path):
        return path

class fileindex(object):
    """File-based backend for infinitepush index.

    This is a context manager.  All write operations should use:

        with index:
            index.addbookmark(...)
            ...
    """

    def __init__(self, repo):
        self._repo = repo
        root = repo.ui.config("infinitepush", "indexpath")
        if not root:
            root = os.path.join("scratchbranches", "index")

        self._nodemap = os.path.join(root, "nodemap")
        self._bookmarkmap = os.path.join(root, "bookmarkmap")
        self._metadatamap = os.path.join(root, "nodemetadatamap")
        self._lock = None

    def __enter__(self):
        self._lock = self._repo.wlock()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._lock:
            self._lock.__exit__(exc_type, exc_val, exc_tb)

    def addbundle(self, bundleid, nodesctx):
        """Record a bundleid containing the given nodes."""
        for node in nodesctx:
            nodepath = os.path.join(self._nodemap, node.hex())
            self._write(nodepath, bundleid)

    def addbookmark(self, bookmark, node, _isbackup):
        """Record a bookmark pointing to a particular node."""
        bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
        self._write(bookmarkpath, node)

    def addmanybookmarks(self, bookmarks, isbackup):
        """Record the contents of the ``bookmarks`` dict as bookmarks."""
        for bookmark, node in bookmarks.items():
            self.addbookmark(bookmark, node, isbackup)

    def deletebookmarks(self, patterns):
        """Delete all bookmarks that match any of the patterns in ``patterns``."""
        for pattern in patterns:
            for bookmark, _node in self._listbookmarks(pattern):
                bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
                self._delete(bookmarkpath)

    def getbundle(self, node):
        """Get the bundleid for a bundle that contains the given node."""
        nodepath = os.path.join(self._nodemap, node)
        return self._read(nodepath)
    def getnodebyprefix(self, hashprefix):
        """Get the node that matches the given hash prefix.

        If there is no match, returns None.

        If there are multiple matches, raises an exception.
        """
        vfs = self._repo.localvfs
        if not vfs.exists(self._nodemap):
            return None

        files = vfs.listdir(self._nodemap)
        # Materialize the matches so the emptiness and length checks below
        # work under Python 3, where filter() returns a lazy iterator.
        nodefiles = [n for n in files if n.startswith(hashprefix)]
        if not nodefiles:
            return None

        if len(nodefiles) > 1:
            raise error.Abort(
                _(
                    "ambiguous identifier '%s'\n"
                    "suggestion: provide longer commithash prefix"
                )
                % hashprefix
            )

        return nodefiles[0]
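    # Example (sketch): if the nodemap contains "abc123..." and "abd456...",
    # getnodebyprefix("abc") returns the full "abc123..." hash, while
    # getnodebyprefix("ab") raises an "ambiguous identifier" Abort.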
    def getnode(self, bookmark):
        """Get the node for the given bookmark."""
        bookmarkpath = os.path.join(self._bookmarkmap, bookmark)
        return self._read(bookmarkpath)

    def getbookmarks(self, pattern):
        """Get all bookmarks that match the pattern."""
        return sorted(self._listbookmarks(pattern))

    def saveoptionaljsonmetadata(self, node, jsonmetadata):
        """Save optional metadata for the given node."""
        vfs = self._repo.localvfs
        vfs.write(os.path.join(self._metadatamap, node), jsonmetadata)
    def _listbookmarks(self, pattern):
        # Convert a trailing-"*" glob into an anchored regex; all other
        # patterns are handled directly by util.stringmatcher, which treats
        # plain strings as exact matches.
        if pattern.endswith("*"):
            pattern = "re:^" + pattern[:-1] + ".*"
        kind, pat, matcher = util.stringmatcher(pattern)
        prefixlen = len(self._bookmarkmap) + 1
        for dirpath, _dirs, books in self._repo.localvfs.walk(self._bookmarkmap):
            for book in books:
                bookmark = posixpath.join(dirpath, book)[prefixlen:]
                if not matcher(bookmark):
                    continue
                yield bookmark, self._read(os.path.join(dirpath, book))
    def _write(self, path, value):
        vfs = self._repo.localvfs
        path = _normalizepath(path)
        dirname = vfs.dirname(path)
        if not vfs.exists(dirname):
            vfs.makedirs(dirname)

        vfs.write(path, value)

    def _read(self, path):
        vfs = self._repo.localvfs
        path = _normalizepath(path)
        if not vfs.exists(path):
            return None
        return vfs.read(path)

    def _delete(self, path):
        vfs = self._repo.localvfs
        path = _normalizepath(path)
        if not vfs.exists(path):
            return
        return vfs.unlink(path)
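# A minimal usage sketch (hypothetical; assumes a ``repo`` object and a
# commit context ``ctx``, and is not part of the original module):
#
#     index = fileindex(repo)
#     with index:
#         index.addbookmark("infinitepush/example", ctx.hex(), False)
#     assert index.getnode("infinitepush/example") == ctx.hex()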