"""Scraper that records the balances reported by the Kraken private API."""

import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal

import krakenex
from frozendict import frozendict

from personal_data.data import DeduplicateMode, Scraper

from .. import secrets

logger = logging.getLogger(__name__)

# TODO: Move these into secrets!
# NOTE: the client is created at import time and shared by every scrape call.
client = krakenex.API(
    secrets.KRAKEN_KEY,
    secrets.KRAKEN_SECRET,
)


@dataclasses.dataclass(frozen=True)
class KrakenAccountBalances(Scraper):
    """Scrape a single snapshot of the balances returned by Kraken."""

    # Plain class attributes (deliberately unannotated so that they are not
    # turned into dataclass fields).
    dataset_name = 'defi_kraken_balance'
    deduplicate_mode = DeduplicateMode.ONLY_LATEST
    # The timestamp differs on every scrape, so it is excluded from
    # deduplication comparisons.
    deduplicate_ignore_columns = ['account.update_time']

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Query the private ``Balance`` endpoint and yield one data point.

        Yields:
            A single ``frozendict`` holding ``'account.update_time'`` (the
            current UTC time) and one ``'balance.<name>'`` entry, as a
            ``Decimal``, for every item in the response's ``result`` mapping.

        Raises:
            KeyError: If the response contains no ``'result'`` entry. Any
                errors reported by the API are logged beforehand.
        """
        data_point: dict[str, object] = {
            'account.update_time': datetime.datetime.now(tz=datetime.UTC),
        }

        response = client.query_private('Balance')

        # Surface whatever the API reported instead of silently ignoring it;
        # otherwise a failed call only shows up as a bare KeyError below.
        reported = response.get('error')
        if reported:
            logger.warning('Kraken Balance query reported: %s', reported)

        for account, balance_str in response['result'].items():
            key = f'balance.{account}'
            balance = Decimal(balance_str)
            # Collapse every representation of zero (e.g. '0.0000000000') into
            # one canonical Decimal(0) -- presumably so that unchanged empty
            # balances compare equal during deduplication.
            if balance == 0:
                balance = Decimal(0)
            # Accumulate rather than overwrite, in case a key occurs twice.
            data_point[key] = data_point.get(key, Decimal(0)) + balance

        yield frozendict(data_point)