136 lines
4.2 KiB
Python
136 lines
4.2 KiB
Python
"""# Secret Loader System.
|
|
|
|
System for loading secrets from a variety of sources.
|
|
|
|
Usage:
|
|
|
|
```python
|
|
import secret_loader
|
|
secrets = secret_loader.SecretLoader(env_key_prefix = 'MYAPP')
|
|
|
|
db_username = secrets.load_or_fail('DATABASE_USERNAME')
|
|
db_password = secrets.load_or_fail('DATABASE_PASSWORD')
|
|
```
|
|
|
|
Secret loading order:
|
|
|
|
0. Hardcoded values. **This is purely for debugging, prototyping, and for
|
|
configuring below options.**
|
|
1. Files pointed to by environment variables. Docker friendly.
|
|
2. Secrets folder. Also Docker friendly.
|
|
3. [Pass: the standard unix password
|
|
manager](https://www.passwordstore.org/). Most suited for personal
|
|
usage; very unsuited for server environments. Requires `pass` installed
|
|
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
|
methods.
|
|
4. Vault instance if configured. Suited for production environments.
|
|
"""
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
from frozendict import frozendict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
try:
|
|
import hvac
|
|
except ImportError:
|
|
hvac = None
|
|
|
|
ENV_KEY_PREFIX = 'ENV_KEY_PREFIX'
|
|
|
|
ENV_KEY_VAULT_URL = 'VAULT_URL'
|
|
ENV_KEY_VAULT_TOKEN = 'VAULT_TOKEN'
|
|
ENV_KEY_VAULT_MOUNT_POINT = 'VAULT_MOUNT_POINT'
|
|
|
|
ENV_KEY_PASS_FOLDER = 'PASS_STORE_SUBFOLDER'
|
|
|
|
class SecretLoader:
|
|
"""
|
|
Main entry point for loading secrets.
|
|
|
|
See module documentation for usage.
|
|
"""
|
|
|
|
def __init__(self, **hardcoded: str):
|
|
# Basic setup, containing only hardcoded.
|
|
self.hardcoded: dict[str, str] = hardcoded
|
|
self.pass_folder = None
|
|
self.vault_client = None
|
|
self.env_key_prefix = None
|
|
|
|
# Setup environment
|
|
self.env_key_prefix = self._load_or_none(ENV_KEY_PREFIX)
|
|
if self.env_key_prefix is not None:
|
|
assert not self.env_key_prefix.endswith('_'), 'Prefix must not end with _ (this will be added automatically)'
|
|
|
|
# Setup pass
|
|
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
|
|
|
# Setup vault
|
|
if hvac:
|
|
self.vault_client = hvac.Client(
|
|
url=self._load_or_none(ENV_KEY_VAULT_URL),
|
|
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
|
|
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
|
|
)
|
|
|
|
def load_or_fail(self, env_key: str) -> str:
|
|
value = self._load_or_none(env_key)
|
|
if value is None:
|
|
msg = f'Could not load secret {env_key}'
|
|
raise Exception(msg)
|
|
logger.info('Loaded secret: %s', env_key)
|
|
return value
|
|
|
|
def load(self, env_key: str) -> str | None:
|
|
value = self._load_or_none(env_key)
|
|
if value is None:
|
|
logger.exception('Could not load secret %s', env_key)
|
|
logger.info('Loaded secret: %s', env_key)
|
|
return value
|
|
|
|
def _load_or_none(self, env_key: str) -> str | None:
|
|
return (
|
|
self.hardcoded.get(env_key)
|
|
or self._load_or_none_path_or_file(env_key)
|
|
or self._load_or_none_local_password_store(env_key)
|
|
or self._load_or_none_vault(env_key)
|
|
)
|
|
|
|
def _load_or_none_path_or_file(self, env_key: str) -> str | None:
|
|
# 1. & 2.
|
|
filepath = os.environ.get(f'{self.env_key_prefix}_{env_key}')
|
|
|
|
if filepath is None:
|
|
filepath = f'./secrets/{env_key.lower()}'
|
|
try:
|
|
with open(filepath) as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def _load_or_none_local_password_store(self, env_key: str) -> str | None:
|
|
if self.pass_folder is None:
|
|
return None
|
|
|
|
cmd = ['pass', 'show', f'{self.pass_folder}/{env_key.lower()}']
|
|
process = subprocess.run(cmd, capture_output = True)
|
|
if process.returncode:
|
|
return None
|
|
return process.stdout.decode('utf8')
|
|
|
|
def _load_or_none_vault(self, env_key: str) -> str | None:
|
|
print(self.hardcoded)
|
|
print(self.vault_client)
|
|
if self.vault_client is None:
|
|
return None
|
|
|
|
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
|
|
path=env_key.lower(),
|
|
mount_point=self.vault_mount_point,
|
|
)
|
|
return read_secret_result['data']['value']
|