From d2baf865c94bf6d028bad6bbb5d8dd8cb2c66957 Mon Sep 17 00:00:00 2001
From: Jon Michael Aanes
Date: Wed, 17 Apr 2024 00:16:40 +0200
Subject: [PATCH] Support deduplication with ignored columns

---
 personal_data/__init__.py                     | 19 ++++++++++++++++++-
 personal_data/data.py                         |  2 +-
 personal_data/fetchers/partisia_blockchain.py |  7 ++++---
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/personal_data/__init__.py b/personal_data/__init__.py
index 0c4635a..d76a9b5 100644
--- a/personal_data/__init__.py
+++ b/personal_data/__init__.py
@@ -60,6 +60,7 @@ def extend_csv_file(
     filename: str,
     new_dicts: dict,
     deduplicate_mode: personal_data.data.DeduplicateMode,
+    deduplicate_ignore_columns: list[str],
 ):
     dicts = []
     try:
@@ -87,8 +88,23 @@ def extend_csv_file(
         del k
         del d
 
-    if deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
+    def equals_without_fields(a, b, fields = []):
+        a = dict(a)
+        b = dict(b)
+
+        for f in fields:
+            del a[f], b[f]
+
+        return frozendict(a) == frozendict(b)
+
+
+    if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
+        while len(dicts) >= 2 and equals_without_fields(dicts[-1], dicts[-2], deduplicate_ignore_columns):
+            del dicts[-1]
+    elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
         dicts = set(dicts)
+
+    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
 
     csvfile_in_memory = io.StringIO()
 
@@ -160,6 +176,7 @@ def main(scraper_filter: frozenset[str]):
             'output/' + scraper.dataset_name,
             result_rows,
             deduplicate_mode=scraper.deduplicate_mode,
+            deduplicate_ignore_columns=scraper.deduplicate_ignore_columns,
         )
         logger.warning('Scraper done: %s', scraper.dataset_name)
         del scraper, session
diff --git a/personal_data/data.py b/personal_data/data.py
index 0bddc51..b97a7f2 100644
--- a/personal_data/data.py
+++ b/personal_data/data.py
@@ -9,7 +9,7 @@ class DeduplicateMode(Enum):
     NONE = 0
     BY_FIRST_COLUMN = 1
     BY_ALL_COLUMNS = 2
-
+    ONLY_LATEST = 3
 
 @dataclasses.dataclass(frozen=True)
 class Scraper(abc.ABC):
diff --git a/personal_data/fetchers/partisia_blockchain.py b/personal_data/fetchers/partisia_blockchain.py
index ee993ae..8fb5350 100644
--- a/personal_data/fetchers/partisia_blockchain.py
+++ b/personal_data/fetchers/partisia_blockchain.py
@@ -27,7 +27,8 @@ URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/
 @dataclasses.dataclass(frozen=True)
 class MpcBalance(Scraper):
     dataset_name = 'defi_mpc_balance'
-    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
+    deduplicate_mode = DeduplicateMode.ONLY_LATEST
+    deduplicate_ignore_columns = ['account.update_time']
 
     def get_json(self, url: str, data: dict) -> tuple[dict,str]:
         headers = {
@@ -74,14 +75,14 @@ class MpcBalance(Scraper):
             'account.update_time': date,
         }
 
-        data_point['balance.MPC'] = account_data['mpcTokens']
+        data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens'])/1000)
 
         for coin_idx, amount_data in enumerate(account_data['accountCoins']):
             coin_data = coins[coin_idx]
             byoc_balance = Decimal(amount_data ['balance'])
             denominator = Decimal(coin_data['conversionRate']['denominator'])
             native_balance = byoc_balance / denominator
-            data_point['balance.'+coin_data['symbol']] = native_balance
+            data_point['balance.'+coin_data['symbol']] = str(native_balance)
             del coin_idx, coin_data
 
         yield data_point