1
0
personal-data/personal_data/fetchers/defi_partisia_blockchain.py

48 lines
1.5 KiB
Python

import dataclasses
import datetime
import email.utils
import json
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
import requests
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
import fin_depo
from .. import secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# (contract address, display name) pairs; PbcFoundationBalance.scrape walks
# this list in order and emits one row per entry.
PBC_FOUNDATION_CONTRACT_ADDRESSES = [
    ('012635f1c0a9bffd59853c6496e1c26ebda0e2b4da', 'Foundation Sales'),
    ('0135edec2c9fed33f45cf2538dc06ba139c4bb8f62', 'Foundation Team'),
    ('01ad44bb0277a8df16408006c375a6fa015bb22c97', 'Foundation Eco-System'),
]
@dataclasses.dataclass(frozen=True)
class PbcFoundationBalance(Scraper):
    """Scraper that emits one row per entry in PBC_FOUNDATION_CONTRACT_ADDRESSES.

    Each row carries the contract's configured name and address together
    with the ``nonce`` and ``remainingTokens`` values read from the contract
    state returned by the PBC client.
    """

    # Plain class attributes (no annotations), so ``@dataclass`` does not turn
    # them into instance fields.
    dataset_name = 'pbc_foundation_balances'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
    # NOTE(review): presumably these columns are left out of the comparison
    # when rows are deduplicated -- confirm against Scraper / DeduplicateMode.
    deduplicate_ignore_columns = [
        'contract.update_time',
        'contract.name',
        'contract.state.balance',
    ]

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Yield one mapping per configured foundation contract.

        A single ``PbcClient`` is built from ``self.session`` and queried
        once per address, in the order the addresses are listed.

        Yields:
            A dict with the keys ``contract.update_time``, ``contract.name``,
            ``contract.address``, ``contract.state.nonce`` and
            ``contract.state.balance`` (the latter taken from the state's
            ``remainingTokens`` entry).
        """
        pbc = fin_depo.defi_partisia_blockchain.PbcClient(self.session)
        for contract_address, label in PBC_FOUNDATION_CONTRACT_ADDRESSES:
            # get_contract_state is unpacked as (state mapping, update time).
            state, fetched_at = pbc.get_contract_state(contract_address)
            row = {
                'contract.update_time': fetched_at,
                'contract.name': label,
                'contract.address': contract_address,
                'contract.state.nonce': state['nonce'],
                'contract.state.balance': state['remainingTokens'],
            }
            yield row