1
0

100% Coverage. Had to remove HVAC to achieve it

This commit is contained in:
Jon Michael Aanes 2024-10-27 17:47:07 +01:00
parent d34e2578e6
commit 83f310e28a
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
2 changed files with 54 additions and 38 deletions

View File

@ -26,8 +26,6 @@ Secret loading order:
usage; very unsuited for server environments. Requires `pass` installed usage; very unsuited for server environments. Requires `pass` installed
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
methods. methods.
4. Vault instance if configured. Suited for production environments. **NOTE:
This is barely supported.** Requires `hvac` python package.
## Future extensions ## Future extensions
@ -54,11 +52,6 @@ import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
try:
import hvac
except ImportError:
hvac = None
from ._version import __version__ from ._version import __version__
__all__ = ['__version__', 'SecretLoader'] __all__ = ['__version__', 'SecretLoader']
@ -126,7 +119,7 @@ class SecretLoader:
msg = 'Prefix must be uppercase' msg = 'Prefix must be uppercase'
raise ValueError(msg) raise ValueError(msg)
if self.env_key_prefix.endswith('_'): if self.env_key_prefix.endswith('_'):
msg = 'Prefix must not end with _ (this will be added automatically)' msg = 'Prefix must not end with "_" (this will be added automatically)'
raise ValueError(msg) raise ValueError(msg)
# Setup secrets path # Setup secrets path
@ -139,21 +132,13 @@ class SecretLoader:
# Setup pass # Setup pass
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER) self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
# Setup vault
if hvac:
self.vault_client = hvac.Client(
url=self._load_or_none(ENV_KEY_VAULT_URL),
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, secret_name: str) -> str: def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or """Load secret with the given key, from one of the backends or
throw an exception if not found. throw an exception if not found.
""" """
value = self._load_or_none(secret_name) value = self._load_or_none(secret_name)
if value is None: if value is None:
raise Exception(self._format_error_message(secret_name)) raise ValueError(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name) logger.info('Loaded secret with key: %s', secret_name)
return value return value
@ -178,7 +163,6 @@ class SecretLoader:
self.hardcoded.get(secret_name) self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name) or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name) or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
) )
def _load_or_none_path_or_file(self, secret_name: str) -> str | None: def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
@ -220,28 +204,19 @@ class SecretLoader:
check=False, check=False,
shell=False, shell=False,
) )
if process.returncode:
return None
password_file = process.stdout.decode('utf8') return self._convert_pass_process_result_to_password(
for line in password_file.split('\n'): process.returncode, process.stdout
return line
return None
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
instance does not know about the secret.
"""
if self.vault_client is None:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=secret_name.lower(),
mount_point=self.vault_mount_point,
) )
return read_secret_result['data']['value']
def _convert_pass_process_result_to_password(
self, returncode: int, stdout: bytes
) -> str | None:
if returncode != 0:
return None
password_file = stdout.decode('utf8')
return password_file.split('\n')[0]
def _format_error_message(self, secret_name: str) -> str: def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given """Formats an error message with solution suggestions for the given

View File

@ -1,4 +1,5 @@
import secret_loader import secret_loader
import pytest
def test_hardcoded(): def test_hardcoded():
@ -6,12 +7,52 @@ def test_hardcoded():
assert loader.load('ENV_KEY_PREFIX') == 'TEST' assert loader.load('ENV_KEY_PREFIX') == 'TEST'
assert loader.load('KEY') == 'VALUE' assert loader.load('KEY') == 'VALUE'
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
assert loader.load_or_fail('KEY') == 'VALUE'
def test_lookup_secrets_dir(): def test_lookup_secrets_dir():
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets') loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
assert loader.load('MY_SECRET') == 'HELLO SECRET' assert loader.load('MY_SECRET') == 'HELLO SECRET'
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
def test_lookup_unknown(): def test_lookup_unknown():
loader = secret_loader.SecretLoader() loader = secret_loader.SecretLoader()
assert loader.load('UNKNOWN') is None assert loader.load('UNKNOWN') is None
def test_fail_hardcoded_prefix_lowercase():
with pytest.raises(ValueError, match='Prefix must be uppercase'):
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
def test_fail_hardcoded_prefix_with_trailing_underscore():
with pytest.raises(
ValueError,
match=r'Prefix must not end with "_" \(this will be added automatically\)',
):
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
def test_lookup_unknown_or_fail():
loader = secret_loader.SecretLoader(
ENV_KEY_PREFIX='TEST', PASS_STORE_SUBFOLDER='test'
)
with pytest.raises(
ValueError, match='Failed to load secret with key:.*UNKNOWN.*'
) as e:
assert loader.load_or_fail('UNKNOWN')
assert 'Write secret to file' in str(e.value)
assert 'Add environment variable pointing to written secret' in str(e.value)
assert 'Write secret to password store entry' in str(e.value)
def test_convert_process():
loader = secret_loader.SecretLoader()
assert loader._convert_pass_process_result_to_password(1, b'') is None
assert (
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello'
)
assert loader._convert_pass_process_result_to_password(0, b'') == ''