1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
81264a3b6c
Code quality
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 24s
Python Ruff Code Quality / ruff (push) Successful in 22s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 22s
2024-10-27 17:49:06 +01:00
83f310e28a
100% Coverage. Had to remove HVAC to achieve it 2024-10-27 17:47:07 +01:00
2 changed files with 60 additions and 38 deletions

View File

@ -26,8 +26,6 @@ Secret loading order:
usage; very unsuited for server environments. Requires `pass` installed usage; very unsuited for server environments. Requires `pass` installed
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
methods. methods.
4. Vault instance if configured. Suited for production environments. **NOTE:
This is barely supported.** Requires `hvac` python package.
## Future extensions ## Future extensions
@ -54,11 +52,6 @@ import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
try:
import hvac
except ImportError:
hvac = None
from ._version import __version__ from ._version import __version__
__all__ = ['__version__', 'SecretLoader'] __all__ = ['__version__', 'SecretLoader']
@ -126,7 +119,7 @@ class SecretLoader:
msg = 'Prefix must be uppercase' msg = 'Prefix must be uppercase'
raise ValueError(msg) raise ValueError(msg)
if self.env_key_prefix.endswith('_'): if self.env_key_prefix.endswith('_'):
msg = 'Prefix must not end with _ (this will be added automatically)' msg = 'Prefix must not end with "_" (this will be added automatically)'
raise ValueError(msg) raise ValueError(msg)
# Setup secrets path # Setup secrets path
@ -139,21 +132,13 @@ class SecretLoader:
# Setup pass # Setup pass
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER) self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
# Setup vault
if hvac:
self.vault_client = hvac.Client(
url=self._load_or_none(ENV_KEY_VAULT_URL),
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, secret_name: str) -> str: def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or """Load secret with the given key, from one of the backends or
throw an exception if not found. throw an exception if not found.
""" """
value = self._load_or_none(secret_name) value = self._load_or_none(secret_name)
if value is None: if value is None:
raise Exception(self._format_error_message(secret_name)) raise ValueError(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name) logger.info('Loaded secret with key: %s', secret_name)
return value return value
@ -178,7 +163,6 @@ class SecretLoader:
self.hardcoded.get(secret_name) self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name) or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name) or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
) )
def _load_or_none_path_or_file(self, secret_name: str) -> str | None: def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
@ -220,28 +204,22 @@ class SecretLoader:
check=False, check=False,
shell=False, shell=False,
) )
if process.returncode:
return None
password_file = process.stdout.decode('utf8') return self._convert_pass_process_result_to_password(
for line in password_file.split('\n'): process.returncode,
return line process.stdout,
return None
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
instance does not know about the secret.
"""
if self.vault_client is None:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=secret_name.lower(),
mount_point=self.vault_mount_point,
) )
return read_secret_result['data']['value']
def _convert_pass_process_result_to_password(
self,
returncode: int,
stdout: bytes,
) -> str | None:
if returncode != 0:
return None
password_file = stdout.decode('utf8')
return password_file.split('\n')[0]
def _format_error_message(self, secret_name: str) -> str: def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given """Formats an error message with solution suggestions for the given

View File

@ -1,3 +1,5 @@
import pytest
import secret_loader import secret_loader
@ -6,12 +8,54 @@ def test_hardcoded():
assert loader.load('ENV_KEY_PREFIX') == 'TEST' assert loader.load('ENV_KEY_PREFIX') == 'TEST'
assert loader.load('KEY') == 'VALUE' assert loader.load('KEY') == 'VALUE'
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
assert loader.load_or_fail('KEY') == 'VALUE'
def test_lookup_secrets_dir(): def test_lookup_secrets_dir():
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets') loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
assert loader.load('MY_SECRET') == 'HELLO SECRET' assert loader.load('MY_SECRET') == 'HELLO SECRET'
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
def test_lookup_unknown(): def test_lookup_unknown():
loader = secret_loader.SecretLoader() loader = secret_loader.SecretLoader()
assert loader.load('UNKNOWN') is None assert loader.load('UNKNOWN') is None
def test_fail_hardcoded_prefix_lowercase():
with pytest.raises(ValueError, match='Prefix must be uppercase'):
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
def test_fail_hardcoded_prefix_with_trailing_underscore():
with pytest.raises(
ValueError,
match=r'Prefix must not end with "_" \(this will be added automatically\)',
):
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
def test_lookup_unknown_or_fail():
loader = secret_loader.SecretLoader(
ENV_KEY_PREFIX='TEST',
PASS_STORE_SUBFOLDER='test', # noqa: S106
)
with pytest.raises(
ValueError,
match='Failed to load secret with key:.*UNKNOWN.*',
) as e:
assert loader.load_or_fail('UNKNOWN')
assert 'Write secret to file' in str(e.value)
assert 'Add environment variable pointing to written secret' in str(e.value)
assert 'Write secret to password store entry' in str(e.value)
def test_convert_process():
loader = secret_loader.SecretLoader()
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
assert (
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
)
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001