sapling/infinitepush/__init__.py
Stanislau Hlebik ba3ebe099e infinitepush: fix interaction with pushrebase
Summary:
infinitepush bundle2 part should be run before pushrebase bundle2 part. This is a bit tricky since both pushrebase and infinitepush change the `extensions._order` and both of them try to load before bundle2 `changeset` part.

I suggest loading before the 'changeset' part or the pushrebase part, whichever comes earlier. Then we have two situations:
1) pushrebase comes first. It sets its part before the 'changeset' part, then infinitepush sets its part before the pushrebase part.
2) infinitepush comes first. It sets its part before the 'changeset' and pushrebase parts, then pushrebase sets its part before the 'changeset' part. Since the infinitepush part already comes before the 'changeset' part, it will still be before the pushrebase part.

This solution is not robust but it should be fine for now. The proper solution requires changes upstream that will introduce dependencies between bundle2 parts.

Test Plan: Run `test-infinitepush-bundlestore.t`

Reviewers: rmcelroy, mitrandir, durham

Reviewed By: durham

Subscribers: mjpieters

Differential Revision: https://phabricator.intern.facebook.com/D3930697

Tasks: 12479677

Signature: t1:3930697:1475079064:1b5851df4d2067ec21ec01e7510ddf83ff1684ad
2016-10-02 02:54:03 -07:00

568 lines
19 KiB
Python

# Infinite push
#
# Copyright 2016 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import errno
import logging
import os
import resource
import tempfile
from mercurial import (
bundle2,
changegroup,
cmdutil,
commands,
discovery,
encoding,
error,
exchange,
extensions,
hg,
localrepo,
util,
pushkey,
revset,
phases,
wireproto,
)
from mercurial.extensions import wrapcommand, wrapfunction
from mercurial.hg import repository
from mercurial.node import bin, hex
from mercurial.i18n import _
from mercurial.peer import batchable, future
from mercurial.wireproto import encodelist, decodelist
from . import store, indexapi
# Command table for this extension and the decorator object built from it
# via cmdutil.command.
cmdtable = {}
command = cmdutil.command(cmdtable)
# bundle2 part type names. pushrebaseparttype is only used in this file to
# position the infinitepush part in the part generation order (presumably it
# is the part type registered by the pushrebase extension).
pushrebaseparttype = 'b2x:rebase'
scratchbranchparttype = 'b2x:infinitepush'
# Config section and option names used to hand state from the wrapped `push`
# command down to the bundle2 part generator and the checkheads wrapper.
experimental = 'experimental'
configbookmark = 'server-bundlestore-bookmark'
configcreate = 'server-bundlestore-create'
configscratchpush = 'infinitepush-scratchpush'
def _buildexternalbundlestore(ui):
    """Build the external bundle store from the `infinitepush` config.

    Reads `put_args`/`put_binary` and `get_args`/`get_binary` from ``ui`` and
    passes them to store.externalbundlestore. Raises error.Abort when either
    binary is not configured.
    """
    section = 'infinitepush'

    putargs = ui.configlist(section, 'put_args', [])
    putbinary = ui.config(section, 'put_binary')
    if not putbinary:
        raise error.Abort('put binary is not specified')

    getargs = ui.configlist(section, 'get_args', [])
    getbinary = ui.config(section, 'get_binary')
    if not getbinary:
        raise error.Abort('get binary is not specified')

    return store.externalbundlestore(putbinary, putargs, getbinary, getargs)
def _buildsqlindex(ui):
    """Build the SQL-backed index from the `infinitepush` config.

    `infinitepush.sqlhost` must be a colon-separated string with exactly five
    fields (host, port, db, user, password) and `infinitepush.reponame` must
    be set; error.Abort is raised when either option is missing.
    """
    section = 'infinitepush'

    sqlhost = ui.config(section, 'sqlhost')
    if not sqlhost:
        raise error.Abort(_('please set infinitepush.sqlhost'))
    # Unpacking enforces the five-field layout of sqlhost.
    host, port, db, user, password = sqlhost.split(':')

    reponame = ui.config(section, 'reponame')
    if not reponame:
        raise error.Abort(_('please set infinitepush.reponame'))

    logfile = ui.config(section, 'logfile', '')
    loglevel = _getloglevel(ui)
    return indexapi.sqlindexapi(
        reponame, host, port, db, user, password, logfile, loglevel)
