infinitepush: refactor common functions into common modules

Reviewed By: DurhamG

Differential Revision: D15170161

fbshipit-source-id: c3e6e9f309a22ac6ed258f484ca625633c434405
This commit is contained in:
Mark Thomas 2019-05-02 07:02:15 -07:00 committed by Facebook Github Bot
parent 23f7bbf387
commit 0327e01223
5 changed files with 226 additions and 222 deletions

View File

@ -100,28 +100,15 @@ Configs::
from __future__ import absolute_import
import errno
import json
import os
import struct
import tempfile
from edenscm.mercurial import (
bundle2,
changegroup,
discovery,
encoding,
error,
exchange,
extensions,
i18n,
mutation,
node as nodemod,
pushkey,
util,
wireproto,
)
from edenscm.mercurial.commands import debug as debugcommands
from edenscm.mercurial.i18n import _
from . import bundleparts, bundlestore, client, common, infinitepushcommands, server
@ -136,24 +123,6 @@ colortable = {
}
def _debugbundle2part(orig, ui, part, all, **opts):
if part.type == bundleparts.scratchmutationparttype:
entries = mutation.mutationstore.unbundle(part.read())
ui.write((" %s entries\n") % len(entries))
for entry in entries:
pred = ",".join([nodemod.hex(p) for p in entry.preds()])
succ = nodemod.hex(entry.succ())
split = entry.split()
if split:
succ = ",".join([nodemod.hex(s) for s in split] + [succ])
ui.write(
(" %s -> %s (%s by %s at %s)\n")
% (pred, succ, entry.op(), entry.user(), entry.time())
)
orig(ui, part, all, **opts)
def reposetup(ui, repo):
common.reposetup(ui, repo)
if common.isserver(ui) and repo.local():
@ -167,128 +136,18 @@ def uisetup(ui):
order.remove("infinitepush")
order.append("infinitepush")
extensions._order = order
# Register bundleparts capabilities and handlers.
bundleparts.uisetup(ui)
def extsetup(ui):
commonextsetup(ui)
common.extsetup(ui)
if common.isserver(ui):
server.extsetup(ui)
else:
client.extsetup(ui)
def commonextsetup(ui):
wireproto.commands["listkeyspatterns"] = (
wireprotolistkeyspatterns,
"namespace patterns",
)
wireproto.commands["knownnodes"] = (wireprotoknownnodes, "nodes *")
extensions.wrapfunction(debugcommands, "_debugbundle2part", _debugbundle2part)
def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
patterns = wireproto.decodelist(patterns)
d = repo.listkeys(encoding.tolocal(namespace), patterns).iteritems()
return pushkey.encodekeys(d)
def wireprotoknownnodes(repo, proto, nodes, others):
"""similar to 'known' but also check in infinitepush storage"""
nodes = wireproto.decodelist(nodes)
knownlocally = repo.known(nodes)
for index, known in enumerate(knownlocally):
# TODO: make a single query to the bundlestore.index
if not known and repo.bundlestore.index.getnodebyprefix(
nodemod.hex(nodes[index])
):
knownlocally[index] = True
return "".join(b and "1" or "0" for b in knownlocally)
def _decodebookmarks(stream):
sizeofjsonsize = struct.calcsize(">i")
size = struct.unpack(">i", stream.read(sizeofjsonsize))[0]
unicodedict = json.loads(stream.read(size))
# python json module always returns unicode strings. We need to convert
# it back to bytes string
result = {}
for bookmark, node in unicodedict.iteritems():
bookmark = bookmark.encode("ascii")
node = node.encode("ascii")
result[bookmark] = node
return result
bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
bundle2.capabilities[bundleparts.scratchbookmarksparttype] = ()
bundle2.capabilities[bundleparts.scratchmutationparttype] = ()
@bundle2.b2streamparamhandler("infinitepush")
def processinfinitepush(unbundler, param, value):
""" process the bundle2 stream level parameter containing whether this push
is an infinitepush or not. """
if value and unbundler.ui.configbool("infinitepush", "bundle-stream", False):
pass
@bundle2.parthandler(
bundleparts.scratchbranchparttype, ("bookmark", "create", "force", "cgversion")
)
def bundle2scratchbranch(op, part):
"""unbundle a bundle2 part containing a changegroup to store"""
bundler = bundle2.bundle20(op.repo.ui)
cgversion = part.params.get("cgversion", "01")
cgpart = bundle2.bundlepart("changegroup", data=part.read())
cgpart.addparam("version", cgversion)
bundler.addpart(cgpart)
buf = util.chunkbuffer(bundler.getchunks())
fd, bundlefile = tempfile.mkstemp()
try:
try:
fp = os.fdopen(fd, "wb")
fp.write(buf.read())
finally:
fp.close()
server.storebundle(op, part.params, bundlefile)
finally:
try:
os.unlink(bundlefile)
except OSError as e:
if e.errno != errno.ENOENT:
raise
return 1
@bundle2.parthandler(bundleparts.scratchbookmarksparttype)
def bundle2scratchbookmarks(op, part):
"""Handler deletes bookmarks first then adds new bookmarks.
"""
index = op.repo.bundlestore.index
decodedbookmarks = _decodebookmarks(part)
toinsert = {}
todelete = []
for bookmark, node in decodedbookmarks.iteritems():
if node:
toinsert[bookmark] = node
else:
todelete.append(bookmark)
log = server._getorcreateinfinitepushlogger(op)
with server.logservicecall(log, bundleparts.scratchbookmarksparttype), index:
if todelete:
index.deletebookmarks(todelete)
if toinsert:
index.addmanybookmarks(toinsert)
@bundle2.parthandler(bundleparts.scratchmutationparttype)
def bundle2scratchmutation(op, part):
mutation.unbundle(op.repo, part.read())
def _deltaparent(orig, self, revlog, rev, p1, p2, prev):
# This version of deltaparent prefers p1 over prev to use less space
dp = revlog.deltaparent(rev)

