sapling/hgext3rd/p4fastimport/filetransaction.py
David Soria Parra aca5b8465c p4fastimport: filelog transaction wtih support for concurrent access
Summary:
We are adding a simple transaction that works only with filelogs but
allows for concurrent access from multiple workers. This allows for a proper
rollback in case of a failure in a worker process, which previously would result
in bad data in the repositoriy.

Test Plan: rt test-p4* test-check*

Reviewers: #mercurial, durham, quark

Reviewed By: quark

Subscribers: mjpieters

Differential Revision: https://phabricator.intern.facebook.com/D5070340

Signature: t1:5070340:1494958313:b10b1eac5b42b36d1a587c4ae1c95fc2f8b5ad35
2017-05-16 12:36:23 -07:00

54 lines
1.4 KiB
Python

from mercurial.i18n import _
import contextlib
import fcntl
class filetransaction(object):
def __init__(self, report, opener):
self.report = report
self.opener = opener
self.closed = False
self.journalfile = 'filejournals'
@contextlib.contextmanager
def _lock(self, m='w'):
f = self.opener.open(self.journalfile, m)
try:
fcntl.lockf(f, fcntl.LOCK_EX)
yield f
finally:
f.close()
def add(self, file, offset, data=None):
with self._lock() as f:
f.seek(0, 2)
f.write('%d\0%s\n' % (offset, file))
f.flush()
def close(self):
self.closed = True
def _read(self):
entries = []
with self._lock(m='r+') as f:
for line in f.readlines():
e = line.split('\0', 1)
entries.append((e[1].rstrip(), int(e[0])))
return entries
def abort(self):
self.report(_('transaction abort!\n'))
for filename, offset in self._read():
fp = self.opener(filename, 'a', checkambig=True)
fp.truncate(offset)
fp.close()
self.opener.unlink(self.journalfile)
self.report(_('rollback complete\n'))
def release(self):
if self.closed:
self.opener.unlink(self.journalfile)
return
self.abort()