diff --git a/personal_data/fetchers/defi_partisia_blockchain.py b/personal_data/fetchers/defi_partisia_blockchain.py index fa5d43f..ee70dbf 100644 --- a/personal_data/fetchers/defi_partisia_blockchain.py +++ b/personal_data/fetchers/defi_partisia_blockchain.py @@ -6,6 +6,8 @@ import logging from collections.abc import Iterator, Mapping from decimal import Decimal +import enforce_typing +import requests from frozendict import frozendict from personal_data.data import DeduplicateMode, Scraper @@ -23,27 +25,44 @@ HOSTNAME = 'reader.partisiablockchain.com' URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local' URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global' +URL_CONTRACT_STATE = 'https://{hostname}/{shard}blockchain/contracts/{address}?requireContractState=false' + MPC_DECIMALS = 10000 def shard_id_for_address(address: str) -> str: - return 'shards/Shard2/' # TODO + if address.endswith('a'): + return 'shards/Shard0/' # TODO + elif address.endswith('2'): + return 'shards/Shard1/' # TODO + else: + return 'shards/Shard2/' # TODO +@enforce_typing.enforce_types @dataclasses.dataclass(frozen=True) -class MpcBalance(Scraper): - dataset_name = 'defi_mpc_balance' - deduplicate_mode = DeduplicateMode.ONLY_LATEST - deduplicate_ignore_columns = ['account.update_time'] +class Balances: + update_time: datetime.datetime + balances: Mapping[str, Decimal] - def get_json(self, url: str, data: dict) -> tuple[dict, datetime.datetime]: + +@enforce_typing.enforce_types +@dataclasses.dataclass(frozen=True) +class PbcClient: + session: requests.Session + + def get_json( + self, url: str, data: Mapping[str, str] = frozendict(), method='POST', + ) -> tuple[dict, datetime.datetime]: headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', } - response = self.session.post(url, headers=headers, data=json.dumps(data)) + response = self.session.request( + method, url, headers=headers, data=json.dumps(data), + ) response.raise_for_status() date_text = response.headers.get('last-modified') or response.headers.get( 'date', @@ -53,9 +72,9 @@ class MpcBalance(Scraper): if json_data is None: msg = 'No result data for ' + url raise Exception(msg) - return json_data, date + return (json_data, date) - def determine_coins(self) -> list[dict]: + def determine_coins(self) -> list[dict[str, str]]: data: dict = {'path': []} url = URL_ACCOUNT_PLUGIN_GLOBAL.format( @@ -66,7 +85,7 @@ class MpcBalance(Scraper): json_data, date = self.get_json(url, data=data) return json_data['coins']['coins'] - def scrape_latest_values_for(self, address: str) -> frozendict[str, object]: + def get_account_balances(self, address: str) -> Balances: coins = self.determine_coins() url = URL_ACCOUNT_PLUGIN.format( @@ -82,22 +101,79 @@ class MpcBalance(Scraper): } account_data, date = self.get_json(url, data=data) - data_point = { - 'account.address': address, - 'account.update_time': date, - } - - data_point['balance.MPC'] = Decimal(account_data['mpcTokens']) / MPC_DECIMALS + balances: dict[str, Decimal] = {} + balances['MPC'] = Decimal(account_data['mpcTokens']) / MPC_DECIMALS for coin_idx, amount_data in enumerate(account_data['accountCoins']): coin_data = coins[coin_idx] byoc_balance = Decimal(amount_data['balance']) denominator = Decimal(coin_data['conversionRate']['denominator']) native_balance = byoc_balance / denominator - data_point['balance.' + coin_data['symbol']] = native_balance + balances[coin_data['symbol']] = native_balance del coin_idx, coin_data + return Balances(date, balances) + + def get_contract_state(self, address: str) -> tuple[dict, datetime.datetime]: + url = URL_CONTRACT_STATE.format( + hostname=HOSTNAME, + shard=shard_id_for_address(address), + address=address, + ) + data: dict = {'path': []} + return self.get_json(url, data=data) + + +@dataclasses.dataclass(frozen=True) +class MpcBalance(Scraper): + dataset_name = 'defi_mpc_balance' + deduplicate_mode = DeduplicateMode.ONLY_LATEST + deduplicate_ignore_columns = ['account.update_time'] + + def scrape_balances_for(self, address: str) -> frozendict[str, object]: + client = PbcClient(self.session) + balances = client.get_account_balances(address) + + data_point = { + 'account.address': address, + 'account.update_time': balances.update_time, + } + + for token, amount in balances.balances.items(): + data_point['balance.' + token] = amount + del token, amount + return frozendict(data_point) def scrape(self) -> Iterator[Mapping[str, object]]: - yield self.scrape_latest_values_for(secrets.PBC_ACCOUNT_ADDRESS) + yield self.scrape_balances_for(secrets.PBC_ACCOUNT_ADDRESS) + + +PBC_FOUNDATION_CONTRACT_ADDRESSES = [ + ('012635f1c0a9bffd59853c6496e1c26ebda0e2b4da', 'Foundation Sales'), + ('0135edec2c9fed33f45cf2538dc06ba139c4bb8f62', 'Foundation Team'), + ('01ad44bb0277a8df16408006c375a6fa015bb22c97', 'Foundation Eco-System'), +] + + +@dataclasses.dataclass(frozen=True) +class PbcFoundationBalance(Scraper): + dataset_name = 'pbc_foundation_balances' + deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS + deduplicate_ignore_columns = [ + 'contract.update_time', + 'contract.name', + 'contract.state.balance', + ] + + def scrape(self) -> Iterator[Mapping[str, object]]: + client = PbcClient(self.session) + for address, contract_name in PBC_FOUNDATION_CONTRACT_ADDRESSES: + contract_state, update_time = client.get_contract_state(address) + yield { + 'contract.update_time': update_time, + 'contract.name': contract_name, + 'contract.address': address, + 'contract.state.nonce': contract_state['nonce'], + 'contract.state.balance': contract_state['remainingTokens'], + } diff --git a/personal_data/main.py b/personal_data/main.py index 6cce2fe..07be504 100644 --- a/personal_data/main.py +++ b/personal_data/main.py @@ -3,7 +3,7 @@ import datetime import decimal import io import logging -from collections.abc import Sequence +from collections.abc import Iterable, Mapping, Sequence from decimal import Decimal import requests @@ -63,6 +63,20 @@ def to_value(s: str) -> object: return s +def equals_without_fields( + a: Mapping[str, object], + b: Mapping[str, object], + fields: Iterable[str] = frozenset(), +) -> bool: + a = dict(a) + b = dict(b) + + for f in fields: + del a[f], b[f] + + return frozendict(a) == frozendict(b) + + def deduplicate_dicts( dicts: Sequence[dict], deduplicate_mode: personal_data.data.DeduplicateMode, @@ -76,15 +90,6 @@ def deduplicate_dicts( del k del d - def equals_without_fields(a, b, fields=[]): - a = dict(a) - b = dict(b) - - for f in fields: - del a[f], b[f] - - return frozendict(a) == frozendict(b) - if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST: while len(dicts) >= 2 and equals_without_fields( dicts[-1], @@ -92,6 +97,13 @@ def deduplicate_dicts( deduplicate_ignore_columns, ): del dicts[-1] + elif deduplicate_mode == personal_data.data.DeduplicateMode.BY_ALL_COLUMNS: + to_remove = set() + for idx1, first in enumerate(dicts): + for second in dicts[idx1 + 1 :]: + if equals_without_fields(first, second, deduplicate_ignore_columns): + to_remove.add(second) + dicts = set(dicts) - to_remove elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE: dicts = set(dicts) diff --git a/test/test_deduplicate.py b/test/test_deduplicate.py index b484aa5..55729d7 100644 --- a/test/test_deduplicate.py +++ b/test/test_deduplicate.py @@ -4,6 +4,8 @@ from personal_data.data import DeduplicateMode from personal_data.main import deduplicate_dicts LIST = [ + frozendict({'a': 0, 'b': 12, 't': 300}), + frozendict({'a': 0, 'b': 12, 't': 301}), frozendict({'a': 1, 'b': 2, 't': 300}), frozendict({'a': 1, 'b': 2, 't': 301}), frozendict({'a': 1, 'b': 2, 't': 302}), @@ -11,19 +13,24 @@ LIST = [ ] -def test_no_deduplicate(): - ls, fields = deduplicate_dicts(LIST, DeduplicateMode.NONE, []) - assert fields == ['a', 'b', 't'] - assert ls == LIST - - -def test_only_latest_no_fields(): - ls, fields = deduplicate_dicts(LIST, DeduplicateMode.ONLY_LATEST, []) - assert fields == ['a', 'b', 't'] - assert ls == LIST +def test_no_fields_to_ignore(): + for mode in DeduplicateMode: + ls, fields = deduplicate_dicts(LIST, mode, []) + assert fields == ['a', 'b', 't'] + assert ls == LIST def test_only_latest(): ls, fields = deduplicate_dicts(LIST, DeduplicateMode.ONLY_LATEST, ['t']) assert fields == ['a', 'b', 't'] - assert ls == [frozendict({'a': 1, 'b': 2, 't': 300})] + assert ls == ls[:3] + + +def test_all_fields(): + ls, fields = deduplicate_dicts(LIST, DeduplicateMode.BY_ALL_COLUMNS, ['t']) + assert fields == ['a', 'b', 't'] + print(ls) + assert ls == [ + frozendict({'a': 0, 'b': 12, 't': 300}), + frozendict({'a': 1, 'b': 2, 't': 300}), + ]