diff --git a/personal_data/main.py b/personal_data/main.py
index 2777aa8..c21bcec 100644
--- a/personal_data/main.py
+++ b/personal_data/main.py
@@ -12,6 +12,9 @@ import requests
 import requests_cache
 from frozendict import frozendict
 
+from . import notification, data
+from .util import *
+
 logger = logging.getLogger(__name__)
 
 try:
@@ -27,186 +30,12 @@ except ImportError:
     browsercookie = None
 
 
-import personal_data.data
-import personal_data.fetchers
-
-from . import notification
-
-CSV_DIALECT = 'one_true_dialect'
-csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
 
 OUTPUT_PATH = Path('./output')
 
 logging.basicConfig()
 logger.setLevel('INFO')
 
 
-def try_value(fn, s: str) -> object:
-    try:
-        return fn(s)
-    except (ValueError, decimal.InvalidOperation):
-        return None
-
-
-def to_value(s: str) -> object:
-    s = s.strip()
-    if len(s) == 0:
-        return None
-    if (v := try_value(Decimal, s)) is not None:
-        return v
-    if v := try_value(datetime.date.fromisoformat, s):
-        return v
-    if v := try_value(datetime.datetime.fromisoformat, s):
-        return v
-    if s.lower() == 'false':
-        return False
-    if s.lower() == 'true':
-        return True
-    if s.lower() == 'none':
-        return None
-    return s
-
-
-def equals_without_fields(
-    a: Mapping[str, object],
-    b: Mapping[str, object],
-    fields: Iterable[str] = frozenset(),
-) -> bool:
-    a = dict(a)
-    b = dict(b)
-
-    for f in fields:
-        del a[f], b[f]
-
-    return frozendict(a) == frozendict(b)
-
-
-def deduplicate_by_ignoring_certain_fields(
-    dicts: list[dict],
-    deduplicate_ignore_columns: Iterable[str],
-) -> list[dict]:
-    """Removes duplicates that occur when ignoring certain columns.
-
-    Output order is stable.
-    """
-    to_remove = set()
-    for idx1, first in enumerate(dicts):
-        for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
-            if equals_without_fields(first, second, deduplicate_ignore_columns):
-                to_remove.add(idx2)
-
-    to_remove = sorted(to_remove)
-    while to_remove:
-        del dicts[to_remove.pop()]
-
-    return dicts
-
-
-def deduplicate_dicts(
-    dicts: Sequence[dict],
-    deduplicate_mode: personal_data.data.DeduplicateMode,
-    deduplicate_ignore_columns: list[str],
-) -> tuple[Sequence[dict], list[str]]:
-    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
-
-    fieldnames = []
-    for d in dicts:
-        for k in d.keys():
-            if k not in fieldnames:
-                fieldnames.append(k)
-            del k
-        del d
-
-    if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
-        while len(dicts) >= 2 and equals_without_fields(
-            dicts[-1],
-            dicts[-2],
-            deduplicate_ignore_columns,
-        ):
-            del dicts[-1]
-    elif deduplicate_mode == personal_data.data.DeduplicateMode.BY_ALL_COLUMNS:
-        dicts = deduplicate_by_ignoring_certain_fields(
-            dicts,
-            deduplicate_ignore_columns,
-        )
-    elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
-        dicts = set(dicts)
-
-    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
-    return dicts, fieldnames
-
-
-def normalize_dict(d: dict) -> frozendict:
-    return frozendict(
-        {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
-    )
-
-
-def extend_csv_file(
-    csv_file: Path,
-    new_dicts: list[dict],
-    deduplicate_mode: personal_data.data.DeduplicateMode,
-    deduplicate_ignore_columns: list[str],
-) -> dict:
-    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
-
-    dicts = []
-    try:
-        with open(csv_file) as csvfile:
-            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
-            for row in reader:
-                for k in list(row.keys()):
-                    orig = row[k]
-                    row[k] = to_value(orig)
-                    if row[k] is None:
-                        del row[k]
-                    del k, orig
-                dicts.append(frozendict(row))
-                del row
-        del csvfile
-    except FileNotFoundError as e:
-        logger.info('Creating file: %s', csv_file)
-
-    original_num_dicts = len(dicts)
-    dicts += [normalize_dict(d) for d in new_dicts]
-    del new_dicts
-
-    dicts, fieldnames = deduplicate_dicts(
-        dicts,
-        deduplicate_mode,
-        deduplicate_ignore_columns,
-    )
-
-    csvfile_in_memory = io.StringIO()
-    writer = csv.DictWriter(
-        csvfile_in_memory,
-        fieldnames=fieldnames,
-        dialect=CSV_DIALECT,
-    )
-    writer.writeheader()
-    for d in dicts:
-        writer.writerow(d)
-    output_csv = csvfile_in_memory.getvalue()
-    del writer, csvfile_in_memory
-
-    csv_file.parent.mkdir(parents=True,exist_ok=True)
-    with open(csv_file, 'w') as csvfile:
-        csvfile.write(output_csv)
-    del csvfile
-    logger.info(
-        'Extended CSV "%s" from %d to %d lines',
-        csv_file,
-        original_num_dicts,
-        len(dicts),
-    )
-
-    return {
-        'extended': original_num_dicts != len(dicts),
-        'input_lines': original_num_dicts,
-        'output_lines': len(dicts),
-        'dicts': dicts,
-    }
-
-
 STANDARD_HEADERS = {
     'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
     # "Accept": "application/json, text/plain, */*",
@@ -240,9 +69,9 @@ def get_session(
     return session
 
 
-def available_scrapers() -> list[type[personal_data.data.Scraper]]:
+def available_scrapers() -> list[type[data.Scraper]]:
     subclasses = []
-    class_queue = [personal_data.data.Scraper]
+    class_queue = [data.Scraper]
     while class_queue:
         clazz = class_queue.pop()
         if inspect.isabstract(clazz):
diff --git a/personal_data/util.py b/personal_data/util.py
new file mode 100644
index 0000000..ac385d3
--- /dev/null
+++ b/personal_data/util.py
@@ -0,0 +1,191 @@
+import csv
+import datetime
+import decimal
+import io
+import logging
+from collections.abc import Iterable, Mapping, Sequence
+from decimal import Decimal
+from pathlib import Path
+
+from frozendict import frozendict
+
+from . import data
+
+logger = logging.getLogger(__name__)
+
+CSV_DIALECT = 'one_true_dialect'
+csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
+
+
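+# Illustrative examples of how to_value() below coerces raw CSV cell text
+# (example values only, not taken from real data):
+#
+#   to_value('12.50')      -> Decimal('12.50')
+#   to_value('2005-04-04') -> datetime.date(2005, 4, 4)
+#   to_value('true')       -> True
+#   to_value('')           -> None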
+def try_value(fn, s: str) -> object:
+    try:
+        return fn(s)
+    except (ValueError, decimal.InvalidOperation):
+        return None
+
+
+def to_value(s: str) -> object:
+    s = s.strip()
+    if len(s) == 0:
+        return None
+    if (v := try_value(Decimal, s)) is not None:
+        return v
+    if v := try_value(datetime.date.fromisoformat, s):
+        return v
+    if v := try_value(datetime.datetime.fromisoformat, s):
+        return v
+    if s.lower() == 'false':
+        return False
+    if s.lower() == 'true':
+        return True
+    if s.lower() == 'none':
+        return None
+    return s
+
+
+def equals_without_fields(
+    a: Mapping[str, object],
+    b: Mapping[str, object],
+    fields: Iterable[str] = frozenset(),
+) -> bool:
+    a = dict(a)
+    b = dict(b)
+
+    for f in fields:
+        del a[f], b[f]
+
+    return frozendict(a) == frozendict(b)
+
+
+def deduplicate_by_ignoring_certain_fields(
+    dicts: list[dict],
+    deduplicate_ignore_columns: Iterable[str],
+) -> list[dict]:
+    """Removes duplicates that occur when ignoring certain columns.
+
+    Output order is stable.
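+
+    Example (illustrative values):
+
+        >>> deduplicate_by_ignoring_certain_fields(
+        ...     [{'id': 1, 'seen': 'a'}, {'id': 1, 'seen': 'b'}, {'id': 2, 'seen': 'c'}],
+        ...     ['seen'],
+        ... )
+        [{'id': 1, 'seen': 'a'}, {'id': 2, 'seen': 'c'}]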
+    """
+    to_remove = set()
+    for idx1, first in enumerate(dicts):
+        for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
+            if equals_without_fields(first, second, deduplicate_ignore_columns):
+                to_remove.add(idx2)
+
+    to_remove = sorted(to_remove)
+    while to_remove:
+        del dicts[to_remove.pop()]
+
+    return dicts
+
+
+def deduplicate_dicts(
+    dicts: Sequence[dict],
+    deduplicate_mode: data.DeduplicateMode,
+    deduplicate_ignore_columns: list[str],
+) -> tuple[Sequence[dict], list[str]]:
+    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
+
+    fieldnames = []
+    for d in dicts:
+        for k in d.keys():
+            if k not in fieldnames:
+                fieldnames.append(k)
+            del k
+        del d
+
+    if deduplicate_mode == data.DeduplicateMode.ONLY_LATEST:
+        while len(dicts) >= 2 and equals_without_fields(
+            dicts[-1],
+            dicts[-2],
+            deduplicate_ignore_columns,
+        ):
+            del dicts[-1]
+    elif deduplicate_mode == data.DeduplicateMode.BY_ALL_COLUMNS:
+        dicts = deduplicate_by_ignoring_certain_fields(
+            dicts,
+            deduplicate_ignore_columns,
+        )
+    elif deduplicate_mode != data.DeduplicateMode.NONE:
+        dicts = set(dicts)
+
+    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
+    return dicts, fieldnames
+
+
+def normalize_dict(d: dict) -> frozendict:
+    return frozendict(
+        {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
+    )
+
+
+def load_csv_file(csv_file: Path) -> list[frozendict]:
+    dicts: list[frozendict] = []
+    with open(csv_file) as csvfile:
+        reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
+        for row in reader:
+            for k in list(row.keys()):
+                orig = row[k]
+                row[k] = to_value(orig)
+                if row[k] is None:
+                    del row[k]
+                del k, orig
+            dicts.append(frozendict(row))
+            del row
+    del csvfile
+    return dicts
+
+
+def extend_csv_file(
+    csv_file: Path,
+    new_dicts: list[dict],
+    deduplicate_mode: data.DeduplicateMode,
+    deduplicate_ignore_columns: list[str],
+) -> dict:
+    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
+
+    try:
+        dicts = load_csv_file(csv_file)
+    except FileNotFoundError:
+        logger.info('Creating file: %s', csv_file)
+        dicts = []
+
+    original_num_dicts = len(dicts)
+    dicts += [normalize_dict(d) for d in new_dicts]
+    del new_dicts
+
+    dicts, fieldnames = deduplicate_dicts(
+        dicts,
+        deduplicate_mode,
+        deduplicate_ignore_columns,
+    )
+
+    csvfile_in_memory = io.StringIO()
+    writer = csv.DictWriter(
+        csvfile_in_memory,
+        fieldnames=fieldnames,
+        dialect=CSV_DIALECT,
+    )
+    writer.writeheader()
+    for d in dicts:
+        writer.writerow(d)
+    output_csv = csvfile_in_memory.getvalue()
+    del writer, csvfile_in_memory
+
+    csv_file.parent.mkdir(parents=True, exist_ok=True)
+    with open(csv_file, 'w') as csvfile:
+        csvfile.write(output_csv)
+    del csvfile
+    logger.info(
+        'Extended CSV "%s" from %d to %d lines',
+        csv_file,
+        original_num_dicts,
+        len(dicts),
+    )
+
+    return {
+        'extended': original_num_dicts != len(dicts),
+        'input_lines': original_num_dicts,
+        'output_lines': len(dicts),
+        'dicts': dicts,
+    }
diff --git a/personal_data_calendar/__main__.py b/personal_data_calendar/__main__.py
new file mode 100644
index 0000000..34c0ac9
--- /dev/null
+++ b/personal_data_calendar/__main__.py
@@ -0,0 +1,51 @@
+import argparse
+import datetime
+
+import icalendar
+
+from personal_data.util import load_csv_file
+
+NOW = datetime.datetime.now(tz=datetime.UTC)
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
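+    # data_folder: directory containing the scraper CSV files
+    # output_file: path that the generated iCalendar (.ics) data is written to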
+    parser.add_argument('data_folder')
+    parser.add_argument('output_file')
+    return parser.parse_args()
+
+
+def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
+    cal = icalendar.Calendar()
+    cal.add('prodid', '-//personal_data_calendar//example.org//')
+    cal.add('version', '2.0')
+
+    for i, event_data in enumerate(rows):
+        # Select data
+        print(event_data)
+
+        # Create event (summary and times are placeholders for now)
+        event = icalendar.Event()
+        event.add('summary', f'Event {i}')
+        event.add('dtstart', datetime.datetime(2005, 4, 4, 8, 0, 0, tzinfo=datetime.UTC))
+        event.add('dtend', datetime.datetime(2005, 4, 4, 10, 0, 0, tzinfo=datetime.UTC))
+        event.add('created', NOW)
+        event.add('dtstamp', NOW)
+        cal.add_component(event)
+        del event
+
+    return cal
+
+
+def main():
+    args = parse_arguments()
+
+    dicts = load_csv_file(args.data_folder + '/games_played_playstation.csv')
+    print(dicts)
+
+    calendar = generate_calendar(dicts)
+
+    with open(args.output_file, 'wb') as f:
+        f.write(calendar.to_ical())
+
+
+if __name__ == '__main__':
+    main()
diff --git a/requirements.txt b/requirements.txt
index ea77961..bfc3c28 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,4 @@ frozendict
 python-kucoin
 krakenex
 fin-depo @ git+https://gitfub.space/Jmaa/fin-depo.git
-secret_loader @ https://gitfub.space/Jmaa/secret_loader
+secret_loader @ git+https://gitfub.space/Jmaa/secret_loader