import dataclasses
import datetime
import json
import logging
import re
import secrets

import bs4

import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper

logger = logging.getLogger(__name__)

# mainnet: https://reader.partisiablockchain.com
# testnet: https://node1.testnet.partisiablockchain.com
HOSTNAME = 'reader.partisiablockchain.com'

URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local'
URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global'


@dataclasses.dataclass(frozen=True)
class MpcBalance(Scraper):
    """Scraper yielding the MPC token balance of a single account.

    The balance is read from the local account plugin endpoint on
    ``HOSTNAME`` (``URL_ACCOUNT_PLUGIN``) for shard ``Shard0``.
    """

    dataset_name = 'defi_mpc_balance'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

    def scrape(self):
        """Fetch the account state and yield one balance data point.

        Yields:
            A dict with the keys ``account.address``,
            ``account.update_time`` and ``balance.MPC``.

        Raises:
            Whatever ``response.raise_for_status()`` raises for a
            non-success HTTP status, and ``KeyError`` if the response JSON
            has no ``mpcTokens`` entry.
        """
        address = '0019e9a28c978dd65114cc4e0bcb876770805b0349'  # TODO

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = URL_ACCOUNT_PLUGIN.format(
            hostname=HOSTNAME,
            shard='shards/Shard0/',
        )

        # Path into the plugin state: the `accounts` field, then the AVL
        # entry keyed by the account's blockchain address.
        payload = {
            'path': [
                {'type': 'field', 'name': 'accounts'},
                {'type': 'avl', 'keyType': 'BLOCKCHAIN_ADDRESS', 'key': address},
            ],
        }

        response = self.session.post(url, headers=headers, data=json.dumps(payload))
        response.raise_for_status()
        # Diagnostics go through the module logger instead of stdout.
        logger.debug('Response headers: %s', response.headers)

        json_data = response.json()
        logger.debug('Response body: %s', json_data)

        data_point = {
            'account.address': address,
            # Use the `last-modified` header when present, otherwise fall
            # back to the `date` header of the response.
            'account.update_time': response.headers.get('last-modified')
            or response.headers.get('date'),
        }
        data_point['balance.MPC'] = json_data['mpcTokens']

        logger.debug('Data point: %s', data_point)
        yield data_point