1
0
personal-data/personal_data/fetchers/fin_depo.py

73 lines
1.9 KiB
Python

import abc
import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
import fin_depo
import kucoin.client
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class FinanceDepoScraper(Scraper):
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
@abc.abstractmethod
def get_depo_fetcher(self):
pass
def scrape(self) -> Iterator[Mapping[str, object]]:
depo = self.get_depo_fetcher().get_depo()
data_point = {
'account.update_time': depo.updated_time,
}
for asset in depo.assets():
key = f'balance.{asset}'
data_point[key] = depo.get_amount_of_asset(asset)
yield frozendict(data_point)
@dataclasses.dataclass(frozen=True)
class KrakenAccountBalances(FinanceDepoScraper):
dataset_name = 'defi_kraken_balance'
def get_depo_fetcher(self):
return fin_depo.defi_kraken.KrakenDepoFetcher(
kraken_key=secrets.KRAKEN_KEY,
kraken_secret=secrets.KRAKEN_SECRET,
)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(FinanceDepoScraper):
dataset_name = 'defi_kucoin_balance'
def get_depo_fetcher(self):
return fin_depo.defi_kucoin.KucoinDepoFetcher(
kucoin_key=secrets.KUCOIN_KEY,
kucoin_secret=secrets.KUCOIN_SECRET,
kucoin_pass=secrets.KUCOIN_PASS,
)
@dataclasses.dataclass(frozen=True)
class MpcBalance(FinanceDepoScraper):
dataset_name = 'defi_mpc_balance'
def get_depo_fetcher(self):
return fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher(
self.session,
blockchain_address = secrets.PBC_ACCOUNT_ADDRESS
)