Compare commits
2 Commits
d34e2578e6
...
81264a3b6c
Author | SHA1 | Date | |
---|---|---|---|
81264a3b6c | |||
83f310e28a |
|
@ -26,8 +26,6 @@ Secret loading order:
|
|||
usage; very unsuited for server environments. Requires `pass` installed
|
||||
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
||||
methods.
|
||||
4. Vault instance if configured. Suited for production environments. **NOTE:
|
||||
This is barely supported.** Requires `hvac` python package.
|
||||
|
||||
## Future extensions
|
||||
|
||||
|
@ -54,11 +52,6 @@ import os
|
|||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import hvac
|
||||
except ImportError:
|
||||
hvac = None
|
||||
|
||||
from ._version import __version__
|
||||
|
||||
__all__ = ['__version__', 'SecretLoader']
|
||||
|
@ -126,7 +119,7 @@ class SecretLoader:
|
|||
msg = 'Prefix must be uppercase'
|
||||
raise ValueError(msg)
|
||||
if self.env_key_prefix.endswith('_'):
|
||||
msg = 'Prefix must not end with _ (this will be added automatically)'
|
||||
msg = 'Prefix must not end with "_" (this will be added automatically)'
|
||||
raise ValueError(msg)
|
||||
|
||||
# Setup secrets path
|
||||
|
@ -139,21 +132,13 @@ class SecretLoader:
|
|||
# Setup pass
|
||||
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
||||
|
||||
# Setup vault
|
||||
if hvac:
|
||||
self.vault_client = hvac.Client(
|
||||
url=self._load_or_none(ENV_KEY_VAULT_URL),
|
||||
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
|
||||
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
|
||||
)
|
||||
|
||||
def load_or_fail(self, secret_name: str) -> str:
|
||||
"""Load secret with the given key, from one of the backends or
|
||||
throw an exception if not found.
|
||||
"""
|
||||
value = self._load_or_none(secret_name)
|
||||
if value is None:
|
||||
raise Exception(self._format_error_message(secret_name))
|
||||
raise ValueError(self._format_error_message(secret_name))
|
||||
logger.info('Loaded secret with key: %s', secret_name)
|
||||
return value
|
||||
|
||||
|
@ -178,7 +163,6 @@ class SecretLoader:
|
|||
self.hardcoded.get(secret_name)
|
||||
or self._load_or_none_path_or_file(secret_name)
|
||||
or self._load_or_none_local_password_store(secret_name)
|
||||
or self._load_or_none_vault(secret_name)
|
||||
)
|
||||
|
||||
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
|
||||
|
@ -220,28 +204,22 @@ class SecretLoader:
|
|||
check=False,
|
||||
shell=False,
|
||||
)
|
||||
if process.returncode:
|
||||
return None
|
||||
|
||||
password_file = process.stdout.decode('utf8')
|
||||
for line in password_file.split('\n'):
|
||||
return line
|
||||
return None
|
||||
|
||||
def _load_or_none_vault(self, secret_name: str) -> str | None:
|
||||
"""Load secret from the configured vault instance.
|
||||
|
||||
Returns `None` if vault instance is not configured, or if the value
|
||||
instance does not know about the secret.
|
||||
"""
|
||||
if self.vault_client is None:
|
||||
return None
|
||||
|
||||
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
|
||||
path=secret_name.lower(),
|
||||
mount_point=self.vault_mount_point,
|
||||
return self._convert_pass_process_result_to_password(
|
||||
process.returncode,
|
||||
process.stdout,
|
||||
)
|
||||
return read_secret_result['data']['value']
|
||||
|
||||
def _convert_pass_process_result_to_password(
|
||||
self,
|
||||
returncode: int,
|
||||
stdout: bytes,
|
||||
) -> str | None:
|
||||
if returncode != 0:
|
||||
return None
|
||||
|
||||
password_file = stdout.decode('utf8')
|
||||
return password_file.split('\n')[0]
|
||||
|
||||
def _format_error_message(self, secret_name: str) -> str:
|
||||
"""Formats an error message with solution suggestions for the given
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import pytest
|
||||
|
||||
import secret_loader
|
||||
|
||||
|
||||
|
@ -6,12 +8,54 @@ def test_hardcoded():
|
|||
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
|
||||
assert loader.load('KEY') == 'VALUE'
|
||||
|
||||
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
|
||||
assert loader.load_or_fail('KEY') == 'VALUE'
|
||||
|
||||
|
||||
def test_lookup_secrets_dir():
|
||||
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
|
||||
assert loader.load('MY_SECRET') == 'HELLO SECRET'
|
||||
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
|
||||
|
||||
|
||||
def test_lookup_unknown():
|
||||
loader = secret_loader.SecretLoader()
|
||||
assert loader.load('UNKNOWN') is None
|
||||
|
||||
|
||||
def test_fail_hardcoded_prefix_lowercase():
|
||||
with pytest.raises(ValueError, match='Prefix must be uppercase'):
|
||||
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
|
||||
|
||||
|
||||
def test_fail_hardcoded_prefix_with_trailing_underscore():
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match=r'Prefix must not end with "_" \(this will be added automatically\)',
|
||||
):
|
||||
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
|
||||
|
||||
|
||||
def test_lookup_unknown_or_fail():
|
||||
loader = secret_loader.SecretLoader(
|
||||
ENV_KEY_PREFIX='TEST',
|
||||
PASS_STORE_SUBFOLDER='test', # noqa: S106
|
||||
)
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match='Failed to load secret with key:.*UNKNOWN.*',
|
||||
) as e:
|
||||
assert loader.load_or_fail('UNKNOWN')
|
||||
|
||||
assert 'Write secret to file' in str(e.value)
|
||||
assert 'Add environment variable pointing to written secret' in str(e.value)
|
||||
assert 'Write secret to password store entry' in str(e.value)
|
||||
|
||||
|
||||
def test_convert_process():
|
||||
loader = secret_loader.SecretLoader()
|
||||
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
|
||||
assert (
|
||||
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
|
||||
)
|
||||
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001
|
||||
|
|
Loading…
Reference in New Issue
Block a user