View File

@ -3,14 +3,21 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import json
import struct
from edenscm.mercurial import error, extensions, node as nodemod
from edenscm.mercurial.i18n import _
from . import common
def remotebookmarksenabled(ui):
return "remotenames" in extensions._extensions and ui.configbool(
"remotenames", "bookmarks"
)
def readremotebookmarks(ui, repo, other):
if common.isremotebooksenabled(ui):
if remotebookmarksenabled(ui):
remotenamesext = extensions.find("remotenames")
remotepath = remotenamesext.activepath(repo.ui, other)
result = {}
@ -98,3 +105,26 @@ def deleteremotebookmarks(ui, repo, path, names):
bookmarks[name] = node
remotenamesext.saveremotenames(repo, path, bookmarks)
def encodebookmarks(bookmarks):
encoded = {}
for bookmark, node in bookmarks.iteritems():
encoded[bookmark] = node
dumped = json.dumps(encoded)
result = struct.pack(">i", len(dumped)) + dumped
return result
def decodebookmarks(stream):
sizeofjsonsize = struct.calcsize(">i")
size = struct.unpack(">i", stream.read(sizeofjsonsize))[0]
unicodedict = json.loads(stream.read(size))
# python json module always returns unicode strings. We need to convert
# it back to bytes string
result = {}
for bookmark, node in unicodedict.iteritems():
bookmark = bookmark.encode("ascii")
node = node.encode("ascii")
result[bookmark] = node
return result

View File

