diff --git a/personal_data/fetchers/defi_kucoin.py b/personal_data/fetchers/defi_kucoin.py index 9d6bd56..34d595c 100644 --- a/personal_data/fetchers/defi_kucoin.py +++ b/personal_data/fetchers/defi_kucoin.py @@ -22,56 +22,6 @@ client = kucoin.client.Client( secrets.KUCOIN_PASS, ) - -@dataclasses.dataclass(frozen=True) -class FinanceDepoScraper(Scraper): - @abc.abstractmethod - def get_depo_fetcher(self): - pass - - def scrape(self) -> Iterator[Mapping[str, object]]: - depo = self.get_depo_fetcher().get_depo() - - data_point = { - 'account.update_time': depo.updated_time, - } - - for asset in depo.assets(): - key = f'balance.{asset}' - data_point[key] = depo.get_amount_of_asset(asset) - - print(data_point) - - yield frozendict(data_point) - - -@dataclasses.dataclass(frozen=True) -class KrakenAccountBalances(FinanceDepoScraper): - dataset_name = 'defi_kraken_balance' - deduplicate_mode = DeduplicateMode.ONLY_LATEST - deduplicate_ignore_columns = ['account.update_time'] - - def get_depo_fetcher(self): - return fin_depo.defi_kraken.KrakenDepoFetcher( - kraken_key=secrets.KRAKEN_KEY, - kraken_secret=secrets.KRAKEN_SECRET, - ) - - -@dataclasses.dataclass(frozen=True) -class KucoinAccountBalances(FinanceDepoScraper): - dataset_name = 'defi_kucoin_balance' - deduplicate_mode = DeduplicateMode.ONLY_LATEST - deduplicate_ignore_columns = ['account.update_time'] - - def get_depo_fetcher(self): - return fin_depo.defi_kucoin.KucoinDepoFetcher( - kucoin_key=secrets.KUCOIN_KEY, - kucoin_secret=secrets.KUCOIN_SECRET, - kucoin_pass=secrets.KUCOIN_PASS, - ) - - def addresses_to_data_points(addresses: list[dict[str, str]]) -> frozendict: data_point = {} data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC) diff --git a/personal_data/fetchers/defi_partisia_blockchain.py b/personal_data/fetchers/defi_partisia_blockchain.py index 8821bf0..338f5ba 100644 --- a/personal_data/fetchers/defi_partisia_blockchain.py +++ b/personal_data/fetchers/defi_partisia_blockchain.py @@ -10,148 +10,13 @@ import requests from frozendict import frozendict from personal_data.data import DeduplicateMode, Scraper +import fin_depo from .. import secrets logger = logging.getLogger(__name__) -# mainnet: https://reader.partisiablockchain.com -# testnet: https://node1.testnet.partisiablockchain.com - - -HOSTNAME = 'reader.partisiablockchain.com' - -URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local' -URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global' -URL_CONTRACT_STATE = 'https://{hostname}/{shard}blockchain/contracts/{address}?requireContractState=false' - - -MPC_DECIMALS = 10000 - - -def shard_id_for_address(address: str) -> str: - if address.endswith('a'): - return 'shards/Shard0/' # TODO - elif address.endswith('2'): - return 'shards/Shard1/' # TODO - else: - return 'shards/Shard2/' # TODO - - -@dataclasses.dataclass(frozen=True) -class Balances: - update_time: datetime.datetime - balances: Mapping[str, Decimal] - - -@dataclasses.dataclass(frozen=True) -class PbcClient: - session: requests.Session - - def get_json( - self, - url: str, - data: Mapping[str, str] = frozendict(), - method='POST', - ) -> tuple[dict, datetime.datetime]: - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - } - - response = self.session.request( - method, - url, - headers=headers, - data=json.dumps(data), - ) - response.raise_for_status() - date_text = response.headers.get('last-modified') or response.headers.get( - 'date', - ) - date = email.utils.parsedate_to_datetime(date_text) - json_data = response.json() - if json_data is None: - msg = 'No result data for ' + url - raise Exception(msg) - return (json_data, date) - - def determine_coins(self) -> list[dict[str, str]]: - data: dict = {'path': []} - - url = URL_ACCOUNT_PLUGIN_GLOBAL.format( - hostname=HOSTNAME, - shard='', - ) - - json_data, date = self.get_json(url, data=data) - return json_data['coins']['coins'] - - def get_account_balances(self, address: str) -> Balances: - coins = self.determine_coins() - - url = URL_ACCOUNT_PLUGIN.format( - hostname=HOSTNAME, - shard=shard_id_for_address(address), - ) - - data: dict = { - 'path': [ - {'type': 'field', 'name': 'accounts'}, - {'type': 'avl', 'keyType': 'BLOCKCHAIN_ADDRESS', 'key': address}, - ], - } - account_data, date = self.get_json(url, data=data) - - balances: dict[str, Decimal] = {} - balances['MPC'] = Decimal(account_data['mpcTokens']) / MPC_DECIMALS - - for coin_idx, amount_data in enumerate(account_data['accountCoins']): - coin_data = coins[coin_idx] - byoc_balance = Decimal(amount_data['balance']) - denominator = Decimal(coin_data['conversionRate']['denominator']) - native_balance = byoc_balance / denominator - balances[coin_data['symbol']] = native_balance - del coin_idx, coin_data - - return Balances(date, balances) - - def get_contract_state(self, address: str) -> tuple[dict, datetime.datetime]: - url = URL_CONTRACT_STATE.format( - hostname=HOSTNAME, - shard=shard_id_for_address(address), - address=address, - ) - data: dict = {'path': []} - return self.get_json(url, data=data) - - -@dataclasses.dataclass(frozen=True) -class MpcBalance(Scraper): - dataset_name = 'defi_mpc_balance' - deduplicate_mode = DeduplicateMode.ONLY_LATEST - deduplicate_ignore_columns = ['account.update_time'] - - def scrape_balances_for(self, address: str) -> frozendict[str, object]: - client = PbcClient(self.session) - balances = client.get_account_balances(address) - - data_point = { - 'account.address': address, - 'account.update_time': balances.update_time, - } - - for token, amount in balances.balances.items(): - data_point['balance.' + token] = amount - del token, amount - - return frozendict(data_point) - - def scrape(self) -> Iterator[Mapping[str, object]]: - yield self.scrape_balances_for(secrets.PBC_ACCOUNT_ADDRESS) - - PBC_FOUNDATION_CONTRACT_ADDRESSES = [ ('012635f1c0a9bffd59853c6496e1c26ebda0e2b4da', 'Foundation Sales'), ('0135edec2c9fed33f45cf2538dc06ba139c4bb8f62', 'Foundation Team'), @@ -170,7 +35,7 @@ class PbcFoundationBalance(Scraper): ] def scrape(self) -> Iterator[Mapping[str, object]]: - client = PbcClient(self.session) + client = fin_depo.defi_partisia_blockchain.PbcClient(self.session) for address, contract_name in PBC_FOUNDATION_CONTRACT_ADDRESSES: contract_state, update_time = client.get_contract_state(address) yield { diff --git a/personal_data/fetchers/fin_depo.py b/personal_data/fetchers/fin_depo.py new file mode 100644 index 0000000..9ce1e85 --- /dev/null +++ b/personal_data/fetchers/fin_depo.py @@ -0,0 +1,72 @@ +import abc +import dataclasses +import datetime +import logging +from collections.abc import Iterator, Mapping + +import fin_depo +import kucoin.client +from frozendict import frozendict + +from personal_data.data import DeduplicateMode, Scraper + +from .. import secrets + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class FinanceDepoScraper(Scraper): + deduplicate_mode = DeduplicateMode.ONLY_LATEST + deduplicate_ignore_columns = ['account.update_time'] + + @abc.abstractmethod + def get_depo_fetcher(self): + pass + + def scrape(self) -> Iterator[Mapping[str, object]]: + depo = self.get_depo_fetcher().get_depo() + + data_point = { + 'account.update_time': depo.updated_time, + } + + for asset in depo.assets(): + key = f'balance.{asset}' + data_point[key] = depo.get_amount_of_asset(asset) + + yield frozendict(data_point) + + +@dataclasses.dataclass(frozen=True) +class KrakenAccountBalances(FinanceDepoScraper): + dataset_name = 'defi_kraken_balance' + + def get_depo_fetcher(self): + return fin_depo.defi_kraken.KrakenDepoFetcher( + kraken_key=secrets.KRAKEN_KEY, + kraken_secret=secrets.KRAKEN_SECRET, + ) + + +@dataclasses.dataclass(frozen=True) +class KucoinAccountBalances(FinanceDepoScraper): + dataset_name = 'defi_kucoin_balance' + + def get_depo_fetcher(self): + return fin_depo.defi_kucoin.KucoinDepoFetcher( + kucoin_key=secrets.KUCOIN_KEY, + kucoin_secret=secrets.KUCOIN_SECRET, + kucoin_pass=secrets.KUCOIN_PASS, + ) + +@dataclasses.dataclass(frozen=True) +class MpcBalance(FinanceDepoScraper): + dataset_name = 'defi_mpc_balance' + + def get_depo_fetcher(self): + return fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher( + self.session, + blockchain_address = secrets.PBC_ACCOUNT_ADDRESS + ) +