mirror of
https://github.com/Chia-Network/chia-blockchain.git
synced 2024-09-20 08:05:33 +03:00
cmds|server|util: Fix, refactor and improve SSL permission checks (#8211)
* server|util: Refactor `verify_ssl_certs_and_keys` Let it take two lists instead of one with an tuple holding optional paths. * util: Introduce `get_ssl_perm_warning` * util: Drop some redundant brackets * util: Introduce `get_all_ssl_file_paths` * util: Print warnings for CLI only * cmds|util: Call `check_ssl` for all CLI commands
This commit is contained in:
parent
a2de2d8f30
commit
4130551b23
@ -16,6 +16,7 @@ from chia.cmds.wallet import wallet_cmd
|
||||
from chia.cmds.plotnft import plotnft_cmd
|
||||
from chia.util.default_root import DEFAULT_KEYS_ROOT_PATH, DEFAULT_ROOT_PATH
|
||||
from chia.util.keychain import set_keys_root_path, supports_keyring_passphrase
|
||||
from chia.util.ssl import check_ssl
|
||||
from typing import Optional
|
||||
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
@ -72,6 +73,8 @@ def cli(
|
||||
except Exception as e:
|
||||
print(f"Failed to read passphrase: {e}")
|
||||
|
||||
check_ssl(Path(root_path))
|
||||
|
||||
|
||||
if not supports_keyring_passphrase():
|
||||
from chia.cmds.passphrase_funcs import remove_passphrase_options_from_cmd
|
||||
|
@ -31,7 +31,6 @@ from chia.util.ssl import (
|
||||
RESTRICT_MASK_CERT_FILE,
|
||||
RESTRICT_MASK_KEY_FILE,
|
||||
check_and_fix_permissions_for_ssl_file,
|
||||
check_ssl,
|
||||
fix_ssl,
|
||||
)
|
||||
from chia.wallet.derive_keys import master_sk_to_pool_sk, master_sk_to_wallet_sk
|
||||
@ -329,9 +328,7 @@ def chia_full_version_str() -> str:
|
||||
return f"{major}.{minor}.{patch}{dev}"
|
||||
|
||||
|
||||
def chia_init(
|
||||
root_path: Path, *, should_check_keys: bool = True, should_check_ssl: bool = True, fix_ssl_permissions: bool = False
|
||||
):
|
||||
def chia_init(root_path: Path, *, should_check_keys: bool = True, fix_ssl_permissions: bool = False):
|
||||
"""
|
||||
Standard first run initialization or migration steps. Handles config creation,
|
||||
generation of SSL certs, and setting target addresses (via check_keys).
|
||||
@ -353,8 +350,6 @@ def chia_init(
|
||||
# before a new update.
|
||||
if fix_ssl_permissions:
|
||||
fix_ssl(root_path)
|
||||
elif should_check_ssl:
|
||||
check_ssl(root_path)
|
||||
if should_check_keys:
|
||||
check_keys(root_path)
|
||||
print(f"{root_path} already exists, no migration action taken")
|
||||
@ -364,8 +359,6 @@ def chia_init(
|
||||
create_all_ssl(root_path)
|
||||
if fix_ssl_permissions:
|
||||
fix_ssl(root_path)
|
||||
elif should_check_ssl:
|
||||
check_ssl(root_path)
|
||||
if should_check_keys:
|
||||
check_keys(root_path)
|
||||
print("")
|
||||
|
@ -39,7 +39,7 @@ def ssl_context_for_server(
|
||||
log: Optional[logging.Logger] = None,
|
||||
) -> Optional[ssl.SSLContext]:
|
||||
if check_permissions:
|
||||
verify_ssl_certs_and_keys([(ca_cert, ca_key), (private_cert_path, private_key_path)], log)
|
||||
verify_ssl_certs_and_keys([ca_cert, private_cert_path], [ca_key, private_key_path], log)
|
||||
|
||||
ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=str(ca_cert))
|
||||
ssl_context.check_hostname = False
|
||||
@ -52,7 +52,7 @@ def ssl_context_for_root(
|
||||
ca_cert_file: str, *, check_permissions: bool = True, log: Optional[logging.Logger] = None
|
||||
) -> Optional[ssl.SSLContext]:
|
||||
if check_permissions:
|
||||
verify_ssl_certs_and_keys([(Path(ca_cert_file), None)], log)
|
||||
verify_ssl_certs_and_keys([Path(ca_cert_file)], [], log)
|
||||
|
||||
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_cert_file)
|
||||
return ssl_context
|
||||
@ -68,7 +68,7 @@ def ssl_context_for_client(
|
||||
log: Optional[logging.Logger] = None,
|
||||
) -> Optional[ssl.SSLContext]:
|
||||
if check_permissions:
|
||||
verify_ssl_certs_and_keys([(ca_cert, ca_key), (private_cert_path, private_key_path)], log)
|
||||
verify_ssl_certs_and_keys([ca_cert, private_cert_path], [ca_key, private_key_path], log)
|
||||
|
||||
ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=str(ca_cert))
|
||||
ssl_context.check_hostname = False
|
||||
|
161
chia/util/ssl.py
161
chia/util/ssl.py
@ -58,54 +58,67 @@ KEY_CONFIG_KEY_PATHS = [
|
||||
warned_ssl_files: Set[Path] = set()
|
||||
|
||||
|
||||
def print_ssl_perm_warning(
|
||||
path: Path, actual_mode: int, expected_mode: int, *, show_banner: bool = True, log: Optional[Logger] = None
|
||||
) -> None:
|
||||
if path not in warned_ssl_files:
|
||||
if show_banner and len(warned_ssl_files) == 0:
|
||||
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
|
||||
print("@ WARNING: UNPROTECTED SSL FILE! @")
|
||||
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
|
||||
msg = (
|
||||
f"Permissions {octal_mode_string(actual_mode)} for "
|
||||
f"'{path}' are too open. " # lgtm [py/clear-text-logging-sensitive-data]
|
||||
f"Expected {octal_mode_string(expected_mode)}"
|
||||
)
|
||||
if log is not None:
|
||||
log.error(f"{msg}")
|
||||
print(f"{msg}")
|
||||
warned_ssl_files.add(path)
|
||||
def get_all_ssl_file_paths(root_path: Path) -> Tuple[List[Path], List[Path]]:
|
||||
"""Lookup config values and append to a list of files whose permissions we need to check"""
|
||||
from chia.ssl.create_ssl import get_mozilla_ca_crt
|
||||
|
||||
all_certs: List[Path] = []
|
||||
all_keys: List[Path] = []
|
||||
|
||||
try:
|
||||
config: Dict = load_config(root_path, "config.yaml", exit_on_error=False)
|
||||
for paths, parsed_list in [(CERT_CONFIG_KEY_PATHS, all_certs), (KEY_CONFIG_KEY_PATHS, all_keys)]:
|
||||
for path in paths:
|
||||
try:
|
||||
file = root_path / Path(traverse_dict(config, path))
|
||||
parsed_list.append(file)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Failed to lookup config value for {path}: {e}"
|
||||
) # lgtm [py/clear-text-logging-sensitive-data]
|
||||
|
||||
# Check the Mozilla Root CAs as well
|
||||
all_certs.append(Path(get_mozilla_ca_crt()))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return all_certs, all_keys
|
||||
|
||||
|
||||
def get_ssl_perm_warning(path: Path, actual_mode: int, expected_mode: int) -> str:
|
||||
return (
|
||||
f"Permissions {octal_mode_string(actual_mode)} for "
|
||||
f"'{path}' are too open. " # lgtm [py/clear-text-logging-sensitive-data]
|
||||
f"Expected {octal_mode_string(expected_mode)}"
|
||||
)
|
||||
|
||||
|
||||
def verify_ssl_certs_and_keys(
|
||||
cert_and_key_paths: List[Tuple[Optional[Path], Optional[Path]]], log: Optional[Logger] = None
|
||||
) -> List[Tuple[Path, int]]:
|
||||
cert_paths: List[Path], key_paths: List[Path], log: Optional[Logger] = None
|
||||
) -> List[Tuple[Path, int, int]]:
|
||||
"""Check that file permissions are properly set for the provided SSL cert and key files"""
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
# TODO: ACLs for SSL certs/keys on Windows
|
||||
return []
|
||||
|
||||
invalid_files_and_modes: List[Tuple[Path, int]] = []
|
||||
banner_shown: bool = False
|
||||
invalid_files_and_modes: List[Tuple[Path, int, int]] = []
|
||||
|
||||
for (cert_path, key_path) in cert_and_key_paths:
|
||||
if cert_path is not None:
|
||||
cert_perms_valid, cert_actual_mode = verify_file_permissions(cert_path, RESTRICT_MASK_CERT_FILE)
|
||||
if not cert_perms_valid:
|
||||
print_ssl_perm_warning(
|
||||
cert_path, cert_actual_mode, DEFAULT_PERMISSIONS_CERT_FILE, show_banner=not banner_shown, log=log
|
||||
)
|
||||
banner_shown = True
|
||||
invalid_files_and_modes.append((cert_path, cert_actual_mode))
|
||||
def verify_paths(paths: List[Path], restrict_mask: int, expected_permissions: int):
|
||||
nonlocal invalid_files_and_modes
|
||||
for path in paths:
|
||||
try:
|
||||
# Check that the file permissions are not too permissive
|
||||
is_valid, actual_permissions = verify_file_permissions(path, restrict_mask)
|
||||
if not is_valid:
|
||||
if log is not None:
|
||||
log.error(get_ssl_perm_warning(path, actual_permissions, expected_permissions))
|
||||
warned_ssl_files.add(path)
|
||||
invalid_files_and_modes.append((path, actual_permissions, expected_permissions))
|
||||
except Exception as e:
|
||||
print(f"Unable to check permissions for {path}: {e}") # lgtm [py/clear-text-logging-sensitive-data]
|
||||
|
||||
if key_path is not None:
|
||||
key_perms_valid, key_actual_mode = verify_file_permissions(key_path, RESTRICT_MASK_KEY_FILE)
|
||||
if not key_perms_valid:
|
||||
print_ssl_perm_warning(
|
||||
key_path, key_actual_mode, DEFAULT_PERMISSIONS_KEY_FILE, show_banner=not banner_shown, log=log
|
||||
)
|
||||
banner_shown = True
|
||||
invalid_files_and_modes.append((key_path, key_actual_mode))
|
||||
verify_paths(cert_paths, RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE)
|
||||
verify_paths(key_paths, RESTRICT_MASK_KEY_FILE, DEFAULT_PERMISSIONS_KEY_FILE)
|
||||
|
||||
return invalid_files_and_modes
|
||||
|
||||
@ -115,47 +128,20 @@ def check_ssl(root_path: Path) -> None:
|
||||
Sanity checks on the SSL configuration. Checks that file permissions are properly
|
||||
set on the keys and certs, warning and exiting if permissions are incorrect.
|
||||
"""
|
||||
from chia.ssl.create_ssl import get_mozilla_ca_crt
|
||||
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
# TODO: ACLs for SSL certs/keys on Windows
|
||||
return None
|
||||
|
||||
config: Dict = load_config(root_path, "config.yaml")
|
||||
files_to_check: List[Tuple[Path, int, int]] = []
|
||||
valid: bool = True
|
||||
banner_shown: bool = False
|
||||
|
||||
# Lookup config values and append to a list of files whose permissions we need to check
|
||||
for (key_paths, mask, expected_mode) in [
|
||||
(CERT_CONFIG_KEY_PATHS, RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE),
|
||||
(KEY_CONFIG_KEY_PATHS, RESTRICT_MASK_KEY_FILE, DEFAULT_PERMISSIONS_KEY_FILE),
|
||||
]:
|
||||
for key_path in key_paths:
|
||||
try:
|
||||
file = root_path / Path(traverse_dict(config, key_path))
|
||||
files_to_check.append((file, mask, expected_mode))
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Failed to lookup config value for {key_path}: {e}" # lgtm [py/clear-text-logging-sensitive-data]
|
||||
)
|
||||
|
||||
# Check the Mozilla Root CAs as well
|
||||
mozilla_root_ca = get_mozilla_ca_crt()
|
||||
files_to_check.append((Path(mozilla_root_ca), RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE))
|
||||
|
||||
for (file, mask, expected_mode) in files_to_check:
|
||||
try:
|
||||
# Check that the file permissions are not too permissive
|
||||
(good_perms, mode) = verify_file_permissions(file, mask)
|
||||
if not good_perms:
|
||||
print_ssl_perm_warning(file, mode, expected_mode, show_banner=not banner_shown)
|
||||
banner_shown = True
|
||||
valid = False
|
||||
except Exception as e:
|
||||
print(f"Unable to check permissions for {key_path}: {e}") # lgtm [py/clear-text-logging-sensitive-data]
|
||||
|
||||
if not valid:
|
||||
certs_to_check, keys_to_check = get_all_ssl_file_paths(root_path)
|
||||
invalid_files = verify_ssl_certs_and_keys(certs_to_check, keys_to_check)
|
||||
if len(invalid_files):
|
||||
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
|
||||
print("@ WARNING: UNPROTECTED SSL FILE! @")
|
||||
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
|
||||
for path, actual_permissions, expected_permissions in invalid_files:
|
||||
print(
|
||||
get_ssl_perm_warning(path, actual_permissions, expected_permissions)
|
||||
) # lgtm [py/clear-text-logging-sensitive-data]
|
||||
print("One or more SSL files were found with permission issues.")
|
||||
print("Run `chia init --fix-ssl-permissions` to fix issues.")
|
||||
|
||||
@ -164,7 +150,7 @@ def check_and_fix_permissions_for_ssl_file(file: Path, mask: int, updated_mode:
|
||||
"""Check file permissions and attempt to fix them if found to be too open"""
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
# TODO: ACLs for SSL certs/keys on Windows
|
||||
return (True, False)
|
||||
return True, False
|
||||
|
||||
valid: bool = True
|
||||
updated: bool = False
|
||||
@ -184,38 +170,21 @@ def check_and_fix_permissions_for_ssl_file(file: Path, mask: int, updated_mode:
|
||||
print(f"Failed to change permissions on {file}: {e}") # lgtm [py/clear-text-logging-sensitive-data]
|
||||
valid = False
|
||||
|
||||
return (valid, updated)
|
||||
return valid, updated
|
||||
|
||||
|
||||
def fix_ssl(root_path: Path) -> None:
|
||||
"""Attempts to fix SSL cert/key file permissions that are too open"""
|
||||
from chia.ssl.create_ssl import get_mozilla_ca_crt
|
||||
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
# TODO: ACLs for SSL certs/keys on Windows
|
||||
return None
|
||||
|
||||
config: Dict = load_config(root_path, "config.yaml")
|
||||
files_to_fix: List[Tuple[Path, int, int]] = []
|
||||
updated: bool = False
|
||||
encountered_error: bool = False
|
||||
|
||||
for (key_paths, mask, updated_mode) in [
|
||||
(CERT_CONFIG_KEY_PATHS, RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE),
|
||||
(KEY_CONFIG_KEY_PATHS, RESTRICT_MASK_KEY_FILE, DEFAULT_PERMISSIONS_KEY_FILE),
|
||||
]:
|
||||
for key_path in key_paths:
|
||||
try:
|
||||
file = root_path / Path(traverse_dict(config, key_path))
|
||||
files_to_fix.append((file, mask, updated_mode))
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Failed to lookup config value for {key_path}: {e}" # lgtm [py/clear-text-logging-sensitive-data]
|
||||
)
|
||||
|
||||
# Check the Mozilla Root CAs as well
|
||||
mozilla_root_ca = get_mozilla_ca_crt()
|
||||
files_to_fix.append((Path(mozilla_root_ca), RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE))
|
||||
certs_to_check, keys_to_check = get_all_ssl_file_paths(root_path)
|
||||
files_to_fix = verify_ssl_certs_and_keys(certs_to_check, keys_to_check)
|
||||
|
||||
for (file, mask, updated_mode) in files_to_fix:
|
||||
# Check that permissions are correct, and if not, attempt to fix
|
||||
|
Loading…
Reference in New Issue
Block a user