1
0
secret_loader/secret_loader/__init__.py
Jon Michael Aanes 6151957d21
All checks were successful
Test Python / Test (push) Successful in 23s
Added logging line
2024-09-04 20:05:17 +02:00

259 lines
9.3 KiB
Python

"""# Secret Loader System.
System for loading secrets from a variety of sources.
Usage:
```python
import secret_loader
secrets = secret_loader.SecretLoader(ENV_KEY_PREFIX='MYAPP')
db_username = secrets.load_or_fail('DATABASE_USERNAME')
db_password = secrets.load_or_fail('DATABASE_PASSWORD')
```
Secret loading order:
0. Hardcoded values. **This is purely for debugging, prototyping, and for
configuring below options.**
1. Files pointed to by environment variables. Docker friendly.
2. Secrets folder. Also Docker friendly. Defaults to `secrets`, but can be
configured through the `SECRETS_DIRECTORY` key (NOTE: passed directly,
rather than through a file.)
3. [Pass: the standard unix password
manager](https://www.passwordstore.org/). Most suited for personal
usage; very unsuited for server environments. Requires `pass` installed
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
methods.
4. Vault instance if configured. Suited for production environments. **NOTE:
This is barely supported.** Requires `hvac` python package.
## Future extensions
- [ ] Key casing should be more consistent
* Case-insensitive for hardcoded and `load`.
* Upper case for environment variables.
* Lower case for files and others.
- [ ] New special configuration value for switching the `secrets` directory.
- [ ] Wrap secrets in intelligent strings:
* [ ] Instead of returning `None` when a secret could not be loaded, return `UnknownSecret`, which produces an error when formatted.
* [ ] `repr(secret)` should not include contents, but only the secret and how it was loaded.
* [ ] Methods on `Secret` should be kept minimal.
- [ ] Avoid leakage to swap files.
* Possibly Mlock? [Does not seem to work](https://stackoverflow.com/questions/29524020/prevent-ram-from-paging-to-swap-area-mlock)
* Alternatively use [mmap](https://docs.python.org/3/library/mmap.html) and [memoryview](https://stackoverflow.com/questions/18655648/what-exactly-is-the-point-of-memoryview-in-python)?
- [ ] Vault:
* [ ] Ensure vault code path works.
* [ ] Document usage and requirements.
- [ ] Get inspiration from <https://cheatsheetseries.owasp.org/cheatsheets/Secrets_Management_Cheat_Sheet.html>
"""
import logging
import os
import subprocess
from pathlib import Path
try:
import hvac
except ImportError:
hvac = None
# Module-level logger. The level is set to INFO here so that the
# "Loaded secret with key" lines emitted by `SecretLoader` are not filtered
# out by this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Names of the configuration keys that `SecretLoader.__init__` looks up in
# order to configure its own backends.
# Prefix for the environment variables that point to secret files.
ENV_KEY_PREFIX = 'ENV_KEY_PREFIX'
# Directory containing one file per secret; read from the hardcoded values or
# directly from the environment variable (not through a file).
ENV_KEY_SECRETS_DIRECTORY = 'SECRETS_DIRECTORY'
# Vault connection settings.
ENV_KEY_VAULT_URL = 'VAULT_URL'
ENV_KEY_VAULT_TOKEN = 'VAULT_TOKEN' #noqa: S105
ENV_KEY_VAULT_MOUNT_POINT = 'VAULT_MOUNT_POINT'
# Subfolder of the `pass` password store in which secrets are looked up.
ENV_KEY_PASS_FOLDER = 'PASS_STORE_SUBFOLDER' #noqa: S105
# Relative secrets directory used when `SECRETS_DIRECTORY` is not configured.
DEFAULT_SECRETS_DIRECTORY = Path('secrets')
# Template for the exception message raised by `SecretLoader.load_or_fail`.
# It is filled in with `str.format` using the placeholders `{secret_name}` and
# `{solutions_list}`. `\033[1m` and `\033[0m` are the ANSI escape sequences
# that switch bold text on and off in a terminal.
ERROR_MESSAGE_FORMAT = """Failed to load secret with key: \033[1m{secret_name}\033[0m
\033[1m## What is this?\033[0m
The application you are using requires a secret of the name "{secret_name}".
A secret can be for example, a password, an API key, or a private key; things
that should not be publicly known, as they present an attack vector.
You should only give the program permission to your secrets if you trust it.
\033[1m## How to fix?\033[0m
There are several ways to supply the secret, whereas these will fix the issue
instantly:
{solutions_list}
See more ways to supply the secret here:
https://gitfub.space/Jmaa/secret_loader
"""
class SecretLoader:
"""Main entry point for loading secrets.
See module documentation for usage.
"""
def __init__(self, **hardcoded: str):
"""Initialize `SecretLoader`.
Hardcoded values are stored directly, and can be used to configure the
other subsystems.
"""
self.hardcoded: dict[str, str] = hardcoded
self.pass_folder = None
self.vault_client = None
self.env_key_prefix = None
self.secret_folder = None
# Setup environment
self.env_key_prefix = self._load_or_none(ENV_KEY_PREFIX)
if self.env_key_prefix is not None:
logger.info('Environment enabled with prefix: %s', self.env_key_prefix)
assert (
self.env_key_prefix == self.env_key_prefix.upper()
), 'Prefix must be uppercase'
assert not self.env_key_prefix.endswith(
'_',
), 'Prefix must not end with _ (this will be added automatically)'
# Setup secrets path
self.secret_folder = Path(self.hardcoded.get(ENV_KEY_SECRETS_DIRECTORY) or self._load_or_none_env(ENV_KEY_SECRETS_DIRECTORY) or DEFAULT_SECRETS_DIRECTORY)
# Setup pass
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
# Setup vault
if hvac:
self.vault_client = hvac.Client(
url=self._load_or_none(ENV_KEY_VAULT_URL),
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or
throw an exception if not found.
"""
value = self._load_or_none(secret_name)
if value is None:
raise Exception(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name)
return value
def load(self, secret_name: str) -> str | None:
"""Load secret with the given key, from one of the backends or
return `None` if not found.
A warning log line is emitted.
"""
value = self._load_or_none(secret_name)
if value is None:
logger.warning('Failed to load secret with key: %s', secret_name)
else:
logger.info('Loaded secret with key: %s', secret_name)
return value
def _load_or_none(self, secret_name: str) -> str | None:
"""Load secret with the given key, from one of the backends or
return `None` if not found.
"""
return (
self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
)
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
"""Load secret for given key, from either an environment defined key,
or from the `secrets` directory.
Returns `None` if the secret is not present in either the environment
or the directory.
"""
filepath: Path | str | None = self._load_or_none_env(secret_name)
if filepath is None:
if self.secret_folder is not None:
filepath = self.secret_folder / secret_name.lower()
else:
return None
try:
with open(filepath) as f:
return f.read().strip()
except FileNotFoundError:
return None
def _load_or_none_env(self, secret_name) -> str | None:
return os.environ.get(f'{self.env_key_prefix}_{secret_name.upper()}')
def _load_or_none_local_password_store(self, secret_name: str) -> str | None:
"""Load secret from the `pass` password manager.
Returns `None` if the `pass` password manager is not configured, or if
the secret is not present in the `pass` password manager.
"""
if self.pass_folder is None:
return None
cmd = ['pass', 'show', f'{self.pass_folder}/{secret_name.lower()}']
process = subprocess.run(cmd, capture_output=True, check=False)
if process.returncode:
return None
password_file = process.stdout.decode('utf8')
for line in password_file.split('\n'):
return line
return None
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
instance does not know about the secret.
"""
if self.vault_client is None:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=secret_name.lower(),
mount_point=self.vault_mount_point,
)
return read_secret_result['data']['value']
def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given
secret_name.
Solutions are based around which configuration options have been
enabled.
"""
solutions_list = []
solutions_list.append(
f'Write secret to file: \033[1m{DEFAULT_SECRETS_DIRECTORY}/{secret_name.lower()}\033[0m',
)
if self.env_key_prefix is not None:
solutions_list.append(
f'Add environment variable pointing to written secret: \033[1m{self.env_key_prefix}_{secret_name.upper()}\033[0m',
)
if self.pass_folder is not None:
solutions_list.append(
f'Write secret to password store entry: \033[1m{self.pass_folder}/{secret_name.lower()}\033[0m',
)
solutions_list = '\n'.join([f'* {s}' for s in solutions_list])
return ERROR_MESSAGE_FORMAT.format(
secret_name=secret_name,
solutions_list=solutions_list,
)