Merge pull request #6415 from ThomasWaldmann/borg-key

borg key change-location, cleanups
This commit is contained in:
TW 2022-03-12 18:36:48 +01:00 committed by GitHub
commit d9d1e44b67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 432 additions and 283 deletions

View File

@ -44,7 +44,8 @@
from .cache import Cache, assert_secure, SecurityManager
from .constants import * # NOQA
from .compress import CompressionSpec
from .crypto.key import key_creator, key_argument_names, tam_required_file, tam_required, RepoKey
from .crypto.key import key_creator, key_argument_names, tam_required_file, tam_required
from .crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey
from .crypto.keymanager import KeyManager
from .helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, EXIT_SIGNAL_BASE
from .helpers import Error, NoManifestError, set_ec
@ -363,6 +364,62 @@ def do_change_passphrase(self, args, repository, manifest, key):
logger.info('Key location: %s', key.find_key())
return EXIT_SUCCESS
@with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.CHECK,))
def do_change_location(self, args, repository, manifest, key, cache):
"""Change repository key location"""
if not hasattr(key, 'change_passphrase'):
print('This repository is not encrypted, cannot change the key location.')
return EXIT_ERROR
if args.key_mode == 'keyfile':
if isinstance(key, RepoKey):
key_new = KeyfileKey(repository)
elif isinstance(key, Blake2RepoKey):
key_new = Blake2KeyfileKey(repository)
elif isinstance(key, (KeyfileKey, Blake2KeyfileKey)):
print(f"Location already is {args.key_mode}")
return EXIT_SUCCESS
else:
raise Error("Unsupported key type")
if args.key_mode == 'repokey':
if isinstance(key, KeyfileKey):
key_new = RepoKey(repository)
elif isinstance(key, Blake2KeyfileKey):
key_new = Blake2RepoKey(repository)
elif isinstance(key, (RepoKey, Blake2RepoKey)):
print(f"Location already is {args.key_mode}")
return EXIT_SUCCESS
else:
raise Error("Unsupported key type")
for name in ('repository_id', 'enc_key', 'enc_hmac_key', 'id_key', 'chunk_seed',
'tam_required', 'nonce_manager', 'cipher'):
value = getattr(key, name)
setattr(key_new, name, value)
key_new.target = key_new.get_new_target(args)
key_new.save(key_new.target, key._passphrase, create=True) # save with same passphrase
# rewrite the manifest with the new key, so that the key-type byte of the manifest changes
manifest.key = key_new
manifest.write()
repository.commit(compact=False)
# we need to rewrite cache config and security key-type info,
# so that the cached key-type will match the repo key-type.
cache.begin_txn() # need to start a cache transaction, otherwise commit() does nothing.
cache.key = key_new
cache.commit()
loc = key_new.find_key() if hasattr(key_new, 'find_key') else None
if args.keep:
logger.info(f'Key copied to {loc}')
else:
key.remove(key.target) # remove key from current location
logger.info(f'Key moved to {loc}')
return EXIT_SUCCESS
@with_repository(lock=False, exclusive=False, manifest=False, cache=False)
def do_key_export(self, args, repository):
"""Export the repository key for backup"""
@ -4250,6 +4307,28 @@ def define_borg_mount(parser):
subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='',
type=location_validator(archive=False))
change_location_epilog = process_epilog("""
Change the location of a borg key. The key can be stored at different locations:
keyfile: locally, usually in the home directory
repokey: inside the repo (in the repo config)
Note: this command does NOT change the crypto algorithms, just the key location,
thus you must ONLY give the key location (keyfile or repokey).
""")
subparser = key_parsers.add_parser('change-location', parents=[common_parser], add_help=False,
description=self.do_change_location.__doc__,
epilog=change_location_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
help='change key location')
subparser.set_defaults(func=self.do_change_location)
subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='',
type=location_validator(archive=False))
subparser.add_argument('key_mode', metavar='KEY_LOCATION', choices=('repokey', 'keyfile'),
help='select key location')
subparser.add_argument('--keep', dest='keep', action='store_true',
help='keep the key also at the current location (default: remove it)')
# borg list
list_epilog = process_epilog("""
This command lists the contents of a repository or an archive.

View File

@ -104,6 +104,27 @@
PBKDF2_ITERATIONS = 100000
class KeyBlobStorage:
NO_STORAGE = 'no_storage'
KEYFILE = 'keyfile'
REPO = 'repository'
class KeyType:
KEYFILE = 0x00
# repos with PASSPHRASE mode could not be created any more since borg 1.0, see #97.
# in borg 1.3 all of its code and also the "borg key migrate-to-repokey" command was removed.
# if you still need to, you can use "borg key migrate-to-repokey" with borg 1.0, 1.1 and 1.2.
# Nowadays, we just dispatch this to RepoKey and assume the passphrase was migrated to a repokey.
PASSPHRASE = 0x01 # legacy, attic and borg < 1.0
PLAINTEXT = 0x02
REPO = 0x03
BLAKE2KEYFILE = 0x04
BLAKE2REPO = 0x05
BLAKE2AUTHENTICATED = 0x06
AUTHENTICATED = 0x07
REPOSITORY_README = """This is a Borg Backup repository.
See https://borgbackup.readthedocs.io/
"""

View File

@ -1,13 +1,9 @@
import configparser
import getpass
import hmac
import os
import shlex
import sys
import textwrap
import subprocess
from binascii import a2b_base64, b2a_base64, hexlify
from hashlib import sha256, sha512, pbkdf2_hmac
from hashlib import sha256
from ..logger import create_logger
@ -17,11 +13,10 @@
from ..compress import Compressor
from ..helpers import StableDict
from ..helpers import Error, IntegrityError
from ..helpers import yes
from ..helpers import get_keys_dir, get_security_dir
from ..helpers import get_limited_unpacker
from ..helpers import bin_to_hex
from ..helpers import prepare_subprocess_env
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong
from ..helpers import msgpack
from ..item import Key, EncryptedKey
from ..platform import SaveFile
@ -31,22 +26,6 @@
from .low_level import AES256_CTR_HMAC_SHA256, AES256_CTR_BLAKE2b
class NoPassphraseFailure(Error):
"""can not acquire a passphrase: {}"""
class PassphraseWrong(Error):
"""passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect."""
class PasscommandFailure(Error):
"""passcommand supplied in BORG_PASSCOMMAND failed: {}"""
class PasswordRetriesExceeded(Error):
"""exceeded the maximum password retries"""
class UnsupportedPayloadError(Error):
"""Unsupported payload type {}. A newer version is required to access this repository."""
@ -97,12 +76,6 @@ class TAMUnsupportedSuiteError(IntegrityError):
traceback = False
class KeyBlobStorage:
NO_STORAGE = 'no_storage'
KEYFILE = 'keyfile'
REPO = 'repository'
def key_creator(repository, args):
for key in AVAILABLE_KEY_TYPES:
if key.ARG_NAME == args.encryption:
@ -118,8 +91,8 @@ def key_argument_names():
def identify_key(manifest_data):
key_type = manifest_data[0]
if key_type == PassphraseKey.TYPE:
return RepoKey # see comment in PassphraseKey class.
if key_type == KeyType.PASSPHRASE: # legacy, see comment in KeyType class.
return RepoKey
for key in AVAILABLE_KEY_TYPES:
if key.TYPE == key_type:
@ -145,6 +118,8 @@ def tam_required(repository):
class KeyBase:
# Numeric key type ID, must fit in one byte.
TYPE = None # override in subclasses
# set of key type IDs the class can handle as input
TYPES_ACCEPTABLE = None # override in subclasses
# Human-readable name
NAME = 'UNDEFINED'
@ -194,6 +169,11 @@ def assert_id(self, id, data):
if not hmac.compare_digest(id_computed, id):
raise IntegrityError('Chunk %s: id verification failed' % bin_to_hex(id))
def assert_type(self, type_byte, id=None):
if type_byte not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError(f'Chunk {id_str}: Invalid encryption envelope')
def _tam_key(self, salt, context):
return hkdf_hmac_sha512(
ikm=self.id_key + self.enc_key + self.enc_hmac_key,
@ -258,7 +238,8 @@ def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
class PlaintextKey(KeyBase):
TYPE = 0x02
TYPE = KeyType.PLAINTEXT
TYPES_ACCEPTABLE = {TYPE}
NAME = 'plaintext'
ARG_NAME = 'none'
STORAGE = KeyBlobStorage.NO_STORAGE
@ -287,9 +268,7 @@ def encrypt(self, chunk):
return b''.join([self.TYPE_STR, data])
def decrypt(self, id, data, decompress=True):
if data[0] != self.TYPE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
self.assert_type(data[0], id)
payload = memoryview(data)[1:]
if not decompress:
return payload
@ -367,10 +346,7 @@ def encrypt(self, chunk):
return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv)
def decrypt(self, id, data, decompress=True):
if not (data[0] == self.TYPE or
data[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
self.assert_type(data[0], id)
try:
payload = self.cipher.decrypt(data)
except IntegrityError as e:
@ -396,9 +372,7 @@ def init_ciphers(self, manifest_data=None):
if manifest_data is None:
nonce = 0
else:
if not (manifest_data[0] == self.TYPE or
manifest_data[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
raise IntegrityError('Manifest: Invalid encryption envelope')
self.assert_type(manifest_data[0])
# manifest_blocks is a safe upper bound on the amount of cipher blocks needed
# to encrypt the manifest. depending on the ciphersuite and overhead, it might
# be a bit too high, but that does not matter.
@ -408,125 +382,7 @@ def init_ciphers(self, manifest_data=None):
self.nonce_manager = NonceManager(self.repository, nonce)
class Passphrase(str):
@classmethod
def _env_passphrase(cls, env_var, default=None):
passphrase = os.environ.get(env_var, default)
if passphrase is not None:
return cls(passphrase)
@classmethod
def env_passphrase(cls, default=None):
passphrase = cls._env_passphrase('BORG_PASSPHRASE', default)
if passphrase is not None:
return passphrase
passphrase = cls.env_passcommand()
if passphrase is not None:
return passphrase
passphrase = cls.fd_passphrase()
if passphrase is not None:
return passphrase
@classmethod
def env_passcommand(cls, default=None):
passcommand = os.environ.get('BORG_PASSCOMMAND', None)
if passcommand is not None:
# passcommand is a system command (not inside pyinstaller env)
env = prepare_subprocess_env(system=True)
try:
passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PasscommandFailure(e)
return cls(passphrase.rstrip('\n'))
@classmethod
def fd_passphrase(cls):
try:
fd = int(os.environ.get('BORG_PASSPHRASE_FD'))
except (ValueError, TypeError):
return None
with os.fdopen(fd, mode='r') as f:
passphrase = f.read()
return cls(passphrase.rstrip('\n'))
@classmethod
def env_new_passphrase(cls, default=None):
return cls._env_passphrase('BORG_NEW_PASSPHRASE', default)
@classmethod
def getpass(cls, prompt):
try:
pw = getpass.getpass(prompt)
except EOFError:
if prompt:
print() # avoid err msg appearing right of prompt
msg = []
for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND':
env_var_set = os.environ.get(env_var) is not None
msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set'))
msg.append('Interactive password query failed.')
raise NoPassphraseFailure(' '.join(msg)) from None
else:
return cls(pw)
@classmethod
def verification(cls, passphrase):
msg = 'Do you want your passphrase to be displayed for verification? [yN]: '
if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.',
retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'):
print('Your passphrase (between double-quotes): "%s"' % passphrase,
file=sys.stderr)
print('Make sure the passphrase displayed above is exactly what you wanted.',
file=sys.stderr)
try:
passphrase.encode('ascii')
except UnicodeEncodeError:
print('Your passphrase (UTF-8 encoding in hex): %s' %
bin_to_hex(passphrase.encode('utf-8')),
file=sys.stderr)
print('As you have a non-ASCII passphrase, it is recommended to keep the UTF-8 encoding in hex together with the passphrase at a safe place.',
file=sys.stderr)
@classmethod
def new(cls, allow_empty=False):
passphrase = cls.env_new_passphrase()
if passphrase is not None:
return passphrase
passphrase = cls.env_passphrase()
if passphrase is not None:
return passphrase
for retry in range(1, 11):
passphrase = cls.getpass('Enter new passphrase: ')
if allow_empty or passphrase:
passphrase2 = cls.getpass('Enter same passphrase again: ')
if passphrase == passphrase2:
cls.verification(passphrase)
logger.info('Remember your passphrase. Your data will be inaccessible without it.')
return passphrase
else:
print('Passphrases do not match', file=sys.stderr)
else:
print('Passphrase must not be blank', file=sys.stderr)
else:
raise PasswordRetriesExceeded
def __repr__(self):
return '<Passphrase "***hidden***">'
def kdf(self, salt, iterations, length):
return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length)
class PassphraseKey:
# this is only a stub, repos with this mode could not be created any more since borg 1.0, see #97.
# in borg 1.3 all of its code and also the "borg key migrate-to-repokey" command was removed.
# if you still need to, you can use "borg key migrate-to-repokey" with borg 1.0, 1.1 and 1.2.
# Nowadays, we just dispatch this to RepoKey and assume the passphrase was migrated to a repokey.
TYPE = 0x01
NAME = 'passphrase'
class KeyfileKeyBase(AESKeyBase):
class FlexiKeyBase(AESKeyBase):
@classmethod
def detect(cls, repository, manifest_data):
key = cls(repository)
@ -639,11 +495,8 @@ def get_new_target(self, args):
raise NotImplementedError
class KeyfileKey(ID_HMAC_SHA_256, KeyfileKeyBase):
TYPE = 0x00
NAME = 'key file'
ARG_NAME = 'keyfile'
STORAGE = KeyBlobStorage.KEYFILE
class FlexiKey(ID_HMAC_SHA_256, FlexiKeyBase):
TYPES_ACCEPTABLE = {KeyType.KEYFILE, KeyType.REPO, KeyType.PASSPHRASE}
FILE_ID = 'BORG_KEY'
@ -660,13 +513,23 @@ def sanity_check(self, filename, id):
return filename
def find_key(self):
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return self.sanity_check(keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir())
if self.STORAGE == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return self.sanity_check(keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir())
elif self.STORAGE == KeyBlobStorage.REPO:
loc = self.repository._location.canonical_path()
try:
self.repository.load_key()
return loc
except configparser.NoOptionError:
raise RepoKeyNotFoundError(loc) from None
else:
raise TypeError('Unsupported borg key storage type')
def get_existing_or_new_target(self, args):
keyfile = self._find_key_file_from_environment()
@ -688,10 +551,15 @@ def _find_key_in_keys_dir(self):
pass
def get_new_target(self, args):
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
if self.STORAGE == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
elif self.STORAGE == KeyBlobStorage.REPO:
return self.repository
else:
raise TypeError('Unsupported borg key storage type')
def _find_key_file_from_environment(self):
keyfile = os.environ.get('BORG_KEY_FILE')
@ -708,86 +576,88 @@ def _get_new_target_in_keys_dir(self, args):
return path
def load(self, target, passphrase):
with open(target) as fd:
key_data = ''.join(fd.readlines()[1:])
if self.STORAGE == KeyBlobStorage.KEYFILE:
with open(target) as fd:
key_data = ''.join(fd.readlines()[1:])
elif self.STORAGE == KeyBlobStorage.REPO:
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != ''
# what we get in target is just a repo location, but we already have the repo obj:
target = self.repository
key_data = target.load_key()
key_data = key_data.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
else:
raise TypeError('Unsupported borg key storage type')
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
def save(self, target, passphrase, create=False):
if create and os.path.isfile(target):
# if a new keyfile key repository is created, ensure that an existing keyfile of another
# keyfile key repo is not accidentally overwritten by careless use of the BORG_KEY_FILE env var.
# see issue #6036
raise Error('Aborting because key in "%s" already exists.' % target)
key_data = self._save(passphrase)
with SaveFile(target) as fd:
fd.write(f'{self.FILE_ID} {bin_to_hex(self.repository_id)}\n')
fd.write(key_data)
fd.write('\n')
if self.STORAGE == KeyBlobStorage.KEYFILE:
if create and os.path.isfile(target):
# if a new keyfile key repository is created, ensure that an existing keyfile of another
# keyfile key repo is not accidentally overwritten by careless use of the BORG_KEY_FILE env var.
# see issue #6036
raise Error('Aborting because key in "%s" already exists.' % target)
with SaveFile(target) as fd:
fd.write(f'{self.FILE_ID} {bin_to_hex(self.repository_id)}\n')
fd.write(key_data)
fd.write('\n')
elif self.STORAGE == KeyBlobStorage.REPO:
self.logically_encrypted = passphrase != ''
key_data = key_data.encode('utf-8') # remote repo: msgpack issue #99, giving bytes
target.save_key(key_data)
else:
raise TypeError('Unsupported borg key storage type')
self.target = target
def remove(self, target):
if self.STORAGE == KeyBlobStorage.KEYFILE:
os.remove(target)
elif self.STORAGE == KeyBlobStorage.REPO:
target.save_key(b'') # save empty key (no new api at remote repo necessary)
else:
raise TypeError('Unsupported borg key storage type')
class RepoKey(ID_HMAC_SHA_256, KeyfileKeyBase):
TYPE = 0x03
class KeyfileKey(FlexiKey):
TYPE = KeyType.KEYFILE
NAME = 'key file'
ARG_NAME = 'keyfile'
STORAGE = KeyBlobStorage.KEYFILE
class RepoKey(FlexiKey):
TYPE = KeyType.REPO
NAME = 'repokey'
ARG_NAME = 'repokey'
STORAGE = KeyBlobStorage.REPO
def find_key(self):
loc = self.repository._location.canonical_path()
try:
self.repository.load_key()
return loc
except configparser.NoOptionError:
raise RepoKeyNotFoundError(loc) from None
def get_new_target(self, args):
return self.repository
def load(self, target, passphrase):
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != ''
# what we get in target is just a repo location, but we already have the repo obj:
target = self.repository
key_data = target.load_key()
key_data = key_data.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
def save(self, target, passphrase, create=False):
self.logically_encrypted = passphrase != ''
key_data = self._save(passphrase)
key_data = key_data.encode('utf-8') # remote repo: msgpack issue #99, giving bytes
target.save_key(key_data)
self.target = target
class Blake2FlexiKey(ID_BLAKE2b_256, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE2KEYFILE, KeyType.BLAKE2REPO}
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2KeyfileKey(ID_BLAKE2b_256, KeyfileKey):
TYPE = 0x04
class Blake2KeyfileKey(Blake2FlexiKey):
TYPE = KeyType.BLAKE2KEYFILE
NAME = 'key file BLAKE2b'
ARG_NAME = 'keyfile-blake2'
STORAGE = KeyBlobStorage.KEYFILE
FILE_ID = 'BORG_KEY'
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2RepoKey(ID_BLAKE2b_256, RepoKey):
TYPE = 0x05
class Blake2RepoKey(Blake2FlexiKey):
TYPE = KeyType.BLAKE2REPO
NAME = 'repokey BLAKE2b'
ARG_NAME = 'repokey-blake2'
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_CTR_BLAKE2b
class AuthenticatedKeyBase(RepoKey):
class AuthenticatedKeyBase(FlexiKey):
STORAGE = KeyBlobStorage.REPO
# It's only authenticated, not encrypted.
@ -803,17 +673,15 @@ def save(self, target, passphrase, create=False):
self.logically_encrypted = False
def init_ciphers(self, manifest_data=None):
if manifest_data is not None and manifest_data[0] != self.TYPE:
raise IntegrityError('Manifest: Invalid encryption envelope')
if manifest_data is not None:
self.assert_type(manifest_data[0])
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
return b''.join([self.TYPE_STR, data])
def decrypt(self, id, data, decompress=True):
if data[0] != self.TYPE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid envelope' % id_str)
self.assert_type(data[0], id)
payload = memoryview(data)[1:]
if not decompress:
return payload
@ -823,13 +691,15 @@ def decrypt(self, id, data, decompress=True):
class AuthenticatedKey(AuthenticatedKeyBase):
TYPE = 0x07
TYPE = KeyType.AUTHENTICATED
TYPES_ACCEPTABLE = {TYPE}
NAME = 'authenticated'
ARG_NAME = 'authenticated'
class Blake2AuthenticatedKey(ID_BLAKE2b_256, AuthenticatedKeyBase):
TYPE = 0x06
TYPE = KeyType.BLAKE2AUTHENTICATED
TYPES_ACCEPTABLE = {TYPE}
NAME = 'authenticated BLAKE2b'
ARG_NAME = 'authenticated-blake2'

View File

@ -0,0 +1,141 @@
import getpass
import os
import shlex
import subprocess
import sys
from hashlib import pbkdf2_hmac
from . import bin_to_hex
from . import Error
from . import yes
from . import prepare_subprocess_env
from ..logger import create_logger
logger = create_logger()
class NoPassphraseFailure(Error):
"""can not acquire a passphrase: {}"""
class PassphraseWrong(Error):
"""passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect."""
class PasscommandFailure(Error):
"""passcommand supplied in BORG_PASSCOMMAND failed: {}"""
class PasswordRetriesExceeded(Error):
"""exceeded the maximum password retries"""
class Passphrase(str):
@classmethod
def _env_passphrase(cls, env_var, default=None):
passphrase = os.environ.get(env_var, default)
if passphrase is not None:
return cls(passphrase)
@classmethod
def env_passphrase(cls, default=None):
passphrase = cls._env_passphrase('BORG_PASSPHRASE', default)
if passphrase is not None:
return passphrase
passphrase = cls.env_passcommand()
if passphrase is not None:
return passphrase
passphrase = cls.fd_passphrase()
if passphrase is not None:
return passphrase
@classmethod
def env_passcommand(cls, default=None):
passcommand = os.environ.get('BORG_PASSCOMMAND', None)
if passcommand is not None:
# passcommand is a system command (not inside pyinstaller env)
env = prepare_subprocess_env(system=True)
try:
passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PasscommandFailure(e)
return cls(passphrase.rstrip('\n'))
@classmethod
def fd_passphrase(cls):
try:
fd = int(os.environ.get('BORG_PASSPHRASE_FD'))
except (ValueError, TypeError):
return None
with os.fdopen(fd, mode='r') as f:
passphrase = f.read()
return cls(passphrase.rstrip('\n'))
@classmethod
def env_new_passphrase(cls, default=None):
return cls._env_passphrase('BORG_NEW_PASSPHRASE', default)
@classmethod
def getpass(cls, prompt):
try:
pw = getpass.getpass(prompt)
except EOFError:
if prompt:
print() # avoid err msg appearing right of prompt
msg = []
for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND':
env_var_set = os.environ.get(env_var) is not None
msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set'))
msg.append('Interactive password query failed.')
raise NoPassphraseFailure(' '.join(msg)) from None
else:
return cls(pw)
@classmethod
def verification(cls, passphrase):
msg = 'Do you want your passphrase to be displayed for verification? [yN]: '
if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.',
retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'):
print('Your passphrase (between double-quotes): "%s"' % passphrase,
file=sys.stderr)
print('Make sure the passphrase displayed above is exactly what you wanted.',
file=sys.stderr)
try:
passphrase.encode('ascii')
except UnicodeEncodeError:
print('Your passphrase (UTF-8 encoding in hex): %s' %
bin_to_hex(passphrase.encode('utf-8')),
file=sys.stderr)
print('As you have a non-ASCII passphrase, it is recommended to keep the '
'UTF-8 encoding in hex together with the passphrase at a safe place.',
file=sys.stderr)
@classmethod
def new(cls, allow_empty=False):
passphrase = cls.env_new_passphrase()
if passphrase is not None:
return passphrase
passphrase = cls.env_passphrase()
if passphrase is not None:
return passphrase
for retry in range(1, 11):
passphrase = cls.getpass('Enter new passphrase: ')
if allow_empty or passphrase:
passphrase2 = cls.getpass('Enter same passphrase again: ')
if passphrase == passphrase2:
cls.verification(passphrase)
logger.info('Remember your passphrase. Your data will be inaccessible without it.')
return passphrase
else:
print('Passphrases do not match', file=sys.stderr)
else:
print('Passphrase must not be blank', file=sys.stderr)
else:
raise PasswordRetriesExceeded
def __repr__(self):
return '<Passphrase "***hidden***">'
def kdf(self, salt, iterations, length):
return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length)

View File

@ -334,11 +334,13 @@ def save_config(self, path, config):
def save_key(self, keydata):
assert self.config
keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
# note: saving an empty key means that there is no repokey any more
self.config.set('repository', 'key', keydata)
self.save_config(self.path, self.config)
def load_key(self):
keydata = self.config.get('repository', 'key')
# note: if we return an empty string, it means there is no repo key
return keydata.encode('utf-8') # remote repo: msgpack issue #99, returning bytes
def get_free_nonce(self):

View File

@ -36,7 +36,7 @@
from ..chunker import has_seek_hole
from ..constants import * # NOQA
from ..crypto.low_level import bytes_to_long, num_cipher_blocks
from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError
from ..crypto.key import FlexiKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError
from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
from ..crypto.file_integrity import FileIntegrityError
from ..helpers import Location, get_security_dir
@ -2490,6 +2490,38 @@ def test_change_passphrase(self):
os.environ['BORG_PASSPHRASE'] = 'newpassphrase'
self.cmd('list', self.repository_location)
def test_change_location_to_keyfile(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
log = self.cmd('info', self.repository_location)
assert '(repokey)' in log
self.cmd('key', 'change-location', self.repository_location, 'keyfile')
log = self.cmd('info', self.repository_location)
assert '(key file)' in log
def test_change_location_to_b2keyfile(self):
self.cmd('init', '--encryption=repokey-blake2', self.repository_location)
log = self.cmd('info', self.repository_location)
assert '(repokey BLAKE2b)' in log
self.cmd('key', 'change-location', self.repository_location, 'keyfile')
log = self.cmd('info', self.repository_location)
assert '(key file BLAKE2b)' in log
def test_change_location_to_repokey(self):
self.cmd('init', '--encryption=keyfile', self.repository_location)
log = self.cmd('info', self.repository_location)
assert '(key file)' in log
self.cmd('key', 'change-location', self.repository_location, 'repokey')
log = self.cmd('info', self.repository_location)
assert '(repokey)' in log
def test_change_location_to_b2repokey(self):
self.cmd('init', '--encryption=keyfile-blake2', self.repository_location)
log = self.cmd('info', self.repository_location)
assert '(key file BLAKE2b)' in log
self.cmd('key', 'change-location', self.repository_location, 'repokey')
log = self.cmd('info', self.repository_location)
assert '(repokey BLAKE2b)' in log
def test_break_lock(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
self.cmd('break-lock', self.repository_location)
@ -2850,7 +2882,7 @@ def test_init_interrupt(self):
def raise_eof(*args):
raise EOFError
with patch.object(KeyfileKeyBase, 'create', raise_eof):
with patch.object(FlexiKeyBase, 'create', raise_eof):
self.cmd('init', '--encryption=repokey', self.repository_location, exit_code=1)
assert not os.path.exists(self.repository_location)

View File

@ -1,4 +1,5 @@
import errno
import getpass
import hashlib
import os
import shutil
@ -32,6 +33,7 @@
from ..helpers import iter_separated
from ..helpers import eval_escapes
from ..helpers import safe_unlink
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
from . import BaseTestCase, FakeInputs
@ -1164,3 +1166,44 @@ def os_unlink(_):
safe_unlink(hard_link)
assert victim.read_binary() == contents
class TestPassphrase:
def test_passphrase_new_verification(self, capsys, monkeypatch):
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü")
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
Passphrase.new()
out, err = capsys.readouterr()
assert "12" not in out
assert "12" not in err
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes')
passphrase = Passphrase.new()
out, err = capsys.readouterr()
assert "313261c3b6c3a4c3bc" not in out
assert "313261c3b6c3a4c3bc" in err
assert passphrase == "12aöäü"
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=")
Passphrase.new()
out, err = capsys.readouterr()
assert "1234/@=" not in out
assert "1234/@=" in err
def test_passphrase_new_empty(self, capsys, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "")
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new(allow_empty=False)
out, err = capsys.readouterr()
assert "must not be blank" in err
def test_passphrase_new_retries(self, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
ascending_numbers = iter(range(20))
monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers)))
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new()
def test_passphrase_repr(self):
assert "secret" not in repr(Passphrase("secret"))

View File

@ -6,7 +6,7 @@
import pytest
from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex
from ..crypto.key import bin_to_hex
from ..crypto.key import PlaintextKey, AuthenticatedKey, RepoKey, KeyfileKey, \
Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
@ -184,7 +184,9 @@ def test_keyfile_blake2(self, monkeypatch, keys_dir):
def _corrupt_byte(self, key, data, offset):
data = bytearray(data)
data[offset] ^= 1
# note: we corrupt in a way so that even corruption of the unauthenticated encryption type byte
# will trigger an IntegrityError (does not happen while we stay within TYPES_ACCEPTABLE).
data[offset] ^= 64
with pytest.raises(IntegrityErrorBase):
key.decrypt(b'', data)
@ -253,47 +255,6 @@ def test_blake2_authenticated_encrypt(self, monkeypatch):
assert authenticated == b'\x06\x00\x00' + plaintext
class TestPassphrase:
def test_passphrase_new_verification(self, capsys, monkeypatch):
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü")
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
Passphrase.new()
out, err = capsys.readouterr()
assert "12" not in out
assert "12" not in err
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes')
passphrase = Passphrase.new()
out, err = capsys.readouterr()
assert "313261c3b6c3a4c3bc" not in out
assert "313261c3b6c3a4c3bc" in err
assert passphrase == "12aöäü"
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=")
Passphrase.new()
out, err = capsys.readouterr()
assert "1234/@=" not in out
assert "1234/@=" in err
def test_passphrase_new_empty(self, capsys, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "")
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new(allow_empty=False)
out, err = capsys.readouterr()
assert "must not be blank" in err
def test_passphrase_new_retries(self, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
ascending_numbers = iter(range(20))
monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers)))
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new()
def test_passphrase_repr(self):
assert "secret" not in repr(Passphrase("secret"))
class TestTAM:
@pytest.fixture
def key(self, monkeypatch):