1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
81264a3b6c
Code quality
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 24s
Python Ruff Code Quality / ruff (push) Successful in 22s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 22s
2024-10-27 17:49:06 +01:00
83f310e28a
100% Coverage. Had to remove HVAC to achieve it 2024-10-27 17:47:07 +01:00
2 changed files with 60 additions and 38 deletions

View File

@ -26,8 +26,6 @@ Secret loading order:
usage; very unsuited for server environments. Requires `pass` installed
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
methods.
4. Vault instance if configured. Suited for production environments. **NOTE:
This is barely supported.** Requires `hvac` python package.
## Future extensions
@ -54,11 +52,6 @@ import os
import subprocess
from pathlib import Path
try:
import hvac
except ImportError:
hvac = None
from ._version import __version__
__all__ = ['__version__', 'SecretLoader']
@ -126,7 +119,7 @@ class SecretLoader:
msg = 'Prefix must be uppercase'
raise ValueError(msg)
if self.env_key_prefix.endswith('_'):
msg = 'Prefix must not end with _ (this will be added automatically)'
msg = 'Prefix must not end with "_" (this will be added automatically)'
raise ValueError(msg)
# Setup secrets path
@ -139,21 +132,13 @@ class SecretLoader:
# Setup pass
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
# Setup vault
if hvac:
self.vault_client = hvac.Client(
url=self._load_or_none(ENV_KEY_VAULT_URL),
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or
throw an exception if not found.
"""
value = self._load_or_none(secret_name)
if value is None:
raise Exception(self._format_error_message(secret_name))
raise ValueError(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name)
return value
@ -178,7 +163,6 @@ class SecretLoader:
self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
)
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
@ -220,28 +204,22 @@ class SecretLoader:
check=False,
shell=False,
)
if process.returncode:
return None
password_file = process.stdout.decode('utf8')
for line in password_file.split('\n'):
return line
return None
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
instance does not know about the secret.
"""
if self.vault_client is None:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=secret_name.lower(),
mount_point=self.vault_mount_point,
return self._convert_pass_process_result_to_password(
process.returncode,
process.stdout,
)
return read_secret_result['data']['value']
def _convert_pass_process_result_to_password(
self,
returncode: int,
stdout: bytes,
) -> str | None:
if returncode != 0:
return None
password_file = stdout.decode('utf8')
return password_file.split('\n')[0]
def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given

View File

@ -1,3 +1,5 @@
import pytest
import secret_loader
@ -6,12 +8,54 @@ def test_hardcoded():
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
assert loader.load('KEY') == 'VALUE'
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
assert loader.load_or_fail('KEY') == 'VALUE'
def test_lookup_secrets_dir():
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
assert loader.load('MY_SECRET') == 'HELLO SECRET'
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
def test_lookup_unknown():
loader = secret_loader.SecretLoader()
assert loader.load('UNKNOWN') is None
def test_fail_hardcoded_prefix_lowercase():
with pytest.raises(ValueError, match='Prefix must be uppercase'):
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
def test_fail_hardcoded_prefix_with_trailing_underscore():
with pytest.raises(
ValueError,
match=r'Prefix must not end with "_" \(this will be added automatically\)',
):
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
def test_lookup_unknown_or_fail():
loader = secret_loader.SecretLoader(
ENV_KEY_PREFIX='TEST',
PASS_STORE_SUBFOLDER='test', # noqa: S106
)
with pytest.raises(
ValueError,
match='Failed to load secret with key:.*UNKNOWN.*',
) as e:
assert loader.load_or_fail('UNKNOWN')
assert 'Write secret to file' in str(e.value)
assert 'Add environment variable pointing to written secret' in str(e.value)
assert 'Write secret to password store entry' in str(e.value)
def test_convert_process():
loader = secret_loader.SecretLoader()
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
assert (
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
)
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001