40 lines
1.3 KiB
Python
40 lines
1.3 KiB
Python
import dataclasses
|
|
import logging
|
|
from collections.abc import Iterator, Mapping
|
|
|
|
import fin_depo
|
|
|
|
from personal_data.data import DeduplicateMode, Scraper
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
PBC_FOUNDATION_CONTRACT_ADDRESSES = [
|
|
('012635f1c0a9bffd59853c6496e1c26ebda0e2b4da', 'Foundation Sales'),
|
|
('0135edec2c9fed33f45cf2538dc06ba139c4bb8f62', 'Foundation Team'),
|
|
('01ad44bb0277a8df16408006c375a6fa015bb22c97', 'Foundation Eco-System'),
|
|
]
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class PbcFoundationBalance(Scraper):
|
|
dataset_name = 'pbc_foundation_balances'
|
|
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
|
|
deduplicate_ignore_columns = [
|
|
'contract.update_time',
|
|
'contract.name',
|
|
'contract.state.balance',
|
|
]
|
|
|
|
def scrape(self) -> Iterator[Mapping[str, object]]:
|
|
client = fin_depo.defi_partisia_blockchain.PbcClient(self.session)
|
|
for address, contract_name in PBC_FOUNDATION_CONTRACT_ADDRESSES:
|
|
contract_state, update_time = client.get_contract_state(address)
|
|
yield {
|
|
'contract.update_time': update_time,
|
|
'contract.name': contract_name,
|
|
'contract.address': address,
|
|
'contract.state.nonce': contract_state['nonce'],
|
|
'contract.state.balance': contract_state['remainingTokens'],
|
|
}
|