diff --git a/personal_data/fetchers/defi_kucoin.py b/personal_data/fetchers/defi_kucoin.py
index 6d5c7f3..db6ca0b 100644
--- a/personal_data/fetchers/defi_kucoin.py
+++ b/personal_data/fetchers/defi_kucoin.py
@@ -40,3 +40,23 @@ class KucoinAccountBalances(Scraper):
             )
 
             yield frozendict(data_point)
+
+
+@dataclasses.dataclass(frozen=True)
+class KucoinDepositAddresses(Scraper):
+    dataset_name = 'defi_kucoin_deposit_address'
+    deduplicate_mode = DeduplicateMode.ONLY_LATEST
+    deduplicate_ignore_columns = ['account.update_time']
+
+    def scrape(self) -> Iterator[Mapping[str, object]]:
+        data_point = {}
+
+        addresses = client.get_deposit_address('MPC')
+
+        data_point['account.num_deposit_addresses'] = len(addresses)
+        data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
+
+        for k, v in addresses[-1].items():
+            data_point[f'deposit.{k}'] = v
+
+        yield frozendict(data_point)
diff --git a/personal_data/fetchers/psnprofiles.py b/personal_data/fetchers/psnprofiles.py
index 1a7ae64..f4fdc73 100644
--- a/personal_data/fetchers/psnprofiles.py
+++ b/personal_data/fetchers/psnprofiles.py
@@ -6,10 +6,11 @@ from collections.abc import Iterator
 import bs4
 
 import personal_data.html_util
-import personal_data.parse_util
 from personal_data import secrets
 from personal_data.data import DeduplicateMode, Scraper
 
+from .. import parse_util
+
 logger = logging.getLogger(__name__)
 
 URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
@@ -54,7 +55,7 @@ class PsnProfilesScraper(Scraper):
         response = self.session.get(url)
         response.raise_for_status()
 
-        NOW = personal_data.parse_util.parse_response_datetime(response)
+        NOW = parse_util.parse_response_datetime(response)
 
         # Parse data
         soup = bs4.BeautifulSoup(response.content, 'lxml')
@@ -76,7 +77,7 @@ class PsnProfilesScraper(Scraper):
             gotten_at = (
                 cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
             )
-            gotten_at = personal_data.parse_util.parse_duration(gotten_at)
+            gotten_at = parse_util.parse_duration(gotten_at)
             time_acquired = NOW - gotten_at
 
             yield {
@@ -117,7 +118,7 @@ class PsnProfilesScraper(Scraper):
             if len(small_infos) > 2:
                 time_played_div = small_infos[2]
                 time_played_div.sup.extract()
-                time_played = personal_data.parse_util.parse_date(
+                time_played = parse_util.parse_date(
                     time_played_div.get_text(),
                 )
             else:
@@ -179,7 +180,7 @@ class PsnProfilesScraper(Scraper):
             trophy_icon = cells[0].img['src']
 
             cells[2].span.span.nobr.sup.extract()
-            gotten_at = parse_time(cells[2].get_text())
+            gotten_at = parse_util.parse_time(cells[2].get_text())
 
             yield {
                 'game.name': game_name,
diff --git a/personal_data/main.py b/personal_data/main.py
index 603a124..50b940a 100644
--- a/personal_data/main.py
+++ b/personal_data/main.py
@@ -3,6 +3,7 @@ import datetime
 import decimal
 import io
 import logging
+from collections.abc import Sequence
 from decimal import Decimal
 
 import requests
@@ -62,33 +63,11 @@ def to_value(s: str) -> object:
     return s
 
 
-def extend_csv_file(
-    filename: str,
-    new_dicts: dict,
+def deduplicate_dicts(
+    dicts: Sequence[dict],
     deduplicate_mode: personal_data.data.DeduplicateMode,
     deduplicate_ignore_columns: list[str],
-) -> dict:
-    dicts = []
-    try:
-        with open(filename) as csvfile:
-            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
-            for row in reader:
-                for k in list(row.keys()):
-                    orig = row[k]
-                    row[k] = to_value(orig)
-                    if row[k] is None:
-                        del row[k]
-                del k, orig
-                dicts.append(frozendict(row))
-            del row
-        del csvfile
-    except FileNotFoundError as e:
-        logger.info('Creating file: %s', filename)
-
-    original_num_dicts = len(dicts)
-    dicts += [frozendict(d) for d in new_dicts]
-    del new_dicts
-
+) -> tuple[Sequence[dict], list[str]]:
     fieldnames = []
     for d in dicts:
         for k in d.keys():
@@ -117,6 +96,41 @@ def extend_csv_file(
 
     dicts = set(dicts)
     dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
+    return dicts, fieldnames
+
+
+def extend_csv_file(
+    filename: str,
+    new_dicts: list[dict],
+    deduplicate_mode: personal_data.data.DeduplicateMode,
+    deduplicate_ignore_columns: list[str],
+) -> dict:
+    dicts = []
+    try:
+        with open(filename) as csvfile:
+            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
+            for row in reader:
+                for k in list(row.keys()):
+                    orig = row[k]
+                    row[k] = to_value(orig)
+                    if row[k] is None:
+                        del row[k]
+                del k, orig
+                dicts.append(frozendict(row))
+            del row
+        del csvfile
+    except FileNotFoundError as e:
+        logger.info('Creating file: %s', filename)
+
+    original_num_dicts = len(dicts)
+    dicts += [frozendict(d) for d in new_dicts]
+    del new_dicts
+
+    dicts, fieldnames = deduplicate_dicts(
+        dicts,
+        deduplicate_mode,
+        deduplicate_ignore_columns,
+    )
 
     csvfile_in_memory = io.StringIO()
     writer = csv.DictWriter(
@@ -227,7 +241,7 @@ def main(
             logger.exception('Failed in running %s', scraper_cls.__name__)
             continue
         status = extend_csv_file(
-            'output/' + scraper.dataset_name,
+            f'output/{scraper.dataset_name}.csv',
             result_rows,
             deduplicate_mode=scraper.deduplicate_mode,
             deduplicate_ignore_columns=scraper.deduplicate_ignore_columns,