191 lines
6.6 KiB
Python
191 lines
6.6 KiB
Python
"""# Secret Loader System.
|
|
|
|
System for loading secrets from a variety of sources.
|
|
|
|
Usage:
|
|
|
|
```python
|
|
import secret_loader
|
|
secrets = secret_loader.SecretLoader(env_key_prefix = 'MYAPP')
|
|
|
|
db_username = secrets.load_or_fail('DATABASE_USERNAME')
|
|
db_password = secrets.load_or_fail('DATABASE_PASSWORD')
|
|
```
|
|
|
|
|
|
Secret loading order:
|
|
|
|
0. Hardcoded values. **This is purely for debugging, prototyping, and for
|
|
configuring the options below.**
|
|
1. Files pointed to by environment variables. Docker friendly.
|
|
2. Secrets folder. Also Docker friendly.
|
|
3. [Pass: the standard unix password
|
|
manager](https://www.passwordstore.org/). Most suited for personal
|
|
usage; very unsuited for server environments. Requires `pass` installed
|
|
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
|
methods.
|
|
4. Vault instance if configured. Suited for production environments. **NOTE:
|
|
This is barely supported.** Requires `hvac` python package.
|
|
|
|
## Future extensions
|
|
|
|
- [ ] Key casing should be more consistent
|
|
* Case-insensitive for hardcoded and `load`.
|
|
* Upper case for environment variables.
|
|
* Lower case for files and others.
|
|
- [ ] New special configuration value for switching the `secrets` directory.
|
|
- [ ] Wrap secrets in intelligent strings:
|
|
* [ ] Instead of returning `None` when a secret cannot be loaded, return an `UnknownSecret` that produces an error when formatted.
|
|
* [ ] `repr(secret)` should not include contents, but only the secret and how it was loaded.
|
|
* [ ] Methods on `Secret` should be kept minimal.
|
|
- [ ] Avoid leakage to swap files.
|
|
* Possibly Mlock? [Does not seem to work](https://stackoverflow.com/questions/29524020/prevent-ram-from-paging-to-swap-area-mlock)
|
|
* Alternatively use [mmap](https://docs.python.org/3/library/mmap.html) and [memoryview](https://stackoverflow.com/questions/18655648/what-exactly-is-the-point-of-memoryview-in-python)?
|
|
- [ ] Vault:
|
|
* [ ] Ensure vault code path works.
|
|
* [ ] Document usage and requirements.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# `hvac` is an optional dependency: when it cannot be imported the name is
# bound to `None`, which `SecretLoader` checks before touching Vault.
try:
    import hvac
except ImportError:
    hvac = None

# Module-level logger; secret keys (never values) are logged at INFO level.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Key under which the environment-variable prefix itself is configured.
ENV_KEY_PREFIX = 'ENV_KEY_PREFIX'

# Keys under which the Vault connection settings are configured.
ENV_KEY_VAULT_URL = 'VAULT_URL'
ENV_KEY_VAULT_TOKEN = 'VAULT_TOKEN'
ENV_KEY_VAULT_MOUNT_POINT = 'VAULT_MOUNT_POINT'

# Key under which the `pass` password-store subfolder is configured.
ENV_KEY_PASS_FOLDER = 'PASS_STORE_SUBFOLDER'

# Directory searched for secret files when no environment variable points
# at one; relative to the current working directory.
DEFAULT_SECRETS_DIRECTORY = Path('./secrets/')
|
|
|
|
class SecretLoader:
|
|
"""Main entry point for loading secrets.
|
|
|
|
See module documentation for usage.
|
|
"""
|
|
|
|
def __init__(self, **hardcoded: str):
|
|
"""Initialize `SecretLoader`.
|
|
|
|
Hardcoded values are stored directly, and can be used to configure the
|
|
other subsystems.
|
|
"""
|
|
self.hardcoded: dict[str, str] = hardcoded
|
|
self.pass_folder = None
|
|
self.vault_client = None
|
|
self.env_key_prefix = None
|
|
|
|
# Setup environment
|
|
self.env_key_prefix = self._load_or_none(ENV_KEY_PREFIX)
|
|
if self.env_key_prefix is not None:
|
|
assert not self.env_key_prefix.endswith('_'), 'Prefix must not end with _ (this will be added automatically)'
|
|
|
|
# Setup pass
|
|
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
|
|
|
# Setup vault
|
|
if hvac:
|
|
self.vault_client = hvac.Client(
|
|
url=self._load_or_none(ENV_KEY_VAULT_URL),
|
|
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
|
|
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
|
|
)
|
|
|
|
def load_or_fail(self, env_key: str) -> str:
|
|
"""Load secret with the given key, from one of the backends or
|
|
throw an exception if not found.
|
|
"""
|
|
value = self._load_or_none(env_key)
|
|
if value is None:
|
|
msg = f'Failed to load secret with key: {env_key}'
|
|
raise Exception(msg)
|
|
logger.info('Loaded secret with key: %s', env_key)
|
|
return value
|
|
|
|
def load(self, env_key: str) -> str | None:
|
|
"""Load secret with the given key, from one of the backends or
|
|
return `None` if not found.
|
|
|
|
A warning log line is emitted.
|
|
"""
|
|
value = self._load_or_none(env_key)
|
|
if value is None:
|
|
logger.warning('Failed to load secret with key: %s', env_key)
|
|
else:
|
|
logger.info('Loaded secret with key: %s', env_key)
|
|
return value
|
|
|
|
def _load_or_none(self, env_key: str) -> str | None:
|
|
"""Load secret with the given key, from one of the backends or
|
|
return `None` if not found.
|
|
"""
|
|
return (
|
|
self.hardcoded.get(env_key)
|
|
or self._load_or_none_path_or_file(env_key)
|
|
or self._load_or_none_local_password_store(env_key)
|
|
or self._load_or_none_vault(env_key)
|
|
)
|
|
|
|
def _load_or_none_path_or_file(self, env_key: str) -> str | None:
|
|
"""Load secret for given key, from either an environment defined key,
|
|
or from the `secrets` directory.
|
|
|
|
Returns `None` if the secret is not present in either the environment
|
|
or the directory.
|
|
"""
|
|
filepath: Path | str | None = os.environ.get(f'{self.env_key_prefix}_{env_key}')
|
|
|
|
if filepath is None:
|
|
filepath = DEFAULT_SECRETS_DIRECTORY / env_key.lower()
|
|
|
|
try:
|
|
with open(filepath) as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def _load_or_none_local_password_store(self, env_key: str) -> str | None:
|
|
"""Load secret from the `pass` password manager.
|
|
|
|
Returns `None` if the `pass` password manager is not configured, or if
|
|
the secret is not present in the `pass` password manager.
|
|
"""
|
|
if self.pass_folder is None:
|
|
return None
|
|
|
|
cmd = ['pass', 'show', f'{self.pass_folder}/{env_key.lower()}']
|
|
process = subprocess.run(cmd, capture_output = True)
|
|
if process.returncode:
|
|
return None
|
|
|
|
password_file = process.stdout.decode('utf8')
|
|
for line in password_file.split('\n'):
|
|
return line
|
|
return None
|
|
|
|
def _load_or_none_vault(self, env_key: str) -> str | None:
|
|
"""Load secret from the configured vault instance.
|
|
|
|
Returns `None` if vault instance is not configured, or if the value
|
|
instance does not know about the secret.
|
|
"""
|
|
if self.vault_client is None:
|
|
return None
|
|
|
|
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
|
|
path=env_key.lower(),
|
|
mount_point=self.vault_mount_point,
|
|
)
|
|
return read_secret_result['data']['value']
|