import abc import dataclasses import logging from collections.abc import Iterator, Mapping import fin_depo from frozendict import frozendict from personal_data.data import DeduplicateMode, Scraper from .. import secrets logger = logging.getLogger(__name__) @dataclasses.dataclass(frozen=True) class FinanceDepoScraper(Scraper): deduplicate_mode = DeduplicateMode.ONLY_LATEST deduplicate_ignore_columns = ['account.update_time'] @abc.abstractmethod def get_depo_fetcher(self): pass def scrape(self) -> Iterator[Mapping[str, object]]: depo = self.get_depo_fetcher().get_depo() data_point = { 'account.update_time': depo.updated_time, } for asset in depo.assets(): key = f'balance.{asset}' data_point[key] = depo.get_amount_of_asset(asset) yield frozendict(data_point) @dataclasses.dataclass(frozen=True) class KrakenAccountBalances(FinanceDepoScraper): dataset_name = 'defi_kraken_balance' def get_depo_fetcher(self): return fin_depo.defi_kraken.KrakenDepoFetcher( kraken_key=secrets.kraken_key(), kraken_secret=secrets.kraken_secret(), ) @dataclasses.dataclass(frozen=True) class KucoinAccountBalances(FinanceDepoScraper): dataset_name = 'defi_kucoin_balance' def get_depo_fetcher(self): return fin_depo.defi_kucoin.KucoinDepoFetcher( kucoin_key=secrets.kucoin_key(), kucoin_secret=secrets.kucoin_secret(), kucoin_pass=secrets.kucoin_pass(), ) @dataclasses.dataclass(frozen=True) class MpcBalance(FinanceDepoScraper): dataset_name = 'defi_mpc_balance' def get_depo_fetcher(self): return fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher( self.session, blockchain_address=secrets.pbc_account_address(), )