1
0

Compare commits

..

No commits in common. "81264a3b6c1e2beddfd5f1326273edfc73235a5e" and "d34e2578e6db4f06b46fe46560f27b58eb9ceaf5" have entirely different histories.

2 changed files with 38 additions and 60 deletions

View File

@ -26,6 +26,8 @@ Secret loading order:
usage; very unsuited for server environments. Requires `pass` installed usage; very unsuited for server environments. Requires `pass` installed
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
methods. methods.
4. Vault instance if configured. Suited for production environments. **NOTE:
This is barely supported.** Requires `hvac` python package.
## Future extensions ## Future extensions
@ -52,6 +54,11 @@ import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
try:
import hvac
except ImportError:
hvac = None
from ._version import __version__ from ._version import __version__
__all__ = ['__version__', 'SecretLoader'] __all__ = ['__version__', 'SecretLoader']
@ -119,7 +126,7 @@ class SecretLoader:
msg = 'Prefix must be uppercase' msg = 'Prefix must be uppercase'
raise ValueError(msg) raise ValueError(msg)
if self.env_key_prefix.endswith('_'): if self.env_key_prefix.endswith('_'):
msg = 'Prefix must not end with "_" (this will be added automatically)' msg = 'Prefix must not end with _ (this will be added automatically)'
raise ValueError(msg) raise ValueError(msg)
# Setup secrets path # Setup secrets path
@ -132,13 +139,21 @@ class SecretLoader:
# Setup pass # Setup pass
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER) self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
# Setup vault
if hvac:
self.vault_client = hvac.Client(
url=self._load_or_none(ENV_KEY_VAULT_URL),
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
)
def load_or_fail(self, secret_name: str) -> str: def load_or_fail(self, secret_name: str) -> str:
"""Load secret with the given key, from one of the backends or """Load secret with the given key, from one of the backends or
throw an exception if not found. throw an exception if not found.
""" """
value = self._load_or_none(secret_name) value = self._load_or_none(secret_name)
if value is None: if value is None:
raise ValueError(self._format_error_message(secret_name)) raise Exception(self._format_error_message(secret_name))
logger.info('Loaded secret with key: %s', secret_name) logger.info('Loaded secret with key: %s', secret_name)
return value return value
@ -163,6 +178,7 @@ class SecretLoader:
self.hardcoded.get(secret_name) self.hardcoded.get(secret_name)
or self._load_or_none_path_or_file(secret_name) or self._load_or_none_path_or_file(secret_name)
or self._load_or_none_local_password_store(secret_name) or self._load_or_none_local_password_store(secret_name)
or self._load_or_none_vault(secret_name)
) )
def _load_or_none_path_or_file(self, secret_name: str) -> str | None: def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
@ -204,22 +220,28 @@ class SecretLoader:
check=False, check=False,
shell=False, shell=False,
) )
if process.returncode:
return self._convert_pass_process_result_to_password(
process.returncode,
process.stdout,
)
def _convert_pass_process_result_to_password(
self,
returncode: int,
stdout: bytes,
) -> str | None:
if returncode != 0:
return None return None
password_file = stdout.decode('utf8') password_file = process.stdout.decode('utf8')
return password_file.split('\n')[0] for line in password_file.split('\n'):
return line
return None
def _load_or_none_vault(self, secret_name: str) -> str | None:
"""Load secret from the configured vault instance.
Returns `None` if vault instance is not configured, or if the value
instance does not know about the secret.
"""
if self.vault_client is None:
return None
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
path=secret_name.lower(),
mount_point=self.vault_mount_point,
)
return read_secret_result['data']['value']
def _format_error_message(self, secret_name: str) -> str: def _format_error_message(self, secret_name: str) -> str:
"""Formats an error message with solution suggestions for the given """Formats an error message with solution suggestions for the given

View File

@ -1,5 +1,3 @@
import pytest
import secret_loader import secret_loader
@ -8,54 +6,12 @@ def test_hardcoded():
assert loader.load('ENV_KEY_PREFIX') == 'TEST' assert loader.load('ENV_KEY_PREFIX') == 'TEST'
assert loader.load('KEY') == 'VALUE' assert loader.load('KEY') == 'VALUE'
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
assert loader.load_or_fail('KEY') == 'VALUE'
def test_lookup_secrets_dir(): def test_lookup_secrets_dir():
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets') loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
assert loader.load('MY_SECRET') == 'HELLO SECRET' assert loader.load('MY_SECRET') == 'HELLO SECRET'
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
def test_lookup_unknown(): def test_lookup_unknown():
loader = secret_loader.SecretLoader() loader = secret_loader.SecretLoader()
assert loader.load('UNKNOWN') is None assert loader.load('UNKNOWN') is None
def test_fail_hardcoded_prefix_lowercase():
with pytest.raises(ValueError, match='Prefix must be uppercase'):
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
def test_fail_hardcoded_prefix_with_trailing_underscore():
with pytest.raises(
ValueError,
match=r'Prefix must not end with "_" \(this will be added automatically\)',
):
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
def test_lookup_unknown_or_fail():
loader = secret_loader.SecretLoader(
ENV_KEY_PREFIX='TEST',
PASS_STORE_SUBFOLDER='test', # noqa: S106
)
with pytest.raises(
ValueError,
match='Failed to load secret with key:.*UNKNOWN.*',
) as e:
assert loader.load_or_fail('UNKNOWN')
assert 'Write secret to file' in str(e.value)
assert 'Add environment variable pointing to written secret' in str(e.value)
assert 'Write secret to password store entry' in str(e.value)
def test_convert_process():
loader = secret_loader.SecretLoader()
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
assert (
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
)
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001