1
0
fin-depo/fin_depo/static.py
Jon Michael Aanes 66ad96af16
All checks were successful
Test Python / Test (push) Successful in 26s
Added static and aggregator depo fetchers
2024-07-25 00:52:09 +02:00

50 lines
1.4 KiB
Python

"""Static depository fetcher.
This module does not integrate with any external API or service. It provides
fetchers for static (fixed) depositories and for aggregating other fetchers.
"""
import datetime
import logging
from decimal import Decimal
import fin_defs
import krakenex
import dataclasses
from .data import Depo, DepoFetcher, DepoSingle, DepoGroup
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class StaticDepoFetcher(DepoFetcher):
    """Fetcher backed by a fixed, caller-supplied set of holdings.

    Nothing is actually fetched: every call to ``get_depo`` wraps the stored
    ``depo_assets`` mapping and ``last_updated`` timestamp in a
    ``DepoSingle``.
    """

    name: str
    depo_assets: dict[fin_defs.Asset, Decimal]
    # When not given, defaults to the timezone-aware UTC time at which the
    # fetcher instance is constructed.
    last_updated: datetime.datetime = dataclasses.field(
        default_factory=lambda: datetime.datetime.now(tz=datetime.UTC),
    )

    def get_depo(self) -> DepoSingle:
        """Return a ``DepoSingle`` built directly from the stored fields.

        The ``depo_assets`` mapping is passed through as-is (not copied).
        """
        depo_fields = {
            "name": self.name,
            "_assets": self.depo_assets,
            "updated_time": self.last_updated,
        }
        return DepoSingle(**depo_fields)
@dataclasses.dataclass(frozen=True)
class AggregateDepoFetcher(DepoFetcher):
    """Fetcher that combines the depositories of several other fetchers."""

    name: str
    aggregated: list[DepoFetcher]

    def get_depo(self) -> DepoGroup:
        """Fetch each aggregated depository and bundle them in a ``DepoGroup``.

        The fetchers in ``aggregated`` are queried one at a time, in list
        order; their results become the group's ``nested`` depositories.
        """
        nested_depos = []
        for sub_fetcher in self.aggregated:
            nested_depos.append(sub_fetcher.get_depo())

        # TODO (carried over from the original): the group's timestamp is
        # simply the time of this call, taken after all nested fetches have
        # finished; it is not derived from the nested depositories.
        fetched_at = datetime.datetime.now(tz=datetime.UTC)

        return DepoGroup(
            name=self.name,
            nested=nested_depos,
            updated_time=fetched_at,
        )