1
0
fin-depo/fin_depo/static.py

53 lines
1.4 KiB
Python
Raw Permalink Normal View History

"""Static depository fetcher.
This module does not represent an API or other integration. This module
contains aggregators and static depositories.
"""
2024-07-27 01:14:12 +00:00
import dataclasses
import datetime
import logging
from decimal import Decimal
import fin_defs
2024-07-27 01:14:12 +00:00
from .data import DepoFetcher, DepoGroup, DepoSingle
logger = logging.getLogger(__name__)
2024-07-27 01:14:12 +00:00
@dataclasses.dataclass(frozen=True)
class StaticDepoFetcher(DepoFetcher):
    """Depository "fetcher" backed by a fixed set of assets.

    Nothing is fetched: the assets supplied at construction time are handed
    back unchanged on every call to `get_depo`.
    """

    name: str
    depo_assets: dict[fin_defs.Asset, Decimal]
    # When not supplied, defaults to the UTC time at which the instance is
    # created.
    last_updated: datetime.datetime = dataclasses.field(
        default_factory=lambda: datetime.datetime.now(tz=datetime.UTC),
    )

    def get_depo(self) -> DepoSingle:
        """Return a `DepoSingle` built from the stored name, assets and time.

        The asset mapping is passed through as-is (it is not copied).
        """
        depo = DepoSingle(
            updated_time=self.last_updated,
            _assets=self.depo_assets,
            name=self.name,
        )
        return depo
2024-07-27 01:14:12 +00:00
@dataclasses.dataclass(frozen=True)
class AggregateDepoFetcher(DepoFetcher):
    """Depository "fetcher" that combines the results of other fetchers."""

    name: str
    aggregated: list[DepoFetcher]

    def get_depo(self) -> DepoGroup:
        """Return a `DepoGroup` holding one depository per aggregated fetcher.

        Each fetcher in `aggregated` is queried in order. The group's
        `updated_time` is the current UTC time, taken after all nested
        depositories have been retrieved.
        """
        group_name = self.name
        nested_depos = [source.get_depo() for source in self.aggregated]
        fetched_at = datetime.datetime.now(tz=datetime.UTC)
        return DepoGroup(
            name=group_name,
            nested=nested_depos,
            updated_time=fetched_at,
        )