Compare commits
No commits in common. "81264a3b6c1e2beddfd5f1326273edfc73235a5e" and "d34e2578e6db4f06b46fe46560f27b58eb9ceaf5" have entirely different histories.
81264a3b6c
...
d34e2578e6
|
@ -26,6 +26,8 @@ Secret loading order:
|
||||||
usage; very unsuited for server environments. Requires `pass` installed
|
usage; very unsuited for server environments. Requires `pass` installed
|
||||||
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
locally, and configuration of the `PASS_STORE_SUBFOLDER` through one of the above
|
||||||
methods.
|
methods.
|
||||||
|
4. Vault instance if configured. Suited for production environments. **NOTE:
|
||||||
|
This is barely supported.** Requires `hvac` python package.
|
||||||
|
|
||||||
## Future extensions
|
## Future extensions
|
||||||
|
|
||||||
|
@ -52,6 +54,11 @@ import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
try:
|
||||||
|
import hvac
|
||||||
|
except ImportError:
|
||||||
|
hvac = None
|
||||||
|
|
||||||
from ._version import __version__
|
from ._version import __version__
|
||||||
|
|
||||||
__all__ = ['__version__', 'SecretLoader']
|
__all__ = ['__version__', 'SecretLoader']
|
||||||
|
@ -119,7 +126,7 @@ class SecretLoader:
|
||||||
msg = 'Prefix must be uppercase'
|
msg = 'Prefix must be uppercase'
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
if self.env_key_prefix.endswith('_'):
|
if self.env_key_prefix.endswith('_'):
|
||||||
msg = 'Prefix must not end with "_" (this will be added automatically)'
|
msg = 'Prefix must not end with _ (this will be added automatically)'
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
# Setup secrets path
|
# Setup secrets path
|
||||||
|
@ -132,13 +139,21 @@ class SecretLoader:
|
||||||
# Setup pass
|
# Setup pass
|
||||||
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
self.pass_folder = self._load_or_none(ENV_KEY_PASS_FOLDER)
|
||||||
|
|
||||||
|
# Setup vault
|
||||||
|
if hvac:
|
||||||
|
self.vault_client = hvac.Client(
|
||||||
|
url=self._load_or_none(ENV_KEY_VAULT_URL),
|
||||||
|
token=self._load_or_none(ENV_KEY_VAULT_TOKEN),
|
||||||
|
vault_mount_point=self._load_or_none(ENV_KEY_VAULT_MOUNT_POINT),
|
||||||
|
)
|
||||||
|
|
||||||
def load_or_fail(self, secret_name: str) -> str:
|
def load_or_fail(self, secret_name: str) -> str:
|
||||||
"""Load secret with the given key, from one of the backends or
|
"""Load secret with the given key, from one of the backends or
|
||||||
throw an exception if not found.
|
throw an exception if not found.
|
||||||
"""
|
"""
|
||||||
value = self._load_or_none(secret_name)
|
value = self._load_or_none(secret_name)
|
||||||
if value is None:
|
if value is None:
|
||||||
raise ValueError(self._format_error_message(secret_name))
|
raise Exception(self._format_error_message(secret_name))
|
||||||
logger.info('Loaded secret with key: %s', secret_name)
|
logger.info('Loaded secret with key: %s', secret_name)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@ -163,6 +178,7 @@ class SecretLoader:
|
||||||
self.hardcoded.get(secret_name)
|
self.hardcoded.get(secret_name)
|
||||||
or self._load_or_none_path_or_file(secret_name)
|
or self._load_or_none_path_or_file(secret_name)
|
||||||
or self._load_or_none_local_password_store(secret_name)
|
or self._load_or_none_local_password_store(secret_name)
|
||||||
|
or self._load_or_none_vault(secret_name)
|
||||||
)
|
)
|
||||||
|
|
||||||
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
|
def _load_or_none_path_or_file(self, secret_name: str) -> str | None:
|
||||||
|
@ -204,22 +220,28 @@ class SecretLoader:
|
||||||
check=False,
|
check=False,
|
||||||
shell=False,
|
shell=False,
|
||||||
)
|
)
|
||||||
|
if process.returncode:
|
||||||
return self._convert_pass_process_result_to_password(
|
|
||||||
process.returncode,
|
|
||||||
process.stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _convert_pass_process_result_to_password(
|
|
||||||
self,
|
|
||||||
returncode: int,
|
|
||||||
stdout: bytes,
|
|
||||||
) -> str | None:
|
|
||||||
if returncode != 0:
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
password_file = stdout.decode('utf8')
|
password_file = process.stdout.decode('utf8')
|
||||||
return password_file.split('\n')[0]
|
for line in password_file.split('\n'):
|
||||||
|
return line
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _load_or_none_vault(self, secret_name: str) -> str | None:
|
||||||
|
"""Load secret from the configured vault instance.
|
||||||
|
|
||||||
|
Returns `None` if vault instance is not configured, or if the value
|
||||||
|
instance does not know about the secret.
|
||||||
|
"""
|
||||||
|
if self.vault_client is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
read_secret_result = self.vault_client.secrets.kv.v1.read_secret(
|
||||||
|
path=secret_name.lower(),
|
||||||
|
mount_point=self.vault_mount_point,
|
||||||
|
)
|
||||||
|
return read_secret_result['data']['value']
|
||||||
|
|
||||||
def _format_error_message(self, secret_name: str) -> str:
|
def _format_error_message(self, secret_name: str) -> str:
|
||||||
"""Formats an error message with solution suggestions for the given
|
"""Formats an error message with solution suggestions for the given
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
import pytest
|
|
||||||
|
|
||||||
import secret_loader
|
import secret_loader
|
||||||
|
|
||||||
|
|
||||||
|
@ -8,54 +6,12 @@ def test_hardcoded():
|
||||||
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
|
assert loader.load('ENV_KEY_PREFIX') == 'TEST'
|
||||||
assert loader.load('KEY') == 'VALUE'
|
assert loader.load('KEY') == 'VALUE'
|
||||||
|
|
||||||
assert loader.load_or_fail('ENV_KEY_PREFIX') == 'TEST'
|
|
||||||
assert loader.load_or_fail('KEY') == 'VALUE'
|
|
||||||
|
|
||||||
|
|
||||||
def test_lookup_secrets_dir():
|
def test_lookup_secrets_dir():
|
||||||
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
|
loader = secret_loader.SecretLoader(SECRETS_DIRECTORY='test/example-secrets')
|
||||||
assert loader.load('MY_SECRET') == 'HELLO SECRET'
|
assert loader.load('MY_SECRET') == 'HELLO SECRET'
|
||||||
assert loader.load_or_fail('MY_SECRET') == 'HELLO SECRET'
|
|
||||||
|
|
||||||
|
|
||||||
def test_lookup_unknown():
|
def test_lookup_unknown():
|
||||||
loader = secret_loader.SecretLoader()
|
loader = secret_loader.SecretLoader()
|
||||||
assert loader.load('UNKNOWN') is None
|
assert loader.load('UNKNOWN') is None
|
||||||
|
|
||||||
|
|
||||||
def test_fail_hardcoded_prefix_lowercase():
|
|
||||||
with pytest.raises(ValueError, match='Prefix must be uppercase'):
|
|
||||||
secret_loader.SecretLoader(ENV_KEY_PREFIX='test')
|
|
||||||
|
|
||||||
|
|
||||||
def test_fail_hardcoded_prefix_with_trailing_underscore():
|
|
||||||
with pytest.raises(
|
|
||||||
ValueError,
|
|
||||||
match=r'Prefix must not end with "_" \(this will be added automatically\)',
|
|
||||||
):
|
|
||||||
secret_loader.SecretLoader(ENV_KEY_PREFIX='TEST_')
|
|
||||||
|
|
||||||
|
|
||||||
def test_lookup_unknown_or_fail():
|
|
||||||
loader = secret_loader.SecretLoader(
|
|
||||||
ENV_KEY_PREFIX='TEST',
|
|
||||||
PASS_STORE_SUBFOLDER='test', # noqa: S106
|
|
||||||
)
|
|
||||||
with pytest.raises(
|
|
||||||
ValueError,
|
|
||||||
match='Failed to load secret with key:.*UNKNOWN.*',
|
|
||||||
) as e:
|
|
||||||
assert loader.load_or_fail('UNKNOWN')
|
|
||||||
|
|
||||||
assert 'Write secret to file' in str(e.value)
|
|
||||||
assert 'Add environment variable pointing to written secret' in str(e.value)
|
|
||||||
assert 'Write secret to password store entry' in str(e.value)
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_process():
|
|
||||||
loader = secret_loader.SecretLoader()
|
|
||||||
assert loader._convert_pass_process_result_to_password(1, b'') is None # noqa: SLF001
|
|
||||||
assert (
|
|
||||||
loader._convert_pass_process_result_to_password(0, b'Hello\nWorld') == 'Hello' # noqa: SLF001
|
|
||||||
)
|
|
||||||
assert loader._convert_pass_process_result_to_password(0, b'') == '' # noqa: SLF001
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user