import dataclasses
import datetime
import logging
import re
import json
import secrets
from decimal import Decimal

import bs4

import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper

# NOTE: this rebinds the name `secrets`, shadowing the standard-library module
# imported above. Every `secrets.` reference below resolves to the
# project-local module.
from .. import secrets

logger = logging.getLogger(__name__)

# mainnet: https://reader.partisiablockchain.com
# testnet: https://node1.testnet.partisiablockchain.com
HOSTNAME = 'reader.partisiablockchain.com'

# `{shard}` is either empty or a path prefix ending in a slash
# (e.g. 'shards/Shard0/').
URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local'
URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global'


@dataclasses.dataclass(frozen=True)
class MpcBalance(Scraper):
    """Scrape the MPC and per-coin balances of a single blockchain account.

    The account address is read from ``secrets.PBC_ACCOUNT_ADDRESS`` and the
    data is fetched from the account plugin endpoints on ``HOSTNAME``.
    """

    dataset_name = 'defi_mpc_balance'
    deduplicate_mode = DeduplicateMode.ONLY_LATEST
    # The update time changes on every fetch, so it is excluded from
    # deduplication comparisons.
    deduplicate_ignore_columns = ['account.update_time']

    def get_json(self, url: str, data: dict) -> tuple[dict,str]:
        """POST ``data`` as JSON to ``url`` and return the decoded reply.

        Args:
            url: Endpoint to query.
            data: JSON-serializable request body.

        Returns:
            A ``(payload, date)`` pair. ``payload`` is the decoded JSON
            response. ``date`` is the raw ``last-modified`` response header,
            falling back to the ``date`` header; it is ``None`` when the
            server sends neither.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an HTTP error
            status.
        """
        # NOTE(review): `self.session` is not defined in this file; presumably
        # a requests-style session supplied by `Scraper` - confirm.
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

        response = self.session.post(url, headers=headers, data=json.dumps(data))
        response.raise_for_status()
        date = response.headers.get('last-modified') or response.headers.get('date')
        return response.json(), date

    def determine_coins(self) -> list[dict]:
        """Return the coin descriptions from the global account plugin state.

        ``scrape`` indexes the returned list by the position of each entry in
        an account's ``accountCoins`` list, and reads ``symbol`` and
        ``conversionRate.denominator`` from each element.
        """
        data: dict = {'path':[]}

        # The global plugin state is queried without a shard prefix.
        url = URL_ACCOUNT_PLUGIN_GLOBAL.format(
            hostname=HOSTNAME,
            shard='',
        )

        json_data, _ = self.get_json(url, data=data)
        return json_data['coins']['coins']

    def scrape(self):
        """Yield one data point with the current balances of the account.

        The data point holds ``account.address``, ``account.update_time``
        (the timestamp header returned by ``get_json``), ``balance.MPC``, and
        one ``balance.<symbol>`` entry per element of the account's
        ``accountCoins`` list. Balances are stored as decimal strings.

        Raises:
            IndexError: If the account lists more coins than
                ``determine_coins`` returned.
        """
        address = secrets.PBC_ACCOUNT_ADDRESS
        coins = self.determine_coins()

        url = URL_ACCOUNT_PLUGIN.format(
            hostname=HOSTNAME,
            shard='shards/Shard0/',
        )

        # Look the address up in the `accounts` field of the plugin state.
        data: dict = {'path':[{'type':'field','name':'accounts'},{'type':'avl','keyType':'BLOCKCHAIN_ADDRESS','key':address}]}
        account_data, date = self.get_json(url, data=data)

        data_point = {
            'account.address': address,
            'account.update_time': date,
        }

        # NOTE(review): the raw `mpcTokens` value is scaled down by 1000 -
        # confirm this matches the token's number of decimals.
        data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens'])/1000)

        # Entries of `accountCoins` are matched to coin descriptions by index.
        for coin_idx, amount_data in enumerate(account_data['accountCoins']):
            coin_data = coins[coin_idx]
            byoc_balance = Decimal(amount_data['balance'])
            denominator = Decimal(coin_data['conversionRate']['denominator'])
            native_balance = byoc_balance / denominator
            data_point['balance.'+coin_data['symbol']] = str(native_balance)

        yield data_point