kiteco-public/airflow/kite_airflow/secrets_backend.py
2021-12-31 23:54:19 -08:00

104 lines
3.7 KiB
Python

from typing import Optional
import boto3
from cached_property import cached_property
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin
class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
"""
Retrieves Connection or Variables from AWS Secrets Manager
Configurable via ``airflow.cfg`` like so:
.. code-block:: ini
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections"}
For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``.
And if variables prefix is ``airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``.
You can also pass additional keyword arguments like ``aws_secret_access_key``, ``aws_access_key_id``
or ``region_name`` to this class and they would be passed on to Boto3 client.
:param connections_prefix: Specifies the prefix of the secret to read to get Connections.
:type connections_prefix: str
:param variables_prefix: Specifies the prefix of the secret to read to get Variables.
:type variables_prefix: str
:param profile_name: The name of a profile to use. If not given, then the default profile is used.
:type profile_name: str
:param sep: separator used to concatenate secret_prefix and secret_id. Default: "/"
:type sep: str
"""
def __init__(
self,
connections_prefix: str = 'airflow/connections',
variables_prefix: str = 'airflow/variables',
profile_name: Optional[str] = None,
sep: str = "/",
**kwargs
):
super().__init__(**kwargs)
self.connections_prefix = connections_prefix.rstrip("/")
self.variables_prefix = variables_prefix.rstrip('/')
self.profile_name = profile_name
self.sep = sep
self.kwargs = kwargs
@cached_property
def client(self):
"""
Create a Secrets Manager client
"""
session = boto3.session.Session(
profile_name=self.profile_name,
)
return session.client(service_name="secretsmanager", **self.kwargs)
def get_conn_uri(self, conn_id: str) -> Optional[str]:
"""
Get Connection Value
:param conn_id: connection id
:type conn_id: str
"""
return self._get_secret(self.connections_prefix, conn_id)
def get_variable(self, key: str) -> Optional[str]:
"""
Get Airflow Variable from Environment Variable
:param key: Variable Key
:return: Variable Value
"""
return self._get_secret(self.variables_prefix, key)
def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
"""
Get secret value from Secrets Manager
:param path_prefix: Prefix for the Path to get Secret
:type path_prefix: str
:param secret_id: Secret Key
:type secret_id: str
"""
secrets_path = self.build_path(path_prefix, secret_id, self.sep)
try:
response = self.client.get_secret_value(
SecretId=secrets_path,
)
return response.get('SecretString')
except self.client.exceptions.ResourceNotFoundException:
self.log.debug(
"An error occurred (ResourceNotFoundException) when calling the "
"get_secret_value operation: "
"Secret %s not found.", secrets_path
)
return None