1
0
personal-data/personal_data/fetchers/partisia_blockchain.py
Jon Michael Aanes afd2f4a0b3
Some checks failed
Build container / Package-Python (push) Failing after 26s
Build container / Package-Container (push) Successful in 1m22s
PBC: Parse datetime correctly
2024-04-23 22:11:43 +02:00

93 lines
2.9 KiB
Python

import dataclasses
import datetime
import logging
import re
import json
import secrets
from decimal import Decimal
import email.utils
import bs4
import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Node hostname substituted into the URL templates below.
# mainnet: https://reader.partisiablockchain.com
# testnet: https://node1.testnet.partisiablockchain.com
HOSTNAME = 'reader.partisiablockchain.com'

# URL templates for the account plugin endpoints. `{shard}` is either an empty
# string or a path prefix ending in '/' (see the `.format(...)` call sites).
_URL_ACCOUNT_PLUGIN_PREFIX = 'https://{hostname}/{shard}blockchain/accountPlugin/'
URL_ACCOUNT_PLUGIN = _URL_ACCOUNT_PLUGIN_PREFIX + 'local'
URL_ACCOUNT_PLUGIN_GLOBAL = _URL_ACCOUNT_PLUGIN_PREFIX + 'global'
def shard_id_for_address(address: str) -> str:
    """Return the URL path prefix of the shard to query for ``address``.

    The result ends in a slash so it can be substituted directly into a URL
    template's ``{shard}`` placeholder.

    The ``address`` argument is currently ignored: every address is mapped to
    the same hard-coded shard.
    """
    # TODO (carried over from the original): presumably the shard should be
    # derived from `address` instead of being fixed — confirm the routing rule.
    shard_prefix = 'shards/Shard2/'
    return shard_prefix
@dataclasses.dataclass(frozen=True)
class MpcBalance(Scraper):
    """Scraper producing one data point with the balances of a single account.

    The data point holds the account address, a timestamp taken from the
    node's HTTP response, the MPC balance, and one ``balance.<symbol>`` entry
    per coin listed on the account.

    NOTE(review): ``self.session`` is not defined in this class; presumably it
    is provided by ``Scraper`` and behaves like a ``requests`` session
    (``post``, ``raise_for_status``, ``headers``, ``json``) — confirm.
    """

    # Unannotated, so these are plain class attributes rather than dataclass
    # fields. Presumably consumed by the `Scraper` framework — confirm.
    dataset_name = 'defi_mpc_balance'
    deduplicate_mode = DeduplicateMode.ONLY_LATEST
    deduplicate_ignore_columns = ['account.update_time']

    def get_json(self, url: str, data: dict) -> tuple[dict, datetime.datetime]:
        """POST ``data`` as a JSON body to ``url`` and return the decoded reply.

        Args:
            url: Endpoint to post to.
            data: JSON-serialisable request body.

        Returns:
            A ``(json_data, date)`` pair. ``date`` is parsed from the
            response's ``last-modified`` header, else from its ``date``
            header, else it is the current UTC time when neither is present.

        Raises:
            RuntimeError: If the response body decodes to JSON ``null``.
            Exception: Whatever ``response.raise_for_status()`` raises for an
                unsuccessful HTTP status.
        """
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        response = self.session.post(url, headers=headers, data=json.dumps(data))
        response.raise_for_status()

        date_text = response.headers.get('last-modified') or response.headers.get('date')
        if date_text:
            date = email.utils.parsedate_to_datetime(date_text)
        else:
            # Without this guard a missing header would be passed on as None
            # and fail with an opaque parsing error.
            logger.warning(
                'No last-modified/date header from %s; using current time',
                url,
            )
            date = datetime.datetime.now(tz=datetime.timezone.utc)

        json_data = response.json()
        if json_data is None:
            msg = f'No result data for {url}'
            raise RuntimeError(msg)
        return json_data, date

    def determine_coins(self) -> list[dict]:
        """Fetch the coin list from the global account plugin state.

        Returns:
            The value found at ``['coins']['coins']`` in the response.
        """
        data: dict = {'path': []}
        url = URL_ACCOUNT_PLUGIN_GLOBAL.format(
            hostname=HOSTNAME,
            shard='',
        )
        # The response timestamp is not needed here.
        json_data, _ = self.get_json(url, data=data)
        return json_data['coins']['coins']

    def scrape(self):
        """Yield a single dict with the balances of the configured account.

        The account address is read from ``secrets.PBC_ACCOUNT_ADDRESS``.

        Yields:
            One mapping with the keys ``account.address``,
            ``account.update_time``, ``balance.MPC`` and one
            ``balance.<symbol>`` key per entry in the account's
            ``accountCoins`` list. Balances are stored as strings.

        Raises:
            IndexError: If the account lists more coins than the global coin
                list contains.
        """
        address = secrets.PBC_ACCOUNT_ADDRESS
        coins = self.determine_coins()

        url = URL_ACCOUNT_PLUGIN.format(
            hostname=HOSTNAME,
            shard=shard_id_for_address(address),
        )
        data: dict = {
            'path': [
                {'type': 'field', 'name': 'accounts'},
                {'type': 'avl', 'keyType': 'BLOCKCHAIN_ADDRESS', 'key': address},
            ],
        }
        account_data, date = self.get_json(url, data=data)

        data_point = {
            'account.address': address,
            'account.update_time': date,
        }

        # NOTE(review): the divisor 1000 assumes `mpcTokens` is expressed in
        # thousandths of an MPC token — confirm against the chain's token
        # precision.
        data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens']) / 1000)

        # Account coin entries are matched to the global coin list by position.
        for coin_idx, amount_data in enumerate(account_data['accountCoins']):
            coin_data = coins[coin_idx]
            byoc_balance = Decimal(amount_data['balance'])
            denominator = Decimal(coin_data['conversionRate']['denominator'])
            native_balance = byoc_balance / denominator
            data_point['balance.' + coin_data['symbol']] = str(native_balance)

        yield data_point