1
0
personal-data/personal_data/fetchers/partisia_blockchain.py

102 lines
3.1 KiB
Python
Raw Normal View History

2024-04-16 21:23:27 +00:00
import dataclasses
import datetime
2024-04-23 20:58:25 +00:00
import email.utils
2024-04-16 21:38:35 +00:00
import json
2024-04-23 20:58:25 +00:00
import logging
2024-05-15 22:47:42 +00:00
from collections.abc import Iterator, Mapping
2024-04-16 22:02:51 +00:00
from decimal import Decimal
2024-04-16 21:23:27 +00:00
2024-05-15 22:47:42 +00:00
from frozendict import frozendict
2024-04-16 21:23:27 +00:00
from personal_data.data import DeduplicateMode, Scraper
2024-04-23 20:58:25 +00:00
2024-04-16 22:48:21 +00:00
from .. import secrets
2024-04-16 21:23:27 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Host of the blockchain reader node that is queried.
# mainnet: https://reader.partisiablockchain.com
# testnet: https://node1.testnet.partisiablockchain.com
HOSTNAME = 'reader.partisiablockchain.com'

# URL templates for the account plugin endpoints.
# `{hostname}` is the reader node host; `{shard}` is either the empty string
# or a path prefix ending in '/' (e.g. 'shards/Shard2/').
URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local'
URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global'
2024-04-16 21:23:27 +00:00
2024-04-23 20:58:25 +00:00
2024-04-23 16:04:12 +00:00
def shard_id_for_address(address: str) -> str:
    """Return the URL path prefix of the shard to query for ``address``.

    The returned prefix ends with a slash so it can be substituted directly
    into the ``{shard}`` placeholder of the account plugin URL templates.

    Args:
        address: Blockchain account address. Currently ignored.

    Returns:
        The shard path prefix; at present always ``'shards/Shard2/'``.
    """
    # TODO: Derive the shard from `address` instead of hard-coding Shard2.
    return 'shards/Shard2/'
2024-04-23 16:04:12 +00:00
2024-04-16 21:23:27 +00:00
@dataclasses.dataclass(frozen=True)
class MpcBalance(Scraper):
    """Scraper producing the latest token balances of one blockchain account.

    Each scrape yields a single mapping holding the account address, the
    time reported by the server, the MPC balance and one ``balance.<SYMBOL>``
    entry per coin listed for the account.
    """

    dataset_name = 'defi_mpc_balance'
    deduplicate_mode = DeduplicateMode.ONLY_LATEST
    # The update time differs on every request, so it is listed here to be
    # ignored when rows are deduplicated.
    deduplicate_ignore_columns = ['account.update_time']

    def get_json(self, url: str, data: dict) -> tuple[dict, datetime.datetime]:
        """POST ``data`` as a JSON body to ``url`` and decode the JSON reply.

        Args:
            url: Endpoint to query.
            data: JSON-serialisable request payload.

        Returns:
            A tuple of the decoded response body and the response timestamp.
            The timestamp is taken from the ``Last-Modified`` header, then
            from ``Date``; if neither is present the current UTC time is
            used instead.

        Raises:
            RuntimeError: If the response body decodes to ``None``.
            Exception: Whatever ``response.raise_for_status()`` raises for an
                unsuccessful HTTP status.
        """
        # NOTE(review): `self.session` is not defined in this class; it is
        # presumably a requests-compatible session provided by `Scraper`.
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        response = self.session.post(url, headers=headers, data=json.dumps(data))
        response.raise_for_status()

        date_text = response.headers.get('last-modified') or response.headers.get(
            'date',
        )
        if date_text:
            date = email.utils.parsedate_to_datetime(date_text)
        else:
            # Without this guard a missing header would pass None to the
            # parser and fail with an unrelated-looking error.
            logger.warning(
                'Response from %s has no Last-Modified or Date header; '
                'using current time',
                url,
            )
            date = datetime.datetime.now(tz=datetime.timezone.utc)

        json_data = response.json()
        if json_data is None:
            msg = 'No result data for ' + url
            raise RuntimeError(msg)
        return json_data, date

    def determine_coins(self) -> list[dict]:
        """Fetch the coin list from the global account plugin state.

        Returns:
            The value found at ``['coins']['coins']`` in the response. The
            position of each entry is used to match it against an account's
            ``accountCoins`` entries.
        """
        data: dict = {'path': []}
        url = URL_ACCOUNT_PLUGIN_GLOBAL.format(
            hostname=HOSTNAME,
            shard='',
        )
        json_data, _date = self.get_json(url, data=data)
        return json_data['coins']['coins']

    def scrape_latest_values_for(self, address: str) -> frozendict[str, object]:
        """Fetch the current balances held by ``address``.

        Args:
            address: Blockchain account address to look up.

        Returns:
            An immutable mapping with the keys ``account.address``,
            ``account.update_time``, ``balance.MPC`` and one
            ``balance.<symbol>`` key per entry in the account's coin list.

        Raises:
            IndexError: If the account lists more coins than the global coin
                list contains.
        """
        coins = self.determine_coins()

        url = URL_ACCOUNT_PLUGIN.format(
            hostname=HOSTNAME,
            shard=shard_id_for_address(address),
        )
        data: dict = {
            'path': [
                {'type': 'field', 'name': 'accounts'},
                {'type': 'avl', 'keyType': 'BLOCKCHAIN_ADDRESS', 'key': address},
            ],
        }
        account_data, date = self.get_json(url, data=data)

        data_point: dict[str, object] = {
            'account.address': address,
            'account.update_time': date,
        }

        # NOTE(review): the divisor 1000 presumably converts the raw
        # `mpcTokens` amount into whole MPC; confirm against the chain's
        # documented token precision.
        data_point['balance.MPC'] = Decimal(account_data['mpcTokens']) / 1000

        # Account coin entries are matched to the global coin list by position.
        for coin_idx, amount_data in enumerate(account_data['accountCoins']):
            coin_data = coins[coin_idx]
            byoc_balance = Decimal(amount_data['balance'])
            denominator = Decimal(coin_data['conversionRate']['denominator'])
            native_balance = byoc_balance / denominator
            data_point['balance.' + coin_data['symbol']] = native_balance

        return frozendict(data_point)

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Yield the latest balances for the account configured in ``secrets``."""
        yield self.scrape_latest_values_for(secrets.PBC_ACCOUNT_ADDRESS)