1
0
fin-depo/fin_depo/static.py
Jon Michael Aanes 836b3b2a35
All checks were successful
Python Ruff Code Quality / ruff (push) Successful in 22s
Run Python tests (through Pytest) / Test (push) Successful in 28s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 23s
Code quality improvements
2024-11-01 11:30:22 +01:00

53 lines
1.4 KiB
Python

"""Static depository fetcher.
This module does not represent an API or other integration. This module
contains aggregators and static depositories.
"""
import dataclasses
import datetime
import logging
from decimal import Decimal
import fin_defs
from .data import DepoFetcher, DepoGroup, DepoSingle
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class StaticDepoFetcher(DepoFetcher):
"""Depository "fetcher" that doesn't do any fetching.
Is given a static set of assets and will always return these directly.
"""
name: str
depo_assets: dict[fin_defs.Asset, Decimal]
last_updated: datetime.datetime = dataclasses.field(
default_factory=lambda: datetime.datetime.now(tz=datetime.UTC),
)
def get_depo(self) -> DepoSingle:
return DepoSingle(
name=self.name,
_assets=self.depo_assets,
updated_time=self.last_updated,
)
@dataclasses.dataclass(frozen=True)
class AggregateDepoFetcher(DepoFetcher):
"""Depository "fetcher" that delegates to the aggregated fetchers."""
name: str
aggregated: list[DepoFetcher]
def get_depo(self) -> DepoGroup:
return DepoGroup(
name=self.name,
nested=[fetcher.get_depo() for fetcher in self.aggregated],
updated_time=datetime.datetime.now(tz=datetime.UTC),
)