1
0
personal-data/personal_data/fetchers/fin_depo.py
Jon Michael Aanes 9d84733b94
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 33s
Verify Python project can be installed, loaded and have version checked / Test (push) Has been cancelled
Secrets
2024-11-26 23:35:05 +01:00

71 lines
1.9 KiB
Python

import abc
import dataclasses
import logging
from collections.abc import Iterator, Mapping
import fin_depo
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class FinanceDepoScraper(Scraper):
    """Base scraper reporting the asset balances held in a single depo.

    Subclasses provide the depo fetcher through ``get_depo_fetcher``;
    ``scrape`` then emits one immutable mapping holding the depo's update
    time plus one ``balance.<asset>`` entry for every asset in the depo.
    """

    deduplicate_mode = DeduplicateMode.ONLY_LATEST
    # NOTE(review): presumably lets samples that differ only in their update
    # time be treated as duplicates -- confirm against the Scraper base class.
    deduplicate_ignore_columns = ['account.update_time']

    @abc.abstractmethod
    def get_depo_fetcher(self):
        """Return the fetcher object whose ``get_depo()`` yields the depo."""

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Yield a single frozen mapping describing the depo's current state.

        The mapping contains ``account.update_time`` followed by one
        ``balance.<asset>`` key per asset reported by the depo.
        """
        fetcher = self.get_depo_fetcher()
        depo = fetcher.get_depo()

        sample = {'account.update_time': depo.updated_time}
        # Balances are appended in the order the depo lists its assets.
        sample.update(
            (f'balance.{asset}', depo.get_amount_of_asset(asset))
            for asset in depo.assets()
        )

        yield frozendict(sample)
@dataclasses.dataclass(frozen=True)
class KrakenAccountBalances(FinanceDepoScraper):
    """Depo scraper backed by ``fin_depo``'s Kraken depo fetcher."""

    dataset_name = 'defi_kraken_balance'

    def get_depo_fetcher(self):
        """Build a Kraken depo fetcher using credentials from ``secrets``."""
        fetcher_cls = fin_depo.defi_kraken.KrakenDepoFetcher
        api_key = secrets.kraken_key()
        api_secret = secrets.kraken_secret()
        return fetcher_cls(kraken_key=api_key, kraken_secret=api_secret)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(FinanceDepoScraper):
    """Depo scraper backed by ``fin_depo``'s Kucoin depo fetcher."""

    dataset_name = 'defi_kucoin_balance'

    def get_depo_fetcher(self):
        """Build a Kucoin depo fetcher using credentials from ``secrets``."""
        fetcher_cls = fin_depo.defi_kucoin.KucoinDepoFetcher
        api_key = secrets.kucoin_key()
        api_secret = secrets.kucoin_secret()
        api_pass = secrets.kucoin_pass()
        return fetcher_cls(
            kucoin_key=api_key,
            kucoin_secret=api_secret,
            kucoin_pass=api_pass,
        )
@dataclasses.dataclass(frozen=True)
class MpcBalance(FinanceDepoScraper):
    """Depo scraper backed by ``fin_depo``'s Partisia Blockchain account fetcher."""

    dataset_name = 'defi_mpc_balance'

    def get_depo_fetcher(self):
        """Build a Partisia Blockchain account depo fetcher.

        The fetcher receives this scraper's ``session`` and the account
        address obtained from ``secrets``.
        """
        fetcher_cls = (
            fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher
        )
        # NOTE(review): ``session`` is not defined in this file; presumably it
        # is provided by the Scraper base class -- confirm.
        session = self.session
        address = secrets.pbc_account_address()
        return fetcher_cls(session, blockchain_address=address)