Support deduplication with ignored columns
This commit is contained in:
parent
614957c465
commit
d2baf865c9
|
@ -60,6 +60,7 @@ def extend_csv_file(
|
||||||
filename: str,
|
filename: str,
|
||||||
new_dicts: dict,
|
new_dicts: dict,
|
||||||
deduplicate_mode: personal_data.data.DeduplicateMode,
|
deduplicate_mode: personal_data.data.DeduplicateMode,
|
||||||
|
deduplicate_ignore_columns: list[str],
|
||||||
):
|
):
|
||||||
dicts = []
|
dicts = []
|
||||||
try:
|
try:
|
||||||
|
@ -87,8 +88,23 @@ def extend_csv_file(
|
||||||
del k
|
del k
|
||||||
del d
|
del d
|
||||||
|
|
||||||
if deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
|
def equals_without_fields(a, b, fields = []):
|
||||||
|
a = dict(a)
|
||||||
|
b = dict(b)
|
||||||
|
|
||||||
|
for f in fields:
|
||||||
|
del a[f], b[f]
|
||||||
|
|
||||||
|
return frozendict(a) == frozendict(b)
|
||||||
|
|
||||||
|
|
||||||
|
if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
|
||||||
|
while len(dicts) >= 2 and equals_without_fields(dicts[-1], dicts[-2], deduplicate_ignore_columns):
|
||||||
|
del dicts[-1]
|
||||||
|
elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
|
||||||
dicts = set(dicts)
|
dicts = set(dicts)
|
||||||
|
|
||||||
|
|
||||||
dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
|
dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
|
||||||
|
|
||||||
csvfile_in_memory = io.StringIO()
|
csvfile_in_memory = io.StringIO()
|
||||||
|
@ -160,6 +176,7 @@ def main(scraper_filter: frozenset[str]):
|
||||||
'output/' + scraper.dataset_name,
|
'output/' + scraper.dataset_name,
|
||||||
result_rows,
|
result_rows,
|
||||||
deduplicate_mode=scraper.deduplicate_mode,
|
deduplicate_mode=scraper.deduplicate_mode,
|
||||||
|
deduplicate_ignore_columns=scraper.deduplicate_ignore_columns,
|
||||||
)
|
)
|
||||||
logger.warning('Scraper done: %s', scraper.dataset_name)
|
logger.warning('Scraper done: %s', scraper.dataset_name)
|
||||||
del scraper, session
|
del scraper, session
|
||||||
|
|
|
@ -9,7 +9,7 @@ class DeduplicateMode(Enum):
|
||||||
NONE = 0
|
NONE = 0
|
||||||
BY_FIRST_COLUMN = 1
|
BY_FIRST_COLUMN = 1
|
||||||
BY_ALL_COLUMNS = 2
|
BY_ALL_COLUMNS = 2
|
||||||
|
ONLY_LATEST = 3
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True)
|
@dataclasses.dataclass(frozen=True)
|
||||||
class Scraper(abc.ABC):
|
class Scraper(abc.ABC):
|
||||||
|
|
|
@ -27,7 +27,8 @@ URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/
|
||||||
@dataclasses.dataclass(frozen=True)
|
@dataclasses.dataclass(frozen=True)
|
||||||
class MpcBalance(Scraper):
|
class MpcBalance(Scraper):
|
||||||
dataset_name = 'defi_mpc_balance'
|
dataset_name = 'defi_mpc_balance'
|
||||||
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
|
deduplicate_mode = DeduplicateMode.ONLY_LATEST
|
||||||
|
deduplicate_ignore_columns = ['account.update_time']
|
||||||
|
|
||||||
def get_json(self, url: str, data: dict) -> tuple[dict,str]:
|
def get_json(self, url: str, data: dict) -> tuple[dict,str]:
|
||||||
headers = {
|
headers = {
|
||||||
|
@ -74,14 +75,14 @@ class MpcBalance(Scraper):
|
||||||
'account.update_time': date,
|
'account.update_time': date,
|
||||||
}
|
}
|
||||||
|
|
||||||
data_point['balance.MPC'] = account_data['mpcTokens']
|
data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens'])/1000)
|
||||||
|
|
||||||
for coin_idx, amount_data in enumerate(account_data['accountCoins']):
|
for coin_idx, amount_data in enumerate(account_data['accountCoins']):
|
||||||
coin_data = coins[coin_idx]
|
coin_data = coins[coin_idx]
|
||||||
byoc_balance = Decimal(amount_data ['balance'])
|
byoc_balance = Decimal(amount_data ['balance'])
|
||||||
denominator = Decimal(coin_data['conversionRate']['denominator'])
|
denominator = Decimal(coin_data['conversionRate']['denominator'])
|
||||||
native_balance = byoc_balance / denominator
|
native_balance = byoc_balance / denominator
|
||||||
data_point['balance.'+coin_data['symbol']] = native_balance
|
data_point['balance.'+coin_data['symbol']] = str(native_balance)
|
||||||
del coin_idx, coin_data
|
del coin_idx, coin_data
|
||||||
|
|
||||||
yield data_point
|
yield data_point
|
||||||
|
|
Loading…
Reference in New Issue
Block a user