1
0

Cool error message
All checks were successful
Test Python / Test (push) Successful in 23s

This commit is contained in:
Jon Michael Aanes 2024-07-23 00:45:40 +02:00
parent 253113b564
commit 03d5f5fa04
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA

View File

@ -12,7 +12,6 @@ db_username = secrets.load_or_fail('DATABASE_USERNAME')
db_password = secrets.load_or_fail('DATABASE_PASSWORD')
```
Secret loading order:
0. Hardcoded values. **This is purely for debugging, prototyping, and for
@ -69,6 +68,28 @@ ENV_KEY_PASS_FOLDER = 'PASS_STORE_SUBFOLDER'
DEFAULT_SECRETS_DIRECTORY = Path('./secrets/')
ERROR_MESSAGE_FORMAT = """Failed to load secret with key: \033[1m{secret_name}\033[0m
\033[1m## What is this?\033[0m
The application you are using requires a secret of the name "{secret_name}".
A secret can be for example, a password, an API key, or a private key; things
that should not be publicly known, as they present an attack vector.
You should only give the program permission to your secrets if you trust it.
\033[1m## How to fix?\033[0m
There are several ways to supply the secret, whereas these will fix the issue
instantly:
{solutions_list}
See more ways to supply the secret here:
https://gitfub.space/Jmaa/secret_loader
"""
class SecretLoader:
"""Main entry point for loading secrets.
@ -89,6 +110,7 @@ class SecretLoader:
# Setup environment
self.env_key_prefix = self._load_or_none(ENV_KEY_PREFIX)
if self.env_key_prefix is not None:
assert self.env_key_prefix == self.env_key_prefix.upper(), 'Prefix must be uppercase'
assert not self.env_key_prefix.endswith('_'), 'Prefix must not end with _ (this will be added automatically)'
# Setup pass
@ -102,52 +124,51 @@ class SecretLoader:
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, env_key: str) -> str:
def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or
throw an exception if not found.
"""
value = self._load_or_none(env_key)
value = self._load_or_none(secret_name)
if value is None:
msg = f'Failed to load secret with key: {env_key}'
raise Exception(msg)
logger.info('Loaded secret with key: %s', env_key)
raise Exception(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name)
return value
def load(self, env_key: str) -> str | None:
def load(self, secret_name: str) -> str | None:
"""Load secret with the given key, from one of the backends or
return `None` if not found.
A warning log line is emitted.
"""
value = self._load_or_none(env_key)
value = self._load_or_none(secret_name)
if value is None:
logger.warning('Failed to load secret with key: %s', env_key)
logger.warning('Failed to load secret with key: %s', secret_name)
else:
logger.info('Loaded secret with key: %s', env_key)
logger.info('Loaded secret with key: %s', secret_name)
return value
def _load_or_none(self, env_key: str) -> str | None:
def _load_or_none(self, secret_name: str) -> str | None:
"""Load secret with the given key, from one of the backends or
return `None` if not found.
"""
return (
self.hardcoded.get(env_key)
or self._load_or_none_path_or_file(env_key)
or self._load_or_none_local_password_store(env_key)
or self._load_or_none_vault(env_key)
self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
)
def _load_or_none_path_or_file(self, env_key: str) -> str | None:
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
"""Load secret for given key, from either an environment defined key,
or from the `secrets` directory.
Returns `None` if the secret is not present in either the environment
or the directory.
"""
filepath: Path | str | None = os.environ.get(f'{self.env_key_prefix}_{env_key}')
filepath: Path | str | None = os.environ.get(f'{self.env_key_prefix}_{secret_name.upper()}')
if filepath is None:
filepath = DEFAULT_SECRETS_DIRECTORY / env_key.lower()
filepath = DEFAULT_SECRETS_DIRECTORY / secret_name.lower()
try:
with open(filepath) as f:
@ -155,7 +176,7 @@ class SecretLoader:
except FileNotFoundError:
return None
def _load_or_none_local_password_store(self, env_key: str) -> str | None:
def _load_or_none_local_password_store(self, secret_name: str) -> str | None:
"""Load secret from the `pass` password manager.
Returns `None` if the `pass` password manager is not configured, or if
@ -164,7 +185,7 @@ class SecretLoader:
if self.pass_folder is None:
return None
cmd = ['pass', 'show', f'{self.pass_folder}/{env_key.lower()}']
cmd = ['pass', 'show', f'{self.pass_folder}/{secret_name.lower()}']
process = subprocess.run(cmd, capture_output = True)
if process.returncode:
return None
@ -174,7 +195,7 @@ class SecretLoader:
return line
return None
def _load_or_none_vault(self, env_key: str) -> str | None:
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
@ -184,7 +205,24 @@ class SecretLoader:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=env_key.lower(),
path=secret_name.lower(),
mount_point=self.vault_mount_point,
)
return read_secret_result['data']['value']
def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given
secret_name.
Solutions are based around which configuration options have been
enabled.
"""
solutions_list = []
solutions_list.append(f'Write secret to file: \033[1m{DEFAULT_SECRETS_DIRECTORY}/{secret_name.lower()}\033[0m')
if self.env_key_prefix is not None:
solutions_list.append(f'Add environment variable pointing to written secret: \033[1m{self.env_key_prefix}_{secret_name.upper()}\033[0m')
if self.pass_folder is not None:
solutions_list.append(f'Write secret to password store entry: \033[1m{self.pass_folder}/{secret_name.lower()}\033[0m')
solutions_list = '\n'.join([f'* {s}' for s in solutions_list])
return ERROR_MESSAGE_FORMAT.format(secret_name = secret_name, solutions_list=solutions_list)