import abc import dataclasses import datetime import logging from collections.abc import Iterator, Mapping import fin_depo import kucoin.client from frozendict import frozendict from personal_data.data import DeduplicateMode, Scraper from .. import secrets logger = logging.getLogger(__name__) # TODO: Move these into secrets! client = kucoin.client.Client( secrets.KUCOIN_KEY, secrets.KUCOIN_SECRET, secrets.KUCOIN_PASS, ) @dataclasses.dataclass(frozen=True) class FinanceDepoScraper(Scraper): @abc.abstractmethod def get_depo_fetcher(self): pass def scrape(self) -> Iterator[Mapping[str, object]]: depo = self.get_depo_fetcher().get_depo() data_point = { 'account.update_time': depo.updated_time, } for asset in depo.assets(): key = f'balance.{asset}' data_point[key] = depo.get_amount_of_asset(asset) print(data_point) yield frozendict(data_point) @dataclasses.dataclass(frozen=True) class KrakenAccountBalances(FinanceDepoScraper): dataset_name = 'defi_kraken_balance' deduplicate_mode = DeduplicateMode.ONLY_LATEST deduplicate_ignore_columns = ['account.update_time'] def get_depo_fetcher(self): return fin_depo.defi_kraken.KrakenDepoFetcher( kraken_key=secrets.KRAKEN_KEY, kraken_secret=secrets.KRAKEN_SECRET, ) @dataclasses.dataclass(frozen=True) class KucoinAccountBalances(FinanceDepoScraper): dataset_name = 'defi_kucoin_balance' deduplicate_mode = DeduplicateMode.ONLY_LATEST deduplicate_ignore_columns = ['account.update_time'] def get_depo_fetcher(self): return fin_depo.defi_kucoin.KucoinDepoFetcher( kucoin_key=secrets.KUCOIN_KEY, kucoin_secret=secrets.KUCOIN_SECRET, kucoin_pass=secrets.KUCOIN_PASS, ) def addresses_to_data_points(addresses: list[dict[str, str]]) -> frozendict: data_point = {} data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC) data_point['account.num_deposit_addresses'] = len(addresses) if len(addresses) > 0: for k, v in addresses[-1].items(): data_point[f'deposit.{k}'] = v return frozendict(data_point) @dataclasses.dataclass(frozen=True) class KucoinDepositAddresses(Scraper): dataset_name = 'defi_kucoin_deposit_address' deduplicate_mode = DeduplicateMode.ONLY_LATEST deduplicate_ignore_columns = ['account.update_time'] def scrape(self) -> Iterator[Mapping[str, object]]: addresses = client.get_deposit_address('MPC') yield addresses_to_data_points(addresses)