@ -3,18 +3,24 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import errno
import os
import tempfile
from edenscm.mercurial import (
bundle2,
changegroup,
error,
exchange,
extensions,
mutation,
node as nodemod,
revsetlang,
util,
)
from edenscm.mercurial.i18n import _
from .common import encodebookmarks
from . import bookmarks, server
scratchbranchparttype = "b2x:infinitepush"
@ -23,6 +29,60 @@ scratchmutationparttype = "b2x:infinitepushmutation"
pushrebaseparttype = "b2x:rebase"
def uisetup(ui):
bundle2.capabilities[scratchbranchparttype] = ()
bundle2.capabilities[scratchbookmarksparttype] = ()
bundle2.capabilities[scratchmutationparttype] = ()
@exchange.b2partsgenerator(scratchbranchparttype)
def partgen(pushop, bundler):
bookmark = pushop.ui.config("experimental", "server-bundlestore-bookmark")
bookmarknode = pushop.ui.config("experimental", "server-bundlestore-bookmarknode")
create = pushop.ui.configbool("experimental", "server-bundlestore-create")
scratchpush = pushop.ui.configbool("experimental", "infinitepush-scratchpush")
if "changesets" in pushop.stepsdone or not scratchpush:
return
if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
return
pushop.stepsdone.add("changesets")
pushop.stepsdone.add("treepack")
if not bookmark and not pushop.outgoing.missing:
pushop.ui.status(_("no changes found\n"))
pushop.cgresult = 0
return
# This parameter tells the server that the following bundle is an
# infinitepush. This let's it switch the part processing to our infinitepush
# code path.
bundler.addparam("infinitepush", "True")
nonforwardmove = pushop.force or pushop.ui.configbool(
"experimental", "non-forward-move"
)
scratchparts = getscratchbranchparts(
pushop.repo,
pushop.remote,
pushop.outgoing,
nonforwardmove,
pushop.ui,
bookmark,
create,
bookmarknode,
)
for scratchpart in scratchparts:
bundler.addpart(scratchpart)
def handlereply(op):
# server either succeeds or aborts; no code to read
pushop.cgresult = 1
return handlereply
def getscratchbranchparts(
repo, peer, outgoing, confignonforwardmove, ui, bookmark, create, bookmarknode=None
):
@ -89,12 +149,13 @@ def getscratchbranchparts(
return parts
def getscratchbookmarkspart(peer, bookmarks):
def getscratchbookmarkspart(peer, scratchbookmarks):
if scratchbookmarksparttype not in bundle2.bundle2caps(peer):
raise error.Abort(_("no server support for %r") % scratchbookmarksparttype)
return bundle2.bundlepart(
scratchbookmarksparttype.upper(), data=encodebookmarks(bookmarks)
scratchbookmarksparttype.upper(),
data=bookmarks.encodebookmarks(scratchbookmarks),
)
@ -146,3 +207,86 @@ class copiedpart(object):
return self._io.read()
else:
return self._io.read(size)
@bundle2.b2streamparamhandler("infinitepush")
def processinfinitepush(unbundler, param, value):
""" process the bundle2 stream level parameter containing whether this push
is an infinitepush or not. """
if value and unbundler.ui.configbool("infinitepush", "bundle-stream", False):
pass
@bundle2.parthandler(
scratchbranchparttype, ("bookmark", "create", "force", "cgversion")
)
def bundle2scratchbranch(op, part):
"""unbundle a bundle2 part containing a changegroup to store"""
bundler = bundle2.bundle20(op.repo.ui)
cgversion = part.params.get("cgversion", "01")
cgpart = bundle2.bundlepart("changegroup", data=part.read())
cgpart.addparam("version", cgversion)
bundler.addpart(cgpart)
buf = util.chunkbuffer(bundler.getchunks())
fd, bundlefile = tempfile.mkstemp()
try:
try:
fp = os.fdopen(fd, "wb")
fp.write(buf.read())
finally:
fp.close()
server.storebundle(op, part.params, bundlefile)
finally:
try:
os.unlink(bundlefile)
except OSError as e:
if e.errno != errno.ENOENT:
raise
return 1
@bundle2.parthandler(scratchbookmarksparttype)
def bundle2scratchbookmarks(op, part):
"""Handler deletes bookmarks first then adds new bookmarks.
"""
index = op.repo.bundlestore.index
decodedbookmarks = bookmarks.decodebookmarks(part)
toinsert = {}
todelete = []
for bookmark, node in decodedbookmarks.iteritems():
if node:
toinsert[bookmark] = node
else:
todelete.append(bookmark)
log = server._getorcreateinfinitepushlogger(op)
with server.logservicecall(log, scratchbookmarksparttype), index:
if todelete:
index.deletebookmarks(todelete)
if toinsert:
index.addmanybookmarks(toinsert)
@bundle2.parthandler(scratchmutationparttype)
def bundle2scratchmutation(op, part):
mutation.unbundle(op.repo, part.read())
def debugbundle2part(orig, ui, part, all, **opts):
if part.type == scratchmutationparttype:
entries = mutation.mutationstore.unbundle(part.read())
ui.write((" %s entries\n") % len(entries))
for entry in entries:
pred = ",".join([nodemod.hex(p) for p in entry.preds()])
succ = nodemod.hex(entry.succ())
split = entry.split()
if split:
succ = ",".join([nodemod.hex(s) for s in split] + [succ])
ui.write(
(" %s -> %s (%s by %s at %s)\n")
% (pred, succ, entry.op(), entry.user(), entry.time())
)
orig(ui, part, all, **opts)

View File

@ -32,7 +32,7 @@ from edenscm.mercurial import (
)
from edenscm.mercurial.i18n import _
from . import bookmarks, bundleparts, common
from . import bookmarks, bundleparts
_maybehash = re.compile(r"^[a-f0-9]+$").search
@ -184,7 +184,7 @@ def _push(orig, ui, repo, dest=None, *args, **opts):
# know about them. Let's save it before push and restore after
remotescratchbookmarks = bookmarks.readremotebookmarks(ui, repo, dest)
result = orig(ui, repo, dest, *args, **opts)
if common.isremotebooksenabled(ui):
if bookmarks.remotebookmarksenabled(ui):
if bookmark and scratchpush:
other = hg.peer(repo, opts, dest)
fetchedbookmarks = other.listkeyspatterns(
@ -363,7 +363,7 @@ def _dopull(orig, ui, repo, source="default", **opts):
# TODO(stash): race condition is possible
# if scratch bookmarks was updated right after orig.
# But that's unlikely and shouldn't be harmful.
if common.isremotebooksenabled(ui):
if bookmarks.remotebookmarksenabled(ui):
remotescratchbookmarks.update(scratchbookmarks)
bookmarks.saveremotebookmarks(repo, remotescratchbookmarks, source)
else:
@ -461,7 +461,7 @@ def _tryhoist(ui, remotebookmark):
infinitepush.
"""
if common.isremotebooksenabled(ui):
if bookmarks.remotebookmarksenabled(ui):
hoist = ui.config("remotenames", "hoist") + "/"
if remotebookmark.startswith(hoist):
return remotebookmark[len(hoist) :]
@ -502,51 +502,3 @@ def knownnodes(self, nodes):
yield [bool(int(b)) for b in d]
except ValueError:
error.Abort(error.ResponseError(_("unexpected response:"), d))
@exchange.b2partsgenerator(bundleparts.scratchbranchparttype)
def partgen(pushop, bundler):
bookmark = pushop.ui.config("experimental", "server-bundlestore-bookmark")
bookmarknode = pushop.ui.config("experimental", "server-bundlestore-bookmarknode")
create = pushop.ui.configbool("experimental", "server-bundlestore-create")
scratchpush = pushop.ui.configbool("experimental", "infinitepush-scratchpush")
if "changesets" in pushop.stepsdone or not scratchpush:
return
if bundleparts.scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
return
pushop.stepsdone.add("changesets")
pushop.stepsdone.add("treepack")
if not bookmark and not pushop.outgoing.missing:
pushop.ui.status(_("no changes found\n"))
pushop.cgresult = 0
return
# This parameter tells the server that the following bundle is an
# infinitepush. This let's it switch the part processing to our infinitepush
# code path.
bundler.addparam("infinitepush", "True")
nonforwardmove = pushop.force or pushop.ui.configbool(
"experimental", "non-forward-move"
)
scratchparts = bundleparts.getscratchbranchparts(
pushop.repo,
pushop.remote,
pushop.outgoing,
nonforwardmove,
pushop.ui,
bookmark,
create,
bookmarknode,
)
for scratchpart in scratchparts:
bundler.addpart(scratchpart)
def handlereply(op):
# server either succeeds or aborts; no code to read
pushop.cgresult = 1
return handlereply

View File

@ -3,13 +3,21 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import json
import os
import struct
import tempfile
from edenscm.mercurial import error, extensions, util
from edenscm.mercurial.node import hex
from edenscm.mercurial import (
encoding,
error,
extensions,
node as nodemod,
pushkey,
util,
wireproto,
)
from edenscm.mercurial.commands import debug as debugcommands
from . import bundleparts
def isserver(ui):
@ -20,32 +28,43 @@ def reposetup(ui, repo):
repo._scratchbranchmatcher = scratchbranchmatcher(ui)
def isremotebooksenabled(ui):
return "remotenames" in extensions._extensions and ui.configbool(
"remotenames", "bookmarks"
def extsetup(ui):
wireproto.commands["listkeyspatterns"] = (
wireprotolistkeyspatterns,
"namespace patterns",
)
wireproto.commands["knownnodes"] = (wireprotoknownnodes, "nodes *")
extensions.wrapfunction(
debugcommands, "_debugbundle2part", bundleparts.debugbundle2part
)
def encodebookmarks(bookmarks):
encoded = {}
for bookmark, node in bookmarks.iteritems():
encoded[bookmark] = node
dumped = json.dumps(encoded)
result = struct.pack(">i", len(dumped)) + dumped
return result
def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
patterns = wireproto.decodelist(patterns)
d = repo.listkeys(encoding.tolocal(namespace), patterns).iteritems()
return pushkey.encodekeys(d)
def wireprotoknownnodes(repo, proto, nodes, others):
"""similar to 'known' but also check in infinitepush storage"""
nodes = wireproto.decodelist(nodes)
knownlocally = repo.known(nodes)
for index, known in enumerate(knownlocally):
# TODO: make a single query to the bundlestore.index
if not known and repo.bundlestore.index.getnodebyprefix(
nodemod.hex(nodes[index])
):
knownlocally[index] = True
return "".join(b and "1" or "0" for b in knownlocally)
def downloadbundle(repo, unknownbinhead):
index = repo.bundlestore.index
store = repo.bundlestore.store
bundleid = index.getbundle(hex(unknownbinhead))
bundleid = index.getbundle(nodemod.hex(unknownbinhead))
if bundleid is None:
raise error.Abort("%s head is not known" % hex(unknownbinhead))
bundleraw = store.read(bundleid)
return _makebundlefromraw(bundleraw)
def _makebundlefromraw(data):
raise error.Abort("%s head is not known" % nodemod.hex(unknownbinhead))
data = store.read(bundleid)
fp = None
fd, bundlefile = tempfile.mkstemp()
try: # guards bundlefile