def _getloglevel(ui):
loglevel = ui.config('infinitepush', 'loglevel', 'DEBUG')
numeric_loglevel = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_loglevel, int):
raise error.Abort(_('invalid log level %s') % loglevel)
return numeric_loglevel
class bundlestore(object):
    """Pairs the bundle storage backend with the index backend for a repo.

    Attributes:
      store: where raw bundles are kept; chosen by `infinitepush.storetype`
        ('disk', the default, or 'external').
      index: the node/bookmark index; chosen by `infinitepush.indextype`
        ('disk', the default, or 'sql').
    """

    def __init__(self, repo):
        """Instantiate the configured store and index for ``repo``.

        Raises error.Abort when either configured type is unknown.
        """
        self._repo = repo
        ui = self._repo.ui

        storetype = ui.config('infinitepush', 'storetype', 'disk')
        if storetype == 'disk':
            self.store = store.filebundlestore(ui, self._repo)
        elif storetype == 'external':
            self.store = _buildexternalbundlestore(ui)
        else:
            # Fail at setup time, like the index type below, instead of
            # leaving self.store unset and failing later with AttributeError.
            raise error.Abort(
                _('unknown infinitepush store type specified %s') % storetype)

        indextype = ui.config('infinitepush', 'indextype', 'disk')
        if indextype == 'disk':
            self.index = indexapi.fileindexapi(self._repo)
        elif indextype == 'sql':
            self.index = _buildsqlindex(ui)
        else:
            raise error.Abort(
                _('unknown infinitepush index type specified %s') % indextype)
def reposetup(ui, repo):
    """Attach a bundlestore to ``repo`` when it is a local repository."""
    if not repo.local():
        return
    repo.bundlestore = bundlestore(repo)
def uisetup(ui):
    """Move 'infinitepush' to the end of the extension load order."""
    # remotenames circumvents the default push implementation entirely, so
    # make sure we load after it so that we wrap it.
    name = 'infinitepush'
    loadorder = extensions._order
    loadorder.remove(name)
    loadorder.append(name)
    extensions._order = loadorder
def extsetup(ui):
    """Run the common setup, then the server- or client-side setup.

    The side is selected by the boolean `infinitepush.server` option.
    """
    commonsetup(ui)
    if ui.configbool('infinitepush', 'server'):
        sidesetup = serverextsetup
    else:
        sidesetup = clientextsetup
    sidesetup(ui)
def commonsetup(ui):
    """Register the `listkeyspatterns` wire protocol command."""
    entry = (wireprotolistkeyspatterns, 'namespace patterns')
    wireproto.commands['listkeyspatterns'] = entry
def serverextsetup(ui):
    """Install the server-side hooks.

    - routes the bundle2 'pushkey' part handler through bundle2pushkey;
    - wraps localrepository.listkeys with localrepolistkeys;
    - wraps the 'lookup' wire protocol command with _lookupwrap;
    - wraps exchange.getbundle with getbundle.
    """
    origpushkeyhandler = bundle2.parthandlermapping['pushkey']

    def newpushkeyhandler(*args, **kwargs):
        # Forward the result so the wrapper is transparent to any caller
        # that inspects the handler's return value.
        return bundle2pushkey(origpushkeyhandler, *args, **kwargs)

    # The replacement must advertise the same parameters as the original.
    newpushkeyhandler.params = origpushkeyhandler.params
    bundle2.parthandlermapping['pushkey'] = newpushkeyhandler

    wrapfunction(localrepo.localrepository, 'listkeys', localrepolistkeys)
    wireproto.commands['lookup'] = (
        _lookupwrap(wireproto.commands['lookup'][0]), 'key')
    wrapfunction(exchange, 'getbundle', getbundle)
