mirror of
https://github.com/facebook/sapling.git
synced 2024-10-07 07:17:55 +03:00
25d8f4e4b8
Summary: Improve the UX when EdenAPI cannot be reached or the connection times out. Currently, the connection overhead and checks are included in the first lookup request, which is not nice: a lot of requests look like the lookup is slow (https://fburl.com/scuba/dev_command_timers/m96gcsc4), but the actual reason is the network. If we send the capabilities request first, it will fail faster with a better error message if the server is unreachable (see the test plan), and the connection time will not be added to the lookup time. The result of the capabilities request is not currently used for uploads but can be used later. Reviewed By: quark-zju Differential Revision: D41471070 fbshipit-source-id: ead1df3ce3432d0f3765a898dc8eb760e0acad8b
345 lines
11 KiB
Python
345 lines
11 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2.
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import bindings
|
|
|
|
from . import error, mutation, node as nodemod
|
|
from .i18n import _, _n
|
|
|
|
# NOTE(review): not referenced in the visible part of this module; presumably
# the dictionary key under which an upload token is stored -- confirm usage.
TOKEN_KEY = "token"
|
|
# NOTE(review): not referenced in the visible part of this module; presumably
# the dictionary key under which an item's position is stored -- confirm usage.
INDEX_KEY = "index"
|
|
|
|
|
|
def checkcapable(repo):
    """Probe the EdenApi endpoint by requesting its capabilities.

    The response is discarded: the request is made only to find out whether
    the endpoint answers. Nothing is caught here, so any error raised by the
    request propagates to the caller.
    """
    edenapi = repo.edenapi
    edenapi.capabilities()
|
|
|
|
|
|
def _filtercommits(repo, nodes):
    """Return the commit ids the server does not report as known.

    Sends ``nodes`` to ``repo.edenapi.commitknown`` and collects the ``hgid``
    of every response item whose ``known`` entry does not carry
    ``"Ok": True``. The request is timed under the
    ``http.edenapi.upload_filter_commits`` section.

    ``error.RustError`` and ``error.HttpError`` are re-raised as
    ``error.Abort``.
    """
    try:
        with repo.ui.timesection("http.edenapi.upload_filter_commits"):
            missing = []
            for entry in repo.edenapi.commitknown(nodes):
                if entry["known"].get("Ok") is True:
                    # The server confirmed this commit; nothing to upload.
                    continue
                missing.append(entry["hgid"])
            return missing
    except (error.RustError, error.HttpError) as e:
        raise error.Abort(e)
|
|
|
|
|
|
def _filteruploaded(repo, files, trees):
    """Return the files and trees that the server lookup did not find.

    A single ``lookup_filenodes_and_trees`` request is sent with the filenode
    of every file context followed by the first element of every tree entry.
    Each response is an ``(index, token)`` pair:

    * a token whose id contains ``"HgFilenodeId"`` marks ``files[index]`` as
      found;
    * a token whose id contains ``"HgTreeId"`` marks
      ``trees[index - len(files)]`` as found.

    Returns a ``(missingfiles, missingtrees)`` pair of lists, each in input
    order. ``error.RustError`` and ``error.HttpError`` are re-raised as
    ``error.Abort``.
    """
    try:
        with repo.ui.timesection("http.edenapi.upload_lookup"):
            lookup = repo.edenapi.lookup_filenodes_and_trees
            responses = list(
                lookup(
                    [fctx.filenode() for fctx in files],
                    [entry[0] for entry in trees],
                )
            )

            # Tree indices in the response come after all file indices.
            treeoffset = len(files)

            knownfiles = set()
            knowntrees = set()
            for position, token in responses:
                tokenid = token["data"]["id"]
                if "HgFilenodeId" in tokenid:
                    knownfiles.add(position)
                if "HgTreeId" in tokenid:
                    knowntrees.add(position - treeoffset)

            missingfiles = [
                fctx for pos, fctx in enumerate(files) if pos not in knownfiles
            ]
            missingtrees = [
                entry for pos, entry in enumerate(trees) if pos not in knowntrees
            ]
            return missingfiles, missingtrees
    except (error.RustError, error.HttpError) as e:
        raise error.Abort(e)
|
|
|
|
|
|
def _uploadfilenodes(repo, fctxs):
    """Upload file contents and filenodes for the given file contexts.

    Does nothing when ``fctxs`` is empty. Otherwise builds one
    ``(path, filenode, p1, p2)`` key per file context, hands the keys and the
    first of the repo's mutable local packs to ``repo.edenapi.uploadfiles``,
    drains the response stream and reports how many items came back.

    ``error.RustError`` and ``error.HttpError`` raised during the upload are
    re-raised as ``error.Abort``.
    """
    if not fctxs:
        return

    def _uploadkey(fctx):
        # Parents are read from the filelog of the same file.
        p1, p2 = fctx.filelog().parents(fctx.filenode())
        return (fctx.path(), fctx.filenode(), p1, p2)

    keys = [_uploadkey(fctx) for fctx in fctxs]
    datapack, _historypack = repo.fileslog.getmutablelocalpacks()
    try:
        with repo.ui.timesection("http.edenapi.upload_files"):
            stream, _stats = repo.edenapi.uploadfiles(datapack, keys)
            # Drain the stream so the upload completes inside the timed section.
            responses = list(stream)
            count = len(responses)
            repo.ui.status(
                _n("uploaded %d file\n", "uploaded %d files\n", count) % count,
                component="edenapi",
            )
    except (error.RustError, error.HttpError) as e:
        raise error.Abort(e)
|
|
|
|
|
|
def _uploadtrees(repo, trees):
    """Upload the given tree entries.

    Does nothing when ``trees`` is empty. Otherwise passes ``trees`` to
    ``repo.edenapi.uploadtrees``, drains the response stream and reports how
    many items came back.

    ``error.RustError`` and ``error.HttpError`` are re-raised as
    ``error.Abort``.
    """
    if not trees:
        return

    try:
        with repo.ui.timesection("http.edenapi.upload_trees"):
            stream, _stats = repo.edenapi.uploadtrees(trees)
            # Drain the stream so the upload completes inside the timed section.
            responses = list(stream)
            count = len(responses)
            repo.ui.status(
                _n("uploaded %d tree\n", "uploaded %d trees\n", count) % count,
                component="edenapi",
            )
    except (error.RustError, error.HttpError) as e:
        raise error.Abort(e)
|
|
|
|
|
|
def _uploadchangesets(repo, changesets, mutations):
    """Upload changesets together with their mutation entries.

    ``changesets`` is a sequence whose items carry the commit id as their
    first element. After the upload, each commit id is classified by whether
    it appears among the ``HgChangesetId`` values of the response stream.

    Returns an ``(uploaded, failed)`` pair of lists of commit ids, in the
    order of ``changesets``; both are empty when ``changesets`` is empty.
    The number of uploaded changesets is also written to the
    ``edenapi_uploaded_changesets`` log.

    ``error.RustError`` and ``error.HttpError`` are re-raised as
    ``error.Abort``.
    """
    if not changesets:
        return [], []

    uploaded = []
    failed = []
    try:
        with repo.ui.timesection("http.edenapi.upload_changesets"):
            stream, _stats = repo.edenapi.uploadchangesets(changesets, mutations)
            acknowledged = set()
            for response in stream:
                acknowledged.add(response["data"]["id"]["HgChangesetId"])

            count = len(acknowledged)
            repo.ui.status(
                _n("uploaded %d changeset\n", "uploaded %d changesets\n", count)
                % count,
                component="edenapi",
            )

            for changeset in changesets:
                node = changeset[0]
                bucket = uploaded if node in acknowledged else failed
                bucket.append(node)

            repo.ui.log(
                "edenapi_uploaded_changesets",
                edenapi_uploaded_changesets=len(uploaded),
            )
            return uploaded, failed
    except (error.RustError, error.HttpError) as e:
        raise error.Abort(e)
|
|
|
|
|
|
def _getfiles(repo, nodes):
    """Collect the file contexts touched by the given commits.

    Walks ``nodes.iterrev()`` and, for every commit, takes each path listed
    by ``ctx.files()`` that is still present in the commit. Paths that are
    listed but not contained in the commit are skipped.

    Returns a set of file contexts.
    """
    pending = set()
    for node in nodes.iterrev():
        ctx = repo[node]
        pending.update(ctx[path] for path in ctx.files() if path in ctx)
    return pending
|
|
|
|
|
|
def _gettrees(repo, nodes):
    """Yield the trees that differ between each commit and its parents.

    Walks ``nodes.iterrev()``. For every commit, its manifest is compared
    against the manifests of its parents with
    ``bindings.manifest.subdirdiff``, starting from the root path ``""``.

    Yields ``(treenode, p1, p2, treetext)`` tuples.
    """
    maxdepth = 1 << 15
    for node in nodes.iterrev():
        changelog = repo.changelog
        parents = changelog.dag.parentnames(node)
        manifestnode = changelog.changelogrevision(node).manifest
        parentmanifests = [
            changelog.changelogrevision(parent).manifest for parent in parents
        ]
        changed = bindings.manifest.subdirdiff(
            repo.manifestlog.datastore, "", manifestnode, parentmanifests, maxdepth
        )
        # The subdirectory path is part of each diff entry but is not forwarded.
        for _subdir, treenode, treetext, p1, p2 in changed:
            yield treenode, p1, p2, treetext
|
|
|
|
|
|
def _torevs(repo, uploadednodes, failednodes):
    """Map uploaded and failed commit ids to revision numbers.

    Returns a pair of sets ``(uploadedrevs, failedrevs)`` holding
    ``repo[node].rev()`` for every node in the corresponding input.
    """

    def _revset(nodes):
        return {repo[node].rev() for node in nodes}

    uploadedrevs = _revset(uploadednodes)
    failedrevs = _revset(failednodes)
    return uploadedrevs, failedrevs
|
|
|
|
|
|
def filetypefromfile(f):
    """Return the file type name for ``f``.

    The result is ``"Executable"`` when ``f.isexec()`` is true, otherwise
    ``"Symlink"`` when ``f.islink()`` is true, otherwise ``"Regular"``. The
    executable check takes precedence over the symlink check.
    """
    if f.isexec():
        return "Executable"
    if f.islink():
        return "Symlink"
    return "Regular"
|
|
|
|
|
|
def parentsfromctx(ctx):
    """Return the non-null parents of ``ctx``.

    The result is:

    * ``(p1, p2)`` when both parents differ from ``nodemod.nullid``;
    * ``p1`` alone when only the first parent is non-null;
    * ``None`` when the first parent is null, whatever the second one is.
    """
    first = ctx.p1().node()
    second = ctx.p2().node()
    if first == nodemod.nullid:
        return None
    if second == nodemod.nullid:
        return first
    return (first, second)
|
|
|
|
|
|
def uploadhgchangesets(repo, revs, force=False, skipknowncheck=False):
    """Upload the given revs through the EdenApi Uploads protocol.

    The EdenApi Uploads API provides:

    * An endpoint to look up any type of data (file contents, hg filenodes,
      hg treemanifests, hg commits).
    * An endpoint to upload file contents.
    * An endpoint to upload hg filenodes.
    * An endpoint to upload hg treemanifests.
    * An endpoint to upload hg commits and mutation information.

    The upload runs in several stages:

    * Check for and skip commits that are already uploaded, building
      ``commitqueue``.
    * Check for and skip hg filenodes that are already uploaded, building
      ``blobqueue``.
    * Check for and skip hg trees that are already uploaded, building
      ``treequeue``.
    * Calculate ContentId hashes and upload all file contents for
      ``blobqueue``, first skipping content ids that are already uploaded
      (this step also deduplicates content ids shared by several filenodes).
      See edenapi.uploadfiles.
    * Upload hg filenodes (``blobqueue``).
    * Upload hg trees (``treequeue``).
    * Finally, upload hg changesets and hg mutation information
      (``commitqueue``).

    If ``force`` is True (the default is False), the lookup check is not
    performed before uploading commits, filenodes and trees. It is still
    performed for file contents.

    If ``skipknowncheck`` is True (the default is False), the lookup check
    that filters out already uploaded commits is not performed; the commits
    are assumed to be known to be missing on the server.

    Returns a pair of sets: newly uploaded revs and failed revs.
    """

    requested = [repo[rev].node() for rev in revs]

    # Stage 1: work out which commits need uploading.
    if force or skipknowncheck:
        commitqueue = requested
    else:
        commitqueue = _filtercommits(repo, requested)

    if not commitqueue:
        # Every requested commit is already on the server (or none was given).
        return set(), set()

    commitcount = len(commitqueue)
    repo.ui.status(
        _n(
            "queue %d commit for upload\n",
            "queue %d commits for upload\n",
            commitcount,
        )
        % commitcount,
        component="edenapi",
    )

    # Topological order; iterrev() then walks from parents to children.
    commitqueue = repo.changelog.dag.sort(commitqueue)

    # Stage 2: gather candidate filenodes and trees, then drop the known ones.
    files = list(_getfiles(repo, commitqueue))
    trees = list(_gettrees(repo, commitqueue))

    if force:
        blobqueue, treequeue = files, trees
    else:
        blobqueue, treequeue = _filteruploaded(repo, files, trees)

    # Stage 3: file contents and filenodes.
    blobcount = len(blobqueue)
    repo.ui.status(
        _n(
            "queue %d file for upload\n",
            "queue %d files for upload\n",
            blobcount,
        )
        % blobcount,
        component="edenapi",
    )
    _uploadfilenodes(repo, blobqueue)

    # Stage 4: trees.
    treecount = len(treequeue)
    repo.ui.status(
        _n(
            "queue %d tree for upload\n",
            "queue %d trees for upload\n",
            treecount,
        )
        % treecount,
        component="edenapi",
    )
    _uploadtrees(repo, treequeue)

    # Stage 5: describe every queued commit for the changeset upload.
    changesets = []
    for node in commitqueue.iterrev():
        repo.ui.status(
            _("uploading commit '%s'...\n") % nodemod.hex(node), component="edenapi"
        )
        ctx = repo[node]

        # The "branch" extra is deliberately left out of the uploaded extras.
        extras = []
        for name, text in ctx.extra().items():
            if name == "branch":
                continue
            extras.append({"key": name.encode(), "value": text.encode()})

        time, timezone = ctx.date()
        metadata = {
            "parents": parentsfromctx(ctx),
            "manifestid": ctx.manifestnode(),
            "user": ctx.user().encode(),
            "time": int(time),
            "tz": timezone,
            "extras": extras,
            "files": ctx.files(),
            "message": ctx.description().encode(),
        }
        changesets.append((node, metadata))

    # Mutation records travel with the changesets in the same request.
    entries = mutation.entriesfornodes(repo, commitqueue)
    mutations = [
        {
            "successor": entry.succ(),
            "predecessors": entry.preds(),
            "split": entry.split(),
            "op": entry.op(),
            "user": entry.user().encode(),
            "time": entry.time(),
            "tz": entry.tz(),
            "extras": [{"key": key, "value": value} for key, value in entry.extra()],
        }
        for entry in entries
    ]

    uploaded, failed = _uploadchangesets(repo, changesets, mutations)
    return _torevs(repo, uploaded, failed)
|