Merge pull request #2565 from enkore/issue/2517

Implement storage quotas
enkore 2017-06-01 14:39:17 +02:00 committed by GitHub
commit 23d591c1c3
8 changed files with 198 additions and 8 deletions

View File

@ -185,6 +185,67 @@ commit logic) showing the principal operation of compaction:
(The actual algorithm is more complex to avoid various consistency issues, refer to
the ``borg.repository`` module for more comments and documentation on these issues.)
.. _internals_storage_quota:
Storage quotas
~~~~~~~~~~~~~~
Quotas are implemented at the Repository level. The active quota of a repository
is determined by the ``storage_quota`` `config` entry or a run-time override (via :ref:`borg_serve`).
The currently used quota is stored in the hints file. Operations (PUT and DELETE) during
a transaction modify the currently used quota:
- A PUT adds the size of the *log entry* to the quota,
i.e. the length of the data plus the 41-byte header.
- A DELETE subtracts the size of the deleted log entry from the quota,
which includes the header.
Thus, PUT and DELETE are symmetric and cancel each other out precisely.
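
The accounting rules above can be sketched as follows. This is a minimal, in-memory illustration only: ``QuotaTracker`` and its attributes are hypothetical names, not part of Borg, and the 41-byte header size is taken from the text above.

```python
HEADER_SIZE = 41  # size of a PUT log entry header, as stated in the text above


class QuotaTracker:
    """Hypothetical stand-in for the quota bookkeeping; not Borg's Repository class."""

    def __init__(self):
        self.use = 0
        self.entries = {}  # object id -> size of its log entry (data + header)

    def put(self, id, data):
        if id in self.entries:
            # Overwriting an object first releases the quota held by the old entry.
            self.use -= self.entries[id]
        size = len(data) + HEADER_SIZE
        self.entries[id] = size
        self.use += size

    def delete(self, id):
        # A DELETE gives back exactly what the matching PUT added.
        self.use -= self.entries.pop(id)


tracker = QuotaTracker()
tracker.put(b'a', bytes(1234))
tracker.put(b'b', bytes(5678))
tracker.delete(b'a')
print(tracker.use)  # 5719, i.e. 5678 + 41
```

Because the DELETE subtracts the full log entry size, the counter returns to exactly the value it would have had if the deleted object had never been stored.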
The quota does not track on-disk size overheads (due to conditional compaction
or append-only mode). In normal operation the inclusion of the log entry headers
in the quota acts as a faithful proxy for index and hints overheads.
By tracking effective content size, the client can *always* recover from a full quota
by deleting archives. This would not be possible if the quota tracked on-disk size,
since journaling DELETEs requires extra disk space before space is freed.
Tracking effective size, on the other hand, counts DELETEs as freeing quota immediately.
.. rubric:: Enforcing the quota
The storage quota is meant as a robust mechanism for service providers, therefore
:ref:`borg_serve` has to enforce it without loopholes (e.g. modified clients).
The quota is enforceable only if *all* :ref:`borg_serve` versions
accessible to clients support quotas (see next section). Further, quotas are
per repository. Therefore, ensure clients can only access a defined set of repositories
with their quotas set, using ``--restrict-to-path``.
If the client exceeds the storage quota, the ``StorageQuotaExceeded`` exception is
raised. Normally a client could ignore such an exception and just send a ``commit()``
command anyway, circumventing the quota. However, when ``StorageQuotaExceeded`` is raised,
it is stored in the ``transaction_doomed`` attribute of the repository.
If the transaction is doomed, then commit will re-raise this exception, aborting the commit.
The ``transaction_doomed`` indicator is reset on a rollback (which erases the quota-exceeding
state).
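
The doomed-transaction mechanism can be illustrated with a small model. This is a sketch under stated assumptions, not Borg's code: ``SketchRepository`` and ``committed_use`` are hypothetical, and the model restores the usage counter immediately on rollback, which is a simplification of how the real repository reloads its state.

```python
class StorageQuotaExceeded(Exception):
    """Stand-in for Repository.StorageQuotaExceeded."""


class SketchRepository:
    """Hypothetical in-memory model of the doomed-transaction logic."""

    def __init__(self, storage_quota):
        self.storage_quota = storage_quota
        self.committed_use = 0  # assumption: usage as of the last commit
        self.use = 0
        self.transaction_doomed = None

    def put(self, size):
        self.use += size
        if self.storage_quota and self.use > self.storage_quota:
            # Remember the exception so that a later commit() cannot succeed.
            self.transaction_doomed = StorageQuotaExceeded(self.storage_quota, self.use)
            raise self.transaction_doomed

    def commit(self):
        if self.transaction_doomed:
            exception = self.transaction_doomed
            self.rollback()
            raise exception
        self.committed_use = self.use

    def rollback(self):
        # Simplification: restore the counter right away and clear the indicator.
        self.use = self.committed_use
        self.transaction_doomed = None


repo = SketchRepository(storage_quota=50)
repo.put(41)
repo.commit()
try:
    repo.put(41)  # 82 > 50, so the transaction is doomed
except StorageQuotaExceeded:
    pass
try:
    repo.commit()  # a modified client ignoring the error still cannot commit
    committed = True
except StorageQuotaExceeded:
    committed = False
```

After the failed commit, ``committed`` is ``False`` and the model is back at the last committed usage of 41, so the client can continue by deleting data or writing less.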
.. rubric:: Compatibility with older servers and enabling quota after-the-fact
If no quota data is stored in the hints file, Borg assumes zero quota is used.
Thus, if a repository with an enabled quota is written to with an older version
that does not understand quotas, then the quota usage will be erased.
A similar situation arises when upgrading from a Borg release that did not have quotas.
Borg will start tracking quota use from the time of the upgrade, starting at zero.
If the quota shall be enforced accurately in these cases, either
- delete the ``index.N`` and ``hints.N`` files, forcing Borg to rebuild both,
re-acquiring quota data in the process, or
- edit the msgpacked ``hints.N`` file (not recommended and thus not
documented further).
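
The "zero quota is used" fallback comes down to a dictionary lookup with a default. The sketch below is illustrative only: ``load_quota_use`` is a hypothetical helper, and the hints dictionaries are simplified examples rather than real ``hints.N`` contents.

```python
def load_quota_use(hints):
    # Hints written by a version without quota support lack the key,
    # so the tracked usage silently starts at zero.
    return hints.get(b'storage_quota_use', 0)


# Simplified example hints; real files carry segment and compaction data.
old_hints = {b'version': 2, b'segments': {}, b'compact': {}}
new_hints = dict(old_hints)
new_hints[b'storage_quota_use'] = 5719

print(load_quota_use(old_hints))  # 0
print(load_quota_use(new_hints))  # 5719
```

This is why a write by an older version erases the tracked usage: it rewrites the hints without the key, and the next quota-aware open reads the default.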
.. _manifest:
The manifest

View File

@ -17,6 +17,8 @@ optional arguments
| select encryption key mode **(required)**
``-a``, ``--append-only``
| create an append-only mode repository
``--storage-quota``
| Set storage quota of the new repository (e.g. 5G, 1.5T). Default: no quota.
`Common options`_
|

View File

@ -13,6 +13,8 @@ optional arguments
| restrict repository access to PATH. Can be specified multiple times to allow the client access to several directories. Access to all sub-directories is granted implicitly; PATH doesn't need to directly point to a repository.
``--append-only``
| only allow appending to repository segment files
``--storage-quota``
| Override storage quota of the repository (e.g. 5G, 1.5T). When a new repository is initialized, sets the storage quota on the new repository as well. Default: no quota.
`Common options`_
|

View File

@ -46,7 +46,7 @@
 from .helpers import location_validator, archivename_validator, ChunkerParams
 from .helpers import PrefixSpec, SortBySpec, HUMAN_SORT_KEYS
 from .helpers import BaseFormatter, ItemFormatter, ArchiveFormatter
-from .helpers import format_timedelta, format_file_size, format_archive
+from .helpers import format_timedelta, format_file_size, parse_file_size, format_archive
 from .helpers import safe_encode, remove_surrogates, bin_to_hex, prepare_dump_dict
 from .helpers import prune_within, prune_split
 from .helpers import timestamp
@ -142,6 +142,13 @@ def wrapper(self, args, repository, key, manifest, **kwargs):
return wrapper
def parse_storage_quota(storage_quota):
parsed = parse_file_size(storage_quota)
if parsed < parse_file_size('10M'):
raise argparse.ArgumentTypeError('quota is too small (%s). At least 10M are required.' % storage_quota)
return parsed
class Archiver:
def __init__(self, lock_wait=None, prog=None):
@ -206,7 +213,11 @@ def build_matcher(inclexcl_patterns, include_paths):
     def do_serve(self, args):
         """Start in server mode. This command is usually not used manually."""
-        return RepositoryServer(restrict_to_paths=args.restrict_to_paths, append_only=args.append_only).serve()
+        return RepositoryServer(
+            restrict_to_paths=args.restrict_to_paths,
+            append_only=args.append_only,
+            storage_quota=args.storage_quota,
+        ).serve()
@with_repository(create=True, exclusive=True, manifest=False)
def do_init(self, args, repository):
@ -2330,6 +2341,11 @@ def define_common_options(add_common_option):
'Access to all sub-directories is granted implicitly; PATH doesn\'t need to directly point to a repository.')
subparser.add_argument('--append-only', dest='append_only', action='store_true',
help='only allow appending to repository segment files')
subparser.add_argument('--storage-quota', dest='storage_quota', default=None,
type=parse_storage_quota,
help='Override storage quota of the repository (e.g. 5G, 1.5T). '
'When a new repository is initialized, sets the storage quota on the new '
'repository as well. Default: no quota.')
init_epilog = process_epilog("""
This command initializes an empty repository. A repository is a filesystem
@ -2420,6 +2436,9 @@ def define_common_options(add_common_option):
help='select encryption key mode **(required)**')
subparser.add_argument('-a', '--append-only', dest='append_only', action='store_true',
help='create an append-only mode repository')
subparser.add_argument('--storage-quota', dest='storage_quota', default=None,
type=parse_storage_quota,
help='Set storage quota of the new repository (e.g. 5G, 1.5T). Default: no quota.')
check_epilog = process_epilog("""
The check command verifies the consistency of a repository and the corresponding archives.
@ -3981,7 +4000,7 @@ def main(): # pragma: no cover
             tb = "%s\n%s" % (traceback.format_exc(), sysinfo())
             exit_code = e.exit_code
         except RemoteRepository.RPCError as e:
-            important = e.exception_class not in ('LockTimeout', )
+            important = e.exception_class not in ('LockTimeout', ) and e.traceback
             msgid = e.exception_class
             tb_log_level = logging.ERROR if important else logging.DEBUG
             if important:

View File

@ -178,7 +178,7 @@ class RepositoryServer: # pragma: no cover
         'inject_exception',
     )
-    def __init__(self, restrict_to_paths, append_only):
+    def __init__(self, restrict_to_paths, append_only, storage_quota):
         self.repository = None
         self.restrict_to_paths = restrict_to_paths
         # This flag is parsed from the serve command line via Archiver.do_serve,
@ -186,6 +186,7 @@ def __init__(self, restrict_to_paths, append_only):
# whatever the client wants, except when initializing a new repository
# (see RepositoryServer.open below).
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information
def positional_to_named(self, method, argv):
@ -252,8 +253,10 @@ def serve(self):
if dictFormat:
ex_short = traceback.format_exception_only(e.__class__, e)
ex_full = traceback.format_exception(*sys.exc_info())
ex_trace = True
if isinstance(e, Error):
ex_short = [e.get_message()]
ex_trace = e.traceback
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
# and will be handled just like locally raised exceptions. Suppress the remote traceback
@ -268,6 +271,7 @@ def serve(self):
b'exception_args': e.args,
b'exception_full': ex_full,
b'exception_short': ex_short,
b'exception_trace': ex_trace,
b'sysinfo': sysinfo()})
except TypeError:
msg = msgpack.packb({MSGID: msgid,
@ -276,6 +280,7 @@ def serve(self):
for x in e.args],
b'exception_full': ex_full,
b'exception_short': ex_short,
b'exception_trace': ex_trace,
b'sysinfo': sysinfo()})
os_write(stdout_fd, msg)
@ -360,6 +365,7 @@ def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, ap
append_only = (not create and self.append_only) or append_only
self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
append_only=append_only,
storage_quota=self.storage_quota,
exclusive=exclusive)
self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id
@ -483,6 +489,10 @@ def get_message(self):
else:
return self.exception_class
@property
def traceback(self):
return self.unpacked.get(b'exception_trace', True)
@property
def exception_class(self):
return self.unpacked[b'exception_class'].decode()
@ -671,6 +681,9 @@ def borg_cmd(self, args, testing):
topic = 'borg.debug.' + topic
if 'repository' in topic:
opts.append('--debug-topic=%s' % topic)
if 'storage_quota' in args and args.storage_quota:
opts.append('--storage-quota=%s' % args.storage_quota)
env_vars = []
if not hostname_is_unique():
env_vars.append('BORG_HOSTNAME_IS_UNIQUE=no')

View File

@ -107,10 +107,14 @@ def __init__(self, id, repo):
     class InsufficientFreeSpaceError(Error):
         """Insufficient free space to complete transaction (required: {}, available: {})."""
-    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False):
+    class StorageQuotaExceeded(Error):
+        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
+    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True,
+                 append_only=False, storage_quota=None):
         self.path = os.path.abspath(path)
         self._location = Location('file://%s' % self.path)
-        self.io = None
+        self.io = None  # type: LoggedIO
         self.lock = None
         self.index = None
         # This is an index of shadowed log entries during this transaction. Consider the following sequence:
@ -124,6 +128,9 @@ def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=Tru
self.created = False
self.exclusive = exclusive
self.append_only = append_only
self.storage_quota = storage_quota
self.storage_quota_use = 0
self.transaction_doomed = None
def __del__(self):
if self.lock:
@ -209,6 +216,10 @@ def create(self, path):
config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
config.set('repository', 'append_only', str(int(self.append_only)))
if self.storage_quota:
config.set('repository', 'storage_quota', str(self.storage_quota))
else:
config.set('repository', 'storage_quota', '0')
config.set('repository', 'additional_free_space', '0')
config.set('repository', 'id', bin_to_hex(os.urandom(32)))
self.save_config(path, config)
@ -331,6 +342,9 @@ def open(self, path, exclusive, lock_wait=None, lock=True):
# append_only can be set in the constructor
# it shouldn't be overridden (True -> False) here
self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
if self.storage_quota is None:
# self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
self.storage_quota = self.config.getint('repository', 'storage_quota', fallback=0)
self.id = unhexlify(self.config.get('repository', 'id').strip())
self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
@ -346,7 +360,12 @@ def commit(self, save_space=False):
"""Commit transaction
"""
# save_space is not used anymore, but stays for RPC/API compatibility.
if self.transaction_doomed:
exception = self.transaction_doomed
self.rollback()
raise exception
self.check_free_space()
self.log_storage_quota()
self.io.write_commit()
if not self.append_only:
self.compact_segments()
@ -398,6 +417,7 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
self.storage_quota_use = 0
self.shadow_index.clear()
else:
if do_cleanup:
@ -420,6 +440,7 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
logger.debug('Upgrading from v1 hints.%d', transaction_id)
self.segments = hints[b'segments']
self.compact = FreeSpace()
self.storage_quota_use = 0
for segment in sorted(hints[b'compact']):
logger.debug('Rebuilding sparse info for segment %d', segment)
self._rebuild_sparse(segment)
@ -429,6 +450,8 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
else:
self.segments = hints[b'segments']
self.compact = FreeSpace(hints[b'compact'])
self.storage_quota_use = hints.get(b'storage_quota_use', 0)
self.log_storage_quota()
# Drop uncommitted segments in the shadow index
for key, shadowed_segments in self.shadow_index.items():
for segment in list(shadowed_segments):
@ -438,7 +461,8 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
     def write_index(self):
         hints = {b'version': 2,
                  b'segments': self.segments,
-                 b'compact': self.compact}
+                 b'compact': self.compact,
+                 b'storage_quota_use': self.storage_quota_use, }
         transaction_id = self.io.get_segments_transaction_id()
         assert transaction_id is not None
         hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
@ -515,6 +539,11 @@ def check_free_space(self):
formatted_free = format_file_size(free_space)
raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
def log_storage_quota(self):
if self.storage_quota:
logger.info('Storage quota: %s out of %s used.',
format_file_size(self.storage_quota_use), format_file_size(self.storage_quota))
def compact_segments(self):
"""Compact sparse segments by copying data into new segments
"""
@ -672,6 +701,7 @@ def _update_index(self, segment, objects, report=None):
pass
self.index[key] = segment, offset
self.segments[segment] += 1
self.storage_quota_use += size
elif tag == TAG_DELETE:
try:
# if the deleted PUT is not in the index, there is nothing to clean up
@ -684,6 +714,7 @@ def _update_index(self, segment, objects, report=None):
# is already gone, then it was already compacted.
self.segments[s] -= 1
size = self.io.read(s, offset, key, read_data=False)
self.storage_quota_use -= size
self.compact[s] += size
elif tag == TAG_COMMIT:
continue
@ -821,6 +852,7 @@ def _rollback(self, *, cleanup):
self.io.cleanup(self.io.get_segments_transaction_id())
self.index = None
self._active_txn = False
self.transaction_doomed = None
def rollback(self):
# note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
@ -915,14 +947,20 @@ def put(self, id, data, wait=True):
else:
self.segments[segment] -= 1
size = self.io.read(segment, offset, id, read_data=False)
self.storage_quota_use -= size
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
segment, offset = self.io.write_put(id, data)
self.storage_quota_use += len(data) + self.io.put_header_fmt.size
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = segment, offset
if self.storage_quota and self.storage_quota_use > self.storage_quota:
self.transaction_doomed = self.StorageQuotaExceeded(
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
raise self.transaction_doomed
def delete(self, id, wait=True):
"""delete a repo object
@ -939,6 +977,7 @@ def delete(self, id, wait=True):
self.shadow_index.setdefault(id, []).append(segment)
self.segments[segment] -= 1
size = self.io.read(segment, offset, id, read_data=False)
self.storage_quota_use -= size
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size

View File

@ -32,7 +32,7 @@
 from .. import xattr, helpers, platform
 from ..archive import Archive, ChunkBuffer, flags_noatime, flags_normal
-from ..archiver import Archiver
+from ..archiver import Archiver, parse_storage_quota
 from ..cache import Cache
 from ..constants import *  # NOQA
 from ..crypto.low_level import bytes_to_long, num_aes_blocks
@ -3320,3 +3320,9 @@ def test_flag_position_independence(self, parse_vars_from_line, position, flag,
result[args_key] = args_value
assert parse_vars_from_line(*line) == result
def test_parse_storage_quota():
assert parse_storage_quota('50M') == 50 * 1000**2
with pytest.raises(argparse.ArgumentTypeError):
parse_storage_quota('5M')

View File

@ -415,6 +415,43 @@ def test_create_free_space(self):
assert not os.path.exists(self.repository.path)
class QuotaTestCase(RepositoryTestCaseBase):
def test_tracking(self):
assert self.repository.storage_quota_use == 0
self.repository.put(H(1), bytes(1234))
assert self.repository.storage_quota_use == 1234 + 41
self.repository.put(H(2), bytes(5678))
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
self.repository.delete(H(1))
assert self.repository.storage_quota_use == 5678 + 41
self.repository.commit()
self.reopen()
with self.repository:
# Open new transaction; hints and thus quota data is not loaded unless needed.
self.repository.put(H(3), b'')
self.repository.delete(H(3))
assert self.repository.storage_quota_use == 5678 + 41
def test_exceed_quota(self):
assert self.repository.storage_quota_use == 0
self.repository.storage_quota = 50
self.repository.put(H(1), b'')
assert self.repository.storage_quota_use == 41
self.repository.commit()
with pytest.raises(Repository.StorageQuotaExceeded):
self.repository.put(H(2), b'')
assert self.repository.storage_quota_use == 82
with pytest.raises(Repository.StorageQuotaExceeded):
self.repository.commit()
assert self.repository.storage_quota_use == 82
self.reopen()
with self.repository:
self.repository.storage_quota = 50
# Open new transaction; hints and thus quota data is not loaded unless needed.
self.repository.put(H(1), b'')
assert self.repository.storage_quota_use == 41
class NonceReservation(RepositoryTestCaseBase):
def test_get_free_nonce_asserts(self):
self.reopen(exclusive=False)
@ -641,6 +678,7 @@ def test_crash_before_compact(self):
@pytest.mark.skipif(sys.platform == 'cygwin', reason='remote is broken on cygwin and hangs')
class RemoteRepositoryTestCase(RepositoryTestCase):
repository = None # type: RemoteRepository
def open(self, create=False):
return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
@ -716,6 +754,10 @@ class MockArgs:
umask = 0o077
debug_topics = []
def __contains__(self, item):
# To behave like argparse.Namespace
return hasattr(self, item)
assert self.repository.borg_cmd(None, testing=True) == [sys.executable, '-m', 'borg.archiver', 'serve']
args = MockArgs()
# XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
@ -727,6 +769,12 @@ class MockArgs:
args.debug_topics = ['something_client_side', 'repository_compaction']
assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info',
'--debug-topic=borg.debug.repository_compaction']
args = MockArgs()
args.storage_quota = 0
assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
args.storage_quota = 314159265
assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info',
'--storage-quota=314159265']
class RemoteLegacyFree(RepositoryTestCaseBase):