Moved autoload to protocol
This commit is contained in:
parent
c5ae68811d
commit
b35a1875f1
68
clients_protocol/autoload.py
Normal file
68
clients_protocol/autoload.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import abc
|
||||
import functools
|
||||
import importlib
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_module_with_name(name: str) -> object | None:
|
||||
try:
|
||||
module = importlib.import_module(name)
|
||||
except Exception:
|
||||
logger.exception('Module %s failed to load!', name)
|
||||
return None
|
||||
logger.info('Loaded module: %s', name)
|
||||
return module
|
||||
|
||||
|
||||
def is_loadable_python(f: Path) -> bool:
|
||||
if f.is_file() and f.suffix == '.py':
|
||||
return True
|
||||
if f.is_dir():
|
||||
return is_loadable_python(f / '__init__.py')
|
||||
return False
|
||||
|
||||
|
||||
def get_modules_in_directory(module_directory: Path) -> Iterator[str]:
|
||||
for f in module_directory.iterdir():
|
||||
name = f.parts[-1].removesuffix('.py')
|
||||
if is_loadable_python(f):
|
||||
if name != '__init__':
|
||||
yield name
|
||||
|
||||
|
||||
@functools.cache
|
||||
def load_submodules(module_name: str, index_module_path: str | Path) -> None:
|
||||
"""Loads the submodules for the given module name.
|
||||
|
||||
This method is not recursive, as it is the submodule's responsibility to
|
||||
manage it's own submodules. That said: The load_submodules system works
|
||||
when invoked from an auto-loaded package.
|
||||
"""
|
||||
module_directory = Path(index_module_path)
|
||||
submodules = list(get_modules_in_directory(module_directory))
|
||||
logger.info(
|
||||
'Detected %d submodules for %s',
|
||||
len(submodules),
|
||||
module_name,
|
||||
)
|
||||
for submodule_name in submodules:
|
||||
load_module_with_name(f'{module_name}.{submodule_name}')
|
||||
|
||||
|
||||
def _find_subclasses(class_: type, class_accum: list[type]) -> None:
|
||||
is_abstract = abc.ABC in class_.__bases__
|
||||
|
||||
if not is_abstract:
|
||||
class_accum.append(class_)
|
||||
for subclass in class_.__subclasses__():
|
||||
_find_subclasses(subclass, class_accum)
|
||||
|
||||
|
||||
def find_subclasses(abstract_class: type) -> list[type]:
|
||||
subclasses = []
|
||||
_find_subclasses(abstract_class, subclasses)
|
||||
return subclasses
|
Loading…
Reference in New Issue
Block a user