mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 16:57:49 +03:00
commitcloud: serialize cloud sync attempts with other transactions
Summary: Cloud sync backs up commits before it takes the lock. If, during this backup process, another transaction adds new commits to the repository, the cloud sync process will not be able to sync properly, as some of the commits won't have been backed up. Furthermore, the backup state may have been updated between opening the changelog and reading the backup state, which means the backup state may reference commits that are not available in the changelog visible to the current process. Serialize these operations by: * Reading the backup state file under the repo lock. This, unfortunately, means `hg cloud backup` needs to take the lock, but it should only be for a very brief period while a file is read. * When cloud sync has completed the backup and is about to start syncing, check that the repository hasn't changed while the backup was running. If it has, abandon this sync attempt. Another background sync will have been scheduled that will back up the new commits and then sync everything. Reviewed By: quark-zju Differential Revision: D15468485 fbshipit-source-id: 706ada05dfb1f539638104722a044493c7e3e62f
This commit is contained in:
parent
02023b0972
commit
065309255e
@ -8,32 +8,33 @@ from __future__ import absolute_import
|
||||
from edenscm.mercurial import node as nodemod, smartset
|
||||
from edenscm.mercurial.i18n import _, _n
|
||||
|
||||
from . import backuplock, backupstate, dependencies
|
||||
from . import backuplock, dependencies
|
||||
|
||||
|
||||
def backup(repo, remotepath, getconnection, revs=None):
|
||||
def backup(repo, backupstate, remotepath, getconnection, revs=None):
|
||||
"""backs up the given revisions to commit cloud
|
||||
|
||||
Returns (backedup, failed), where "backedup" is a revset of the commits that
|
||||
were backed up, and "failed" is a revset of the commits that could not be
|
||||
backed up.
|
||||
"""
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
unfi = repo.unfiltered()
|
||||
|
||||
if revs is None:
|
||||
# No revs specified. Back up all visible commits that are not already
|
||||
# backed up.
|
||||
heads = unfi.revs("heads(draft() - hidden() - (draft() & ::%ln))", state.heads)
|
||||
heads = unfi.revs(
|
||||
"heads(draft() - hidden() - (draft() & ::%ln))", backupstate.heads
|
||||
)
|
||||
else:
|
||||
# Some revs were specified. Back up all of those commits that are not
|
||||
# already backed up.
|
||||
heads = unfi.revs(
|
||||
"heads((draft() & ::%ld) - (draft() & ::%ln))", revs, state.heads
|
||||
"heads((draft() & ::%ld) - (draft() & ::%ln))", revs, backupstate.heads
|
||||
)
|
||||
|
||||
if not heads:
|
||||
return (smartset.baseset(), smartset.baseset())
|
||||
return smartset.baseset(), smartset.baseset()
|
||||
|
||||
# Check if any of the heads are already available on the server.
|
||||
headnodes = list(repo.nodes("%ld", heads))
|
||||
@ -48,12 +49,12 @@ def backup(repo, remotepath, getconnection, revs=None):
|
||||
if backedup
|
||||
}
|
||||
if remoteheadnodes:
|
||||
state.update(remoteheadnodes)
|
||||
backupstate.update(remoteheadnodes)
|
||||
|
||||
heads = unfi.revs("%ld - %ln", heads, remoteheadnodes)
|
||||
|
||||
if not heads:
|
||||
return (smartset.baseset(), smartset.baseset())
|
||||
return smartset.baseset(), smartset.baseset()
|
||||
|
||||
# Filter out any commits that have been marked as bad.
|
||||
badnodes = repo.ui.configlist("infinitepushbackup", "dontbackupnodes", [])
|
||||
@ -66,7 +67,7 @@ def backup(repo, remotepath, getconnection, revs=None):
|
||||
"(draft() & ::%ld) & (%ls::) - (draft() & ::%ln)",
|
||||
heads,
|
||||
badnodes,
|
||||
state.heads,
|
||||
backupstate.heads,
|
||||
)
|
||||
)
|
||||
if badnodes:
|
||||
@ -91,17 +92,19 @@ def backup(repo, remotepath, getconnection, revs=None):
|
||||
heads = sorted(heads, reverse=True)[:backuplimit]
|
||||
|
||||
# Back up the new heads.
|
||||
backuplock.progressbackingup(
|
||||
repo,
|
||||
list(unfi.nodes("(draft() & ::%ld) - (draft() & ::%ln)", heads, state.heads)),
|
||||
backingup = unfi.nodes(
|
||||
"(draft() & ::%ld) - (draft() & ::%ln)", heads, backupstate.heads
|
||||
)
|
||||
backuplock.progressbackingup(repo, list(backingup))
|
||||
newheads, failedheads = dependencies.infinitepush.pushbackupbundlestacks(
|
||||
repo.ui, unfi, getconnection, [nodemod.hex(n) for n in unfi.nodes("%ld", heads)]
|
||||
)
|
||||
|
||||
# The commits that got backed up are all the ancestors of the new backup
|
||||
# heads, minus any commits that were already backed up at the start.
|
||||
backedup = unfi.revs("(draft() & ::%ls) - (draft() & ::%ln)", newheads, state.heads)
|
||||
backedup = unfi.revs(
|
||||
"(draft() & ::%ls) - (draft() & ::%ln)", newheads, backupstate.heads
|
||||
)
|
||||
# The commits that failed to get backed up are the ancestors of the failed
|
||||
# heads, except for commits that are also ancestors of a successfully backed
|
||||
# up head, or commits that were already known to be backed up.
|
||||
@ -109,9 +112,9 @@ def backup(repo, remotepath, getconnection, revs=None):
|
||||
"(draft() & ::%ls) - (draft() & ::%ls) - (draft() & ::%ln)",
|
||||
failedheads,
|
||||
newheads,
|
||||
state.heads,
|
||||
backupstate.heads,
|
||||
)
|
||||
|
||||
state.update(unfi.nodes("%ld", backedup))
|
||||
backupstate.update(unfi.nodes("%ld", backedup))
|
||||
|
||||
return backedup, failed
|
||||
|
@ -11,10 +11,10 @@ import os
|
||||
import re
|
||||
import socket
|
||||
|
||||
from edenscm.mercurial import encoding, error, hg, node as nodemod, phases, util
|
||||
from edenscm.mercurial import encoding, error, node as nodemod, phases, util
|
||||
from edenscm.mercurial.i18n import _
|
||||
|
||||
from . import backupstate, dependencies, util as ccutil
|
||||
from . import dependencies
|
||||
|
||||
|
||||
prefix = "infinitepushbackups/infinitepushbackupstate"
|
||||
@ -108,12 +108,11 @@ def _writelocalbackupstate(repo, remotepath, heads, bookmarks):
|
||||
json.dump(state, f)
|
||||
|
||||
|
||||
def pushbackupbookmarks(repo, remotepath, getconnection):
|
||||
def pushbackupbookmarks(repo, remotepath, getconnection, backupstate):
|
||||
"""
|
||||
Push a backup bundle to the server that updates the infinitepush backup
|
||||
bookmarks.
|
||||
"""
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
unfi = repo.unfiltered()
|
||||
|
||||
# Create backup bookmarks for the heads and bookmarks of the user. We
|
||||
@ -121,12 +120,12 @@ def pushbackupbookmarks(repo, remotepath, getconnection):
|
||||
# that we can sure they are available on the server.
|
||||
clrev = unfi.changelog.rev
|
||||
ancestors = unfi.changelog.ancestors(
|
||||
[clrev(head) for head in state.heads], inclusive=True
|
||||
[clrev(head) for head in backupstate.heads], inclusive=True
|
||||
)
|
||||
# Get the heads of visible draft commits that are already backed up,
|
||||
# including commits made visible by bookmarks.
|
||||
revset = "heads((draft() & ::((draft() - obsolete() - hidden()) + bookmark())) & (draft() & ::%ln))"
|
||||
heads = [nodemod.hex(head) for head in unfi.nodes(revset, state.heads)]
|
||||
heads = [nodemod.hex(head) for head in unfi.nodes(revset, backupstate.heads)]
|
||||
# Get the bookmarks that point to ancestors of backed up draft commits or
|
||||
# to commits that are public.
|
||||
bookmarks = {}
|
||||
|
@ -29,26 +29,28 @@ class BackupState(object):
|
||||
self.prefix + hashlib.sha256(remotepath).hexdigest()[0:8]
|
||||
)
|
||||
self.heads = set()
|
||||
if repo.sharedvfs.exists(self.filename):
|
||||
with repo.sharedvfs.open(self.filename) as f:
|
||||
lines = f.readlines()
|
||||
if len(lines) < 2 or lines[0].strip() != FORMAT_VERSION:
|
||||
repo.ui.debug(
|
||||
"unrecognised backedupheads version '%s', ignoring\n"
|
||||
% lines[0].strip()
|
||||
)
|
||||
self.initfromserver()
|
||||
return
|
||||
if lines[1].strip() != remotepath:
|
||||
repo.ui.debug(
|
||||
"backupheads file is for a different remote ('%s' instead of '%s'), reinitializing\n"
|
||||
% (lines[1].strip(), remotepath)
|
||||
)
|
||||
self.initfromserver()
|
||||
return
|
||||
self.heads = set(nodemod.bin(head.strip()) for head in lines[2:])
|
||||
else:
|
||||
self.initfromserver()
|
||||
# Load the backup state under the repo lock to ensure a consistent view.
|
||||
with repo.lock():
|
||||
if repo.sharedvfs.exists(self.filename):
|
||||
with repo.sharedvfs.open(self.filename) as f:
|
||||
lines = f.readlines()
|
||||
if len(lines) < 2 or lines[0].strip() != FORMAT_VERSION:
|
||||
repo.ui.debug(
|
||||
"unrecognised backedupheads version '%s', ignoring\n"
|
||||
% lines[0].strip()
|
||||
)
|
||||
self.initfromserver()
|
||||
return
|
||||
if lines[1].strip() != remotepath:
|
||||
repo.ui.debug(
|
||||
"backupheads file is for a different remote ('%s' instead of '%s'), reinitializing\n"
|
||||
% (lines[1].strip(), remotepath)
|
||||
)
|
||||
self.initfromserver()
|
||||
return
|
||||
self.heads = set(nodemod.bin(head.strip()) for head in lines[2:])
|
||||
else:
|
||||
self.initfromserver()
|
||||
|
||||
def initfromserver(self):
|
||||
# Check with the server about all visible commits that we don't already
|
||||
|
@ -357,11 +357,12 @@ def cloudbackup(ui, repo, *revs, **opts):
|
||||
getconnection = lambda: repo.connectionpool.get(remotepath, opts)
|
||||
|
||||
with backuplock.lock(repo):
|
||||
backedup, failed = backup.backup(repo, remotepath, getconnection, revs)
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
backedup, failed = backup.backup(repo, state, remotepath, getconnection, revs)
|
||||
|
||||
if revs is None:
|
||||
# For a full backup, also update the backup bookmarks.
|
||||
backupbookmarks.pushbackupbookmarks(repo, remotepath, getconnection)
|
||||
backupbookmarks.pushbackupbookmarks(repo, remotepath, getconnection, state)
|
||||
|
||||
if backedup:
|
||||
repo.ui.status(
|
||||
@ -478,17 +479,20 @@ def cloudrestorebackup(ui, repo, dest=None, **opts):
|
||||
if dest:
|
||||
pullopts["source"] = dest
|
||||
|
||||
maxrevbeforepull = len(repo.changelog)
|
||||
result = pullcmd(ui, repo, **pullopts)
|
||||
maxrevafterpull = len(repo.changelog)
|
||||
with backuplock.lock(repo), repo.wlock(), repo.lock(), repo.transaction(
|
||||
"backuprestore"
|
||||
) as tr:
|
||||
|
||||
if ui.config("infinitepushbackup", "createlandedasmarkers", False):
|
||||
pullcreatemarkers = extensions.find("pullcreatemarkers")
|
||||
pullcreatemarkers.createmarkers(
|
||||
result, repo, maxrevbeforepull, maxrevafterpull, fromdrafts=False
|
||||
)
|
||||
maxrevbeforepull = len(repo.changelog)
|
||||
result = pullcmd(ui, repo, **pullopts)
|
||||
maxrevafterpull = len(repo.changelog)
|
||||
|
||||
if ui.config("infinitepushbackup", "createlandedasmarkers", False):
|
||||
pullcreatemarkers = extensions.find("pullcreatemarkers")
|
||||
pullcreatemarkers.createmarkers(
|
||||
result, repo, maxrevbeforepull, maxrevafterpull, fromdrafts=False
|
||||
)
|
||||
|
||||
with repo.wlock(), repo.lock(), repo.transaction("bookmark") as tr:
|
||||
changes = []
|
||||
for name, hexnode in bookmarks.iteritems():
|
||||
if hexnode in repo:
|
||||
@ -497,14 +501,14 @@ def cloudrestorebackup(ui, repo, dest=None, **opts):
|
||||
ui.warn(_("%s not found, not creating %s bookmark") % (hexnode, name))
|
||||
repo._bookmarks.applychanges(repo, tr, changes)
|
||||
|
||||
# Update local backup state and flag to not autobackup just after we
|
||||
# restored, which would be pointless.
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
state.update([nodemod.bin(hexnode) for hexnode in heads + bookmarknodes])
|
||||
backupbookmarks._writelocalbackupstate(
|
||||
repo, ccutil.getremotepath(repo, dest), heads, bookmarks
|
||||
)
|
||||
repo.ignoreautobackup = True
|
||||
# Update local backup state and flag to not autobackup just after we
|
||||
# restored, which would be pointless.
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
state.update([nodemod.bin(hexnode) for hexnode in heads + bookmarknodes])
|
||||
backupbookmarks._writelocalbackupstate(
|
||||
repo, ccutil.getremotepath(repo, dest), heads, bookmarks
|
||||
)
|
||||
repo.ignoreautobackup = True
|
||||
|
||||
return result
|
||||
|
||||
|
@ -101,8 +101,12 @@ def sync(
|
||||
)
|
||||
return 0
|
||||
|
||||
origheads = _getheads(repo)
|
||||
origbookmarks = _getbookmarks(repo)
|
||||
|
||||
# Back up all local commits that are not already backed up.
|
||||
backedup, failed = backup.backup(repo, remotepath, getconnection)
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
backedup, failed = backup.backup(repo, state, remotepath, getconnection)
|
||||
|
||||
# On cloud rejoin we already know what the cloudrefs are. Otherwise,
|
||||
# fetch them from the commit cloud service.
|
||||
@ -110,11 +114,23 @@ def sync(
|
||||
cloudrefs = serv.getreferences(reponame, workspacename, fetchversion)
|
||||
|
||||
with repo.wlock(), repo.lock(), repo.transaction("cloudsync"):
|
||||
|
||||
if origheads != _getheads(repo) or origbookmarks != _getbookmarks(repo):
|
||||
# Another transaction changed the repository while we were backing
|
||||
# up commits. This may have introduced new commits that also need
|
||||
# backing up. That transaction should have started its own sync
|
||||
# process, so give up on this sync, and let the later one perform
|
||||
# the sync.
|
||||
raise ccerror.SynchronizationError(ui, _("repo changed while backing up"))
|
||||
|
||||
synced = False
|
||||
while not synced:
|
||||
|
||||
# Apply any changes from the cloud to the local repo.
|
||||
if cloudrefs.version != fetchversion:
|
||||
_applycloudchanges(repo, remotepath, lastsyncstate, cloudrefs, maxage)
|
||||
_applycloudchanges(
|
||||
repo, remotepath, lastsyncstate, cloudrefs, maxage, state
|
||||
)
|
||||
|
||||
# Check if any omissions are now included in the repo
|
||||
_checkomissions(repo, remotepath, lastsyncstate)
|
||||
@ -126,7 +142,7 @@ def sync(
|
||||
)
|
||||
|
||||
# Update the backup bookmarks with any changes we have made by syncing.
|
||||
backupbookmarks.pushbackupbookmarks(repo, remotepath, getconnection)
|
||||
backupbookmarks.pushbackupbookmarks(repo, remotepath, getconnection, state)
|
||||
|
||||
backuplock.progresscomplete(repo)
|
||||
|
||||
@ -203,7 +219,7 @@ def _maybeupdateworkingcopy(repo, currentnode):
|
||||
return 0
|
||||
|
||||
|
||||
def _applycloudchanges(repo, remotepath, lastsyncstate, cloudrefs, maxage):
|
||||
def _applycloudchanges(repo, remotepath, lastsyncstate, cloudrefs, maxage, state):
|
||||
pullcmd, pullopts = ccutil.getcommandandoptions("^pull")
|
||||
|
||||
try:
|
||||
@ -347,7 +363,6 @@ def _applycloudchanges(repo, remotepath, lastsyncstate, cloudrefs, maxage):
|
||||
|
||||
# Also update backup state. These new heads are already backed up,
|
||||
# otherwise the server wouldn't have told us about them.
|
||||
state = backupstate.BackupState(repo, remotepath)
|
||||
state.update([nodemod.bin(head) for head in newheads])
|
||||
|
||||
|
||||
@ -504,6 +519,7 @@ def _mergeobsmarkers(repo, tr, obsmarkers):
|
||||
if obsolete.isenabled(repo, obsolete.createmarkersopt):
|
||||
tr._commitcloudskippendingobsmarkers = True
|
||||
repo.obsstore.add(tr, obsmarkers)
|
||||
repo.filteredrevcache.clear()
|
||||
|
||||
|
||||
def _checkomissions(repo, remotepath, lastsyncstate):
|
||||
|
@ -1,19 +0,0 @@
|
||||
# Dummy extension that throws if lock is taken
|
||||
#
|
||||
# This extension can be used to test that lock is not taken when it's not
|
||||
# supposed to
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from edenscm.mercurial import error
|
||||
|
||||
|
||||
def reposetup(ui, repo):
|
||||
class faillockrepo(repo.__class__):
|
||||
def lock(self, wait=True):
|
||||
raise error.Abort("lock is taken!")
|
||||
|
||||
def wlock(self, wait=True):
|
||||
raise error.Abort("lock is taken!")
|
||||
|
||||
repo.__class__ = faillockrepo
|
@ -89,7 +89,7 @@ Push all pulled commit to backup
|
||||
updating bookmark master
|
||||
new changesets 948715751816
|
||||
obsoleted 1 changesets
|
||||
$ hg cloud backup --config extensions.lockfail=$TESTDIR/lockfail.py
|
||||
$ hg cloud backup
|
||||
backing up stack rooted at 9b3ead1d8005
|
||||
remote: pushing 2 commits:
|
||||
remote: 9b3ead1d8005 add b
|
||||
@ -129,8 +129,7 @@ Clone fresh repo and try to restore from backup
|
||||
adding manifests
|
||||
adding file changes
|
||||
added 2 changesets with 1 changes to 2 files (+1 heads)
|
||||
new changesets 9b3ead1d8005:3969cd9723d1
|
||||
obsoleted 1 changesets
|
||||
new changesets 3969cd9723d1
|
||||
$ hg sl --all
|
||||
@ changeset: 2:948715751816
|
||||
: bookmark: master
|
||||
|
@ -42,10 +42,9 @@ Pushing in this new, empty clone shouldn't clear the old backup
|
||||
$ scratchbookmarks
|
||||
infinitepush/backups/test/testhost$TESTTMP/client/heads/606a357e69adb2e36d559ae3237626e82a955c9d 606a357e69adb2e36d559ae3237626e82a955c9d
|
||||
|
||||
Make commit and backup it. Use lockfail.py to make sure lock is not taken during
|
||||
pushbackup
|
||||
Make commit and backup it.
|
||||
$ mkcommit commit
|
||||
$ hg cloud backup --config extensions.lockfail=$TESTDIR/lockfail.py
|
||||
$ hg cloud backup
|
||||
backing up stack rooted at 7e6a6fd9c7c8
|
||||
remote: pushing 1 commit:
|
||||
remote: 7e6a6fd9c7c8 commit
|
||||
|
@ -95,16 +95,29 @@ While that is getting started, create a new commit locally.
|
||||
@ 0: df4f53cec30a public 'base'
|
||||
|
||||
|
||||
Let the background sync we started earlier continue.
|
||||
Let the background sync we started earlier continue, and start a concurrent cloud sync.
|
||||
|
||||
$ rm $TESTTMP/wlockpre1
|
||||
$ hg cloud sync
|
||||
visibility: read 1 heads: 1292cc1f1c17
|
||||
commitcloud: synchronizing 'testrepo' with 'user/test/default'
|
||||
abort: unknown revision '79089e97b9e7c2d5091a0fed699b90fb71f827a2'!
|
||||
[255]
|
||||
|
||||
BUG! The pulled node wasn't visible to this cloud sync command.
|
||||
backing up stack rooted at 1292cc1f1c17
|
||||
remote: pushing 1 commit:
|
||||
remote: 1292cc1f1c17 commit2
|
||||
pulling 79089e97b9e7
|
||||
pulling from ssh://user@dummy/server
|
||||
searching for changes
|
||||
adding changesets
|
||||
adding manifests
|
||||
adding file changes
|
||||
added 1 changesets with 1 changes to 1 files (+1 heads)
|
||||
visibility: removed 0 heads []; added 1 heads [79089e97b9e7]
|
||||
commitcloud_sync: synced to workspace user/test/default version 2: 1 heads (0 omitted), 0 bookmarks (0 omitted)
|
||||
commitcloud_sync: synced to workspace user/test/default version 3: 2 heads (0 omitted), 0 bookmarks (0 omitted)
|
||||
visibility: wrote 2 heads: 79089e97b9e7, 1292cc1f1c17
|
||||
new changesets 79089e97b9e7
|
||||
commitcloud: commits synchronized
|
||||
finished in 0.00 sec
|
||||
|
||||
$ tglogp
|
||||
visibility: read 2 heads: 79089e97b9e7, 1292cc1f1c17
|
||||
@ -122,17 +135,6 @@ Wait for the background backup to finish and check its output.
|
||||
visibility: read 0 heads:
|
||||
commitcloud: synchronizing 'testrepo' with 'user/test/default'
|
||||
visibility: read 1 heads: 1292cc1f1c17
|
||||
pulling 79089e97b9e7
|
||||
pulling from ssh://user@dummy/server
|
||||
searching for changes
|
||||
adding changesets
|
||||
adding manifests
|
||||
adding file changes
|
||||
added 1 changesets with 1 changes to 1 files (+1 heads)
|
||||
visibility: removed 0 heads []; added 1 heads [79089e97b9e7]
|
||||
commitcloud_sync: synced to workspace user/test/default version 2: 1 heads (0 omitted), 0 bookmarks (0 omitted)
|
||||
commitcloud_sync: synced to workspace user/test/default version 3: 2 heads (0 omitted), 0 bookmarks (0 omitted)
|
||||
visibility: wrote 2 heads: 79089e97b9e7, 1292cc1f1c17
|
||||
new changesets 79089e97b9e7
|
||||
commitcloud: commits synchronized
|
||||
finished in 0.00 sec
|
||||
abort: commitcloud: failed to synchronize commits: 'repo changed while backing up'
|
||||
(please retry 'hg cloud sync')
|
||||
(please contact the Source Control Team if this error persists)
|
||||
|
Loading…
Reference in New Issue
Block a user