Compare commits
2 Commits
d34e2578e6
...
81264a3b6c
Author | SHA1 | Date | |
---|---|---|---|
81264a3b6c | |||
83f310e28a |
|
@ -26,8 +26,6 @@ Secret loading order:
|
||||||
usage; very unsuited for server environments. Requires `pass` installed
|
usage; very unsuited for server environments. Requires `pass` installed
|
||||||
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
||||||
methods.
|
methods.
|
||||||
4. Vault instance if configured. Suited for production environments. **NOTE:
|
|
||||||
This is barely supported.** Requires `hvac` python package.
|
|
||||||
|
|
||||||
## Future extensions
|
## Future extensions
|
||||||
|
|
||||||
|
@ -54,11 +52,6 @@ import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
try:
|
|
||||||
import hvac
|
|
||||||
except ImportError:
|
|
||||||
hvac = None
|
|
||||||
|
|
||||||
from ._version import __version__
|
from ._version import __version__
|
||||||
|
|
||||||
__all__ = ['__version__', 'SecretLoader']
|
__all__ = ['__version__', 'SecretLoader']
|
||||||
|
@ -126,7 +119,7 @@ class SecretLoader:
|
||||||
msg = 'Prefix must be uppercase'
|
msg = 'Prefix must be uppercase'
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
if self.env_key_prefix.endswith('_'):
|
if self.env_key_prefix.endswith('_'):
|
||||||
msg = 'Prefix must not end with _ (this will be added automatically)'
|
msg = 'Prefix must not end with "_" (this will be added automatically)'
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
# Setup secrets path
|
# Setup secrets path
|
||||||
|
@ -139,21 +132,13 @@ class SecretLoader:
|
||||||
# Setup pass
|
# Setup pass
|
||||||
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
||||||
|
|
||||||
# Setup vault
|
|
||||||
if hvac:
|
|
||||||
self.vault_client = hvac.Client(
|
|
||||||
url=self._load_or_none(ENV_KEY_VAULT_URL),
|
|
||||||
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
|
|
||||||
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
|
|
||||||
)
|
|
||||||
|
|
||||||
def load_or_fail(self, secret_name: str) -> str:
|
def load_or_fail(self, secret_name: str) -> str:
|
||||||
"""Load secret with the given key, from one of the backends or
|
"""Load secret with the given key, from one of the backends or
|
||||||
throw an exception if not found.
|
throw an exception if not found.
|
||||||
"""
|
"""
|
||||||
value = self._load_or_none(secret_name)
|
value = self._load_or_none(secret_name)
|
||||||
if value is None:
|
if value is None:
|
||||||
raise Exception(self._format_error_message(secret_name))
|
raise ValueError(self._format_error_message(secret_name))
|
||||||
logger.info('Loaded secret with key: %s', secret_name)
|
logger.info('Loaded secret with key: %s', secret_name)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@ -178,7 +163,6 @@ class SecretLoader:
|
||||||
self.hardcoded.get(secret_name)
|
self.hardcoded.get(secret_name)
|
||||||
or self._load_or_none_path_or_file(secret_name)
|
or self._load_or_none_path_or_file(secret_name)
|
||||||
or self._load_or_none_local_password_store(secret_name)
|
or self._load_or_none_local_password_store(secret_name)
|
||||||
or self._load_or_none_vault(secret_name)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
|
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
|
||||||
|
@ -220,28 +204,22 @@ class SecretLoader:
|
||||||
check=False,
|
check=False,
|
||||||
shell=False,
|
shell=False,
|
||||||
)
|
)
|
||||||
if process.returncode:
|
|
||||||
return None
|
|
||||||
|
|
||||||
password_file = process.stdout.decode('utf8')
|
return self._convert_pass_process_result_to_password(
|
||||||
for line in password_file.split('\n'):
|
process.returncode,
|
||||||
return line
|
process.stdout,
|
||||||
return None
|
|
||||||
|
|
||||||
def _load_or_none_vault(self, secret_name: str) -> str | None:
|
|
||||||
"""Load secret from the configured vault instance.
|
|
||||||
|
|
||||||
Returns `None` if vault instance is not configured, or if the value
|
|
||||||
instance does not know about the secret.
|
|
||||||
"""
|
|
||||||
if self.vault_client is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
|
|
||||||
path=secret_name.lower(),
|
|
||||||
mount_point=self.vault_mount_point,
|
|
||||||
)
|
)
|
||||||
return read_secret_result['data']['value']
|
|
||||||
|
def _convert_pass_process_result_to_password(
|
||||||
|
self,
|
||||||
|
returncode: int,
|
||||||
|
stdout: bytes,
|
||||||
|
) -> str | None:
|
||||||
|
if returncode != 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
password_file = stdout.decode('utf8')
|
||||||
|
return password_file.split('\n')[0]
|
||||||
|
|
||||||
def _format_error_message(self, secret_name: str) -> str:
|
def _format_error_message(self, secret_name: str) -> str:
|
||||||
"""Formats an error message with solution suggestions for the given
|
"""Formats an error message with solution suggestions for the given
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
import secret_loader
|
import secret_loader
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,12 +8,54 @@ def test_hardcoded():
|
||||||
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
|
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
|
||||||
assert loader.load('KEY') == 'VALUE'
|
assert loader.load('KEY') == 'VALUE'
|
||||||
|
|
||||||
|
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
|
||||||
|
assert loader.load_or_fail('KEY') == 'VALUE'
|
||||||
|
|
||||||
|
|
||||||
def test_lookup_secrets_dir():
|
def test_lookup_secrets_dir():
|
||||||
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
|
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
|
||||||
assert loader.load('MY_SECRET') == 'HELLO SECRET'
|
assert loader.load('MY_SECRET') == 'HELLO SECRET'
|
||||||
|
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
|
||||||
|
|
||||||
|
|
||||||
def test_lookup_unknown():
|
def test_lookup_unknown():
|
||||||
loader = secret_loader.SecretLoader()
|
loader = secret_loader.SecretLoader()
|
||||||
assert loader.load('UNKNOWN') is None
|
assert loader.load('UNKNOWN') is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_fail_hardcoded_prefix_lowercase():
|
||||||
|
with pytest.raises(ValueError, match='Prefix must be uppercase'):
|
||||||
|
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
|
||||||
|
|
||||||
|
|
||||||
|
def test_fail_hardcoded_prefix_with_trailing_underscore():
|
||||||
|
with pytest.raises(
|
||||||
|
ValueError,
|
||||||
|
match=r'Prefix must not end with "_" \(this will be added automatically\)',
|
||||||
|
):
|
||||||
|
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
|
||||||
|
|
||||||
|
|
||||||
|
def test_lookup_unknown_or_fail():
|
||||||
|
loader = secret_loader.SecretLoader(
|
||||||
|
ENV_KEY_PREFIX='TEST',
|
||||||
|
PASS_STORE_SUBFOLDER='test', # noqa: S106
|
||||||
|
)
|
||||||
|
with pytest.raises(
|
||||||
|
ValueError,
|
||||||
|
match='Failed to load secret with key:.*UNKNOWN.*',
|
||||||
|
) as e:
|
||||||
|
assert loader.load_or_fail('UNKNOWN')
|
||||||
|
|
||||||
|
assert 'Write secret to file' in str(e.value)
|
||||||
|
assert 'Add environment variable pointing to written secret' in str(e.value)
|
||||||
|
assert 'Write secret to password store entry' in str(e.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_convert_process():
|
||||||
|
loader = secret_loader.SecretLoader()
|
||||||
|
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
|
||||||
|
assert (
|
||||||
|
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
|
||||||
|
)
|
||||||
|
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001
|
||||||
|
|
Loading…
Reference in New Issue
Block a user