def clientextsetup(ui):
    """Install the client-side hooks.

    - wraps `push` and adds the --to, --create and --bundle-store flags
      (each only if a flag with that name is not already present);
    - wraps `pull` and discovery.checkheads;
    - adds listkeyspatterns to the wire peer;
    - moves the infinitepush bundle2 part generator before the 'changeset'
      generator or the pushrebase generator, whichever comes first.
    """
    entry = wrapcommand(commands.table, 'push', _push)
    pushflags = entry[1]
    # Don't add a flag if it already exists (e.g. added by another extension)
    existing = set(flag[1] for flag in pushflags)
    newflags = [
        ('', 'to', '', _('push revs to this bookmark')),
        ('', 'create', None, _('create a new remote bookmark')),
        ('', 'bundle-store', None,
         _('force push to go to bundle store (EXPERIMENTAL)')),
    ]
    for flag in newflags:
        if flag[1] not in existing:
            pushflags.append(flag)

    wrapcommand(commands.table, 'pull', _pull)
    wrapfunction(discovery, 'checkheads', _checkheads)
    wireproto.wirepeer.listkeyspatterns = listkeyspatterns

    # Move infinitepush part before pushrebase part
    # to avoid generation of both parts.
    partorder = exchange.b2partsgenorder
    # Remove our part first so the target index is computed on the list it
    # will be inserted into; computing it before the pop is off by one when
    # our part already sits in front of the target.
    partorder.remove(scratchbranchparttype)
    index = partorder.index('changeset')
    if pushrebaseparttype in partorder:
        index = min(index, partorder.index(pushrebaseparttype))
    partorder.insert(index, scratchbranchparttype)
def _checkheads(orig, pushop):
    """Skip the wrapped head check when the push is a scratch push."""
    scratchpush = pushop.ui.configbool(experimental, configscratchpush, False)
    if not scratchpush:
        return orig(pushop)
    return None
def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
    """Wire protocol command: list the keys of ``namespace`` for ``patterns``.

    Decodes the pattern list, calls repo.listkeys with it and returns the
    result encoded with pushkey.encodekeys.
    """
    decoded = decodelist(patterns)
    localnamespace = encoding.tolocal(namespace)
    keys = repo.listkeys(localnamespace, decoded)
    return pushkey.encodekeys(keys.items())
def localrepolistkeys(orig, self, namespace, patterns=None):
    """listkeys wrapper that can also answer for scratch bookmarks.

    Only the 'bookmarks' namespace with a non-empty ``patterns`` is handled
    here; every other call is passed to ``orig`` unchanged. The result holds
    the requested names found in the bundle store index (for names matching
    `infinitepush.branchpattern`) plus the requested names found among the
    regular bookmarks, the latter taking precedence.
    """
    if namespace != 'bookmarks' or not patterns:
        return orig(self, namespace)

    scratchbranchpat = self.ui.config('infinitepush', 'branchpattern', '')
    matcher = util.stringmatcher(scratchbranchpat)[2]
    index = self.bundlestore.index
    wanted = set(patterns)
    found = {}

    # TODO(stash): this function has a limitation:
    # patterns are not actually patterns, just simple string comparison
    for name in wanted:
        if not matcher(name):
            continue
        # TODO(stash): use `getbookmarks()` method
        scratchnode = index.getnode(name)
        if scratchnode:
            found[name] = scratchnode

    # Regular bookmarks are applied last, so they win on a name clash.
    for name, node in orig(self, namespace).items():
        if name in wanted:
            found[name] = node
    return found
@batchable
def listkeyspatterns(self, namespace, patterns):
    """Batchable peer method for the `listkeyspatterns` wire command.

    Yields ``({}, None)`` first when the peer lacks the 'pushkey'
    capability. Then yields the encoded arguments together with a future,
    and finally the decoded reply once the future holds a value.
    """
    if not self.capable('pushkey'):
        yield {}, None
    reply = future()
    self.ui.debug(
        'preparing listkeys for "%s" with pattern "%s"\n'
        % (namespace, patterns))
    request = {
        'namespace': encoding.fromlocal(namespace),
        'patterns': encodelist(patterns),
    }
    yield request, reply
    raw = reply.value
    self.ui.debug(
        'received listkey for "%s": %i bytes\n' % (namespace, len(raw)))
    yield pushkey.decodekeys(raw)
def getbundle(orig, repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """exchange.getbundle wrapper that can serve a head from the bundle store.

    For a requested head that is not in the changelog, the bundle holding it
    is looked up in the index, read from the store, written to a temporary
    file and opened as a bundle repository. ``orig`` is then called on that
    bundle repository instead of ``repo``. Only one such head is supported
    per call; a second one raises error.Abort.
    """
    # Check if heads exists, if not, check bundle store
    hasscratchnode = False
    # heads defaults to None, which must not be iterated.
    for head in heads or ():
        if head in repo.changelog.nodemap:
            continue
        if hasscratchnode:
            raise error.Abort(
                'not implemented: not possible to pull more than '
                'one scratch branch')
        index = repo.bundlestore.index
        store = repo.bundlestore.store
        bundleid = index.getbundle(hex(head))
        bundleraw = store.read(bundleid)
        # NOTE(review): this temporary file is never removed in this
        # function. Whether it can be deleted once orig() returns depends on
        # how lazily the result reads from the bundle repo - confirm before
        # adding cleanup.
        bundlefile = _makebundlefromraw(bundleraw)
        bundlepath = "bundle:%s+%s" % (repo.root, bundlefile)
        repo = repository(repo.ui, bundlepath)
        hasscratchnode = True
    return orig(repo, source, heads=heads, common=common,
                bundlecaps=bundlecaps, **kwargs)
def _lookupwrap(orig):
def _lookup(repo, proto, key):
scratchbranchpat = repo.ui.config('infinitepush', 'branchpattern')
if not scratchbranchpat:
return orig(repo, proto, key)
kind, pat, matcher = util.stringmatcher(scratchbranchpat)
localkey = encoding.tolocal(key)
if isinstance(localkey, str) and matcher(localkey):
scratchnode = repo.bundlestore.index.getnode(localkey)
if scratchnode:
return "%s %s\n" % (1, scratchnode)
else:
return "%s %s\n" % (0, 'scratch branch %s not found' % localkey)
else:
try:
c = repo[localkey]
r = c.hex()
return "%s %s\n" % (1, r)
except Exception as inst:
if repo.bundlestore.index.getbundle(localkey):
return "%s %s\n" % (1, localkey)
else:
r = str(inst)
return "%s %s\n" % (0, r)
return _lookup
def validaterevset(repo, revset):
    """Abort unless ``revset`` is non-empty and has a single head.

    Raises error.Abort when the revset selects nothing in ``repo`` or when
    it has more than one head.
    """
    if not repo.revs(revset):
        raise error.Abort(_('nothing to push'))
    headcount = len(repo.revs('heads(%r)', revset))
    if headcount > 1:
        raise error.Abort(
            _('cannot push more than one head to a scratch branch'))
def getscratchbranchpart(repo, peer, outgoing, bookmark, create):
    """Build the mandatory infinitepush bundle2 part for ``outgoing``.

    Raises error.Abort when there is nothing to push, when the peer does not
    advertise the infinitepush part, or when the outgoing commits are not
    valid for a scratch branch (see validaterevset). When ``bookmark`` is
    set, 'bookmark' and 'create' ('1' or '0') are sent as advisory params.
    """
    missing = outgoing.missing
    if not missing:
        raise error.Abort(_('no commits to push'))
    if scratchbranchparttype not in bundle2.bundle2caps(peer):
        raise error.Abort(_('no server support for %r') % scratchbranchparttype)
    validaterevset(repo, revset.formatspec('%ln', missing))

    rawchangegroup = changegroup.getlocalchangegroupraw(repo, 'push', outgoing)

    advisory = {}
    if bookmark:
        advisory['bookmark'] = bookmark
        advisory['create'] = '1' if create else '0'

    # .upper() marks this as a mandatory part: server will abort if there's
    # no handler
    parttype = scratchbranchparttype.upper()
    return bundle2.bundlepart(
        parttype, advisoryparams=advisory.items(), data=rawchangegroup)
def _pull(orig, ui, repo, source="default", **opts):
    """`pull` wrapper adding support for pulling one scratch bookmark.

    Bookmarks requested with --bookmark that match
    `infinitepush.branchpattern` are resolved on the remote through
    listkeyspatterns, turned into extra --rev values and removed from the
    bookmark list handed to ``orig``. After ``orig`` returns, the scratch
    bookmark is written into the local bookmark store. Raises error.Abort
    for more than one scratch bookmark or for one the remote does not know.
    """
    # Copy paste from `pull` command
    source, branches = hg.parseurl(ui.expandpath(source), opts.get('branch'))
    other = hg.peer(repo, opts, source)

    scratchbookmarks = {}
    scratchbranchpat = ui.config('infinitepush', 'branchpattern')
    if opts.get('bookmark') and scratchbranchpat:
        matcher = util.stringmatcher(scratchbranchpat)[2]
        regularbookmarks = []
        revs = opts.get('rev') or []
        for name in opts.get('bookmark'):
            if not matcher(name):
                regularbookmarks.append(name)
                continue
            if scratchbookmarks:
                raise error.Abort('not implemented: not possible to pull '
                                  'more than one scratch branch')
            fetched = other.listkeyspatterns('bookmarks', patterns=[name])
            if name not in fetched:
                raise error.Abort('remote bookmark %s not found!' % name)
            scratchbookmarks[name] = fetched[name]
            revs.append(fetched[name])
        opts['bookmark'] = regularbookmarks
        opts['rev'] = revs

    hasscratchbookmarks = bool(scratchbookmarks)
    if hasscratchbookmarks:
        # Set anyincoming to True
        oldfindcommonincoming = wrapfunction(
            discovery, 'findcommonincoming', _findcommonincoming)
    try:
        result = orig(ui, repo, source, **opts)
        # TODO(stash): race condition is possible
        # if scratch bookmarks was updated right after orig.
        # But that's unlikely and shouldn't be harmful.
        with repo.wlock():
            with repo.lock():
                with repo.transaction('bookmark') as tr:
                    for scratchbook, node in scratchbookmarks.items():
                        repo._bookmarks[scratchbook] = bin(node)
                    repo._bookmarks.recordchange(tr)
        return result
    finally:
        # Undo the temporary discovery wrapper installed above.
        if hasscratchbookmarks:
            discovery.findcommonincoming = oldfindcommonincoming
def _findcommonincoming(orig, *args, **kwargs):
common, inc, remoteheads = orig(*args, **kwargs)
return common, True, remoteheads
def _push(orig, ui, repo, *args, **opts):
    """`push` wrapper that turns matching pushes into scratch pushes.

    A push is a scratch push when --bundle-store is given or when the --to
    bookmark matches `infinitepush.branchpattern`. For a scratch push the
    bookmark/create/scratchpush settings are passed on through ``ui`` config
    values and exchange._localphasemove is temporarily wrapped with
    _phasemove. All of that is undone when ``orig`` returns or raises.
    """
    # Back up every config value this function may set, including the
    # scratch-push flag, so a scratch push does not leak into later pushes
    # that reuse the same ui object.
    backups = [ui.backupconfig(experimental, name)
               for name in (configbookmark, configcreate, configscratchpush)]
    oldphasemove = None
    try:
        bookmark = opts.get('to')
        create = opts.get('create') or False
        scratchpush = opts.get('bundle_store')

        scratchbranchpat = ui.config('infinitepush', 'branchpattern', '')
        # Without a pattern or without a bookmark there is nothing to match.
        # In particular an empty pattern yields a literal matcher that
        # "matches" the empty default value of --to.
        if bookmark and scratchbranchpat:
            kind, pat, matcher = util.stringmatcher(scratchbranchpat)
            if matcher(bookmark):
                ui.setconfig(experimental, configbookmark, bookmark, '--to')
                ui.setconfig(experimental, configcreate, create, '--create')
                scratchpush = True

        if scratchpush:
            ui.setconfig(experimental, configscratchpush, True)
            oldphasemove = wrapfunction(exchange,
                                        '_localphasemove',
                                        _phasemove)
        result = orig(ui, repo, *args, **opts)
    finally:
        for backup in backups:
            ui.restoreconfig(backup)
        if oldphasemove:
            exchange._localphasemove = oldphasemove
    return result
def _phasemove(orig, pushop, nodes, phase=phases.public):
    """Wrapper that drops moves to the public phase.

    Commits pushed to a scratch branch are not really published, so only
    moves to a phase other than public are forwarded to ``orig``.
    """
    if phase == phases.public:
        return
    orig(pushop, nodes, phase)
@exchange.b2partsgenerator(scratchbranchparttype)
def partgen(pushop, bundler):
    """bundle2 part generator for scratch pushes.

    Does nothing unless the scratch-push config flag is set, the
    'changesets' step is still pending and the remote advertises the
    infinitepush part. Otherwise it claims the 'changesets' step and either
    reports that there is nothing to push (cgresult 0) or adds the
    infinitepush part and returns a reply handler that sets cgresult to 1.
    """
    ui = pushop.ui
    bookmark = ui.config(experimental, configbookmark)
    create = ui.configbool(experimental, configcreate)
    scratchpush = ui.configbool(experimental, configscratchpush)

    if not scratchpush or 'changesets' in pushop.stepsdone:
        return
    if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
        return

    pushop.stepsdone.add('changesets')
    if not pushop.outgoing.missing:
        ui.status(_('no changes found\n'))
        pushop.cgresult = 0
        return

    bundler.addpart(
        getscratchbranchpart(
            pushop.repo, pushop.remote, pushop.outgoing, bookmark, create))

    def handlereply(op):
        # server either succeeds or aborts; no code to read
        pushop.cgresult = 1

    return handlereply
# Register the infinitepush part type in bundle2's capabilities table, with
# no capability values attached.
bundle2.capabilities[scratchbranchparttype] = ()
def _makebundlefile(part):
"""constructs a temporary bundle file
part.data should be an uncompressed v1 changegroup"""
fp = None
fd, bundlefile = tempfile.mkstemp()
try: # guards bundlefile
try: # guards fp
fp = os.fdopen(fd, 'wb')
magic = 'HG10UN'
fp.write(magic)
data = part.read(resource.getpagesize() - len(magic))
while data:
fp.write(data)
data = part.read(resource.getpagesize())
finally:
fp.close()
except Exception:
try:
os.unlink(bundlefile)
except Exception:
# we would rather see the original exception
pass
raise
return bundlefile
def _makebundlefromraw(data):
fp = None
fd, bundlefile = tempfile.mkstemp()
try: # guards bundlefile
try: # guards fp
fp = os.fdopen(fd, 'wb')
fp.write(data)
finally:
fp.close()
except Exception:
try:
os.unlink(bundlefile)
except Exception:
# we would rather see the original exception
pass
raise
return bundlefile
def _getrevs(bundle, oldnode):
    """Extract and validate the revs to be imported from ``bundle``.

    Returns the changectxs of the bundle's commits, in `sort(bundle())`
    order. ``oldnode`` is the node the bookmark currently points to, or None
    for a new bookmark. Raises error.Abort when the bundle is not a valid
    scratch push (see validaterevset) or when it does not fast-forward from
    ``oldnode``.
    """
    validaterevset(bundle, 'bundle()')
    revs = [bundle[r] for r in bundle.revs('sort(bundle())')]

    isnewbookmark = oldnode is None
    # A fast forward update requires the old node to be known to the bundle
    # repo and to have descendants among the bundled commits.
    if isnewbookmark or (
            oldnode in bundle
            and list(bundle.set('bundle() & %s::', oldnode))):
        return revs

    raise error.Abort(
        _('non-forward pushes are not allowed for scratch branches'))
@bundle2.parthandler(scratchbranchparttype, ('bookmark', 'create',))
def bundle2scratchbranch(op, part):
    '''unbundle a bundle2 part containing a changegroup to store

    The part is written to a temporary bundle file, validated through a
    bundle repository (see _getrevs), stored in the bundle store and
    recorded in the index. When the 'bookmark' param is set, the bookmark is
    pointed at the head of the pushed commits; an unknown bookmark requires
    the 'create' param to be "1", otherwise error.Abort is raised. A record
    is added to ``op`` so that bundle2pushkey skips pushkey parts of the
    same bundle. The temporary file is always removed. Returns 1.
    '''
    params = part.params
    index = op.repo.bundlestore.index
    store = op.repo.bundlestore.store
    op.records.add(scratchbranchparttype + '_skippushkey', True)
    bundlefile = None
    try:  # guards bundlefile
        bundlefile = _makebundlefile(part)
        bundlepath = "bundle:%s+%s" % (op.repo.root, bundlefile)
        bundle = repository(op.repo.ui, bundlepath)

        bookmark = params.get('bookmark')
        create = params.get('create')
        if bookmark:
            oldnode = index.getnode(bookmark)
            if not oldnode and create != "1":
                raise error.Abort("unknown bookmark %s" % bookmark,
                                  hint="use --create if you want to create one")
        else:
            oldnode = None

        # _getrevs aborts on an empty bundle, so revs is never empty here.
        revs = _getrevs(bundle, oldnode)

        # Notify the user of what is being pushed
        plural = 's' if len(revs) > 1 else ''
        op.repo.ui.warn(_("pushing %s commit%s:\n") % (len(revs), plural))
        maxoutput = 10
        for i in range(0, min(len(revs), maxoutput)):
            firstline = bundle[revs[i]].description().split('\n')[0][:50]
            op.repo.ui.warn((" %s %s\n") % (revs[i], firstline))
        if len(revs) > maxoutput:
            # Always show the last commit; print the ellipsis only when
            # commits were actually left out (with exactly maxoutput + 1
            # commits the last one used to be dropped silently).
            if len(revs) > maxoutput + 1:
                op.repo.ui.warn((" ...\n"))
            firstline = bundle[revs[-1]].description().split('\n')[0][:50]
            op.repo.ui.warn((" %s %s\n") % (revs[-1], firstline))

        nodes = [hex(rev.node()) for rev in revs]
        # A real list: it is indexed by the index backends and must not be a
        # lazy iterator.
        newnodes = [node for node in nodes if not index.getbundle(node)]
        # The bookmark follows the pushed head even when every node is
        # already known to the index (newnodes would be empty then).
        headnode = nodes[-1]

        # The bundle is binary data: read it back in binary mode.
        with open(bundlefile, 'rb') as f:
            key = store.write(f.read())

        if bookmark:
            try:
                index.addbookmarkandbundle(key, newnodes,
                                           bookmark, headnode)
            except NotImplementedError:
                # Index backend without the combined operation.
                index.addbookmark(bookmark, headnode)
                index.addbundle(key, newnodes)
        else:
            index.addbundle(key, newnodes)
    finally:
        try:
            if bundlefile:
                os.unlink(bundlefile)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
    return 1
def bundle2pushkey(orig, op, part):
    """pushkey part handler that is a no-op during a scratch push.

    When the infinitepush handler has recorded its skip marker on ``op``,
    the pushkey part is not processed: a successful 'reply:pushkey' part is
    added to the reply bundle (if there is one) and 1 is returned. Otherwise
    the call is forwarded to ``orig``.
    """
    if not op.records[scratchbranchparttype + '_skippushkey']:
        return orig(op, part)

    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(part.id), mandatory=False)
        rpart.addparam('return', '1', mandatory=False)
    return 1